如何利用rabbitMq的死信队列实现延时消息

目录
  • 前言
  • mq基本的消息模型
  • mq死信队列的消息模型
    • maven依赖
    • 配置普通队列和死信队列
    • 死信队列消费者
    • 发送消息测试
  • 测试成功
  • 总结

前言

使用mq自带的死信去实现延时消息要注意一个坑点,就是mq只会检测队首的消息的过期时间,假设先放入队列10s过期消息,再放入2s过期。

mq会检测头部10s是否过期,10s不过期的情况下,2s就算过去也不会跑到死信.

mq基本的消息模型

mq死信队列的消息模型

简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。

然后监听这个死信队列的消费.

一般死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息过期时间)

通常死信队列由“面向生产者的基本交换机+基本路由”绑定而成,所以生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中.

maven依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>

配置普通队列和死信队列

yml文件

spring:
  application:
    name: ttl-queue
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan
    connection-timeout: 15000
    # 发送确认
    #publisher-confirms: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 签收模式为手动签收-那么需要在代码中手动ACK
        acknowledge-mode: manual
package com.fchan.mq.mqDelay.dlx;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabConfig {

    //定义普通任务队列(其实就是一个普通队列,只是没有消费者)
    @Bean("delayQue")
    public Queue delayQue(){
        //普通队列绑定死信交换机及路由key

        //delayQueue延时队列
        return QueueBuilder.durable("delayQueue")
                //指定这个延时队列下的死信交换机
                //如果消息过时则会被投递到当前对应的my-dlx-exchange
                .withArgument("x-dead-letter-exchange", "my-dlx-exchange")
                //死信交换机绑定的路由key
                //如果消息过时,my-dlx-exchange会根据routing-key-delay投递消息到对应的队列
                //mq发送消息的时候一般指定的是exchange交换机+这个routingKey来找到队列,这个应该是出于有时候需要让mq的交换机将一个消息发送到多个队列所采用的方式
                .withArgument("x-dead-letter-routing-key", "routing-key-delay")
                .build();
    }

    //定义普通队列的交换机
    @Bean("delayExchange")
    public Exchange delayExchange(){
        return ExchangeBuilder.directExchange("delayExchange").build();
    }

    //绑定普通队列的交换机
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs();
    }

    //定义死信队列
    @Bean("dlxQueue")
    public Queue dlxQueue(){
        return QueueBuilder.durable("my-dlx-queue").build();
    }

    //定义死信交换机,这里的死信交换机要和上面普通队列所绑定的交换机名称一致
    @Bean("dlxExchange")
    public Exchange dlxExchange(){
        return ExchangeBuilder.directExchange("my-dlx-exchange").build();
    }

    //绑定交换机与死信队列
    @Bean("dlxBinding")
    public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){
        //这儿这个routingKey要和上面正常队列中所绑定的死信routingKey一致
        return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
    }

}

死信队列消费者

package com.fchan.mq.mqDelay;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class MyRabbitConsume {
    Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);

    @RabbitListener(queues = {"my-dlx-queue"})
    public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception {
        log.info("收到死信信息:{}",map);
        log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
		//手动确认消息
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

发送消息测试

/**
 * 发送消息到延时队列 delayQueue
 * 消息5秒后会被投递到死信队列
 * @return
 */
@GetMapping("sendMsgToDelay")
public String sendMsgToDelay(String msg){
    Map<String,Object> map = new HashMap<>();
    map.put("msg", msg);
    rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //设置消息过期时间,5秒过期
            message.getMessageProperties().setExpiration("5000");
            return message;
        }
    });
    return msg;
}

测试成功

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot使用RabbitMQ延时队列(小白必备)

    1.什么是MQ MQ,是一种跨进程的通信机制,用于上下游传递消息. 在互联网架构中,MQ是一种非常常见的上下游"逻辑解耦+物理解耦"的消息通信服务. 使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务. 为什么会产生消息列队? 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个: 不

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • 如何利用rabbitMq的死信队列实现延时消息

    目录 前言 mq基本的消息模型 mq死信队列的消息模型 maven依赖 配置普通队列和死信队列 死信队列消费者 发送消息测试 测试成功 总结 前言 使用mq自带的死信去实现延时消息要注意一个坑点,就是mq只会检测队首的消息的过期时间,假设先放入队列10s过期消息,再放入2s过期. mq会检测头部10s是否过期,10s不过期的情况下,2s就算过去也不会跑到死信. mq基本的消息模型 mq死信队列的消息模型 简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死

  • SpringBoot+RabbitMQ 实现死信队列的示例

    前言 死信:无法被消费的消息,称为死信. 如果死信一直留在队列中,会导致一直被消费,却从不消费成功. 所以我们专门开辟了一个来存放死信的队列,叫死信队列(DLX,dead-letter-exchange). 死信的几种来源: 消息 TTL 过期(time to live,存活时间,可以用在限时支付消息) 队列达到最大长度(队列满了,无法路由到该队列) 消息被拒绝( basic.reject / basic.nack ),并且 requeue = false 环境准备配置 准备 MQ 的队列和环境

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • springcloud中RabbitMQ死信队列与延迟交换机实现方法

    目录 0.引言 1. 死信队列 1.2 什么是死信? 1.3 什么是死信队列? 1.4 创建死信交换机.死信队列 1.5 实现死信消息 1.5.1 基于消费者进行reject或nack实现死信消息 1.5.2 基于生存时间实现 1.5.3 基于队列max_length实现 1.6 基于死信队列实现消息延迟发送 基于死信队列实现消息延迟发送的问题 2. 延迟交换机 3. 应用场景 4. 练习题 0.引言 死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306

  • RocketMq 消息重试机制及死信队列详解

    目录 生产者消息重试 消费者消息重试 并发消费 顺序消费 并发消费和顺序消费区别 死信队列 实践出真知 公共部分创建 测试并发消费 并发消费状态 测试顺序消费 顺序消费状态 测试死信队列 死信队列特性 生产者消息重试 消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了. 有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了. # 异步消息发送

  • 如何基于sqlite实现kafka延时消息详解

    目录 1.需求 2.实现思路 2.1 整体实现思路 2.2 程序业务逻辑 2.3 实现细节 2.4 依赖框架 3.性能测试 4.部署 4.1 系统环境依赖 4.2 安装 4.3 程序迁移 4.4 排查日志 5.注意事项 6.闲聊 总结 1.需求 延时消息(或者说定时消息)是业务系统里一个常见的功能点.常用业务场景如: 1) 订单超时取消 2) 离线超过指定时间的用户,召回通知 3) 手机消失多久后通知监护人…… 现流行的实现方案主要有: 1)数据库定时轮询,扫描到达到延时时间的记录,业务处理,删

  • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    目录 一.死信队列 二.延时队列 三.延时消息设置不同过期时间 四.延时消息用延时插件的方式实现 一.死信队列 描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2) P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息. 特定情况有哪些呢: 1.消息被拒(basic.

  • GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解

    目录 TTL 死信队列 延迟队列 Go实现延迟队列 TTL TTL 全称 Time To Live(存活时间/过期时间).当消息到达存活时间后,还没有被消费,就会被自动清除.RabbitMQ可以设置两种过期时间: 对消息设置过期时间. 对整个队列(Queue)设置过期时间. 如何设置 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期. 设置消息过期时间使用参数:expiration,单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这

随机推荐