springcloud中RabbitMQ死信队列与延迟交换机实现方法

目录
  • 0.引言
  • 1. 死信队列
    • 1.2 什么是死信?
    • 1.3 什么是死信队列?
    • 1.4 创建死信交换机、死信队列
    • 1.5 实现死信消息
      • 1.5.1 基于消费者进行reject或nack实现死信消息
      • 1.5.2 基于生存时间实现
      • 1.5.3 基于队列max_length实现
    • 1.6 基于死信队列实现消息延迟发送
      • 基于死信队列实现消息延迟发送的问题
  • 2. 延迟交换机
  • 3. 应用场景
  • 4. 练习题

0.引言

死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306中的30分钟后未支付订单取消。那么本期,我们就来讲解死信队列,以及如何通过延迟交换机来实现延迟发送的需求。

1. 死信队列

1.2 什么是死信?

理解死信队列前,我们先讲解什么是死信,所谓死信就是没有被成功消费的消息,但并不是所有未成功消费的消息都是死信消息,死信消息的产生来源于以下三种途径: (1)消息被消费者拒绝,参数requeue设置为false的消息 (2)过期的消息,过期消息分为两种: a. 发送消息时,设置了某一条消息的生存时间(message TTL),如果生存时间到了,消息还没有被消费,就会被标注为死信消息 b. 设置了队列的消息生存时间,针对队列中所有的消息,如果生存时间到了,消息还没有被消费,就会被标注为死信消息 (3)当队列达到了最大长度后,再发送过来的消息就会直接变成死信消息

1.3 什么是死信队列?

直接来讲,用来盛装死信的队列就是死信队列,好像是一句废话,所以其重点在于理解死信的概念。

死信队列的作用: (1)队列在已满的情况下,会将消息发送到死信队列中,这样消息就不会丢失了,回头再从死信队列里将消息取出来进行消费即可 (2)可以基于死信队列实现延迟消费的效果。具体的实现我们后续讲解

1.4 创建死信交换机、死信队列

死信交换机、死信队列其实都是普通的交换机、队列,只是专门声明出来用于存储死信消息的。我们只需要通过deadLetterExchange方法来声明死信交换机,然后用deadLetterRoutingKey方法来声明死信队列

如下代码所示,我们创建了test.queuetest.exchangedead.queuedead.exchange,并且在test.queue中将死信交换机和死信路由指定到了测试队列中

注意:涉及到修改队列、交换机属性的,如果该队列、交换机已经存在需要将其删除后才能生效,否则可能还会报错。

@Configuration
public class RabbitMqConfig {

    private static final String TEST_EXCHANGE = "test.exchange";
    private static final String TEST_QUEUE = "test.queue";
    private static final String TEST_ROUTING_KEY = "test.routing.key";
    private static final String DEAD_EXCHANGE = "dead.exchange";
    private static final String DEAD_QUEUE = "dead.queue";
    private static final String DEAD_ROUTING_KEY = "dead.routing.key";

    @Bean
    public Queue deadQueue(){
        return new Queue(DEAD_QUEUE);
    }
    public DirectExchange deadExchange(){
    // 设置演示,使用了直接交换机Direct,大家可以根据自己的业务情况声明为其他类型的交换机
        return new DirectExchange(DEAD_EXCHANGE);
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
    public DirectExchange testExchange(){
        return new DirectExchange(TEST_EXCHANGE);
    public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
        return BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTING_KEY);
}

1.5 实现死信消息

1.5.1 基于消费者进行reject或nack实现死信消息

@Component
public class QueueListener {
    @RabbitListener(queues = RabbitMqConfig.TEST_QUEUE)
    public void handler(MyMessage messageInfo, Message message, Channel channel) {
        try{
            System.out.println("接收的消息:"+messageInfo.toString());
            // requeue参数设置为false 设置死信消息
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            // multiple和requeue设置为false 设置死信消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            // 返回ack 确认接收到消息
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (IOException e){
            try {
                channel.basicRecover();
            } catch (IOException ex) {
                ex.printStackTrace();
                log.error("消息处理失败:{}",e.getMessage());
            }
        }
    }
}

1.5.2 基于生存时间实现

(1)发送消息时设置生存时间

    @GetMapping("sendTestQueueWithExpiration")
    public String sendTestQueueWithExpiration(){
        MyMessage message = new MyMessage(1L,"物流提醒","到达装货区域,注意上传凭证",new Date());
        rabbitTemplate.convertAndSend(RabbitMqConfig.TEST_EXCHANGE,RabbitMqConfig.TEST_ROUTING_KEY, message,msg -> {
            msg.getMessageProperties().setExpiration("5000");
            return msg;
        });
        return "发送成功";
    }

(2)队列设置生存时间

    @Bean
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                // 10s 过期
                .ttl(10000)
                .build();
    }

1.5.3 基于队列max_length实现

    @Bean
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey(DEAD_ROUTING_KEY)
                // 容量最大100条
                .maxLength(100)
                .build();
    }

1.6 基于死信队列实现消息延迟发送

上述我们说过死信队列还可以消息延迟发送,其思路就是: (1)消息发送时设置消息的生存时间,其生存时间就是我们想要延迟的时间 (2)消息者监控死信队列进行消费

正常队列的消息因为没有消费者消费,同时又指定了生存时间,到达时间后消息转发到死信队列中,消费者监听了死信队列从而将其消费掉。

基于死信队列实现消息延迟发送的问题

如果有两个消息,一个是5s生存时间,一个是10s生存时间,当我们先发送了10s生存时间的消息到queue中时,因为rabbitmq只会监控队列最外侧的消息的生存时间,也就是监控10s生存时间的消息,而5s生存时间的消息只会在最外侧的10s消息到期后才会监控,也就导致我实际需要5s生存的消息,实际需要10s才监听到了。

所以呢,基于死信队列实现的延迟消息,只使用于延迟时间一致的消息。

为了适配更多的延迟场景,已经更加简单的实现延迟消息,我们引入了延迟交换机

2. 延迟交换机

延迟交换机并不是rabbitmq自带的功能,而是要通过安装延迟交换机插件delayed_message_exchange来实现

其插件的安装我们之间已经讲解过,不再累叙,可以参考如下博文 springcloud:安装rabbitmq并配置延迟队列插件

通过延迟交换机实现的延迟消息,其重点主要在交换机上,队列就是普通队列,消息发送到交换机上后,会记录消息的延迟时间,到达时间后才会发送到队列中,这样消费者通过监控队列,就能在指定时间获取到消息

因此延迟交换机与普通交换机的实现,只在创建交换机时,其他的操作与普通交换机无异,因此使用起来也很方便

创建延迟交换机,通过x-delayed-type属性声明交换机类型,可以是direct也可以是topic,具体支持4中交换机类型,如果不清楚的可以参考之前的博文

@Configuration
public class RabbitMqDelayConfig {
    public static final String DELAY_EXCHANGE = "delay.exchange";
    public static final String DELAY_QUEUE = "delay.queue";
    public static final String DELAY_ROUTING_KEY = "delay.routing.key";
    @Bean
    public Exchange delayExchange(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
    }
    @Bean
    public Queue delayQueue(){
        return new Queue(DELAY_QUEUE);
    }
    @Bean
    public Binding delayBinding(Queue delayQueue, Exchange delayExchange){
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }

}

发送消息时指定延迟时间,单位毫秒

rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(30000);
                return message;
            }
        });

我们还可以将该方法封装为工具类方法,方便之后调用

/**
	 * 发送 延迟队列
	 * @param exchange 交换机
	 * @param routeKey 路由
	 * @param message 消息
	 * @param delaySecond 延迟秒数
	 */
	public void send(String exchange, String routeKey, Object message, int delaySecond){
		rabbitTemplate.convertAndSend(exchange,routeKey,message,msg -> {
			// 消息持久化
			msg.getMessageProperties().setDelay(delaySecond * 1000);
			return msg;
		});
	}

3. 应用场景

延迟消息的应用场景丰富,除了我们开篇所说的30分钟未支付自动取消订单,还比如到货后72小时未签收自动签收

基本上所有需要延迟触发的业务场景都可以用rabbitmq延迟队列来实现。

4. 练习题

对于刚接触rabbitmq的同学,这里我提供一个练习题给大家,也让大家在实操中加强对于rabbitmq的理解:

需求:订单到货后72小时未签收,自动签收 讲解:我们这里要实现订单到货后的自动签收功能,订单到货后会触发发送自动签收消息的方法,订单已签收的状态status为2,到货状态为1,如果72小时前已经签收了即status被更新为2了,那么需要取消自动签收(不执行自动签收,即忽略自动签收消息)

到此这篇关于springcloud:RabbitMQ死信队列与延迟交换机实现的文章就介绍到这了,更多相关springcloud RabbitMQ死信队列与延迟交换机内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringCloud Stream 整合RabbitMQ的基本步骤

    目录 一.项目介绍 二.生产者 三.消费者 四.验证 在postman 访问生产者接口: 本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤: 引入SpringCloud 引入SpringCloud Stream相关依赖 定义绑定接口: 消息生产者(Output…Binding) .消息消费者(Input…Binding) @EnableBinding 在对应类上进行定义 @StreamListener 在对应方法上创建监听用来消费消息 调用output的send()

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • springcloud安装rabbitmq并配置延迟队列插件的过程详解

    目录 0. 引言 1. docker安装 1.1 安装rabbitmq 1.2 安装延迟队列插件delayed_message_exchange 2. docker打包安装rabbitmq+延迟队列插件 0. 引言 本期主要讲解如何利用docker快速安装rabbitmq并且配置延迟队列插件 1. docker安装 1.1 安装rabbitmq 1.下载镜像 docker pull rabbitmq 2.安装镜像 docker run -d --hostname my-rabbit --name

  • springcloud中RabbitMQ死信队列与延迟交换机实现方法

    目录 0.引言 1. 死信队列 1.2 什么是死信? 1.3 什么是死信队列? 1.4 创建死信交换机.死信队列 1.5 实现死信消息 1.5.1 基于消费者进行reject或nack实现死信消息 1.5.2 基于生存时间实现 1.5.3 基于队列max_length实现 1.6 基于死信队列实现消息延迟发送 基于死信队列实现消息延迟发送的问题 2. 延迟交换机 3. 应用场景 4. 练习题 0.引言 死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解

    目录 TTL 死信队列 延迟队列 Go实现延迟队列 TTL TTL 全称 Time To Live(存活时间/过期时间).当消息到达存活时间后,还没有被消费,就会被自动清除.RabbitMQ可以设置两种过期时间: 对消息设置过期时间. 对整个队列(Queue)设置过期时间. 如何设置 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期. 设置消息过期时间使用参数:expiration,单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这

  • RabbitMQ死信机制实现延迟队列的实战

    目录 延迟队列 应用场景 Time To Live(TTL) Dead Letter Exchanges(DLX) 延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 应用场景 三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果.实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    目录 一.死信队列 二.延时队列 三.延时消息设置不同过期时间 四.延时消息用延时插件的方式实现 一.死信队列 描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2) P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息. 特定情况有哪些呢: 1.消息被拒(basic.

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • iOS应用程序中通过dispatch队列控制线程执行的方法

    GCD编程的核心就是dispatch队列,dispatch block的执行最终都会放进某个队列中去进行,它类似NSOperationQueue但更复杂也更强大,并且可以嵌套使用.所以说,结合block实现的GCD,把函数闭包(Closure)的特性发挥得淋漓尽致. dispatch队列的生成可以有这几种方式: 1. dispatch_queue_t queue = dispatch_queue_create("com.dispatch.serial", DISPATCH_QUEUE_

  • SpringBoot+RabbitMQ 实现死信队列的示例

    前言 死信:无法被消费的消息,称为死信. 如果死信一直留在队列中,会导致一直被消费,却从不消费成功. 所以我们专门开辟了一个来存放死信的队列,叫死信队列(DLX,dead-letter-exchange). 死信的几种来源: 消息 TTL 过期(time to live,存活时间,可以用在限时支付消息) 队列达到最大长度(队列满了,无法路由到该队列) 消息被拒绝( basic.reject / basic.nack ),并且 requeue = false 环境准备配置 准备 MQ 的队列和环境

  • Java RabbitMQ消息队列详解常见问题

    目录 消息堆积 保证消息不丢失 死信队列 延迟队列 RabbitMQ消息幂等问题 RabbitMQ消息自动重试机制 合理的选择重试机制 消费者开启手动ack模式 rabbitMQ如何解决消息幂等问题 RabbitMQ解决分布式事务问题 基于RabbitMQ解决分布式事务的思路 消息堆积 消息堆积的产生场景: 生产者产生的消息速度大于消费者消费的速度.解决:增加消费者的数量或速度. 没有消费者进行消费的时候.解决:死信队列.设置消息有效期.相当于对我们的消息设置有效期,在规定的时间内如果没有消费的

随机推荐