Spring Boot 集成 Kafkad的实现示例

Spring Boot 作为主流微服务框架,拥有成熟的社区生态。市场应用广泛,为了方便大家,整理了一个基于spring boot的常用中间件快速集成入门系列手册,涉及RPC、缓存、消息队列、分库分表、注册中心、分布式配置等常用开源组件,大概有几十篇文章,陆续会开放出来,感兴趣同学请提前关注&收藏

消息通信有两种基本模型,即发布-订阅(Pub-Sub)模型和点对点(Point to Point)模型,发布-订阅支持生产者消费者之间的一对多关系,而点对点模型中有且仅有一个消费者。

前言

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。

Kafka高效地处理实时流式数据,可以实现与Storm、HBase和Spark的集成。作为聚类部署到多台服务器上,Kafka处理它所有的发布和订阅消息系统使用了四个API,即生产者API、消费者API、Stream API和Connector API。它能够传递大规模流式消息,自带容错功能,已经取代了一些传统消息系统,如JMS、AMQP等。

为什么使用kafka?

  • 削峰填谷。缓冲上下游瞬时突发流量,保护 “脆弱” 的下游系统不被压垮,避免引发全链路服务 “雪崩”。
  • 系统解耦。发送方和接收方的松耦合,一定程度简化了开发成本,减少了系统间不必要的直接依赖。
  • 异步通信:消息队列允许用户把消息放入队列但不立即处理它。
  • 可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

业务场景

  • 一些同步业务流程的非核心逻辑,对时间要求不是特别高,可以解耦异步来执行
  • 系统日志收集,采集并同步到kafka,一般采用ELK组合玩法
  • 一些大数据平台,用于各个系统间数据传递

基本架构

Kafka 运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群节点分布

1、Producer 生产消息,发送到Broker中

2、Leader状态的Broker接收消息,写入到相应topic中。在一个分区内,这些消息被索引并连同时间戳存储在一起

3、Leader状态的Broker接收完毕以后,传给Follow状态的Broker作为副本备份

4、 Consumer 消费者的进程可以从分区订阅,并消费消息

常用术语

  • Broker。负责接收和处理客户端发送过来的请求,以及对消息进行持久化。虽然多个 Broker 进程能够运行在同一台机器上,但更常见的做法是将不同的 Broker 分散运行在不同的机器上
  • 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
  • 分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
  • 消息:这里的消息就是指 Kafka 处理的主要对象。
  • 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  • 副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。每个分区可配置多个副本实现高可用。一个分区的N个副本一定在N个不同的Broker上。
  • Leader:每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader。
  • Follower:每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会成为新的 Leader。
  • 生产者:Producer。向主题发布新消息的应用程序。
  • 消费者:Consumer。从主题订阅新消息的应用程序。
  • 消费者位移:Consumer Offset。表示消费者消费进度,每个消费者都有自己的消费者位移。offset保存在broker端的内部topic中,不是在clients中保存
  • 消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
  • 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

代码演示

外部依赖:

在 pom.xml 中添加 Kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

由于spring-boot-starter-parent 指定的版本号是2.1.5.RELEASE,spring boot 会对外部框架的版本号统一管理,spring-kafka 引入的版本是 2.2.6.RELEASE

配置文件:

在配置文件 application.yaml 中配置 Kafka 的相关参数,具体内容如下:

Spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 3  # 生产者发送失败时,重试次数
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生产者消息key和消息value的序列化处理类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: tomge-consumer-group  # 默认消费者group id
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

对应的配置类 org.springframework.boot.autoconfigure.kafka.KafkaProperties,来初始化kafka相关的bean实例对象,并注册到spring容器中。

发送消息:

Spring Boot 作为一款支持快速开发的集成性框架,同样提供了一批以 -Template 命名的模板工具类用于实现消息通信。对于 Kafka 而言,这个工具类就是KafkaTemplate

KafkaTemplate 提供了一系列 send 方法用来发送消息,典型的 send 方法定义如下代码所示:

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
 。。。。 省略
}

生产端提供了一个restful接口,模拟发送一条创建新用户消息。

@GetMapping("/add_user")
public Object add() {
    try {
        Long id = Long.valueOf(new Random().nextInt(1000));
        User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();
        ListenableFuture<SendResult> listenableFuture = kafkaTemplate.send(addUserTopic, JSON.toJSONString(user));
        
        // 提供回调方法,可以监控消息的成功或失败的后续处理
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败," + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult sendResult) {
                // 消息发送到的topic
                String topic = sendResult.getRecordMetadata().topic();
                // 消息发送到的分区
                int partition = sendResult.getRecordMetadata().partition();
                // 消息在分区内的offset
                long offset = sendResult.getRecordMetadata().offset();
                System.out.println(String.format("发送消息成功,topc:%s, partition: %s, offset:%s ", topic, partition, offset));
            }
        });
        return "消息发送成功";
    } catch (Exception e) {
        e.printStackTrace();
        return "消息发送失败";
    }
}

实际上开发使用的Kafka默认允许自动创建Topic,创建Topic时默认的分区数量是1,可以通过server.properties文件中的num.partitions=1修改默认分区数量。在生产环境中通常会关闭自动创建功能,Topic需要由运维人员先创建好。

消费消息:

在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

定义一个消费类,在处理具体消息业务逻辑的方法上添加 @KafkaListener 注解,并配置要消费的topic,代码如下所示:

@Component
public class UserConsumer {

    @KafkaListener(topics = "add_user")
    public void receiveMesage(String content) {
        System.out.println("消费消息:" + content);
    }
}

是不是很简单,添加kafka依赖、使用KafkaTemplate、@KafkaListener注解就完成消息的生产和消费,其实是SpringBoot在背后默默的做了很多工作,如果感兴趣可以研究下spring-boot-autoconfigure ,里面提供了常用开源框架的客户端实例封装。

演示工程代码

https://github.com/aalansehaiyang/spring-boot-bulking

模块:spring-boot-bulking-kafka

以上就是Spring Boot 集成 Kafkad的示例的详细内容,更多关于Spring Boot 集成 Kafka的资料请关注我们其它相关文章!

(0)

相关推荐

  • Spring boot集成Kafka+Storm的示例代码

    前言 由于业务需求需要把Strom与kafka整合到spring boot项目里,实现其他服务输出日志至kafka订阅话题,storm实时处理该话题完成数据监控及其他数据统计,但是网上教程较少,今天想写的就是如何整合storm+kafka 到spring boot,顺带说一说我遇到的坑. 使用工具及环境配置 ​ 1. java 版本jdk-1.8 ​ 2. 编译工具使用IDEA-2017 ​ 3. maven作为项目管理 ​ 4.spring boot-1.5.8.RELEASE 需求体现 1.

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • SpringBoot集成Kafka的步骤

    SpringBoot集成Kafka 本篇主要讲解SpringBoot 如何集成Kafka ,并且简单的 编写了一个Demo 来测试 发送和消费功能 前言 选择的版本如下: springboot : 2.3.4.RELEASE spring-kafka : 2.5.6.RELEASE kafka : 2.5.1 zookeeper : 3.4.14 本Demo 使用的是 SpringBoot 比较高的版本 SpringBoot 2.3.4.RELEASE 它会引入 spring-kafka 2.5

  • Spring boot集成Kafka消息中间件代码实例

    一.创建Spring boot项目,添加如下依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <ar

  • spring boot 与kafka集成的示例代码

    新建spring boot项目 这里使用intellij IDEA 添加kafka集成maven <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLoc

  • spring boot与kafka集成的简单实例

    本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • Spring Boot 集成 Kafkad的实现示例

    Spring Boot 作为主流微服务框架,拥有成熟的社区生态.市场应用广泛,为了方便大家,整理了一个基于spring boot的常用中间件快速集成入门系列手册,涉及RPC.缓存.消息队列.分库分表.注册中心.分布式配置等常用开源组件,大概有几十篇文章,陆续会开放出来,感兴趣同学请提前关注&收藏 消息通信有两种基本模型,即发布-订阅(Pub-Sub)模型和点对点(Point to Point)模型,发布-订阅支持生产者消费者之间的一对多关系,而点对点模型中有且仅有一个消费者. 前言 Kafka是

  • Spring boot 集成Dubbox的方法示例

    前言 因为工作原因,需要在项目中集成dubbo,所以去查询dubbo相关文档,发现dubbo目前已经不更新了,所以把目光投向了dubbox,dubbox是当当网基于dubbo二次开发的一个项目,dubbox,因为公司项目中一个是基于spring mvc 3.0的,一个是基于spring boot的,而spring boot相对来说文档少一点,所以此文记录下spring boot下如何继承dubbox 一.安装zookeeper 1.zookeeper简介 ZooKeeper是一个分布式的,开放源

  • 使用Spring Boot集成FastDFS的示例代码

    这篇文章我们介绍如何使用Spring Boot将文件上传到分布式文件系统FastDFS中. 这个项目会在上一个项目的基础上进行构建. 1.pom包配置 我们使用Spring Boot最新版本1.5.9.jdk使用1.8.tomcat8.0. <dependency> <groupId>org.csource</groupId> <artifactId>fastdfs-client-java</artifactId> <version>

  • spring boot集成mongodb的增删改查的示例代码

    添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> properties配置 spring.data.mongodb.host=127.0.0.1 spring.data.mongodb.port=27017 spring

  • Spring Boot集成ShedLock分布式定时任务的实现示例

    一.ShedLock是什么? 官方地址:github.com/lukas-kreca- 以下是ShedLock锁提供者,通过外部存储实现锁,由下图可知外部存储集成的库还是很丰富的 本篇教程我们基于JdbcTemplate存储为例来使用ShedLock锁. 二.落地实现 1.1 引入依赖包 shedlock所需依赖包: <dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>she

  • spring boot集成redisson的最佳实践示例

    目录 前言 集成jedis实例,xml方式 集成前引用的jar springbean配置xml 集成redisson实例,javabean的方式 集成前引入的jar javabean配置如下 提供实例化javabean application.properties添加如下配置 前言 本文假使你了解spring boot并实践过,非spring boot用户可跳过也可借此研究一下. redisson是redis的java客户端程序,国内外很多公司都有在用,如下, 和spring的集成中官方给出的实

  • Spring Boot 集成JWT实现前后端认证的示例代码

    目录 前言 JWT简介 为什么要用JWT 传统session认证存在那些弊端? JWT认证的优势 JWT的数据结构 Header Payload Signature Spring Boot集成JWT 引入Jwt包 编写jwt工具类 Token认证拦截器 配置拦击器 登录验证流程 示例代码 总结 前言 小程序.H5应用的快速发展,使得前后端分离已经成为了趋势,然而系统认证却是系统的重要一部分,本文将讲解JWT如何实现前后端认证. JWT简介 JWT(全称:Json Web Token)是一个开放标

  • Spring Boot 集成Mybatis实现主从(多数据源)分离方案示例

    本文将介绍使用Spring Boot集成Mybatis并实现主从库分离的实现(同样适用于多数据源).延续之前的Spring Boot 集成MyBatis.项目还将集成分页插件PageHelper.通用Mapper以及Druid. 新建一个Maven项目,最终项目结构如下: 多数据源注入到sqlSessionFactory POM增加如下依赖: <!--JSON--> <dependency> <groupId>com.fasterxml.jackson.core<

随机推荐