最新SpringCloud Stream消息驱动讲解

目录
  • SpringCloud Stream消息驱动
    • 1、SpringCloud Stream概述
      • 1.1、设计思想
      • 1.2、标准的流程套路
      • 1.3、编码API和常用注解
    • 2、消息驱动之生产者(output)
      • 2.1、新建模块cloud-stream-rabbitmq-provider8801
      • 2.2、引入pom.xml配置文件
      • 2.3、YAML配置文件
      • 2.4、生产者启动类
      • 2.5、业务实现
      • 2.6、启动测试
    • 3、消息驱动之消费者(input)
      • 3.1、新建cloud-stream-rabbitmq-consumer8802模块
      • 3.2、引入pom.xml依赖
      • 3.3、添加YAML配置文件
      • 3.4、添加启动类StreamMQMain8802
      • 3.5、业务实现
      • 3.6、启动项目测试
    • 4、分组消费与持久化
      • 4.1、完整参考cloud-stream-rabbitmq-consumer8802,创建8803项目
      • 4.2、启动项目发现问题

SpringCloud Stream消息驱动

1、SpringCloud Stream概述

官方地址:https://spring.io/projects/spring-cloud-stream#overview

中文指导手册地址:https://m.wang1314.com/doc/webapp/topic/20971999.html

SpringCloud Stream 是一个构建消息驱动微服务的框架
应用程序通过 outputs 或 inputs 来与 SpringCloud Stream 中的 binder 对象交互
SpringCloud Stream 中的 binder 对象负责与消息中间件交互
通过 SpringCloud Stream 连接消息中间件,以实现消息事件驱动

什么是SpringCloudStream官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、Kafka。

1.1、设计思想

1、标注的MQ流程

生产者/消费者之间靠消息媒介传递信息内容【massage】

消息必须走特定的通道【消息通道MessageChannel】

消息通道里的消息如何被消费呢,谁负责收发处理

消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

2、Cloud Stream的作用

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

3、什么是Binder

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。 4、Stream中的消息通信方式遵循了发布-订阅模式

使用Topic主题进行广播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic

1.2、标准的流程套路

1、Binder:很方便的连接中间件,屏蔽不同的差异

2、Channel

通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置

3、Source和Sink

简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

1.3、编码API和常用注解

组成和注解 描述
Middleware 中间件,目前只支持RabbitM和Kafka
Binder Binder是应用与消息中间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过通道离开应用程序
@StreamListener 监听队列,用户消费者的队列的消息接收
@EnableBinding 指通道channel和exchange绑定在一起

2、消息驱动之生产者(output)

2.1、新建模块cloud-stream-rabbitmq-provider8801

2.2、引入pom.xml配置文件

如果是需要Stream整合的就将依赖改为spring-cloud-starter-stream-kafka

<dependencies>
    <!--stream整合rabbit依赖-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.3、YAML配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称,消息生产者
          destination: studyExchange # 表示要使用的Exchange名称定义【自定义】
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置【上面的配置】

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

2.4、生产者启动类

 package com.zcl.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 描述:消息生产者启动类
 *
 * @author zhong
 * @date 2022-09-22 12:19
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

2.5、业务实现

2.5.1、服务接口实现类

自己创建一个实现的接口以及里面的方法

注意:在这个服务实现类里面不是使用@Service注解了,因为不是web应用,而是Stream消息驱动,是与中间件进行打交道的不是与数据库

package com.zcl.springcloud.service.Impl;

import com.zcl.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 描述:发送接口实现类
 * 必须使用@EnableBinding(Source.class)注解开启消息推送管道
 *
 * @author zhong
 * @date 2022-09-22 12:24
 */
@Slf4j
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {

    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    /**
     * 发送消息
     * @return
     */
    @Override
    public String send() {
        // 定义消息
        String serial = UUID.randomUUID().toString();
        // 构建并发送消息
        this.output.send(MessageBuilder.withPayload(serial).build());
        log.info("-------------- " + serial + " ----------------");
        return serial;
    }
}

2.5.2、控制器实现

package com.zcl.springcloud.controller;

import com.zcl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 描述:消息发送控制器
 *
 * @author zhong
 * @date 2022-09-22 12:37
 */
@RestController
public class SendMessageController {

    /**
     * 注入消息发送管道接口
     */
    @Resource
    private IMessageProvider messageProvider;

    /**
     * 每调用一次接口发送一次消息
     * @return
     */
    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}

2.6、启动测试

  • 启动7001Eureka访问中心
  • 启动8801消息发送者,启动成功以及观察RabbitMQ的管理界面

3.访问接口发送消息,查看MQ的管理页面波峰情况

3、消息驱动之消费者(input)

同样的参考如下流程图

3.1、新建cloud-stream-rabbitmq-consumer8802模块

3.2、引入pom.xml依赖

与8801一样

3.3、添加YAML配置文件

配置文件与消息生产的区别在于:

output: # 这个名字是一个通道的名称
	destination: studyExchange # 表示要使用的Exchange名称定义
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

3.4、添加启动类StreamMQMain8802

与消息生产者一样

3.5、业务实现

必须要有@Component注解注入到Spring容器中

package com.zcl.springcloud.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 描述:消息消费者控制器
 *
 * @author zhong
 * @date 2022-09-22 13:18
 */
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    /**
     * 注入消费者的端口号
     */
    @Value("${server.port}")
    private String port;

    /**
     * 监听消息
     * @param message
     * @return
     */
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消费者1号接收到的消息 ----- " + message.getPayload() + " -----,port: " + port);
    }
}

3.6、启动项目测试

  • 启动7001
  • 启动8801,消息发送者
  • 启动8802,消息消费者
  • 8801发送消息,8802消费消息,并查看具体的MQ波峰图

控制器输出

4、分组消费与持久化

4.1、完整参考cloud-stream-rabbitmq-consumer8802,创建8803项目

除了启动的端口号不一样之外其他的配置都一样

4.2、启动项目发现问题

  • 启动7001(Eureka服务中心)
  • 启动8801(生产)、8802(消费)、8803(消费)
  • 测试发送消失是否两个消费者都可以接收到

4.2.1、重复消费

目前是8802/8803同时都收到了,存在重复消费问题

解决方案:分组和持久化属性group

常见案例

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

4.2.2、分组

自定义配置分组,自定义分为同一个组,解决重复消费问题

配置文件分组

分别给8801、8802进行分组【orderA】

重启项目查看MQ管理

orderB是历史记录,上面的配置以及都分为了ordeerA组,进入orderA组可以查看实际的消费者数量

同一组内会发生竞争关系,只有其中一个可以消费,启动项目测试是否为真

4.2.3、持久化

通过上述,解决了重复消费问题,再看看持久化

  • 停止8802/8803并去除掉8802的分组group: atguiguA,8803保留
  • 8801先发送7条消息到rabbitmq

3.先启动8802,无分组属性配置,后台没有打出来消息

8802因为取消了groupA的分组所以获取不到持久化的数据(如果重启mq也会消失)

4.再启动8803,有分组属性配置,后台打出来了MQ上的消息

8803保存groupA的分组所以在启动的时候就会将持久化的数据消费

到此这篇关于SpringCloud Stream消息驱动的文章就介绍到这了,更多相关SpringCloud Stream消息驱动内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springcloud Stream消息驱动工具使用介绍

    目录 springcloud Stream 什么是springcloud Stream 什么是Binder 为什么使用Stream Stream使用案例 前置知识 Stream处理消息的架构 Stream常用注解 消息生产者8801模块搭建 消息消费者8802模块搭建 Stream带来的问题 重复消费问题 自定义分组 持久化问题 springcloud Stream 什么是springcloud Stream   现在市面上有很多的消息中间件,每一个公司使用的都有所不同,为了减少学习的成本,sp

  • SpringCloud Stream消息驱动实例详解

    1. 消息驱动概述 1.1 是什么 在实际应用中有很多消息中间件,比如现在企业里常用的有ActiveMQ.RabbitMQ.RocketMQ.Kafka等,学习所有这些消息中间件无疑需要大量时间经历成本,那有没有一种技术,使我们不再需要关注具体的消息中间件的细节,而只需要用一种适配绑定的方式,自动的在各种消息中间件内切换呢?消息驱动就是这样的技术,它能 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型. SpringCloud Stream是一个构件消息驱动微服务的框架.应用程序通过i

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • 最新SpringCloud Stream消息驱动讲解

    目录 SpringCloud Stream消息驱动 1.SpringCloud Stream概述 1.1.设计思想 1.2.标准的流程套路 1.3.编码API和常用注解 2.消息驱动之生产者(output) 2.1.新建模块cloud-stream-rabbitmq-provider8801 2.2.引入pom.xml配置文件 2.3.YAML配置文件 2.4.生产者启动类 2.5.业务实现 2.6.启动测试 3.消息驱动之消费者(input) 3.1.新建cloud-stream-rabbit

  • Spring Cloud Stream消息驱动组件使用方法介绍

    目录 1.Stream解决的痛点问题 2.Stream重要概念 3.传统MQ模型与Stream消息驱动模型 4.Stream消息通信方式及编程模型 4.1.Stream消息通信方式 4.2.Stream编程注解 4.3.Stream消息驱动之开发生产者端 4.4.Stream消息驱动之开发消费者端 5.Stream高级之自定义消息通道 6.Stream高级之消息分组 MQ:消息队列/消息中间件/消息代理,产品有很多,ActiveMQ RabbitMQ RocketMQ Kafka 1.Strea

  • SpringCloud Stream使用解析

    SpringCloudStream 官方定义Spring Cloud Stream 是一个构建消息驱动微服务的框架. 应用通过inputs和outputs来与Spring Cloud Stream中binder对象交互.通过我们配置来binding(绑定),而Spring Cloud Stream中的binder对象负责与消息中间件交互.所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以方便使用消息驱动的方式. 通过使用Spring Integration来连接消息代

  • SpringCloud Stream 整合RabbitMQ的基本步骤

    目录 一.项目介绍 二.生产者 三.消费者 四.验证 在postman 访问生产者接口: 本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤: 引入SpringCloud 引入SpringCloud Stream相关依赖 定义绑定接口: 消息生产者(Output…Binding) .消息消费者(Input…Binding) @EnableBinding 在对应类上进行定义 @StreamListener 在对应方法上创建监听用来消费消息 调用output的send()

  • Springcloud Bus消息总线原理是实现详解

    目录 springcloud Bus 什么是springcloud Bus 什么是消息总线 Bus实现自动刷新的原理 RabbitMQ的下载配置 Erlang RabbitMQ Bus动态刷新 全局广播通知代码实现 定点通知代码实现 springcloud Bus 什么是springcloud Bus   上一章的springcloud Bus是对分布式微服务的远程配置,但是有一个遗留的问题就是,Config客户端对远程配置的刷新需要手动使用post请求来完成,这就使得Config客户端动态刷新

  • PHP实现登陆并抓取微信列表中最新一组微信消息的方法

    本文实例讲述了PHP实现登陆并抓取微信列表中最新一组微信消息的方法.分享给大家供大家参考,具体如下: <?php $_G['wx_g'] = array('init' => array( "wx_content" => array("weixin_user" => "微信号码", "weixin_pass" => "微信密码") ) ); wx_login(); $messge

  • EJB3.0部署消息驱动Bean抛javax.naming.NameNotFoundException异常

    在部署EJB的消息驱动Bean时遇到了如下的错误: ERROR [org.jboss.resource.adapter.jms.inflow.JmsActivation] (WorkManager(2)-2) Unable to reconnect   org.jboss.resource.adapter.jms.inflow.JmsActivationSpec@2705ea(ra=org.jboss.resource.adapter.jms.  JmsResourceAdapter@73761

随机推荐