spring-cloud-stream结合kafka使用详解

1.pom文件导入依赖

<!-- kafka -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2.application.yml文件配置

spring:
 cloud:
  stream:
   kafka:
    binder:
     brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
   bindings:
    xxx_output: // 通道名称
     destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
     // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
    xxx_input:
     destination: xxx // 消息发往的目的地,对应topic
     group: xxx // 对应kafka的group

3.创建消息发送者

@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
@Service
public class MqService {

  @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
  private MessageChannel oesWorkbenchChannel;

  /**
   * 发送一条kafka消息
   */
  public boolean sendLifeData(Object object) {
    return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
  }
}

// 发布通道
public interface Source {
  @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
  MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel
}

4.创建消息监听者

@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {

  @Resource
  private FileService fileService;

  @StreamListener(KafkaConstants.xxx_input) // 监听接受通道
  public void receiveData(MoveMessage moveMessage) {
  }
}

// 接受通道
public interface Sink {
  @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
  SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}

接下来就可以愉快的发送监听消息了

到此这篇关于spring-cloud-stream结合kafka使用详解的文章就介绍到这了,更多相关spring-cloud-stream整合kafka内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Spring Cloud Stream如何实现服务之间的通讯

    Spring Cloud Stream Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构.Spring Cloud Stream本身对Spring Messaging.Spring Integration.Spring Boot Actuator.Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个

  • Spring Cloud Stream分区分组原理图解

    消息分组 通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上.默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能. 当把消费者复制一份,发现2个都能收到消息 2个消费者都加入同一个消费者 发现只有一个能收到 消息分区 有一些场

  • Spring Cloud Stream异常处理过程解析

    应用处理 当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常.若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理.系统层面的处理以及通过RetryTemplate进行处理. 本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理. 局部处理 Stream相关的配置内容如下: spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.12

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • Spring Cloud Stream 高级特性使用详解

    目录 重试 消息发送失败的处理 消费错误处理 自定义MessageHandler类型 Endpoint端点 Metrics指标 Serverless Partition统一 Polling Consumer 支持多个Binder同时使用 建立事件机制 重试 Consumer端可以配置重试次数,当消息消费失败的时候会进行重试. 底层使用Spring Retry去重试,重试次数可自定义配置. # 默认重试次数为3,配置大于1时才会生效 spring.cloud.stream.bindings.<ch

  • Spring Cloud Feign高级应用实例详解

    这篇文章主要介绍了Spring Cloud Feign高级应用实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.使用feign进行服务间的调用 Spring boot2X Consul如何使用Feign实现服务调用 2.开启gzip压缩 Feign支持对请求与响应的压缩,以提高通信效率,需要在服务消费者配置文件开启压缩支持和压缩文件的类型 添加配置 feign.compression.request.enabled=true feig

  • Spring Cloud Alibaba 之 Nacos教程详解

    Nacos 技术讲解 一提到分布式系统就不的不提一下 CAP 原则 Nacos简介 Nacos是阿里的一个开源产品,它是针对微服务架构中的服务发现.配置管理.服务治理的综合性解决方案. 官方介绍是这样的: Nacos致力于帮助您发现.配置和管理微服务.Nacos提供了一组简单易用的特性集,帮助您实现动态服务发现.服务配置管理.服务及流量管理.Nacos帮助您更敏捷和容易地构建.交付和管理微服务平台.Nacos是构建以"服务"为中心的现代应用架构的服务基础设施. 什么是CAP CAP原则

  • Spring Cloud Zuul的重试配置详解

    Spring Cloud Zuul模块本身就包含了对于hystrix和ribbon的依赖,当我们使用zuul通过path和serviceId的组合来配置路由的时候,可以通过hystrix和ribbon的配置调整路由请求的各种时间超时机制. 1 ribbon配置举例 配置连接超时时间1秒,请求处理时间2秒,统一服务server尝试重连1次,切换server重连1次 ribbon: ConnectTimeout: 1000 ReadTimeout: 2000 MaxAutoRetries: 1 Ma

  • Spring Cloud Gateway使用Token验证详解

    引入依赖 <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <ty

  • Spring Cloud 动态刷新配置信息教程详解

    有时候在配置中心有些参数是需要修改的,这时候如何不重启而达到实时生效的效果呢? 添加依赖 <dependencies> ... <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> ... </dependencies>

  • Spring Boot整合Kafka教程详解

    目录 正文 步骤一:添加依赖项 步骤二:配置 Kafka 步骤三:创建一个生产者 步骤四:创建一个消费者 正文 本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka.Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量. 在本教程中,我们将使用 Spring Boot 2.5.4 和 Kafka 2.8.0. 步骤一:添加依赖项 在 pom.xml 中添加以下依赖项: <dependency> <groupId>org.springfra

  • spring-cloud-stream结合kafka使用详解

    1.pom文件导入依赖 <!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> 2.application.yml文件配置 spring: cloud: stream: kafka: bind

  • 一文快速掌握Spring Cloud Stream

    目录 一.概述简介 1.1. cloud Stream是什么 1.2. 设计思想 1.4. 注解 二.基于注解代码练习 2.1. 消息驱动之生产者 2.3. 目前存在的问题 2.4. 分组解决重复消费问题 2.5. 消息持久化 三.函数式编程练习 本篇文章所涉及到的demo练习 使用的cloud 2021.0.3+ springboot2.6.8 一.概述简介 官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference

随机推荐