RocketMQ特性Broker存储事务消息实现

目录
  • 引言
  • TransactionalMessageService
  • 处理事务消息
    • 第一处:
    • 第二处:

引言

Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。

private void initialTransaction() {
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    if (null == this.transactionalMessageService) {
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    if (null == this.transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

这里有三个核心的初始化变量

TransactionalMessageService

事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()方法进行加载。

TransactionalMessageService类定义如下。内部属性已加注释标明。

public interface TransactionalMessageService {
    //用于保存Half事务消息
    PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
    CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
    //删除事务消息
    boolean deletePrepareMessage(MessageExt messageExt);
    //提交事务消息
    OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
    //回滚事务消息
    OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
    void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
    //打开事务消息
    boolean open();
    //关闭事务消息
    void close();
}

transactionalMessageCheckListener

事务消息回查监听器

transactionalMessageCheckService

事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。

处理事务消息

当初始化完成之后,Broker就可以处理事务消息了。

Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,这和普通消息其实是一样的。

但是有两点针对事务消息的特殊处理

第一处:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

//获取扩展字段的值,若是该值为true则为事务消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) {
    //判断当前Broker配置是否支持事务消息
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
    //保存Half信息
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

第二处:

存储事务消息前的预处理,对应方法是

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //将原消息的topic保存在扩展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    //将原消息的QueueId保存在扩展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    //将原消息的SysFlag保存在扩展字段中
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    //修改Queueid为0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

完成上述步骤之后,调用DefaultMessageStole.putMessage()方法将其保存到CommitLog中。

CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法对其进行处理。

final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the consume queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}

这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费

以上就是RocketMQ特性Broker存储事务消息实现的详细内容,更多关于RocketMQ Broker存储事务消息的资料请关注我们其它相关文章!

(0)

相关推荐

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • java rocketmq--消息的产生(普通消息)

    前言 与消息发送紧密相关的几行代码: 1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); 2. producer.start(); 3. Message msg = new Message(...) 4. SendResult sendResult = producer.send(msg); 5. producer.shutdown(); 那这几行代码执行时,背后都做了什么? 一. 首先

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • RocketMQ普通消息实战演练详解

    目录 引言 普通消息同步发送 普通消息异步发送 普通消息单向发送 集群消费模式 广播消费模式 引言 之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV. 相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm 普通消息同步发送 同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息.这个方式可以确保消息发送到Broker成功,一些重要的

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • RocketMQ特性Broker存储事务消息实现

    目录 引言 TransactionalMessageService 处理事务消息 第一处: 第二处: 引言 在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的. private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • RocketMQ事务消息保证消息的可靠性和一致性

    这篇讲解一下rocketMq的事务消息的原理 在发送事务消息的时候,会加一个标识,表示这个消息是事务消息.broker接收到消息后,在我们之前看的代码里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage会判断是否是事务消息. if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransac

  • RocketMQ事务消息原理与使用详解

    目录 一.RocketMQ事务消息概要 二.RocketMQ事务消息使用案例 (1).定义消息监听器 (2).定义消息生产者 (3).定义消息消费者 (4).观察生产者控制台输出 (5).观察消费者控制台输出 三.RocketMQ事务消息原理 四.RocketMQ事务消息使用限制 一.RocketMQ事务消息概要 RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ的事务消息提供类

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • RocketMQ事务消息图文示例讲解

    RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息 MQ 的事务流程(本地代码正常执行) MQ 的消息补偿过程(当本地代码执行失败时) MQ 消息的三种状态 提交状态:允许进入队列,此消息与非事务消息无区别 回滚状态:不允许进入队列,此消息等同于未发送过 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态) 注意:事务消息仅与生产者有关,与消费者无关 生产者代码(提交状态.回滚状态): public class Producer { public s

  • java发送kafka事务消息的实现方法

    前言 事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的mysql,我们在程序开始时,只需要在程序中添加上事务注解即可 kafka客户端事务,直接使用客户端提供的相关的API即可,和jdbc事务的使用很类似,主要包含下面5个API // 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFen

  • RocketMQ生产者调用start发送消息原理示例

    目录 RocketMQ发送消息 start()里面究竟做了什么操作 小结 RocketMQ发送消息 我们在使用RocketMQ发送消息时,一般都会使用DefaultMQProducer,类型的代码如下: DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("42.192.50.8:9876"); try { producer.sta

  • rocketmq如何修改存储路径

    一.下载rocketmq对应版本源码 修改消息存储路径需要修改rocketmq源码,因为rocketmq取的默认路径是user.home路径,也就是用户的根目录,如下所示 直接修改用户的user.home比较麻烦,我们打算直接修改源码里写死的路径,然后重新打包 下载rocketmq源码可以去GitHub,路径为https://github.com/apache/rocketmq 如果要下4.7.1版本的源码包可以选择对应release包 例如使用的rocketmq版本为4.7.1,则下载路径为h

随机推荐