RocketMQ消息丢失场景以及解决方法

既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题。在一些涉及到了金钱交易的场景下,消息丢失还是很致命的。那么在RocketMQ中存在哪几种消息丢失的场景呢?

先来一张最简单的消费流程图:

上图中大致包含了这么几种场景:

生产者产生消息发送给RocketMQRocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束

这三种场景都可能会产生消息的丢失,如下图所示:

场景1中生产者将消息发送给Rocket MQ的时候,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失场景2中消息需要持久化到磁盘中,这时会有两种情况导致消息丢失

①RocketMQ为了减少磁盘的IO,会先将消息写入到os cache中,而不是直接写入到磁盘中,消费者从os cache中获取消息类似于直接从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失

②如果消息已经被刷入了磁盘中,但是数据没有做任何备份,一旦磁盘损坏,那么消息也会丢失消费者成功从RocketMQ中获取到了消息,还没有将消息完全消费完的时候,就通知RocketMQ我已经将消息消费了,然后消费者宕机,但是RocketMQ认为消费者已经成功消费了数据,所以数据依旧丢失了

那么如何保证消息的零丢失呢?

1、场景1中保证消息不丢失的方案是使用RocketMQ自带的事务机制来发送消息,大致流程为

①首先生产者发送half消息到RocketMQ中,此时消费者是无法消费half消息的,若half消息就发送失败了,则执行相应的回滚逻辑

②half消息发送成功之后,且RocketMQ返回成功响应,则执行生产者的核心链路

③如果生产者自己的核心链路执行失败,则回滚,并通知RocketMQ删除half消息

④如果生产者的核心链路执行成功,则通知RocketMQ commit half消息,让消费者可以消费这条数据

其中还有一些RocketMQ长时间没有收到生产者是要commit/rollback操作的响应,回调生产者接口的细节,感兴趣的可以参考文末的 RocketMQ分布式事务原理

在使用了RocketMQ事务将生产者的消息成功发送给RocketMQ,就可以保证在这个阶段消息不会丢失在场景2中要保证消息不丢失,首先需要将os cache的异步刷盘策略改为同步刷盘,这一步需要修改Broker的配置文件,将flushDiskType改为SYNC_FLUSH同步刷盘策略,默认的是ASYNC_FLUSH异步刷盘。一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了;为了保证磁盘损坏不会丢失数据,我们需要对RocketMQ采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障。在场景3中,消息到达了消费者,RocketMQ在代码中就能保证消息不会丢失

//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
  //对消息进行处理
  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
});

上面这段代码中,RocketMQ在消费者中注册了一个监听器,当消费者获取到了消息,就会去回调这个监听器函数,去处理里面的消息

当你的消息处理完毕之后,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
只有返回了CONSUME_SUCCESS,消费者才会告诉RocketMQ我已经消费完了,此时如果消费者宕机,消息已经处理完了,也就不会丢失消息了

如果消费者还没有返回CONSUME_SUCCESS时就宕机了,那么RocketMQ就会认为你这个消费者节点挂掉了,会自动故障转移,将消息交给消费者组的其他消费者去消费这个消息,保证消息不会丢失

为了保证消息不会丢失,在consumeMessage方法中就直接写消息消费的业务逻辑就可以了,如果非要搞一些骚操作,比如下面的代码

//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
 	//开启子线程异步处理消息
 	new Thread() {
			public void run() {
				//对消息进行处理
			}
		}.start();
  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
});

如果新开子线程异步处理消息的话,就有可能出现消息还没有被消费完,消费者告诉RocketMQ消息已经被消费了,结果宕机丢失消息的情况。

使用上面一整套的方案就可以在使用RocketMQ时保证消息零丢失,但是性能和吞吐量也将大幅下降

使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级主从架构的话,需要Leader将数据同步给Follower消费时无法异步消费,只能等待消费完成再通知RocketMQ消费完成

消息零丢失是一把双刃剑,要想用好,还是要视具体的业务场景而定,选择合适的方案才是最好的

RocketMQ分布式事务原理

分布式事务常见的方案有TCC(Try-Confirm-Cancel),XA两阶段提交方案,可靠消息最终一致性方案,最大努力通知方案等等。其中可靠消息最终一致性方案主要就可以依靠RocketMQ来做,因为RocketMQ支持消息事务。先上一张图:

RocketMQ 事务消息的实现步骤如下:

  1. Producer发送half message给RocketMQ
  2. RocketMQ返回half message success(half message发送成功之后RocketMQ的消费者并不能消费这条消息,因为消息存储在Topic为 RMQ_SYS_TRANS_HALF_TOPIC 的消息消费队列中,而不是原先的Topic)
  3. 执行核心交易链路
  4. 返回执行交易链路的结果,如果失败则回滚
  5. 如果执行成功,则Producer返回一个COMMIT状态给RocketMQ
  6. 如果RocketMQ迟迟收不到Producer的返回结果,即这条消息的状态为UNKNOWN,则会回调服务接口,查询这条消息到底是commit还是rollback
  7. RocketMQ确认消息为commit,则Consumer可以消费到这条消息
  8. Consumer操作数据库,执行自己的事务
  9. Consumer成功消费之后返回一个ACK消息给RocketMQ,如果成功消费则显示消费成功,否则RocketMQ会重发消息给Consumer继续消费

RocketMQ 事务消息的实现原理基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚,RocketMQ 先执行第一部分的事务,如果失败则回滚,如果成功则定时任务会去回查到事务执行成功,这个时候通知消费者执行第二阶段的事务,如果失败则不断重发消息给消费者消费,如果成功则整个流程走完,保证了事务的原子性。

总结

到此这篇关于RocketMQ消息丢失场景以及解决方法的文章就介绍到这了,更多相关RocketMQ消息丢失场景内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消. 在电商系统中,用户七天内没有评价商品,则默认好评. 这些场景对应的解决方案,包括: 轮询遍历数据库记录 JDK 的 DelayQueue ScheduledExecutorService 基于 Quartz 的定时任务 基于 Redis 的 zset 实现延时队列. 除此之外,

  • RocketMQ获取指定消息的实现方法(源码)

    概要 消息查询是什么? 消息查询就是根据用户提供的msgId从MQ中取出该消息 RocketMQ如果有多个节点如何查询? 问题:RocketMQ分布式结构中,数据分散在各个节点,即便是同一Topic的数据,也未必都在一个broker上.客户端怎么知道数据该去哪个节点上查? 猜想1:逐个访问broker节点查询数据 猜想2:有某种数据中心存在,该中心知道所有消息存储的位置,只要向该中心查询即可得到消息具体位置,进而取得消息内容 实际: 1.消息Id中含有消息所在的broker的地址信息(IP\Po

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • RocketMQ重试机制及消息幂代码实例解析

    这篇文章主要介绍了RocketMQ重试机制及消息幂代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.重试机制 1.由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响.所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好

  • java rocketmq--消息的产生(普通消息)

    前言 与消息发送紧密相关的几行代码: 1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); 2. producer.start(); 3. Message msg = new Message(...) 4. SendResult sendResult = producer.send(msg); 5. producer.shutdown(); 那这几行代码执行时,背后都做了什么? 一. 首先

  • RocketMQ消息丢失场景以及解决方法

    既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢? 先来一张最简单的消费流程图: 上图中大致包含了这么几种场景: 生产者产生消息发送给RocketMQRocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束 这三种场景都可能会产生消息的丢失,如下图所示: 场景1中生产者将消息发送

  • 5分钟快速了解数据库死锁产生的场景和解决方法

    前言 加锁(Locking)是数据库在并发访问时保证数据一致性和完整性的主要机制.任何事务都需要获得相应对象上的锁才能访问数据,读取数据的事务通常只需要获得读锁(共享锁),修改数据的事务需要获得写锁(排他锁).当两个事务互相之间需要等待对方释放获得的资源时,如果系统不进行干预则会一直等待下去,也就是进入了死锁(deadlock)状态. 以下内容适用于各种常见的数据库管理系统,包括 Oracle.MySQL.Microsoft SQL Server 以及 PostgreSQL 等. 死锁是如何产生

  • Ajax跨域访问Cookie丢失问题的解决方法

    ajax跨域访问,可以使用jsonp方法或设置Access-Control-Allow-Origin实现,关于设置Access-Control-Allow-Origin实现跨域访问可以参考之前我写的文章<ajax 设置Access-Control-Allow-Origin实现跨域访问> 1.ajax跨域访问,cookie丢失 首先创建两个测试域名 a.fdipzone.com 作为客户端域名 b.fdipzone.com 作为服务端域名 测试代码 setcookie.PHP 用于设置服务端co

  • javascript小数精度丢失的完美解决方法

    原因:js按照2进制来处理小数的加减乘除,在arg1的基础上 将arg2的精度进行扩展或逆扩展匹配,所以会出现如下情况. javascript(js)的小数点加减乘除问题,是一个js的bug如0.3*1 = 0.2999999999等,下面列出可以完美求出相应精度的四种js算法 function accDiv(arg1,arg2){ var t1=0,t2=0,r1,r2; try{t1=arg1.toString().split(".")[1].length}catch(e){} t

  • gson ajax 数字精度丢失问题的解决方法

    ajax传输的json,gson会发生丢失,long > 15的时候会丢失0 解决方案:直接把属性为long的属性自动加上双引号成为js的字符串,这样就不会发生丢失了,ajax自动识别为字符串. 用法: ajaxResult("",0,new Object()); //随便一个对象就可以,List 之类的 /** * 以Ajax方式输出常规操作结果 * * @param status * 返回状态,200表示成功, 500表示错误 * @param message * 操作结果描

  • Nginx session丢失问题处理解决方法

    在用nginx的反向代理tomcat的路径中,可能会出现session丢失问题.每发送一次请求 JESSIONID  都会发生改变,说明上一次形成的session丢失,从而创建新的session. 第一种情况: server{ listen 80; server_name www.jiahemdata.com www.jiahemdata.cn; charset utf-8; location /{ proxy_redirect off; proxy_pass http://127.0.0.1:

  • adb push中文路径文件名丢失后缀的解决方法

    adb 的一个BUG 今天刷机的时候,用以下命令多次 push 安装包到手机: adb push F:\刷机\Nexus5\lineage-14.1-20170314-nightly-hammerhead-signed.zip /sdcard/ 但在recovery 中 install 选择更新包时找不到更新包. 后来终于发现 push 过去的文件丢失了.zip后缀,自然找不到更新包了.重命名后一切正常. 经试验,并非文件名太长,或者路径太深造成. 下载了最新的 adb tools 做测试,发现

  • Django数据库连接丢失问题的解决方法

    问题 在Django中使用mysql偶尔会出现数据库连接丢失的情况,错误通常有如下两种 OperationalError: (2006, 'MySQL server has gone away') OperationalError: (2013, 'Lost connection to MySQL server during query') 查询mysql全局变量SHOW GLOBAL VARIABLES;可以看到wait_timeout,此变量表示连接空闲时间.如果客户端使用一个连接查询多次数

  • 针对Sqlserver大数据量插入速度慢或丢失数据的解决方法

    我的设备上每秒将2000条数据插入数据库,2个设备总共4000条,当在程序里面直接用insert语句插入时,两个设备同时插入大概总共能插入约2800条左右,数据丢失约1200条左右,测试了很多方法,整理出了两种效果比较明显的解决办法: 方法一:使用Sql Server函数: 1.将数据组合成字串,使用函数将数据插入内存表,后将内存表数据复制到要插入的表. 2.组合成的字符换格式:'111|222|333|456,7894,7458|0|1|2014-01-01 12:15:16;1111|222

  • C#向图片添加水印的两种不同场景与解决方法

    场景一 也就是大家经常用的,一般是图片的4个角落,基于横纵坐标来添加. 效果如下: 添加水印方法 static void addWatermarkText(Graphics picture,int fontsize, string _watermarkText, string _watermarkPosition, int _width, int _height) { int[] sizes = new int[] {32, 14, 12, 10, 8, 6, 4 }; Font crFont

随机推荐