SpringBoot Kafka 整合使用及安装教程

前提

假设你了解过 SpringBoot 和 Kafka。

1、SpringBoot

如果对 SpringBoot 不了解的话,建议去看看 DD 大佬 和 纯洁的微笑 的系列博客。

2、Kafka

Kafka 的话可以看看我前两天写的博客 : Kafka 安装及快速入门 学习的话自己开台虚拟机自己手动搭建环境吧,有条件的买服务器。

注意:一定要亲自自己安装实践,接下来我们将这两个进行整合。

创建项目

项目整体架构:

使用 IDEA 创建 SpringBoot 项目,这个很简单了,这里不做过多的讲解。

1、pom 文件代码如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.zhisheng</groupId>
 <artifactId>kafka-learning</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>
 <name>kafka-learning</name>
 <description>Demo project for Spring Boot + kafka</description>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.5.9.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
 </parent>
 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <java.version>1.8</java.version>
 </properties>
 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.1.RELEASE</version>
  </dependency>
  <dependency>
   <groupId>com.google.code.gson</groupId>
   <artifactId>gson</artifactId>
   <version>2.8.2</version>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
   </plugin>
  </plugins>
 </build>
</project>

主要引入了 spring-kafka 、lombok 、 gson 依赖。

2、消息实体类 Message.java 如下:

@Datapublic class Message { private Long id; //id
 private String msg; //消息
 private Date sendTime; //时间戳
}

3、消息发送类 KafkaSender.java

@Component
@Slf4jpublic
class KafkaSender {

 @Autowired
 private KafkaTemplate<String, String> kafkaTemplate;
 private Gson gson = new GsonBuilder().create();

 //发送消息方法
 public void send() {
  Message message = new Message();
  message.setId(System
    .currentTimeMillis());
  message.setMsg(UUID.randomUUID().toString());
  message.setSendTime(new Date())
  ;
  log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
  kafkaTemplate.send
    ("zhisheng", gson.toJson(message));
 }
}

就这样,发送消息代码就实现了。

这里关键的代码为 kafkaTemplate.send() 方法, zhisheng 是 Kafka 里的 topic ,这个 topic 在 Java 程序中是不需要提前在 Kafka 中设置的,因为它会在发送的时候自动创建你设置的 topic, gson.toJson(message) 是消息内容,这里暂时先说这么多了,不详解了,后面有机会继续把里面源码解读写篇博客出来(因为中途碰到坑,老子跟了几遍源码)。

4、消息接收类 KafkaReceiver.java

@Component
@Slf4jpublic
class KafkaReceiver {
 @KafkaListener(topics = {"zhisheng"})
 public void listen(ConsumerRecord<?, ?> record) {
  Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   log.info("----------------- record =" + record);
   log.info("------------------ message =" + message);
  }
 }
}

客户端 consumer 接收消息特别简单,直接用 @KafkaListener 注解即可,并在监听中设置监听的 topic , topics 是一个数组所以是可以绑定多个主题的,上面的代码中修改为 @KafkaListener(topics={"zhisheng","tian"}) 就可以同时监听两个 topic 的消息了。需要注意的是:这里的 topic 需要和消息发送类 KafkaSender.java 中设置的 topic 一致。

5、启动类 KafkaApplication.java

@SpringBootApplicationpublic
class KafkaApplication {
 public static void main(String[] args) {
  ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
  KafkaSender sender = context.getBean(KafkaSender.class);
  for (int i = 0; i < 3; i++) {
   //调用消息发送类中的消息发送方法
   sender.send();
   try {
    Thread.sleep(3000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }
}

6、配置文件 application.properties

#============== kafka ===================# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.153.135:9092
##=============== provider =======================
#spring.kafka.producer.retries=0# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432
## 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
##=============== consumer =======================# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
## 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.bootstrap-servers 后面设置你安装的 Kafka 的机器 IP 地址和端口号 9092。

如果你只是简单整合下,其他的几个默认就好了。

Kafka 设置

在你安装的 Kafka 目录文件下:

启动 zk

使用安装包中的脚本启动单节点 Zookeeper 实例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

启动 Kafka 服务

使用 kafka-server-start.sh 启动 kafka 服务:

bin/kafka-server-start.sh config/server.properties

启动成功后!

千万注意:记得将你的虚拟机或者服务器关闭防火墙或者开启 Kafka 的端口 9092。

运行

出现这就代表整合成功了!

--------------------------------------------------------------------------------

我们看下 Kafka 中的 topic 列表就

bin/kafka-topics.sh --list --zookeeper localhost:2181

就会发现刚才我们程序中的 zhisheng 已经自己创建了。

总结

以上所述是小编给大家介绍的SpringBoot Kafka 整合使用及安装教程,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

(0)

相关推荐

  • 在springboot中对kafka进行读写的示例代码

    springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo. 1.POM配置 只需要在dependencies中增加 spring-kafka的配置即可.完整效果如下: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifa

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • SpringBoot Kafka 整合使用及安装教程

    前提 假设你了解过 SpringBoot 和 Kafka. 1.SpringBoot 如果对 SpringBoot 不了解的话,建议去看看 DD 大佬 和 纯洁的微笑 的系列博客. 2.Kafka Kafka 的话可以看看我前两天写的博客 : Kafka 安装及快速入门 学习的话自己开台虚拟机自己手动搭建环境吧,有条件的买服务器. 注意:一定要亲自自己安装实践,接下来我们将这两个进行整合. 创建项目 项目整体架构: 使用 IDEA 创建 SpringBoot 项目,这个很简单了,这里不做过多的讲

  • Linux下Kafka分布式集群安装教程

    Kafka(http://kafka.apache.org/) 是由 LinkedIn 使用 Scala 编写的一个分布式消息系统,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础,具有高水平扩展和高吞吐量.Spack.Elasticsearch 都支持与 Kafka 集成.下面看一下几种分布式开源消息队列系统的对比: Kafka 集群架构: 一般不建议直接使用 Kafka 自带的 Zookeeper 建立 zk 集群,这里我们使用独

  • SpringBoot整合minio快速入门教程(代码示例)

    分享一个快速使用springboot整合minio实现文件上传和下载的示例.前提是已经安装并运行minio服务,参考 minio快速入门文档 首先添加Minio的依赖 <dependency> <groupId>io.minio</groupId> <artifactId>minio</artifactId> <version>3.0.10</version> </dependency> 然后写一个contro

  • Springboot整合Redis的详细教程分享

    目录 1.Docker 安装 Redis 1.1 下载镜像 1.2 创建配置文件 1.3 启动Redis 1.4 进入Redis容器 1.5 redis 可视化工具 2.SpringBoot 整合Redis缓存 2.1 安装Redis 2.2 引入依赖 2.3 配置Redis地址端口 2.4 测试 1.Docker 安装 Redis 1.1 下载镜像 docker pull redis:6.2.6 1.2 创建配置文件 mkdir -p /mydata/redis/conf touch /myd

  • tomcat9 下载安装和配置+整合到eclipse的教程详解

    tomcat 官网 tomcat 相当于本地服务器,可以打开网页 下载到设置的位置,到此下载完成. 安装 1.解压下载的安装包 2.环境变量的配置,选择我的电脑,右键依次 属性–>高级–>环境变量–>系统变量,添加对CATALINA_HOME变量 对Path系统变量添加变量值 %CATALINA_HOME%\bin;%CATALINA_HOME%\lib\servlet-api.jar;%CATALINA_HOME%\lib\jsp-api.jar; 3.添加用户,进入D:\tomcat

  • SpringBoot整合MyBatis超详细教程

    1.整合MyBatis操作 前面一篇提到了SpringBoot整合基础的数据源JDBC.Druid操作,实际项目中更常用的还是MyBatis框架,而SpringBoot整合MyBatis进行CRUD也非常方便. 下面从配置模式.注解模式.混合模式三个方面进行说明MyBatis与SpringBoot的整合. 1.1.配置模式 MyBatis配置模式是指使用mybatis配置文件的方式与SpringBoot进行整合,相对应的就有mybatis-config.xml(用于配置驼峰命名,也可以省略这个文

  • IDEA整合jeesite4.x及安装教程

    Jeesite4.x安装教程(自学记录) 其中配置maven.jdk.mysql的操作都没有做备注 1.官方文档链接 https://jeesite.gitee.io/docs/install-deploy/ 2.下载代码到本地 git clone https://gitee.com/thinkgem/jeesite4.git 3.idea导入jeesite4工程 4.配置maven,以及下载相关依赖jar包(第一次时间比较长,耐心等待) 5.配置jdk (官方推荐jdk1.8版本) 6.配置数

  • SpringBoot整合mybatis-plus进阶详细教程

    目录 前言 wapper介绍 : 条件构造器 AbstractWrapper 一.什么是AbstractWrapper 二.QueryWrapper(LambdaQueryWrapper) 1.QueryWrapper用法示例 2.LambdaQueryWrapper用法示例 三.UpdateWrapper(LambdaUpdateWrapper) 1.UpdateWrapper用法示例 2.LambdaUpdateWrapper用法示例 mybatis-plus的插件 一.分页插件 1.配置分

  • SpringBoot中整合Minio文件存储的安装部署过程

    目录 背景 Minio安装部署 配置pom文件 配置yml文件 Minio工具类 初始化client 上传文件 下载文件 删除文件 背景 公司的开发框架集成了附件本地存储,阿里云,华为云等,现项目有要求附件存储与应用部署环境不能是同一台服务器,也不能使用云存储,经过技术选型后决定框架整合minio,将minio部署在另一台服务器开通外网端口即可解决问题 Minio安装部署 下载minio安装部署包,创建对应配置文件,这里提供一个整合后的压缩包 下载地址:http://xiazai.jb51.ne

  • 在SpringBoot中整合使用Netty框架的详细教程

    Netty是一个非常优秀的Socket框架.如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择. Netty与SpringBoot的整合,我想无非就是要整合几个地方 让netty跟springboot生命周期保持一致,同生共死 让netty能用上ioc中的Bean 让netty能读取到全局的配置 整合Netty,提供WebSocket服务 这里演示一个案例,在SpringBoot中使用Netty提供一个Websocket服务. servlet容器本身提

随机推荐