SpringBoot+RabbitMQ实现消息可靠传输详解

目录
  • 环境配置
  • 消息丢失分析
  • 生产阶段
    • 生产端模拟消息丢失
  • RabbitMQ
  • 消费端

环境配置

SpringBoot 整合 RabbitMQ 实现消息的发送。

1.添加 maven 依赖

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.添加 application.yml 配置文件

spring:
  rabbitmq:
    host: 192.168.3.19
    port: 5672
    username: admin
    password: xxxx

3.配置交换机、队列以及绑定

    @Bean
    public DirectExchange myExchange() {
        DirectExchange directExchange = new DirectExchange("myExchange");
        return directExchange;
    }

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue");
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
    }

4.生产发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send(String message) {
        rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
        System.out.println("【发送消息】" + message)
        return "【send message】" + message;
    }

5.消费者接收消息

    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process(String msg, Channel channel, Message message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String time = sdf.format(date);
        System.out.println("【接收信息】" + msg + " 当前时间" + time);

6.调用生产端发送消息 hello,控制台输出:

【发送消息】hello
【接收信息】hello 当前时间2022-05-12 10:21:14

说明消息已经被成功接收。

消息丢失分析

一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:

  • 生产端丢失: 生产者无法传输到 RabbitMQ
  • 存储端丢失: RabbitMQ 存储自身挂了
  • 消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费

RabbitMQ 从生产端、储存端、消费端都对可靠性传输做很好的支持。

生产阶段

生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。

配置application.yml

spring:
  rabbitmq:
    # 消息确认机制 生产者 -> 交换机
    publisher-confirms: true
    # 消息返回机制  交换机 -> 队列
    publisher-returns: true

配置

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("【correlationData】:" + correlationData);
                log.info("【ack】" + ack);
                log.info("【cause】" + cause);
                if (ack) {
                    log.info("【发送成功】");
                } else {
                    log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
                }
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.warn("【消息发送失败】");
                log.info("【message】" + message);
                log.info("【replyCode】" + replyCode);
            }
        });

        return rabbitTemplate;
    }
}

消息从 生产者 到 交换机, 有confirmCallback 确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause),根据 ack 判断消息是否发送成功。

消息从 交换机 到 队列,有returnCallback 退回模式。

发送消息 product message 控制台输出如下:

【发送消息】product message
【接收信息】product message 当前时间2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【发送成功】

生产端模拟消息丢失

这里有两个方案:

  • 发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。
  • 发送不存在的交换机:
// myExchange 修改成 myExchangexxxxx
rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

结果:

【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【发送失败】

当发送失败可以对消息进行重试

交换机正确,发送不存在的队列:

交换机接收到消息,返回成功通知,控制台输出:

【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【发送成功】

交换机没有找到队列,返回失败信息:

【消息发送失败】
【message】product message
【replyCode】312

RabbitMQ

开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里

修改队列的持久化,修改成非持久化:

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue",false);
        return queue;
    }

发送消息之后,消息存放在队列中,然后重启 RabbitMQ,消息不存在了。
设置队列持久化:

    @Bean
    public Queue myQueue() {
        Queue queue = new Queue("myQueue",true);
        return queue;
    }

重启之后,队列的消息还存在。

消费端

消费端默认开始 ack 自动确认模式,当队列消息被消费者接收,不管有没有被消费端消息,都自动删除队列中的消息。所以为了确保消费端能成功消费消息,将自动模式改成手动确认模式:

修改application.yml 文件

spring:
  rabbitmq:
    # 手动消息确认
    listener:
      simple:
        acknowledge-mode: manual

消费接收消息之后需要手动确认:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
    public void process(String msg, Channel channel, Message message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();
        String time = sdf.format(date);
        System.out.println("【接收信息】" + msg + " 当前时间" + time);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

如果不添加:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

发送两条消息

消息被接收后,没有确认,重新放到队列中:

重启项目,之后,队列的消息会发送到消费者,但是没有 ack 确认,还是继续会放回队列中。

加上 channel.basicAck 之后,再重启项目

队列消息就被删除了

basicAck 方法最后一个参数 multiple 表示是删除之前的队列。

multiple 设置成 true,把后面的队列都清理掉了

到此这篇关于SpringBoot+RabbitMQ实现消息可靠传输详解的文章就介绍到这了,更多相关SpringBoot RabbitMQ消息可靠传输内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot中rabbitmq实现消息可靠性机制详解

    1. 生产者模块通过publisher confirm机制实现消息可靠性 1.1 生产者模块导入rabbitmq相关依赖 <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • SpringBoot+RabbitMQ方式收发消息的实现示例

    本篇会和SpringBoot做整合,采用自动配置的方式进行开发,我们只需要声明RabbitMQ地址就可以了,关于各种创建连接关闭连接的事都由Spring帮我们了~ 交给Spring帮我们管理连接可以让我们专注于业务逻辑,就像声明式事务一样易用,方便又高效. 祝有好收获,先赞后看,快乐无限. 本文代码:   https://gitee.com/he-erduo/spring-boot-learning-demo https://github.com/he-erduo/spring-boot-lea

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的. 于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI,就是想和大伙一起学习学习! 这次我分享的是 springboot + rabbitmq 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,

  • SpringBoot+RabbitMQ实现消息可靠传输详解

    目录 环境配置 消息丢失分析 生产阶段 生产端模拟消息丢失 RabbitMQ 消费端 环境配置 SpringBoot 整合 RabbitMQ 实现消息的发送. 1.添加 maven 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <depen

  • springboot+rabbitmq实现智能家居实例详解

    目录 引言 一.什么是 MQTT协议? 二.为什么要用 MQTT协议? 三.MQTT协议介绍 MQTT数据包 1.固定头 2.可变头 3.消息体payload 消息质量(QoS ) 1.Qos 0 2.Qos 1 3.Qos 2 LWT(最后遗嘱) 四.MQTT协议应用场景 五.代码实现 1.启用 rabbitmq的mqtt协议 2.mqtt 客户端依赖包 3.消息发送者 4.消息订阅 六.测试消息 1.测试消息发送 2.测试消息订阅 七.应用注意事项 clientId 要唯一 八.其他中间件

  • RabbitMQ .NET消息队列使用详解

    本文实例为大家分享了RabbitMQ .NET消息队列使用方法,供大家参考,具体内容如下 首先下载安装包,我都环境是win7 64位: 去官网下载 otp_win64_19.0.exe  和rabbitmq-server-3.6.3.exe安装好 然后开始编程了: (1)创建生产者类: class Program { private static void Main() { //建立RabbitMQ连接和通道 var connectionFactory = new ConnectionFacto

  • .net msmq消息队列实例详解

    本文为大家分享了.net msmq消息队列实例代码,供大家参考,具体内容如下 1.msmq消息队列windows环境安装 控制面板---->程序和功能---->启用或关闭Windows程序---->Microsoft Message Queue(MSMQ)服务器 选中如图所示功能点击"确认"进行安装,安装好后可在 "计算机管理"中进行查看 2.创建消息队列实体对象 /// <summary> /// 消息实体 /// </summ

  • JMS 之 Active MQ 的消息传输(详解)

    本文使用Active MQ5.6 一.消息协商器(Message Broker) broke:消息的交换器,就是对消息进行管理的容器.ActiveMQ 可以创建多个 Broker,客户端与ActiveMQ交互,实际上都是与ActiveMQ中的Broker交互,Broker配置在${MQ_HOME}\conf\activemq.xml. 二.连接器(Connectors)(一).传输连接器 (transportConnectors) transportConnectors 连接器:就是建立brok

  • SpringBoot中RabbitMQ集群的搭建详解

    目录 1. 两种模式 1.1 普通集群 1.2 镜像集群 1.3 节点类型 2. 搭建普通集群 2.1 预备知识 2.2 开始搭建 2.3 代码测试 2.4 反向测试 3. 搭建镜像集群 3.1 网页配置镜像队列 3.2 命令行配置镜像队列 4. 小结 单个的 RabbitMQ 肯定无法实现高可用,要想高可用,还得上集群. 今天松哥就来和大家聊一聊 RabbitMQ 集群的搭建. 1. 两种模式 说到集群,小伙伴们可能第一个问题是,如果我有一个 RabbitMQ 集群,那么是不是我的消息集群中的

  • Springboot整合RabbitMq测试TTL的方法详解

    目录 什么是TTL? 如何设置TTL? 设定整个队列的过期时间 配置类编写 测试 配置 测试 总结 代码下载 什么是TTL? 在RabbitMq中,存在一种高级特性 TTL. TTL即Time To Live的缩写,含义为存活时间或者过期时间.即: 设定消息在队列中存活的时间.当指定时间内,消息依旧未被消费,则由队列自动将其删除. 如何设置TTL? 既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式: 设置整个队列的过期时间. 设置单个消息的过期时间. 设定整个队列的过期时间

  • Java Spring Boot消息服务万字详解分析

    目录 消息服务概述 为什么要使用消息服务 异步处理 应用解耦 流量削峰 分布式事务管理 常用消息中间件介绍 ActiveMQ RabbitMQ RocketMQ RabbitMQ消息中间件 RabbitMQ简介 RabbitMQ工作模式介绍 Work queues(工作队列模式) Public/Subscribe(发布订阅模式) Routing(路由模式) Topics(通配符模式) RPC Headers RabbitMQ安装以及整合环境搭建 安装RabbitMQ 下载RabbitMQ 安装R

  • 基于RabbitMQ几种Exchange 模式详解

    AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中. RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header . header模式在实际使用中较少,本文只对前三种模式进行比

随机推荐