Springcloud Stream消息驱动工具使用介绍

目录
  • springcloud Stream
    • 什么是springcloud Stream
      • 什么是Binder
      • 为什么使用Stream
  • Stream使用案例
    • 前置知识
      • Stream处理消息的架构
      • Stream常用注解
    • 消息生产者8801模块搭建
    • 消息消费者8802模块搭建
  • Stream带来的问题
    • 重复消费问题
      • 自定义分组
    • 持久化问题

springcloud Stream

什么是springcloud Stream

  现在市面上有很多的消息中间件,每一个公司使用的都有所不同,为了减少学习的成本,springcloud Stream可以让我们不再关注消息中间件MQ的具体细节,我们只需要通过适配绑定的方式即可实现不同MQ之间的切换,但是遗憾的是springcloud Stream目前只支持RabbitMQ和Kafka。

  SpringCloud Stream是一个构建消息驱动微服务的框架,应用程序通过inputs或者 outputs来与SpringCloud Stream中的binder进行交互,我们可以通过配置来binding ,而 SpringCloud Stream 的binder负责与中间件交互,所以我们只需要搞清楚如何与Stream交互就可以很方便的使用消息驱动了!

什么是Binder

  Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchanges),这些都可以通过外部配置项来做到,甚至可以任意的改变中间件的类型但是不需要修改一行代码

为什么使用Stream

  比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移;这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这衬候springcloud Stream给我们提供了一种解耦合的方式。

Stream使用案例

前置知识

Stream处理消息的架构

  Source、Sink: 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。Channel: 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。Binder: 消息的生产者和消费者中间层,实现了应用程序与消息中间件细节之间的隔离

  通过以上两张图片可知,消息的处理流向是:消息生产者处理完业务逻辑之后消息到达source中,接着前往Channel通道进行排队,然后通过binder绑定器将消息数据发送到底层mq,然后又通过binder绑定器接收到底层mq发送来的消息数据,接着前往Channel通道进行排队,由Sink接收到消息数据,消息消费者拿到消息数据执行相应的业务逻辑

Stream常用注解

消息生产者8801模块搭建

  第一步: 创建一个maven模块,引入相关依赖,最主要的就是stream整合rabbitmq的依赖

<!--stream的rabbitmq依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 配置文件的编写

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序类

@SpringBootApplication
public class CloudStreamRabbitmqProvider8801Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
        System.out.println("启动成功");
    }
}

  第四步: 业务层service代码编写,注意:这里实现类注入的对象由之前的dao层对象换成了channel通道对象,详细的发送由实现类的第12完成

public interface IMessageProviderService {
    /**
     * 定义消息的推送管道
     *
     * @return
     */
    String send();
}
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
    /**
     * 消息发送管道/信道
     */
    @Resource
    private MessageChannel output;
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());

        System.out.println("*****serial: " + serial);
        return serial;
    }
}

  第五步: controller接口

@RestController
public class SendMessageController {
    @Resource
    private IMessageProviderService messageProviderService;
    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProviderService.send();
    }
}

消息消费者8802模块搭建

  第一步: 创建一个maven模块,引入相关依赖,最主要的就是stream整合rabbitmq的依赖

<!--stream的rabbitmq依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 配置文件的编写,与生产者的区别就在于bindings下的是input而不是output

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序类

@SpringBootApplication
public class CloudStreamRabbitmqConsumer8802Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
        System.out.println("启动成功");
    }
}

  第四步: controller接口,使用url请求生产者8801,即可在消费者8802端接收到8801发送的消息

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消费者1号 ----> port:" + serverPort + "\t从8801接受到的消息是:" + message.getPayload());
    }
}

  两个模块搭建完成进行测试,首先启动注册中心7001,然后分别启动消息生产者8801和消息消费者8802,通过url请求访问8001的发送消息请求,会向指定管道中发送一条消息,如果此时这个管道中有消费者即可接收到这条消息。而如何指定消息的管道归属呢,就是通过配置文件中的indings.input.destination来指定,命名相同的服务就会处在同一条管道中

Stream带来的问题

重复消费问题

  按照之前的使用,会带来重复消费问题: 也就是说一个通道上有不止一个消息消费者,stream上默认每一个消费者都属于不同的组,这样的话就会导致这个消息被多个组的消费者重复消费

  知道了问题出现的原因就很容易解决了,只要我们自定义配置分组,将这些消费者都分配到同一个组中就能避免重复消费的问题出现了(同一个组间的消费者是竞争关系,不管组间有多少的消费者都只会消费一次)

自定义分组

  只需要在配置文件修改一处配置即可实现自定义组名并且自定义分组,组名相同的服务会被分配到同一组,通道内的消息数据会被该组中的所有消费者轮询消费

持久化问题

  上面自定义分组使用的group配置除了可以自定义分组和分组名之外,还可以实现消息的持久化,也就是说使用group配置自定义分组和分组名的消息消费者,就算在消息生产者发送消息的时候挂掉了,等这个消费者重启之后依然是能够消费之前发送的消息

这里一个生产者和两个消费者存在以下十三种情况(生产者发送四次消息):

1、都使用group分组的两个不同组成员,在生产者生产的时候

  • 都没挂(各消费四次)
  • 挂了其中一个(各消费四次)
  • 都挂了(各消费四次)

2、都使用group分组的两个同组成员,在生产者生产的时候

  • 都没挂(各消费两次)
  • 挂了其中一个(没挂的把四次消费完)
  • 都挂了(各消费两次)

3、其中一个使用group分组的两个成员,在生产者生产的时候

  • 都挂了(都不消费)
  • group的挂了(各消费四次)
  • 没group的挂了(没挂的消费四次,挂的由于没有持久化所以不消费)
  • 都没挂(各消费四次)

4、都不使用group分组的两个成员,在生产者生产的时候

  • 都挂了(都不消费)
  • 挂了其中一个(没挂的消费四次,挂的由于没有持久化所以不消费)
  • 都没挂(各消费四次)

  总之一句话,通道里的消息会持久化给使用group配置的消息消费者(每一组都有一份),就算发送消息的时候这些消费者挂了,如果同组的消费者有没挂的就会把这些消息竞争消费完;如果同组没有消费者,等他重启之后还是会消费这些消息

到此这篇关于Springcloud Stream消息驱动工具使用介绍的文章就介绍到这了,更多相关Springcloud Stream内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringCloud Stream消息驱动实例详解

    1. 消息驱动概述 1.1 是什么 在实际应用中有很多消息中间件,比如现在企业里常用的有ActiveMQ.RabbitMQ.RocketMQ.Kafka等,学习所有这些消息中间件无疑需要大量时间经历成本,那有没有一种技术,使我们不再需要关注具体的消息中间件的细节,而只需要用一种适配绑定的方式,自动的在各种消息中间件内切换呢?消息驱动就是这样的技术,它能 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型. SpringCloud Stream是一个构件消息驱动微服务的框架.应用程序通过i

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • Springcloud Stream消息驱动工具使用介绍

    目录 springcloud Stream 什么是springcloud Stream 什么是Binder 为什么使用Stream Stream使用案例 前置知识 Stream处理消息的架构 Stream常用注解 消息生产者8801模块搭建 消息消费者8802模块搭建 Stream带来的问题 重复消费问题 自定义分组 持久化问题 springcloud Stream 什么是springcloud Stream   现在市面上有很多的消息中间件,每一个公司使用的都有所不同,为了减少学习的成本,sp

  • 最新SpringCloud Stream消息驱动讲解

    目录 SpringCloud Stream消息驱动 1.SpringCloud Stream概述 1.1.设计思想 1.2.标准的流程套路 1.3.编码API和常用注解 2.消息驱动之生产者(output) 2.1.新建模块cloud-stream-rabbitmq-provider8801 2.2.引入pom.xml配置文件 2.3.YAML配置文件 2.4.生产者启动类 2.5.业务实现 2.6.启动测试 3.消息驱动之消费者(input) 3.1.新建cloud-stream-rabbit

  • Spring Cloud Stream消息驱动组件使用方法介绍

    目录 1.Stream解决的痛点问题 2.Stream重要概念 3.传统MQ模型与Stream消息驱动模型 4.Stream消息通信方式及编程模型 4.1.Stream消息通信方式 4.2.Stream编程注解 4.3.Stream消息驱动之开发生产者端 4.4.Stream消息驱动之开发消费者端 5.Stream高级之自定义消息通道 6.Stream高级之消息分组 MQ:消息队列/消息中间件/消息代理,产品有很多,ActiveMQ RabbitMQ RocketMQ Kafka 1.Strea

  • SpringCloud Stream使用解析

    SpringCloudStream 官方定义Spring Cloud Stream 是一个构建消息驱动微服务的框架. 应用通过inputs和outputs来与Spring Cloud Stream中binder对象交互.通过我们配置来binding(绑定),而Spring Cloud Stream中的binder对象负责与消息中间件交互.所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以方便使用消息驱动的方式. 通过使用Spring Integration来连接消息代

  • SpringCloud Stream 整合RabbitMQ的基本步骤

    目录 一.项目介绍 二.生产者 三.消费者 四.验证 在postman 访问生产者接口: 本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤: 引入SpringCloud 引入SpringCloud Stream相关依赖 定义绑定接口: 消息生产者(Output…Binding) .消息消费者(Input…Binding) @EnableBinding 在对应类上进行定义 @StreamListener 在对应方法上创建监听用来消费消息 调用output的send()

  • Springcloud Bus消息总线原理是实现详解

    目录 springcloud Bus 什么是springcloud Bus 什么是消息总线 Bus实现自动刷新的原理 RabbitMQ的下载配置 Erlang RabbitMQ Bus动态刷新 全局广播通知代码实现 定点通知代码实现 springcloud Bus 什么是springcloud Bus   上一章的springcloud Bus是对分布式微服务的远程配置,但是有一个遗留的问题就是,Config客户端对远程配置的刷新需要手动使用post请求来完成,这就使得Config客户端动态刷新

  • SpringCloud Gateway DispatcherHandler调用方法详细介绍

    目录 前言 DispatcherHandler类声明 handle方法 最后总结一下 前言 之前几节我们分析了请求是如何调用到HttpWebHandlerAdapter的,然后再调用到DispatcherHandler中,通过handle方法来处理具体的请求. DispatcherHandler的注入在自动装配那一节已经说过了,忘记的同学可以重新会看一下. DispatcherHandler类声明 public class DispatcherHandler implements WebHand

  • Java语言Lang包下常用的工具类介绍

    无论你在开发哪中 Java 应用程序,都免不了要写很多工具类/工具函数.你可知道,有很多现成的工具类可用,并且代码质量都很不错,不用你写,不用你调试,只要你发现. 在 Apache Jakarta Common 中, Lang 这个 Java 工具包是所有 Apache Jakarta Common 项目中被使用最广泛的,几乎你所知道的名气比较大的软件里面都有用到它,包括 Tomcat, Weblogic, Websphere, Eclipse 等等.我们就从这个包开始介绍整个 common 项

随机推荐