Springboot死信队列 DLX 配置和使用思路分析

目录
  • 前言
  • 什么是死信
  • 配置和测试死信
    • 思路分析
    • 配置类编写
    • 编写消息发送服务
    • 测试
  • 消息什么时候会成为死信消息?
  • 总结
  • 参考资料
  • 代码下载

前言

上一篇博客Springboot——整合RabbitMq测试TTL中,针对设置单个消息期限或者整个队列消息期限,进行了一些配置和说明。同时也都列举了一些区别关系。

但考虑过一个问题了没有?

不管是设置哪种方式,如果消息期限到了,队列都会将该消息进行丢弃处理。
这么做合适么?

假设是某个设备的重要信息,或者某个重要的订单信息,因为规定时间内未被及时消费就将其舍弃,是否会造成很严重的后果?

有人会说,设置消息永不过期!等着消费者能够成功监听到该队列,将消息消费不就可以了嘛!

但这里需要考虑另外一个问题:
每个服务器的容量是有上限的!如果消息一直存在队列,如果一直不会被消费,岂不是很占用服务器资源?

如何解决这个问题,就是今天这篇文章需要说到的死信队列

什么是死信

说道死信,可能大部分观众大姥爷会有懵逼的想法,什么是死信?

死信队列,俗称DLX,翻译过来的名称为Dead Letter Exchange 死信交换机

当消息限定时间内未被消费,成为 Dead Message后,可以被重新发送另一个交换机中,发挥其应有的价值!

配置和测试死信

思路分析

需要测试死信队列,则需要先梳理整体的思路,如可以采取如下方式进行配置:

从上面的逻辑图中,可以发现大致的思路:

1、消息队列分为正常交换机正常消息队列;以及死信交换机死信队列

2、正常队列针对死信信息,需要将数据 重新 发送至死信交换机中。

配置类编写

结合上面的思路,编写具体的配置类。如下所示:

package cn.linkpower.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 死信队列配置
 */
@Configuration
public class DeadMsgMqConfig {
    // 定义正常交换机和正常队列信息(交换机名、队列名、路由key)
    public static final String queue_name = "xj_natural_queue";
    public static final String exchange_name = "xj_natural_exchange";
    public static final String routing_key = "xj_natural_routingKey";
    // 定义死信交换机名、死信队列名、路由key
    public static final String queue_name_dead = "xj_dead_queue";
    public static final String exchange_name_dead = "xj_dead_exchange";
    public static final String routing_key_dead = "xj_dead_routingKey";
    /**
     * 设置正常的消息队列;
     * 正常的消息队列具备以下几种功能:
     * 1、消息正常消费,需要绑定对应的消费者(这里为了测试死信,不创建消费者)
     * 2、当消息失效后,需要将指定的消息发送至 死信交换机 中
     * @return
     */
    @Bean(value = "getNaturalQueue")
    public Queue getNaturalQueue(){
        return QueueBuilder.durable(queue_name)
                // 正常的队列,在消息失效后,需要将消息丢入 死信 交换机中
                // 这里只需要针对名称进行绑定
                .withArgument("x-dead-letter-exchange",exchange_name_dead)
                // 丢入 死信交换机,需要设定指定的 routingkey
                .withArgument("x-dead-letter-routing-key",routing_key_dead)
                // 设置正常队列中消息的存活时间为 10s,当然也可以针对单个消息进行设定不同的过期时间
                .withArgument("x-message-ttl",10000)
                // 设定当前队列中,允许存放的最大消息数目
                .withArgument("x-max-length",10)
                .build();
    }
     * 设定正常的消息交换机
    @Bean(value = "getNaturalExchange")
    public Exchange getNaturalExchange(){
        // 这里为了测试,采取 direct exchange
        return ExchangeBuilder.directExchange(exchange_name)
                .durable(true) // 设定持久化
     * 将正常的消息交换机和正常的消息队列进行绑定
     * @param queue
     * @param directExchange
    @Bean
    public Binding bindNaturalExchangeAndQueue(
            @Qualifier(value = "getNaturalQueue") Queue queue,
            @Qualifier(value = "getNaturalExchange") Exchange directExchange
    ){
        return BindingBuilder
                // 绑定消息队列
                .bind(queue)
                // 至指定的消息交换机
                .to(directExchange)
                // 匹配 routingkey
                .with(routing_key)
                // 无参数,不加会报错提示
                .noargs();
     * 定义死信队列
    @Bean(value = "getDealQueue")
    public Queue getDealQueue(){
        return QueueBuilder.durable(queue_name_dead).build();
     * 定义死信交换机
    @Bean(value = "getDeadExchange")
    public Exchange getDeadExchange(){
        return ExchangeBuilder.directExchange(exchange_name_dead).durable(true).build();
     * 将死信交换机和死信队列进行绑定
     * @param deadQueue
     * @param directDeadExchange
    public Binding bindDeadExchangeAndQueue(
            @Qualifier(value = "getDealQueue") Queue deadQueue,
            @Qualifier(value = "getDeadExchange") Exchange directDeadExchange
        return BindingBuilder.bind(deadQueue).to(directDeadExchange).with(routing_key_dead).noargs();
}

编写消息发送服务

默认采取rabbitTemplate.convertAndSend方法,进行消息的发送处理。但为了保证消息生产者能够成功将数据发送至正常交换机,同时为了保证正常交换机能够将数据信息,推送至正常消息队列。需要对其增加监听。

package cn.linkpower.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 直接发送消息
     * @param exchange
     * @param routingKey
     * @param msg
     */
    public void sendMessage(String exchange,String routingKey,Object msg) {
        // 设置交换机处理失败消息的模式     true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
        // 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
        rabbitTemplate.setMandatory(true);
        //消息消费者确认收到消息后,手动ack回执
        rabbitTemplate.setConfirmCallback(this);
        // return 配置
        rabbitTemplate.setReturnCallback(this);
        //发送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    }
     * 交换机并未将数据丢入指定的队列中时,触发
     *  channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
     *  参数三:true  表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃
     * @param message   消息对象
     * @param replyCode 错误码
     * @param replyText 错误信息
     * @param exchange 交换机
     * @param routingKey 路由键
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
     * 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息
     * @param correlationData  相关配置信息
     * @param ack exchange 交换机,判断交换机是否成功收到消息    true 表示交换机收到
     * @param cause  失败原因
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
        log.info("correlationData -->"+correlationData.toString());
        if(ack){
            // 交换机接收到
            log.info("---- confirm ----ack==true  cause="+cause);
        }else{
            // 没有接收到
            log.info("---- confirm ----ack==false  cause="+cause);
        }
}

测试

既然说到测试,那么需要编写一个测试类,能够将产生的消息,推送至指定的正常消息交换机中去。

package cn.linkpower.controller;

import cn.linkpower.config.DeadMsgMqConfig;
import cn.linkpower.service.RabbitmqService;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.jni.Time;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@Slf4j
@RestController
public class DeadMsgController {
    @Autowired
    private RabbitmqService rabbitmqService;
    @RequestMapping("/deadMsgTest")
    public String deadMsgTest() throws InterruptedException {
        // 向正常的消息队列中丢数据,测试限定时间未消费后,死信队列的情况
        // 配置文件中,针对于正常队列而言,设置有10条上限大小
        for (int i = 0; i < 20; i++) {
            String msg = "dead msg test "+i;
            log.info("发送消息,消息信息为:{}",msg);
            // 向正常的消息交换机中传递数据
            rabbitmqService.sendMessage(DeadMsgMqConfig.exchange_name,DeadMsgMqConfig.routing_key,msg);
            TimeUnit.SECONDS.sleep(2);
        }
        return "ok";
    }
}

启动项目,访问指定的链接,进行数据产生和将消息发送交换机操作:

http://localhost/deadMsgTest

控制台部分日志展示:

消息什么时候会成为死信消息?

1、队列消息长度到达限制;

2、消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

3、原队列存在消息过期设置,消息到达超时时间未被消费;

总结

此处只是为了进行配置和测试需要,暂未定义任何正常消息队列消费者死信消息队列消费者信息。

1、死信交换机和死信队列和普通的没有区别

2、当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

参考资料

RabbitMQ死信队列在SpringBoot中的使用

代码下载

gitee 代码下载

到此这篇关于Springboot死信队列 DLX 配置和使用的文章就介绍到这了,更多相关Springboot死信队列 DLX内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • SpringBoot集成RabbitMQ的方法(死信队列)

    介绍 死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因: 1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false 2.队列达到最大长度 3.消息TTL过期 场景 1.小时进入初始队列,等待30分钟后进入5分钟队列 2.消息等待5分钟后进入执行队列 3.执行失败后重新回到5分钟队列 4.失败5次后,消息进入2小时队列 5.消息等待2小时进入执行队列 6.失败5次后,将消息丢弃或做其他处理 使用 安装MQ 使用docker方式安装

  • Springboot死信队列 DLX 配置和使用思路分析

    目录 前言 什么是死信 配置和测试死信 思路分析 配置类编写 编写消息发送服务 测试 消息什么时候会成为死信消息? 总结 参考资料 代码下载 前言 上一篇博客Springboot——整合RabbitMq测试TTL中,针对设置单个消息期限或者整个队列消息期限,进行了一些配置和说明.同时也都列举了一些区别关系. 但考虑过一个问题了没有? 不管是设置哪种方式,如果消息期限到了,队列都会将该消息进行丢弃处理.这么做合适么? 假设是某个设备的重要信息,或者某个重要的订单信息,因为规定时间内未被及时消费就将

  • SpringBoot+RabbitMQ 实现死信队列的示例

    前言 死信:无法被消费的消息,称为死信. 如果死信一直留在队列中,会导致一直被消费,却从不消费成功. 所以我们专门开辟了一个来存放死信的队列,叫死信队列(DLX,dead-letter-exchange). 死信的几种来源: 消息 TTL 过期(time to live,存活时间,可以用在限时支付消息) 队列达到最大长度(队列满了,无法路由到该队列) 消息被拒绝( basic.reject / basic.nack ),并且 requeue = false 环境准备配置 准备 MQ 的队列和环境

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    目录 一.死信队列 二.延时队列 三.延时消息设置不同过期时间 四.延时消息用延时插件的方式实现 一.死信队列 描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2) P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息. 特定情况有哪些呢: 1.消息被拒(basic.

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • Spring Boot系列教程之死信队列详解

    前言 在说死信队列之前,我们先介绍下为什么需要用死信队列. 如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可. ack机制和requeue-rejected属性 我们还是基于上篇<Spring Boot系列--7步集成RabbitMQ>的demo代码来说. 在项目springboot-demo我们看到application.yaml文件部分配置内容如下 ... listener: type: simple simple: acknowledge-mode: aut

  • springcloud中RabbitMQ死信队列与延迟交换机实现方法

    目录 0.引言 1. 死信队列 1.2 什么是死信? 1.3 什么是死信队列? 1.4 创建死信交换机.死信队列 1.5 实现死信消息 1.5.1 基于消费者进行reject或nack实现死信消息 1.5.2 基于生存时间实现 1.5.3 基于队列max_length实现 1.6 基于死信队列实现消息延迟发送 基于死信队列实现消息延迟发送的问题 2. 延迟交换机 3. 应用场景 4. 练习题 0.引言 死信队列是消息队列中非常重要的概念,同时我们需要业务场景中都需要延迟发送的概念,比如12306

  • GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解

    目录 TTL 死信队列 延迟队列 Go实现延迟队列 TTL TTL 全称 Time To Live(存活时间/过期时间).当消息到达存活时间后,还没有被消费,就会被自动清除.RabbitMQ可以设置两种过期时间: 对消息设置过期时间. 对整个队列(Queue)设置过期时间. 如何设置 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期. 设置消息过期时间使用参数:expiration,单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这

随机推荐