一文快速掌握Spring Cloud Stream

目录
  • 一、概述简介
    • 1.1. cloud Stream是什么
    • 1.2. 设计思想
    • 1.4. 注解
  • 二、基于注解代码练习
    • 2.1. 消息驱动之生产者
    • 2.3. 目前存在的问题
    • 2.4. 分组解决重复消费问题
    • 2.5. 消息持久化
  • 三、函数式编程练习

本篇文章所涉及到的demo练习 使用的cloud 2021.0.3+ springboot2.6.8

一、概述简介

官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/

官网概述:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations

1.1. cloud Stream是什么

官方定义:Spring Cloud Stream是一个用于构建 与 共享消息系统 连接的高度可扩展的事件驱动微服务

目前主流的消息框架有:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

假设公司业务项目用了RabbitMQ,而大数据项目用了Kafka。这时候就会出现有两个消息框架,相对于程序员来说其实并不友好,还得两个都掌握,正常对于一个程序员来说熟练一个消息框架都不错了,何况还搞了两个,并且两个维护起来也不好维护。

RabbitMQ和Kafka是两个不同的框架,两个消息模型上也存在着差异,并且代码上用法也不一样。Spring Cloud Stream就是不再关注具体MQ的细节,可以在不改代码的基础上,来完成Rabbit和Kafka两个不同的消息中间件的切换(这里的切换指的是原本用的RabbitMQ,但是用着用着发现kafka比较符合,所以想要换框架)。

总结成一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

注意:遗憾的是目前仅支持RabbitMQ、Kafka

1.2. 设计思想

常规的MQ设计如下:

  • Message:生产者/消费者之间靠消息媒介传递信息内容
  • MessageChannel:消息必须走特定的通道
  • 队列:假如发消息会先发到消息队列当中
  • 消息队列的消息如何被消费呢:订阅的人可以进行消费

cloud Stream设计如下:

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

注意:左图是官网的架构图

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

stream为了屏蔽差异,抽象出来了一个Binder层,而目前为止,只提供了两个框架的实现,通过具体的实现来连接消息中间件。

假如想要通过stream连接RabbitMQ就使用:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

假如想要通过stream连接Kafka就使用:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播,在RabbitMQ就是Exchange,在Kakfa中就是Topic

1.3. 标准流程

  • Binder: 很方便的连接中间件,屏蔽差异
  • Channel: 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channe对队列进行配置
  • Source(源:发送者)和Sink(水槽:接受者): 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

1.4. 注解

注解完全是基于官方给的模型而定的!通过stream使用消息中间件也是非常简单的,直接使用以下注解就可以使用。

注意:注解依然是能用的,但是官方明确表示注解已经被弃用,弃用并不是不能用,而是用了会画横杠不建议用。但是功能是没有问题的,低版本的cloud是没有被弃用的。针对于注解和函数式编程两种我都会进行使用。

题外话:学技术永远是这样,技术一直在不断的更新迭代,真正学习一个技术并不是要掌握编码使用,而是要掌握他到底是什么,能干什么,要去深入理解他,对于编码,我认为其实不是很重要。就算你今天掌握了官方最新用法,回头人家又改写法了。

二、基于注解代码练习

生产者就是消息发送者,消费者就是消息接受者。这里我就不用kafka了,我直接用的是RabbitMQ。

windows下安装RabbitMQ:https://blog.csdn.net/weixin_43888891/article/details/126514021

2.1. 消息驱动之生产者

1.创建项目(可以是聚合可以是普通springboot项目)
2.添加pom

因为是和RabbitMQ整合,所以就是引入的spring-cloud-starter-stream-rabbit启动器

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <springboot.version>2.6.8</springboot.version>
    <springcloud.version>2021.0.3</springcloud.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-dependencies</artifactId>
           <version>${springboot.version}</version>
           <type>pom</type>
           <scope>import</scope>
       </dependency>
        <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-dependencies</artifactId>
           <version>${springcloud.version}</version>
           <type>pom</type>
           <scope>import</scope>
       </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
</dependencies>

3.添加application配置

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

4.添加接口

public interface IMessageProvider {
    public String send();
}

5.添加实现类

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

// 可以理解为是一个消息的发送管道的定义
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {

    // 消息的发送管道
    @Resource
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        // 创建并发送消息
        this.output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("***serial: " + serial);
        return serial;
    }
}

6.添加controller控制器

@RestController
public class SendMessageController {

    @Autowired
    private IMessageProvider iMessageProvider;

    @GetMapping("send")
    public String send() {
        return iMessageProvider.send();
    }
}

7.测试

(1)首先要保证RabbitMQ是可以访问的:http://localhost:15672

(2)启动项目访问:http://localhost:8801/send

下图波峰代表发送消息成功

启动后会创建交换机,名称就是application.yml当中的destination属性设置的

注意:停止服务后并没有删除交换机!!!

2.2. 消息驱动之消费者

1.创建项目
2.添加pom(pom和发送者依赖一模一样)

3.添加application配置

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

4.添加监听(消费者只负责接受消息)

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消费者1号,------->接收到的消息:" + message.getPayload() + "\t port: " + serverPort);
    }
}

5.测试

(1)启动RabbitMQ
(2)启动发送消息端服务
(3)启动消费者服务,启动后会发现,他自动会向这个交换机当中添加一个队列。

发送消息:http://localhost:8801/send
接受消息:

注意:当停止服务后消息队列会被自动删除!!!

2.3. 目前存在的问题

1.依照8802, clone出来一份运行8803,主要用来演示多个消费者的场景
2.启动8801生产者
3.启动8802消费者
4.启动8803消费者

当三个服务都启动后通过RabbitMQ界面会发现,一个交换机绑定了两个队列

运行后会发现存在两个问题:

有重复消费问题消息持久化问题

(1)重复消费问题:

发送消息后两个消费者都收到了消息:http://localhost:8801/send

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

(2)消息持久化问题:

当生产者发送消息的时候,消费者恰好宕机了,但是过一会消费者恢复了,但是消息却没收到。那也就是意味着消息队列是临时消息队列。针对于这一点,大家也可以测试一下,加深一下印象。

2.4. 分组解决重复消费问题

原理: 微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。同一个组内会发生竞争关系,只有其中一个可以消费。

接下来直接调整两个消费者为同一个组:添加如下配置

当两个消费者都设置好后启动,会发现一个问题: 实际上分到一个组对于RabbitMQ来说就是两个消费者监听了一个队列。一个队列那也就意味着,当队列收到一条消息,哪个消费者谁先消费就是谁的,消费完队列里面就没有了,也就是只有一个消费者能消费到消息!

注意:假如不设置group属性的时候,默认是启动一个消费者,就会创建一个消费队列,启动多个服务就会创建多个队列。stream默认使用的是RabbitMQ的topic交换机。当发送者向这个交换机发送消息的时候,两个队列就都会接收到。关于RabbitMQ相关知识本篇不记录,后续会专门写RabbitMQ相关文章。

最终测试:8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

2.5. 消息持久化

当三个项目都启动着的时候,现在我们要做几件事:

停止8802和8803并去除掉8802的分组group: gxs,8803不去分组信息,停止掉项目的时候会发现消息队列并没有删除,说明一旦设置分组信息,消息队列就不再是临时队列。

2.8801发送4条消息启动8802然后消息并没有打印,没有收到消息(注意8802是去掉分组信息的)再启动8803,有分组属性配置,后台打出来了MQ上的消息

原因就是:当两个项目都停止的时候,队列并未删除,而8803还绑定了这个队列,所以他就算宕机了,又重启了,依然可以收到消息。而8802没有设置分组信息,他再启动后系统会给他创建一个临时队列,自然而然收不到之前的消息了。

三、函数式编程练习

官网介绍:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring_cloud_function

到此这篇关于Spring Cloud Stream详解的文章就介绍到这了,更多相关Spring Cloud Stream内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • SpringCloud Stream 整合RabbitMQ的基本步骤

    目录 一.项目介绍 二.生产者 三.消费者 四.验证 在postman 访问生产者接口: 本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤: 引入SpringCloud 引入SpringCloud Stream相关依赖 定义绑定接口: 消息生产者(Output…Binding) .消息消费者(Input…Binding) @EnableBinding 在对应类上进行定义 @StreamListener 在对应方法上创建监听用来消费消息 调用output的send()

  • Spring Cloud Stream异常处理过程解析

    应用处理 当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常.若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理.系统层面的处理以及通过RetryTemplate进行处理. 本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理. 局部处理 Stream相关的配置内容如下: spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.12

  • spring-cloud-stream结合kafka使用详解

    1.pom文件导入依赖 <!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> 2.application.yml文件配置 spring: cloud: stream: kafka: bind

  • Spring Cloud Stream简单用法

    目录 简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者 生产者 消费者 Stream其他特性 消息发送失败的处理 消费者错误处理 Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样).如此⼀来,我们学习.开发.维护MQ都会变得轻松.⽬前Spring Cloud St

  • SpringCloud Stream使用解析

    SpringCloudStream 官方定义Spring Cloud Stream 是一个构建消息驱动微服务的框架. 应用通过inputs和outputs来与Spring Cloud Stream中binder对象交互.通过我们配置来binding(绑定),而Spring Cloud Stream中的binder对象负责与消息中间件交互.所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以方便使用消息驱动的方式. 通过使用Spring Integration来连接消息代

  • 一文快速掌握Spring Cloud Stream

    目录 一.概述简介 1.1. cloud Stream是什么 1.2. 设计思想 1.4. 注解 二.基于注解代码练习 2.1. 消息驱动之生产者 2.3. 目前存在的问题 2.4. 分组解决重复消费问题 2.5. 消息持久化 三.函数式编程练习 本篇文章所涉及到的demo练习 使用的cloud 2021.0.3+ springboot2.6.8 一.概述简介 官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference

  • Spring Cloud Stream微服务消息框架原理及实例解析

    随着近些年微服务在国内的盛行,消息驱动被提到的越来越多.主要原因是系统被拆分成多个模块后,一个业务往往需要在多个服务间相互调用,不管是采用HTTP还是RPC都是同步的,不可避免快等慢的情况发生,系统性能上很容易遇到瓶颈.在这样的背景下,将业务中实时性要求不是特别高且非主干的部分放到消息队列中是很好的选择,达到了异步解耦的效果. 目前消息队列有很多优秀的中间件,目前使用较多的主要有 RabbitMQ,Kafka,RocketMQ 等,这些中间件各有优势,有的对 AMQP(应用层标准高级消息队列协议

  • idea快速搭建spring cloud注册中心与注册的方法

    spring cloud快速搭建 Spring Cloud是一个微服务框架,它基于spring boot, Spring Cloud提供的全套的分布式系统解决方案 . 首先我们使用gradle来创建: 选择JDK以及勾选Java,然后下一步 起包名已经项目名,下一步: 选择我们本地的gradle包,一直下一步,点击build.gradle并添加我们的依赖: group 'com.gaofei' version '1.0-SNAPSHOT' //gradle使用的插件 apply plugin:

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • Spring Cloud Stream如何实现服务之间的通讯

    Spring Cloud Stream Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构.Spring Cloud Stream本身对Spring Messaging.Spring Integration.Spring Boot Actuator.Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个

  • Spring Cloud Stream分区分组原理图解

    消息分组 通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上.默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能. 当把消费者复制一份,发现2个都能收到消息 2个消费者都加入同一个消费者 发现只有一个能收到 消息分区 有一些场

  • Spring Cloud Stream 高级特性使用详解

    目录 重试 消息发送失败的处理 消费错误处理 自定义MessageHandler类型 Endpoint端点 Metrics指标 Serverless Partition统一 Polling Consumer 支持多个Binder同时使用 建立事件机制 重试 Consumer端可以配置重试次数,当消息消费失败的时候会进行重试. 底层使用Spring Retry去重试,重试次数可自定义配置. # 默认重试次数为3,配置大于1时才会生效 spring.cloud.stream.bindings.<ch

  • Spring Cloud Stream消息驱动组件使用方法介绍

    目录 1.Stream解决的痛点问题 2.Stream重要概念 3.传统MQ模型与Stream消息驱动模型 4.Stream消息通信方式及编程模型 4.1.Stream消息通信方式 4.2.Stream编程注解 4.3.Stream消息驱动之开发生产者端 4.4.Stream消息驱动之开发消费者端 5.Stream高级之自定义消息通道 6.Stream高级之消息分组 MQ:消息队列/消息中间件/消息代理,产品有很多,ActiveMQ RabbitMQ RocketMQ Kafka 1.Strea

随机推荐