详解RabbitMq如何做到消息的可靠性投递

目录
  • 前言
  • RabbitMq的投递及消费流程
  • 提供者如何确保消息的成功投递
  • 单条消息的同步确认
  • 多条消息的同步确认
  • 异步消息确认
  • 消息的返回机制

前言

现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的。 引入消息队列可以给这个项目带来很多的好处:比如

  • 削峰

这个就很好的理解,在系统中的请求量是固定的,但是有的时候会多出很多的突发流量,比如在有秒杀活动的时候,这种瞬时的高流量可能会打垮系统,这个时候就可以很好的引入MQ,将这些请求积压到MQ中,然后消费端在按照自已的能力去处理这里请求

  • 解耦合

比如现在有系统A,当系统A执行完成后,B、C系统需要拿到A系统的结果才可以继续执行,如果不引入MQ,A系统还要调用B、C系统,这样这A、B、C三个系统的耦合性就很大。引入MQ后A系统的执行结果只需要保证将消息投递到MQ就好,其它的两个系统只需要监听这个MQ的某个队列,这样就降低了这三个系统之间的耦合性。

  • 异步

再通过A、B、C这三个系统举例。A系统在返回给用户的执行结果前需要完成B、C系统的调用,这个总的执行时间是A+B+C的执行时间,如果引入MQ,A系统的执行完成后将数据投递到MQ,直接响应用户。B、C再这在通过监听完成数据的处理。这样也降低了用户的等待时间

除了这些好处,当然引入MQ还会有不好的地方:比如

  • 数据一致性问题

    • A系统执行完将数据投递到了MQ,B、C在消费的时候如果出现了问题,是不是就导致了数据不一致的问题
  • 可用性降低
    • 一个好好的系统,引入一个MQ,如果这个MQ拓机了呢?这个可能就需要集群来提高MQ的高可用。
  • 系统的复杂度提高
    • 引入了MQ,我们还需要关注消息是否被成功的投递,MQ中的消息被积压太多怎么办?消费端是否成功的消费的消息。

这些都是问题,所在是否要引入MQ还需要看业务需求

RabbitMq的投递及消费流程

这里有张投递消息到消费的流程图

从这张图上可看出这也是一种AMQP协议的实现。消息的提供者先是通过某一个信道将消息发送到交换机,然后交换机通通RoutingKey来将消息分发到某一个队列上。然后,消费者在临听某一个队列来进行消息的消费。

今天我们的主题是如何保证消息的投递可靠性。那么我们来想想在这个流程中那些位置可能会影响我们消息的投递可靠性?

从上图中我们可以总结出有二个因素影响着消息是否被成功投递和被成功消费

提供者

  • 提供者有没有将消成功的发送到MQ并被处理
  • 发送到MQ中的消息有没有成功的被路由到队列中

消费者

  • 消费者有没有成功的签收消息并成功处理。
  • 消费者是否可以保证消费者的稳定性

提供者如何确保消息的成功投递

解决这个问题,我们可以通过提供者的发送方确认机制来实现,这个发送方确认机制又分成三种:

  • 单条消息的同步确认
  • 多条消息的同步确认
  • 异步消息确认

单条消息的同步确认

首先要在当前的Channel上开启消息确认模式,然后通过waitForConfirms()方法进行消息确认是否发送成功。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            Map<String,String> mes = new HashMap<>();
            mes.put("name","1111");
            String messageStr = objectMapper.writeValueAsString(mes);
            channel.basicPublish(
                    "exchange.drinks",
                    "drinks.juzi",
                    null,
                    messageStr.getBytes());
            boolean isSendSuccess = channel.waitForConfirms();
            if(isSendSuccess){
                System.out.print("消息发送成功");
            }
        }
    }

这样做的话每次发完消息后,都会确保消息是否发送成功。如果发送失败的话进行相应的处理。

多条消息的同步确认

多条消息的确认和单条的差不多,比如我将发送消息的代码放到一个循环内。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            Map<String,String> mes = new HashMap<>();
            mes.put("name","1111");
            String messageStr = objectMapper.writeValueAsString(mes);
            for(int i = 0;i < 100;i++){
                channel.basicPublish(
                        "exchange.drinks",
                        "drinks.juzi",
                        null,
                        messageStr());
            }
            boolean isSendSuccess = channel.waitForConfirms();
            System.out.println(isSendSuccess);
        }
    }

这样的话当一批消息发送完成后,进行统一的消息确认是否发送成功,就成了多条的消息确认,不过并不推荐使用这种确认消息的方式

在多条的消息确认中,比如我先是发送了一批的消息,比如这批消息有100条,这个时候如果有其中的一条消息没有发送成功,这里返回的也是false,然尔我们并不能知道是具体的哪 一条消息发送失败。

异步消息确认

异步的消息确认是通过一个监听器来实现的,当消息发送后,会接着执行下面的逻辑,可能在稍会的一段时间,监听器监听到了Broker的返回,再进行逻辑的处理。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            ConfirmListener confirmListener = new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("发送成功:" + deliveryTag + " multiple:" + multiple);
                }
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("发送失败:" + deliveryTag);
                }
            };
            channel.addConfirmListener(confirmListener);
            Map<String,String> mes = new HashMap<>();
            mes.put("name","11111");
            String messageStr = objectMapper.writeValueAsString(mes);
            for(int i = 0;i < 100;i++){
                channel.basicPublish(
                        "exchange.drinks",
                        "drinks.juzi",
                        null,
                        messageStr.getBytes());
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

当成功的发送消息的时候会回调监听器中的handleAck方法,如果没有发送成功会回调handleNack方法 在这个监听器里面有两个参数一个deliveryTagmultiple:

  • deliveryTag:表示当前的Channel发送的第几条消息
  • multiple:是否在确认多条消息

这个异步的虽然在听觉上感觉比较厉害些,这里也不推荐使用,原因和上面的一样,我们并不能具休的知道是哪一条消息没有被确认发送。

综上:这里更加推荐单条消息确认,具体选择哪一种还是要用业务做出选择

注:注意一点是当一条消息成功的发送到Broker,但是如果没有正确的路由到队列,那么这时borker也是会返回true,因为Broker确时接收到了消息只是RoutingKey不可达,所以这里也会返回true,并且直接将消息丢弃

消息的返回机制

这个消息返回机制的作用就是在当一个消息成功的发送,但是并没有正确路由到队列的时候所回调的。

这也弥补了上面确认消息是否发送成功但没有路由到队列所返回true的问题 在使用消息返回机制的时候在发送消息时需要将mandatory置成true。再添加对应的监听器。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.addReturnListener(new ReturnCallback() {
                @Override
                public void handle(Return returnMessage) {
                    System.out.println("replyCode:" + returnMessage.getReplyCode() + " replyText:" + returnMessage.getReplyText() + " routingKey:"
                    + returnMessage.getRoutingKey() + " exchange:" + returnMessage.getExchange() + " body:" + new String(returnMessage.getBody()));
                }
            });
            Map<String,String> mes = new HashMap<>();
            mes.put("name","11111");
            String messageStr = objectMapper.writeValueAsString(mes);
            channel.basicPublish(
                    "exchange.drinks",
                    "drinks.juzi1",
                    true,
                    null,
                    messageStr.getBytes());
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

这里的addReturnListener方法有两个重载:只不过是handle的参数不同,一个是参数都显示在了参数列表内,一个是将参数封装到了Return对象内。当handle被回调的时候也可以获取到相应的参数比如:exchange routingkey body。

注:保证消息可靠性投递的前提是服务的高可用,服务不高可用谈其它的都是扯

以上就是详解RabbitMq如何做到消息的可靠性投递的详细内容,更多关于RabbitMq 消息可靠性投递的资料请关注我们其它相关文章!

(0)

相关推荐

  • RabbitMq如何做到消息的可靠性投递

    目录 如何保证消息不丢失 Go 实现 安装操作库 发送端的确认 消费端的确认 如何保证消息不丢失 在使用RabbitMQ的时候,我们需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的: 消息的可靠投递分为了两大内容:发送端的确认(p->broker和exchange->queue)和消费端的确认(queue->c). 发送端的确认 Rabbit提供了两种方式来保证发送端的消息可靠性投递:confirm 确认模式 和return 退回模式. confirm 确

  • 详解RabbitMq如何做到消息的可靠性投递

    目录 前言 RabbitMq的投递及消费流程 提供者如何确保消息的成功投递 单条消息的同步确认 多条消息的同步确认 异步消息确认 消息的返回机制 前言 现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的. 引入消息队列可以给这个项目带来很多的好处:比如 削峰 这个就很好的理解,在系统中的请求量是固定的,但是有的时候会多出很多的突发流量,比如在有秒杀活动的时候,这种瞬时的高流量可能会打垮系统,这个时候就可以很好的引入MQ,将这些请求积压到MQ中,然后消费端在按照自已的能力去处理这里请

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • 详解kafka中的消息分区分配算法

    目录 背景 RangeAssignor 定义 源码分析 场景 RoundRobinAssignor 定义 源码分析 场景 StickyAssignor 定义 场景 背景 kafka有分区机制,一个主题topic在创建的时候,会设置分区.如果只有一个分区,那所有的消费者都订阅的是这一个分区消息:如果有多个分区的话,那消费者之间又是如何分配的呢? 分配算法 RangeAssignor 定义 Kafka默认采⽤RangeAssignor的分配算法. RangeAssignor策略的原理是按照消费者总数

  • 详解微信小程序审核不通过的解决方法

    前言 近来,微信小程序一直活跃在开发者的眼球中.很多开发者都投身微信小程序的开发中,而这些开发者,总是需要面对最后一道难题:如何以一种优雅的姿势来通过微信官方的审核.本文基于几天前提交审核的一次总结,写得有不当的地方,请各位大佬指正. 问题描述 先上一下微信小程序平台常见拒绝情形的说明文件.由于我提交的小程序中包含了"分享群"的按钮,所以审核未通过,未通过的原因如下: 3.2.1 小程序的页面内容中,存在诱导类行为,包括但不限于诱导分享.诱导添加.诱导关注公众号.诱导下载等,要求用户分

  • 详解消息队列及RabbitMQ部署和使用

    目录 什么是消息队列 为什么需要消息队列 常见的消息队列 ActiveMQ RabbitMQ ZeroMQ Kafka RocketMQ RabbitMQ 的部署和使用 Python 编写生产者 Python 编写消费者 最后的话 什么是消息队列 消息队列拆开了看,就是消息 + 队列,消息是什么?其实就是程序之间通讯所用到的数据,消息从生产者那里产生,进入队列后,安装设计好的规则出队,由消费者消费.仅此而已. 为什么需要消息队列 消息队列,最重要的是队列,可以想象一下没有队列的场景,你去银行办业

  • SpringBoot+RabbitMQ实现消息可靠传输详解

    目录 环境配置 消息丢失分析 生产阶段 生产端模拟消息丢失 RabbitMQ 消费端 环境配置 SpringBoot 整合 RabbitMQ 实现消息的发送. 1.添加 maven 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <depen

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

随机推荐