Rabbitmq延迟队列实现定时任务的方法

场景

开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期、订单定时关闭、微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列、基于优先级队列的JDK延迟队列、时间轮等。因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章 Spring boot集成RabbitMQ

Rabbitmq延迟队列

Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)

死信交换机

一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。

  1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
  2. 上面的消息的TTL到了,消息过期了。
  3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

消息TTL(消息存活时间)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串: 当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去

处理流程图

创建交换机(Exchanges)和队列(Queues)

创建死信交换机

如图所示,就是创建一个普通的交换机,这里为了方便区分,把交换机的名字取为:delay

创建自动过期消息队列

这个队列的主要作用是让消息定时过期的,比如我们需要2小时候关闭订单,我们就需要把消息放进这个队列里面,把消息过期时间设置为2小时

创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置,另外两个参数x-dead-letter-exchange代表消息过期后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不同的队列

创建消息处理队列

这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理

消息队列的名字为delay_queue2

消息队列绑定到交换机

进入交换机详情页面,将创建的2个队列(delay queue1和delay queue2)绑定到交换机上面

自动过期消息队列的routing key 设置为delay

绑定delay queue2

delay queue2 的key要设置为创建自动过期的队列的x-dead-letter-routing-key参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了

绑定后的管理页面如下图:

当然这个绑定也可以使用代码来实现,只是为了直观表现,所以本文使用的管理平台来操作

发送消息

String msg = "hello word";
MessageProperties messageProperties = new MessageProperties();
  messageProperties.setExpiration("6000");
  messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
  Message message = new Message(msg.getBytes(), messageProperties);
  rabbitTemplate.convertAndSend("delay", "delay",message);

主要的代码就是

messageProperties.setExpiration("6000"); 

设置了让消息6秒后过期

注意:因为要让消息自动过期,所以一定不能设置delay_queue1的监听,不能让这个队列里面的消息被接受到,否则消息一旦被消费,就不存在过期了

接收消息

接收消息配置好delay_queue2的监听就好了

package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueue {
 /** 消息交换机的名字*/
 public static final String EXCHANGE = "delay";
 /** 队列key1*/
 public static final String ROUTINGKEY1 = "delay";
 /** 队列key2*/
 public static final String ROUTINGKEY2 = "delay_key";

 /**
  * 配置链接信息
  * @return
  */
 @Bean
 public ConnectionFactory connectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);

  connectionFactory.setUsername("kberp");
  connectionFactory.setPassword("kberp");
  connectionFactory.setVirtualHost("/");
  connectionFactory.setPublisherConfirms(true); // 必须要设置
  return connectionFactory;
 }

 /**
  * 配置消息交换机
  * 针对消费者配置
  FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
  HeadersExchange :通过添加属性key-value匹配
  DirectExchange:按照routingkey分发到指定队列
  TopicExchange:多关键字匹配
  */
 @Bean
 public DirectExchange defaultExchange() {
  return new DirectExchange(EXCHANGE, true, false);
 } 

 /**
  * 配置消息队列2
  * 针对消费者配置
  * @return
  */
 @Bean
 public Queue queue() {
  return new Queue("delay_queue2", true); //队列持久 

 }
 /**
  * 将消息队列2与交换机绑定
  * 针对消费者配置
  * @return
  */
 @Bean
 @Autowired
 public Binding binding() {
  return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
 } 

 /**
  * 接受消息的监听,这个监听会接受消息队列1的消息
  * 针对消费者配置
  * @return
  */
 @Bean
 @Autowired
 public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  container.setQueues(queue());
  container.setExposeListenerChannel(true);
  container.setMaxConcurrentConsumers(1);
  container.setConcurrentConsumers(1);
  container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  container.setMessageListener(new ChannelAwareMessageListener() {

   public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
    byte[] body = message.getBody();
    System.out.println("delay_queue2 收到消息 : " + new String(body));
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
   }
  });
  return container;
 }
}

在消息监听中处理需要定时处理的任务就好了,因为Rabbitmq能发送消息,所以可以把任务特征码发过来,比如关闭订单就把订单id发过来,这样就避免了需要查询一下那些订单需要关闭而加重MySQL负担了,毕竟一旦订单量大的话,查询本身也是一件很费IO的事情

总结

基于Rabbitmq实现定时任务,就是将消息设置一个过期时间,放入一个没有读取的队列中,让消息过期后自动转入另外一个队列中,监控这个队列消息的监听处来处理定时任务具体的操作

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • RabbitMQ的配置与安装教程全纪录

    前言 最近这几天身体不舒服,脖子痛的厉害,可能是上月太累了好久没写博客了,之前也说了公司的.Net项目部做了,改用Scale来做,原本想着会用java来搞,所以上个月在拼命的学java,这几天一直脖子不舒服,也就不学了,也一直在追剧,和陈工聊天时也讨论过要不要转java,继续学,考虑了好久决定还是先不折腾了,继续.Net.本来想着再休息一周呢,不过最近脖子好多了,今天正好没事做就研究了下RabbitMQ. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法

  • linux安装RabbitMQ详细教程

    一.RabbitMQ概念 RabbitMQ是流行的开源消息队列系统,是AMQP(Advanced Message Queuing Protocol高级消息队列协议)的标准实现,用erlang语言开发.RabbitMQ据说具有良好的性能和时效性,同时还能够非常好的支持集群和负载部署,非常适合在较大规模的分布式系统中使用. Rabbit模式大概分为以下三种:单一模式.普通模式.镜像模式 单一模式:最简单的情况,非集群模式,即单实例服务. 普通模式:默认的集群模式. queue创建之后,如果没有其它p

  • RabbitMQ .NET消息队列使用详解

    本文实例为大家分享了RabbitMQ .NET消息队列使用方法,供大家参考,具体内容如下 首先下载安装包,我都环境是win7 64位: 去官网下载 otp_win64_19.0.exe  和rabbitmq-server-3.6.3.exe安装好 然后开始编程了: (1)创建生产者类: class Program { private static void Main() { //建立RabbitMQ连接和通道 var connectionFactory = new ConnectionFacto

  • springboot2.0+elasticsearch5.5+rabbitmq搭建搜索服务的坑

    前一阵子准备为项目搭建一个简单的搜索服务,虽然业务数据库mongodb提供了文本搜索的支持,但是在大量文档需要通过关键词进行定位时,es明显更加适合去作为一个搜索引擎(虽然我们之前大部分使用到了ELK那套分析和可视化的特性).Elasticsearch建立在Lucene之上并且支持极其快速的查询和丰富的查询语法,偶尔也可以作为一个轻量级的NoSQL.但是对复杂查询和聚合操作的能力并不是很强. 本篇不会提及如何搭建一个简单搜索服务,而是记录一下大约一周工作时间内遇见的几个坑.. 为什么选择elas

  • 基于RabbitMQ的简单应用(详解)

    虽然后台使用了读写分离技术,能够在一定程度上抗击高并发,但是如果并发量特别巨大时,主数据库不能同时处理高并发的请求,这时数据库容易宕机. 问题: 现在的问题是如何既能保证数据库正常运行,又能实现用户数据的入库操作? 解决方案: 引入rabbitMQ技术: 说明: 当数据库的访问压力过载时,这时会将过载以后的数据先保存到rabbitMQ中.其中的数据结构是队列的形式,先进先出.这时数据库从队列中取数据执行.一直到队列中的数据全部操作完成为止. RabbitMQ就是消息的中间件. RabbitMQ介

  • .Net使用RabbitMQ即时发消息Demo

    前言 最近项目要使用RabbitMQ,网上已经有很多优秀的文章了,百度百科对RabbitMQ阐述也非常明确,建议去看下,还有amqp协议.必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能.健壮以及可伸缩性出名的Erlang写成(因此也是继承了这些优点). 最近参考大神们的博客,自己做了一个RabbitMQ即时发消息的Demo.下面话不多说了,来一起看看详细的介绍吧. 步骤如下: 1.使用VS的NuGet安装包管理工具安装RabbitMQ.Cli

  • 基于RabbitMQ几种Exchange 模式详解

    AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中. RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header . header模式在实际使用中较少,本文只对前三种模式进行比

  • Rabbitmq延迟队列实现定时任务的方法

    场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列.基于优先

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知(实例代码)

    在第三方支付中,例如支付宝.或者微信,对于订单请求,第三方支付系统采用的是消息同步返回.异步通知+主动补偿查询的补偿机制. 由于互联网通信的不可靠性,例如双方网络.服务器.应用等因素的影响,不管是同步返回.异步通知.主动查询报文都可能出现超时无响应.报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制.例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台.后台通知的,

  • C#实现rabbitmq 延迟队列功能实例代码

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就

  • 如何通过Python实现RabbitMQ延迟队列

    最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列.功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1.Time To Live(TTL)消息超时机制:2.Dead Letter Exchanges(DLX)死信队列.下面将具体描述实现原理以及实现代 延迟

  • springcloud中RabbitMQ死信队列与延迟交换机实现方法

    目录 0.引言 1. 死信队列 1.2 什么是死信? 1.3 什么是死信队列? 1.4 创建死信交换机.死信队列 1.5 实现死信消息 1.5.1 基于消费者进行reject或nack实现死信消息 1.5.2 基于生存时间实现 1.5.3 基于队列max_length实现 1.6 基于死信队列实现消息延迟发送 基于死信队列实现消息延迟发送的问题 2. 延迟交换机 3. 应用场景 4. 练习题 0.引言 死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306

  • RabbitMQ死信机制实现延迟队列的实战

    目录 延迟队列 应用场景 Time To Live(TTL) Dead Letter Exchanges(DLX) 延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 应用场景 三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果.实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列

  • RabbitMQ 实现延迟队列的两种方式详解

    目录 1. 用插件 1.1 安装插件 1.2 消息收发 2. DLX 实现延迟队列 2.1 延迟队列实现思路 2.2 案例 3. 小结 定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如: 在电商项目中,当我们下单之后,一般需要

  • springcloud安装rabbitmq并配置延迟队列插件的过程详解

    目录 0. 引言 1. docker安装 1.1 安装rabbitmq 1.2 安装延迟队列插件delayed_message_exchange 2. docker打包安装rabbitmq+延迟队列插件 0. 引言 本期主要讲解如何利用docker快速安装rabbitmq并且配置延迟队列插件 1. docker安装 1.1 安装rabbitmq 1.下载镜像 docker pull rabbitmq 2.安装镜像 docker run -d --hostname my-rabbit --name

随机推荐