spring boot整合spring-kafka实现发送接收消息实例代码

前言

由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用.

没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大.功能一直没实现.文档写的也不是很漂亮,也可能是刚起步,有很多的问题.我这里只能放弃了,使用了spring-kafka.

实现方法

pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>org.linuxsogood.sync</groupId>
 <artifactId>linuxsogood-sync</artifactId>
 <version>1.0.0-SNAPSHOT</version>

 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.4.0.RELEASE</version>
 </parent>

 <properties>
  <java.version>1.8</java.version>
  <!-- 依赖版本 -->
  <mybatis.version>3.3.1</mybatis.version>
  <mybatis.spring.version>1.2.4</mybatis.spring.version>
  <mapper.version>3.3.6</mapper.version>
  <pagehelper.version>4.1.1</pagehelper.version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-freemarker</artifactId>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-kafka</artifactId>
   <version>2.0.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-core</artifactId>
   <version>4.3.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>-->
  <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>-->
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.assertj</groupId>
   <artifactId>assertj-core</artifactId>
   <version>3.5.2</version>
  </dependency>
  <dependency>
   <groupId>org.hamcrest</groupId>
   <artifactId>hamcrest-all</artifactId>
   <version>1.3</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.mockito</groupId>
   <artifactId>mockito-all</artifactId>
   <version>1.9.5</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-test</artifactId>
   <version>4.2.3.RELEASE</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <dependency>
   <groupId>com.microsoft.sqlserver</groupId>
   <artifactId>sqljdbc4</artifactId>
   <version>4.0.0</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid</artifactId>
   <version>1.0.11</version>
  </dependency>

  <!--Mybatis-->
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis</artifactId>
   <version>${mybatis.version}</version>
  </dependency>
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis-spring</artifactId>
   <version>${mybatis.spring.version}</version>
  </dependency>
  <!--<dependency>
   <groupId>org.mybatis.spring.boot</groupId>
   <artifactId>mybatis-spring-boot-starter</artifactId>
   <version>1.1.1</version>
  </dependency>-->
  <!-- Mybatis Generator -->
  <dependency>
   <groupId>org.mybatis.generator</groupId>
   <artifactId>mybatis-generator-core</artifactId>
   <version>1.3.2</version>
   <scope>compile</scope>
   <optional>true</optional>
  </dependency>
  <!--分页插件-->
  <dependency>
   <groupId>com.github.pagehelper</groupId>
   <artifactId>pagehelper</artifactId>
   <version>${pagehelper.version}</version>
  </dependency>
  <!--通用Mapper-->
  <dependency>
   <groupId>tk.mybatis</groupId>
   <artifactId>mapper</artifactId>
   <version>${mapper.version}</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>1.2.17</version>
  </dependency>
 </dependencies>
 <repositories>
  <repository>
   <id>repo.spring.io.milestone</id>
   <name>Spring Framework Maven Milestone Repository</name>
   <url>https://repo.spring.io/libs-milestone</url>
  </repository>
 </repositories>
 <build>
  <finalName>mybatis_generator</finalName>
  <plugins>
   <plugin>
    <groupId>org.mybatis.generator</groupId>
    <artifactId>mybatis-generator-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
     <verbose>true</verbose>
     <overwrite>true</overwrite>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
     <mainClass>org.linuxsogood.sync.Starter</mainClass>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>

orm层使用了MyBatis,又使用了通用Mapper和分页插件.

kafka消费端配置

import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

 @Value("${kafka.broker.address}")
 private String brokerAddress;

 @Bean
 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
 ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
 factory.setConsumerFactory(consumerFactory());
 factory.setConcurrency(3);
 factory.getContainerProperties().setPollTimeout(3000);
 return factory;
 }

 @Bean
 public ConsumerFactory<String, String> consumerFactory() {
 return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }

 @Bean
 public Map<String, Object> consumerConfigs() {
 Map<String, Object> propsMap = new HashMap<>();
 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 return propsMap;
 }

 @Bean
 public Listener listener() {
 return new Listener();
 }
}

生产者的配置.

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

 @Value("${kafka.broker.address}")
 private String brokerAddress;

 @Bean
 public ProducerFactory<String, String> producerFactory() {
 return new DefaultKafkaProducerFactory<>(producerConfigs());
 }

 @Bean
 public Map<String, Object> producerConfigs() {
 Map<String, Object> props = new HashMap<>();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
 props.put(ProducerConfig.RETRIES_CONFIG, 0);
 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 return props;
 }

 @Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
 return new KafkaTemplate<String, String>(producerFactory());
 }
}

监听,监听里面,写的就是业务逻辑了,从kafka里面得到数据后,具体怎么去处理. 如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成只有一个监听能处理其中的消息,另外监听就不能处理消息了.也即是kafka的分布式消息处理方式.

在同一个group里的监听,共同处理接收到的消息,会根据一定的算法来处理.如果不在一个组,但是监听的是同一个topic的话,就会形成广播模式

import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

 private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

 @Autowired
 private StoreMapper storeMapper;

 /**
  * 监听kafka消息,如果有消息则消费,同步数据到新烽火的库
  * @param record 消息实体bean
  */
 @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
 public void listen(ConsumerRecord<?, ?> record) {
  Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   try {
    MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
    CupMessageType type = messageWrapper.getType();
    //判断消息的数据类型,不同的数据入不同的表
    if (CupMessageType.STORE == type) {
     proceedStore(messageWrapper);
    }
   } catch (Exception e) {
    LOGGER.error("将接收到的消息保存到数据库时异常, 消息:{}, 异常:{}",message.toString(),e);
   }
  }
 }

 /**
  * 消息是店铺类型,店铺消息处理入库
  * @param messageWrapper 从kafka中得到的消息
  */
 private void proceedStore(MessageWrapper messageWrapper) {
  Object data = messageWrapper.getData();
  Store cupStore = JSON.parseObject(data.toString(), Store.class);
  StoreExample storeExample = new StoreExample();
  String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
  storeExample.createCriteria().andStoreNameEqualTo(storeName);
  List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
  org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
  org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
  //如果查询不到记录则新增
  if (stores.size() == 0) {
   storeMapper.insert(store);
  } else {
   store.setStoreId(stores.get(0).getStoreId());
   storeMapper.updateByPrimaryKey(store);
  }
 }

}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • Spring Boot实战之发送邮件示例代码

    一.邮件服务简介 邮件服务在互联网早期就已经出现,如今已成为人们互联网生活中必不可少的一项服务.那么邮件服务是怎么工作的呢?如下给出邮件发送与接收的典型过程: 1.发件人使用SMTP协议传输邮件到邮件服务器A: 2.邮件服务器A根据邮件中指定的接收者,投送邮件至相应的邮件服务器B: 3.收件人使用POP3协议从邮件服务器B接收邮件. SMTP(Simple Mail Transfer Protocol)是电子邮件(email)传输的互联网标准,定义在RFC5321,默认使用端口25: POP3(

  • SpringBoot配置Email发送功能实例

    本篇介绍在SpringBoot中配置Email服务的具体步骤,以及常见的异常分析. 具体案例以QQ邮箱以及QQ企业邮箱为例. QQ邮箱发送方式 QQ企业邮箱发送方式 总结 tips: 下面提到的hashIndex指的是一个元素put到hashmap中时,要根据其key.hashcode & (table.size()-1)来决定其在table中的位置. table是一个数组,类型为Node.Node是hashmap的一个内部类,用来描述hashmap的元素的一些属性. 1.相关依赖包 <pa

  • Spring-boot JMS 发送消息慢的解决方法

    Spring-boot JMS 发送消息慢的问题解决 1.在<ActiveMQ 基于zookeeper的主从(levelDB Master/Slave)搭建以及Spring-boot下使用>中,采用以下代码进行JMS消息发送: @Service public class Producer { @Autowired private JmsMessagingTemplate jmsTemplate; public void sendMessage(Destination destination,

  • SpringBoot webSocket实现发送广播、点对点消息和Android接收

    1.SpringBoot webSocket SpringBoot 使用的websocket 协议,不是标准的websocket协议,使用的是名称叫做STOMP的协议. 1.1 STOMP协议说明 STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议. 它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消

  • SpringBoot配置发送Email的示例代码

    本文介绍了SpringBoot配置发送Email,分享给大家,具体如下: 引入依赖 在 pom.xml 文件中引入邮件配置: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> 配置文件 # JavaMailSender 邮件发送的配置 sprin

  • Spring Boot中利用JavaMailSender发送邮件的方法示例(附源码)

    快速入门 在Spring Boot的工程中的pom.xml中引入spring-boot-starter-mail依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> 如其他自动化配置模块一样,在完成了依赖引入之后,只需要在applicatio

  • Spring Boot实现邮件发送功能

    本文实例为大家分享了Spring Boot邮件发送功能的具体代码,供大家参考,具体内容如下 1.引入依赖 <!-- mail依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> 2.参数配置 在application.pro

  • SpringBoot添加Email发送功能及常见异常详解

    1.完整的邮件发送代码 1.1.依赖包 <dependency> <groupId>org.springframework</groupId> <artifactId>spring-support</artifactId> <version>2.0.8</version> <exclusions> <exclusion> <groupId>javax.servlet</groupI

  • spring boot整合spring-kafka实现发送接收消息实例代码

    前言 由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用. 没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大

  • Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

    kakfa是我们在项目开发中经常使用的消息中间件.由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况.遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic.因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用). 完整的代码在这里,欢迎加星号.fork. 官方文档在https://docs.spring.io/spring-kafka/reference/h

  • Spring Boot整合Spring Data JPA过程解析

    Spring Boot整合Spring Data JPA 1)加入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> &l

  • Spring Boot整合Spring Security简单实现登入登出从零搭建教程

    前言 Spring Security是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架.它提供了一组可以在Spring应用上下文中配置的Bean,充分利用了Spring IoC,DI(控制反转Inversion of Control ,DI:Dependency Injection 依赖注入)和AOP(面向切面编程)功能,为应用系统提供声明式的安全访问控制功能,减少了为企业系统安全控制编写大量重复代码的工作. 本文主要给大家介绍了关于Spring Boot整合S

  • Spring Boot整合Spring Security的示例代码

    本文讲述Spring Boot整合Spring Security在方法上使用注解实现权限控制,使用自定义UserDetailService,从MySQL中加载用户信息.使用Security自带的MD5加密,对用户密码进行加密.页面模板采用thymeleaf引擎. 源码地址:https://github.com/li5454yong/springboot-security.git 1.引入pom依赖 <parent> <groupId>org.springframework.boot

  • Spring Boot整合Spring Cache及Redis过程解析

    这篇文章主要介绍了Spring Boot整合Spring Cache及Redis过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.安装redis a.由于官方是没有Windows版的,所以我们需要下载微软开发的redis,网址: https://github.com/MicrosoftArchive/redis/releases b.解压后,在redis根目录打开cmd界面,输入:redis-server.exe redis.wind

  • Spring Boot项目集成Knife4j接口文档的实例代码

    目录 1.在pom.xml引入依赖包 2.创建Knife4j配置文件 3.使用Knife4j注解 4.全局参数 Knife4j就相当于是swagger的升级版,对于我来说,它比swagger要好用得多 1.在pom.xml引入依赖包 <!-- Swagger配置依赖knife4j --> <dependency> <groupId>com.github.xiaoymin</groupId> <artifactId>knife4j-spring-b

  • Spring Boot整合Spring Data Jpa代码实例

    一.Spring Data Jpa的简介 spring data:其实就是spring 提供的一个操作数据的框架.而spring data JPA 只是spring data 框架下的一个基于JPA标准操作数据的模块. spring data jpa :基于JPA的标准对数据进行操作.简化操作持久层的代码,只需要编写接口就可以,不需要写sql语句,甚至可以不用自己手动创建数据库表. 二.添加依赖 <!--添加springdatajpa的依赖--> <dependency> <

  • Spring Boot 整合持久层之Spring Data JPA

    目录 整合Spring Data JPA 1. 创建数据库 2. 创建项目 3. 数据库配置 4. 创建实体类 5. 创建 BookDao 接口 6. 创建 BookService 7. 创建 BookController 8. 测试 整合Spring Data JPA JPA (Java Persistence API)和 Spring Data 是两个范畴的概念. Hibernate 是一个 ORM 框架,JPA 则是一种ORM,JPA 和 Hibernate 的关系就像 JDBC 与 JD

随机推荐