Springboot 2.x集成kafka 2.2.0的示例代码

目录
  • 引言
  • 基本环境
  • 代码编写
    • 1、基本引用pom
    • 2、基本配置
    • 3、实体类
    • 4、生产者端
    • 5、消费者
    • 6、测试
  • 效果展示
  • 遇到的问题

引言

kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中。

版本对应地址:https://spring.io/projects/spring-kafka

基本环境

springboot版本2.1.4

kafka版本2.2.0

jdk 1.8

代码编写

1、基本引用pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafkademo</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.2.0.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.7</version>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

2、基本配置

spring.kafka.bootstrap-servers=2.1.1.1:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#logging.level.root=debug

3、实体类

package com.example.demo.model;

import java.util.Date;

public class Messages {

    private Long id;

    private String msg;

    private Date sendTime;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

4、生产者端

package com.example.demo.service;

import com.example.demo.model.Messages;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Date;
import java.util.UUID;

@Service
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private Gson gson = new GsonBuilder().create();

    public void send() {
        Messages message = new Messages();
        message.setId(System.currentTimeMillis());
        message.setMsg("123");
        message.setSendTime(new Date());
        ListenableFuture<SendResult<String, String>> test0 = kafkaTemplate.send("newtopic", gson.toJson(message));
    }
}

5、消费者

package com.example.demo.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.Optional;

@Service
public class KafkaReceiver {

    @KafkaListener(topics = {"newtopic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("record =" + record);
            System.out.println("message =" + message);

        }
    }

}

6、测试

在启动方法中模拟消息生产者,向kafka中发送消息

package com.example.demo;

import com.example.demo.service.KafkaSender;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class KafkademoApplication {

	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);

		KafkaSender sender = context.getBean(KafkaSender.class);
		for (int i = 0; i <1000; i++) {
			sender.send();

			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}

}

效果展示

命令行直接消费消息

遇到的问题

生产端连接kafka超时

at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)

解决方案:

修改kafka中的server.properties中的下面配置,将原来的默认配置替换成下面ip+端口的形式,重启kafka

到此这篇关于Springboot 2.x集成kafka 2.2.0的示例代码的文章就介绍到这了,更多相关Springboot集成kafka内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot集成kafka全面实战记录

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章. 一.生产者实践 普通生产者 带回调的生产者 自定义分区器 kafka事务提交 二.消费者实践 简单消费 指定topic.partition.offset消费 批量消费 监听异常处理器 消息过滤器 消息转发 定时启动/停止监听器 一.前戏 1.在项目中连接kafka,因为是外网,首先要开放kafka配置文件

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • Springboot集成Kafka进行批量消费及踩坑点

    目录 引入依赖 创建配置类 Kafka 消费者 引入依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency> 因为我的项目的 springboot 版本是 1.5.22.RELE

  • SpringBoot集成Kafka的步骤

    SpringBoot集成Kafka 本篇主要讲解SpringBoot 如何集成Kafka ,并且简单的 编写了一个Demo 来测试 发送和消费功能 前言 选择的版本如下: springboot : 2.3.4.RELEASE spring-kafka : 2.5.6.RELEASE kafka : 2.5.1 zookeeper : 3.4.14 本Demo 使用的是 SpringBoot 比较高的版本 SpringBoot 2.3.4.RELEASE 它会引入 spring-kafka 2.5

  • Springboot集成Kafka实现producer和consumer的示例代码

    本文介绍如何在springboot项目中集成kafka收发message. Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息.支持通过Kafka服务器和消费机集群来分区消息.支持Hadoop并行数据加载. 安装Kafka 因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookee

  • Springboot 2.x集成kafka 2.2.0的示例代码

    目录 引言 基本环境 代码编写 1.基本引用pom 2.基本配置 3.实体类 4.生产者端 5.消费者 6.测试 效果展示 遇到的问题 引言 kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中. 版本对应地址:https://spring.io/projects/spring-kafka 基本环境 springboot版本2.1.4 kafk

  • springboot集成CAS实现单点登录的示例代码

    最近新参与的项目用到了cas单点登录,我还不会,这怎么能容忍!空了学习并搭建了一个spring-boot 集成CAS 的demo.实现了单点登录与登出. 单点登录英文全称是:Single Sign On,简称SSO. 含义:在多个相互信任的系统中,只要登录一个系统其他系统均可访问. CAS 是一种使用广泛的单点登录实现,分为客户端CAS Client和服务端 CAS Service,客户端就是我们的系统,服务端是认证中心,由CAS提供,我们需要稍作修改,启动起来就可以用.~~~~ 效果演示 ht

  • SpringBoot集成redis实现分布式锁的示例代码

    1.准备 使用redis实现分布式锁,需要用的setnx(),所以需要集成Jedis 需要引入jar,jar最好和redis的jar版本对应上,不然会出现版本冲突,使用的时候会报异常redis.clients.jedis.Jedis.set(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;I)Ljava/lang/String; 我使用的redis版本是2.3.0,Jedis使用的是3.3.0 <de

  • SpringBoot2.3集成ELK7.1.0的示例代码

    最近想用ELK做日志分析,所以先写了Demo来实验一下! 1.安装ELK(Elasticsearch+Logstash+Kibana),具体安装教程百度 2.查看是否安装成功,输入localhost:9200,localhost:5601,如下页面则安装成功 3.pom包依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM

  • SpringBoot整合Redis实现消息发布与订阅的示例代码

    当我们在多个集群应用中使用到本地缓存时,在数据库数据得到更新后,为保持各个副本当前被修改的数据与数据库数据保持同步,在数据被操作后向其他集群应用发出被更新数据的通知,使其删除;下次当其他应用请求该被更新的数据时,应用会到数据库去取,也就是最新的数据,从而使得被更新数据与数据库保持同步! 能实现发送与接收信息的中间介有很多,比如:RocketMQ.RabbitMQ.ActiveMQ.Kafka等,本次主要简单介绍Redis的推送与订阅功能并集成Spring Boot的实现. 1.添加SpringB

  • SpringBoot实现阿里云短信接口对接的示例代码

    前言 公司最近项目需要一个手机验证码的功能,任务确定后,倍感亚历山大,以为和第三方对接的都好麻烦,查阿里的API.网上大神写的博客,各种查之后才发现,简单的一塌糊涂,这里想说个问题,不知道其他的攻城狮们是不是和我一样的心里,刚接触个没做过的任务时,会一脸懵里的着急,无从下手的感觉,后来会了,就觉得简单的一*,在这里我说一下自己的体会,遇到任何难点,先理思路.任务拆分.逐个查资料,其实一套下来,就不会那种一脸懵逼的干着急... 所需条件 1.阿里云账户 2.开通云通讯中的短信服务 3.申请短信签名

  • SpringBoot实现文件上传与下载功能的示例代码

    目录 Spring Boot文件上传与下载 举例说明 1.引入Apache Commons FileUpload组件依赖 2.设置上传文件大小限制 3.创建选择文件视图页面 4.创建控制器 5.创建文件下载视图页面 6.运行 Spring Boot文件上传与下载 在实际的Web应用开发中,为了成功上传文件,必须将表单的method设置为post,并将enctype设置为multipart/form-data.只有这种设置,浏览器才能将所选文件的二进制数据发送给服务器. 从Servlet 3.0开

  • Java Kafka实现延迟队列的示例代码

    目录 基于kafka如何实现延迟队列 完善细节 Java代码实现 还需要做什么 kafka作为一个使用广泛的消息队列,很多人都不会陌生,但当你在网上搜索“kafka 延迟队列”,出现的都是一些讲解时间轮或者只是提供了一些思路,并没有一份真实可用的代码实现,今天我们就来打破这个现象,提供一份可运行的代码,抛砖引玉,吸引更多的大神来分享. 基于kafka如何实现延迟队列 想要解决一个问题,我们需要先分解问题.kafka作为一个高性能的消息队列,只要消费能力足够,发出的消息都是会立刻收到的,因此我们需

  • SpringBoot结合JSR303对前端数据进行校验的示例代码

    一.校验分类 数据的校验一般分为**前端校验.后端校验** 二.前端校验 前端校验是最为明显的,先说一下: ① HTML 非空校验 如 HTML5 新增的属性required="true",一旦没有填写就输入框就显示红色,具体使用如: <input type="text" id="name" name="name" required="true"/> ② JS 同时在提交表单发送 Ajax请求

  • SpringBoot中使用Session共享实现分布式部署的示例代码

    前言:我们知道,在单体项目中,我们将用户信息存在 session 中,那么在该 session 过期之前,我们都可以从 session 中获取到用户信息,通过登录拦截,进行操作 但是分布式部署的时候,我们请求的服务器可能不是同一台服务器,那么我们就必须要面对 session 共享的问题,下面介绍的是在 SpringBoot 实现 session 共享的方式 一.创建项目 创建 SpringBoot 项目,选择 Maven 依赖 最终 pom.xml 文件如下: <!-- redis的依赖 -->

随机推荐