关于利用RabbitMQ实现延迟任务的方法详解

开发过程中通常会碰到这样的需求:

  • 淘宝订单业务:下单后 30min 之内没有付款,就自动取消订单。
  • 饿了吗订餐通知:下单成功后 60s 之后给用户发送短信通知。
  • 关闭空闲连接:服务器中有很多客户端的连接,空闲一段时间之后需要关闭之。
  • 缓存:缓存中的对象,超过了空闲时间,从缓存中移出。
  • 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
  • 失败重试机制:业务操作失败后,间隔一定的时间进行失败重试。

这类业务的特点就是:需要延迟工作,需要进行失败重试。一种比较笨的方式是使用一个后台线程,遍历所有对象,挨个检查。这种方法简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,过小则存在效率问题,而且做不到按超时的时间顺序处理。

再比如常见的场景:

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

上述类似的需求是我们经常会遇见的问题。最常用的方法是定期轮训数据库,设置状态。在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源。当面对千万级、上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了。除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式。但如果系统的架构中本身就有RabbitMQ的话,那么选择RabbitMQ来实现类似的功能也是一种选择。

使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求。

消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。

Dead Letter Exchanges

Exchage的概念在这里就不在赘述,一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

实现延迟队列

延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

生产者输出消息到Queue1,并且这个消息是设置有有效时间的,比如60s。消息会在Queue1中等待60s,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到,处理延迟任务。

具体实现步骤如下:

第一步, 首先需要创建2个队列。Queue1和Queue2。Queue1是一个消息缓冲队列,在这个队列里面实现消息的过期转发。如下图,设置Dead letter exchange和Dead letter routing key。设置这两个属性就是当消息在这个队列中expire后,采用哪个路由发送。这个dlx的exchange需要事先创建好,就是一个普通的exchange。由于我们还需要向Queue1发送消息,那么还需要创建一个exchange,并且和Queue1绑定。例子中,exchange同样取名:queue1。

我们还需要建一个Queue2,这个队列用于消息在Queue1中过期后转发的目标队列。所以这个Queue2队列建好以后,需要绑定Queue1设置的死信路由:dlx。完成Queue2的绑定以后,环境就搭建完成了。

第二步,实现消息的Producer。由于我们的目的是让进入Queue1的消息过期,然后自动转送到Queue2中,所以发送的时候,需要设置过期时间。

ConnectionFactory factory = new ConnectionFactory();
   factory.setUsername("bsp");
   factory.setPassword("123456");
   factory.setVirtualHost("/");
   factory.setHost("10.23.22.42");
   factory.setPort(5672);
   conn = factory.newConnection();
   channel = conn.createChannel();
   byte[] messageBodyBytes = "Hello, world!".getBytes();
   byte i = 10;
   while (i-- > 0) {
    channel.basicPublish("queue1", "queue1", new AMQP.BasicProperties.Builder().expiration(String.valueOf(i * 1000)).build(),
      new byte[] { i });
   }

上面的代码我模拟了1-10号消息,消息的内容里面是1-10。过期的时间是10-1秒。这里要注意,虽然10是第一个发送,但是它过期的时间最长。

第三步,实现消息的Consumer。Consumer就是延迟任务的具体实施者。由于具体的任务往往是一个比较耗时的任务,所以一般来说,任务一般在异步线程中执行。

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("bsp");
factory.setPassword("123456");
factory.setVirtualHost("/");
factory.setHost("10.23.22.42");
factory.setPort(5672);
conn = factory.newConnection();
channel = conn.createChannel();
channel.basicConsume("queue2", true, "consumer", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
//do some work async
System.out.println(body[0]);
}
});

运行后如上面的程序,过了10s以后,消费者开始收到数据,但是它是一次性收到如下结果:

10、9 、8 、7 、6、5 、4 、3 、2 、1

Consumer第一个收到的还是10。虽然10是第一个放进队列,但是它的过期时间最长。所以由此可见,即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。参考官方文档发现“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。

所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • C#实现rabbitmq 延迟队列功能实例代码

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就

  • Spring Boot与RabbitMQ结合实现延迟队列的示例

    背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延时队列将订单信息发送到延时队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备. 延迟队列能做什么? 延迟队列多用于需

  • C#操作RabbitMQ的完整实例

    一.下载RabbitMQ http://www.rabbitmq.com/install-windows.html 二.下载OTP http://www.erlang.org/downloads 三.安装OTP.RabbitMQ 四.配置RabbitMQ 找到bat的目录 执行相关命令 1.添加用户密码 rabbitmqctl add_user wenli wenli 2.设置wenli为管理员rabbitmqctl set_user_tags wenli administrator 3.启动R

  • 关于利用RabbitMQ实现延迟任务的方法详解

    开发过程中通常会碰到这样的需求: 淘宝订单业务:下单后 30min 之内没有付款,就自动取消订单. 饿了吗订餐通知:下单成功后 60s 之后给用户发送短信通知. 关闭空闲连接:服务器中有很多客户端的连接,空闲一段时间之后需要关闭之. 缓存:缓存中的对象,超过了空闲时间,从缓存中移出. 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求. 失败重试机制:业务操作失败后,间隔一定的时间进行失败重试. 这类业务的特点就是:需要延迟工作,需要进行失败重试.一种比较笨的方式是使用一个后

  • 对DataFrame数据中的重复行,利用groupby累加合并的方法详解

    pandas读取一组数据,可能存在重复索引,虽然可以利用drop_duplicate直接删除,但是会删除重要信息. 比如同一ID用户,多次登录学习时间.要计算该用户总共''学习时间'',就要把重复的ID的''学习时间''累加. 可以结合groupby和sum函数完成该操作. 实例如下: 新建一个DataFrame,计算每个 id 的总共学习时间.其中 id 为one/two的存在重复学习时间.先利用 groupby 按照键 id 分组,然后利用sum()函数求和,即可得到每个id的总共学习时间.

  • 利用Python实现面部识别的方法详解

    人脸识别正在成为软件开发中的一种趋势.它有助于识别人脸并使应用程序更加健壮.在本教程中,我们将使用python和face_recognition库创建一个简单的人脸识别. 对于开发环境,我们将使用 Visual Studio Community Edition. 如果你的计算机上还没有安装它,你可以从这里下载.并使用 C++安装桌面开发. 现在我们有了使用 C++ 进行桌面开发的 Visual Studio,我们可以开始我们的项目了. 使用 Visual Studio 打开一个新目录并创建一个新

  • 利用JavaScript构建树形图的方法详解

    目录 什么是树形图 浏览JS树形图 创建一个基本的JS树形图 1. 创建一个HTML页面 2. 参考JavaScript文件 3.设置数据 4. 编写一些JS树形图代码 自定义JS树形图 A. 改变颜色 B. 应用线性色标 C. 格式化标签和工具提示 D. 按升序排列图块 树形图可视化广泛用于分层数据分析.如果你没有经验还想创建一个,那将会有些复杂.下面是一个详细教程,教你如何使用JavaScript创建交互式树形图. 宇宙中只有我们吗?我们每个人都曾在某个时候问过自己这个问题.当我们在考虑地球

  • Spring利用注解整合Mybatis的方法详解

    目录 一.环境准备 步骤1:数据库相关 步骤2:导入jar包 步骤3:创建模型类 步骤4:创建Dao接口和实现类 步骤5:创建Service接口和实现类 步骤6:添加jdbc.properties文件 步骤7:添加Mybatis核心配置文件 步骤8:编写测试程序 二.整合思路分析 三.整合步骤 步骤1:导入整合jar包 步骤2:创建数据源配置类 步骤3:创建Mybatis配置类 步骤4:创建Spring主配置类 步骤5:编写运行程序 一.环境准备 步骤1:数据库相关 建库建表 创建spring_

  • Python利用Pandas进行数据分析的方法详解

    目录 Series 代码 #1 代码 #2 代码#3 代码 #4 数据框 代码 #1 代码 #2 代码 #3 代码 #4 Pandas是最流行的用于数据分析的 Python 库.它提供高度优化的性能,后端源代码完全用C或Python编写. 我们可以通过以下方式分析 pandas 中的数据: 1.Series 2.数据帧 Series Series 是 pandas 中定义的一维(1-D)数组,可用于存储任何数据类型. 代码 #1 创建 Series # 创建 Series 的程序 # 导入 Pa

  • Springboot整合RabbitMq测试TTL的方法详解

    目录 什么是TTL? 如何设置TTL? 设定整个队列的过期时间 配置类编写 测试 配置 测试 总结 代码下载 什么是TTL? 在RabbitMq中,存在一种高级特性 TTL. TTL即Time To Live的缩写,含义为存活时间或者过期时间.即: 设定消息在队列中存活的时间.当指定时间内,消息依旧未被消费,则由队列自动将其删除. 如何设置TTL? 既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式: 设置整个队列的过期时间. 设置单个消息的过期时间. 设定整个队列的过期时间

  • 利用yum安装Redis的方法详解

    介绍 Redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set –有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排序.与memcached一样,为了保证效率,数据都是缓存在内存中.区别的是redis会周期性的把更新

  • Golang利用自定义模板发送邮件的方法详解

    前言 在几周前,我开始工作于一个证券投资组合网站.虽然我只能使用 React 完成整个网站,但我决定使用 Go 来创建一个可以处理某些任务(例如发送 email)的 API 服务器,相信这是一个很好的做法. 我其中的一个页面是一个 contact 页面,目前看起来像这样: contact me 我想使用专门为此 contact 表单申请的 Gmail 帐户发送一封邮件.除了我以前用过 Javascript 发送电子邮件的事实,我没有特别选择 Go.但为什么不尝试 Go 呢?我觉得 Go 很棒.

  • Three.js利用顶点绘制立方体的方法详解

    前言 之前我们在学些WebGL基础的时候每天都是在一直研究顶点位置,法向量,绘制下标什么的.虽然复杂,但是毕竟原生,性能没得说. three.js也给我们提供了相关的接口供我们使用原生的方法绘制模型,下面话不多说了,来一起看看详细的介绍吧. 下面是我的个人一个案例. 首先,我创建了一个空白的形状: //立方体 var cubeGeometry = new THREE.Geometry(); 立方体的形状如下: // 创建一个立方体 // v6----- v5 // /| /| // v1----

随机推荐