RocketMQ-延迟消息的处理流程介绍

概述

RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;

预设值的延迟时间间隔为:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;

在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;

broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

CommitLog.putMessage()

//获取消息的sysflag
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        //非事务消息 或 已commit事务消息
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery 判断消息是否设置延迟
            if (msg.getDelayTimeLevel() > 0) {
                //判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //延迟消息的topic为 SCHEDULE_TOPIC_XXXX
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                //获取延迟级别,一个延迟级别对应一个Queue
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                //消息原始的topic,queueid保存到消息的property中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

1、判断消息类型,如果是非事务消息、已commit事务消息,才能处理延迟消息

2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息

3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级

4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX

5、获取延迟级别,一个延迟级别对应一个Queue

6、消息原始的topic,queueid保存到消息的property中

7、修改消息的topci、queueid

启动延迟消息定时任务

ScheduleMessageService.start()

延迟消息投递

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • springboot整合rocketmq实现分布式事务

    1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

  • RocketMQTemplate 注入失败的解决

    RocketMQTemplate 注入失败 在使用rocketmq 发送消息时,会发现 @Autowired private RocketMQTemplate rocketMQTemplate; 注入RocketMQTemplate 失败. 解决方案 究其原因是因为,配置文件中,我们没有添加 上图中蓝色的两行代码,指定发送的组名.写上后,问题解决. 好了,再来说说RocketMQTemplate 的基本使用吧~ RocketMQTemplate的使用 1.pom.xml依赖 <dependenc

  • 基于rocketmq的有序消费模式和并发消费模式的区别说明

    rocketmq消费者注册监听有两种模式 有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同. MessageListenerOrderly 正确消费返回 ConsumeOrderlyStatus.SUCCESS 稍后消费返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT MessageListenerConcurrently 正确消费返回 Consu

  • RocketMQ消息丢失场景以及解决方法

    既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢? 先来一张最简单的消费流程图: 上图中大致包含了这么几种场景: 生产者产生消息发送给RocketMQRocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束 这三种场景都可能会产生消息的丢失,如下图所示: 场景1中生产者将消息发送

  • 解决SpringBoot整合RocketMQ遇到的坑

    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

  • RocketMQ消息过滤与查询的实现

    消息过滤 RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的. RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构. 其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的M

  • 使用RocketMQTemplate发送带tags的消息

    RocketMQTemplate发送带tags的消息 RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的个方便发送消息的模板类,它是基本Spring 的消息机制实现的,对外只提供了Spring抽象出来的消息发送接口. 在单独使用RocketMQ的时候,发送消息使用的Message是'org.apache.rocketmq.common.message'包下面的Message,而使用RocketMQTemplate发送消息时,使用的Message是org.s

  • RocketMQ延迟消息简明介绍

    目录 前言 核心属性 RMQ_SYS_SCHEDULE_TOPIC FIRST_DELAY_TIME DELAY_FOR_A_WHILE DELAY_FOR_A_PERIOD delayLevelTable offsetTable 核心方法 queueId2DelayLevel delayLevel2QueueId updateOffset computeDeliverTimestamp start() shutdown() load() parseDelayLevel 前言 场景可以是这样的,

  • RocketMQ延迟消息超详细讲解

    目录 一.什么是延时消息 二.延时消息等级 三.延时消息使用场景 四.延时消息示例 五.延时消息实现原理 一.什么是延时消息 当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息. 二.延时消息等级 RocketMQ延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的.默认支持18个等级的延迟消息,延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中: // MessageStoreConf

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • RocketMQ-延迟消息的处理流程介绍

    概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息: 预设值的延迟时间间隔为: 1s. 5s. 10s. 30s. 1m. 2m. 3m. 4m. 5m. 6m. 7m. 8m. 9m. 10m. 20m. 30m. 1h. 2h: 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间: broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到

  • RocketMQ事务消息原理与使用详解

    目录 一.RocketMQ事务消息概要 二.RocketMQ事务消息使用案例 (1).定义消息监听器 (2).定义消息生产者 (3).定义消息消费者 (4).观察生产者控制台输出 (5).观察消费者控制台输出 三.RocketMQ事务消息原理 四.RocketMQ事务消息使用限制 一.RocketMQ事务消息概要 RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ的事务消息提供类

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • Springboot 整合 RocketMQ 收发消息的配置过程

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

    目录 RocketMq消息处理 1. 处理PULL_MESSAGE请求 2. 获取消息 3. 挂起请求:PullRequestHoldService#suspendPullRequest 3.1 处理挂起请求的线程:PullRequestHoldService 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup 3.3 消息分发中唤醒consumer请求 总结 RocketMq消息处理 RocketMq消息处理整个流程如下: 本系列Roc

  • Go+Kafka实现延迟消息的实现示例

    目录 前言 原理 简单的实现 生产者 延迟服务 消费者 改进点 通用的延迟服务 生产者负责延迟服务 总结 前言 延迟队列是一个非常有用的工具,我们经常遇到需要使用延迟队列的场景,比如延迟通知,订单关闭等等. 这篇文章主要是使用Go+Kafka实现延迟消息. 使用了sarama客户端. 原理 Kafka实现延迟消息分为下面三步: 生产者把消息发送到延迟队列 延迟服务把延迟队列里超过延迟时间的消息写入真实队列 消费者消费真实队列里的消息 简单的实现 生产者 生产者只是把消息发送到延迟队列 msg :

随机推荐