基于rocketmq的有序消费模式和并发消费模式的区别说明

rocketmq消费者注册监听有两种模式

有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同。

MessageListenerOrderly

正确消费返回

ConsumeOrderlyStatus.SUCCESS

稍后消费返回

ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently

正确消费返回

ConsumeConcurrentlyStatus.CONSUME_SUCCESS

稍后消费返回

ConsumeConcurrentlyStatus.RECONSUME_LATER

顾名思义,有序消费模式是按照消息的顺序进行消费,但是除此之外,在实践过程中我发现和并发消费模式还有很大的区别的。

第一,速度,下面我打算用实验来探究一下。

使用mq发送消息,消费者使用有序消费模式消费,具体的业务是阻塞100ms

Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerOrderly() {
	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
			ConsumeOrderlyContext context) {
        logger.info("==========CONSUME_START===========");
		logger.info(Thread.currentThread().getName()
                            + " Receive New Messages: " + msgs.size());
        try {
        	if(date1 == null)
        		date1 = new Date();//在第一次消费时初始化
        	Thread.sleep(100);
       		logger.info("total:"+(++total));
        	date2 = new Date();
       		totalTime = (date2.getTime() - date1.getTime());
       		logger.info("totalTime:"+totalTime);
            logger.info("==========CONSUME_SUCCESS===========");
            return ConsumeOrderlyStatus.SUCCESS;
        }catch (Exception e) {
            logger.info("==========RECONSUME_LATER===========");
            logger.error(e.getMessage(),e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
	}
}

消费100条消息

速度挺快的,为了让结果更准确,将消息加到1000条

消费1000条消息

可以看到每一条消息平均耗时25ms,然而业务是阻塞100ms,这说明有序消费模式和同步消费可能并不是一回事,那如果不阻塞代码我们再来看一下结果

不阻塞过后速度明显提高了,那么我阻塞300ms会怎么样呢?

时间相比阻塞100ms多了2倍

接下来我们测试并发消费模式

Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(
                       List< MessageExt > msgs, ConsumeConcurrentlyContext context) {  

    		logger.info(Thread.currentThread().getName()
                                 + " Receive New Messages: " + msgs.size());
    		try {
    			if(date1 == null)
    				date1 = new Date();
            	Thread.sleep(100);
           		logger.info("total:"+(++total));
           		date2 = new Date();
           		totalTime = (date2.getTime() - date1.getTime());
           		logger.info("totalTime:"+totalTime);
                logger.info("==========CONSUME_SUCCESS===========");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                logger.info("==========RECONSUME_LATER===========");
                logger.error(e.getMessage(),e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
    }
}

基于上次的经验,同样测试三种情况,消费1000条不阻塞,消费1000条阻塞100ms,消费1000条阻塞300ms

消费1000条不阻塞的情况

和有序消费模式差不多,快个一两秒。

消费1000条阻塞100ms

竟然比不阻塞的情况更快,可能是误差把

消费1000条阻塞300ms

速度稍慢,但是还是比有序消费快得多。

结论是并发消费的消费速度要比有序消费更快。

另一个区别是消费失败时的处理不同,有序消费模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后,消费者会立马消费这条消息,而使用并发消费模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,要过好几秒甚至十几秒才会再次消费。

我是在只有一条消息的情况下测试的。更重要的区别是,

返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT并不会增加消息的消费次数,mq消息有个默认最大消费次数16,消费次数到了以后,这条消息会进入死信队列,这个最大消费次数是可以在mqadmin中设置的。

mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3

我测试后发现,并发模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一个消息到达最大消费次数之后就不会再出现了。这说明有序消费模式可能并没有这个机制,这意味着你再有序消费模式下抛出固定异常,那么这条异常信息将会被永远消费,并且很可能会影响之后正常的消息。下面依然做个试验

Map<String, Integer> map = new HashMap<>();//保存消息错误消费次数
new MessageListenerOrderly() {

	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
			ConsumeOrderlyContext context) {
        try {
        	if(1 == 1)
        			throw new Exception();
            return ConsumeOrderlyStatus.SUCCESS;
        }catch (Exception e) {
        	MessageExt msg = msgs.get(0);
			if(map.containsKey(msg.getKeys())) {//消息每消费一次,加1
			    map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
			}else {
			    map.put(msg.getKeys(), 1);
			}
			logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
	}
}

发送了十条消息

可以看到虽然我发了十条消息,但是一直在消费同样四条消息,这可能跟消息broker有默认四条队列有关系。同时从时间可以看到,消费失败后,会马上拉这条信息。

至于并发消费模式则不会无限消费,而且消费失败后不会马上再消费。具体的就不尝试了。

结论是有序消费模式MessageListenerOrderly要慎重地处理异常,我则是用全局变量记录消息的错误消费次数,只要消费次数达到一定次数,那么就直接返回ConsumeOrderlyStatus.SUCCESS。

突然想到之前测试有序消费模式MessageListenerOrderly的时候为什么1000条消息阻塞100ms耗时25000ms了,因为有序消费模式是同时拉取四条队列消息的,这就对上了。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • springboot整合rocketmq实现分布式事务

    1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

  • RocketMQTemplate 注入失败的解决

    RocketMQTemplate 注入失败 在使用rocketmq 发送消息时,会发现 @Autowired private RocketMQTemplate rocketMQTemplate; 注入RocketMQTemplate 失败. 解决方案 究其原因是因为,配置文件中,我们没有添加 上图中蓝色的两行代码,指定发送的组名.写上后,问题解决. 好了,再来说说RocketMQTemplate 的基本使用吧~ RocketMQTemplate的使用 1.pom.xml依赖 <dependenc

  • 使用RocketMQTemplate发送带tags的消息

    RocketMQTemplate发送带tags的消息 RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的个方便发送消息的模板类,它是基本Spring 的消息机制实现的,对外只提供了Spring抽象出来的消息发送接口. 在单独使用RocketMQ的时候,发送消息使用的Message是'org.apache.rocketmq.common.message'包下面的Message,而使用RocketMQTemplate发送消息时,使用的Message是org.s

  • 解决SpringBoot整合RocketMQ遇到的坑

    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

  • RocketMQ消息过滤与查询的实现

    消息过滤 RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的. RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构. 其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的M

  • RocketMQ-延迟消息的处理流程介绍

    概述 RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息: 预设值的延迟时间间隔为: 1s. 5s. 10s. 30s. 1m. 2m. 3m. 4m. 5m. 6m. 7m. 8m. 9m. 10m. 20m. 30m. 1h. 2h: 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间: broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到

  • RocketMQ消息丢失场景以及解决方法

    既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢? 先来一张最简单的消费流程图: 上图中大致包含了这么几种场景: 生产者产生消息发送给RocketMQRocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束 这三种场景都可能会产生消息的丢失,如下图所示: 场景1中生产者将消息发送

  • 基于rocketmq的有序消费模式和并发消费模式的区别说明

    rocketmq消费者注册监听有两种模式 有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同. MessageListenerOrderly 正确消费返回 ConsumeOrderlyStatus.SUCCESS 稍后消费返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT MessageListenerConcurrently 正确消费返回 Consu

  • SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

    目录 消息队列实现分布式事务原理 RocketMQ的事务消息 代码实现 基础配置 发送半消息 执行本地事务与回查 Account-Service消费消息 测试 小结 消息队列实现分布式事务原理 首先让我们来看一下基于消息队列实现分布式事务的原理方案. 柔性事务 发送消息的服务有个OUTBOX数据表,在进行INSERT.UPDATE.DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录,这样可以保证原子性,因为这是基于本地的ACID事务. OUTBOX表充当临时消息队列,然后我

  • 基于js 各种排序方法和sort方法的区别(详解)

    今天突发奇想,想明白sort方法是否比各种排序都有优势,所以就参考别人的代码,做了一个测试,结果令人惊讶啊,上代码. <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width,initial-scale=1.0,max

  • 基于RocketMQ推拉模式详解

    消费者客户端有两种方式从消息中间件获取消息并消费.严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现. 通过 Pull 不断轮询 Broker 获取消息.当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息. 1.概述 1.1.PULL方式 由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息:采用Pull方式,如何设置Pull消息的拉取频率需要重点去考虑,举个

  • 基于params、@PathVariabl和@RequestParam的用法与区别说明

    方法参数相关属性params.@PathVariabl和@RequestParam的使用 [1]params params:指定request中必须包含某些参数值是,才让该方法处理. @RequestMapping(value = "testParamsAndHeaders", params = { "username","age!=10" }) public String testParamsAndHeaders() { System.out.

  • golang基于errgroup实现并发调用的方法

    目录 串行调用 基于sync.WaitGroup实现简单的并发调用 基于errgroup.Group实现并发调用 总结 串行调用 在用go编写web/rpc服务器的时候,经常会出现需要对下游多 个/组 服务调用rpc(或者其他比较耗时的操作)的情况.按照自然的写法,比如对下游有ABC三个调用,串行顺着写,就总共要花费TimeA+TimeB+TimeC的时间: func Handler(ctx context.Context) { var a, b, c respType a = A(ctx) b

  • RocketMq 消息重试机制及死信队列详解

    目录 生产者消息重试 消费者消息重试 并发消费 顺序消费 并发消费和顺序消费区别 死信队列 实践出真知 公共部分创建 测试并发消费 并发消费状态 测试顺序消费 顺序消费状态 测试死信队列 死信队列特性 生产者消息重试 消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了. 有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了. # 异步消息发送

  • Go并发编程实践

    前言 并发编程一直是Golang区别与其他语言的很大优势,也是实际工作场景中经常遇到的.近日笔者在组内分享了我们常见的并发场景,及代码示例,以期望大家能在遇到相同场景下,能快速的想到解决方案,或者是拿这些方案与自己实现的比较,取长补短.现整理出来与大家共享. 简单并发场景 很多时候,我们只想并发的做一件事情,比如测试某个接口的是否支持并发.那么我们就可以这么做: func RunScenario1() { count := 10 var wg sync.WaitGroup for i := 0;

  • Python使用grequests(gevent+requests)并发发送请求过程解析

    前言 requests是Python发送接口请求非常好用的一个三方库,由K神编写,简单,方便上手快.但是requests发送请求是串行的,即阻塞的.发送完一条请求才能发送另一条请求. 为了提升测试效率,一般我们需要并行发送请求.这里可以使用多线程,或者协程,gevent或者aiohttp,然而使用起来,都相对麻烦. grequests是K神基于gevent+requests编写的一个并发发送请求的库,使用起来非常简单. 安装方法: pip install gevent grequests 项目地

  • 浅谈mybatis 乐观锁实现,解决并发问题

    情景展示: 银行两操作员同时操作同一账户就是典型的例子. 比如A.B操作员同时读取一余额为1000元的账户,A操作员为该账户增加100元,B操作员同时为该账户扣除50元,A先提交,B后提交.最后实际账户余额为1000-50=950元,但本该为1000+100-50=1050.这就是典型的并发问题. 乐观锁机制在一定程度上解决了这个问题.乐观锁,大多是基于数据版本(Version)记录机制实现.何谓数据版本?即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表增加一个 "

随机推荐