spring boot 与kafka集成的示例代码

新建spring boot项目

这里使用intellij IDEA

添加kafka集成maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>demo</name>
  <description>Demo project for Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.8.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

项目中application.properties 添加

spring.kafka.bootstrap-servers=vm208:9092,vm:9092,vm50:9092
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=local_test
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1

新建KafkaConsumer消费类

package com.example.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {

  private Logger logger = LoggerFactory.getLogger(this.getClass());

  @KafkaListener(topics = {"test"})
  public void listen(ConsumerRecord<?, ?> record) {
    System.out.printf("offset = %d,key =%s,value=%s\n", record.offset(), record.key(), record.value());
  }
}

启动spring-boot程序,在kafka集群,模拟发送topic,检验接收

代码如下:

bin/kafka-console-producer.sh --broker-list    vm208:9092,vm210:9092,vm50:9092  --topic  test

编写producer代码

package com.example.demo.producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {

  @Autowired
  private KafkaTemplate kafkaTemplate;
  String topic="test";
  public void sendMessage(String key,String data){
    kafkaTemplate.send(new ProducerRecord(topic,key,data));
  }
}

建立一个restful模拟发送( //http://localhost:8080/kafka/send.do?key=2&data=allen-test-message)

package com.example.demo.controller;
import com.example.demo.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
  @Autowired
  private KafkaProducer kafkaProducer;
  @RequestMapping(value = "/kafka/send.do", method = RequestMethod.GET)
  public String sendMessage(@RequestParam(value = "key") String key, @RequestParam(value = "data") String data) {
    kafkaProducer.sendMessage(key, data);
    return "sucess";
  }
}

可以发现 spring-kafka大大减少了代码工作量.

官方文档: https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • spring boot整合spring-kafka实现发送接收消息实例代码

    前言 由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用. 没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • 在springboot中对kafka进行读写的示例代码

    springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo. 1.POM配置 只需要在dependencies中增加 spring-kafka的配置即可.完整效果如下: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifa

  • spring boot与kafka集成的简单实例

    本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.

  • Spring boot集成Kafka+Storm的示例代码

    前言 由于业务需求需要把Strom与kafka整合到spring boot项目里,实现其他服务输出日志至kafka订阅话题,storm实时处理该话题完成数据监控及其他数据统计,但是网上教程较少,今天想写的就是如何整合storm+kafka 到spring boot,顺带说一说我遇到的坑. 使用工具及环境配置 ​ 1. java 版本jdk-1.8 ​ 2. 编译工具使用IDEA-2017 ​ 3. maven作为项目管理 ​ 4.spring boot-1.5.8.RELEASE 需求体现 1.

  • spring boot 与kafka集成的示例代码

    新建spring boot项目 这里使用intellij IDEA 添加kafka集成maven <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLoc

  • Spring Boot与React集成的示例代码

    前言 前不久学习了Web开发,用React写了前端,Spring Boot搭建了后端,然而没有成功地把两个工程结合起来,造成前端与后端之间需要跨域通信,带来了一些额外的工作. 这一次成功地将前端工程与后端结合在一个Project中,记录一下,也希望能帮到那些和我一样的入门小白. 环境 Windows 10 - x64, Java 1.8.0, node v8.9.4, npm 6.1.0 前奏 *JDK, Node 和 NPM请自行安装 新建一个Spring Boot工程 在Intellij里选

  • Spring Boot中使用RabbitMQ的示例代码

    很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Message Broker是一种消息验证.传输.路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集.消息的分解,并将结果发送到他们的目的地,然后重新组合相应返回给消息用户 调用Web服务来检索数据 响应事件或错误 使用发布-订阅模式

  • Spring Boot实现文件上传示例代码

    使用SpringBoot进行文件上传的方法和SpringMVC差不多,本文单独新建一个最简单的DEMO来说明一下. 主要步骤包括: 1.创建一个springboot项目工程,本例名称(demo-uploadfile). 2.配置 pom.xml 依赖. 3.创建和编写文件上传的 Controller(包含单文件上传和多文件上传). 4.创建和编写文件上传的 HTML 测试页面. 5.文件上传相关限制的配置(可选). 6.运行测试. 项目工程截图如下: 文件代码: <dependencies>

  • spring boot整合mybatis+mybatis-plus的示例代码

    Spring boot对于我来说是一个刚接触的新东西,学习过程中,发现这东西还是很容易上手的,Spring boot没配置时会默认使用Spring data jpa,这东西可以说一个极简洁的工具,可是我还是比较喜欢用mybatis,工具是没有最好的,只有这合适自己的. 说到mybatis,最近有一个很好用的工具--------mybatis-Plus(官网),现在更新的版本是2.1.2,这里使用的也是这个版本.我比较喜欢的功能是代码生成器,条件构造器,这样就可以更容易的去开发了. mybatis

  • spring boot实现软删除的示例代码

    本文开发环境:spring-boot:2.0.3.RELEASE + java1.8 WHY TO DO 软删除:即不进行真正的删除操作.由于我们实体间的约束性(外键)的存在,删除某些数据后,将导致其它的数据不完整.比如,计算机1801班的教师是张三,此时,我们如果把张三删除掉,那么在查询计算机1801班时,由于张三不存了,所以就会报EntityNotFound的错误.当然了,在有外键约束的数据库中,如果张三是1801班的教师,那么我们直接删除张三将报一个约束性的异常.也就是说:直接删除张三这个

  • Spring Boot 整合 Apache Dubbo的示例代码

    Apache Dubbo是一款高性能.轻量级的开源 Java RPC 框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现. 注意,是 Apache Dubbo,不再是 Alibaba Dubbo.简单来说就是 Alibaba 将 Dubbo 移交给 Apache 开源社区进行维护.参见 dubbo-spring-boot-project Spring Boot 系列:整合 Alibaba Dubbo 一.本文示例说明 1.1 框架版本Dubbo 版本

  • Spring Boot 的创建和运行示例代码详解

    目录 1.什么是Spring Boot 2.Spring Boot 优点 3. Spring Boot 项目创建 3.1 使用 Idea 社区版创建 4.项目目录介绍和运行 4.1 运行项目 点击启动类的 main ⽅法就可以运⾏ Spring Boot 项⽬了 4.2 验证项目是否成功 5. 注意事项:包路径错误 5.1 正确路径 6. Spring Boot 热部署(热加载) 6.1 添加框架⽀持  在 pom.xml 中添加如下框架引⽤: 6.2 开启项目自动编译 6.3 开启运⾏中热部署

  • Spring Boot使用模板freemarker的示例代码

    最近有好久没有更新博客了,感谢小伙伴的默默支持,不知道是谁又打赏了我一个小红包,谢谢. 今天我们讲讲怎么在Spring Boot中使用模板引擎freemarker,先看看今天的大纲: (1) freemarker介绍: (2) 新建spring-boot-freemarker工程: (3) 在pom.xml引入相关依赖: (4) 编写启动类: (5) 编写模板文件hello.ftl; (6) 编写访问类HelloController; (7) 测试: (8) freemarker配置: (9)

随机推荐