RocketMQ事务消息保证消息的可靠性和一致性

这篇讲解一下rocketMq的事务消息的原理

在发送事务消息的时候,会加一个标识,表示这个消息是事务消息。broker接收到消息后,在我们之前看的代码里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage会判断是否是事务消息。

if (sendTransactionPrepareMessage) {
    asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}

sendTransactionPrepareMessage=true表示是事务消息,所以走了一个单独的逻辑。

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }

这里parseHalfMessageInner这个方法里面开始了偷梁换柱,把topic和queueId都改了,把原本的信息先存在变量里面。所以实际上这个消息发到了半消息专有的topic里面,topic名字叫做RMQ_SYS_TRANS_HALF_TOPIC

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

然后其他代码还是和普通的消息一样,就是把事务消息做了转发,存在了RMQ_SYS_TRANS_HALF_TOPIC里面。

到这里发送半消息就成功了,然后最后客户端发送了半消息之后,会查一下本地事务的情况是否完成。这里有3种情况:commit、rollback、未知。完成和回滚都是确认的状态,这个比较好处理,比较难的是未知。我们先看能得到确认结果的情况。

如果完成和回滚,会给客户端发送结束事务的消息,这个消息叫END_TRANSACTION,包括消息里面包括了之前发送的半消息的id和offset。

broker处理的代码在org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest中。就是根据offset拿到半消息,然后如果是commit,就是把原本的topic和queueId还原,发到原本的队列里面,这样就可以正常消费了。然后把这个半消息“删除”。如果是rollBack,也是拿到这个半消息,然后直接“删除”就可以了。接下来看一下怎么“删除”。

为什么我删除会打引号呢?因为半消息其实就是跟正常的消息一样,存在commitLog文件里面,mq的设计,就没有删除这个功能。所以所谓的删除其实就是把这个消息消费掉,不做任何处理,就是删除了。

想象一下,这个半消息有commit/rollBack/未知,3种状态,未知的肯定不能删除,那他怎么知道哪些消息是可以删除的呢?总不能所有的都再去客户端查一下事务的结果吧?mq怎么做的呢?前面提到的删除其实就是把这些commit和rollBack处理过后的半消息,再保存起来,后面消费半消息的数据的时候,只要从里面查一下是否需要删除就可以了。

这里又有一个问题,怎么把需要删除的半消息存起来呢?mq存储数据就是commitLog,所以其实这些需要删除的数据,就是又发到了一个特定的topic里面。这个topic名字是RMQ_SYS_TRANS_OP_HALF_TOPIC。主意区分,原本半消息的topic名字是half_topic,这个topic名字是op_half_topic,存储的是处理过后,可以删除的半消息。

所以说前面提到的带引号的“删除”,就是把消息发到op_half_topic就表示是删除了,这个op_half_topic消息的内容就是half_topic的offset。那么现在需要有个地方,来消费half_topic,然后判断是否存在于op_half_topic,如果是表示可以删除了,如果不是,就接着保存起来。

处理逻辑就在TransactionalMessageCheckService这个定时任务中。具体是在TransactionalMessageServiceImpl#check方法里面

    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            // 先拿到半消息
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 拿到半消息的最小偏移量
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                // 拿到op_half的最小偏移量
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                // 拉取op的消息(32条),op消息内容是half的offset,跟half_topic的最小offset比较,如果op的小于最小的,就说明已经处理过了,放在doneOpOffset,反之,则说明还没处理过,就先放在removeMap里面
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                // 然后对half_topic进行处理
                while (true) {
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // 如果这个offset已经处理过了,就接着处理下一个
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        // 如果没有处理过,就要把数据捞出来重新投递
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) {
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                            || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                            || valueOfCurrentMinusBorn <= -1;
                        if (isNeedCheck) {
                            // 重新投递
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // 再重新确认事务
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                // 更新offset
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }
    }

我讲解一下这个代码做了啥。我们先明确这个代码是要实现什么功能。就是消费half_topic,然后去根据op_half_topic的数据来判断half_topc的消息是否被处理过,处理过了就直接忽略、丢弃,如果没有处理过,就“保留”这个消息,等待后面事务确认了再处理。

这里“保留”我也是加了引号,因为mq消费是一条一条按顺序消费,如果中间有一个数据卡住了,后面数据就没法消费了。所以这里“保留”,其实也是消费了,只是他消费到了不确定结果的消息,他是重新投递到了half_topic,来实现“保留”的目的。

好了,明确了这个代码实现的功能,我们来一步步看一下细节。

首先是拿到half_topic和op_half_topic的offset,知道现在是消费到了哪里。然后去拉取op_half_topic,每次32条,op_half消息内容存的是half_topic的offset,只要判断这条op_half里面的offset小于half_topic的offset,就表示已经消费过了,放在doneOpOffset的list里面,如果op_half保存的offset大于half_topic的offset,就表示还没消费,放入removeMap,就表示这个半消息可以放心删除了。

这一步,通过消费op_half,跟half_topic的minOffset做比较,构建了doneOpOffset,和removeMap。

然后就是消费half_topic的消息,只要判断每条消息的offset是否在removeMap中,就表示可以删除,放入doneOpOffset中,直接消费下一条数据,所以这里其实也不用真的拉取half_topic的消息,只要用offset来判断就行,消费过了,offset+1,就可以去判断下一条消息。

如果half_topic的offset没有在removeMap中,就表示暂时还不知道结果,这时候就重新发送到half_topic,重新投递之后,然后给客户端发送一个检查事务的请求,客户端检测过后,还是用之前的END_TRANSACTION命令,再发给broker,broker就会放到op_half里面,等于就是重新发了一个半消息的流程,实现了闭环。

最后就是更新两个topic的offset了。之前的doneOpOffset保存下来,就是为了更新op_half的offset,只有都处理过了,才会更新,如果中间有一个没有处理,就会阻塞在那条消息。

总结:

所以现在的情况是这样的,对于half_topic的半消息如果有结果就忽略,如果没有结果就重新投递,不会阻塞,所以half_topic的offset会一直往后更新。但是op_half要等所有的都done了,才会更新offset。假设一种情况,如果op_offset1对于的是half_offset1这个消息,然后half_offset1刚好被消费,重新投递了。这是op_offset1找不到对应的半消息,所以不会被消费。但是不会被卡主,等到下次的时候,op_offset1这个数据的offset已经小于half_offset1这个消息的offset,所以这个op_offset1也会当做已经处理过了。

可以看到整个过程其实很巧妙,大家可以结合代码捋一捋。

到此这篇关于RocketMQ事务消息保证消息的可靠性和一致性的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RocketMQ事务消息图文示例讲解

    RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息 MQ 的事务流程(本地代码正常执行) MQ 的消息补偿过程(当本地代码执行失败时) MQ 消息的三种状态 提交状态:允许进入队列,此消息与非事务消息无区别 回滚状态:不允许进入队列,此消息等同于未发送过 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态) 注意:事务消息仅与生产者有关,与消费者无关 生产者代码(提交状态.回滚状态): public class Producer { public s

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • RocketMQ特性Broker存储事务消息实现

    目录 引言 TransactionalMessageService 处理事务消息 第一处: 第二处: 引言 在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的. private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • 深入浅出RocketMQ的事务消息

    目录 事务消息发送流程 发送事务消息源码分析 事务消息回查 事务消息发送流程 半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心: 1.为什么prepare消息不会被Consumer消费? 2.事务消息是如何提交和回滚的? 3.定时回查本地事务状态的实现细节. 发送事务消息源码分析 发送事务消息方法TransactionMQProducer.sendMessageInTransaction: msg:消息

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • RocketMQ事务消息原理与使用详解

    目录 一.RocketMQ事务消息概要 二.RocketMQ事务消息使用案例 (1).定义消息监听器 (2).定义消息生产者 (3).定义消息消费者 (4).观察生产者控制台输出 (5).观察消费者控制台输出 三.RocketMQ事务消息原理 四.RocketMQ事务消息使用限制 一.RocketMQ事务消息概要 RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ的事务消息提供类

  • RocketMQ事务消息保证消息的可靠性和一致性

    这篇讲解一下rocketMq的事务消息的原理 在发送事务消息的时候,会加一个标识,表示这个消息是事务消息.broker接收到消息后,在我们之前看的代码里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage会判断是否是事务消息. if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransac

  • RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    目录 消息队列常见问题处理 分布式事务 什么是分布式事务 常见的分布式事务解决方案 基于MQ实现的分布式事务 本地消息表-最终一致性 MQ事务-最终一致性 RocketMQ中如何处理事务 Kafka中如何处理事务 RabbitMQ中的事务 消息防丢失 生产阶段防止消息丢失 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ中的防丢失措施使用SYNC的发送消息方式,等待broker处理结果 存储阶段 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ

  • java分布式事务之可靠消息最终一致性解决方案

    目录 一.什么是可靠消息最终一致性事务 1.本地事务与消息发送的原子性问题 2.事务参与方接收消息的可靠性 3.消息重复消费的问题 二.解决方案 1.本地消息表方案 2.RocketMQ事务消息方案 一.什么是可靠消息最终一致性事务 可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致. 此方案是利用消息中间件完成,如下图: 事务发起方(消息生产方)将消息发给消息中间

  • 分布式面试消息队列解决消息重复保证消息顺序

    目录 引言 1.面试官: 那你有考虑过消息重复问题怎么解决吗? 2.面试官: 在多集群消息架构中,如果消费端要求接收到的消息是有序的,怎么解决消息顺序消费问题? 3.面试官: 那如何做到topic不分区,能举例说明一下吗? 总结 引言 我在<项目中为什么要使用消息队列>中列举了两个使用消息队列的例子. (1)收银系统,确认收款成功,通过MQ通知给物流系统发货. (2)消费积分,用户每消费一笔给用户增加一定积分,京东豆,信用卡积分,2020年如果还没倒闭的电商平台中,可以100%的确定订单系统和

  • RocketMQ重试机制及消息幂代码实例解析

    这篇文章主要介绍了RocketMQ重试机制及消息幂代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.重试机制 1.由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响.所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好

  • windows消息和消息队列实例详解

    本文详细讲述了windows消息和消息队列的原理与应用方法.分享给大家供大家参考.具体分析如下: 与基于MS - DOS的应用程序不同,Windows的应用程序是事件(消息)驱动的.它们不会显式地调用函数(如C运行时库调用)来获取输入,而是等待windows向它们传递输入. windows系统把应用程序的输入事件传递给各个窗口,每个窗口有一个函数,称为窗口消息处理函数.窗口消息处理函数处理各种用户输入,处理完成后再将控制权交还给系统.窗口消息处理函数一般是在注册一个窗口的时候指定的.你可以从典型

  • 小程序新版订阅消息模板消息

    小程序订阅消息 功能介绍 消息能力是小程序能力中的重要组成,我们为开发者提供了订阅消息能力,以便实现服务的闭环和更优的体验. 订阅消息推送位置:服务通知 订阅消息下发条件:用户自主订阅 订阅消息卡片跳转能力:点击查看详情可跳转至该小程序的页面 使用说明 步骤1:获取模板 ID 在微信公众平台手动配置获取模板 ID: 登录 https://mp.weixin.qq.com获取模板,如果没有合适的模板,可以申请添加新模板,审核通过后可使用. 配置订阅消息在这里如下图所示 步骤2:获取下发权限 详见小

随机推荐