RabbitMQ 实现延迟队列的两种方式详解

目录
  • 1. 用插件
    • 1.1 安装插件
    • 1.2 消息收发
  • 2. DLX 实现延迟队列
    • 2.1 延迟队列实现思路
    • 2.2 案例
  • 3. 小结

定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如:

  • 在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
  • 我买了一个智能砂锅,可以用来煮粥,上班前把素材都放到锅里,然后设置几点几分开始煮粥,这样下班后就可以喝到香喷喷的粥了,那么这个煮粥的指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了再执行。
  • 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
  • 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
  • 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。

很多场景下我们都需要延迟队列。

本文以 RabbitMQ 为例来和大家聊一聊延迟队列的玩法。

整体上来说,在 RabbitMQ 上实现定时任务有两种方式:

  1. 利用 RabbitMQ 自带的消息过期和私信队列机制,实现定时任务。
  2. 使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。

两种用法我们分别来看。

1. 用插件

1.1 安装插件

首先我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上的开源项目,我们直接下载即可:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

选择适合自己的版本,我这里选择最新的 3.9.0 版。

下载完成后在命令行执行如下命令将下载文件拷贝到 Docker 容器中去:

docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez some-rabbit:/plugins

这里第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置。

接下来再执行如下命令进入到 RabbitMQ 容器中:

docker exec -it some-rabbit /bin/bash

进入到容器之后,执行如下命令启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启用成功之后,还可以通过如下命令查看所有安装的插件,看看是否有我们刚刚安装过的插件,如下:

rabbitmq-plugins list

命令的完整执行过程如下图:

OK,配置完成之后,接下来我们执行 exit 命令退出 RabbitMQ 容器。然后开始编码。

1.2 消息收发

接下来开始消息收发。

首先我们创建一个 Spring Boot 项目,引入 Web 和 RabbitMQ 依赖,如下:

项目创建成功后,在 application.properties 中配置 RabbitMQ 的基本信息,如下:

spring.rabbitmq.host=localhost
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=/

接下来提供一个 RabbitMQ 的配置类:

@Configuration
public class RabbitConfig {
    public static final String QUEUE_NAME = "javaboy_delay_queue";
    public static final String EXCHANGE_NAME = "javaboy_delay_exchange";
    public static final String EXCHANGE_TYPE = "x-delayed-message";

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(customExchange()).with(QUEUE_NAME).noargs();
    }
}

这里主要是交换机的定义有所不同,小伙伴们需要注意。

这里我们使用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:

  • 交换机名称。
  • 交换机类型,这个地方是固定的。
  • 交换机是否持久化。
  • 如果没有队列绑定到交换机,交换机是否删除。
  • 其他参数。

最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。

接下来我们再创建一个消息消费者:

@Component
public class MsgReceiver {
    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void handleMsg(String msg) {
        logger.info("handleMsg,{}",msg);
    }
}

打印一下消息内容即可。

接下来再写一个单元测试方法来发送消息:

@SpringBootTest
class MqDelayedMsgDemoApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() throws UnsupportedEncodingException {
        Message msg = MessageBuilder.withBody(("hello 江南一点雨"+new Date()).getBytes("UTF-8")).setHeader("x-delay", 3000).build();
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.QUEUE_NAME, msg);
    }

}

在消息头中设置消息的延迟时间。

好啦,接下来启动 Spring Boot 项目,然后运行单元测试方法发送消息,最终的控制台打印日志如下:

从日志中可以看到消息延迟已经实现了。

2. DLX 实现延迟队列

2.1 延迟队列实现思路

延迟队列实现的思路也很简单,就是DLX(死信交换机)+TTL(消息超时时间)。

我们可以把死信队列就当成延迟队列。

具体来说是这样:

假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

这就是延迟队列的实现思路,是不是很简单?

2.2 案例

接下来松哥通过一个简单的案例,来和大家演示一下延迟队列的具体实现。

首先准备好一个启动的 RabbitMQ。

然后我们创建一个 Spring Boot 项目,引入 RabbitMQ 依赖:

然后在 application.properties 中配置一下 RabbitMQ 的基本连接信息:

spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

接下来我们来配置两个消息队列:一个普通队列,一个死信队列:

@Configuration
public class QueueConfig {
    public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
    public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
    public static final String JAVABOY_ROUTING_KEY = "javaboy_routing_key";
    public static final String DLX_QUEUE_NAME = "dlx_queue_name";
    public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
    public static final String DLX_ROUTING_KEY = "dlx_routing_key";

    /**
     * 死信队列
     * @return
     */
    @Bean
    Queue dlxQueue() {
        return new Queue(DLX_QUEUE_NAME, true, false, false);
    }

    /**
     * 死信交换机
     * @return
     */
    @Bean
    DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
    }

    /**
     * 绑定死信队列和死信交换机
     * @return
     */
    @Bean
    Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
                .with(DLX_ROUTING_KEY);
    }

    /**
     * 普通消息队列
     * @return
     */
    @Bean
    Queue javaboyQueue() {
        Map<String, Object> args = new HashMap<>();
        //设置消息过期时间
        args.put("x-message-ttl", 1000*10);
        //设置死信交换机
        args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        //设置死信 routing_key
        args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        return new Queue(JAVABOY_QUEUE_NAME, true, false, false, args);
    }

    /**
     * 普通交换机
     * @return
     */
    @Bean
    DirectExchange javaboyExchange() {
        return new DirectExchange(JAVABOY_EXCHANGE_NAME, true, false);
    }

    /**
     * 绑定普通队列和与之对应的交换机
     * @return
     */
    @Bean
    Binding javaboyBinding() {
        return BindingBuilder.bind(javaboyQueue())
                .to(javaboyExchange())
                .with(JAVABOY_ROUTING_KEY);
    }
}

这段配置代码虽然略长,不过原理其实简单。

  • 配置可以分为两组,第一组配置死信队列,第二组配置普通队列。每一组都由消息队列、消息交换机以及 Binding 三者组成。
  • 配置消息队列时,为消息队列指定死信队列。
  • 配置队列中的消息过期时间时,默认的时间单位时毫秒。

接下来我们为死信队列配置一个消费者,如下:

@Component
public class DlxConsumer {
    private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);

    @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
    public void handle(String msg) {
        logger.info(msg);
    }
}

收到消息后就将之打印出来。

这就完事了。

启动项目。

最后我们在单元测试中发送一条消息:

@SpringBootTest
class DelayQueueApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        System.out.println(new Date());
        rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME, QueueConfig.JAVABOY_ROUTING_KEY, "hello javaboy!");
    }

}

这个就没啥好说的了,就是普通的消息发送,10 秒之后这条消息会在死信队列的消费者中被打印出来。

3. 小结

好啦,这就是我们用 RabbitMQ 做延迟队列的两种思路~感兴趣的小伙伴可以试试哦~ 

到此这篇关于RabbitMQ 实现延迟队列的两种方式详解的文章就介绍到这了,更多相关RabbitMQ 延迟队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • 如何通过Python实现RabbitMQ延迟队列

    最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列.功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1.Time To Live(TTL)消息超时机制:2.Dead Letter Exchanges(DLX)死信队列.下面将具体描述实现原理以及实现代 延迟

  • Spring Boot与RabbitMQ结合实现延迟队列的示例

    背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延时队列将订单信息发送到延时队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备. 延迟队列能做什么? 延迟队列多用于需

  • Rabbitmq延迟队列实现定时任务的方法

    场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列.基于优先

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • RabbitMQ死信机制实现延迟队列的实战

    目录 延迟队列 应用场景 Time To Live(TTL) Dead Letter Exchanges(DLX) 延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 应用场景 三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果.实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列

  • RabbitMQ 实现延迟队列的两种方式详解

    目录 1. 用插件 1.1 安装插件 1.2 消息收发 2. DLX 实现延迟队列 2.1 延迟队列实现思路 2.2 案例 3. 小结 定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如: 在电商项目中,当我们下单之后,一般需要

  • Springboot+rabbitmq实现延时队列的两种方式

    什么是延时队列,延时队列应用于什么场景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝.去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会.那么一天之中肯定是会有很多个

  • Android事件处理的两种方式详解

    安卓提供了两种方式的事件处理:基于回调的事件处理和基于监听的事件处理. 基于监听的事件处理 基于监听的事件处理一般包含三个要素,分别是: Event Source(事件源):事件发生的场所,通常是各个组件 Event(事件):事件封装了界面组件上发生的特定事件(通常就是用户的一次操作) Event Listener(事件监听器):负责监听事件源发生的事件,并对各种事件作出相应的响应 下面使用一个简单的案例介绍按钮事件监听器 布局文件就是简单的线性布局器,上面是一个EditText,下面是一个Bu

  • IOS 指纹识别两种方式详解及实例

    IOS 指纹识别两种方式详解及实例 首先引入类名: #import <LocalAuthentication/LocalAuthentication.h> 然后在实现指纹识别的地方放入如下代码: 方式一: LAContext *lacontext = [[LAContext alloc]init]; // 判断设备是否支持指纹识别 BOOL isSupport = [lacontext canEvaluatePolicy:LAPolicyDeviceOwnerAuthenticationWit

  • springboot 注册服务注册中心(zk)的两种方式详解

    在使用springboot进行开发的过程中,我们经常需要处理这样的场景:在服务启动的时候,需要向服务注册中心(例如zk)注册服务状态,以便当服务状态改变的时候,可以故障摘除和负载均衡. 我遇到过两种注册的途径: 1.在Spring的webapplication启动完成后,直接进行注册: 2.在servlet容器启动完成后,通过listener进行注册. 本文通过一个demo讲述一下这两种注册方式,使用的是传统的向zk注册的方案. 1.Spring webapplication启动完成后注册 先上

  • 对Python使用mfcc的两种方式详解

    1.Librosa import librosa filepath = "/Users/birenjianmo/Desktop/learn/librosa/mp3/in.wav" y,sr = librosa.load(filepath) mfcc = librosa.feature.mfcc( y,sr,n_mfcc=13 ) 返回结构为(13,None)的np.Array,None表示任意数量 2.python_speech_features from python_speech_

  • mapper接口注入两种方式详解

    这篇文章主要介绍了mapper接口注入两种方式详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.使用模板方式: <!--使用模板类实现mybatis --> <bean id="sqlSession" class="org.mybatis.spring.SqlSessionTemplate"> <constructor-arg name="sqlSessionFacto

  • IDEA安装阿里巴巴编码规范插件的两种方式详解(在线安装和离线安装)

    目录 1.在线安装: 2.离线安装: IDEA安装阿里巴巴编码规范插件的两种方式:在线安装和离线安装. 1.在线安装: 打开file-settings-Plugins.如图: 搜索到点击右边的install按钮,即可在线安装. 在线安装非常简单. 2.离线安装: 下载插件地址:https://plugins.jetbrains.com/plugin/10046-alibaba-java-coding-guidelines/versions或者 链接: https://pan.baidu.com/

  • Spring Bean属性注入的两种方式详解

    目录 属性注入概念 一.构造器注入 示例1 注意点 二.setter注入 示例2 三.如何选择注入方式 属性注入概念 Spring 属性注入(DI依赖注入)有两种方式:setter注入,构造器注入. 这个注入的属性可以是普通属性(基本数据类型与String等),也可以是一个引用数据类型(主要是对象),或者是一个集合(list.map.set等) 下表是属性注入bean标签中常用的元素 元素名称 描述 constructor-arg 构造器注入.该元素的 index 属性指定构造参数的索引(从 0

  • Spring框架实现AOP的两种方式详解

    目录 第一种AOP实现方式 AfterLog Log 配置文件 实例调用 定义接口 第二种AOP实现方式 第一种AOP实现方式 AfterLog package com.xxx.demo.service1; import org.junit.After; import org.springframework.aop.AfterReturningAdvice; import java.lang.reflect.Method; public class AfterLog implements Aft

随机推荐