Java RabbitMQ的持久化和发布确认详解

目录
  • 1.持久化
    • 1.1实现持久化
    • 1.2不公平分发
    • 1.3测试不公平分发
    • 1.4预取值
      • 1.4.1代码测试
  • 2.发布确认
    • 2.1单个确认发布
    • 2.2批量确认发布
    • 2.3异步确认发布
    • 2.4处理未确认的消息
  • 总结

1. 持久化

当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息。为了保证消息不丢失需要将队列和消息都标记为持久化。

1.1 实现持久化

1.队列持久化:在创建队列时将channel.queueDeclare();第二个参数改为true。

2.消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化消息。

/**
 * @Description 持久化MQ
 * @date 2022/3/7 9:14
 */
public class Producer3 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 持久化队列
        channel.queueDeclare(LONG_QUEUE,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            // 持久化消息
            channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:'" + msg + "'成功");
        }
    }
}

但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

1.2 不公平分发

轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

在消费者处修改channel.basicQos(1);表示开启不公平分发

/**
 * @Description 不公平分发消费者
 * @date 2022/3/7 9:27
 */
public class Consumer2 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模拟并发沉睡三十秒
            try {
                Thread.sleep(30000);
                System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        // 设置不公平分发
        channel.basicQos(1);
        channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消费者取消消费");
                });
    }
}

1.3 测试不公平分发

测试目的:是否能实现能者多劳。

测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

先启动生产者创建队列,再分别启动两个消费者。

生产者按照顺序发四条消息:

睡眠时间短的线程A接收到了三条消息

而睡眠时间长的线程B只接收到的第二条消息:

因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。

实验成功!

1.4 预取值

消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

1.4.1 代码测试

测试方法:

1.新建两个不同的消费者分别给定预期值5个2。

2.给睡眠时间长的指定为5,时间短的指定为2。

3.假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

代码根据上述代码修改预期值即可。

2. 发布确认

发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

需要注意的是需要开启队列持久化才能使用确认发布。
开启方法:channel.confirmSelect();

2.1 单个确认发布

是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。

/**
 * @Description 确认发布——单个确认
 * @date 2022/3/7 14:49
 */
public class SoloProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_solo";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 单个发布确认
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("发送消息:" + i);
            }
        }
        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }
}

2.2 批量确认发布

一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。

/**
 * @Description 确认发布——批量确认
 * @date 2022/3/7 14:49
 */
public class BatchProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_batch";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 设置一个多少一批确认一次。
        int batchSize = MESSAGE_COUNT / 10;
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 批量发布确认
            if (i % batchSize == 0){
                if (channel.waitForConfirms()){
                    System.out.println("发送消息:" + i);
                }
            }
        }
        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

显然效率要比单个确认发布的高很多。

2.3 异步确认发布

在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。

/**
 * @Description 确认发布——异步确认
 * @date 2022/3/7 14:49
 */
public class AsyncProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        // 确认成功回调
        ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
            System.out.println("确认成功消息:" + deliveryTab);
        };
        // 确认失败回调
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            System.out.println("未确认的消息:" + deliveryTab);
        };
        // 消息监听器
        /**
         * addConfirmListener:
         *                  1. 确认成功的消息;
         *                  2. 确认失败的消息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
        }

        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

2.4 处理未确认的消息

最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks与发布线程之间进行消息的传递。

处理方式:

1.记录要发送的全部消息;

2.在发布成功确认处删除;

3.打印未确认的消息。

使用一个哈希表存储消息,它的优点:

可以将需要和消息进行关联;轻松批量删除条目;支持高并发。

ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/**
 * @Description 异步发布确认,处理未发布成功的消息
 * @date 2022/3/7 18:09
 */
public class AsyncProducerRemember {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async_remember";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 线程安全有序的一个hash表,适用与高并发
        ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        // 确认成功回调
        ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
            //2. 在发布成功确认处删除;
            // 批量删除
            if (multiple){
                ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                confirmMap.clear();
            }else {
                // 单独删除
                map.remove(deliveryTab);
            }
            System.out.println("确认成功消息:" + deliveryTab);
        };
        // 确认失败回调
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            // 3. 打印未确认的消息。
            System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
        };
        // 消息监听器
        /**
         * addConfirmListener:
         *                  1. 确认成功的消息;
         *                  2. 确认失败的消息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 1. 记录要发送的全部消息;
            map.put(channel.getNextPublishSeqNo(),msg);
        }

        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

总结

显然来说,异步处理除了在编码处有些麻烦,在处理时间效率和可用性上都是比单处理和批处理好很多。

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注我们的更多内容!

(0)

相关推荐

  • 聊聊RabbitMQ发布确认高级问题

    目录 1.发布确认高级 1.1.发布确认SpringBoot版本 1.1.1.确认机制方案 1.1.2.代码架构图 1.1.3.配置文件 1.1.4.配置类 1.1.5.回调接口 1.1.6.生产者 1.1.7.消费者 1.1.8.测试结果 1.2.回退消息 1.2.1.Mandatory参数 1.2.2.配置文件 1.2.3.生产者代码 1.2.4.回调接口代码 1.2.5.测试结果 1.3.备份交换机 1.3.1.代码架构图 1.3.2.配置类代码 1.3.3.消费者代码 1.3.4.测试结

  • RabbitMQ 的消息持久化与 Spring AMQP 的实现详解

    前言 要从奔溃的 RabbitMQ 中恢复的消息,我们需要做消息持久化.如果消息要从 RabbitMQ 奔溃中恢复,那么必须满足三点,且三者缺一不可. 交换器必须是持久化. 队列必须是持久化的. 消息必须是持久化的. 原生的实现方式 原生的 RabbitMQ 客户端需要完成三个步骤. 第一步,交换器的持久化. // 参数1 exchange :交换器名 // 参数2 type :交换器类型 // 参数3 durable :是否持久化 channel.exchangeDeclare(EXCHANG

  • rabbitmq学习系列教程之消息应答(autoAck)、队列持久化(durable)及消息持久化

    目录 一.前言 二.autoAck参数的讨论 三.rabbitmq队列持久化操作 四.2019.11.04问题补充 五.2019.11.07消息的持久化 六.2022.02.09增加队列持久化说明 结语 一.前言 Boolean autoAck = false; channel.basicConsume(queue_name, autoAck ,consumer); 在simple queue 和 work queue(轮询) 处理中,我们设置的消费者的消息监听都采用 channel.basic

  • Java RabbitMQ的持久化和发布确认详解

    目录 1.持久化 1.1实现持久化 1.2不公平分发 1.3测试不公平分发 1.4预取值 1.4.1代码测试 2.发布确认 2.1单个确认发布 2.2批量确认发布 2.3异步确认发布 2.4处理未确认的消息 总结 1. 持久化 当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失.默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息.为了保证消息不丢失需要将队列和消息都标记为持久化. 1.1 实现持久化 1.队列持久化:在创建队列时将channel.queueDeclare();第

  • Java RabbitMQ的工作队列与消息应答详解

    目录 WorkQueues 1.轮询分发消息 1.1抽取工具类 1.2编写两个工作线程 1.3编写生产者 1.4运行测试 1.5异常情况 2.消息应答 2.1自动应答 2.2手动应答 2.3消息自动重新入队 2.4手动应答测试 2.4.1生产者代码 2.4.2消费者代码 2.4.3测试 总结 Work Queues 工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行.我们把任务封装为消息并将其发送到队列.在后台运行的工作进程将弹出任务并最终执

  • Java RabbitMQ的TTL和DLX全面精解

    目录 RabbitMQ的TTL 1.TTL概述 2.设置消息有效期 2.1.通过队列设置有效期 2.2.通过发送消息时设置有效期 3.设置队列有效期(不常用,仅作了解) RabbitMQ的DLX 1.DLX是什么 2.DLX有什么用 3.DLX使用方式 本节继续介绍RabbitMQ的高级特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交换机.死信队列) RabbitMQ的TTL 1.TTL概述 RabbitMQ的TTL全称为Time-To-

  • 基于MyBatis的数据持久化框架的使用详解

    目录 一.MyBatis是什么 1.1.概述 1.2.什么是持久化 1.3.什么是ORM 1.4.MyBatis主要内容 1.5.优点 1.6.缺点 二.MyBatis架构 2.1.mybatis所依赖的jar包 2.2.MyBatis准备工作 三.MyBatis 核心对象 一.MyBatis是什么 1.1.概述 Mybatis是一个优秀的开源.轻量级持久层框架,它对JDBC操作数据库的过程进行封装,简化了加载驱动.创建连接.创建 statement 等繁杂的过程,使开发者只需要关注sql本身.

  • Java 生成随机字符串数组的实例详解

    Java 生成随机字符串数组的实例详解 利用Collections.sort()方法对泛型为String的List 进行排序.具体要求: 1.创建完List<String>之后,往其中添加十条随机字符串 2.每条字符串的长度为10以内的随机整数 3.每条字符串的每个字符都为随机生成的字符,字符可以重叠 4.每条随机字符串不可重复 将涉及到的知识有: String.StringBuffer.ListArray.泛型.Collections.sort.foreach.Random等相关知识,算是

  • java 设计模式(DAO)的实例详解

    java 设计模式(DAO)的实例详解 应用场景:在Java程序中,经常需要把数据持久化,也需要获取持久化的数据,但是在进行数据持久化的过程中面临诸多问题(如:数据源不同.存储类型不同.供应商不同.访问方式不同等等),请问如何能以统一的接口进行数据持久化的操作? 其实这个我没学号(≧ ﹏ ≦).我的理解就是一个产品面向的用户不是单一的,所以我们要兼容许多情况如前面提到的数据源不同.存储类型不同.供应商不同.访问方式不同等等. ★ 解决方案 DAO的理解: 1.DAO其实是利用组合工厂模式来解决问

  • RN在Android打包发布App(详解)

    1-:生成一个签名密钥 你可以用keytool命令生成一个私有密钥.在Windows上keytool命令放在JDK的bin目录中(比如C:\Program Files\Java\jdkx.x.x_x\bin),你可能需要在命令行中先进入那个目录才能执行此命令.在mac上,直接进入项目根目录输入一下命令: $ keytool -genkey -v -keystore my-release-key.keystore -alias my-key-alias -keyalg RSA -keysize 2

  • java 中maven pom.xml文件教程详解

    maven pom.xml文件教程详解,具体内容如下所示: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.x

  • tensorflow的ckpt及pb模型持久化方式及转化详解

    使用tensorflow训练模型的时候,模型持久化对我们来说非常重要. 如果我们的模型比较复杂,需要的数据比较多,那么在模型的训练时间会耗时很长.如果在训练过程中出现了模型不可预期的错误,导致训练意外终止,那么我们将会前功尽弃.为了解决这一问题,我们可以使用模型持久化(保存为ckpt文件格式)来保存我们在训练过程中的临时数据.. 如果我们训练出的模型需要提供给用户做离线预测,那么我们只需要完成前向传播过程.这个时候我们就可以使用模型持久化(保存为pb文件格式)来只保存前向传播过程中的变量并将变量

  • 在Java中操作Zookeeper的示例代码详解

    依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency> 连接到zkServer //连接字符串,zkServer的ip.port,如果是集群逗号分隔 String connectStr = "192.

随机推荐