Java RabbitMQ的TTL和DLX全面精解

目录
  • RabbitMQ的TTL
    • 1、TTL概述
    • 2、设置消息有效期
      • 2.1、通过队列设置有效期
      • 2.2、通过发送消息时设置有效期
    • 3、设置队列有效期(不常用,仅作了解)
  • RabbitMQ的DLX
    • 1、DLX是什么
    • 2、DLX有什么用
    • 3、DLX使用方式

本节继续介绍RabbitMQ的高级特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交换机、死信队列)

RabbitMQ的TTL

1、TTL概述

RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。设置TTL有两种方式:

  1. 第一种是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期;
  2. 第二种是发送消息时给消息设置属性,可以为每条消息都设置不同的TTL。

如果两种方式都设置了,则以设置的较小的为准。两者的区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。至于为啥要这样处理很容易想清楚:第一种方式队列的消息有效期都一样,先入队的在队列头部,头部也是最早要过期的消息,RabbitMQ起一个定时任务从队列的头部开始扫描是否有过期消息即可;第二种方式每条消息的过期时间不同,所以只有遍历整个队列才可以筛选出来过期的消息,这样效率太低了,而且消息量大了之后根本不可行的,可以等到消息要投递给消费者时再判断删除,虽然删除的不够及时但是不影响功能,其实就是用空间换时间。

如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。

2、设置消息有效期

2.1、通过队列设置有效期

还记得我们之前声明队列的方法吗,queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments),该方法的最后一个参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。如果不清楚队列属性有哪些,可以查看web控制台的添加队列的地方。

具体代码如下:

//设置队列上所有的消息的有效期,单位为毫秒
Map<String, Object> argss = new HashMap<String , Object>();
arguments.put("x-message-ttl " , 5000);//5秒钟
channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

查看控制台的队列列表如下:D表示持久化,TTL表示设置了消息的有效期。

过了几秒钟后发现消息已经不存在了。

也可以用RabbitMQ的命令行模式来设置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

还可以通过HTTP接口调用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}'
http://ip:15672/api/queues/{vhost}/{queuename}

2.2、通过发送消息时设置有效期

发送消息时basicPublish方法可以设置属性参数,里面通过expiration属性设置消息有效期,单位为毫秒,代码如下所示

Builder bd = new AMQP.BasicProperties().builder();
bd.deliveryMode(2);//持久化
bd.expiration("100000");//设置消息有效期100秒钟
BasicProperties pros = bd.build();
String message = "测试ttl消息";
channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());

另外也可以通过HTTPAPI 接口设置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d
'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}'
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

完整的通过队列设置消息有效期、发布消息时通过属性设置有效期的代码如下:可以运行后,观察下控制台,可以发现同时设置时,消息的有效期是以较小的为准的。项目GitHub地址 https://github.com/RookieMember/RabbitMQ-Learning.git

package cn.wkp.rabbitmq.newest.ttl;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import cn.wkp.rabbitmq.util.ConnectionUtil;

/**
 *
 * @ClassName: Send
 * @Description: 消息有效期
 * @author wkg
 * @date: 2019年4月1日 下午11:28:22
 */
public class Send {

	private final static String EXCHANGE_NAME = "ttl_exchange";
	private final static String QUEUE_NAME = "ttl_queue";

	public static void main(String[] argv) throws Exception {
		// 获取到连接以及mq通道
		Connection connection = ConnectionUtil.getConnection();
		// 从连接中创建通道
		Channel channel = connection.createChannel();

		// 声明交换机
		channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);

		//*****1:通过队列设置有效期 2:通过消息属性设置有效期,如果都设置了以较小的为准*****
		//声明队列
		Map<String, Object> arguments=new HashMap<String,Object>();
		//设置队列上所有的消息的有效期,单位为毫秒
		arguments.put("x-message-ttl", 5000);//5秒钟
		channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
		//绑定
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

		Builder bd = new AMQP.BasicProperties().builder();
		bd.deliveryMode(2);//持久化
		bd.expiration("100000");//设置消息有效期100秒钟
		BasicProperties pros = bd.build();
		String message = "测试ttl消息";
		channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());
		System.out.println("Sent message:" + message);
//		 关闭通道和连接
		channel.close();
		connection.close();
	}
}

3、设置队列有效期(不常用,仅作了解)

上面在web管控台添加队列的时候,我们看到有一个x-expires参数,可以让队列在指定时间内 "未被使用" 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。

服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 1000 ,则表示该 queue 如果在 1s之内未被使用则会被删除。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 18000);  //队列有效期18秒
channel.queueDeclare("myqueue", false, false, false, args);  

RabbitMQ的DLX

1、DLX是什么

DLX是Dead-Letter-Exchange的简写,意思是死信交换机。

它的作用其实是用来接收死信消息(dead message)的。那什么是死信消息呢?一般消息变成死信消息有如下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
  • 消息过期
  • 队列达到最大长度

当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能。

2、DLX有什么用

因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。DLX还有一个非常重要的作用,就是结合TTL实现延迟队列(延迟队列的使用范围还是挺广的:比如下单超过多长时间自动关闭;比如我们接入过第三方支付系统的同学一定知道,我们的订单中会传一个notify_url用于接收支付结果知,如果我们给第三方支付响应的不是成功的消息,其会隔一段时间继续调用通知我们的notify_url,超过几次后不再进行通知,一般通知频率都是 0秒-5秒-30秒-5分钟-30分钟-1小时-6小时-12小时;比如我们的家用电器定时关机。。。。。。这些场景都是可以用延迟队列实现的)。

3、DLX使用方式

下面在web管控台添加队列的时候,我们看到有两个DLX相关的参数:x-dead-letter-exchange和x-dead-letter-routing-key。x-dead-letter-exchange是设置队列的DLX的;x-dead-letter-routing-key是设置死信消息进入DLX时的routing key的,这个是可以不设置的,如果不设置,则默认使用原队列的routing key。

客户端可以通过channel.queueDeclare方法声明队列时设置x-dead-letter-exchange参数,具体代码如下所示

channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置DLX
args.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置)
//为队列myqueue 添加DLX
channel.queueDeclare("myqueue" , false , false , false , args);

上面说的可能比较抽象,下面我们通过一个具体的例子,来演示一下DLX的具体使用:

package cn.wkp.rabbitmq.newest.dlx;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import cn.wkp.rabbitmq.util.ConnectionUtil;

public class SendDLX {

	public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//声明一个交换机,做死信交换机用
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		//声明一个队列,做死信队列用
		channel.queueDeclare("dlx_queue", true, false, false, null);
		//队列绑定到交换机上
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");

		channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
		Map<String, Object> arguments=new HashMap<String, Object>();
		arguments.put("x-message-ttl" , 5000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX
		arguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLX
		arguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键(可以不设置)
		//为队列normal_queue 添加DLX
		channel.queueDeclare("normal_queue", true, false, false, arguments);
		channel.queueBind("normal_queue", "normal_exchange", "");

		channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes());
		System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date()));

		channel.close();
		connection.close();
	}
}

上面是发送者的代码,运行后观察控制台可以看到如下所示:

死信队列dlx_queue的绑定如下,其已与死信交换机dlx_exchange(topic类型)进行了绑定,routing key为"dlx.*"

队列normal_queue的绑定如下,其已与交换机normal_exchange(fanout类型)进行了绑定

queues视图如下:DLX和DLK表示设置给normal_queue设置了死信交换机和死信消息的routing key,我们看到消息已经被路由到了死信队列上面。整个流程为:

  • 消息发送到交换机normal_exchange,然后路由到队列normal_queue上
  • 因为队列normal_queue没有消费者,消息过期后成为死信消息
  • 死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechage
  • dlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。

然后我们给死信队列添加消费者如下:我们测试一下死信消息进入DLX的时间,先将之前的那个死信消息删除

package cn.wkp.rabbitmq.newest.dlx;

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cn.wkp.rabbitmq.util.ConnectionUtil;

public class RecvDLX {

	public static void main(String[] argv) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		channel.queueDeclare("dlx_queue", true, false, false, null);
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");

		// 指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。
		channel.basicQos(1);

		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ConnectionUtil.formatDate(new Date()));
				// 消费者手动发送ack应答
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		System.out.println("消费死信队列中的消息======================");
		// 监听队列
		channel.basicConsume("dlx_queue", false, consumer);
	}
}

运行结果如下(先运行的死信队列消费者,然后运行生产者):我们看到消息过期后10毫秒就被死信队列的消费者消费到了,显然,消息成为死信后是立即被发送到了DLX中。

消费死信队列中的消息======================
消费者收到消息:测试死信消息,当前时间:2019-04-13 16:30:05:740

发送消息时间:2019-04-13 16:30:00:730

关于RabbitMQ的TTL和DLX就先介绍到这里,下一节会继续介绍RabbitMQ的高级特性:RabbitMQ的延迟队列。

参考 朱忠华《RabbitMQ实战指南》

到此这篇关于Java RabbitMQ的TTL和DLX全面精解的文章就介绍到这了,更多相关Java RabbitMQ TTL DLX内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • SpringBoot停止启动时测试检查rabbitmq操作

    目录 SpringBoot停止启动时测试检查rabbitmq 问题 解决 RabbitMQ的简单使用的Demo 1.声明 2.创建一个测试账户 3.pom依赖 5.创建入口类 6.测试 7.总结 SpringBoot停止启动时测试检查rabbitmq 问题 在Springboot项目中配置rabbitmq后,总是在每次启动时自动测试MQ的连接,如果测试不通过,就一直重连,导致项目无法正常启动.自己在开发与MQ无关的功能时,无法正常进行,十分耽误时间.如下所示: org.springframewo

  • 一篇文章教你将JAVA的RabbitMQz与SpringBoot整合

    目录 一.fanout:发布订阅型 二.direct:直连型 三.topic:通配符模式 四.消费者端接收消息 总结 本文主要聊SpringBoot整合RabbitMQ,主要分为生产者和消费者两个工程,目录结构如下: 先简单说一下RabbitMQ的一些核心概念: 1.虚拟主机vhost:vhost是物理隔离的,你可以将vhost看作是一个个小型的RabbitMQ 2.交换机exchange:生产者发送的消息不是直接到达队列的,而是交换机,然后交换机再根据路由key,路由到指定的队列,可以理解为一

  • 实战干货之基于SpringBoot的RabbitMQ多种模式队列

    目录 环境准备 安装RabbitMQ 依赖 连接配置 五种队列模式实现 1 点对点的队列 2 工作队列模式Work Queue 3 路由模式Routing 4 发布/订阅模式Publish/Subscribe 5 通配符模式Topics 总结 环境准备 安装RabbitMQ 由于RabbitMQ的安装比较简单,这里不再赘述.可自行到官网下载http://www.rabbitmq.com/download.html 依赖 SpringBoot项目导入依赖 <dependency> <gro

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • Java RabbitMQ的TTL和DLX全面精解

    目录 RabbitMQ的TTL 1.TTL概述 2.设置消息有效期 2.1.通过队列设置有效期 2.2.通过发送消息时设置有效期 3.设置队列有效期(不常用,仅作了解) RabbitMQ的DLX 1.DLX是什么 2.DLX有什么用 3.DLX使用方式 本节继续介绍RabbitMQ的高级特性:TTL(Time-To-Live消息有效期)和DLX(Dead-Letter-Exchange死信交换机.死信队列) RabbitMQ的TTL 1.TTL概述 RabbitMQ的TTL全称为Time-To-

  • Java数据结构之顺序表和链表精解

    目录 前言 1. 顺序表 代码实现 2. 链表 链表图解 代码实现 前言 两个数据结构:顺序表和链表 数据结构是一门学科,和语言无关. 数据 + 结构:一种描述和组织数据的方式. 1. 顺序表 顺序表是用一段物理地址连续的存储单元依次存储数据元素的线性结构,一般情况下采用数组存储.在数组上完成数据的增删查改.其逻辑上和物理上都是连续的. 问题引入:一个数组放在这,我们如何才能自己不去数,让程序自己进行计数? 答:在引入变量,每次放一个元素就更新一次.(如下图,为问题的示意) 也就是说顺序表的底层

  • Java RabbitMQ的持久化和发布确认详解

    目录 1.持久化 1.1实现持久化 1.2不公平分发 1.3测试不公平分发 1.4预取值 1.4.1代码测试 2.发布确认 2.1单个确认发布 2.2批量确认发布 2.3异步确认发布 2.4处理未确认的消息 总结 1. 持久化 当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失.默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息.为了保证消息不丢失需要将队列和消息都标记为持久化. 1.1 实现持久化 1.队列持久化:在创建队列时将channel.queueDeclare();第

  • Java RabbitMQ的工作队列与消息应答详解

    目录 WorkQueues 1.轮询分发消息 1.1抽取工具类 1.2编写两个工作线程 1.3编写生产者 1.4运行测试 1.5异常情况 2.消息应答 2.1自动应答 2.2手动应答 2.3消息自动重新入队 2.4手动应答测试 2.4.1生产者代码 2.4.2消费者代码 2.4.3测试 总结 Work Queues 工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行.我们把任务封装为消息并将其发送到队列.在后台运行的工作进程将弹出任务并最终执

  • Java RabbitMQ 中的消息长期不消费会过期吗

    目录 1. 默认情况 2. TTL 2.1 单条消息过期 2.2 队列消息过期 2.3 特殊情况 3. 死信队列 3.1 死信交换机 3.2 死信队列 3.3 实践 4. 小结 RabbitMQ 中的消息长期未被消费会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑问,今天松哥就来和大家捋一捋这个问题. 1. 默认情况 首先我们来看看默认情况. 默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直

  • Java RabbitMQ高级特性详细分析

    目录 消息的可靠投递 确认模式 退回模式 Consumer Ack 消费端限流 TTL(Time To Live) 设置某个队列为过期队列 设置单独某个消息过期 死信队列 延迟队列 消息的可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 return 退回模式 rabbitmq整个消息投递的路径为: producer—>rabbitmq broker—>

  • Springboot整合RabbitMq测试TTL的方法详解

    目录 什么是TTL? 如何设置TTL? 设定整个队列的过期时间 配置类编写 测试 配置 测试 总结 代码下载 什么是TTL? 在RabbitMq中,存在一种高级特性 TTL. TTL即Time To Live的缩写,含义为存活时间或者过期时间.即: 设定消息在队列中存活的时间.当指定时间内,消息依旧未被消费,则由队列自动将其删除. 如何设置TTL? 既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式: 设置整个队列的过期时间. 设置单个消息的过期时间. 设定整个队列的过期时间

  • Java RabbitMQ消息队列详解常见问题

    目录 消息堆积 保证消息不丢失 死信队列 延迟队列 RabbitMQ消息幂等问题 RabbitMQ消息自动重试机制 合理的选择重试机制 消费者开启手动ack模式 rabbitMQ如何解决消息幂等问题 RabbitMQ解决分布式事务问题 基于RabbitMQ解决分布式事务的思路 消息堆积 消息堆积的产生场景: 生产者产生的消息速度大于消费者消费的速度.解决:增加消费者的数量或速度. 没有消费者进行消费的时候.解决:死信队列.设置消息有效期.相当于对我们的消息设置有效期,在规定的时间内如果没有消费的

  • Java学习笔记之面向对象编程精解

    目录 包 继承 super 关键字 final protected关键字 组合 多态 抽象类 接口 面向对象编程(包.继承.组合.多态 .抽象类.接口) 包 包其实就是类的集合,其本质上其实就是一个文件夹,里面存放的. java 文件里面有我们写的代码,并且包的存在还可以有效解决在实际开发过程中程序员. java文件命名重叠问题,其实就是,一个包里面是能有两个名字相同的.java文件,但是如果是不同的包下就允许这样的情况存在. 我们创建一个包只需要在 src 目录底下去创建就行了,公司里面给报的

  • Java基础之toString的序列化 匿名对象 复杂度精解

    目录 序列化 匿名对象 复杂度 时间复杂度 大O的渐进表示法 时间复杂度的分类 计算时间 复杂度的方法 空间复杂度 toString的序列化.匿名对象.复杂度 序列化 toString 方法的原理就是序列化,他可以帮助我们讲一个抽象的对象变得具体,譬如把对象里面的名字.年龄.身高等信息具象为字符串.(总之,序列化:将对象转化为字符串:反序列化:将字符串转化为对象). 匿名对象 匿名对象适用于只想使用一次的情况,因为匿名对象是没有引用的,每次用都要重新new 一遍对象,很麻烦. class Per

随机推荐