spring boot整合kafka过程解析

这篇文章主要介绍了spring boot整合kafka过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

一、启动kafka

  启动kafka之前一定要启动zookeeper,因为要使用kafka必须要使用zookeeper。

  windows环境下启动,直接使用kafka自带的zookeeper:

  E:\kafka_2.12-2.4.0\bin\windows zookeeper-server-start.bat ..\..\config\zookeeper.properties

  接下来启动kafka

  E:\kafka_2.12-2.4.0\bin\windows kafka-server-start.bat ..\..\config\server.properties

二、spring boot整合kafka项目实例

1.导入的maven

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

配置文件:

server.port=80
#kafka地址,可以有多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
#------生产者配置文件---------
#指定kafka消息体和key的编码格式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#设置等待acks返回的机制,有三个值
# 0:不等待返回的acks(可能会丢数据,因为发送消息没有了失败重试机制,但是这是最低延迟)
# 1:消息发送给kafka分区中的leader后就返回(如果follower没有同步完成leader就宕机了,就会丢数据)
# -1(默认):等待所有follower同步完消息后再发送(绝对不会丢数据)
spring.kafka.producer.acks=-1
# 消息累计到batch-size的值后,才会发送消息,默认为16384
spring.kafka.producer.batch-size=16384
#如果kafka迟迟不发送消息(这里指的是消息没堆积到指定数量),那么过了这个时间(单位:毫米)开始发送
spring.kafka.producer.properties.linger.ms=1
#设置缓冲区大小,默认是33554432
#这个缓冲区是kafka中两个线程里的共享变量
#这个两个线程是main和sender,main负责把消息发送到共享变量,sender从共享变量拉数据
spring.kafka.producer.buffer-memory=33554432
#失败重试发送的次数
spring.kafka.producer.retries=2
#------消费者配置文件---------
#指定kafka消息体和key的编码格式
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#指定消费者组的group_id
spring.kafka.consumer.group-id=kafka_test
#kafka意外宕机时,再次开启消息消费的策略,共有三种策略
#earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据、
#none:当所有分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset=earliest
#自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#提交offset时间间隔
spring.kafka.consumer.auto-commit-interval=100
#消费监听接口监听的主题不存在时,默认会报错因此要关掉这个
spring.kafka.listener.missing-topics-fatal=false

2.创建topic

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 使用代码创建的topic
 * 三个参数意思:topic的名称;分区数量,新主题的复制因子;如果指定了副本分配,则为-1。
 */
@Configuration
public class KafkaTopic {

   @Bean
  public NewTopic batchTopic() {
    return new NewTopic("testTopic", 8, (short) 1);
  }
}

3.生产者代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * kafka生产者代码
 */
@RestController
public class ProductorController {

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  @RequestMapping("/test")
  public String show() {
    kafkaTemplate.send("testTopic", "你好");
    return "发送成功";
  }

}

4.消费者代码

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;

/**
 * kafka消费者代码
 */
@Configuration
public class KafkaConsumer {

  @KafkaListener(topics = "testTopic")
  public void consumer(ConsumerRecord consumerRecord){
    Optional<Object> kafkaMassage = Optional.ofNullable(consumerRecord.value());
    if(kafkaMassage.isPresent()){
      Object o = kafkaMassage.get();
      System.out.println("接收到的消息是:"+o);
    }

  }

}

测试结果:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • Spring boot集成Kafka+Storm的示例代码

    前言 由于业务需求需要把Strom与kafka整合到spring boot项目里,实现其他服务输出日志至kafka订阅话题,storm实时处理该话题完成数据监控及其他数据统计,但是网上教程较少,今天想写的就是如何整合storm+kafka 到spring boot,顺带说一说我遇到的坑. 使用工具及环境配置 ​ 1. java 版本jdk-1.8 ​ 2. 编译工具使用IDEA-2017 ​ 3. maven作为项目管理 ​ 4.spring boot-1.5.8.RELEASE 需求体现 1.

  • spring boot与kafka集成的简单实例

    本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • spring boot 与kafka集成的示例代码

    新建spring boot项目 这里使用intellij IDEA 添加kafka集成maven <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLoc

  • 在springboot中对kafka进行读写的示例代码

    springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo. 1.POM配置 只需要在dependencies中增加 spring-kafka的配置即可.完整效果如下: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifa

  • 在Spring Boot应用程序中使用Apache Kafka的方法步骤详解

    第1步:生成我们的项目: Spring Initializr来生成我们的项目.我们的项目将提供Spring MVC / Web支持和Apache Kafka支持. 第2步:发布/读取Kafka主题中的消息: <b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • spring boot整合spring-kafka实现发送接收消息实例代码

    前言 由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用. 没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大

  • spring boot整合kafka过程解析

    这篇文章主要介绍了spring boot整合kafka过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.启动kafka 启动kafka之前一定要启动zookeeper,因为要使用kafka必须要使用zookeeper. windows环境下启动,直接使用kafka自带的zookeeper: E:\kafka_2.12-2.4.0\bin\windows zookeeper-server-start.bat ..\..\config\z

  • Spring Boot 整合 Druid过程解析

    这篇文章主要介绍了Spring Boot 整合 Druid过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 概述 Druid 是阿里巴巴开源平台上的一个项目,整个项目由数据库连接池.插件框架和 SQL 解析器组成.该项目主要是为了扩展 JDBC 的一些限制,可以让程序员实现一些特殊的需求,比如向密钥服务请求凭证.统计 SQL 信息.SQL 性能收集.SQL 注入检查.SQL 翻译等,程序员可以通过定制来实现自己需要的功能. Druid 是

  • Spring boot整合log4j2过程解析

    这篇文章主要介绍了Spring boot整合log4j2过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 以前整合过log4j2,但是今天再次整合发现都忘记了,而且也没有记下来 1.pom.xml中 (1)把spring-boot-starter-web包下面的spring-boot-starter-logging排除 <dependency> <groupId>org.springframework.boot</gr

  • Spring boot整合Mybatis-plus过程解析

    Mybatis初期使用比较麻烦,需要很多配置文件.实体类.dao层映射.还有很多其他的配置.初期开发使用generator可以根据表结构自动生产实体类.dao层代码,这样是可以减轻一部分开发量:后期mybatis进行大量的优化,现在可以使用注解版本,自动管理dao层和配置文件. maven 依赖 注意:本文使用的是mysql,数据库依赖就不展示了 <!-- 引入mvbatie -plus starter--> <dependency> <groupId>com.baom

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • 使用spring boot 整合kafka,延迟启动消费者

    spring boot 整合kafka,延迟启动消费者 spring boot整合kafka的时候一般使用@KafkaListener来设置消费者,但是这种方式在spring启动的时候就会立即开启消费者.如果有需要根据配置信息延迟开启指定的消费者就不能使用这种方式. 参考了类:KafkaListenerAnnotationBeanPostProcessor,我提取了一部分代码.可以根据需要随时动态的开启消费者.还可以很方便的启动多个消费者. 为了方便使用,我自定义了一个注解: import or

  • 关于spring boot整合kafka+注解方式

    目录 spring boot自动配置方式整合 spring boot自动配置的不足 spring boot下手动配置kafka 批量消费消息 spring boot整合kafka报错 spring boot自动配置方式整合 spring boot具有许多自动化配置,对于kafka的自动化配置当然也包含在内,基于spring boot自动配置方式整合kafka,需要做以下步骤. 引入kafka的pom依赖包 <!-- https://mvnrepository.com/artifact/org.s

  • Spring Boot整合Kafka教程详解

    目录 正文 步骤一:添加依赖项 步骤二:配置 Kafka 步骤三:创建一个生产者 步骤四:创建一个消费者 正文 本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka.Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量. 在本教程中,我们将使用 Spring Boot 2.5.4 和 Kafka 2.8.0. 步骤一:添加依赖项 在 pom.xml 中添加以下依赖项: <dependency> <groupId>org.springfra

  • Spring Boot 整合 Shiro+Thymeleaf过程解析

    这篇文章主要介绍了Spring Boot 整合 Shiro+Thymeleaf过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.导包 <!-- springboot 与 shiro 的集成--> <dependency> <groupId>org.apache.shiro</groupId> <artifactId>shiro-spring</artifactId> <

  • spring boot整合shiro安全框架过程解析

    这篇文章主要介绍了spring boot整合shiro安全框架过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 题记:在学习了springboot和thymeleaf之后,想完成一个项目练练手,于是使用springboot+mybatis和thymeleaf完成一个博客系统,在完成的过程中出现的一些问题,将这些问题记录下来,作为自己的学习心得.在这先感谢群主TyCoding的Tumo项目,虽然本人实在太菜了,好些地方看不懂,但还是使我受益

随机推荐