RocketMq 消息重试机制及死信队列详解

目录
  • 生产者消息重试
  • 消费者消息重试
    • 并发消费
    • 顺序消费
    • 并发消费和顺序消费区别
  • 死信队列
  • 实践出真知
    • 公共部分创建
    • 测试并发消费
      • 并发消费状态
    • 测试顺序消费
      • 顺序消费状态
    • 测试死信队列
      • 死信队列特性

生产者消息重试

消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了。

有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了。

# 异步消息发送失败重试次数,默认为2
rocketmq.producer.retry-times-when-send-async-failed=2
# 消息发送失败重试次数,默认为2
rocketmq.producer.retry-times-when-send-failed=2

也可以通过下面这种方式配置

DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setRetryTimesWhenSendFailed(2);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);

消费者消息重试

Apache RocketMQ 有两种消费模式:集群消费模式和广播消费模式。消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

同时RocketMq Push消费提供了两种消费方式:并发消费和顺序消费。

并发消费

在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费,所有并发消费也可以称之为无序消费。

顺序消费

顺序消费是消息生产者发送过来的消息会遵循FIFO队列的思想,先进先出有顺序的消费消息。 对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

并发消费和顺序消费区别

顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。

两者参数差别如下

消费类型 重试间隔 最大重试次数
顺序消费 间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX
并发消费 间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值

并发消费重试间隔如下:

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10s 9 7min
2 30s 10 8min
3 1min 11 9min
4 2min 12 10min
5 3min 13 20min
6 4min 14 30min
7 5min 15 1h
8 6min 16 2h

死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。

实践出真知

Talk is cheap,show you the code.

公共部分创建

  • 配置文件
rocketmq.name-server=localhost:9876
# 消费者组
rocketmq.producer.group=producer_group
rocketmq.consumer.topic=consumer_topic
rocketmq.consumer.group=consumer_group
  • 创建消费者RetryConsumerDemo
@Component
public class RetryConsumerDemo {
    @Value("${rocketmq.name-server}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;
    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    @PostConstruct
    public void start() {
        try {
            consumer.setNamesrvAddr(namesrvAddr);
            //设置集群消费模式
            consumer.setMessageModel(MessageModel.CLUSTERING);
            //设置消费超时时间(分钟)
            consumer.setConsumeTimeout(1);
            //订阅主题
            consumer.subscribe(topic , "*");
            //注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
            //最大重试次数
            consumer.setMaxReconsumeTimes(2);
            //启动消费端
            consumer.start();
            System.out.println("Retry Consumer Start...");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

测试并发消费

  • 创建并发消费监听类 并发消费监听类要实现MessageListenerConcurrently类
public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt message = msgs.get(0);
        try {
            final LocalDateTime now = LocalDateTime.now();
            //逐条消费
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("当前时间:"+now+", messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic()  + ",messageBody: " + messageBody);
            //模拟消费失败
            if ("Concurrently_test".equals(messageBody)) {
                int a = 1 / 0;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
  • 注册监听类 在消费者类RetryConsumerDemo中注册监听类
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
  • 测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    public void testProducer(){
        String msg = "Concurrently_test";
        rocketMQTemplate.convertAndSend(topic , msg);
    }
}

测试结果:

后面重试时间太长就不做测试了,可以看到并发消费的消息时间都是按照上面那张时间间隔表来。

然后通过RocketMq Dashboard Topic一栏可以看到有一个重试消费者组%RETRY%consumer_group,这个消费者组内存放的就是consumer_group消费者组消费失败重试的消息。

并发消费的重试次数是可以修改的,重试次数对应参数DefaultMQPushConsumer类的maxReconsumeTimes属性,maxReconsumeTimes默认是-1,也就是默认会重试16次;

0代表不重试,只要失败就会放入死信队列;

1-16重试次数对应着上面时间间隔表中对应次数。

配置的最大重试次数超过16就按16处理。

并发消费状态

并发消费有两个状态CONSUME_SUCCESS和RECONSUME_LATER。返回CONSUME_SUCCESS代表着消费成功,返回RECONSUME_LATER代表进行消息重试。

public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}

当MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法抛异常了,都会进行消息重试。当然还是推荐返回ConsumeConcurrentlyStatus#RECONSUME_LATER。

测试顺序消费

顺序消费和并行消费其实都差不多的,只不过顺序消费实现的是MessageListenerOrderly 接口

  • 创建顺序消费监听类
public class MessageListenerOrderlyImpl implements MessageListenerOrderly {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeOrderlyStatus.SUCCESS;
        }
        MessageExt message = msgs.get(0);
        try {
            final LocalDateTime now = LocalDateTime.now();
            //逐条消费
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("当前时间:"+now+", messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic()  + ",messageBody: " + messageBody);
            //模拟消费失败
            if ("Orderly_test".equals(messageBody)) {
                int a = 1 / 0;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}
  • 注册监听类
//最大重试次数
consumer.setMaxReconsumeTimes(2);
//顺序消费 重试时间间隔
consumer.setSuspendCurrentQueueTimeMillis(2000);

SuspendCurrentQueueTimeMillis表示重试的时间间隔,默认是1s,这里修改成2s

  • 测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RocketmqApplication.class)
class RocketmqApplicationTests {
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    public void testProducer(){
        String msg = "Orderly_test";
        rocketMQTemplate.convertAndSend(topic , msg);
    }
}

测试结果:

可以看到三条结果,第一条是第一次消费的,其余两条是隔了2s重试的。重试2次之后这条数据就进入了死信队列。

顺序消费状态

顺序消费目前也是两个状态:SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暂停消费一下,过SuspendCurrentQueueTimeMillis时间间隔后再重试一下,而不是放到重试队列里。

public enum ConsumeOrderlyStatus {
    /**
     * Success consumption
     */
    SUCCESS,
    /**
     * Rollback consumption(only for binlog consumption)
     */
    @Deprecated
    ROLLBACK,
    /**
     * Commit offset(only for binlog consumption)
     */
    @Deprecated
    COMMIT,
    /**
     * Suspend current queue a moment
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

测试死信队列

并发消费和顺序消费达到了最大重试次数之后就会放到死信队列。死信队列在一开始是不会被创建的,只有需要的时候才会被创建。就拿上面测试结果来看,进入到的死信队列就是%DLQ%consumer_group,进入死信队列的消息要收到处理。

死信队列特性

  • 不会再被消费者正常消费。
  • 一个死信队列对应一个分组, 而不是对应单个消费者实例。
  • 如果一个消费者组未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 分组产生的所有死信消息,不论该消息属于哪个 Topic。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理

参考资料:

https://rocketmq.apache.org/docs/

以上就是RocketMq 消息重试机制及死信队列详解的详细内容,更多关于RocketMq 消息重试死信队列的资料请关注我们其它相关文章!

(0)

相关推荐

  • 图文并茂讲解RocketMQ消息类别

    目录 1.同步消息 2.异步消息 3.单向消息 1.同步消息 即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功) 生产者: public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("19

  • RocketMQ普通消息实战演练详解

    目录 引言 普通消息同步发送 普通消息异步发送 普通消息单向发送 集群消费模式 广播消费模式 引言 之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV. 相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm 普通消息同步发送 同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息.这个方式可以确保消息发送到Broker成功,一些重要的

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • RocketMQ消息队列实现随机消息发送当做七夕礼物

    目录 正文 1 下载并启动RocketMQ 1.1 首先启动name server 1.2 然后启动Broker 2 生产者 3 消费者 正文 都在过情人节,前端的小哥哥们给女朋友画个页面,美美的,写个chrome插件,好看的,俺们后端程序员咋办. 我给媳妇写首诗,哈哈 我决定,把想对媳妇说的,今天发送到一个MQ里边,然后在七夕当天,打开消费者,将这一段话给俺媳妇看.你看,这就是我好久前对你说的话,这就是我们后端程序员的浪漫.当然也可以多发送几个,到时候跟根据topic控制到底发什么,哈哈. 这

  • RocketMq 消息重试机制及死信队列详解

    目录 生产者消息重试 消费者消息重试 并发消费 顺序消费 并发消费和顺序消费区别 死信队列 实践出真知 公共部分创建 测试并发消费 并发消费状态 测试顺序消费 顺序消费状态 测试死信队列 死信队列特性 生产者消息重试 消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了. 有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了. # 异步消息发送

  • Spring Boot系列教程之死信队列详解

    前言 在说死信队列之前,我们先介绍下为什么需要用死信队列. 如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可. ack机制和requeue-rejected属性 我们还是基于上篇<Spring Boot系列--7步集成RabbitMQ>的demo代码来说. 在项目springboot-demo我们看到application.yaml文件部分配置内容如下 ... listener: type: simple simple: acknowledge-mode: aut

  • SpringBoot整合rockerMQ消息队列详解

    目录 Springboot整合RockerMQ 使用总结 消费模式 生产者组和消费者组 生产者投递消息的三种方式 如何保证消息不丢失 顺序消息 分布式事务 Springboot整合RockerMQ 1.maven依赖 <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • Java 阻塞队列详解及简单使用

     Java 阻塞队列详解 概要: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,

  • Netty的Handler链调用机制及如何组织详解

    目录 什么是 Handler Handler 是怎么被组织起来的 Handler 链调用机制 简述 ChannelPipeline 如何调度 handler 什么是 Handler Netty是一款基于NIO的异步事件驱动网络应用框架,其核心概念之一就是Handler.而Handler是Netty中处理事件的核心组件,用于处理入站和出站的数据流,实现业务逻辑和网络协议的处理. 在Netty中,Handler是一个接口,主要分为两种:ChannelInboundHandler(入站Handler)

  • java中注解机制及其原理的详解

    java中注解机制及其原理的详解 什么是注解 注解也叫元数据,例如我们常见的@Override和@Deprecated,注解是JDK1.5版本开始引入的一个特性,用于对代码进行说明,可以对包.类.接口.字段.方法参数.局部变量等进行注解.它主要的作用有以下四方面: 生成文档,通过代码里标识的元数据生成javadoc文档. 编译检查,通过代码里标识的元数据让编译器在编译期间进行检查验证. 编译时动态处理,编译时通过代码里标识的元数据动态处理,例如动态生成代码. 运行时动态处理,运行时通过代码里标识

  • Android 消息分发使用EventBus的实例详解

    Android 消息分发使用EventBus的实例详解 1. AndroidStudio使用 dependencies { //最新版本 compile 'org.greenrobot:eventbus:3.0.0' //可以翻倍提高EventBus使用效率 provided 'de.greenrobot:eventbus-annotation-processor:3.0.0-beta1' } 2. 在基类Activity中配置 /** * Activity基类 */ protected Eve

  • C语言 表、栈和队列详解及实例代码

    C语言 表.栈和队列详解 表ADT 形如A1,A2,A3-An的表,这个表的大小为n,而大小为0的表称为空表,非空表中,Ai+1后继Ai,Ai-1前驱Ai,表ADT的相关操有PrintList打印表中的元素:CreateEmpty创建一个空表:Find返回关键字首次出现的位置:Insert和Delete从表的某个位置插入和删除某个关键字. 对表的所有操作都可以通过使用数组来实现,但在这里使用链表的方式来实现.链表(linked list)由一系列不必在内存中相连的结构组成,每个结构均含有元素和指

  • python发qq消息轰炸虐狗好友思路详解(完整代码)

    因为我的某个好友在情人节的时候秀恩爱,所以我灵光一闪制作了qq消息轰炸并记录了下来. 首先 我的编程环境是: windows 10系统 python3.6 记得要下载win32 pip install win32 思路介绍 其实也非常简单 将要发出去的句子储存在列表中 然后用随机模块调用 将随机出来的元素储存在剪贴板中 连接QQ 找到指定对象 疯狂输出 怎么样,简单吧 开始打代码吧 import random import win32gui as a import win32con as b i

  • python字符串驻留机制的使用范围知识点详解

    1.字符串的长度为0和1时. 2.符合标识符的字符串. 3.字符串只在编译时进行驻留,而非运行时. 4.[-5,256]之间的整数数字. 实例 >>> str1='jiumo' >>> str2='jiumo' >>> str1 is str2 True >>> id(str1) 1979078421896 >>> id(str2) 1979078421896 知识点扩充: 驻留时机 所有长度为 0 和长度为 1 的

随机推荐