Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用)。

完整的代码在这里,欢迎加星号、fork。

官方文档在https://docs.spring.io/spring-kafka/reference/html/_reference.html

###第一步,并发消费###
先看代码,重点是这我们使用的是ConcurrentKafkaListenerContainerFactory并且设置了factory.setConcurrency(4); (我的topic有4个分区,为了加快消费将并发设置为4,也就是有4个KafkaMessageListenerContainer)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并发消费。

###第二步,批量消费###
然后是批量消费。重点是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一个设启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

重点说明一下,我们设置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是说如果没有达到50条消息,我们就一直等待。官方的解释是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多返回的记录数。

从启动日志中可以看到还有个 max.poll.interval.ms = 300000, 也就说每间隔max.poll.interval.ms我们就调用一次poll。每次poll最多返回50条记录。

max.poll.interval.ms官方解释是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

启动日志截图

关于max.poll.records和max.poll.interval.ms官方解释截图:

###第三步,分区消费###
对于只有一个分区的topic,不需要分区消费,因为没有意义。下面的例子是针对有2个分区的情况(我的完整代码中有4个listenPartitionX方法,我的topic设置了4个分区),读者可以根据自己的情况进行调整。

public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

关于分区和消费者关系,后面会补充,先摘录如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

最后,总结,如果我们的topic有多个分区,经过以上步骤可以很好的加快消息消费。如果只有一个分区,因为已经有一个同名group id在消费了,新启动的一个基本上没有作用(本人测试结果)。

具体代码在这里,欢迎加星号,fork。

到此这篇关于Spring Boot 中使用@KafkaListener并发批量接收消息的文章就介绍到这了,更多相关Spring Boot 使用@KafkaListener并发批量接收消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • spring-Kafka中的@KafkaListener深入源码解读

    目录 前言 一.总体流程 二.源码解读 1.postProcessAfterInitialization 1.1.processKafkaListener 1.2.processListener 1.3.registerEndpoint 1.4.startIfNecessary 2.afterSingletonsInstantiated 2.1.afterPropertiesSet 2.2.registerAllEndpoints 总结 前言 本文主要通过深入了解源码,梳理从spring启动到真

  • springboot+kafka中@KafkaListener动态指定多个topic问题

    目录 说明 总结一下大家问的最多的一个问题 终极方法 思路 实现 代码 总结 说明 本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener 首先,application.properties中配置用逗号隔开的多个topic. 方法:利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”) 运行程序,console打

  • Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

    kakfa是我们在项目开发中经常使用的消息中间件.由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况.遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic.因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用). 完整的代码在这里,欢迎加星号.fork. 官方文档在https://docs.spring.io/spring-kafka/reference/h

  • Spring boot调用Oracle存储过程的两种方式及完整代码

    前言 因工作需要将公司SSH项目改为Spingboot项目,将项目中部分需要调用存储过程的部分用entityManagerFactory.unwrap(SessionFactory.class).openSession()来获取Session实现后发现项目访问数据库超过十次就会挂掉,原因是Springboot连接池数量默认为10,猜测是每次访问数据库后连接未释放导致的,手动关闭session后问题解决. 解决问题的过程中又发现了另外两种调用方式: 直接用EntityManager的createS

  • Spring Boot中优雅的获取yml文件工具类

    如何在spring boot中优雅的获取.yml文件工具类呢 代码如下: package com.common.base.utils.base; import com.common.base.generator.ResourceManager; import org.yaml.snakeyaml.Yaml; import java.io.InputStream; import java.util.HashMap; import java.util.Map; /** * yml文件工具类 */ p

  • Spring Boot实战解决高并发数据入库之 Redis 缓存+MySQL 批量入库问题

    目录 前言 架构设计 代码实现 测试 总结 前言 最近在做阅读类的业务,需要记录用户的PV,UV: 项目状况:前期尝试业务阶段: 特点: 快速实现(不需要做太重,满足初期推广运营即可)快速投入市场去运营 收集用户的原始数据,三要素: 谁在什么时间阅读哪篇文章 提到PV,UV脑海中首先浮现特点: 需要考虑性能(每个客户每打开一篇文章进行记录)允许数据有较小误差(少部分数据丢失) 架构设计 架构图: 时序图 记录基础数据MySQL表结构 CREATE TABLE `zh_article_count`

  • spring boot中的properties参数配置详解

    application.properties application.properties是spring boot默认的配置文件,spring boot默认会在以下两个路径搜索并加载这个文件 src\main\resources src\main\resources\config 配置系统参数 在application.properties中可配置一些系统参数,spring boot会自动加载这个参数到相应的功能,如下 #端口,默认为8080 server.port=80 #访问路径,默认为/

  • 在Spring Boot中使用Spring-data-jpa实现分页查询

    在我们平时的工作中,查询列表在我们的系统中基本随处可见,那么我们如何使用jpa进行多条件查询以及查询列表分页呢?下面我将介绍两种多条件查询方式. 1.引入起步依赖   <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency&

  • spring boot中使用@Async实现异步调用任务

    什么是"异步调用"? "异步调用"对应的是"同步调用",同步调用指程序按照定义顺序依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行:异步调用指程序在顺序执行时,不等待异步调用的语句返回结果就执行后面的程序.  同步调用 下面通过一个简单示例来直观的理解什么是同步调用: 定义Task类,创建三个处理函数分别模拟三个执行任务的操作,操作消耗时间随机取(10秒内) package com.kfit.task; import java.uti

  • Spring Boot中使用RabbitMQ的示例代码

    很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Message Broker是一种消息验证.传输.路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集.消息的分解,并将结果发送到他们的目的地,然后重新组合相应返回给消息用户 调用Web服务来检索数据 响应事件或错误 使用发布-订阅模式

  • spring boot中使用RabbitMQ routing路由详解

    在上一个教程中我们创建了一个扇形(fanout)交换器.我们能把消息已广播的形式传递给多个消费者. 要做什么?Routing 路由 在这个教程中,添加一个新的特性,我们可以只订阅消息的一部分.例如,将只连接我们感兴趣的颜色("orange", "black", "green"),并且把消息全部打印在控制台上. 绑定 交换器和队列是一种绑定关系.简单的理解为:队列对来自这个交换器中的信息感兴趣. 绑定可以加上一个额外的参数routingKey.Sp

  • Spring Boot中的那些条件判断的实现方法

    Spring Boot中的那些Conditional spring boot中为我们提供了丰富的Conditional来让我们得以非常方便的在项目中向容器中添加Bean.本文主要是对各个注解进行解释并辅以代码说明其用途. 所有ConditionalOnXXX的注解都可以放置在class或是method上,如果方式在class上,则会决定该class中所有的@Bean注解方法是否执行. @Conditional 下面其他的Conditional注解均是语法糖,可以通过下面的方法自定义Conditi

随机推荐