深入浅出RocketMQ的事务消息

目录
  • 事务消息发送流程
  • 发送事务消息源码分析
  • 事务消息回查

事务消息发送流程

半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:

1.为什么prepare消息不会被Consumer消费?

2.事务消息是如何提交和回滚的?

3.定时回查本地事务状态的实现细节。

发送事务消息源码分析

发送事务消息方法TransactionMQProducer.sendMessageInTransaction:

  • msg:消息
  • tranExecuter:本地事务执行器
  • arg:本地事务执行器参数
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // 忽视消息延迟的属性
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        Validators.checkMessage(msg, this.defaultMQProducer);

		// 发送半消息
        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

		// 处理发送半消息的结果
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
        	// 发送半消息成功,执行本地事务逻辑
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    // 执行本地事务逻辑
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            // 发送半消息失败,标记本地事务状态为回滚
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

		// 结束事务,设置消息 COMMIT / ROLLBACK
        try {
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

		// 返回事务发送结果
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());

        // 提取Prepared消息的uniqID
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

该方法的入参包含有一个需要用户实现本地事务的LocalTransactionExecuter executer,executer中会进行事务操作以保证本地事务和消息发送这两个操作的原子性。

由上面的源码可知:

Producer会首先发送一个半消息到Broker中:

  • 半消息发送成功,执行事务
  • 半消息发送失败,不执行事务

半消息发送到Broker后不会被Consumer消费掉的原因有以下两点:

  • Broker在将消息写入CommitLog时会判断消息类型,如果是prepare或者rollback消息,ConsumeQueue的offset不变
  • Broker在构造ConsumeQueue时会判断是否是处于prepare或者rollback状态的消息,如果是则不会将该消息放入ConsumeQueue里,Consumer在拉取消息时也就不会拉取到这条消息

Producer会根据半消息的发送结果和本地任务执行结果来决定如何处理事务(commit或rollback),方法最后调用了endTransaction来处理事务的执行结果,源码如下:

  • sendResult:发送半消息的结果
  • localTransactionState:本地事务状态
  • localException:执行本地事务逻辑产生的异常
  • RemotingException:远程调用异常
  • MQBrokerException:Broker异常
  • InterruptedException:当线程中断异常
  • UnknownHostException:未知host异常
public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        // 解码消息id
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }

		// 创建请求
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

		// 提交 commit / rollback 消息
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

该方法是将事务执行的结果发送给Broker,再由Broker决定是否进行消息投递,执行步骤如下:

1.收到消息后先检查是否是事务消息,如果不是事务消息则直接返回

2.根据请求头里的offset查询半消息,如果查询结果为空则直接返回

3.根据半消息构造新消息,新构造的消息会被重新写入到CommitLog里,rollback消息的消息体为空

4.如果是rollback消息,则该消息不会被投递

具体原因上文中已经分析过:只有commit消息才会被Broker投递给consumer

RocketMQ会将commit消息和rollback消息都写入到commitLog里,但rollback消息的消息体为空且不会被投递,CommitLog在删除过期消息时才会将其删除。当事务commit成功之后,RocketMQ会重新封装半消息并将其投递给Consumer端消费。

事务消息回查

Broker发起

相较于普通消息,事务消息主要依赖下面三个类:

1.TransactionStateService:事务状态服务,负责对事务消息进行管理,包括存储和更新事务消息状态、回查状态等

2.TranStateTable:事务消息状态存储表,基于MappedFileQueue实现

3.TranRedoLog:TranStateTable的日志,每次写入操作都会记录日志,当Broker宕机时,可以利用这个文件做数据恢复

存储半消息到CommitLog时,使用offset索引到对应的TranStateTable的位置

到此这篇关于深入浅出RocketMQ的事务消息的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RocketMQ producer容错机制源码解析

    目录 1. 前言 2. 失败重试 3. 延迟故障 3.1 最普通的选择策略 3.2 延迟故障的实现 1. 前言 本文主要是介绍一下RocketMQ消息生产者在发送消息的时候发送失败的问题处理?这里有两个点,一个是关于消息的处理,一个是关于broker的处理,比如说发送消息到broker-a的broker失败了,我们可能下次就不想发送到这个broker-a,这就涉及到一个选择broker的问题,也就是选择MessageQueue的问题. 2. 失败重试 其实失败重试我们在介绍RocketMQ消息生

  • RocketMQ producer同步发送单向发送源码解析

    目录 RocketMQ生产者发送消息分为三种模式 1. 同步发送 1.1 DefaultMQProducerImpl#sendDefaultImpl 1.2 DefaultMQProducerImpl#sendKernelImpl 1.3 MQClientAPIImpl#sendMessage 1.4 MQClientAPIImpl#sendMessageSync 1.5 NettyRemotingClient#invokeSync 2. 单向发送 2.1 DefaultMQProducerIm

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • RocketMQ事务消息保证消息的可靠性和一致性

    这篇讲解一下rocketMq的事务消息的原理 在发送事务消息的时候,会加一个标识,表示这个消息是事务消息.broker接收到消息后,在我们之前看的代码里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage会判断是否是事务消息. if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransac

  • RocketMQ事务消息原理与使用详解

    目录 一.RocketMQ事务消息概要 二.RocketMQ事务消息使用案例 (1).定义消息监听器 (2).定义消息生产者 (3).定义消息消费者 (4).观察生产者控制台输出 (5).观察消费者控制台输出 三.RocketMQ事务消息原理 四.RocketMQ事务消息使用限制 一.RocketMQ事务消息概要 RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ的事务消息提供类

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • RocketMQ特性Broker存储事务消息实现

    目录 引言 TransactionalMessageService 处理事务消息 第一处: 第二处: 引言 在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的. private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE

  • RocketMQ事务消息图文示例讲解

    RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息 MQ 的事务流程(本地代码正常执行) MQ 的消息补偿过程(当本地代码执行失败时) MQ 消息的三种状态 提交状态:允许进入队列,此消息与非事务消息无区别 回滚状态:不允许进入队列,此消息等同于未发送过 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态) 注意:事务消息仅与生产者有关,与消费者无关 生产者代码(提交状态.回滚状态): public class Producer { public s

  • RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    目录 消息队列常见问题处理 分布式事务 什么是分布式事务 常见的分布式事务解决方案 基于MQ实现的分布式事务 本地消息表-最终一致性 MQ事务-最终一致性 RocketMQ中如何处理事务 Kafka中如何处理事务 RabbitMQ中的事务 消息防丢失 生产阶段防止消息丢失 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ中的防丢失措施使用SYNC的发送消息方式,等待broker处理结果 存储阶段 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ

  • kafka rabbitMQ及rocketMQ队列的消息可靠性保证分析

    目录 1.消息丢失 1.生产者发送失败 2.消费者消费失败 3.队列因为自身体原因丢失数据 2.消息顺序 1.kafka 2.rocketMQ 3.rabbitMQ 3.消息重复 1.消息丢失 1.生产者发送失败 所有消息队列都可能发生的问题 生产者发送消息后,队列未成功接收(网络原因或其他)而生产者不知情,消息丢失 生产者发送消息后,队列接收成功->生产者确认,但消息并未持久化,队列崩溃,消息丢失 针对这类问题,三种消息队列都提供了生产者消息发送确认的模式,例如将kafka的acks参数设置为

  • 使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消. 在电商系统中,用户七天内没有评价商品,则默认好评. 这些场景对应的解决方案,包括: 轮询遍历数据库记录 JDK 的 DelayQueue ScheduledExecutorService 基于 Quartz 的定时任务 基于 Redis 的 zset 实现延时队列. 除此之外,

随机推荐