Spring Cloud Stream如何实现服务之间的通讯

Spring Cloud Stream

Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构。Spring Cloud Stream本身对Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个服务之间的通讯来演示Spring Cloud Stream的使用方法。

整体概述

服务要想与其他服务通讯要定义通道,一般会定义输出通道和输入通道,输出通道用于发送消息,输入通道用于接收消息,每个通道都会有个名字(输入和输出只是通道类型,可以用不同的名字定义很多很多通道),不同通道的名字不能相同否则会报错(输入通道和输出通道不同类型的通道名称也不能相同),绑定器是操作RabbitMQ或Kafka的抽象层,为了屏蔽操作这些消息中间件的复杂性和不一致性,绑定器会用通道的名字在消息中间件中定义主题,一个主题内的消息生产者来自多个服务,一个主题内消息的消费者也是多个服务,也就是说消息的发布和消费是通过主题进行定义和组织的,通道的名字就是主题的名字,在RabbitMQ中主题使用Exchanges实现,在Kafka中主题使用Topic实现。

准备环境

创建两个项目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a我们用Spring Cloud Stream实现通讯,spring-cloud-stream-b我们用Spring Cloud Stream的底层模块Spring Integration实现通讯。

两个项目的POM文件依赖都是:

<dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-test-support</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

spring-cloud-stream-binder-rabbit是指绑定器的实现使用RabbitMQ。

项目配置内容application.properties:

spring.application.name=spring-cloud-stream-a
server.port=9010

#设置默认绑定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.application.name=spring-cloud-stream-b
server.port=9011

#设置默认绑定器
spring.cloud.stream.defaultBinder = rabbit

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

启动一个rabbitmq:

docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

编写A项目代码

在A项目中定义一个输入通道一个输出通道,定义通道在接口中使用@Input和@Output注解定义,程序启动的时候Spring Cloud Stream会根据接口定义将实现类自动注入(Spring Cloud Stream自动实现该接口不需要写代码)。

A服务输入通道,通道名称ChatExchanges.A.Input,接口定义输入通道必须返回SubscribableChannel:

public interface ChatInput {
  String INPUT = "ChatExchanges.A.Input";
  @Input(ChatInput.INPUT)
  SubscribableChannel input();
}

A服务输出通道,通道名称ChatExchanges.A.Output,输出通道必须返回MessageChannel:

public interface ChatOutput {

  String OUTPUT = "ChatExchanges.A.Output";

  @Output(ChatOutput.OUTPUT)
  MessageChannel output();
}

定义消息实体类:

public class ChatMessage implements Serializable {

  private String name;
  private String message;
  private Date chatDate;

  //没有无参数的构造函数并行化会出错
  private ChatMessage(){}

  public ChatMessage(String name,String message,Date chatDate){
    this.name = name;
    this.message = message;
    this.chatDate = chatDate;
  }

  public String getName(){
    return this.name;
  }

  public String getMessage(){
    return this.message;
  }

  public Date getChatDate() { return this.chatDate; }

  public String ShowMessage(){
    return String.format("聊天消息:%s的时候,%s说%s。",this.chatDate,this.name,this.message);
  }
}

在业务处理类上用@EnableBinding注解绑定输入通道和输出通道,这个绑定动作其实就是创建并注册输入和输出通道的实现类到Bean中,所以可以直接是使用@Autowired进行注入使用,另外消息的串行化默认使用application/json格式(com.fastexml.jackson),最后用@StreamListener注解进行指定通道消息的监听:

//ChatInput.class的输入通道不在这里绑定,监听到数据会找不到AClient类的引用。
//Input和Output通道定义的名字不能一样,否则程序启动会抛异常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {

  private static Logger logger = LoggerFactory.getLogger(AClient.class);

  @Autowired
  private ChatOutput chatOutput;

  //StreamListener自带了Json转对象的能力,收到B的消息打印并回复B一个新的消息。
  @StreamListener(ChatInput.INPUT)
  public void PrintInput(ChatMessage message) {

    logger.info(message.ShowMessage());

    ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());

    chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
  }
}

到此A项目代码编写完成。

编写B项目代码

B项目使用Spring Integration实现消息的发布和消费,定义通道时我们要交换输入通道和输出通道的名称:

public interface ChatProcessor {

  String OUTPUT = "ChatExchanges.A.Input";
  String INPUT = "ChatExchanges.A.Output";

  @Input(ChatProcessor.INPUT)
  SubscribableChannel input();

  @Output(ChatProcessor.OUTPUT)
  MessageChannel output();
}

消息实体类:

public class ChatMessage {
  private String name;
  private String message;
  private Date chatDate;

  //没有无参数的构造函数并行化会出错
  private ChatMessage(){}

  public ChatMessage(String name,String message,Date chatDate){
    this.name = name;
    this.message = message;
    this.chatDate = chatDate;
  }

  public String getName(){
    return this.name;
  }

  public String getMessage(){
    return this.message;
  }

  public Date getChatDate() { return this.chatDate; }

  public String ShowMessage(){
    return String.format("聊天消息:%s的时候,%s说%s。",this.chatDate,this.name,this.message);
  }
}

业务处理类用@ServiceActivator注解代替@StreamListener,用@InboundChannelAdapter注解发布消息:

@EnableBinding(ChatProcessor.class)
public class BClient {

  private static Logger logger = LoggerFactory.getLogger(BClient.class);

  //@ServiceActivator没有Json转对象的能力需要借助@Transformer注解
  @ServiceActivator(inputChannel=ChatProcessor.INPUT)
  public void PrintInput(ChatMessage message) {

    logger.info(message.ShowMessage());
  }

  @Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
  public ChatMessage transform(String message) throws Exception{
    ObjectMapper objectMapper = new ObjectMapper();
    return objectMapper.readValue(message,ChatMessage.class);
  }

  //每秒发出一个消息给A
  @Bean
  @InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
  public GenericMessage<ChatMessage> SendChatMessage(){
    ChatMessage message = new ChatMessage("ClientB","B To A Message.", new Date());
    GenericMessage<ChatMessage> gm = new GenericMessage<>(message);
    return gm;
  }
}

运行程序

启动A项目和B项目:

源码

Github仓库:https://github.com/sunweisheng/spring-cloud-example

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 创建网关项目(Spring Cloud Gateway)过程详解

    创建网关项目 加入网关后微服务的架构图 创建项目 POM文件 <properties> <java.version>1.8</java.version> <spring-cloud.version>Greenwich.SR3</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springfram

  • Spring Boot 开发私有即时通信系统(WebSocket)

    1/ 概述 利用Spring Boot作为基础框架,Spring Security作为安全框架,WebSocket作为通信框架,实现点对点聊天和群聊天. 2/ 所需依赖 Spring Boot 版本 1.5.3,使用MongoDB存储数据(非必须),Maven依赖如下: <properties> <java.version>1.8</java.version> <thymeleaf.version>3.0.0.RELEASE</thymeleaf.ve

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • Springboot实现阿里云通信短信服务有关短信验证码的发送功能

    前言 短信验证码是通过发送验证码到手机的一种有效的验证码系统.主要用于验证用户手机的合法性及敏感操作的身份验证. 现在市面上的短信服务平台有很多.大家在选择的时候未免会有些不好抉择.本人建议选择短信服务商应遵循以下几点: 服务商知名度高,业务流量大.(这样的平台可信度高) 服务稳定,不能经常宕机.(保证自身业务的流畅运行) 文档全面详细.(没文档怎么玩?) 最近的一个项目中,注册和修改密码时需要用到短信验证码校验手机号的功能.本人也是对比几家后,直接选择阿里云通信的短信服务.(本身项目服务器也是

  • springcloud config配置读取优先级过程详解

    情景描述 最近在修复Eureka的静态页面加载不出的缺陷时,最终发现是远程GIT仓库将静态资源访问方式配置给禁用了(spring.resources.add-mappings=false).虽然最后直接修改远程GIT仓库的此配置项给解决了(spring.resources.add-mappings=true),但是从中牵涉出的配置读取优先级我们必须好好的再回顾下 springcloud config读取仓库配置 通过config client模块来读取远程的仓库配置,只需要在boostrap.p

  • springcloud微服务基于redis集群的单点登录实现解析

    简介 本文介绍微服务架构中如何实现单点登录功能 创建三个服务: 操作redis集群的服务,用于多个服务之间共享数据 统一认证中心服务,用于整个系统的统一登录认证 服务消费者,用于测试单点登录 大体思路:每个服务都设置一个拦截器检查cookie中是否有token,若有token,则放行,若没有token,重定向到统一认证中心服务进行登录,登录成功后返回到被拦截的服务. 搭建redis集群服务 搭建redis集群参考文档 搭建统一认证中心 主函数添加注解 /** * 单点登录既要注册到服务注册中心,

  • Spring Cloud Stream异常处理过程解析

    应用处理 当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常.若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理.系统层面的处理以及通过RetryTemplate进行处理. 本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理. 局部处理 Stream相关的配置内容如下: spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.12

  • Spring Cloud Stream如何实现服务之间的通讯

    Spring Cloud Stream Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构.Spring Cloud Stream本身对Spring Messaging.Spring Integration.Spring Boot Actuator.Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个

  • Spring Cloud多个微服务之间调用代码实例

    这篇文章主要介绍了Spring Cloud多个微服务之间调用代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 现在又一个学生微服务 user 和 学校微服务 school,如果user需要访问school,我们应该怎么做? 1.使用RestTemplate方式 添加config import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.spr

  • 教你Spring Cloud保证各个微服务之间调用安全性

    导读:在微服务的架构下,系统会根据业务拆分为多个服务,各自负责单一的职责,在这样的架构下,我们需要确保各api的安全性,也就是说服务不是开放的,而是需要授权才可访问的,避免接口被不合法的请求所访问. 但是在在微服务集群中服务之间暴力的接口,或者对于第三方开放的接口如果不做及安全和认证,后果可想而知. 阅读下文之前思考几个问题: 如何在restTemplate远程调用请求增加添加统一认证? 服务认证如何规范加密和解密? 远程调用统一什么协议比较合适? 如下图,三个服务注册到同一个注册中心集群,服务

  • Spring Cloud Stream微服务消息框架原理及实例解析

    随着近些年微服务在国内的盛行,消息驱动被提到的越来越多.主要原因是系统被拆分成多个模块后,一个业务往往需要在多个服务间相互调用,不管是采用HTTP还是RPC都是同步的,不可避免快等慢的情况发生,系统性能上很容易遇到瓶颈.在这样的背景下,将业务中实时性要求不是特别高且非主干的部分放到消息队列中是很好的选择,达到了异步解耦的效果. 目前消息队列有很多优秀的中间件,目前使用较多的主要有 RabbitMQ,Kafka,RocketMQ 等,这些中间件各有优势,有的对 AMQP(应用层标准高级消息队列协议

  • 一文快速掌握Spring Cloud Stream

    目录 一.概述简介 1.1. cloud Stream是什么 1.2. 设计思想 1.4. 注解 二.基于注解代码练习 2.1. 消息驱动之生产者 2.3. 目前存在的问题 2.4. 分组解决重复消费问题 2.5. 消息持久化 三.函数式编程练习 本篇文章所涉及到的demo练习 使用的cloud 2021.0.3+ springboot2.6.8 一.概述简介 官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference

  • Spring Cloud Stream消息驱动组件使用方法介绍

    目录 1.Stream解决的痛点问题 2.Stream重要概念 3.传统MQ模型与Stream消息驱动模型 4.Stream消息通信方式及编程模型 4.1.Stream消息通信方式 4.2.Stream编程注解 4.3.Stream消息驱动之开发生产者端 4.4.Stream消息驱动之开发消费者端 5.Stream高级之自定义消息通道 6.Stream高级之消息分组 MQ:消息队列/消息中间件/消息代理,产品有很多,ActiveMQ RabbitMQ RocketMQ Kafka 1.Strea

  • Spring Cloud Stream分区分组原理图解

    消息分组 通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上.默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能. 当把消费者复制一份,发现2个都能收到消息 2个消费者都加入同一个消费者 发现只有一个能收到 消息分区 有一些场

  • Spring Cloud 专题之Sleuth 服务跟踪实现方法

    目录 准备工作 实现跟踪 抽样收集 整合Zipkin 1.下载Zipkin 2.引入依赖配置 3.测试与分析 持久化到mysql 1.创建zipkin数据库 2.启动zipkin 3.测试与分析 在一个微服务架构中,系统的规模往往会比较大,各微服务之间的调用关系也错综复杂.通常一个有客户端发起的请求在后端系统中会经过多个不同的微服务调用阿里协同产生最后的请求结果.在复杂的微服务架构中,几乎每一个前端请求都会形成一条复杂的分布式的服务调用链路,在每条链路中任何一个依赖服务出现延迟过高或错误的时候都

  • spring cloud consul注册的服务报错critical的解决

    测试spring cloud 使用consul注册服务的时候,出现critical,如下: 怎么解决这个问题,现在只能看到health check检查失败了. 受限调用这个请求Get http://consulIp:8500/v1/agent/checks,调完请求,就会拿到返回数据: { ...... "service:test-service-xx-xx-xx-xx": { "Node": "zookeeper-server1", "

随机推荐