rabbitmq学习系列教程之消息应答(autoAck)、队列持久化(durable)及消息持久化

目录
  • 一、前言
  • 二、autoAck参数的讨论
  • 三、rabbitmq队列持久化操作
  • 四、2019.11.04问题补充
  • 五、2019.11.07消息的持久化
  • 六、2022.02.09增加队列持久化说明
  • 结语

一、前言

Boolean autoAck = false;
channel.basicConsume(queue_name, autoAck ,consumer);

在simple queue 和 work queue(轮询) 处理中,我们设置的消费者的消息监听都采用 channel.basicConsume(queue_name,true, consumer),其中参数二 boolean autoAck为true,但在fair prefetch 公平分发中设置为false,这个设置在整个消息队列和消息消费者之间有什么影响呢?

二、autoAck 参数的讨论

我们都明白一点,autoAck设置为true时,消息队列可以不用在意消息消费者是否处理完消息,一直发送全部消息。但在公平分发中,也就是autoAck设置为false,在发送一个消息后到没收到消息消费者成功消费消息的信息回执之间,是不会继续给这个消息继续发送消息的。

1、当 autoAck设置为true时,也就是自动确认模式,一旦消息队列将消息发送给消息消费者后,就会从内存中将这个消息删除

2、当autoAck设置为false时,也就是手动模式,如果此时的有一个消费者宕机,消息队列就会将这条消息继续发送给其他的消费者,这样数据在消息消费者集群的环境下,也就算是不丢失了。

在 Boolean autoAck = true的情况下,消息队列不会管消费者是否收到了消息,如果消费者宕机,消息也就丢失了。

在 Boolean autoAck = false的情况下,如果消费者1宕机了,消息队列没有收到消费者发送回的应答,就会将这个消息发送给下一个消费者处理。直到消费者处理完这个消息,并向消息队列发送了一个消息应答,告诉消息队列此时这个消息已经处理完成,消息队列才会将这个消息从内存中删除。

由此我们可以思考一个问题:从上面两个设置中,我们当然会认为false比较好了,但大家可能会忽略一个小问题,消息队列保存的消息是在内存中的,消息队列宕机了,内存中的消息也就清除了,如何做到消息的持久化保存呢?

三、rabbitmq 队列持久化操作

我们之前在消息的生产者和消息的消费者中都声明了一个消息队列。

channel.queueDeclare(queue_name, false, false, false, null);

他的源码介绍为:

Queue.DeclareOk queueDeclare(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments) throws IOException;

其中各项参数的含义:

queue:声明队列的名称
durable:如果我们声明一个持久队列,则为true(该队列将在服务器重启后保留下来)
exclusive:如果我们声明一个独占队列,则为true(仅限此连接)
autoDelete:如果我们声明一个自动删除队列(服务器将在不再使用它时将其删除)
arguments:队列的其他属性(构造参数)

【扩展:】arguments参数干啥用的?

当前的arguments参数的含义,用于设定队列的属性,如下所示:

1、x-expires设定队列有效期。表示队列在指定时间内未使用,则会删除。

2、x-message-ttl设定消息延迟发送时间

3、x-dead-letter-exchange设置死信交换机

4、x-dead-letter-routing-key设置路由

参照文章:

RabbitMQ之消息有效期与死信

rabbitmq创建queue时arguments参数注释

小细节:从上面的参数信息中我们发现一个参数durable,发现这个参数是声明队列为持久化队列,那我们改成 true 是否就可以了呢?我们尝试下!

修改send代码中的声明队列:

boolean durable = true;
channel.queueDeclare(queue_name, durable, false, false, null);

运行起来,结果。。

注意:出现这种情况的原因是我的rabbitmq中本身就存在一个设置好了的queue,如下所示:

如果在已存在的消息队列上,依据修改代码变更持久化队列操作,则会出现如上所述的异常信息。
但如果rabbitmq中不存在对应的消息队列时,则不会造成影响。
结论:

rabbitmq不允许对一个已存在的队列重新定义参数信息。

由上面的测试发现:

1、如果在localhost:15672中删除指定的queue,则可以创建出一个持久化队列。
2、重新定义一个网址上不存在的名称作为持久化队列。

最后再强调一点:
消息生产者和消息消费者的队列声明(队列设置),必须保持一致。
原因:rabbitmq不允许对一个已存在的队列重新定义参数信息
有些大佬说无需在消费者中声明队列,其实最好还是需要声明,原因在于,如果rabbitmq中不存在指定的queue_name的消息队列时,运行代码将会出现报错信息!!

四、2019.11.04 问题补充

上面的两个参数信息消息应答(autoAck)与消息持久化(durable),都往持久化的方向设置了,消息会持久化保存吗?
答案:错。

1、消息应答设置为手动模式,只是确保消息能够正常的被消费掉,而并非标识消息的持久化。
2、durable设置为true,只是说我们设置一个消息队列的属性为持久化队列,在rabbitmq中有很多个通道和队列,并非标识整体的消息就是持久化了。

为什么说按照上述设置条件,设定了消息队列后,消息队列中的消息还是不能持久化保存呢?

消息生产者生产50个消息并放入消息队列中

重启rabbitmq服务(模拟宕机)

重启完成后,访问 localhost:15672 查看 Queues属性,

发现消息队列在重启服务后是存在的,但其中的消息却不存在了。

要想彻底实现服务宕机等操作后,消息依旧能够实现持久化保存(硬盘保存),还需要继续进行学习,研究。

五、2019.11.07消息的持久化

通过上面的测试我们发现:

durable 只是表明消息队列的持久化,不表示消息的持久化。

在消息生产者生产消息推送至消息队列中(或消息转发器)时,我们使用了一个方法

channel.basicPublish(String exchange,
String routingKey,
BasicProperties props,
byte[] body)

其中的参数三 BasicProperties props表示额外配置属性。
那么这个属性在源码中有什么呢?

public static class BasicProperties extends
com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType;//消息类型如:text/plain
        private String contentEncoding;//编码
        private Map<String,Object> headers;
        private Integer deliveryMode;//1:nonpersistent 2:persistent
        private Integer priority;//优先级
        private String correlationId;
        private String replyTo;//反馈队列
        private String expiration;//expiration到期时间
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;
...

从源码中我们看到 BasicProperties中存在一条属性 deliveryMode,

1表示不持久化; 2表示持久化!

如何使用代码实现呢?

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import cn.linkpower.util.MqConnectUtil;

/**
 * 公平分发--谁做的快谁就多做!<br>
 * 只有在消息消费者成功消费消息,发送消费成功的指令给队列后,消息队列才会继续向该消费者发送下一条消息指令。<br>
 * @author 76519
 *
 */
public class Send {
	private static final String queue_name = "test_work_queue";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		//1、建立连接
		Connection mqConnection = MqConnectUtil.getMqConnection();
		//2、建立信道(通道)
		Channel channel = mqConnection.createChannel();
		//3、声明队列(开启持久化)
		boolean durable = true;
		channel.queueDeclare(queue_name, durable, false, false, null);

		//公平分发---
		//为了开启公平分发操作,在消息消费者发送确认收到的指示后,消息队列才会给这个消费者继续发送下一条消息。
		//此处的 1 表示 限制发送给每个消费者每次最大的消息数。
		channel.basicQos(1);

		//4、发送消息
		for (int i = 0; i < 10; i++) {
			String string = "hello xiangjiao "+i;
			System.out.println("send msg = "+string);
			//发送消息
			//channel.basicPublish("", queue_name, null, string.getBytes());
			//消息持久化测试
			Builder builder = new Builder();
			builder.deliveryMode(2);
			BasicProperties properties = builder.build();
			channel.basicPublish("", queue_name, properties, string.getBytes());
			//消息发送慢一点
			Thread.sleep(i*5);
		}
		//5、使用完毕后,需要及时的关闭流应用
		channel.close();
		mqConnection.close();
	}
}

测试操作:
1、运行代码,查看 local’host:15672 登陆指定的账号,查询queue信息

2、重启rabbitmq服务。

3、重启后,再次查看 web 控制台

发现:当重新完全启动 rabbitmq 后,他会自动加载之前的消息至消息队列中。

但是此时并不能说明问题,我们是否忽略了一点,你确定了这个消息队列的消息了没有?

so 我们运行消息消费者 查看这个消息队列里面的消息到底是什么?

六、2022.02.09 增加队列持久化说明

在之前的代码中,设置队列属性为createChannel.queueDeclare(simpleQueueName, false, false, false, null),其中参数二代表该队列是否是一个持久化队列,此处设置的为false,表示非持久化

这是什么含义呢?

执行消息创建并添加至队列的代码逻辑。此时队列中存在数据,其次也存在该队列。

将Rabbitmq重启,再次查看web:

/sbin/service rabbitmq-server stop
/sbin/service rabbitmq-server start

此时通过web界面得知,该队列在rabbitmq重启后,队列没了!

结语

到此为止,消息持久化、队列持久化 算是琢磨个差不多了。

到此这篇关于rabbitmq学习系列教程之消息应答(autoAck)、队列持久化(durable)及消息持久化的文章就介绍到这了,更多相关rabbitmq消息应答队列持久化消息持久化内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 聊聊RabbitMQ发布确认高级问题

    目录 1.发布确认高级 1.1.发布确认SpringBoot版本 1.1.1.确认机制方案 1.1.2.代码架构图 1.1.3.配置文件 1.1.4.配置类 1.1.5.回调接口 1.1.6.生产者 1.1.7.消费者 1.1.8.测试结果 1.2.回退消息 1.2.1.Mandatory参数 1.2.2.配置文件 1.2.3.生产者代码 1.2.4.回调接口代码 1.2.5.测试结果 1.3.备份交换机 1.3.1.代码架构图 1.3.2.配置类代码 1.3.3.消费者代码 1.3.4.测试结

  • Java RabbitMQ的持久化和发布确认详解

    目录 1.持久化 1.1实现持久化 1.2不公平分发 1.3测试不公平分发 1.4预取值 1.4.1代码测试 2.发布确认 2.1单个确认发布 2.2批量确认发布 2.3异步确认发布 2.4处理未确认的消息 总结 1. 持久化 当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失.默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息.为了保证消息不丢失需要将队列和消息都标记为持久化. 1.1 实现持久化 1.队列持久化:在创建队列时将channel.queueDeclare();第

  • RabbitMQ 的消息持久化与 Spring AMQP 的实现详解

    前言 要从奔溃的 RabbitMQ 中恢复的消息,我们需要做消息持久化.如果消息要从 RabbitMQ 奔溃中恢复,那么必须满足三点,且三者缺一不可. 交换器必须是持久化. 队列必须是持久化的. 消息必须是持久化的. 原生的实现方式 原生的 RabbitMQ 客户端需要完成三个步骤. 第一步,交换器的持久化. // 参数1 exchange :交换器名 // 参数2 type :交换器类型 // 参数3 durable :是否持久化 channel.exchangeDeclare(EXCHANG

  • rabbitmq学习系列教程之消息应答(autoAck)、队列持久化(durable)及消息持久化

    目录 一.前言 二.autoAck参数的讨论 三.rabbitmq队列持久化操作 四.2019.11.04问题补充 五.2019.11.07消息的持久化 六.2022.02.09增加队列持久化说明 结语 一.前言 Boolean autoAck = false; channel.basicConsume(queue_name, autoAck ,consumer); 在simple queue 和 work queue(轮询) 处理中,我们设置的消费者的消息监听都采用 channel.basic

  • Java RabbitMQ的工作队列与消息应答详解

    目录 WorkQueues 1.轮询分发消息 1.1抽取工具类 1.2编写两个工作线程 1.3编写生产者 1.4运行测试 1.5异常情况 2.消息应答 2.1自动应答 2.2手动应答 2.3消息自动重新入队 2.4手动应答测试 2.4.1生产者代码 2.4.2消费者代码 2.4.3测试 总结 Work Queues 工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行.我们把任务封装为消息并将其发送到队列.在后台运行的工作进程将弹出任务并最终执

  • Spring Boot系列教程之死信队列详解

    前言 在说死信队列之前,我们先介绍下为什么需要用死信队列. 如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可. ack机制和requeue-rejected属性 我们还是基于上篇<Spring Boot系列--7步集成RabbitMQ>的demo代码来说. 在项目springboot-demo我们看到application.yaml文件部分配置内容如下 ... listener: type: simple simple: acknowledge-mode: aut

  • C#开发微信门户及应用(3) 文本消息和图文消息应答

    微信应用如火如荼,很多公司都希望搭上信息快车,这个是一个商机,也是一个技术的方向,因此,有空研究下.学习下微信的相关开发,也就成为计划的安排事情之一了.本系列文章希望从一个循序渐进的角度上,全面介绍微信的相关开发过程和相关经验总结,希望给大家了解一下相关的开发历程. 在前面两篇两篇随笔<C#开发微信门户及应用(1)--开始使用微信接口>和<C#开发微信门户及应用(2)--微信消息的处理和应答>里面,大致介绍了我微信应用的框架构建,本随笔继续介绍这一主题,介绍消息应答里面的文本应答和

  • asp.net微信开发(消息应答)

    当普通微信用户向公众账号发消息时,微信服务器将POST消息的XML数据包到开发者填写的URL上. 请注意: 1.关于重试的消息排重,推荐使用msgid排重. 2.微信服务器在五秒内收不到响应会断掉连接,并且重新发起请求,总共重试三次.假如服务器无法保证在五秒内处理并回复,可以直接回复空串,微信服务器不会对此作任何处理,并且不会发起重试.详情请见"发送消息-被动回复消息". 3.为了保证更高的安全保障,开发者可以在公众平台官网的开发者中心处设置消息加密.开启加密后,用户发来的消息会被加密

  • LNMP系列教程之一 添加域名建立站点

    昨天老左分享了"Linux VPS CentOS安装LNMP系统环境教程",有些朋友说网上已经有过类似的教程,可能你再发布有些多余.我认为还是有必要的,一来是我自己学习使用,二来可以作为记录自己使用的时候笔记,以便下次使用的时候直接利用起来,同时别人的毕竟是别人的,自己还是需要梳理.就像我从现在开始,整理的关于LNMP的系列教程一样,从这一篇开始我将把LNMP的使用全过程做一个记录,一直到最后的维护升级等等.我相信通过这么一个积累,也是为以后跟我一样的新手可以有一个很好的参考.大家认为

  • Knockoutjs 学习系列(二)花式捆绑

    在上一篇Knockoutjs 学习系列(一)ko初体验文章中提到,ko中的 data-bind = "XX:OO"绑定大法除了可以绑定text.value等内容,还可以绑定visible.style等外观属性,也可以绑定click.textInput等各种事件,甚至还能控制程序流程.各种花式捆绑,绝对满足你的幻想. 下面简单讲讲各种绑定的使用,主要根据被绑定的属性分成表现类.流程类和交互类三种. 表现类属性 表现类的绑定属性有visible.text.html.css.style.at

  • C# Redis学习系列(一)Redis下载安装使用

    下一篇:C# Redis学习系列二:Redis基本设置 一.认识Redis 1. Redis 是一个高性能的key-value数据库. 2. 它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型). 3.周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件 4.别人说的 比我好 Redis百度百科 二.下载 为了匹配我的教程,我推荐下载 redis-2.8.2400 三.如何安

  • spring boot集成rabbitmq的实例教程

    一.RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

  • SpringBoot系列教程JPA之基础环境搭建的方法

    JPA(Java Persistence API)Java持久化API,是 Java 持久化的标准规范,Hibernate是持久化规范的技术实现,而Spring Data JPA是在 Hibernate 基础上封装的一款框架.JPA作为标准,实际上并没有说局限于某个固定的数据源,事实上mysql,mongo, solr都是ok的.接下来我们将介绍下springboot结合jpa 来实现mysql的curd以及更加复杂一点的sql支持 jpa系列教程将包含以下几块 环境搭建 基础的插入.修改.删除

随机推荐