springboot-rabbitmq-reply 消息直接回复模式详情

目录
  • 一、使用场景
  • 二、Reply实战
    • (1)依赖与YML配置
    • (2)RabbitMq bean配置
    • (3)消息生产端
    • (4)消息消费端
      • (1)方法一:sendTo注解+方法返回值
      • (2)方法二:读取生产端的消息使用模板发送
      • (3)方法三:方法返回值
      • (4)测试

一、使用场景

MQ的作用包括了解耦、异步等。

通常生产者只负责生产消息,而不关心消息谁去获取,或者消费结果如何;消费者只负责接收指定的消息进行业务处理而不关心消息从哪里来一级回复业务处理情况。但我们项目中有特殊的业务存在,我们作为消息生产者在生产消息后需要接收消费者的响应结果(说白了就是类似同步调用 请求响应的MQ使用),经过研究,MQ的Reply模式(直接回复模式)就是为此种业务模式而产生。

二、Reply实战

(1)依赖与YML配置

依赖:

我这里只列出最核心的rabbitMq所需依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置:

无其余特殊配置,因为reply就是rabbitmq的一种交互方式而已

spring:
  rabbitmq:
    host: 10.50.40.116
    port: 5673
    username: admin
    password: admin

(2)RabbitMq bean配置

package com.leilei.demo;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author lei
 * @create 2022-09-19 21:44
 * @desc mq配置
 **/
@Configuration
public class RabbitMqConfig {
    @Bean
    public Queue bizQueue() {
        return new Queue("bizQueue");
    }
    @Bean
    public Queue replyQueue() {
        return new Queue("replyQueue");
    }
    @Bean
    FanoutExchange bizExchange() {
        return new FanoutExchange("bizExchange");
    }
}

业务类:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Vehicle implements Serializable {
    private Integer id;
    private String name;
}

(3)消息生产端

消息生产端需要做的事情:有生产消息、接受消息消费响应

(1)生产消息

  • 1、生产消息,看业务场景选择是否生成全局唯一自定义的消息ID
  • 2、指定消息消费后响应的队列(Reply)
    /**
     * 生产消息
     *
     * @param
     * @return void
     * @author lei
     * @date 2022-09-19 21:59:18
     */
    public void replySend() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setReplyTo("replyQueue");
        //todo 根据业务,做一个严谨的全局唯一ID,我这里暂时用UUID
        String correlationId = UUID.randomUUID().toString();
        // 我这里指定了唯一消息ID,看业务场景,消费者消费响应后,生产者端可根据消息ID做业务处理
        messageProperties.setCorrelationId(correlationId);
        Vehicle vehicle = new Vehicle(1, "川A0001");
        Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("bizExchange","",message);
        System.out.println("生产者发送消息,自定义消息ID为:" + correlationId);
    }

(2)接受Reply响应

消费者消费消息后会将处理结果进行发送到一个队列,我们读取这里队列就可以拿到对应消息的响应结果进行业务处理了

    /**
     * 接收消息响应
     *
     * @param message
     * @return void
     * @author lei
     * @date 2022-09-19 21:59:27
     */
    @RabbitListener(queues = "replyQueue")
    public void replyResponse(Message message) {
        String s = new String(message.getBody());
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("收到客户端响应消息ID:" + correlationId);
        //todo 根据消息ID可判断这是哪一个消息的响应,我们就可做业务操作
        System.out.println("收到客户端响应消息:" + s);
    }

(4)消息消费端

消息消费端需要做的事有:接受消息然后进行业务处理、响应消息

(1)方法一:sendTo注解+方法返回值

一般来说,我们mq消费者监听方法不需要返回值,我们这里使用sendTo注解,则需要将要响应的消息定义为返回值,sendTo注解中指定要响应到哪个队列

重点:

  • 1、sendTo注解指定要相应的队列(注意和生产端保持一致)
  • 2、方法定义的返回值内容就是要响应的消息,最终会发送到sendTo注解指定要相应的队列
  • 3、这种方法的缺点是消费端的主关性很高,因为sendTo指定的目标队列可以自己瞎写,导致生产者端无法正确收到消息响应,但我相信一般项目中也不会这么干
    /**
     * 方式1   SendTo指定响应队列
     *
     * @param message
     * @return String
     * @author lei
     * @date 2022-09-19 16:17:52
     */
    @RabbitListener(queues ="bizQueue")
    @SendTo("replyQueue")
    public String handleEmailMessage(Message message) {
        try {
            String msg=new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("---consumer接收到消息----{}",msg);
            return "客户端响应消息:"+msg+"处理完成!";
        } catch (Exception e) {
            log.error("处理业务消息失败",e);
        }
        return null;
    }

(2)方法二:读取生产端的消息使用模板发送

与普通的消费者方法一样,只需要RabbitListener注解监听业务队列;但还需要根据消息获取出ReplyTo地址,然后自己消费者方法内部手动发送消息

  • 1、优点,更强烈的感受到消息请求 响应的交互性,流程看起来更清晰
  • 2、缺点,代码不雅
    /**
     * 方式2  message消息获取内部reply rabbitmq手动发送
     *
     * @param message
     * @return String
     * @author lei
     * @date 2022-09-19 16:17:52
     */
    @RabbitListener(queues = "bizQueue")
    public void handleEmailMessage2(Message message) {
        try {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("---consumer接收到消息----{}", msg);
            String replyTo = message.getMessageProperties().getReplyTo();
            System.out.println("接收到的reply:" + replyTo);
            rabbitTemplate.convertAndSend(replyTo, "客户端响应消息:" + msg + "处理完成!", x -> {
                x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId());
                return x;
            });
        } catch (Exception e) {
            log.error("处理业务消息失败",e);
        }
    }

(3)方法三:方法返回值

这种方式与1其实是一致的,但我经过测试,因为生产者消息指定了ReplyTo的地址,消费者端无需自己再次手动指定,即生产消息到哪里,是否响应以及响应消息发送到哪里全由生产端自己空,消费者只需要处理自身业务以及返回结果

   /**
     * 方式三  方法有返回值,返回要响应的数据 (reply 由生产者发送消息时指定,消费者不做任何处理)
     *
     * @param message
     * @return String
     * @author lei
     * @date 2022-09-19 23:17:47
     */
    @RabbitListener(queues ="bizQueue")
    public String handleEmailMessage3(Message message) {
        try {
            String msg=new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("---consumer接收到消息----{}",msg);
            return "客户端响应消息:"+msg+"处理完成!";
        }
        catch (Exception e) {
            log.error("处理业务消息失败",e);
        }
        return null;
    }

(4)测试

生产消息:

消费消息与响应:

收到的响应:

链路:

如此,MQ版本的请求响应模式就完成了,其实很多大佬使用MQ来实现RPC就是用的ReplyTo啦!

到此这篇关于springboot-rabbitmq-reply 消息直接回复模式详情的文章就介绍到这了,更多相关springboot-rabbitmq-reply 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot整合RabbitMQ实现交换机与队列的绑定

    目录 简介 配置方法概述 法1:配置类(简洁方法)(推荐) 法2:配置类(繁琐方法)(不推荐) 法3:使用方配置(不推荐) 法4:MQ服务端网页(不推荐) 简介 本文用实例介绍SpringBoot中RabbitMQ如何绑定交换机(交换器)与队列. 配置方法概述 交换机 下边两种方式等价. ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME).durable(true).build(); new TopicExchange(EXCHANGE_T

  • SpringBoot+RabbitMQ实现消息可靠传输详解

    目录 环境配置 消息丢失分析 生产阶段 生产端模拟消息丢失 RabbitMQ 消费端 环境配置 SpringBoot 整合 RabbitMQ 实现消息的发送. 1.添加 maven 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <depen

  • SpringBoot 使用log4j2的配置过程

    目录 前言 日志接口(slf4j) 日志实现(log4j.logback.log4j2) 配置过程 涉及的POM部分文件 前言 日志接口(slf4j) slf4j是对所有日志框架制定的一种规范.标准.接口,并不是一个框架的具体的实现,因为接口并不能独立使用,需要和具体的日志框架实现配合使用(如log4j.logback). 接口用于定制规范,可以有多个实现,使用时是面向接口的(导入的包都是slf4j的包而不是具体某个日志框架中的包),即直接和接口交互,不直接使用实现,所以可以任意的更换实现而不用

  • Springboot整合Rabbitmq之Confirm和Return机制

    目录 前言 为什么会有Confirm Springboot整合Mq实现Confirm监听机制 依赖引入 增加配置文件,设定连接信息 配置队列.交换机,以及对其进行绑定 编写mq消息发送服务 编写消息发送接口 启动项目进行测试 正常测试 异常测试 什么是Return? 增加ReturnCallback监听并测试 修改RabbitmqService配置类 测试 总结 相关代码下载 前言 之前专栏中,对Springboot整合Rabbitmq都有一系列的配置和说明,但总缺少一些必要的描述信息.导致很多

  • SpringBoot利用AOP实现一个日志管理详解

    目录 1. 需求 2. 新建一张日志表 3. 写相应的Controller层 4.Service接口层 5.Service实现 6.Mapper接口 7.Mapper.xml(我用的是Mybatis) 8.CspLog 9.实体类SysOperCspLog 10. 定义日志管理的切面 11.AsyncFactoryCsp 12. 写一个Controller的Demo来执行一条日志试试 1. 需求 目前有这么个问题,有两个系统CSP和OMS,这俩系统共用的是同一套日志操作:Log;目前想区分下这俩

  • Springboot整合RabbitMq测试TTL的方法详解

    目录 什么是TTL? 如何设置TTL? 设定整个队列的过期时间 配置类编写 测试 配置 测试 总结 代码下载 什么是TTL? 在RabbitMq中,存在一种高级特性 TTL. TTL即Time To Live的缩写,含义为存活时间或者过期时间.即: 设定消息在队列中存活的时间.当指定时间内,消息依旧未被消费,则由队列自动将其删除. 如何设置TTL? 既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式: 设置整个队列的过期时间. 设置单个消息的过期时间. 设定整个队列的过期时间

  • springboot+rabbitmq实现智能家居实例详解

    目录 引言 一.什么是 MQTT协议? 二.为什么要用 MQTT协议? 三.MQTT协议介绍 MQTT数据包 1.固定头 2.可变头 3.消息体payload 消息质量(QoS ) 1.Qos 0 2.Qos 1 3.Qos 2 LWT(最后遗嘱) 四.MQTT协议应用场景 五.代码实现 1.启用 rabbitmq的mqtt协议 2.mqtt 客户端依赖包 3.消息发送者 4.消息订阅 六.测试消息 1.测试消息发送 2.测试消息订阅 七.应用注意事项 clientId 要唯一 八.其他中间件

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

  • springboot-rabbitmq-reply 消息直接回复模式详情

    目录 一.使用场景 二.Reply实战 (1)依赖与YML配置 (2)RabbitMq bean配置 (3)消息生产端 (4)消息消费端 (1)方法一:sendTo注解+方法返回值 (2)方法二:读取生产端的消息使用模板发送 (3)方法三:方法返回值 (4)测试 一.使用场景 MQ的作用包括了解耦.异步等. 通常生产者只负责生产消息,而不关心消息谁去获取,或者消费结果如何:消费者只负责接收指定的消息进行业务处理而不关心消息从哪里来一级回复业务处理情况.但我们项目中有特殊的业务存在,我们作为消息生

  • SpringBoot+RabbitMQ方式收发消息的实现示例

    本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效. 祝有好收获,先赞后看,快乐无限. 本文代码:   https://gitee.com/he-erduo/spring-boot-learning-demo https://github.com/he-erduo/spring-boot-lea

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • springboot中rabbitmq实现消息可靠性机制详解

    1. 生产者模块通过publisher confirm机制实现消息可靠性 1.1 生产者模块导入rabbitmq相关依赖 <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-

  • RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合

    目录 1. 概述 2. 场景说明 3. 与Springboot的整合 3.1 引入依赖 3.2 生产服务配置 3.3 生产服务代码 3.4 消费服务配置 3.5 消费服务代码 3.6 Rest 测试代码 4. 综述 1. 概述 老话说的好:做人要懂得变通,善于思考,有时稍微转个弯,也许问题就解决了. 言归正传,之前我们聊了 RabbitMQ 3.9.7 镜像模式集群的搭建,今天我们来聊聊 RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合. 2. 场景说明 服务器

  • springboot实现公众号接收回复消息和超过5秒被动回复消息

    目录 1.首先第一步要接收微信消息,需要在公众号里设置与开发-基本配置里配置一下服务器配置 2.配置好公众号以后,开始接收微信消息 本次就是记录一下我的开发过程,不是教程,纯属自己做个笔记. 现在项目有个需求,需要用户在公众号发送图片消息的时候,我后台程序能接收到这个图片,并用ai处理图片并返回信息. 1.首先第一步要接收微信消息,需要在公众号里设置与开发-基本配置里配置一下服务器配置 这个url配置了以后,所以微信公众号的消息都会被推送到这个url对应的接口上,Token感觉没啥用,随便写一个

  • 前端与RabbitMQ实时消息推送未读消息小红点实现示例

    目录 引言 RabbitMQ 搭建 1.开启 mqtt 协议 服务端消息发送 1.mqtt 客户端依赖包 2.消息发送者 前端消息订阅 测试 总结 引言 前几天粉丝群里有个小伙伴问过:web 页面的未读消息(小红点)怎么实现比较简单,刚好本周手头有类似的开发任务,索性就整理出来供小伙伴们参考,没准哪天就能用得上呢. 之前在 <springboot + rabbitmq 做智能家居> 中说过可以用 rabbitmq 的 MQTT 协议做智能家居的指令推送,里边还提到过能用 MQTT 协议做 we

随机推荐