SpringBoot集成Kafka的步骤

SpringBoot集成Kafka

本篇主要讲解SpringBoot 如何集成Kafka ,并且简单的 编写了一个Demo 来测试 发送和消费功能

前言

选择的版本如下:

springboot : 2.3.4.RELEASE

spring-kafka : 2.5.6.RELEASE

kafka : 2.5.1

zookeeper : 3.4.14

本Demo 使用的是 SpringBoot 比较高的版本 SpringBoot 2.3.4.RELEASE 它会引入 spring-kafka 2.5.6 RELEASE ,对应了版本关系中的
Spring Boot 2.3 users should use 2.5.x (Boot dependency management will use the correct version).

spring和 kafka 的版本 关系

https://spring.io/projects/sp...

1.搭建Kafka 和 Zookeeper 环境

搭建kafka 和 zookeeper 环境 并且启动 它们

2.创建Demo 项目引入spring-kafka

2.1 pom 文件

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
</dependency>

2.2 配置application.yml

spring:
 kafka:
  bootstrap-servers: 192.168.25.6:9092 #bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
  consumer:
   group-id: myGroup
   enable-auto-commit: true
   auto-commit-interval: 100ms
   properties:
    session.timeout.ms: 15000
   key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
   auto-offset-reset: earliest
  producer:
   retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送
   batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
   buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
   key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类
   value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类

2.3 定义消息体Message

/**
 * @author johnny
 * @create 2020-09-23 上午9:21
 **/
@Data
public class Message {

  private Long id;

  private String msg;

  private Date sendTime;
}

2.4 定义KafkaSender

主要利用 KafkaTemplate 来发送消息 ,将消息封装成Message 并且进行 转化成Json串 发送到Kafka中

@Component
@Slf4j
public class KafkaSender {

  private final KafkaTemplate<String, String> kafkaTemplate;

  //构造器方式注入 kafkaTemplate
  public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  private Gson gson = new GsonBuilder().create();

  public void send(String msg) {
    Message message = new Message();

    message.setId(System.currentTimeMillis());
    message.setMsg(msg);
    message.setSendTime(new Date());
    log.info("【++++++++++++++++++ message :{}】", gson.toJson(message));
    //对 topic = hello2 的发送消息
    kafkaTemplate.send("hello2",gson.toJson(message));
  }

}

2.5 定义KafkaConsumer

在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic
kafka的消息再接收端会被封装成ConsumerRecord对象返回,它内部的value属性就是实际的消息。

@Component
@Slf4j
public class KafkaConsumer {

  @KafkaListener(topics = {"hello2"})
  public void listen(ConsumerRecord<?, ?> record) {

    Optional.ofNullable(record.value())
        .ifPresent(message -> {
          log.info("【+++++++++++++++++ record = {} 】", record);
          log.info("【+++++++++++++++++ message = {}】", message);
        });
  }

}

3.测试 效果

提供一个 Http接口调用 KafkaSender 去发送消息

3.1 提供Http 测试接口

@RestController
@Slf4j
public class TestController {

  @Autowired
  private KafkaSender kafkaSender;

  @GetMapping("sendMessage/{msg}")
  public void sendMessage(@PathVariable("msg") String msg){
    kafkaSender.send(msg);
  }
}

3.2 启动项目

监听8080 端口

KafkaMessageListenerContainer中有 consumer group = myGroup 有一个 监听 hello2-0 topic 的 消费者

3.3 调用Http接口

http://localhost:8080/sendMessage/KafkaTestMsg

至此 SpringBoot集成Kafka 结束 。。

以上就是SpringBoot集成Kafka的步骤的详细内容,更多关于SpringBoot集成Kafka的资料请关注我们其它相关文章!

(0)

相关推荐

  • Springboot 1.5.7整合Kafka-client代码示例

    在一次项目中,因甲方需要使用kafka消息队列推送数据,所以需要接入kafka,并且kafka的版本是2.11.但是我们项目使用的是Springboot 1.5.7的版本,对应的springboot.kafka.starter有冲突,所以就接入了kafka-client. Kafka 是一个分布式消息引擎与流处理平台,经常用做企业的消息总线.实时数据管道,有的还把它当做存储系统来使用. 早期 Kafka 的定位是一个高吞吐的分布式消息系统,目前则演变成了一个成熟的分布式消息引擎,以及流处理平台.

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • 在springboot中对kafka进行读写的示例代码

    springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo. 1.POM配置 只需要在dependencies中增加 spring-kafka的配置即可.完整效果如下: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifa

  • SpringBoot Kafka 整合使用及安装教程

    前提 假设你了解过 SpringBoot 和 Kafka. 1.SpringBoot 如果对 SpringBoot 不了解的话,建议去看看 DD 大佬 和 纯洁的微笑 的系列博客. 2.Kafka Kafka 的话可以看看我前两天写的博客 : Kafka 安装及快速入门 学习的话自己开台虚拟机自己手动搭建环境吧,有条件的买服务器. 注意:一定要亲自自己安装实践,接下来我们将这两个进行整合. 创建项目 项目整体架构: 使用 IDEA 创建 SpringBoot 项目,这个很简单了,这里不做过多的讲

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • SpringBoot集成Kafka的步骤

    SpringBoot集成Kafka 本篇主要讲解SpringBoot 如何集成Kafka ,并且简单的 编写了一个Demo 来测试 发送和消费功能 前言 选择的版本如下: springboot : 2.3.4.RELEASE spring-kafka : 2.5.6.RELEASE kafka : 2.5.1 zookeeper : 3.4.14 本Demo 使用的是 SpringBoot 比较高的版本 SpringBoot 2.3.4.RELEASE 它会引入 spring-kafka 2.5

  • SpringBoot集成EasyExcel的步骤

    目录 一 .EasyExcel简介 三.依赖 四.监听 五.接口导入Excel 六.接口 导出Excel (HttpServletResponse response, HttpServletRequest request) 七.本地导入.本地导出 一 .EasyExcel简介 EasyExcel优势    注解式自定义操作.    输入输出简单,提供输入输出过程的接口    支持一定程度的单元格合并等灵活化操作 二.常用注解 @ExcelProperty 指定当前字段对应excel中的那一列.可

  • SpringBoot集成kafka全面实战记录

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章. 一.生产者实践 普通生产者 带回调的生产者 自定义分区器 kafka事务提交 二.消费者实践 简单消费 指定topic.partition.offset消费 批量消费 监听异常处理器 消息过滤器 消息转发 定时启动/停止监听器 一.前戏 1.在项目中连接kafka,因为是外网,首先要开放kafka配置文件

  • Springboot集成Kafka进行批量消费及踩坑点

    目录 引入依赖 创建配置类 Kafka 消费者 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency> 因为我的项目的 springboot 版本是 1.5.22.RELE

  • .Net Core 集成 Kafka的步骤

    目录 kafka broker topic partition consumer group 安装kafka .net 操作 kafka 生产者 消费者 运行一下 总结 最近维护的一个系统并发有点高,所以想引入一个消息队列来进行削峰.考察了一些产品,最终决定使用kafka来当做消息队列.以下是关于kafka的一些知识的整理笔记. kafka kafka 是分布式流式平台.它由linkedin开发,后贡献给了Apache开源组织并成为顶级开源项目.它可以应用在高并发场景下的日志系统,也可以当作消息

  • Springboot集成kafka高级应用实战分享

    目录 深入应用 1.1 springboot-kafka 1.2 消息发送 1.2.1 发送类型 1.2.2 序列化 1.2.3 分区策略 1.3 消息消费 1.3.1 消息组别 1.3.2 位移提交 深入应用 1.1 springboot-kafka 1)配置文件 kafka: bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904 producer: # producer 生产者 retries: 0 # 重试次数 acks: 1 #

  • SpringBoot集成Kafka 配置工具类的详细代码

    目录 1.单播模式,只有一个消费者组 2.广播模式,多个消费者组 spring-kafka 是基于 java版的 kafka client与spring的集成,提供了 KafkaTemplate,封装了各种方法,方便操作,它封装了apache的kafka-client,不需要再导入client依赖 <!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <arti

  • Springboot集成Elasticsearch的步骤与相关功能

    目录 集成配置步骤 步骤1:加入 Maven 相关依赖 步骤2:配置 elasticsearch 的主机和端口 步骤3:配置 Elaseticsearch 客户端 步骤4:创建文档实体 步骤5:创建 controller,service, dao 层 相关功能实现 1. 添加文档  2. 修改文档  3. 根据ID查询文档  4. 根据ID删除文档  5. 查询所有文档  6. 条件查询(单个条件)  7. 条件查询(多条件)  8. 分页查询(降序)  9. 分页查询(升序)  10. 分页查

  • springboot 集成dubbo的步骤详解

    目录 第一步 搭建zookeeper环境 第二步 springboot集成dubbo 1.项目目录机构 2.代码编写 2.1 api目录 2.2 consumer目录 web访问.接口调用以及dubbo配置编写 2.3 provider目录 api接口实现以及dubbo配置 第三步 测试dubbo远程服务调用 附录: 写在前面:在阅读本文前,请前拥有dubbo基础知识,springboot知识 dubbo官网: http://dubbo.apache.orgdubbo github 源码地址:h

随机推荐