SpringCloud Stream 整合RabbitMQ的基本步骤

目录
  • 一、项目介绍
  • 二、生产者
  • 三、消费者
  • 四、验证 在postman 访问生产者接口:

本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤:

  • 引入SpringCloud
  • 引入SpringCloud Stream相关依赖
  • 定义绑定接口: 消息生产者(Output…Binding) 、消息消费者(Input…Binding)
  • @EnableBinding 在对应类上进行定义
  • @StreamListener 在对应方法上创建监听用来消费消息
  • 调用output的send()方法生产消息

一、项目介绍

演示SpringCloud Stream 整合RabbitMQ,项目可以在一个工程里完成,本次建立了一个工程mq-service,其中包含三个Module:

  • mq-service-base :基础模块(包含了共用依赖、共用变量)
  • mq-service-producer :生产者
  • mq-service-consumer :消费者

注: 完全可以在一个工程里实现,这里为了区分,并为了后续单独启动或停止生产者或消费者做实验,也为了适应实际应用项目,所以创建了不同Module

(1)版本

  • SpringBoot : 2.0.6.RELEASE
  • SpringCloud : Finchley.SR2
  • RabbitMQ : 3.8.1

(2)项目整体结构

(3)基础模块

1)pom.xml

这里作为公共模块引入SpringCloud、Spring Cloud Stream等,其中也再此引入fastjson、lombok等工具依赖
(完整代码见文章最下面)
其中Spring Cloud Stream如下:

<!-- Spring Cloud Stream, 用于MQ消息发送-->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2) model

定义共用的变量,如CollectionRequest.java

二、生产者

(1)结构

(2)pom.xml

导入base的依赖即可,因为相关共用依赖在base中已经引入

<dependency>
	<groupId>com.zrk</groupId>
	<artifactId>mq-service-base</artifactId>
	<version>0.0.1-SNAPSHOT</version>
</dependency>

(3)定义绑定(接口)

OutputMessageBinding.java

public interface OutputMessageBinding {
    /** Topic 名称*/
    String OUTPUT = "message-center-out";
    @Output(OUTPUT)
    MessageChannel output();

}

(4)添加配置

# rabbitmq连接信息
spring.rabbitmq.addresses=192.168.1.125
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

spring.cloud.stream.bindings.message-center-out.destination=message-center
spring.cloud.stream.rabbit.bindings.message-center-out.consumer.exchangeType=fanout

(5) 调用方法

CollectionServiceImpl.java

@Service
@EnableBinding(OutputMessageBinding.class)
public class CollectionServiceImpl implements CollectionService{
    @Resource
    private OutputMessageBinding outputMessageBinding;
    /**
     * @param schoolName
     * @param content
     */
    @Override
    public void getCollection(String schoolName, String content) {
        CollectionRequest request = new CollectionRequest();
        request.setSchoolName(schoolName);
        request.setContent(content);
        outputMessageBinding.output().send(MessageBuilder.withPayload(request).build());
    }
}

注: 主要是两点

  • @EnableBinding 定义
  • outputMessageBinding.output().send(MessageBuilder.withPayload(request).build()); 生产消息

三、消费者

(1)结构

(2)pom.xml

导入base的依赖即可,因为相关共用依赖在base中已经引入

<dependency>
	<groupId>com.zrk</groupId>
	<artifactId>mq-service-base</artifactId>
	<version>0.0.1-SNAPSHOT</version>
</dependency>

(3)定义绑定(接口)

InputMessageBinding.java

public interface InputMessageBinding {
    String INPUT = "message-center-input";
    @Input(INPUT)
    SubscribableChannel input();
}

注: 消费者这里与生产者不同,用的是SubscribableChannel ,而生产者用的是MessageChannel

(4)添加配置

# rabbitmq连接信息
spring.rabbitmq.addresses=192.168.1.125
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.cloud.stream.bindings.message-center-input.destination=message-center
spring.cloud.stream.bindings.message-center-input.group=${spring.application.name}

(5) 调用方法

CollectionReceiver.java

@Slf4j
@EnableBinding(InputMessageBinding.class)
public class CollectionReceiver {
    @StreamListener(InputMessageBinding.INPUT)
    public void handle(String value){
        log.info("[消息] 接收到发送消息MQ: {}", value);
        CollectionRequest request = JSON.parseObject(value, CollectionRequest.class);
        log.info("处理收集信息:" + request.toString());
    }
}

注: 主要是两点

  • @EnableBinding 定义
  • @StreamListener 注册监听

至此,生产者与消费者都创建完成,分别启动两个项目,并调用生产者接口进行验证:

四、验证 在postman 访问生产者接口:

localhost:30110/collection/getCollectionschoolName=‘zrk’&content=‘send message to rabbitmq’

观察消费者日志:

查看rabbitmq首页

则证明已经整合成功,接下来将研究一下更多的配置与用法。

如果有需要,可以参考项目完整代码:https://github.com/zrk333/mq-service

到此这篇关于SpringCloud Stream 整合RabbitMQ的文章就介绍到这了,更多相关SpringCloud Stream 整合RabbitMQ内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringCloud Stream消息驱动实例详解

    1. 消息驱动概述 1.1 是什么 在实际应用中有很多消息中间件,比如现在企业里常用的有ActiveMQ.RabbitMQ.RocketMQ.Kafka等,学习所有这些消息中间件无疑需要大量时间经历成本,那有没有一种技术,使我们不再需要关注具体的消息中间件的细节,而只需要用一种适配绑定的方式,自动的在各种消息中间件内切换呢?消息驱动就是这样的技术,它能 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型. SpringCloud Stream是一个构件消息驱动微服务的框架.应用程序通过i

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • SpringCloud Stream使用解析

    SpringCloudStream 官方定义Spring Cloud Stream 是一个构建消息驱动微服务的框架. 应用通过inputs和outputs来与Spring Cloud Stream中binder对象交互.通过我们配置来binding(绑定),而Spring Cloud Stream中的binder对象负责与消息中间件交互.所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以方便使用消息驱动的方式. 通过使用Spring Integration来连接消息代

  • SpringCloud Stream 整合RabbitMQ的基本步骤

    目录 一.项目介绍 二.生产者 三.消费者 四.验证 在postman 访问生产者接口: 本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤: 引入SpringCloud 引入SpringCloud Stream相关依赖 定义绑定接口: 消息生产者(Output…Binding) .消息消费者(Input…Binding) @EnableBinding 在对应类上进行定义 @StreamListener 在对应方法上创建监听用来消费消息 调用output的send()

  • Springcloud Stream消息驱动工具使用介绍

    目录 springcloud Stream 什么是springcloud Stream 什么是Binder 为什么使用Stream Stream使用案例 前置知识 Stream处理消息的架构 Stream常用注解 消息生产者8801模块搭建 消息消费者8802模块搭建 Stream带来的问题 重复消费问题 自定义分组 持久化问题 springcloud Stream 什么是springcloud Stream   现在市面上有很多的消息中间件,每一个公司使用的都有所不同,为了减少学习的成本,sp

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • 最新SpringCloud Stream消息驱动讲解

    目录 SpringCloud Stream消息驱动 1.SpringCloud Stream概述 1.1.设计思想 1.2.标准的流程套路 1.3.编码API和常用注解 2.消息驱动之生产者(output) 2.1.新建模块cloud-stream-rabbitmq-provider8801 2.2.引入pom.xml配置文件 2.3.YAML配置文件 2.4.生产者启动类 2.5.业务实现 2.6.启动测试 3.消息驱动之消费者(input) 3.1.新建cloud-stream-rabbit

  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

    自然,依赖是少不了的.除了spring-boot-starter-web依赖外. 就这个是最主要的依赖了,其他的看着办就是了.我用的是gradle,用maven的看着弄也一样的.无非就是包+包名+版本 //AMQP compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE') 这里有一个坑.导致我后来发送消息时一直连不上去.报错: java.net.SocketException: socket closed

  • SpringBoot整合Drools的实现步骤

    Drools有什么用 从我个人所待过的公司,其中做智能酒店这个项目时就用到规则引擎Drools,将它用于处理优惠劵规则. SpringBoot整合Drools初步实战 1.导入Maven依赖 <properties> <drools.version>7.14.0.Final</drools.version> </properties> <!-- drools --> <dependency> <groupId>org.dr

  • Springboot 整合 RabbitMQ 消息队列 详情

    目录 生产者工程 POM依赖 application文件 生产者业务代码 测试 Direct 模式 业务代码 消费者 消息监听 Topic 模式 生产者 消费者 生产者工程 POM依赖 可以在创建工程时直接选择添加依赖. application文件 因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置. RabbitMQ配置文件: 在使用相关交换机及队列时,我们需要实现声明交换机及队列,如果没有对应信息,则启动项目会失败.所以在使用springboot整合

  • Spring Boot整合RabbitMQ实例(Topic模式)

    1.Topic交换器介绍 Topic Exchange 转发消息主要是根据通配符. 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息. 在这种交换机模式下: 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等. 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*

随机推荐