手把手带你掌握SpringBoot RabbitMQ延迟队列

目录
  • 1. 简介
  • 2. 安装插件
  • 3. 实现延迟队列
    • 3.1 引入所需依赖
    • 3.2 application.yaml
    • 3.3 RabbitConfig
    • 3.4 Producer
    • 3.5 Consumer
    • 3.6 测试代码
    • 3.7 启动测试

1. 简介

我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题。

首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知。

通过给Queue设置过期时间的方式不现实,因为很有可能每条记录的过期时间都不一样,不可能设置那么多的Queue。直接给Message设置过期时间,这种方式也不好,因为这种方式是当该消息在队列头部时(消费时),才会单独判断这一消息是否过期。例:现在有两条消息,第一条消息过期时间为30s,而第二条消息过期时间为15s,当过了15秒后,第二条消息不会立即过期,而是要等第一条消息被消费后,第二条消息被消费时,才会判断是否过期,也就是等到第二条消息投往DLX已经过去45s了。

这也就抛出了本章主题:延迟队列

RabbitMQ默认没有提供延迟队列功能,而是要通过插件提供的x-delayed-message(延迟交换机)来实现。

延迟队列:用户可以使用该类型声明一个交换,x-delayed-message然后使用自定义标头发布消息,x-delay以毫秒为单位表示消息的延迟时间。消息将在x-delay毫秒后传递到相应的队列。

2. 安装插件

官方插件地址:https://www.rabbitmq.com/community-plugins.html

找到插件rabbitmq_delayed_message_exchange,进入GitHub下载本地RabbitMQ对应的插件版本(下载.ez文件)。

我这里下载的是3.8.9版本,如图:

下载到本地后将文件放置RabbitMQ的plugins目录。

我这里本地是使用docker-compose安装的服务,imagerabbitmq:3.8.3-management(虽然版本没对起来,但是测试能用,但是使用3.9的版本会报错,插件安装失败)安装的服务,操作步骤如下:

1.将下载好的文件放置RabbitMQ插件目录

rabbitmq:容器服务名

$ docker cp /Users/ludangxin/Downloads/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/opt/rabbitmq/plugins/

2.进入容器

$ docker exec -it rabbitmq /bin/bash

3.查看现有的插件列表

$ rabbitmq-plugins list
# 输出部分内容如下 [E*] = 明确启用; e = 隐式启用
[  ] rabbitmq_amqp1_0                  3.8.3
[  ] rabbitmq_auth_backend_cache       3.8.3
[  ] rabbitmq_auth_backend_http        3.8.3
[  ] rabbitmq_auth_backend_ldap        3.8.3
[  ] rabbitmq_auth_backend_oauth2      3.8.3
[  ] rabbitmq_auth_mechanism_ssl       3.8.3
[  ] rabbitmq_consistent_hash_exchange 3.8.3
[  ] rabbitmq_event_exchange           3.8.3
[  ] rabbitmq_federation               3.8.3
[  ] rabbitmq_federation_management    3.8.3
[  ] rabbitmq_jms_topic_exchange       3.8.3
[E*] rabbitmq_management               3.8.3
[e*] rabbitmq_management_agent         3.8.3
[  ] rabbitmq_mqtt                     3.8.3

4.启用插件

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange

再次查看安装列表就有了rabbitmq_delayed_message_exchange

安装完毕后登陆RabbitMQ控制台查看,会发现多了个x-delayed-message类型的Exchange。

3. 实现延迟队列

3.1 引入所需依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

3.2 application.yaml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默认的虚拟主机
    virtual-host: /
    # rabbit 用户名密码
    username: admin
    password: admin123

3.3 RabbitConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

/**
 * 延迟队列配置
 *
 * @author ludangxin
 * @date 2021/9/16
 */
@Configuration
public class RabbitDelayedConfig {
    public static final String QUEUE_NAME_DELAYED = "DELAY.QUEUE";
    public static final String EXCHANGE_NAME_DELAYED = "DELAY.EXCHANGE";
    public static final String ROUTING_KEY_DELAYED = "DELAY.#";

    @Bean(QUEUE_NAME_DELAYED)
    public Queue queue() {
       return QueueBuilder.durable(QUEUE_NAME_DELAYED).build();
    }

    @Bean(EXCHANGE_NAME_DELAYED)
    public CustomExchange exchange() {
       Map<String, Object> arguments = new HashMap<>(1);
       // 在这里声明一个主题类型的延迟队列,当然其他类型的也可以。
       arguments.put("x-delayed-type", "topic");
       return new CustomExchange(EXCHANGE_NAME_DELAYED, "x-delayed-message", true, false, arguments);
    }

    @Bean
    public Binding bindingNotify(@Qualifier(QUEUE_NAME_DELAYED) Queue queue, @Qualifier(EXCHANGE_NAME_DELAYED) CustomExchange customExchange) {
       return BindingBuilder.bind(queue).to(customExchange).with(ROUTING_KEY_DELAYED).noargs();
    }
}

3.4 Producer

import com.ldx.rabbitmq.config.RabbitDelayedConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 延迟消息生产者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Component
public class DelayProducer {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   public void sendDelayedMsg(String msg, Integer delay) {
      MessageProperties mp = new MessageProperties();
      // 设置过期时间
      mp.setDelay(delay);
      Message message = new Message(msg.getBytes(), mp);
      rabbitTemplate.convertAndSend(RabbitDelayedConfig.EXCHANGE_NAME_DELAYED, "DELAY.MSG", message);
   }
}

3.5 Consumer

import com.ldx.rabbitmq.config.RabbitDelayedConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 延迟消息消费者
 *
 * @author ludangxin
 * @date 2021/9/9
 */
@Slf4j
@Component
public class DelayConsumer {

    @RabbitListener(queues = {RabbitDelayedConfig.QUEUE_NAME_DELAYED})
    public void delayQueue(Message message){
        log.info(new String(message.getBody()) + ",结束时间为:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }

}

3.6 测试代码

@Autowired
private DelayProducer delayProducer;

@Test
@SneakyThrows
public void sendDelayedMsg() {
   for(int i = 16; i >= 10; i --) {
      String msg = "我将在" + i + "s后过期,开始时间为:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
      delayProducer.sendDelayedMsg(msg,i * 1000);
   }
   // 使进程阻塞,方便Consumer监听输出Message
   System.in.read();
}

3.7 启动测试

启动测试代码,输出内容如下:

从日志内容可以看出,消息存活了30s,符合预期。

2021-09-16 23:40:10.806 INFO 7883 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.Delay2Consumer : 我将在10s后过期,开始时间为:2021-09-16 23:40:00,结束时间为:2021-09-16 23:40:10
2021-09-16 23:40:11.792 INFO 7883 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.Delay2Consumer : 我将在11s后过期,开始时间为:2021-09-16 23:40:00,结束时间为:2021-09-16 23:40:11
2021-09-16 23:40:12.791 INFO 7883 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.Delay2Consumer : 我将在12s后过期,开始时间为:2021-09-16 23:40:00,结束时间为:2021-09-16 23:40:12
2021-09-16 23:40:13.791 INFO 7883 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.Delay2Consumer : 我将在13s后过期,开始时间为:2021-09-16 23:40:00,结束时间为:2021-09-16 23:40:13
2021-09-16 23:40:14.788 INFO 7883 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.Delay2Consumer : 我将在14s后过期,开始时间为:2021-09-16 23:40:00,结束时间为:2021-09-16 23:40:14
2021-09-16 23:40:15.785 INFO 7883 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.Delay2Consumer : 我将在15s后过期,开始时间为:2021-09-16 23:40:00,结束时间为:2021-09-16 23:40:15
2021-09-16 23:40:16.785 INFO 7883 --- [ntContainer#0-1] c.ldx.rabbitmq.consumer.Delay2Consumer : 我将在16s后过期,开始时间为:2021-09-16 23:40:00,结束时间为:2021-09-16 23:40:16

到此这篇关于手把手带你掌握SpringBoot RabbitMQ延迟队列的文章就介绍到这了,更多相关SpringBoot RabbitMQ 延迟队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot停止启动时测试检查rabbitmq操作

    目录 SpringBoot停止启动时测试检查rabbitmq 问题 解决 RabbitMQ的简单使用的Demo 1.声明 2.创建一个测试账户 3.pom依赖 5.创建入口类 6.测试 7.总结 SpringBoot停止启动时测试检查rabbitmq 问题 在Springboot项目中配置rabbitmq后,总是在每次启动时自动测试MQ的连接,如果测试不通过,就一直重连,导致项目无法正常启动.自己在开发与MQ无关的功能时,无法正常进行,十分耽误时间.如下所示: org.springframewo

  • 详解消息队列及RabbitMQ部署和使用

    目录 什么是消息队列 为什么需要消息队列 常见的消息队列 ActiveMQ RabbitMQ ZeroMQ Kafka RocketMQ RabbitMQ 的部署和使用 Python 编写生产者 Python 编写消费者 最后的话 什么是消息队列 消息队列拆开了看,就是消息 + 队列,消息是什么?其实就是程序之间通讯所用到的数据,消息从生产者那里产生,进入队列后,安装设计好的规则出队,由消费者消费.仅此而已. 为什么需要消息队列 消息队列,最重要的是队列,可以想象一下没有队列的场景,你去银行办业

  • 一篇文章教你将JAVA的RabbitMQz与SpringBoot整合

    目录 一.fanout:发布订阅型 二.direct:直连型 三.topic:通配符模式 四.消费者端接收消息 总结 本文主要聊SpringBoot整合RabbitMQ,主要分为生产者和消费者两个工程,目录结构如下: 先简单说一下RabbitMQ的一些核心概念: 1.虚拟主机vhost:vhost是物理隔离的,你可以将vhost看作是一个个小型的RabbitMQ 2.交换机exchange:生产者发送的消息不是直接到达队列的,而是交换机,然后交换机再根据路由key,路由到指定的队列,可以理解为一

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • Rabbitmq延迟队列实现定时任务的方法

    场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列.基于优先

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知(实例代码)

    在第三方支付中,例如支付宝.或者微信,对于订单请求,第三方支付系统采用的是消息同步返回.异步通知+主动补偿查询的补偿机制. 由于互联网通信的不可靠性,例如双方网络.服务器.应用等因素的影响,不管是同步返回.异步通知.主动查询报文都可能出现超时无响应.报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制.例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台.后台通知的,

  • C#实现rabbitmq 延迟队列功能实例代码

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就

  • 如何通过Python实现RabbitMQ延迟队列

    最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列.功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1.Time To Live(TTL)消息超时机制:2.Dead Letter Exchanges(DLX)死信队列.下面将具体描述实现原理以及实现代 延迟

  • 手把手带你分析SpringBoot自动装配完成了Ribbon哪些核心操作

    目录 一.项目案例准备 1.Order服务 2.User服务 二.Ribbon原理分析 1.RibbonAutoConfiguration 2.LoadBalancerAutoConfiguration 总结 一.项目案例准备 首先我们大家案例环境,通过[RestTemplate]来实现服务调用,通过[Ribbon]实现客户端负载均衡操作. 1.Order服务 我们的Order服务作为服务提供者.创建SpringBoot项目,并添加相关依赖 <?xml version="1.0"

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • SpringBoot集成Redisson实现延迟队列的场景分析

    使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时,推送短信提醒 ...... 对于延时比较长的场景.实时性不高的场景,我们可以采用任务调度的方式定时轮询处理.如:xxl-job 今天我们采用一种比较简单.轻量级的方式,使用 Redis 的延迟队列来进行处理.当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案.如:使用消息中间件Kafka.Rabbi

  • Springboot+rabbitmq实现延时队列的两种方式

    什么是延时队列,延时队列应用于什么场景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝.去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会.那么一天之中肯定是会有很多个

随机推荐