微服务架构设计RocketMQ进阶事务消息原理详解

目录
  • 前言
  • RocketMQ事务流程概要
  • RocketMQ事务流程关键
  • 实现
    • 基础配置
      • 引入组件
      • 添加配置
    • 发送半消息
    • 执行本地事务与回查
    • 消费消息
  • 测试
  • 总结

前言

分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好。今天我们来唠唠如何实现RocketMQ的事务消息!

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

RocketMQ事务流程概要

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:

正常事务发送与提交阶段
1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)
2、服务端响应消息写入结果,半消息发送成功
3、开始执行本地事务
4、根据本地事务的执行状态执行Commit或者Rollback操作

事务信息的补偿流程
1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求
2、生产者收到确认回查请求后,检查本地事务的执行状态
3、根据检查后的结果执行Commit或者Rollback操作
补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

RocketMQ事务流程关键

1、事务消息在一阶段对用户不可见
事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC ,这样由于消费者没有订阅这个主题,所以不会被消费。

2、如何处理第二阶段的失败消息?
在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。
当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。

3、消息状态 事务消息有三种状态:

TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

实现

我们构建这样一个需求:用户请求订单微服务 order-service 接口删除订单(退货),删除订单后需要发送消息给用户服务account-service,用户微服务收到消息后会给用户账户增加余额。这个需求跟钱相关,肯定要保证消息的事务性,接下来我们根据上面的原理实现整个流程。

基础配置

生产者order-servcie和account-service都要引入RocketMQ相关依赖,增加RocketMQ的相关配置

引入组件

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

添加配置

# within rocketmq
rocketmq:
  name-server: xxx.xx.x.xx:9876; xxx.xx.x.xx:9876
  producer:
    group: cloud-group

发送半消息

order-service在执行删除订单操作时发送一条半消息给MQServer,发送半消息主要是使用rocketMQTemplate.sendMessageInTransaction() 方法,发送事务消息。

@Override
public void delete(String orderNo) {
	Order order = orderMapper.selectByNo(orderNo);
	//如果订单存在且状态为有效,进行业务处理
	if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
		String transactionId = UUID.randomUUID().toString();
		//如果可以删除订单则发送消息给rocketmq,让用户中心消费消息
		rocketMQTemplate.sendMessageInTransaction("add-amount",
				MessageBuilder.withPayload(
						UserAddMoneyDTO.builder()
								.userCode(order.getAccountCode())
								.amount(order.getAmount())
								.build()
				)
				.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
				.setHeader("order_id",order.getId())
				.build()
				,order
		);
	}
}

首先先校验一下订单状态,然后发送消息给MQServer,这个逻辑大家都看得懂,

主要是关注sendMessageInTransaction() 方法,源码如下:

public TransactionSendResult sendMessageInTransaction(String destination, Message<?> message, Object arg) throws MessagingException {
	try {
		if (((TransactionMQProducer)this.producer).getTransactionListener() == null) {
			throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
		} else {
			org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
			return this.producer.sendMessageInTransaction(rocketMsg, arg);
		}
	} catch (MQClientException var5) {
		throw RocketMQUtil.convert(var5);
	}
}

该方法有三个参数:

destination:目的地(主题),这里发送给add-amount 这个主题

message:发送给消费者的消息体,需要使用 MessageBuilder.withPayload() 来构建消息

arg:参数

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认消息究竟是Commit还是Rollback。如果在告诉MQServer本地执行状态的时候出异常了还需要让MQServer能够回查到,怎么实现这一些列操作呢?

RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事务监听器,这个接口类的实现如下:

第一个方法executeLocalTransaction 为执行本地事务;
第二个方法checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。
有了这个接口类我们的执行逻辑清楚了,但是还有个问题:本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?

我们可以在执行本地事务的时候同时生成一个事务日志,让本地事务与日志事务在同一个方法中,同时添加@Transactional 注解,保证两个操作事务是一个原子操作。这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示没执行成功,需要Rollback

思路既然理顺了,咱们就开撸。

首先创建一个日志表

很简单的三个字段,主要是这个事务id,需要根据这个事务id回查事务,还记得我们在发送半消息时生成的事务id吗,就是干这个用的!

在生产者编写方法实现RocketMQLocalTransactionListener

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("执行本地事务");
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String)headers.get("order_id"));
        log.info("transactionId is {}, orderId is {}",transactionId,orderId);

        try{
            //执行本地事务,并记录日志
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
            //执行成功,可以提交事务
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 本地事务的检查,检查本地事务是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("检查本地事务,事务ID:{}",transactionId);
        //根据事务id从日志表检索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id",transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        if(null != rocketmqTransactionLog){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

执行本地事务的方法

@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
    //将订单状态置位无效
	orderMapper.changeStatus(id,status);
    //插入事务表
	rocketMqTransactionLogMapper.insert(
			RocketmqTransactionLog.builder()
					.transactionId(transactionId)
					.log("执行删除订单操作")
			.build()
	);
}

这一块的代码逻辑都是在生产端,即Order-Server,大家不要搞错了

消费消息

Rollback的消息MQServer会给我们处理,我们只要关注Commit状态时消费端可以正常消费即可。在account-service监听消息,如果收到消息则给用户账户增加余额。

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
    private final AccountMapper accountMapper;
    /**
     * 收到消息的业务逻辑
     */
    @Override
    public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
        log.info("received message: {}",userAddMoneyDTO);
        accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
        log.info("add money success");
    }
}

测试

订单表有这样一条记录,用户为jianzh5,amount为200

用户表的记录,执行完成后jianzh5的账户应该变成250

调用删除订单接口,删除订单

发送半消息

执行本地事务,并生成事务日志

模拟异常情况 在发送Commit消息的时候我们用命令杀掉进程taskkill /pid 19748 -t -f,模拟异常!

重新启动order-service,查看是否会执行回查动作

MQServer进行回查,检查事务日志,判断是否可以提交事务

消费者消费事务消息,保证事务的一致性

总结

使用RocketMQ实现事务消息的过程还是很复杂的,需要好好理解开头的那张图,只有理解了事务消息的交互过程才能编写相应的代码!

好了,各位朋友们,本期的内容到此就全部结束啦,能看到这里的同学都是优秀的同学,下一个升职加薪的就是你了!

以上就是微服务架构RocketMQ进阶事务消息原理详解的详细内容,更多关于微服务架构RocketMQ事务消息的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ消息过滤与查询的实现

    消息过滤 RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的. RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构. 其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的M

  • 基于RocketMQ推拉模式详解

    消费者客户端有两种方式从消息中间件获取消息并消费.严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现. 通过 Pull 不断轮询 Broker 获取消息.当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息. 1.概述 1.1.PULL方式 由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息:采用Pull方式,如何设置Pull消息的拉取频率需要重点去考虑,举个

  • 消息中间件详解以及比较选择

    目录 一.消息中间件相关知识 1.概述 2.消息中间件的组成 2.1 Broker 2.2 Producer 2.3 Consumer 2.4 Topic 2.5 Queue 2.6 Message 3 消息中间件模式分类 3.1 点对点 3.2 发布/订阅 4 消息中间件的优势 4.1 系统解耦 4.2 提高系统响应时间 4.3 为大数据处理架构提供服务 4.4 Java消息服务--JMS 5 消息中间件应用场景 5.1 异步通信 5.2 解耦 5.3 冗余 5.4 扩展性 5.5 过载保护

  • java开发RocketMQ消息中间件原理基础详解

    RocketMQ 是什么 Github 上关于 RocketMQ 的介绍: RcoketMQ 是一款低延迟.高可靠.可伸缩.易于使用的消息中间件.具有以下特性: 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 支持拉(pull)和推(push)两种消息模式 单一队列百万消息的堆积能力 支持多种消息协议,如 JMS.MQTT 等 分布式高可用的部署架构,满足至少一次消息传递语义 提供 docker 镜像用于隔离测试和云集群部署 提

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • 微服务架构设计RocketMQ基础及环境整合

    目录 概述&选型 单机安装配置 双机主从高可用搭建 启动多个NameServer 和 Broker 重要参数说明 可视化管理平台 SpringBoot整合RocketMQ 引入组件rocketmq-spring-boot-starter 依赖 修改application.yml,添加RocketMQ相关配置 编写消息生产者 MessageProduce 编写消息消费者 MessageConsumer 编写单元测试发送消息 测试 概述&选型 消息队列作为高并发系统的核心组件之一,能够帮助业务

  • 微服务之注册中心和配置中心Consul详解

    目录 概述 注册中心 注册中心选型 CAP原理 Consul介绍 Consul Raft算法 Consul 基本使用 注册服务 概述 上篇说到构建良好的架构,依托于基础设施建设(自动化测试.自动化部署.服务监控,服务发现.配置中心等等),决定成败的往往是基础设施建设,所以从搭建一个注册中心和配置中心开始我们新一阶段的启程. 注册中心 注册中心选型 你有没有思考过这样一个问题,为什么会有这么多的注册中心(etcd/ZooKeeper/Consul),选用那个最适合自己,是不是在选用的时候会眼花缭乱

  • Spring事务annotation原理详解

    这篇文章主要介绍了Spring事务annotation原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在使用Spring的时候,配置文件中我们经常看到 annotation-driven 这样的注解,其含义就是支持注解,一般根据前缀 tx.mvc 等也能很直白的理解出来分别的作用. <tx:annotation-driven/> 就是支持事务注解的(@Transactional) . <mvc:annotation-driven

  • SpringBoot定时任务设计之时间轮案例原理详解

    目录 知识准备 什么是时间轮(Timing Wheel) Netty的HashedWheelTimer要解决什么问题 HashedWheelTimer的使用方式 实现案例 Pom依赖 2个简单例子 HashedWheelTimer是如何实现的? 什么是多级Timing Wheel? 知识准备 Timer和ScheduledExecutorService是JDK内置的定时任务方案,而业内还有一个经典的定时任务的设计叫时间轮(Timing Wheel), Netty内部基于时间轮实现了一个Hashe

  • Go微服务项目配置文件的定义和读取示例详解

    目录 前言 场景 定义配置 配置文件 加载配置文件 实现原理 总结 项目地址 前言 我们在写应用时,基本都会用到配置文件,从各种 shell 到 nginx 等,都有自己的配置文件.虽然这没有太多难度,但是配置项一般相对比较繁杂,解析.校验也会比较麻烦.本文就给大家讲讲我们是怎么简化配置文件的定义和解析的. 场景 如果我们要写一个 Restful API 的服务,配置项大概有如下内容: Host,侦听的 IP,如果不填,默认用 0.0.0.0 Port,侦听的端口,必填,只能是数字,大于等于80

  • 详解微服务架构及其演进史

    目录 1 传统单体系统介绍 1.1 单体系统的问题 1.2 单体系统的优点 1.3 单体服务到微服务的发展过程 2 关于微服务 2.1 单一职责 2.2 轻量级通信 2.3 独立性 2.4 进程隔离 2.5 混合技术栈和混合部署方式 2.6 简化治理 2.7 安全可靠,可维护. 3 微服务演进史 3.1 第一阶:简单服务通信模块 3.2 第二阶:原始通信时代 3.3 第三阶:TCP时代 3.4 第四阶:第一代微服务(Spring Cloud/RPC) 3.5 第五阶:第二代微服务 3.6 第六阶

  • 微服务架构之服务注册与发现功能详解

    目录 微服务的注册与发现 1.服务注册 2.服务发现 3.注册中心 4.现下的主流注册中心 4.1 Eureka 4.1.1 介绍 4.1.2 整体架构 4.1.3 接入Spring Cloud 4.2 ZooKeeper 4.2.1 介绍 4.2.2 整体架构 4.2.3 接入Dubbo生态 4.3 Consul 4.3.1 介绍 4.3.2 整体架构 4.3.3 生态对接 4.4 总结对比 详解微服务架构及其演进史 微服务全景架构全面瓦解 微服务架构拆分策略详解 微服务的注册与发现 我们前面

  • 微服务架构拆分策略详解

    目录 1 微服务迁移准备 2 微服务颗粒的拆分策略 2.1 基于业务逻辑拆分 2.1.1 领域模型拆分 2.1.2 用户群体拆分 2.2 基于可扩展拆分 2.3 基于可靠性拆分 2.3.1 核心模块拆分 2.3.2 主次链路拆分 2.4 基于性能需求拆分 3 总结拆分原则 微服务架构及其演进史 微服务全景架构全面瓦解 前面我们学习了微服务的全景架构,了解到相对于传统单体架构,微服务的优势,以及系统服务化的发展趋势. 对于新启动的项目,我们在权衡之后可以大方的使用微服务架构.但其实大部分情况下,我

  • java开发微服务架构设计消息队列的水有多深

    目录 消息队列的作用 消息队列的设计难题 处理并发和顺序消息 处理重复消息 编写幂等消息处理器 跟踪消息并丢弃重复消息 处理事务性消息 使用数据库表作为消息队列 使用事务日志发布事件 RocketMQ事务消息解决方案 很多人在做架构设计时往往会"过度设计",简单问题复杂化,上来就引一堆中间件,我想大概原因主要有下面两点: 为了秀(学)技术而架构 我们常说技术是为业务服务的,不能为了技术而技术,为了秀技术引入一堆复杂架构这是要不得的. 考虑问题不全面,或者说广度不够,不知道如何简单化 举

随机推荐