Spring Boot集成Kafka的示例代码

本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记

系统环境

使用远程服务器上搭建的kafka服务

  1. Ubuntu 16.04 LTS
  2. kafka_2.12-0.11.0.0.tgz
  3. zookeeper-3.5.2-alpha.tar.gz

集成过程

1.创建spring boot工程,添加相关依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.laravelshao.springboot</groupId>
  <artifactId>spring-boot-integration-kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spring-boot-integration-kafka</name>
  <description>Demo project for Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--kafka-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-json</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

2.添加配置信息,这里使用yml文件

spring:
 kafka:
  bootstrap-servers:X.X.X.X:9092
  producer:
   value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  consumer:
   group-id: test
   auto-offset-reset: earliest
   value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

3.创建消息对象

public class Message {
  private Integer id;
  private String msg;

  public Message() {
  }

  public Message(Integer id, String msg) {
    this.id = id;
    this.msg = msg;
  }

  public Integer getId() {
    return id;
  }

  public void setId(Integer id) {
    this.id = id;
  }

  public String getMsg() {
    return msg;
  }

  public void setMsg(String msg) {
    this.msg = msg;
  }

  @Override
  public String toString() {
    return "Message{" +
        "id=" + id +
        ", msg='" + msg + '\'' +
        '}';
  }
}

4.创建生产者

package com.laravelshao.springboot.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Producer {
  private static Logger log = LoggerFactory.getLogger(Producer.class);

  @Autowired
  private KafkaTemplate kafkaTemplate;

  public void send(String topic, Message message) {
    kafkaTemplate.send(topic, message);
    log.info("Producer->topic:{}, message:{}", topic, message);
  }

}

5.创建消费者,使用@ KafkaListener注解监听主题

package com.laravelshao.springboot.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Created by shaoqinghua on 2018/3/23.
 */
@Component
public class Consumer {
  private static Logger log = LoggerFactory.getLogger(Consumer.class);

  @KafkaListener(topics = "test_topic")
  public void receive(ConsumerRecord<String, Message> consumerRecord) {
    log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
  }

}

6.发送消费测试

package com.laravelshao.springboot;

import com.laravelshao.springboot.kafka.Message;
import com.laravelshao.springboot.kafka.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class IntegrationKafkaApplication {

  public static void main(String[] args) throws InterruptedException {
    ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
    Producer producer = context.getBean(Producer.class);

    for (int i = 1; i < 10; i++) {
      producer.send("test_topic", new Message(i, "test topic message " + i));
      Thread.sleep(2000);
    }
  }

}

可以依次看到发送消息,消费消息

异常问题

反序列化异常(自定义的消息对象不在kafka信任的包路径下)?

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
 at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
 at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
 at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.lang.Thread.run(Thread.java:745)

解决方法:将当前包添加到kafka信任的包路径下

spring:
 kafka:
  consumer:
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • Spring boot集成Kafka+Storm的示例代码
  • springboot 1.5.2 集成kafka的简单例子
  • spring boot与kafka集成的简单实例
  • 在springboot中对kafka进行读写的示例代码
  • spring boot整合spring-kafka实现发送接收消息实例代码
(0)

相关推荐

  • spring boot与kafka集成的简单实例

    本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.

  • 在springboot中对kafka进行读写的示例代码

    springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo. 1.POM配置 只需要在dependencies中增加 spring-kafka的配置即可.完整效果如下: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifa

  • Spring boot集成Kafka+Storm的示例代码

    前言 由于业务需求需要把Strom与kafka整合到spring boot项目里,实现其他服务输出日志至kafka订阅话题,storm实时处理该话题完成数据监控及其他数据统计,但是网上教程较少,今天想写的就是如何整合storm+kafka 到spring boot,顺带说一说我遇到的坑. 使用工具及环境配置 ​ 1. java 版本jdk-1.8 ​ 2. 编译工具使用IDEA-2017 ​ 3. maven作为项目管理 ​ 4.spring boot-1.5.8.RELEASE 需求体现 1.

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • spring boot整合spring-kafka实现发送接收消息实例代码

    前言 由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用. 没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • 使用Spring Boot集成FastDFS的示例代码

    这篇文章我们介绍如何使用Spring Boot将文件上传到分布式文件系统FastDFS中. 这个项目会在上一个项目的基础上进行构建. 1.pom包配置 我们使用Spring Boot最新版本1.5.9.jdk使用1.8.tomcat8.0. <dependency> <groupId>org.csource</groupId> <artifactId>fastdfs-client-java</artifactId> <version>

  • Spring boot集成RabbitMQ的示例代码

    RabbitMQ简介 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求. AMQP就是一个协议

  • spring boot整合Swagger2的示例代码

    Swagger 是一个规范和完整的框架,用于生成.描述.调用和可视化RESTful风格的 Web 服务.总体目标是使客户端和文件系统作为服务器以同样的速度来更新.文件的方法,参数和模型紧密集成到服务器端的代码,允许API来始终保持同步.Swagger 让部署管理和使用功能强大的API从未如此简单. 1.代码示例 1).在pom.xml文件中引入Swagger2 <dependency> <groupId>io.springfox</groupId> <artifa

  • 深入研究spring boot集成kafka之spring-kafka底层原理

    目录 前言 简单集成 引入依赖 添加配置 测试发送和接收 Spring-kafka-test嵌入式KafkaServer 引入依赖 启动服务 创建新的Topic 程序启动时创建TOPIC 代码逻辑中创建 PS:其他的方式创建TOPIC 引入依赖 api方式创建 命令方式创建 消息发送之KafkaTemplate探秘 获取发送结果 异步获取 同步获取 KAFKA事务消息 REPLYINGKAFKATEMPLATE获得消息回复 Spring-kafka消息消费用法探秘 @KAFKALISTENER的

  • Spring Boot集成Mybatis的实例代码(简洁版)

    概述 现在互联网应用中,大部分还是使用Mybatis来操作数据库的,本文介绍一下Spring Boot中如何集成Mybatis. 上篇介绍了Spring Boot 直接用jar运行项目的方法,需要的朋友点击查看. 创建Spring Boot工程 在 Spring Boot 开篇-创建和运行 一文中有一个小节介绍了如何使用Spring Boot的组件来创建工程.如果要集成Mybatis,只需要把Mysql和Mybatis这两个组件勾选一下即可. 当然也可以不通过这种方式,直接在POM.xml文件中

  • Spring boot配置 swagger的示例代码

    为什么使用Swagger 在实际开发中我们作为后端总是给前端或者其他系统提供接口,每次写完代码之后不可避免的都需要去写接口文档,首先写接口文档是一件繁琐的事,其次由接口到接口文档需要对字段.甚至是排版等.再加上如果我们是为多个系统提供接口时可能还需要按照不同系统的要求去书写文档,那么有没有一种方式让我们在开发阶段就给前端提供好接口文档,甚至我们可以把生成好的接口文档暴露出去供其他系统调用,那么这样我只需要一份代码即可. Spring boot配置 swagger 1.导入maven依赖 <!--

  • Spring boot集成Kafka消息中间件代码实例

    一.创建Spring boot项目,添加如下依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <ar

  • spring boot封装HttpClient的示例代码

    最近使用到了HttpClient,看了一下官方文档:HttpClient implementations are expected to be thread safe. It is recommended that the same instance of this class is reused for multiple request executions,翻译过来的意思就是:HttpClient的实现是线程安全的,可以重用相同的实例来执行多次请求.遇到这种描述的话,我们就应该想到,需要对H

随机推荐