RabbitMQ消息有效期与死信的处理过程

目录
  • 一.前言
  • 二.设置消息有效期
    • 1.设置队列的有效期TTL
    • 2.设置队列的有效期Expire
    • 3.通过发送消息时设置有效期
  • 三.死信交换机DLX

一.前言

RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。

设置TTL有两种方式:

  • 队列有效期:是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期
  • 消息有效期:发送消息时给消息设置属性,可以为每条消息都设置不同的TTL

如果两种方式都设置了,则以设置的较小的为准。

  • 区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。

二.设置消息有效期

1.设置队列的有效期TTL

定义队列的方法如下:

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

该方法的arguments参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。后台添加的话如下:

代码中设置如下:

Map<String, Object> arguments= new HashMap<String , Object>();
arguments.put("x-message-ttl " , 10000);//10秒钟  单位为毫秒
channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

命令行模式来设置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":100000}' --apply-to queues

通过HTTP接口调用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 100000}}'
http://ip:15672/api/queues/{vhost}/{queuename}

2.设置队列的有效期Expire

有效期Expire可以让队列在指定时间内 “未被使用” 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。

服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 10000 ,则表示该 queue 如果在 10s之内未被使用则会被删除。

代码如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 10000);
channel.queueDeclare("queue", false, false, false, args);  

3.通过发送消息时设置有效期

发送消息的方法如下:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

在该方法的props参数可以设置其有效期:

       Map<String, Object> headers = new HashMap<String, Object>();
                        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                                .deliveryMode(2) // 消息持久
                                .contentEncoding("UTF-8") // 编码方式
                                .contentType("text/plain")
                                .expiration("100000")
                                .headers(headers)
                                .build();
      channel.basicPublish("", queueName, properties, message.getBytes());

通过HTTPAPI 接口设置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d
'{"properties":{"expiration":"100000"},"routing_key":"routingkey","payload":"bodys","payload_encoding":"string"}'
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

三.死信交换机DLX

介绍

  • 死信队列:DLX,dead-letter-exchange
  • 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

消息变成死信几种情况

  • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
  • 消息过期
  • 队列达到最大长度

死信处理过程

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中的消息做相应的处理。

用途

通过监控消费死信队列中消息,来观察和分析数据。
结合TTL实现延迟队列(比如下单超过多长时间自动关闭)

使用

代码如下:

channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置死信交换机
args.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置)
channel.queueDeclare("myqueue" , false , false , false , args);

实例

public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//声明一个交换机,做死信交换机用
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		//声明一个队列,做死信队列用
		channel.queueDeclare("dlx_queue", true, false, false, null);
		//队列绑定到交换机上
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");

		channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
		Map<String, Object> arguments=new HashMap<String, Object>();
		arguments.put("x-message-ttl" , 1000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX
		arguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLX
		arguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键
		//为队列normal_queue 添加DLX
		channel.queueDeclare("normal_queue", true, false, false, arguments);
		channel.queueBind("normal_queue", "normal_exchange", "");
		channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes());
		System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date()));
		channel.close();
		connection.close();
	}

说明:

申明死信队列dlx_queue的绑定如下,与死信交换机dlx_exchange(topic类型)进行绑定,routing key为"dlx.*"
申明队列normal_queue,与交换机normal_exchange(fanout类型)进行绑定

执行流程:

  • 消息发送到交换机normal_exchange,然后路由到队列normal_queue上
  • 因为队列normal_queue没有消费者,消息过期后成为死信消息
  • 死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechage
  • dlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。

参考:

https://www.jianshu.com/p/986ee5eb78bc

https://blog.csdn.net/u012988901/article/details/88958654

到此这篇关于RabbitMQ之消息有效期与死信的文章就介绍到这了,更多相关RabbitMQ消息有效期内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Spring RabbitMQ死信机制原理实例详解

    死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因: 1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false 2.队列达到最大长度 3.消息TTL过期 采用死信机制的好处是可以提高系统的稳定性,当消息消费失败后,消息进入死信队列,可以对消息进行补偿,可以达到最终一致性的目标. 具体例子如下: @Bean public Queue deadQueue() { return new Queue(DEAD_QUEUE_NAME, tr

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • RabbitMQ死信机制实现延迟队列的实战

    目录 延迟队列 应用场景 Time To Live(TTL) Dead Letter Exchanges(DLX) 延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 应用场景 三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果.实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列

  • SpringBoot集成RabbitMQ的方法(死信队列)

    介绍 死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因: 1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false 2.队列达到最大长度 3.消息TTL过期 场景 1.小时进入初始队列,等待30分钟后进入5分钟队列 2.消息等待5分钟后进入执行队列 3.执行失败后重新回到5分钟队列 4.失败5次后,消息进入2小时队列 5.消息等待2小时进入执行队列 6.失败5次后,将消息丢弃或做其他处理 使用 安装MQ 使用docker方式安装

  • RabbitMQ消息有效期与死信的处理过程

    目录 一.前言 二.设置消息有效期 1.设置队列的有效期TTL 2.设置队列的有效期Expire 3.通过发送消息时设置有效期 三.死信交换机DLX 一.前言 RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期.消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了.如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的).如果将TTL设为0,则表示如果消息不能被立马消费则会被立即

  • Java RabbitMQ消息队列详解常见问题

    目录 消息堆积 保证消息不丢失 死信队列 延迟队列 RabbitMQ消息幂等问题 RabbitMQ消息自动重试机制 合理的选择重试机制 消费者开启手动ack模式 rabbitMQ如何解决消息幂等问题 RabbitMQ解决分布式事务问题 基于RabbitMQ解决分布式事务的思路 消息堆积 消息堆积的产生场景: 生产者产生的消息速度大于消费者消费的速度.解决:增加消费者的数量或速度. 没有消费者进行消费的时候.解决:死信队列.设置消息有效期.相当于对我们的消息设置有效期,在规定的时间内如果没有消费的

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • PHP实现RabbitMQ消息列队的示例代码

    目录 业务场景 1.首先部署好thinkphp6框架 2.安装workerman扩展 3.生产者 4.消费者 5.整体测试 业务场景 项目公司是主php做开发的,框架为thinkphp.众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序.首先我想到了php的workerman与swoole,但是这里应上面的标题哈,想将耗时任务交给另一个服务器,同时列队处理.所以这里我想独立部署一个rabbitMQ服务器用于处理列队任务. 当rabbitM

  • SpringBoot整合RabbitMQ实战教程附死信交换机

    目录 前言 环境 配置 配置文件 业务消费者 死信消费者 测试 前言 使用springboot,实现以下功能,有两个队列1.2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库.报警等完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo 环境 Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.41.双击C:\Program

  • Springboot 整合 RabbitMQ 消息队列 详情

    目录 生产者工程 POM依赖 application文件 生产者业务代码 测试 Direct 模式 业务代码 消费者 消息监听 Topic 模式 生产者 消费者 生产者工程 POM依赖 可以在创建工程时直接选择添加依赖. application文件 因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置. RabbitMQ配置文件: 在使用相关交换机及队列时,我们需要实现声明交换机及队列,如果没有对应信息,则启动项目会失败.所以在使用springboot整合

  • RabbitMQ消息确认机制剖析

    目录 前言 消息确认 基本流程 消息确认模式 ConfirmCallback确认模式 ReturnCallback退回模式 消息发送者确认 消息接收者确认 basicAck模式 basicNack模式 basicReject模式 测试 解决办法 消费者确认失败 总结 前言 上一章讲解了RabbitMq的三种Exchange消息发送的模式,但是在默认情况下RabbitMQ并不能保证消息是否发送成功,以及是否能被成功消费,为了保证消息在传递过程中不丢失,需要对消息进行确认机制,来提高消息的可靠性.

  • 利用Python学习RabbitMQ消息队列

    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

  • 使用PHP访问RabbitMQ消息队列的方法示例

    本文实例讲述了使用PHP访问RabbitMQ消息队列的方法.分享给大家供大家参考,具体如下: 扩展安装 PHP访问RabbitMQ实际使用的是AMQP协议,所以我们只要安装epel库中的php-pecl-amqp这个包即可 rpm -ivh http://mirror.neu.edu.cn/fedora/epel/6/x86_64/epel-release-6-8.noarch.rpm yum install php-pecl-amqp 交换建立 <?php $connection = new

  • 关于golang监听rabbitmq消息队列任务断线自动重连接的问题

    golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务 需求背景: goalng常驻内存任务脚本监听rbmq执行任务 任务脚本由supervisor来管理 当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态 假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了 如果是短时间的停止重启,supervisor是可以即时唤醒该程序.如果服务器长时间没有恢复正常运行,程序就会出

随机推荐