Spring Boot与RabbitMQ结合实现延迟队列的示例

背景

何为延迟队列?

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。

延迟队列能做什么?

延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:

1、延迟消费。比如:

  1. 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
  2. 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

2、延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

如何实现?

别急,在下文中,我们将详细介绍如何利用Spring Boot加RabbitMQ来实现延迟队列。

本文出现的示例代码都已push到Github仓库中:https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue

实现思路

在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  1. 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  2. 消息因为设置了TTL而过期。
  3. 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅官方文档。

流程图

聪明的你肯定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。

针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:

延迟消费

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

延迟重试

延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。

如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。

代码实现

接下来我们将介绍如何在Spring Boot中实现基于RabbitMQ的延迟队列。我们假设读者已经拥有了Spring Boot与RabbitMQ的基本知识。

初始化工程

首先我们在Intellij中创建一个Spring Boot工程,并且添加spring-boot-starter-amqp扩展。

配置队列

从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。又由于在RabbitMQ中,我们拥有两种消息过期的配置方式,所以在代码中,我们一共配置了三条队列:

  1. delay_queue_per_message_ttl:TTL配置在消息上的缓冲队列。
  2. delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列。
  3. delay_process_queue:实际消费队列。

我们通过Java Config的方式将上述的队列配置为Bean。由于我们添加了spring-boot-starter-amqp扩展,Spring Boot在启动时会根据我们的配置自动创建这些队列。为了方便接下来的测试,我们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置为同一个,且过期的消息都会通过DLX转发到delay_process_queue。

delay_queue_per_message_ttl

首先介绍delay_queue_per_message_ttl的配置代码:

@Bean
Queue delayQueuePerMessageTTL() {
  return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
            .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange
            .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
            .build();
}

其中,x-dead-letter-exchange声明了队列里的死信转发到的DLX名称,x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。

delay_queue_per_queue_ttl

类似地,delay_queue_per_queue_ttl的配置代码:

@Bean
Queue delayQueuePerQueueTTL() {
  return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
            .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
            .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
            .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间
            .build();
}

delay_queue_per_queue_ttl队列的配置比delay_queue_per_message_ttl队列的配置多了一个x-message-ttl,该配置用来设置队列的过期时间。

delay_process_queue

delay_process_queue的配置最为简单:

@Bean
Queue delayProcessQueue() {
  return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
            .build();
}

配置Exchange

配置DLX

首先,我们需要配置DLX,代码如下:

@Bean
DirectExchange delayExchange() {
  return new DirectExchange(DELAY_EXCHANGE_NAME);
}

然后再将该DLX绑定到实际消费队列即delay_process_queue上。这样所有的死信都会通过DLX被转发到delay_process_queue:

@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
  return BindingBuilder.bind(delayProcessQueue)
             .to(delayExchange)
             .with(DELAY_PROCESS_QUEUE_NAME);
}

配置延迟重试所需的Exchange

从延迟重试的流程图中我们可以看到,消息处理失败之后,我们需要将消息转发到缓冲队列,所以缓冲队列也需要绑定一个Exchange。在本例中,我们将delay_process_per_queue_ttl作为延迟重试里的缓冲队列。具体代码是如何配置的,这里就不赘述了,大家可以查阅我Github中的代码。

定义消费者

我们创建一个最简单的消费者ProcessReceiver,这个消费者监听delay_process_queue队列,对于接受到的消息,他会:

  1. 如果消息里的消息体不等于FAIL_MESSAGE,那么他会输出消息体。
  2. 如果消息里的消息体恰好是FAIL_MESSAGE,那么他会模拟抛出异常,然后将该消息重定向到缓冲队列(对应延迟重试场景)。

另外,我们还需要新建一个监听容器用于存放消费者,代码如下:

@Bean
SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) {
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 监听delay_process_queue
  container.setMessageListener(new MessageListenerAdapter(processReceiver));
  return container;
}

至此,我们前置的配置代码已经全部编写完成,接下来我们需要编写测试用例来测试我们的延迟队列。

编写测试用例

延迟消费场景

首先我们编写用于测试TTL设置在消息上的测试代码。

我们借助spring-rabbit包下提供的RabbitTemplate类来发送消息。由于我们添加了spring-boot-starter-amqp扩展,Spring Boot会在初始化时自动地将RabbitTemplate当成bean加载到容器中。

解决了消息的发送问题,那么又该如何为每个消息设置TTL呢?这里我们需要借助MessagePostProcessor。

MessagePostProcessor通常用来设置消息的Header以及消息的属性。我们新建一个ExpirationMessagePostProcessor类来负责设置消息的TTL属性:

/**
 * 设置消息的失效时间
 */
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
  private final Long ttl; // 毫秒
  public ExpirationMessagePostProcessor(Long ttl) {
    this.ttl = ttl;
  }
  @Override
  public Message postProcessMessage(Message message) throws AmqpException {
    message.getMessageProperties()
        .setExpiration(ttl.toString()); // 设置per-message的失效时间
    return message;
  }
}

然后在调用RabbitTemplate的convertAndSend方法时,传入ExpirationMessagePostPorcessor即可。我们向缓冲队列中发送3条消息,过期时间依次为1秒,2秒和3秒。具体的代码如下所示:

@Test
public void testDelayQueuePerMessageTTL() throws InterruptedException {
  ProcessReceiver.latch = new CountDownLatch(3);
  for (int i = 1; i <= 3; i++) {
    long expiration = i * 1000;
    rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
        (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));
  }
  ProcessReceiver.latch.await();
}

细心的朋友一定会问,为什么要在代码中加一个CountDownLatch呢?这是因为如果没有latch阻塞住测试方法的话,测试用例会直接结束,程序退出,我们就看不到消息被延迟消费的表现了。

那么类似地,测试TTL设置在队列上的代码如下:

@Test
public void testDelayQueuePerQueueTTL() throws InterruptedException {
  ProcessReceiver.latch = new CountDownLatch(3);
  for (int i = 1; i <= 3; i++) {
    rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,
        "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);
  }
  ProcessReceiver.latch.await();
}

我们向缓冲队列中发送3条消息。理论上这3条消息会在4秒后同时过期。

延迟重试场景

我们同样还需测试延迟重试场景。

@Test
public void testFailMessage() throws InterruptedException {
  ProcessReceiver.latch = new CountDownLatch(6);
  for (int i = 1; i <= 3; i++) {
    rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);
  }
  ProcessReceiver.latch.await();
}

我们向delay_process_queue发送3条会触发FAIL的消息,理论上这3条消息会在4秒后自动重试。

查看测试结果

延迟消费场景

延迟消费的场景测试我们分为了TTL设置在消息上和TTL设置在队列上两种。首先,我们先看一下TTL设置在消息上的测试结果:

从上图中我们可以看到,ProcessReceiver分别经过1秒、2秒、3秒收到消息。测试结果表明消息不仅被延迟消费了,而且每条消息的延迟时间是可以被个性化设置的。TTL设置在消息上的延迟消费场景测试成功。

然后,TTL设置在队列上的测试结果如下图:

从上图中我们可以看到,ProcessReceiver经过了4秒的延迟之后,同时收到了3条消息。测试结果表明消息不仅被延迟消费了,同时也证明了当TTL设置在队列上的时候,消息的过期时间是固定的。TTL设置在队列上的延迟消费场景测试成功。

延迟重试场景

接下来,我们再来看一下延迟重试的测试结果:

ProcessReceiver首先收到了3条会触发FAIL的消息,然后将其移动到缓冲队列之后,过了4秒,又收到了刚才的那3条消息。延迟重试场景测试成功。

总结

本文首先介绍了延迟队列的概念以及用途,并且通过代码详细讲解了如何通过Spring Boot和RabbitMQ实现一个延迟队列。希望本文能够对大家平时的学习和工作能有所启发和帮助。也希望大家多多支持我们。

(0)

相关推荐

  • spring boot整合RabbitMQ(Direct模式)

    springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持. 1.新建一个Spring Boot工程,命名为:"rabbitmq-hello". 在pom.xml中引入如下依赖内容,其中spring-boot-starter-amqp用于支持RabbitMQ. <dependency> <groupId>org.springframework.boo

  • spring boot集成rabbitmq的实例教程

    一.RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息

  • spring boot整合RabbitMQ实例详解(Fanout模式)

    1.Fanout Exchange介绍 Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).

  • 详解Spring Boot 配置多个RabbitMQ

    闲话 好久没有写博客了,6月份毕业,因为工作原因,公司上网受限,一直没能把学到的知识点写下来,工作了半年,其实学到的东西也不少,但是现在回忆起来的东西少之又少,有时甚至能在同个问题中踩了几次,越来越觉得及时记录一下学到的东西很重要. 好了,闲话少说,写下这段时间学习的东西,先记录一下用spring Boot配置多个RabbitMQ的情况... 最近公司新启动一个新平台的项目,需要用微服务这个这几年很火的概念来做,所以就学习了Spring Boot方面的知识,给同事展示Spring Boot的一些

  • Spring Boot整合RabbitMQ实例(Topic模式)

    1.Topic交换器介绍 Topic Exchange 转发消息主要是根据通配符. 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息. 在这种交换机模式下: 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等. 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*

  • 详解spring boot集成RabbitMQ

    RabbitMQ作为AMQP的代表性产品,在项目中大量使用.结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题. 首先正确的安装RabbitMQ及运行正常. RabbitMQ需啊erlang环境,所以首先安装对应版本的erlang,可在RabbitMQ官网下载 # rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm 使用yum安装RabbitMQ,避免缺少依赖包引起的安装失败 # yum install rabbitmq-s

  • Spring Boot与RabbitMQ结合实现延迟队列的示例

    背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理.这是就可以使用延时队列将订单信息发送到延时队列. 场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备. 延迟队列能做什么? 延迟队列多用于需

  • spring boot使用RabbitMQ实现topic 主题

    前一篇我们实现了消息系统的灵活配置.代替了使用扇形(fanout)交换器的配置.使用直连(direct)交换器,并且基于路由键后可以有选择性接收消息的能力. 虽然使用直连交换器可以改善我们的系统,但是它仍有局限性,它不能实现多重条件的路由. 在我们的消息系统中,我们不仅想要订阅基于路由键的队列,还想订阅基于生产消息的源.这些概念来自于Unix工具syslog.该日志基于严格的(info/warn/crit...) 和容易的(auth/cron/kern...)的路由方式.我们的例子比这个要简单.

  • Spring Boot集成RabbitMQ以及队列模式操作

    目录 前言 一.场景描述 二.准备工作 三.发布/订阅模式(Fanout) 生产者 消费者 四.Work模式 4.1 轮询模式 生产者 消费者 4.2 公平分发 生产者 消费者 生产者 消费者 五.路由模式(Direct) 六.主题模式(Topic) 小结 前言 本篇博客将会通过我们的实际场景来演示如何在Spring Boot中集成RabbitMQ以及如何对各种队列模式进行操作. 一.场景描述 我们通过模仿用户下订单时,订单系统分别通过短信,邮件或微信进行推送消息,如下图: 二.准备工作 (1)

  • Spring Boot整合RabbitMQ开发实战详解

    这篇文章主要讲基本的整合.先把代码跑起来,再说什么高级特性. RabbitMQ 中的一些术语 如果你打开 RabbitMQ web 控制台,你会发现其中有一个 Exhanges 不好理解.下面简单说明一下. 交换器(Exchange) 交换器就像路由器,我们先是把消息发到交换器,然后交换器再根据路由键(routingKey)把消息投递到对应的队列.(明白这个概念很重要,后面的代码里面充分体现了这一点) 队列(Queue) 队列很好理解,就不用解释了. 绑定(Binding) 交换器怎么知道把这条

  • springcloud安装rabbitmq并配置延迟队列插件的过程详解

    目录 0. 引言 1. docker安装 1.1 安装rabbitmq 1.2 安装延迟队列插件delayed_message_exchange 2. docker打包安装rabbitmq+延迟队列插件 0. 引言 本期主要讲解如何利用docker快速安装rabbitmq并且配置延迟队列插件 1. docker安装 1.1 安装rabbitmq 1.下载镜像 docker pull rabbitmq 2.安装镜像 docker run -d --hostname my-rabbit --name

  • spring boot整合log4j2及MQ消费处理系统日志示例

    目录 前言 1.添加相关jar依赖 2.系统log4j2.xml配置 3.添加处理日志的消息监听 前言 当系统的并发比较高的时候,日志的处理输出也是一种性能的开销负担,所以,选择一个中间件来处理消费日志必不可少! 下面是spring boot整合log4j2结合spring amqp来消费处理系统日志的实例,只需要简单的三步 1.添加相关jar依赖 <dependency> <groupId>org.springframework.boot</groupId> <

随机推荐