关于spring boot整合kafka+注解方式

目录
  • spring boot自动配置方式整合
    • spring boot自动配置的不足
  • spring boot下手动配置kafka
    • 批量消费消息
  • spring boot整合kafka报错

spring boot自动配置方式整合

spring boot具有许多自动化配置,对于kafka的自动化配置当然也包含在内,基于spring boot自动配置方式整合kafka,需要做以下步骤。

引入kafka的pom依赖包

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

在配置文件中配置kafka相关属性配置,分别配置生产者和消费者的属性,在程序启动时,spring boot框架会自动读取这些配置的属性,创建相关的生产者、消费者等。下面展示一个简单的配置。

#kafka默认消费者配置
spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
#kafka默认生产者配置
spring.kafka.producer.bootstrap-servers=192.168.0.15:9092
spring.kafka.producer.acks=-1
spring.kafka.client-id=kafka-producer
spring.kafka.producer.batch-size=5

当然,在实际生产中的配置肯定比上面的配置复杂,需要一些定制化的操作,那么spring boot的自动化配置创建的生产者或者消费者都不能满足我们时,应该需要自定义化相关配置,这个在后续举例,这里先分析自动化配置。

在进行了如上配置之后,需要生产者时,使用方式为下代码所示。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {UserSSOApplication.class})
public class UserSSOApplicationTests {
   @Resource
    //注入kafkatemplete,这个由spring boot自动创建
   KafkaTemplate kafkaTemplate;
   @Test
   public void testKafkaSendMsg() {
       //发送消息
      kafkaTemplate.send("test", 0,12,"1222");
   }
}

消费者的使用注解方式整合,代码如下。

@Component
@Slf4j
public class KafkaMessageReceiver2 {
    //指定监听的topic,当前消费者组id
    @KafkaListener(topics = {"test"}, groupId = "receiver")
    public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {
        log.info(integerStringConsumerRecords.value());
    }
}

上面是最简单的配置,实现的一个简单例子,如果需要更加定制化的配置,可以参考类

org.springframework.boot.autoconfigure.kafka.KafkaProperties这里面包含了大部分需要的kafka配置。针对配置,在properties文件中添加即可。

spring boot自动配置的不足

上面是依赖spring boot自动化配置完成的整合方式,实际上所有的配置实现都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成。可以看出个是依赖于@Configuration完成bean配置,这种配置方式基本能够实现大部分情况,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。

但是这种方式还有一个问题,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并没有涵盖所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,这就导致某些特殊配置不能依赖spring boot自动创建,需要我们手动创建Producer和comsumer。

@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {
   private final KafkaProperties properties;
   private final RecordMessageConverter messageConverter;
   public KafkaAutoConfiguration(KafkaProperties properties,
         ObjectProvider<RecordMessageConverter> messageConverter) {
      this.properties = properties;
      this.messageConverter = messageConverter.getIfUnique();
   }
   @Bean
   @ConditionalOnMissingBean(KafkaTemplate.class)
   public KafkaTemplate<?, ?> kafkaTemplate(
         ProducerFactory<Object, Object> kafkaProducerFactory,
         ProducerListener<Object, Object> kafkaProducerListener) {
      KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(
            kafkaProducerFactory);
      if (this.messageConverter != null) {
         kafkaTemplate.setMessageConverter(this.messageConverter);
      }
      kafkaTemplate.setProducerListener(kafkaProducerListener);
      kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
      return kafkaTemplate;
   }
   @Bean
   @ConditionalOnMissingBean(ConsumerFactory.class)
   public ConsumerFactory<?, ?> kafkaConsumerFactory() {
      return new DefaultKafkaConsumerFactory<>(
            this.properties.buildConsumerProperties());
   }
   @Bean
   @ConditionalOnMissingBean(ProducerFactory.class)
   public ProducerFactory<?, ?> kafkaProducerFactory() {
      DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
            this.properties.buildProducerProperties());
      String transactionIdPrefix = this.properties.getProducer()
            .getTransactionIdPrefix();
      if (transactionIdPrefix != null) {
         factory.setTransactionIdPrefix(transactionIdPrefix);
      }
      return factory;
   }
  //略略略
}

spring boot下手动配置kafka

由于需要对某些特殊配置进行配置,我们可能需要手动配置kafka相关的bean,创建一个配置类如下,类似于

org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,这里创建了对应类型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的对应Bean定义将不起作用。

所有的生产者配置可以参考ProducerConfig类,所有的消费者配置可以参考ConsumerConfig类。

/**
 * kafka配置,实际上,在KafkaAutoConfiguration中已经有默认的根据配置文件信息创建配置,但是自动配置属性没有涵盖所有
 * 我们可以自定义创建相关bean,进行如下配置
 *
 * @author zhoujy
 * @date 2018年12月17日
 **/
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
    private Map<String, Object> consumerProperties(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }
    /**
     * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
     * @return
     */
    @Bean("consumerFactory")
    public DefaultKafkaConsumerFactory consumerFactory(){
        return new DefaultKafkaConsumerFactory(consumerProperties());
    }
    @Bean("listenerContainerFactory")
    //个性化定义消费者
    public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);

 //设置消费者ack模式为手动,看需求设置
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        //设置可批量拉取消息消费,拉取数量一次3,看需求设置
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        return factory;
    }
   /* @Bean
     //代码创建方式topic
    public NewTopic batchTopic() {
        return new NewTopic("topic.quick.batch", 8, (short) 1);
    }*/
    //创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
    private Map<String, Object> producerProperties(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }
    /**
     * 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义
     * @return
     */
    @Bean("produceFactory")
    public DefaultKafkaProducerFactory produceFactory(){
        return new DefaultKafkaProducerFactory(producerProperties());
    }
    /**
     * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
     * @param produceFactory
     * @return
     */
    @Bean
    public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){
        return new KafkaTemplate(produceFactory);
    }
}

生产者的使用方式,跟自动配置一样,直接注入KafkaTemplate即可。主要是消费者的使用有些不同。

批量消费消息

上面的消费者配置配置了一个bean,@Bean(“listenerContainerFactory”),这个bean可以指定为消费者,注解方式中是如下的使用方式。

containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作为消费者。

  • 注意registryReceiver中的参数,ConsumerRecord对比之前的消费者,因为设置listenerContainerFactory是批量消费,因此ConsumerRecord是一个List,如果不是批量消费的话,相对应就是一个对象。
  • 注意第二个参数Acknowledgment,这个参数只有在设置消费者的ack应答模式为AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手动ack。
@Component
@Slf4j
public class KafkaMessageReceiver {
    /**
     * listenerContainerFactory设置了批量拉取消息,因此参数是List<ConsumerRecord<Integer, String>>,否则是ConsumerRecord
     * @param integerStringConsumerRecords
     * @param acknowledgment
     */
    @KafkaListener(topics = {"test"}, containerFactory = "listenerContainerFactory")
    public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords, Acknowledgment acknowledgment) {
        Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator();
        while (it.hasNext()){
            ConsumerRecord<Integer, String> consumerRecords = it.next();
            //dosome
             acknowledgment.acknowledge();
        }
    }
}

如果不想要批量消费消息,那就可以另外定义一个bean类似于@Bean(“listenerContainerFactory”),如下,只要不设置批量消费即可。

@Bean("listenerContainerFactory2")
    //个性化定义消费者
    public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) {
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);

 //设置消费者ack模式为手动,看需求设置
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

spring boot整合kafka报错

Timeout expired while fetching topic metadata

这种报错应检查 kafka连接问题,服务是否启动,端口是否正确。

Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time

这种报错要考虑kafka和spring对应的版本问题,我的springboot 2.1.2在使用kafka_2.12-2.1.1时出现此问题,将kafka版本换为2.11-1.1.1后,问题解决。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • spring boot整合kafka过程解析

    这篇文章主要介绍了spring boot整合kafka过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.启动kafka 启动kafka之前一定要启动zookeeper,因为要使用kafka必须要使用zookeeper. windows环境下启动,直接使用kafka自带的zookeeper: E:\kafka_2.12-2.4.0\bin\windows zookeeper-server-start.bat ..\..\config\z

  • SpringBoot集成Kafka的步骤

    SpringBoot集成Kafka 本篇主要讲解SpringBoot 如何集成Kafka ,并且简单的 编写了一个Demo 来测试 发送和消费功能 前言 选择的版本如下: springboot : 2.3.4.RELEASE spring-kafka : 2.5.6.RELEASE kafka : 2.5.1 zookeeper : 3.4.14 本Demo 使用的是 SpringBoot 比较高的版本 SpringBoot 2.3.4.RELEASE 它会引入 spring-kafka 2.5

  • Spring Boot 集成 Kafkad的实现示例

    Spring Boot 作为主流微服务框架,拥有成熟的社区生态.市场应用广泛,为了方便大家,整理了一个基于spring boot的常用中间件快速集成入门系列手册,涉及RPC.缓存.消息队列.分库分表.注册中心.分布式配置等常用开源组件,大概有几十篇文章,陆续会开放出来,感兴趣同学请提前关注&收藏 消息通信有两种基本模型,即发布-订阅(Pub-Sub)模型和点对点(Point to Point)模型,发布-订阅支持生产者消费者之间的一对多关系,而点对点模型中有且仅有一个消费者. 前言 Kafka是

  • 关于spring boot整合kafka+注解方式

    目录 spring boot自动配置方式整合 spring boot自动配置的不足 spring boot下手动配置kafka 批量消费消息 spring boot整合kafka报错 spring boot自动配置方式整合 spring boot具有许多自动化配置,对于kafka的自动化配置当然也包含在内,基于spring boot自动配置方式整合kafka,需要做以下步骤. 引入kafka的pom依赖包 <!-- https://mvnrepository.com/artifact/org.s

  • 使用spring boot 整合kafka,延迟启动消费者

    spring boot 整合kafka,延迟启动消费者 spring boot整合kafka的时候一般使用@KafkaListener来设置消费者,但是这种方式在spring启动的时候就会立即开启消费者.如果有需要根据配置信息延迟开启指定的消费者就不能使用这种方式. 参考了类:KafkaListenerAnnotationBeanPostProcessor,我提取了一部分代码.可以根据需要随时动态的开启消费者.还可以很方便的启动多个消费者. 为了方便使用,我自定义了一个注解: import or

  • Spring Boot整合Kafka教程详解

    目录 正文 步骤一:添加依赖项 步骤二:配置 Kafka 步骤三:创建一个生产者 步骤四:创建一个消费者 正文 本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka.Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量. 在本教程中,我们将使用 Spring Boot 2.5.4 和 Kafka 2.8.0. 步骤一:添加依赖项 在 pom.xml 中添加以下依赖项: <dependency> <groupId>org.springfra

  • Spring Boot整合mybatis使用注解实现动态Sql、参数传递等常用操作(实现方法)

    前面介绍了Spring Boot 整合mybatis 使用注解的方式实现数据库操作,介绍了如何自动生成注解版的mapper 和pojo类. 接下来介绍使用mybatis 常用注解以及如何传参数等数据库操作中的常用操作. 其实,mybatis 注解方式 和 XML配置方式两者的使用基本上相同,只有在构建 SQL 脚本有所区别,所以这里重点介绍两者之间的差异,以及增删改查,参数传递等注解的常用操作. 详解SpringBoot 快速整合Mybatis(去XML化+注解进阶)已经介绍过了,不清楚的朋友可

  • Spring Boot 整合 Mybatis Annotation 注解的完整 Web 案例

    前言 距离第一篇 Spring Boot 系列的博文 3 个月了.虽然 XML 形式是我比较推荐的,但是注解形式也是方便的.尤其一些小系统,快速的 CRUD 轻量级的系统. 这里感谢晓春 http://xchunzhao.tk/ 的 Pull Request,提供了 springboot-mybatis-annotation 的实现. 一.运行 springboot-mybatis-annotation 工程 然后Application 应用启动类的 main 函数,然后在浏览器访问: http

  • Spring Boot Web 开发注解篇

    一.spring-boot-starter-web 依赖概述 在 Spring Boot 快速入门中,只要在 pom.xml 加入了 spring-boot-starter-web 依赖,即可快速开发 web 应用.可见,Spring Boot 极大地简化了 Spring 应用从搭建到开发的过程,做到了「开箱即用」的方式.Spring Boot 已经提供很多「开箱即用」的依赖,如上面开发 web 应用使用的 spring-boot-starter-web ,都是以 spring-boot-sta

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • spring boot整合Cucumber(BDD)的方法

    本文介绍了spring boot整合Cucumber(BDD)的方法,分享给大家,具体如下: 1.新建一个springboot工程工程结构如下: 2.添加pom依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLS

随机推荐