在Spring Boot应用程序中使用Apache Kafka的方法步骤详解

第1步:生成我们的项目: Spring Initializr来生成我们的项目。我们的项目将提供Spring MVC / Web支持和Apache Kafka支持。

第2步:发布/读取Kafka主题中的消息:

<b>public</b> <b>class</b> User {
  <b>private</b> String name;
  <b>private</b> <b>int</b> age;
  <b>public</b> User(String name, <b>int</b> age) {
    <b>this</b>.name = name;
    <b>this</b>.age = age;
  }
}

第3步:通过application.yml配置文件配置Kafka:

我们需要创建配置文件。我们需要以某种方式配置我们的Kafka生产者和消费者,以便能够发布和读取与主题相关的消息。相比建立一个使用@Configuration标注的Java类,我们可以直接使用配置文件application.properties或application.yml。Spring Boot让我们避免像过去一样编写的所有样板代码,同时为我们提供了更加智能的配置应用程序的方法,如下所示:

server: port: 9000
spring:
  kafka:
   consumer:
    bootstrap: localhost:9092
    group-id: group_id
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
    bootstrap: localhost:9092
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

第4步:创建一个生产者,创建生产者会将我们的消息写入该主题。

<b>public</b> <b>class</b> Producer {
  <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);
  <b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>;
  @Autowired
  <b>private</b> KafkaTemplate<String, String> kafkaTemplate;
  <b>public</b> <b>void</b> sendMessage(String message) {
    logger.info(String.format(</font><font>"#### -> Producing message -> %s"</font><font>, message));
    <b>this</b>.kafkaTemplate.send(TOPIC, message);
  }
}
</font>

自动连接autowireKafkaTemplate ,使用它将消息发布到主题 - 这就是消息的生产者!

第5步:创建一个消费者,消费者是负责根据您自己的业务逻辑的需求阅读处理消息的消息的服务。要进行设置,请输入以下内容:

@Service
<b>public</b> <b>class</b> Consumer {

  <b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);

  @KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>)
  <b>public</b> <b>void</b> consume(String message) throws IOException {
    logger.info(String.format(</font><font>"#### -> Consumed message -> %s"</font><font>, message));
  }
}
</font>

在这里,我们告诉我们的方法void consume(String message)订阅用户的主题,并将每条消息发送到应用程序日志。在您的实际应用程序中,您可以按照业务需要的方式处理消息。

第6步:创建REST控制器,们已经拥有了能够消费Kafka消息所需的全部内容。

为了充分展示我们创建的所有内容的工作原理,我们需要创建一个具有单一端点的控制器。消息将发布到此端点,然后由我们的生产者处理。然后,我们的消费者将通过登录到控制台来捕获并处理它。

@RestController
@RequestMapping(value = <font>"/kafka"</font><font>)
<b>public</b> <b>class</b> KafkaController {

  <b>private</b> <b>final</b> Producer producer;

  @Autowired
  KafkaController(Producer producer) {
    <b>this</b>.producer = producer;
  }

  @PostMapping(value = </font><font>"/publish"</font><font>)
  <b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message) {
    <b>this</b>.producer.sendMessage(message);
  }
}
</font>

让我们使用cURL将消息发送给Kafka:

curl -X POST -F 'message=test' http://localhost:9000/kafka/publish

基本上就是这样!在不到10个步骤中,您了解了将Apache Kafka添加到Spring Boot项目是多么容易。如果您遵循本指南,您现在知道如何将Kafka集成到Spring Boot项目中,并且您已准备好使用这个超级工具!

总结

以上所述是小编给大家介绍的在Spring Boot应用程序中使用Apache Kafka的方法步骤详解,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!

(0)

相关推荐

  • 详解使用spring boot admin监控spring cloud应用程序

    Spring Boot提供的监控接口,例如:/health./info等等,实际上除了之前提到的信息,还有其他信息业需要监控:当前处于活跃状态的会话数量.当前应用的并发数.延迟以及其他度量信息. 最近在找一个spring cloud的监控组件,要求粒度要到每一个接口的,hystrix dashboard显然不适合,也不是这个应用场景.后来发现了spring boot admin这个神器,可以注册到Eureka和spring cloud无缝整合,页面AngularJS写的还算凑合,里面包含有许多功

  • 在springboot中对kafka进行读写的示例代码

    springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo. 1.POM配置 只需要在dependencies中增加 spring-kafka的配置即可.完整效果如下: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifa

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • spring boot整合spring-kafka实现发送接收消息实例代码

    前言 由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用. 没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大

  • spring boot与kafka集成的简单实例

    本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.

  • SpringBoot快速构建应用程序方法介绍

    首先介绍一下SpringBoot在Coding上的使用场景.Coding中使用SpringBoot搭建的Email服务. 之所以选择SpringBoot,原因就是更加轻量级.在往常的Spring项目中,依赖的库太多,配置太繁杂,用在这只提供Email服务的程序上未免也小题大做了.而SpringBoot提供一些非功能性的常见的大型项目类特性(如内嵌服务器.安全.度量.健康检查.外部化配置)使得我们在部署上也更为方便,如可以直接地内嵌Tomcat/Jetty(不需要单独去部署war包) 1.Spri

  • Spring boot集成Kafka+Storm的示例代码

    前言 由于业务需求需要把Strom与kafka整合到spring boot项目里,实现其他服务输出日志至kafka订阅话题,storm实时处理该话题完成数据监控及其他数据统计,但是网上教程较少,今天想写的就是如何整合storm+kafka 到spring boot,顺带说一说我遇到的坑. 使用工具及环境配置 ​ 1. java 版本jdk-1.8 ​ 2. 编译工具使用IDEA-2017 ​ 3. maven作为项目管理 ​ 4.spring boot-1.5.8.RELEASE 需求体现 1.

  • 在Spring Boot应用程序中使用Apache Kafka的方法步骤详解

    第1步:生成我们的项目: Spring Initializr来生成我们的项目.我们的项目将提供Spring MVC / Web支持和Apache Kafka支持. 第2步:发布/读取Kafka主题中的消息: <b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age

  • 使用Spring-Retry解决Spring Boot应用程序中的重试问题

    目录 1.背景 2.引入依赖 3.开启spring-retry 4.在方法上添加@Retryable 5.重试完 6.注意事项 1.背景 在日常开发过程中,难免会与第三方接口发生交互,例如:远程服务调用当正常调用发生异常时,例如:网络抖动等短时间内的临时问题,程序为了更加健壮并且更不容易出现故障.需要重新触发业务操作,以防止间歇性的异常对程序照成的影响.这个可以就可以用重试来解决. 2.引入依赖 <!--因为spring-retry是基于aop实现,所以需要引入aop--> <depen

  • Java中Spring Boot+Socket实现与html页面的长连接实例详解

    Spring Boot+Socket实现与html页面的长连接,客户端给服务器端发消息,服务器给客户端轮询发送消息,附案例源码 功能介绍 客户端给所有在线用户发送消息客户端给指定在线用户发送消息服务器给客户端发送消息(轮询方式) 注意:socket只是实现一些简单的功能,具体的还需根据自身情况,代码稍微改造下 项目搭建 项目结构图 pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xml

  • 教你在Spring Boot微服务中集成gRPC通讯的方法

    一.首先声明gRPC接口 这里引入的是最新的gRpc-core 1.37版本, 采用的grcp-spring-boot-starter封装的版本进行实现,github地址: https://github.com/yidongnan/grpc-spring-boot-starter 要实现gRpc通讯, 先定义接口以及入参出参信息 syntax = "proto3"; option java_multiple_files = true; option java_package = &qu

  • Spring Boot 定制与优化内置的Tomcat容器实例详解

    1.Spring Boot 定制与优化内置Tomcat容器. > 内置的容器有三个分别是Undertow.Jetty.Tomcat,Spring Boot 对这三个容器分别进行了实现,它们上层接口都是EmbeddedServletContainerFactory,该接口也是本文的主要核心. 对于内置容器的定制与优化主要有两种方式,第一种方式是通过配置文件来配置,另外一种是通过码代码的方式.接下来主要对上述两种方式进行实现. 2.通过配置文件来定制与优化Tomcat > 配置的核心内容参考org

  • Spring boot jpa 删除数据和事务管理的问题实例详解

    今天我们介绍的是jpa删除和事务的一些坑,接下来看看具体内容. 业务场景(这是一个在线考试系统)和代码:根据问题的id删除答案 repository层: int deleteByQuestionId(Integer questionId); service 层: public void deleteChoiceAnswerByQuestionId(Integer questionId) { choiceAnswerRepository.deleteByQuestionId(questionId)

  • 微信小程序中的列表切换功能实例代码详解

    感觉这列表切换有点类似于轮播图,而且感觉这代码直接可以拿来用,稍微改一改样式什么的就OK了,列表切换也是用到的地方也很多 wxml中的代码如下: <!-- 标签页面标题 --> <view class="tab"> <view class="tab-item {{tab==0?'active':''}}" bindtap="changeItem" data-item="0">音乐推荐<

  • apache zookeeper使用方法实例详解

    本文涉及了Apache Zookeeper使用方法实例详解的相关知识,接下来我们就看看具体内容. 简介 Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子项目发展而来,现在已经成为了 Apache 的顶级项目.Zookeeper 为分布式系统提供了高效可靠且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等. Zookeeper 接口简单,开发人员不必过多地纠结在分布式系统编程难于处理的同步和一致性问

  • 如何在Linux操作系统下安装Apache服务的方法实例详解

    链接下载: 操作环境 VMware虚拟机中CentOS 7.6 SecureCRT Xftp(Xmanager) 需求分析 使用Apache服务实现访问http 操作步骤 1.挂载光盘 [root@localhost ~]# mount /dev/cdrom /mnt 查看是否挂载 [root@localhost ~]# df -Th 2.从源码包编译安装程序 (编译安装) [root@localhost Packages]# yum -y install gcc gcc-c++ make 3.

  • Oracle表中重复数据去重的方法实例详解

    Oracle表中重复数据去重的方法实例详解 我们在项目中肯定会遇到一种情况,就是表中没有主键 有重复数据 或者有主键 但是部分字段有重复数据 而我们需要过滤掉重复数据 下面是一种解决方法 delete from mytest ms where rowid in (select aa.rid from (select rowid as rid, row_number() over(partition by s.name order by s.id) as nu from mytest s) aa

随机推荐