springboot+kafka中@KafkaListener动态指定多个topic问题

目录
  • 说明
  • 总结一下大家问的最多的一个问题
  • 终极方法
    • 思路
    • 实现
    • 代码
  • 总结

说明

本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener

首先,application.properties中配置用逗号隔开的多个topic。

方法:利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”)

运行程序,console打印的效果如下:

因为只开了一条消费者线程,所以所有的topic和分区都分配给这条线程。

如果你想开多条消费者线程去消费这些topic,添加@KafkaListener注解的参数concurrency的值为自己想要的消费者个数即可(注意,消费者数要小于等于你开的所有topic的分区数总和)

运行程序,console打印的效果如下:

总结一下大家问的最多的一个问题

如何在程序运行的过程中,改变topic,消费者能够消费修改后的topic?

ans: 经过尝试,使用@KafkaListener注解实现不了此需求,在程序启动的时候,程序就会根据@KafkaListener的注解信息初始化好消费者去消费指定好的topic。如果在程序运行的过程中,修改topic,不会让此消费者修改消费者的配置再重新订阅topic的。

不过我们可以有个折中的办法,就是利用@KafkaListener的topicPattern参数来进行topic匹配。

具体如何操作的可以看下这篇文章:

https://www.jb51.net/article/271098.htm

终极方法

思路

不使用@KafkaListener,使用kafka原生客户端依赖,手动初始化消费者,开启消费者线程。

在消费者线程中,每次循环都从配置、数据库或者其他配置源获取最新的topic信息,与之前的topic比较,如果发生变化,重新订阅topic或者初始化消费者。

实现

加入kafka客户端依赖(本次测试服务端kafka版本:2.12-2.4.0)

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.3.0</version>
</dependency>

代码

@Service
@Slf4j
public class KafkaConsumers implements InitializingBean {

    /**
     * 消费者
     */
    private static KafkaConsumer<String, String> consumer;
    /**
     * topic
     */
    private List<String> topicList;

    public static String getNewTopic() {
        try {
            return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 初始化消费者(配置写死是为了快速测试,请大家使用配置文件)
     *
     * @param topicList
     * @return
     */
    public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) {
        //配置信息
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "192.168.9.185:9092");
        //必须指定消费者组
        props.put("group.id", "haha");
        //设置数据key和value的序列化处理类
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        //创建消息者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅topic的消息
        consumer.subscribe(topicList);
        return consumer;
    }

    /**
     * 开启消费者线程
     * 异常请自己根据需求自己处理
     */
    @Override
    public void afterPropertiesSet() {
        // 初始化topic
        topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
        if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) {
            consumer = getInitConsumer(topicList);
            // 开启一个消费者线程
            new Thread(() -> {
                while (true) {
                    // 模拟从配置源中获取最新的topic(字符串,逗号隔开)
                    final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic()));
                    // 如果topic发生变化
                    if (!topicList.equals(newTopic)) {
                        log.info("topic 发生变化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList);
                        // method one:重新订阅topic:
                        topicList = newTopic;
                        consumer.subscribe(newTopic);
                        // method two:关闭原来的消费者,重新初始化一个消费者
                        //consumer.close();
                        //topicList = newTopic;
                        //consumer = getInitConsumer(newTopic);
                        continue;
                    }
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key:" + record.key() + "" + ",value:" + record.value());
                    }
                }
            }).start();
        }
    }
}

说一下第72行代码:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

上面这行代码表示:在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回。

在修改topic后,必须等到此次poll拉取的消息处理完,while(true)循环的时候检测topic发生变化,才能重新订阅topic.

poll()方法一次拉取得消息数默认为:500,如下图,kafka客户端源码中设置的。

如果想自定义此配置,可在初始化消费者时加入

运行结果(测试的topic中都无数据)

注意:KafkaConsumer是线程不安全的,不要用一个KafkaConsumer实例开启多个消费者,要开启多个消费者,需要new 多个KafkaConsumer实例。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • spring boot整合kafka过程解析

    这篇文章主要介绍了spring boot整合kafka过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.启动kafka 启动kafka之前一定要启动zookeeper,因为要使用kafka必须要使用zookeeper. windows环境下启动,直接使用kafka自带的zookeeper: E:\kafka_2.12-2.4.0\bin\windows zookeeper-server-start.bat ..\..\config\z

  • Springboot集成kafka高级应用实战分享

    目录 深入应用 1.1 springboot-kafka 1.2 消息发送 1.2.1 发送类型 1.2.2 序列化 1.2.3 分区策略 1.3 消息消费 1.3.1 消息组别 1.3.2 位移提交 深入应用 1.1 springboot-kafka 1)配置文件 kafka: bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904 producer: # producer 生产者 retries: 0 # 重试次数 acks: 1 #

  • Springboot集成Kafka进行批量消费及踩坑点

    目录 引入依赖 创建配置类 Kafka 消费者 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency> 因为我的项目的 springboot 版本是 1.5.22.RELE

  • springboot+kafka中@KafkaListener动态指定多个topic问题

    目录 说明 总结一下大家问的最多的一个问题 终极方法 思路 实现 代码 总结 说明 本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener 首先,application.properties中配置用逗号隔开的多个topic. 方法:利用Spring的SpEl表达式,将topics 配置为:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”) 运行程序,console打

  • 如何在Java SpringBoot项目中配置动态数据源你知道吗

    目录 首先需要引入第三方依赖 只需要在配置文件中按照如下配置 创建如下两个数据库 entity mapper.xml mapper层 Service层 下面是两个测试方法 下面可以来看一下测试结果: 在我们工作中涉及到一些场景需要我们配置多数据源的操作,之前来说我们配置数据源需要写繁琐的配置类来配置我们的数据源,哪个是默认数据源等等,而现在我们可以使用"苞米豆"为我们提供的提供的第三方工具,只需要简单配置就可以实现多数据源之间的灵活切换了! 首先需要引入第三方依赖 <depend

  • Springboot项目启动时如何用命令动态指定环境

    Springboot 启动用命令指定环境 我们都知道springboot的yml文件可以配置多个环境,可以直接在application.yml中指定使用哪个环境. 例如:指定dev环境 这是在配置文件中写死的. 那么,如何在启动项目的时候动态指定呢? 其实,只需要在启动时多加一个命令就行: java -jar xxx.jar --spring.profiles.active=dev Springboot 启动命令介绍 SpringBoot 启动项目指定命令的三种方式 1.配置文件中添加命令 在

  • Springboot项目中运用vue+ElementUI+echarts前后端交互实现动态圆环图(推荐)

    目录 如何运用vue+echarts前后端交互实现动态饼图 前言 一.环境配置 1.1 安装acharts 1.2 全局引用 二.圆环图前端实现 2.1 先在vue页面添加渲染盒子 2.2 前端逻辑实现部分 2.3 展示(可按自己需求更改前端样式) 三.前后端数据交互实现 3.1 创建数据库 3.2 后台代码的编写 如何运用vue+echarts前后端交互实现动态饼图 前言 我们做项目的时候,常常需要一些统计图来展示我们的数据,作为web开发人员,会实现统计图是我们必会的技能.我将带大家来实现动

  • SpringBoot Kafka 整合使用及安装教程

    前提 假设你了解过 SpringBoot 和 Kafka. 1.SpringBoot 如果对 SpringBoot 不了解的话,建议去看看 DD 大佬 和 纯洁的微笑 的系列博客. 2.Kafka Kafka 的话可以看看我前两天写的博客 : Kafka 安装及快速入门 学习的话自己开台虚拟机自己手动搭建环境吧,有条件的买服务器. 注意:一定要亲自自己安装实践,接下来我们将这两个进行整合. 创建项目 项目整体架构: 使用 IDEA 创建 SpringBoot 项目,这个很简单了,这里不做过多的讲

  • SpringBoot整合MyBatisPlus配置动态数据源的方法

    MybatisPlus特性 •无侵入:只做增强不做改变,引入它不会对现有工程产生影响,如丝般顺滑 •损耗小:启动即会自动注入基本 CURD,性能基本无损耗,直接面向对象操作 •强大的 CRUD 操作:内置通用 Mapper.通用 Service,仅仅通过少量配置即可实现单表大部分 CRUD 操作,更有强大的条件构造器,满足各类使用需求 •支持 Lambda 形式调用:通过 Lambda 表达式,方便的编写各类查询条件,无需再担心字段写错 •支持多种数据库:支持 MySQL.MariaDB.Ora

  • 浅谈springboot项目中定时任务如何优雅退出

    在一个springboot项目中需要跑定时任务处理批数据时,突然有个Kill命令或者一个Ctrl+C的命令,此时我们需要当批数据处理完毕后才允许定时任务关闭,也就是当定时任务结束时才允许Kill命令生效. 启动类 启动类上我们获取到相应的上下文,捕捉相应命令.在这里插入代码片 @SpringBootApplication /**指定mapper对应包的路径*/ @MapperScan("com.youlanw.kz.dao") /**开启计划任务*/ @EnableScheduling

  • 详解SpringBoot配置文件启动时动态配置参数方法

    序言 当我们要同时启用多个项目而又要使用不同端口或者变换配置属性时,我们可以在配置文件中设置${变量名}的变量来获取启动时传入的参数,从而实现了动态配置参数,使启用项目更加灵活 例子 server: port: ${PORT:50101} #服务端口 spring: application: name: xc‐govern‐center #指定服务名 eureka: client: registerWithEureka: true #服务注册,是否将自己注册到Eureka服务中 fetchReg

  • Springboot居然可以设置动态的Banner(推荐)

    先给小火伴们留个悬念 嘿嘿 文末有神秘代码~ (大家可以猜猜用来干嘛滴先!) 嘿嘿 不知道过年这两天大家过得怎样呀~ 比如 春晚抢到了多少红包呀?

  • SpringBoot项目中接口防刷的完整代码

    一.自定义注解 import java.lang.annotation.Retention; import java.lang.annotation.Target; import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.RetentionPolicy.RUNTIME; /** * @author Yang * @version 1.0 * @date 2021/2/22

随机推荐