kafka监听问题的解决和剖析

问题如下:

  1. kafka为什么监听不到数据
  2. kafka为什么会有重复数据发送
  3. kafka数据重复如何解决
  4. 为什么kafka会出现俩个消费端都可以消费问题
  5. kafka监听配置文件

一. 解决问题一(kafka监听不到数据)

  首先kafka监听不得到数据,检查如下

  • 检查配置文件是否正确(可能会出现改了监听地址,监听Topic,监听的地址的数量问题)
  • 检查接收数据的正确性(比如原生的代码,可能是用byte序列化接收的数据,而你接收使用String。也是配置文件序列化问题,还有与发送者商量问题)
  • 检查kafka版本问题(一般的版本其实是没什么问题的,只有个别版本会出现监听不到问题)
  • 没有加
    @Component    犯了最不应该出差错的问题

  如果出现监听不到数据的问题,那么就试试更改方法一二,如果不可以在去试试方法三,之前出现这个问题也是查过 一般查到都会说  “低版本的服务器接收不到高版本的生产者发送的消息”,但是净由测试使用 用1.0.5RELEASE 和 2.6.3反复测试,并没有任何的问题。

如果按照版本一致,那么根本就不现实,因为可能不同的项目,springboot版本不一致的话,可能有的springboot版本低,那么你还得要求自己维护项目版本升级?如果出现第四种情况就无话可说了。

二. 解决问题二(kafka为什么会有重复数据发送)

  重复数据的发送问题如下

  1. 可能在发送者的那里的事务问题。mysql存储事务发生异常导致回滚操作,但是kafka消息却是已经发送到了服务器中。此事肯定会出现重复问题
  2. 生产者设置时间问题,生产发送设置的时间内,消息没完成发送,生产者以为消费者挂掉,便重新发送一个,导致重复
  3. offset问题,当项目重启,offset走到某一个位置已扔到kafka服务器中,但是项目被重启.那么offset会是在原本重启的那一个点的地方再次发送一次,这是kafka设计的问题,防止出现丢失数据问题

三. 解决问题三(kafka数据重复如何解决)

  目前我是使用的Redis进行的排重法,用的是Redis中的set,保证里面不存在重复,保证Redis里面不会存入太多的脏数据。并定期清理

  粘贴一下我的排重(Redis排重法)

//kafka prefix
  String cache = "kafka_cache";
  //kafka suffix
  Calendar c = Calendar.getInstance();
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  //0点,目前是为了设置为这一天的固定时间。这个完全可以去写个工具类自己弄,为了看的更清楚,麻烦了一点的写入
  SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
  String gtimeStart = sdf2.format(c.getTime());
  long time = sdf.parse(gtimeStart).getTime();

  //此位置为了设置是否是新的一天,新的一天需要设置定时时间,保证redis中不会存储太多无用数据
  Boolean flag = false;
  //数据接收
  Set<String> range = new HashSet<>();
  //判断是否存在
  if (redisTemplate.hasKey(cache + time)) {
  //存在则取出这个set
  range = redisTemplate.opsForSet().members(cache + time);
  }else {
  //不存在,则为下面过期时间的设置铺垫
  flag = true;
  }
  //判断监听到的数据是否是重复
  if (range.contains("测试需要")) {
  //重复则排出,根据逻辑自己修改
  continue;
  } else {
  //添加进去
  redisTemplate.opsForSet().add(cache + time, i+"");
  if (flag){
   //设置为24小时,保证新一天使用,之前使用的存储会消失掉
   redisTemplate.expire(cache + time,24,TimeUnit.HOURS);
   //不会在进入这个里面,如果多次的存入过期时间,那么这个key的过期时间就永远是24小时,一直就不会过期
   flag = false;
  }
  }

四. 解决问题四(为什么kafka会出现俩个消费端都可以消费问题)

  原因是因为在不同groupId之下,kafka接收到以后,会给监听他的每一个组发送一个他所收到的消息,但是两个消费端监听同一个租,那么就只有一个消费端可以消费到。

五. 粘一下我的监听配置文件

# 指定kafka 代理地址,可以多个,用逗号间隔
spring.kafka.bootstrap-servers= localhost:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id= test
# 是否自动提交
spring.kafka.consumer.enable-auto-commit= true
# 提交间隔的毫秒
spring.kafka.consumer.auto-commit-interval.ms=60000
# 最大轮询的次数
spring.kafka.consumer.max-poll-records=1
# 将偏移量重置为最新偏移量
spring.kafka.consumer.auto-offset-reset=earliest
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

总结

到此这篇关于kafka监听问题的解决和剖析的文章就介绍到这了,更多相关kafka监听问题内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Kafka 常用命令行详细介绍及整理

     Kafka 常用命令行详细介绍及整理 以下是kafka常用命令行总结: 1.查看topic的详细信息 ./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1 2.为topic增加副本 ./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execu

  • Java使用kafka发送和生产消息的示例

    1. maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> 2. 生产者代码 package com.lnho.example.kafka; import org.apache.kafka.c

  • Linux下Kafka单机安装配置方法(图文)

    介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: •Kafka将消息以topic为单位进行归纳. •将向Kafka topic发布消息的程序成为producers. •将预订topics并消费消息的程序成为consumer. •Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker. producers通过网络将消息发送到Kafka集群,集群

  • kafka生产者和消费者的javaAPI的示例代码

    写了个kafka的java demo 顺便记录下,仅供参考 1.创建maven项目 目录如下: 2.pom文件: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://mave

  • kafka生产实践(详解)

    1.引言 最近接触到一个APP流量分析的项目,类似于友盟.涉及到几个C端(客户端)高并发的接口,这几个接口主要用于C端数据的提交.在没有任何缓冲的情况下,一个接口涉及到5张表的提交.压测的结果很不理想,主要瓶颈就在与RDS的交互. 一台双核,16G机子,单实例,jdbc最大连接数100,吞吐量竟然只有50TPS. 能想到的改造方案就是引入一层缓冲,让C端接口不与RDS直接交互,很自然就想到了rabbitmq,但是rabbitmq对分布式的支持比较一般,我们的数据体量也比较大,所以我们借鉴了友盟,

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • kafka监听问题的解决和剖析

    问题如下: kafka为什么监听不到数据 kafka为什么会有重复数据发送 kafka数据重复如何解决 为什么kafka会出现俩个消费端都可以消费问题 kafka监听配置文件 一. 解决问题一(kafka监听不到数据) 首先kafka监听不得到数据,检查如下 检查配置文件是否正确(可能会出现改了监听地址,监听Topic,监听的地址的数量问题) 检查接收数据的正确性(比如原生的代码,可能是用byte序列化接收的数据,而你接收使用String.也是配置文件序列化问题,还有与发送者商量问题) 检查ka

  • Oracle 12.2监听无法启动解决方法

    在自己的虚拟机的做实验,突然发现使用PL/SQL Developer无法连接到数据库,报错ORA-12514,说是监听没有启动. 先介绍虚拟机一下环境:redhat7.2+GI 12.2.0.1+Oracle 12.2.0.1,为了测试12.2的ASM特性安装了GI.平时监听程序默认是开启启动的.但是今天不知道为什么没有启动.使用crsctl查看资源状态:发现监听的状态确实是OFFLINE状态 [root@rhel7 .oracle]# crsstat ---------------------

  • spring 整合kafka监听消费的配置过程

    前言 最近项目里有个需求,要消费kafka里的数据.之前也手动写过代码去消费kafka数据.但是转念一想.既然spring提供了消费kafka的方法.就没必要再去重复造轮子.于是尝试使用spring的API. 项目技术背景,使用springMVC,XML配置和注解相互使用.kafka的配置都是使用XML方式. 整合过程 1. 引入spring-kafka的依赖包 <dependency> <groupId>org.springframework.kafka</groupId&

  • MySQL 启动成功但未监听端口的解决方法

    问题描述 MySQL 启动成功,使用 ps -ef |grep mysql 可以看到进程,如下图: 也可以在服务器登陆,如下图: 但是使用 netstat -antp| grep 3306 可以看到没有监听端口. 查看 MySQL 配置文件,端口也没有更改. 解决办法 检查发现是配置文件中使用了 skip-networking,可以看到这个选项的的作用是不监听端口,同主机的用户通过 sockets 进行链接.外部主机由于没有监听端口,将无法连接. 将 skip-networking 注释掉之后,

  • Kafka的监听地址配置实例详解

    有时我们会碰到网络是通畅的,但却连不上Kafka,特别是在多网卡环境或者云环境上很容易出现,这个其实和Kafka的监听配置有关系.本文介绍监听相关的配置,目前监听相关的参数主要有下面几个: listeners advertised.listeners listener.security.protocol.map inter.broker.listener.name security.inter.broker.protocol advertised.host.name(历史遗留,已废弃,勿使用)

  • Oracle监听日志定期清理

    环境: Oracle 11.2.0 Win Server 2008 R2 Enterprise 原因:Oracle监听日志文件大小超过4G,oracle监听连接时断时续 解决办法:重新建立新的日志文件,通过计划任务定期执行,为方便,我这里每天执行一次. 批处理文件内容如下: @echo off rem 因数据库监听日志过大,影响oracle使用,需定期清理 rem 停止监听写日志 lsnrctl set log_status off rem 修改监听日志文件名称,每天执行一次 ren E:\ap

  • 使用watch监听路由变化和watch监听对象的实例

    一.watch监听路由变化 解决办法: export default{ data(){ return{} }, watch:{ "$route":"getPath" // 监听事件 }, methods:{ getPath(){ let path = this.$roune.path; //或得当前路径 进行逻辑判断 } } } 二.watch监听对象 例子: <el-select v-model="form.region" placeho

  • 详解SpringCloud eureka服务状态监听

    一.前言 近期由于公司不同平台项目之间的业务整合,需要做到相互访问! 每个平台均有自己的注册中心和服务,且注册中心相互之间并没有相互注册! 借助spring的事件监听,在eureka-server端监听服务注册,将所有服务的ip和port存放至redis库,然后让其他平台服务通过redis库获取ip和端口号,进而进行http调用.结构图如下: 二.事件解析 事件列表 在org.springframework.cloud.netflix.eureka.server.event包下会发现如下类: E

  • vue项目中销毁window.addEventListener事件监听解析

    目录 销毁window.addEventListener事件监听 window.addEventListener监听scroll事件 解决办法 使用throttle出现的新问题 最后代码 销毁window.addEventListener事件监听 今天在做项目的过程中,组件中调用iframe时,由于在组件的created方法中写了监听,用于接收iframe发来的信息,但是在组件销毁的时候并没有去掉监听,导致组件创建几次,监听方法就会执行几次,特此记录 created() {     window

  • 解析SQL Server CDC配合Kafka Connect监听数据变化的问题

    写在前面 好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从今年年初开始筹备.组建.招兵买马,到现在稳定开搞中,期间踏过无数的火坑,也许除了这篇还很写上三四篇. 进入主题,通常企业为了实现数据统计.数据分析.数据挖掘.解决信息孤岛等全局数据的系统化运作管理 ,为BI.经营分析.决策支持系统等深度开发应用奠定基础,挖掘数据价值 ,企业会开始着手建立数据仓库,数据中台.而这些数据来源则来自于企业的各个业务系统的数据或爬取外部的数据,从业务系统数据到数据仓库的过程就是一个E

随机推荐