被kafka-client和springkafka版本坑到自闭及解决

目录
  • 被kafka-client和springkafka版本坑
  • springboot、spring-kafka、kafka-client三者兼容性关系

被kafka-client和springkafka版本坑

上周刚刚欢天喜地的在linux上部了kafka,这周打算用spring-boot框架写个简单demo跑一下,结果悲剧就此展开。

首先建立maven工程:pom中添加spring boot kafka依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.1.5.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.example</groupId>
   <artifactId>kafkaproducer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>kafkaproducer</name>
   <description>Demo project for Spring Boot</description>
 
   <properties>
      <java.version>1.8</java.version>
   </properties>
 
   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
 
      <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <optional>true</optional>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
      </dependency>
      <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
      </dependency>
   </dependencies>
 
   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
 
 
</project>

配置文件如下:

server.port=8089
spring.kafka.bootstrap-servers=ip:port
spring.kafka.producer.retries= 0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.linger.ms=1

然后新建一个Producer类

package com.example.kafkaproducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaProducer {
    @Autowired
    KafkaTemplate kafkaTemplate;
    public void produce(){
        kafkaTemplate.send("test","hello word");
        System.out.println("发送消息");
    }
}

在test类中调用

package com.example.kafkaproducer;  
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; 
 
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaproducerApplicationTests { 
   @Autowired KafkaProducer kafkaProducer; 
   @Test
   public void contextLoads() {
      kafkaProducer.produce();
   } 
}

然后控制台就会打印一个莫名奇妙的错误,没有打印任何堆栈信息,大概意思只是表达了连接不上。

Exception thrown when sending a message with key='null' and payload='' to topic

telnet ip+port 是可以通的

随后发现,xshell上启动的kafka-server在报这样一个错,更详细的没有留存。

ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18

百度了一下,很可能是Linux上的kafka版本和pom中引入的spring-kafka依赖不匹配造成的,于是查看对应关系

查看kafka,发现装的是一个0.8.2.1 版本的kafka,该版本的kafka是2015年3月发布的版本,可以说是十分古老,真是不知道为什么当初要选这么老的版本。

换了几次spring-kafka的pom之后,依然在报这个问题,于是我选择换更新的kafka的包。

换了2.2.0版本kafka的包,问题得到解决。

其中consumer的创建命令和老版本的不太一样,且consumer和producer需使用相同的端口号,而不是像之前producer配置为broker的端口,consumer配置为zookeeper的端口号。

./bin/kafka-console-consumer.sh --bootstrap-server ip:9092  --topic test

且config文件夹下server.properties文件中的一些配置和之前不太一样,需要注意的是,以下两行配置原来是被注解了的,需要在这里取消掉注解,并配置自己的ip。

listeners = PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://your.host.name:9092

springboot、spring-kafka、kafka-client三者兼容性关系

spring官方描述的spring-kafka的版本和kafka-clients的版本对应关系:

官方地址:https://spring.io/projects/spring-kafka

中间列:“Spring Integration for Apache Kafka Version 可忽略不看:

也就是说spring-kafka与spring-client是存在在一对多关系的,那是不是他所有的spring-client都可以选呢?

接着往下看(摘自官网):

他说啥 ?

  • springboot 1.5 你应该用的是spring-kafka 1.3.x.
  • springboot2.0你应该使用的是spring-kafka2.0.x.
  • 如果用的是spring boot2.1.x,那么你必须使用spring-kafka的版本是2.2.x。否则就会出现noClass等等各种异常。
  • spring-kafka的版本是2.1默认使用的spring-client是1.1.x,当你要使用另外两个时,你就要使用如下的版本配置.
  • 如果你用的是2.2.x的spring-kafka,只看第一张图,你会以为2.1.x的kafka-clients也可以用。但是spring说了,此时默认用的kafka-clients是2.0.x,如果你想用2.1.x,必须看文档附录,下图的大概意思,必须换掉下图所示的所有依赖版本。

也就是说并不是一对多 他默认的还是只有一个kafka-client来给你的,你要选其他的可以的,你添加一些额外配置

例如:

Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本为 2.2.0.RELEASE,kafka-clients 的默认版本为2.0.0,所以 kafka 的版本选用为 kafka_2.11-2.1.0 (前面的2.11代表的是Scala的版本后面为kafka的版本号)

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot集成Kafka的步骤

    SpringBoot集成Kafka 本篇主要讲解SpringBoot 如何集成Kafka ,并且简单的 编写了一个Demo 来测试 发送和消费功能 前言 选择的版本如下: springboot : 2.3.4.RELEASE spring-kafka : 2.5.6.RELEASE kafka : 2.5.1 zookeeper : 3.4.14 本Demo 使用的是 SpringBoot 比较高的版本 SpringBoot 2.3.4.RELEASE 它会引入 spring-kafka 2.5

  • Springboot 1.5.7整合Kafka-client代码示例

    在一次项目中,因甲方需要使用kafka消息队列推送数据,所以需要接入kafka,并且kafka的版本是2.11.但是我们项目使用的是Springboot 1.5.7的版本,对应的springboot.kafka.starter有冲突,所以就接入了kafka-client. Kafka 是一个分布式消息引擎与流处理平台,经常用做企业的消息总线.实时数据管道,有的还把它当做存储系统来使用. 早期 Kafka 的定位是一个高吞吐的分布式消息系统,目前则演变成了一个成熟的分布式消息引擎,以及流处理平台.

  • Springboot集成Kafka进行批量消费及踩坑点

    目录 引入依赖 创建配置类 Kafka 消费者 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency> 因为我的项目的 springboot 版本是 1.5.22.RELE

  • 深入研究spring boot集成kafka之spring-kafka底层原理

    目录 前言 简单集成 引入依赖 添加配置 测试发送和接收 Spring-kafka-test嵌入式KafkaServer 引入依赖 启动服务 创建新的Topic 程序启动时创建TOPIC 代码逻辑中创建 PS:其他的方式创建TOPIC 引入依赖 api方式创建 命令方式创建 消息发送之KafkaTemplate探秘 获取发送结果 异步获取 同步获取 KAFKA事务消息 REPLYINGKAFKATEMPLATE获得消息回复 Spring-kafka消息消费用法探秘 @KAFKALISTENER的

  • 被kafka-client和springkafka版本坑到自闭及解决

    目录 被kafka-client和springkafka版本坑 springboot.spring-kafka.kafka-client三者兼容性关系 被kafka-client和springkafka版本坑 上周刚刚欢天喜地的在linux上部了kafka,这周打算用spring-boot框架写个简单demo跑一下,结果悲剧就此展开. 首先建立maven工程:pom中添加spring boot kafka依赖: <?xml version="1.0" encoding="

  • 关于spring版本与JDK版本不兼容的问题及解决方法

    在用ssh框架测试时出现问题,如下: java.lang.IllegalArgumentException at org.springframework.asm.ClassReader.<init>(Unknown Source) at org.springframework.asm.ClassReader.<init>(Unknown Source) at org.springframework.asm.ClassReader.<init>(Unknown Sourc

  • pyinstaller 3.6版本通过pip安装失败的解决办法(推荐)

    本机中原pyinstaller版本为3.5版本,本打算通过 pip install --upgrade pyinstaller进行升级,竟然报错,后面卸载再重新安装也一样报错,没办法看来通过pip是暂时安装不上了. 下面就讲解源码安装的方式,先从pypi.org中下载对应3.6版本的源码 https://pypi.org/project/PyInstaller/#files 下载完成后如果是win10操作系统的话,一定要以管理员的方式打包命令窗口,否则会提示权限不足的问题,比如我是把源码放到E盘

  • Vant Weapp组件踩坑:picker的初始赋值解决

    在使用vant的picker组件时,我希望有在页面加载时向数据库获取值来设置picker的默认值.开始我使用官方文档中的default-index属性,并在onLoad方法中进行this.setData().但是这样做picker的默认索引还是0. 于是我又将setData放入onShow().onReady()方法,均不奏效.(我真的不知道为什么,新建一个空白页面测试也是这样) 继而我查看官方文档,发现picker有实例方法setIndexes(),于是尝试. 然而我在onLoad().onR

  • python版本坑:md5例子(python2与python3中md5区别)

    起步 对于一些字符,python2和python3的md5加密出来是不一样的. # python2.7 pwd = "xxx" + chr(163) + "fj" checkcode = hashlib.md5(pwd).hexdigest() print checkcode # ea25a328180680aab82b2ef8c456b4ce # python3.6 pwd = "xxx" + chr(163) + "fj"

  • 详解AngularJS1.6版本中ui-router路由中/#!/的解决方法

    AngularJS 路由 是通过 # + 标记 帮助我们区分不同的逻辑页面并将不同的页面绑定到对应的控制器上.因此在设置好路由规则后,为html页面的a标签设置href路由链接切换不同的视图.Angular1.6版本之前通常有href="#..."或href="#/..."这两种写法,结果这两种写法在Angular1.6中没有任何反应. 结果查看了下浏览器地址栏,默认视图链接竟然显示"#!/..",是的,中间多加了个!号. AngularJS升级

  • JQuery 1.3.2以上版本中出现pareseerror错误的解决方法

    感觉很莫名奇妙,前一阵子还好好的,怎么现在就突然报错了? 程序根本没动.于是开始跟踪分析,请求.返回的内容...最后判断是jquery 不承认返回的内容是json格式.当时返回的内容是: 复制代码 代码如下: {'keylist':[ {'tid':'13',"sumnum':'1'},{'tid':'21','sumnum':'1'} ]} 经过仔细检查也没发现任何问题,返回流的字符已经设成utf-8,内容用ff检测,ff也能识别成json格式,但此时还是报"pareseerror&

  • 详解一次Vue低版本安卓白屏问题的解决过程

    因为业务需要一定要使用安卓4.4的webview浏览页面,测试的时候发生了白屏问题,很自然想到使用babel转换部分ES6语法. Babel 转换 Promise 和 Symbol ES6语法的配置 # 两项都需要放到生产依赖 npm install babel-polyfill --save npm install es6-promise --save // main.js import 'babel-polyfill'; import Es6Promise from 'es6-promise

  • mysql5.7版本root密码登录问题的解决方法

    发现上一篇文章解决了mysql服务无法启动问题后,竟然用root用户无密码不能登录,5.7版本不能在初始化时用root无密码登录,找了很多帖子后,解决了问题. 原来在上一篇文章的mysqld –initialize方法生成的data文件里的.err文件里,可以用word打开,会有生成root的临时初始密码 打开之后找到密码: 但是这个密码用一次后就会过期不能使用了.下面我们就来改这个密码 1.先关闭mysql服务,net stop mysql 2.修改你的mysql的配置文件my.ini或者my

  • SpringBoot遇到的坑@Qualifier报红的解决

    目录 SpringBoot遇到的坑@Qualifier报红 解决方法 SpringBoot注解@Qualifier用法 SpringBoot遇到的坑@Qualifier报红 今天写项目的时候@Qualifier一直报红,排查半天后面才知道原来是idea生成项目的时候把主配置的类放在一个包中导致默认包结构扫描发生改变,扫描不到你定义的组件,具体可以去看看SpirngBoot默认包结构规则. 解决方法 主配置类必须放在是你所扫描的包的父包上 SpringBoot注解@Qualifier用法 在Con

随机推荐