Springcloud整合stream,rabbitmq实现消息驱动功能

springcloud整合stream,rabbitmq实现消息驱动功能

1.代码实现:

创建项目stream

添加依赖

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.cxh</groupId>
    <artifactId>stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>stream</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>8</java.version>
        <spring-cloud-alibaba-dependencies.version>2021.1</spring-cloud-alibaba-dependencies.version>
        <spring-cloud-dependencies.version>2021.0.0</spring-cloud-dependencies.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud-dependencies.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba-dependencies.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

监听类

 @EnableBinding(Sink.class)
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(String payload) {
        logger.info("Received: " + payload);
    }

}

2.实现效果:

启动rabbitmq, 项目stream

打开浏览器http://localhost:15672/,使用账号密码guest登录rabbitmq, 在队列中发现消息:

测试send

查看控制台消息:

com.cxh.stream.SinkReceiver              : Received: 测试send

补充

Spring Cloud Stream专门用于事件驱动的微服务系统,使用消息中间件来收发信息。使用Spring Cloud Stream可专注于业务开发,而不用花太多心思在应用与MQ之间的交互上。而且,在切换MQ后,也无须做太多的代码改动。

所以Spring Cloud Stream和RabbitMQ还可以整合实现消息的收发

整合过程

添加依赖

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

不同的MQ使用不同的依赖,非常容易切换。

定义处理收发的方法

队列无非就是收和发,所以我们要先定义好,怎么样发,怎么样收。

发送消息:

@Bean
public Supplier<String> pkslowSource() {
  return () -> {
    String message = "www.pkslow.com";
    log.info("Sending value: " + message);
    return message;
  };
}

只发送一个String,一般业务通常为Entity类。这里发送的内容也固定不变,实际业务可以通过查数据库,读文件等方式获取数据源。

接收消息:

@Bean
public Consumer<String> pkslowSink() {
  return message -> {
    log.info("Received message " + message);
  };
}

直接打印消息即可,项目中的逻辑可按具体业务实现。

配置属性

配置RabbitMQ:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: pkslow
    password: 123456

配置Spring Cloud Stream的相关项:

spring:
  cloud:
    stream:
      function:
        definition: pkslowSource;pkslowSink
      bindings:
        pkslowSource-out-0:
         destination: pkslow-topic
        pkslowSink-in-0:
          destination: pkslow-topic
      poller:
        fixed-delay: 500

spring.cloud.stream.function.definition会定义处理方法,如本文的收发消息的方法;

bindings配置对应的function;destination指向MQ的主题;

这里配了一个poller,每隔500ms就会发送一次消息。

运行

先启动个RabbitMQ:

docker run \
-e RABBITMQ_DEFAULT_USER=pkslow \
-e RABBITMQ_DEFAULT_PASS=123456 \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3.8-management

运行程序后,会自己创建主题、发送信息、接收信息:

运行日志如下:

可以看到每一次发/收大概是间隔了500ms,当然不可能是精确的500ms。

以上就是Springcloud整合stream,rabbitmq实现消息驱动功能的详细内容,更多关于Springcloud stream rabbitmq消息驱动的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringCloud Stream 整合RabbitMQ的基本步骤

    目录 一.项目介绍 二.生产者 三.消费者 四.验证 在postman 访问生产者接口: 本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤: 引入SpringCloud 引入SpringCloud Stream相关依赖 定义绑定接口: 消息生产者(Output…Binding) .消息消费者(Input…Binding) @EnableBinding 在对应类上进行定义 @StreamListener 在对应方法上创建监听用来消费消息 调用output的send()

  • SpringCloud Stream使用解析

    SpringCloudStream 官方定义Spring Cloud Stream 是一个构建消息驱动微服务的框架. 应用通过inputs和outputs来与Spring Cloud Stream中binder对象交互.通过我们配置来binding(绑定),而Spring Cloud Stream中的binder对象负责与消息中间件交互.所以,我们只需要搞清楚如何与Spring Cloud Stream 交互就可以方便使用消息驱动的方式. 通过使用Spring Integration来连接消息代

  • SpringCloud Stream消息驱动实例详解

    1. 消息驱动概述 1.1 是什么 在实际应用中有很多消息中间件,比如现在企业里常用的有ActiveMQ.RabbitMQ.RocketMQ.Kafka等,学习所有这些消息中间件无疑需要大量时间经历成本,那有没有一种技术,使我们不再需要关注具体的消息中间件的细节,而只需要用一种适配绑定的方式,自动的在各种消息中间件内切换呢?消息驱动就是这样的技术,它能 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型. SpringCloud Stream是一个构件消息驱动微服务的框架.应用程序通过i

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • springcloud整合gateway实现网关全局过滤器功能

    目录 1.代码实现: 2.实现效果: springcloud整合gateway实现网关全局过滤器功能,在​ ​springcloud整合gateway实现网关 ​​基础功能上进行修改. 1.代码实现: 添加自定义全局过滤器 /**  * 自定义一个全局过滤器  * 实现 globalfilter , ordered接口  */ @Component public class LoginFilter implements GlobalFilter, Ordered {     /**      *

  • 最新SpringCloud Stream消息驱动讲解

    目录 SpringCloud Stream消息驱动 1.SpringCloud Stream概述 1.1.设计思想 1.2.标准的流程套路 1.3.编码API和常用注解 2.消息驱动之生产者(output) 2.1.新建模块cloud-stream-rabbitmq-provider8801 2.2.引入pom.xml配置文件 2.3.YAML配置文件 2.4.生产者启动类 2.5.业务实现 2.6.启动测试 3.消息驱动之消费者(input) 3.1.新建cloud-stream-rabbit

  • Springcloud Stream消息驱动工具使用介绍

    目录 springcloud Stream 什么是springcloud Stream 什么是Binder 为什么使用Stream Stream使用案例 前置知识 Stream处理消息的架构 Stream常用注解 消息生产者8801模块搭建 消息消费者8802模块搭建 Stream带来的问题 重复消费问题 自定义分组 持久化问题 springcloud Stream 什么是springcloud Stream   现在市面上有很多的消息中间件,每一个公司使用的都有所不同,为了减少学习的成本,sp

  • Spring Cloud Stream消息驱动组件使用方法介绍

    目录 1.Stream解决的痛点问题 2.Stream重要概念 3.传统MQ模型与Stream消息驱动模型 4.Stream消息通信方式及编程模型 4.1.Stream消息通信方式 4.2.Stream编程注解 4.3.Stream消息驱动之开发生产者端 4.4.Stream消息驱动之开发消费者端 5.Stream高级之自定义消息通道 6.Stream高级之消息分组 MQ:消息队列/消息中间件/消息代理,产品有很多,ActiveMQ RabbitMQ RocketMQ Kafka 1.Strea

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • RabbitMq消息防丢失功能实现方式讲解

    目录 1.概述 1.1.数据丢失的原因 1.2.如何防止数据丢失 2.手动应答 3.消息确认机制 3.1.AMQP事务 3.2.confirm 1.概述 1.1.数据丢失的原因 在消息中有三种可能性造成数据丢失: 消费者消费消息失败 生产者生产消息失败 MQ数据丢失 消费者消费消息失败: RabbitMq存在应答机制,默认为自动应答,MQ向消费者推送一条消息,消费者收到这条消息后会返回一个ack(应答)给MQ,MQ收到应答后会删除这条消息. 自动应答存在一个问题,就是消费者收到消息后立马就会给M

  • Spring Boot+RabbitMQ 通过fanout模式实现消息接收功能(支持消费者多实例部署)

    本文章适用的场景:同一条消息可以被多个消费者同时消费.注意:当消费者多实例部署时,会轮询消费消息.网上有大量的的案例展示:P生产一条消息,消费者服务C中建立Q1和Q2两个队列共同消费.但极少的材料展示:P生产一条消息后M1,消费者C1和C2可以同时消费M1,如下图所示.案例基于Spring Boot以及RabbitMQ的“fanout”类型exchange.已经实测可放心使用. 1.引入基本依赖,项目不同请您按自己的情况引入合适的依赖 <dependency> <groupId>o

随机推荐