RocketMQ延迟消息超详细讲解

目录
  • 一、什么是延时消息
  • 二、延时消息等级
  • 三、延时消息使用场景
  • 四、延时消息示例
  • 五、延时消息实现原理

一、什么是延时消息

当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息。

二、延时消息等级

RocketMQ延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。默认支持18个等级的延迟消息,延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中:

// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
level == 0,消息为非延迟消息
1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
level > maxLevel,则level== maxLevel,例如level==20,延迟2h

例如指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。

三、延时消息使用场景

采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。使用场景主要有:

(1)、电商交易系统的订单超时未支付,自动取消订单

在电商交易系统中,像淘宝、京东,我们提交了一个订单之后,在支付时都会提示,需要在指定时间内(例如30分钟)完成支付,否则订单将被取消的消息,实际上这个超时未支付功能就可以使用延时消息来实现。在下单成功之后,就发送一个延时消息,然后指定消息的延时时间为30分钟,这条消息将会在30分钟后投递给后台业务系统(Consumer),此时才能被消费者进行消费,消费消息的时候会再去检查这个订单的状态,确认下是否支付成功,如果支付成功,则忽略不处理;如果订单还是未支付,则进行取消订单、释放库存等操作;

(2)、活动场景

比如B站视频投稿经常会发起一些活动,Up主在活动期间可以按照活动规则投稿视频,在活动时间截止后,后台根据Up主完成任务的情况以及结合投稿视频的播放量等进行判定,然后派发对应的奖励。这种场景我们也可以采用延时消息来实现,即在发起活动后,同时发送一条延时消息,延时时间设置为本次活动周期的时间。当活动结束后,这条延时消息刚好可以被消费者进行消费,这样就可以消费消息然后执行一系列的逻辑处理。

(3)、其它信息提醒等场景;

四、延时消息示例

(1)、编写Consumer消费端并启动,等待接收Producer发送过来的消息

/**
 * 消息消费者
 */
public class MQConsumer {
    public static void main(String[] args) throws MQClientException {
        // 创建DefaultMQPushConsumer类并设定消费者名称
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
        // 设置NameServer地址,如果是集群的话,使用分号;分隔开
        mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");
        // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
        // 如果不是第一次启动,那么按照上次消费的位置继续消费
        mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 设置消费模型,集群还是广播,默认为集群
        mqPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 消费者最小线程量
        mqPushConsumer.setConsumeThreadMin(5);
        // 消费者最大线程量
        mqPushConsumer.setConsumeThreadMax(10);
        // 设置一次消费消息的条数,默认是1
        mqPushConsumer.setConsumeMessageBatchMaxSize(1);
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*
        mqPushConsumer.subscribe("DelayTopic", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            // 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage方法接收数据
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt message : msgList) {
                    String body = new String(message.getBody(), StandardCharsets.UTF_8);
                    System.out.println("消费者接收到消息: " + message.toString() + "---消息内容为:" + body + "消息被消费时间:" + new Date(System.currentTimeMillis()) + ", 消息存储时间: " + new Date(message.getBornTimestamp()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        mqPushConsumer.start();
    }
}

(2)、编写Producer生产端,发送延时消息

RocketMQ要实现发送延迟消息,只需在发送消息之前调用Message#setDelayTimeLevel()方法设置消息的延迟等级即可。

只需要设置一个延迟级别即可,注意不是具体的延迟时间。如果设置的延迟级别超过最大值,那么将会重置为最大值。

/**
 * Producer端发送延迟消息:只需在发送消息之前调用setDelayTimeLevel()方法设置消息的延迟等级即可
 */
public class SyncMQProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 创建DefaultMQProducer类并设定生产者名称
        DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
        // 设置NameServer地址,如果是集群的话,使用分号;分隔开
        mqProducer.setNamesrvAddr("10.0.90.86:9876");
        // 消息最大长度 默认4M
        mqProducer.setMaxMessageSize(4096);
        // 发送消息超时时间,默认3000
        mqProducer.setSendMsgTimeout(3000);
        // 发送消息失败重试次数,默认2
        mqProducer.setRetryTimesWhenSendAsyncFailed(2);
        // 启动消息生产者
        mqProducer.start();
        // 创建消息,并指定Topic(主题),Tag(标签)和消息内容
        Message message = new Message("DelayTopic", "", "hello, 这是延迟消息".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 设置延时等级为3, 所以这个消息将在10s之后发送, RocketMQ目前只支持固定的几个延时时间,还不支持自定义延迟时间
        message.setDelayTimeLevel(3);
        // 发送同步消息到一个Broker,可以通过sendResult返回消息是否成功送达
        SendResult sendResult = mqProducer.send(message);
        System.out.println(sendResult);
        // 如果不再发送消息,关闭Producer实例
        mqProducer.shutdown();
    }
}

(3)、启动Producer

SendResult [sendStatus=SEND_OK, msgId=AC6E005A51B018B4AAC278E9F6F70000, offsetMsgId=0A005A5600002A9F0000000000003465, messageQueue=MessageQueue [topic=DelayTopic, brokerName=broker-a, queueId=0], queueOffset=3]

从控制台可以看到,消息发送状态为SEND_OK,说明延迟消息已经成功发送到RocketMQ Broker中。

(4)、观察Consumer是否接收到消息

消费者接收到消息: MessageExt [brokerName=broker-a, queueId=0, storeSize=241, queueOffset=1, sysFlag=0, bornTimestamp=1645673399032, bornHost=/10.0.90.115:62807, storeTimestamp=1645673403365, storeHost=/10.0.90.86:10911, msgId=0A005A5600002A9F000000000000355F, commitLogOffset=13663, bodyCRC=676533924, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DelayTopic', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=DelayTopic, MAX_OFFSET=2, CONSUME_START_TIME=1645673409075, UNIQ_KEY=AC6E005A51B018B4AAC278E9F6F70000, CLUSTER=DefaultCluster, DELAY=3, WAIT=true, REAL_QID=0}, body=[104, 101, 108, 108, 111, 44, 32, -24, -65, -103, -26, -104, -81, -27, -69, -74, -24, -65, -97, -26, -74, -120, -26, -127, -81], transactionId='null'}]---消息内容为:hello, 这是延迟消息消息被消费时间:Thu Feb 24 11:30:09 CST 2022, 消息存储时间: Thu Feb 24 11:29:59 CST 2022

可以看到,延迟消息成功被消息,并且我们注意到,消息被Consumer消费的时间【Thu Feb 24 11:30:09 CST 2022】 - 消息存储时间【Thu Feb 24 11:29:59 CST 2022】 = 10s,发送消息的时候,指定的延迟等级也是10s,也就是消息的消费比存储时间晚10秒。

五、延时消息实现原理

RocketMQ延时消息会暂存在名为SCHEDULE_TOPIC_XXXX的Topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

SCHEDULE_TOPIC_XXXX中consumequeue中的文件夹名称就是队列的名称,并且【队列名称 = 延迟等级 - 1】;如下图,在前面的例子中,我们执定消息的延迟时间为10s,对应的延迟等级是3,所以文件夹名称为【3 - 1 = 2】

延迟消息在RocketMQ Broker端的流转如下图所示:

主要包含以下6个步骤:

(1)、修改消息Topic名称和队列信息

RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。

由于消息一旦存储到ConsumeQueue中,消费者就能消费到,而延迟消息不能被立即消费,所以这里将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。同时,还会将消息原来要发送到的目标Topic和队列信息存储到消息的属性中。

(2)、转发消息到延迟主题SCHEDULE_TOPIC_XXXX的CosumeQueue中

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

投递时间 = 消息存储时间(storeTimestamp) + 延迟级别对应的时间

需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,CosumeQueue单个存储单元组成结构如下图所示:

其中:

  • Commit Log Offset:记录在CommitLog中的位置;
  • Size:记录消息的大小;
  • Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。这也是为什么java中hashCode方法返回一个int型,只占用4个字节,而这里Message Tag HashCode字段却设计成8个字节的原因;

(3)、延迟服务消费SCHEDULE_TOPIC_XXXX消息

Broker内部有一个ScheduleMessageService类,其充当延迟服务,主要是消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。

ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。

需要注意的是,每个TimeTask在检查消息是否到期时,首先检查对应队列中尚未投递第一条消息,如果这条消息没到期,那么之后的消息都不会检查。如果到期了,则进行投递,并检查之后的消息是否到期。

(4)、将信息重新存储到CommitLog中

在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储。

(5)、将消息投递到目标Topic中

这一步与第二步类似,不过由于消息的Topic名称已经改为了目标Topic。因此消息会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。

(6)、消费者消费目标topic中的数据。

到此这篇关于RocketMQ延迟消息超详细讲解的文章就介绍到这了,更多相关RocketMQ延迟消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RocketMQ-延迟消息的处理流程介绍

    概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息: 预设值的延迟时间间隔为: 1s. 5s. 10s. 30s. 1m. 2m. 3m. 4m. 5m. 6m. 7m. 8m. 9m. 10m. 20m. 30m. 1h. 2h: 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间: broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到

  • RocketMQ延迟消息简明介绍

    目录 前言 核心属性 RMQ_SYS_SCHEDULE_TOPIC FIRST_DELAY_TIME DELAY_FOR_A_WHILE DELAY_FOR_A_PERIOD delayLevelTable offsetTable 核心方法 queueId2DelayLevel delayLevel2QueueId updateOffset computeDeliverTimestamp start() shutdown() load() parseDelayLevel 前言 场景可以是这样的,

  • RocketMQ延迟消息超详细讲解

    目录 一.什么是延时消息 二.延时消息等级 三.延时消息使用场景 四.延时消息示例 五.延时消息实现原理 一.什么是延时消息 当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息. 二.延时消息等级 RocketMQ延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的.默认支持18个等级的延迟消息,延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中: // MessageStoreConf

  • RocketMQ生产消息与消费消息超详细讲解

    目录 1 RocketMQ简介 2 MQ的常见产品 3 环境搭建 4 单生产者单消费者模式 5 单生产者多消费者模式 5.1默认模式(负载均衡) 5.2广播模式 6 多生产者多消费者模式 1 RocketMQ简介 RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目.并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的

  • C++ Boost log日志库超详细讲解

    目录 一.说明 二.库Boost.Log 一.说明 应用程序库是指通常专门用于独立应用程序开发而不用于库开发的库. Boost.Log 是一个日志库. Boost.ProgramOptions 是一个用于定义和解析命令行选项的库. Boost.Serialization 允许您序列化对象,例如,将它们保存到文件或从文件加载它们. Boost.Uuid 支持使用 UUID. 具体内容 62. Boost.Log 63. Boost.ProgramOptions 64. Boost.Serializ

  • 超详细讲解Java异常

    目录 一.Java异常架构与异常关键字 Java异常简介 Java异常架构 1.Throwable 2.Error(错误) 3.Exception(异常) 4.受检异常与非受检异常 Java异常关键字 二.Java异常处理 声明异常 抛出异常 捕获异常 如何选择异常类型 常见异常处理方式 1.直接抛出异常 2.封装异常再抛出 3.捕获异常 4.自定义异常 5.try-catch-finally 6.try-with-resource 三.Java异常常见面试题 1.Error 和 Excepti

  • Java 超详细讲解核心类Spring JdbcTemplate

    目录 JdbcTemplate概述 JdbcTemplate开发步骤 JdbcTemplate快速入门 Spring产生JdbcTemplate对象 JdbcTemplate的常用操作 修改操作 删除和查询全部操作 查询单个数据操作 本章小结 JdbcTemplate概述 它是spring框架中提供的一个对象,是对原始繁琐的Jdbc API对象的简单封装.spring框架为我们提供了很多的操作 模板类.例如:操作关系型数据的JdbcTemplate和HibernateTemplate,操作nos

  • Java SpringMVC数据响应超详细讲解

    目录 1)页面跳转   2)回写数据 3)配置注解驱动 4)知识要点 1)页面跳转   直接返回字符串:此种方式会将返回的字符串与视图解析器的前后缀拼接后跳转.  返回带有前缀的字符串: 转发: forward:/WEB-INF/views/index.jsp 重定向: redirect:/index.jsp 通过ModelAndView对象返回 @RequestMapping("/quick2") public ModelAndView quickMethod2(){ ModelAn

  • Spring Boot超详细讲解请求处理流程机制

    目录 1. 背景 2. Spring Boot 的请求处理流程设计 3. Servlet服务模式请求流程分析 3.1 ServletWebServerApplicationContext分析 3.2 Servlet服务模式之请求流程具体分析 4. Reactive服务模式请求流程分析 4.1 ReactiveWebServerApplicationContext分析 4.2 webflux服务模式之请求流程具体分析 5. 总结 1. 背景 之前我们对Spring Boot做了研究讲解,我们知道怎

  • Java超详细讲解WebMvcConfigurer拦截器

    目录 addInterceptors拦截器 addViewControllers页面跳转 addResourceHandlers静态资源 configureViewResolvers视图解析器 addCorsMappings跨域 configureMessageConverters信息转换器 WebMvcConfigurer接口常用的方法: /* 拦截器配置 */ void addInterceptors(InterceptorRegistry var1); /* 视图跳转控制器 */ void

  • php超详细讲解命名管道

    目录 进程间为什么要通信 进程如何实现通信 常见进程通信方式 管道概念 命名管道实现 posix_mkfifo函数 无血缘进程间通信 进程间为什么要通信 进程间通信的目的: 数据传输:一个 进程需要将它的数据 发送给另一个进程. 通知事件:一个进程需要向另一个或一组进程 发送消息,通知它(它们)发生了 某种事件(如进程终止时要通知父进程). 资源共享:多个进程之间 共享同样的资源 .为了做到这一点,需要内核提供互斥和同步机制. 进程控制:有些进程 希望完全控制另一个进程的执行 (如 Debug

  • Netty与NIO超详细讲解

    目录 Linux下的五种I/O模型 阻塞IO的流程 IO复用 信号驱动I/O 异步IO NIO I0多路复用 NIO核心组件 使用Java原生API实现NIO操作 Redis为什么支持高并发 Linux下的五种I/O模型 1)阻塞I/O(blocking I/O) 2)非阻塞I/O (nonblocking I/O) 3) I/O复用(select 和poll) (I/O multiplexing) 4)信号驱动I/O (signal driven I/O (SIGIO)) 5)异步I/O (a

随机推荐