SpringBoot集成RocketMQ发送事务消息的原理解析

目录
  • 简介
  • 原理
  • 具体实现
    • 消费者
    • 消费者
    • 生产者消息监听器
  • 消息事务测试
    • 正常测试
    • 异常测试
    • 代码调整
    • 执行结果
  • 总结

简介

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

原理

RocketMQ事务消息通过异步确保方式,保证事务的最终一致性。设计的思想可以借鉴两个阶段提交事务。其执行流程图如下:

  • 发送方向MQ服务端发送消息。
  • MQ Server将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  • 发送方开始执行本地事务逻辑。
  • 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
  • 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
  • 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  • 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

具体实现

消费者

@Component
public class TransactionProduce
{
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendTransactionMessage(String msg)
    {
        logger.info("start sendTransMessage hashKey:{}",msg);

         Message message =new Message();
         message.setBody("this is tx message".getBytes());
         TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq",
                 MessageBuilder.withPayload(message).build(), msg);

         //发送状态
         String sendStatus = result.getSendStatus().name();
         // 本地事务执行状态
         String localTxState = result.getLocalTransactionState().name();
         logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
    }
}

说明:发送事务消息采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等。

消费者

@Component
@RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING)
public class TransactionConsumer implements RocketMQListener<String>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(String message)
    {
        logger.info("send transaction mssage parma is:{}", message);
    }
}

说明:发送事务消息的消费者与普通的消费者一样没有太大的区别。

生产者消息监听器

发送事务消息除了生产者和消费者以外,我们还需要创建生产者的消息监听器,来监听本地事务执行的状态和检查本地事务状态。

@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener
{
    private Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
            Object obj)
    {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;

        try
        {
            //处理业务
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke msg content:{}",jsonStr);
        }
        catch (Exception e)
        {
            logger.error("invoke local mq trans error",e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }

        return resultState;
    }

    /**
     * 检查本地事务的状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg)
    {
        logger.info("start check Local rocketMQ transaction");

        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;

        try
        {
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("check trans msg content:{}",jsonStr);
        }
        catch (Exception e)
        {
            resultState  = RocketMQLocalTransactionState.ROLLBACK;
        }
        return resultState;
    }
}

说明:RocketMQ本地事务状态由如下几种:

  • RocketMQLocalTransactionState.COMMIT:提交事务,允许消费者消费此消息。
  • RocketMQLocalTransactionState.ROLLBACK: 回滚事务,消息将被删除,不允许被消费。
  • RocketMQLocalTransactionState.UNKNOWN:中间状态,代表需要进行检查来确定状态。

注意:Spring Boot2.0的版本之后,@RocketMQTransactionListener 已经没有了txProducerGroup属性,且sendMessageInTransaction方法也将其移除。所以在同一项目中只能有一个@RocketMQTransactionListener,不能出现多个,否则会报如下错误:

java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener

消息事务测试

正常测试

c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"}
c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction
c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE
c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}

说明:通过日志我们可以看出,执行的流程与上述的一致,执行成功后,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为COMMIT_MESSAGE。

异常测试

如果在执行本地消息时出现异常,那么执行结果会是怎样?修改下本地事务执行的方法,让其出现异常。

代码调整

  @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
            Object obj)
    {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;

        try
        {
            //处理业务
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke local transaction msg content:{}",jsonStr);
             int c=1/0;
        }
        catch (Exception e)
        {
            logger.error("invoke local mq trans error",e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }

        return resultState;
    }

执行结果

c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:UNKNOW

从执行的结果可以看出,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为:UNKNOW.所以消费端无法消费此消息。

总结

到此这篇关于SpringBoot集成RocketMQ发送事务消息的文章就介绍到这了,更多相关SpringBoot集成RocketMQ事务消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • 解决springboot集成rocketmq关于tag的坑

    springboot集成rocketmq关于tag的坑 新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收. 1.客户端发送代码 此处回调方法里有些业务不用关注,只关心发送方法 @Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • 微信公众号被动消息回复原理解析

    背景:某分厂需要实时查询工件堆放的位置,要求快速便捷,因此设计了采用微信公众号被动回复信息的方案. 技术实现:开发者服务器--基于Angular2框架的已发布网站,编程语言为Python,后台存储数据库为Mysql: 微信服务器--微信公众号,此业务只是处理微信客户端发送的文本信息,且不使用公众号的其他功能,因此不需要认证公众号: 微信客户端--关注公众号的微信使用者,即粉丝. 当粉丝给公众号发送特定的消息时,微信公众号自动回复相应内容,而其背后的实现原理可由下图所示: 由上图可知,粉丝(微信客

  • SpringBoot启动应用及回调监听原理解析

    这篇文章主要介绍了SpringBoot启动应用及回调监听原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 主类Main方法 public static void main(String[] args) { SpringApplication.run(SpringBootRunApplication.class, args); } 创建SpringApplication对象 public static ConfigurableApplica

  • SpringBoot集成E-mail发送各种类型邮件

    SpringBoot 集成 E-mail发送邮件,供大家参考,具体内容如下 JDK本身有自带发送邮件api,加上SpringBoot在进行封装,使得现在使用起来十分快速简洁. 话不多说,参考纯洁的微笑博客,更改jar版本为2.0.4 开干,基本没什么坑. 就是配置邮箱账号密码是,如果是qq邮箱,需要开启PO30和STMP服务,并且获取临时授权码. 开启服务链接: https://mail.qq.com/cgi-bin/frame_html?sid=a5ZSbreeNm9pHyl1&r=a8322

  • Android6.0 消息机制原理解析

    消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出.如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理:如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来.在编写Android应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法的创建一个子线程来完成特定的任务.在创建子线程时,有两种选择,一种通过创建Thread对象来创建一个无消息循环的子线程:还有一

  • 微信公众平台开发之发送文本消息.Net代码解析

    .Net实现微信公共服务平台开发中的发送文本消息功能,具体内容如下 首先建立一个微信消息类. class wxmessage { public string FromUserName { get; set; } public string ToUserName { get; set; } public string MsgType { get; set; } public string EventName { get; set; } public string Content { get; se

  • 微信公众平台开发之发送图文消息.Net代码解析

    之前我们讲过让微信发送给我们普通的文本信息,下面我们来看看如何发送图文信息,需要注意的是这里说的是,让微信发给我们,而不是我们拍个图片发给微信处理,我们上传图片在以后的章节介绍.下面是发送图文消息的函数,涉及title(标题),description(摘要),picurl(图片),链接(url)几个关键的参数: protected string sendPicTextMessage(Msg _mode,string title,string description,string picurl,s

  • Springboot集成jsp及部署服务器实现原理

    1.在application配置文件里面加入配置: spring: mvc: view: prefix: /jsp/ suffix: .jsp 2.手动在src->main->下面创建webapp文件夹 3.在Springboot 启动类加入如下代码: @Bean public InternalResourceViewResolver setupViewResolver() { InternalResourceViewResolver resolver = new InternalResour

随机推荐