RocketMQ 延时级别配置方式

RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。

其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。

如何配置:

在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

描述了各级别与延时时间的对应映射关系。

这个配置项配置了从1级开始各级延时的时间,如1表示延时1s,2表示延时5s,14表示延时10m,可以修改这个指定级别的延时时间;

时间单位支持:s、m、h、d,分别表示秒、分、时、天;

默认值就是上面声明的,可手工调整;

默认值已经够用,不建议调整【仅供参考,还是根据实际需要调整。调整默认值时注意同时要修改时间对应的level级别的值】

如何发送延时消息:

发送延时消息只需要在客户端(rocketmq-client端)待发送的消息( com.alibaba.rocketmq.common.message.Message )中设置延时级别delayLevel即可。

Message msg = new Message(topicName,"",keys,message.getBytes());
msg.setDelayTimeLevel(delayLevel);
SendResult sendResult = getMQProducer.send(msg);

RocketMQ定时(延迟)消息

RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的延迟时间间隔的延迟消息。

预设值的延迟时间间隔为:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

延时消息的使用场景

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

生产

package com.xin.rocketmq.demo.testrun;
import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerDelay {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.10.11:9876");
        producer.start();
        Message msg1 = new Message(
                JmsConfig.TOPIC,
                "订单001".getBytes());
        msg1.setDelayTimeLevel(2);//延迟5秒
        Message msg2 = new Message(
                JmsConfig.TOPIC,
                "订单001".getBytes());
        msg2.setDelayTimeLevel(4);//延迟30秒
        SendResult sendResult1 = producer.send(msg1);
        SendResult sendResult2 = producer.send(msg2);
        System.out.println("Product1-同步发送-Product信息={}" + sendResult1);
        System.out.println("Product2-同步发送-Product信息={}" + sendResult2);
        producer.shutdown();
    }
}

消费

package com.xin.rocketmq.demo.testrun;
import com.xin.rocketmq.demo.config.JmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerDelay {
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // 设置NameServer的地址
        consumer.setNamesrvAddr("192.168.10.11:9876");
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 解决SpringBoot整合RocketMQ遇到的坑

    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

  • springboot整合rocketmq实现分布式事务

    1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

  • 使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消. 在电商系统中,用户七天内没有评价商品,则默认好评. 这些场景对应的解决方案,包括: 轮询遍历数据库记录 JDK 的 DelayQueue ScheduledExecutorService 基于 Quartz 的定时任务 基于 Redis 的 zset 实现延时队列. 除此之外,

  • RocketMQ消息过滤与查询的实现

    消息过滤 RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的. RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构. 其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的M

  • 基于rocketmq的有序消费模式和并发消费模式的区别说明

    rocketmq消费者注册监听有两种模式 有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同. MessageListenerOrderly 正确消费返回 ConsumeOrderlyStatus.SUCCESS 稍后消费返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT MessageListenerConcurrently 正确消费返回 Consu

  • 使用RocketMQTemplate发送带tags的消息

    RocketMQTemplate发送带tags的消息 RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的个方便发送消息的模板类,它是基本Spring 的消息机制实现的,对外只提供了Spring抽象出来的消息发送接口. 在单独使用RocketMQ的时候,发送消息使用的Message是'org.apache.rocketmq.common.message'包下面的Message,而使用RocketMQTemplate发送消息时,使用的Message是org.s

  • RocketMQ-延迟消息的处理流程介绍

    概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息: 预设值的延迟时间间隔为: 1s. 5s. 10s. 30s. 1m. 2m. 3m. 4m. 5m. 6m. 7m. 8m. 9m. 10m. 20m. 30m. 1h. 2h: 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间: broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到

  • RocketMQTemplate 注入失败的解决

    RocketMQTemplate 注入失败 在使用rocketmq 发送消息时,会发现 @Autowired private RocketMQTemplate rocketMQTemplate; 注入RocketMQTemplate 失败. 解决方案 究其原因是因为,配置文件中,我们没有添加 上图中蓝色的两行代码,指定发送的组名.写上后,问题解决. 好了,再来说说RocketMQTemplate 的基本使用吧~ RocketMQTemplate的使用 1.pom.xml依赖 <dependenc

  • RocketMQ 延时级别配置方式

    RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等. 其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推. 如何配置: 在服务器端(rocketmq-broker端)的属性配置文件中加入以下行: messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 描述了各级别与延时时

  • Spark三种属性配置方式详解

    随着Spark项目的逐渐成熟, 越来越多的可配置参数被添加到Spark中来.在Spark中提供了三个地方用于配置: 1.Spark properties:这个可以控制应用程序的绝大部分属性.并且可以通过 SparkConf对象或者Java 系统属性进行设置: 2.环境变量(Environment variables):这个可以分别对每台机器进行相应的设置,比如IP.这个可以在每台机器的$SPARK_HOME/ conf/spark-env.sh脚本中进行设置: 3.日志:所有的日志相关的属性可以

  • Nginx自定义访问日志的配置方式

    前言 Nginx日志主要分为两种:访问日志和错误日志.日志开关在Nginx配置文件(/etc/nginx/nginx.conf)中设置,两种日志都可以选择性关闭,默认都是打开的. 访问日志主要记录客户端访问Nginx的每一个请求,格式可以自定义.通过访问日志,你可以得到用户地域来源.跳转来源.使用终端.某个URL访问量等相关信息. 日志的重要性不言而喻,一般来说我们开发过程中会为每个项目定义自己的日志格式以及存储路径. 就我们普通的JAVAWEB项目来说,重要的日志一般输出并存放在Tomcat的

  • Java中 log4j日志级别配置详解

    1.1 前言 说出来真是丢脸,最近被公司派到客户公司面试外包开发岗位,本来准备了什么redis.rabbitMQ.SSM框架的相关面试题以及自己做过的一些项目回顾,信心满满地去面试,结果别人一上来就问到了最近项目使用的日志系统是什么?日志级别是怎么配置的?当时我都蒙X了,平时都是项目经理搭的,我自己也是随便上网一搜往配置文件一黏贴就OK了.我就这么说完后面试官深深定了我一眼,当时我的内心羞愧到...... 1.2 闲话少说,讲讲日志的发展故事(如果已经了解的可以跳过,直接看1.3日志配置) 要想

  • AOP之事务管理<aop:advisor>的两种配置方式

    目录 AOP事务管理<aop:advisor>两种配置方式 方式一 方式二 hibernate事务配置Aop aop:advisor模式 AOP事务管理<aop:advisor>两种配置方式 方式一 @transactionManagerbean.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.o

  • Vue中的ESLint配置方式

    目录 说说Vue项目中添加ESLint的规则 第一步:安装 第二步:配置.eslintrc文件 第三步:如图配置 用惯了组里统一的代码风格,再看默认的vue项目代码很是别扭,记录下ESLint配置,以后直接copy paste,美滋滋 官网:https://eslint.org/docs/rules/ ESLint配置方式 ESLint有三种配置方式,一般自定义规则较少时,选第2种:自定义配置很多,建议第1种:第3种常用于对某句代码禁用eslint. 1.在package.json中通过esli

  • 关于react-router的几种配置方式详解

    本文介绍关于react-router的几种配置方式详解,分享给大家,具体如下: 路由的概念 路由的作用就是将url和函数进行映射,在单页面应用中路由是必不可少的部分,路由配置就是一组指令,用来告诉router如何匹配url,以及对应的函数映射,即执行对应的代码. react-router 每一门JS框架都会有自己定制的router框架,react-router就是react开发应用御用的路由框架,目前它的最新的官方版本为4.1.2.本文给大家介绍的是react-router相比于其他router

  • hibernate4基本配置方式详解

    可编程的配置方式-1 如果在配置cfg.xml的时候,不想在里面配置hbm.xml怎么办呢?可在程序里使用可编程的配置方式,也就是使用程序来指定在cfg.xml里面的配置信息,不推荐这种方式.如下: Configuration cfg= new Configuration() .addResource("Item.hbm.xml") .addResource("Bid.hbm.xml"); 一个替代方法(有时是更好选择)是,指定被映射的类,让Hibernate帮你寻

  • IIS下PHP的三种配置方式对比

    在Windows IIS 6.0下配置PHP,通常有CGI.ISAPI和FastCGI三种配置方式,这三种模式都可以在IIS 6.0下成功运行,下面我就讲一下这三种方式配置的区别和性能上的差异.   1.CGI(通用网关接口/Common Gateway Interface)一般是可执行程序,例如EXE文件,和WEB服务器各自占据着不同的进程,而且一般一个CGI程序只能处理一个用户请求.这样,当用户请求数量非常多时,会大量占用系统的资源,如内存.CPU时间等,造成效能低下.   2.ISAPI(

  • spring aop两种配置方式

    第一种:注解配置AOP 注解配置AOP(使用 AspectJ 类库实现的),大致分为三步: 1. 使用注解@Aspect来定义一个切面,在切面中定义切入点(@Pointcut),通知类型(@Before, @AfterReturning,@After,@AfterThrowing,@Around). 2. 开发需要被拦截的类. 3. 将切面配置到xml中,当然,我们也可以使用自动扫描Bean的方式.这样的话,那就交由Spring AoP容器管理. 另外需要引用 aspectJ 的 jar 包:

随机推荐