SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

目录
  • 消息队列实现分布式事务原理
  • RocketMQ的事务消息
  • 代码实现
  • 基础配置
  • 发送半消息
  • 执行本地事务与回查
  • Account-Service消费消息
  • 测试
  • 小结

消息队列实现分布式事务原理

首先让我们来看一下基于消息队列实现分布式事务的原理方案。

柔性事务

发送消息的服务有个OUTBOX数据表,在进行INSERT、UPDATE、DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录,这样可以保证原子性,因为这是基于本地的ACID事务。

OUTBOX表充当临时消息队列,然后我们在引入一个消息中继(MessageRelay)的服务,由他从OUTBOX表中读取数据并发布消息到消息组件。

消息中继的实现可以很简单,只需要通过定时任务定期从OUTBOX表中拉取最新未发布的数据,获取到数据后将数据发送给消息组件,最后将完成发送的消息从OUTBOX表中删除即可,对于失败的消息可以根据业务规则进行重试。

RocketMQ的事务消息

RocketMQ本身已经支持事务消息,如果你们项目使用了RocketMQ,可以直接借助RocketMQ的事务消息实现分布式事务,我们先看一下RocketMQ事务消息的原理然后再借助RocketMQ来实现分布式事务。

RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

分布式事务

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程

整体流程为:

正常事务发送与提交阶段

1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)

2、服务端响应消息写入结果,半消息发送成功

3、开始执行本地事务

4、根据本地事务的执行状态执行Commit或者Rollback操作

事务信息的补偿流程

1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求

2、生产者收到确认回查请求后,检查本地事务的执行状态

3、根据检查后的结果执行Commit或者Rollback操作

补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

RocketMQ事务流程关键

事务消息在一阶段对用户不可见

事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC,这样由于消费者没有订阅这个主题,所以不会被消费。

如何处理第二阶段的失败消息?

在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。

当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。

消息状态 事务消息有三种状态:TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown:中间状态,它代表需要检查消息队列来确定状态。

代码实现

业务需求:用户请求订单微服务order-service接口删除订单(退货),删除订单时需要调用account-service的方法给账户增加余额,一个典型的分布式事务问题。

基础配置

在Order-Service和Account-Service中引入Rocket消息组件

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

在配置中心添加RocketMQ的相关配置

rocketmq:
  name-server: xxx.xx.x.xx:9876
  producer:
    group: cloud-group

在OrderService服务中建立一张事务日志表rocketmq_transaction_log(作用稍后说)

发送半消息

Order-Service作为分布式事务开始的入口,在Service层我们给RocketMQ发送一条半消息

OrderController入口

/**
 * 根据订单号删除订单
 * @param orderNo 订单编号
 */
@PostMapping("/order/delete")
public ResultData<String> delete(@RequestParam String orderNo){
 log.info("delete order id is {}",orderNo);
 orderService.delete(orderNo);
 return ResultData.success("订单删除成功");
}

直接调用orderService的delete方法

OrderServiceImpl业务逻辑

@Override
public void delete(String orderNo) {
 Order order = orderMapper.selectByNo(orderNo);
 //如果订单存在且状态为有效,进行业务处理
 if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
  String transactionId = UUID.randomUUID().toString();
  //如果可以删除订单则发送消息给rocketmq,让用户中心消费消息
  rocketMQTemplate.sendMessageInTransaction("add-amount",
    MessageBuilder.withPayload(
      UserAddMoneyDTO.builder()
        .userCode(order.getAccountCode())
        .amount(order.getAmount())
        .build()
    )
    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
    .setHeader("order_id",order.getId())
    .build()
    ,order
  );

 }
}

首先校验一下订单状态,然后使用rocketMQTemplate.sendMessageInTransaction()发送事务消息。

sendMessageInTransaction方法有三个参数:

  • destination:目的地(主题),这里发送给add-amount这个topic
  • message:发送给消费者的消息体,需要使用MessageBuilder.withPayload()来构建消息
  • arg:参数

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

消息封装实体UserAddMoneyDTO

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserAddMoneyDTO {
    /**
     * 用户编码
     */
    private String userCode;
    /**
     * 金额
     */
    private BigDecimal amount;
}

这个类生产者和消费者都需要用到,所以我直接丢到common包中,大家根据项目实际情况决定放哪。

执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认此消息究竟是Commit还是Rollback。

RocketMQ提供了RocketMQLocalTransactionListener接口,本地事务监听器,这个接口类的实现如下:

第一个方法executeLocalTransaction为执行本地事务;第二个方法checkLocalTransaction为检查本地事务的执行状态,也就是回查动作。

我们需要实现RocketMQLocalTransactionListener接口,在executeLocalTransaction方法中执行本地事务,在执行checkLocalTransaction回查方法时告诉RocketMQ到底该提交还是回滚。

这里大家思考一个问题,本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?

答案如下:我们可以在执行本地事务的时候同时生成一条事务日志,让本地事务与日志事务在同一个方法中,同时添加@Transactional注解,保证两个操作事务是一个原子操作。

这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示执行失败,需要Rollback。这就是为什么我们上面在OrderService中需要建立一张事务日志表的原因。

实现RocketMQLocalTransactionListener接口,完成事务执行逻辑

/**
 * 监听事务消息
 * @author javadaily
 */
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("执行本地事务");
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String)headers.get("order_id"));
        log.info("transactionId is {}, orderId is {}",transactionId,orderId);
        try{
            //执行本地事务,并记录日志
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
            //执行成功,可以提交事务
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 本地事务的检查,检查本地事务是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("检查本地事务,事务ID:{}",transactionId);
        //根据事务id从日志表检索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id",transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        if(null != rocketmqTransactionLog){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

本地事务执行逻辑

@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
    orderMapper.changeStatus(id,status);
    rocketMqTransactionLogMapper.insert(
        RocketmqTransactionLog.builder()
        .transactionId(transactionId)
        .log("执行删除订单操作")
        .build()
    );
}

修改订单状态为删除状态,同时往事务日志表中插入一条事务日志,用@Transactional注解保证事务。

Account-Service消费消息

监听消息并处理给用户增加余额逻辑

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
    private final AccountMapper accountMapper;
    /**
     * 收到消息的业务逻辑
     */
    @Override
    public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
        log.info("received message: {}",userAddMoneyDTO);
        accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
        log.info("add money success");
    }
}

测试

测试数据

订单表

用户表

事务日志表

如果事务消息成功消费最终用户表中jianzh5这个用户的amount应该变成300(100+200)

测试准备

我们在执行本地事务成功并需要通知消息队列提交事务处打个断点,然后在执行到此处时手动模拟异常

模拟异常

在准备提交事务时我们通过命令taskkill /pid 10116 -t -f命令强制杀掉OrderService进程。(先通过jps获取OrderService进程ID)

重启服务器,检查是否会执行回查方法

重启OrderService程序会自动执行回查方法,结合事务日志表判断是否提交事务。

运行后的结果

小结

我们介绍了使用消息队列实现柔性事务的方案,重点剖析了RocketMQ事务消息的原理,并通过Demo案例实现了分布式事务(柔性事务)。

到此这篇关于SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解的文章就介绍到这了,更多相关SpringCloud RocketMQ分布式事务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringCloud微服务剔除下线功能实现原理分析

    目录 一.前言 二.微服务剔除下线源码解析 1.EurekaBootStrap#contextInitialized() 1.1.初始化注册中心上下文 1.2.openForTraffic()逻辑 1.3.postInit()执行任务 1.4.剔除任务 2.服务剔除下线 2.1.AbstractInstanceRegistry#evict()逻辑 2.1.判断是否过期 2.2.从本地列表异常下线处理 一.前言 上一篇SpringCloud微服务续约源码解析已经分析了心跳机制是什么.底层实现.客户

  • SpringCloud微服务续约实现源码分析详解

    目录 一.前言 二.客户端续约 1.入口 构造初始化 initScheduledTasks()调度执行心跳任务 2.TimedSupervisorTask组件 构造初始化 TimedSupervisorTask#run()任务逻辑 3.心跳任务 HeartbeatThread私有内部类 发送心跳 4.发送心跳到注册中心 构建请求数据发送心跳 三.服务端处理客户端续约 1.InstanceRegistry#renew()逻辑 2.PeerAwareInstanceRegistryImpl#rene

  • SpringCloud超详细讲解微服务网关Gateway

    目录 前言 微服务网关GateWay介绍 GateWay特性介绍 Gateway 中的相关术语 Gateway实战 1.创建项目gateway 2.创建启动类 3.新增配置文件 4.编程方式实现路由 5.启动验证 总结 前言 上一篇:微服务网关Zuul 上文中,我们介绍了微服务网关Zuul,Zuul 是 Netflix 公司开源的产品,被称为第一代网关,也是 Spring Cloud 前几个版本默认使用的一款提供动态路由微服务网关组件,但是随着 Netflix 公司一系列的停更事件,在最新的 S

  • SpringCloud 分布式微服务架构操作步骤

    目录 前言 SpringCloud微服务 单体架构和微服务分布式架构 单体架构分析 微服务分布式架构分析 服务拆分和远程调用 服务拆分 案例需求准备 远程调用初步 Eureka注册中心 服务注册与负载均衡 服务注册 Ribbon负载均衡 指定负载均衡规则 Nocas 注册中心 环境配置启动服务注册 Nacos 分级存储模型与集群 负载均衡 namespace 环境隔离 统一配置管理与热更新 前言 这篇笔记文章我还是没有接上之前的java,因为我中间偷懒了,写不动了.打算先把这篇安排下,然后再把之

  • SpringCloud微服务熔断器Hystrix使用详解

    目录 什么是Hystrix Hystrix实战 总结 什么是Hystrix 在日常生活用电中,如果我们的电路中正确地安置了保险丝,那么在电压异常升高时,保险丝就会熔断以便切断电流,从而起到保护电路安全运行的作用. 在货船中,为了防止漏水和火灾的扩散,一般会将货仓进行分割,避免了一个货仓出事导致整艘船沉没的悲剧,这就是舱壁保护机制. Hystrix提供的熔断器也类似,在调用某个服务提供者时,当一定时间内请求总数超过配置的阈值,且窗口期内错误率过高,那Hystrix就会对调用请求熔断,后续的请求直接

  • SpringCloud微服务熔断器使用详解

    目录 一.简介 二.作用 三.核心概念 3.1 熔断目的 3.2 降级目的 四.实例 4.1 基于Hystrix 4.1.1 熔断触发降级 4.1.2 超时触发降级 4.1.3 资源隔离触发降级 4.2 基于OpenFeign pom.xml 一.简介 当微服务中的某个子服务,发生异常服务器宕机,其他服务在进行时不能正常访问而一直占用资源导致正常的服务也发生资源不能释放而崩溃,这时为了不造成整个微服务群瘫痪,进行的保护机制 就叫做熔断,是一种降级策略 熔断的目的:保护微服务集群 二.作用 对第三

  • SpringCloud超详细讲解微服务网关Zuul

    目录 网关的作用 Spring Cloud 网关组件Zuul介绍 Zuul网关实战 1.创建服务 2.创建配置文件 3.创建Zuul过滤器 4.编写启动类 5.启动验证 总结 网关的作用 微服务架构中,服务实例的地址可能经常会发生变化,所以我们不能直接将服务的地址暴露出来.如果每一个微服务都直接暴露接口,会导致一系列的问题,比如调用过于复杂,涉及到账户.权限不能统一处理等.另外基于高内聚低耦合的设计准则来讲,我们也应该将内部系统和外部系统做切割. 因此,这时就需要有一个独立的组件来处理外部的请求

  • SpringCloud超详细讲解微服务网关Zuul基础

    目录 一.Zuul的简介 1.Zuul是怎么工作的 2.Zuul能干嘛 二.Zuul的使用 1.配置Pom.xml 2.配置Application.yml 3.撰写启动类 4.效果图 三.学会SpringCloud的感触 一.Zuul的简介 1.Zuul是怎么工作的 Zull包含了对请求的路由(用来跳转的)和过滤两个最主要功能: 其中路由功能负责将外部请求转发到具体的微服务实例上,是实现外部访问统一入口的基础,而过滤器功能则负责对请求的处理过程进行干预,是实现请求校验,服务聚合等功能的基础.Zu

  • SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

    目录 消息队列实现分布式事务原理 RocketMQ的事务消息 代码实现 基础配置 发送半消息 执行本地事务与回查 Account-Service消费消息 测试 小结 消息队列实现分布式事务原理 首先让我们来看一下基于消息队列实现分布式事务的原理方案. 柔性事务 发送消息的服务有个OUTBOX数据表,在进行INSERT.UPDATE.DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录,这样可以保证原子性,因为这是基于本地的ACID事务. OUTBOX表充当临时消息队列,然后我

  • 微服务之注册中心和配置中心Consul详解

    目录 概述 注册中心 注册中心选型 CAP原理 Consul介绍 Consul Raft算法 Consul 基本使用 注册服务 概述 上篇说到构建良好的架构,依托于基础设施建设(自动化测试.自动化部署.服务监控,服务发现.配置中心等等),决定成败的往往是基础设施建设,所以从搭建一个注册中心和配置中心开始我们新一阶段的启程. 注册中心 注册中心选型 你有没有思考过这样一个问题,为什么会有这么多的注册中心(etcd/ZooKeeper/Consul),选用那个最适合自己,是不是在选用的时候会眼花缭乱

  • php基于redis的分布式锁实例详解

    在使用分布式锁进行互斥资源访问时候,我们很多方案是采用redis的实现. 固然,redis的单节点锁在极端情况也是有问题的,假设你的业务允许偶尔的失效,使用单节点的redis锁方案就足够了,简单而且效率高. redis锁失效的情况: 客户端1从master节点获取了锁 master宕机了,存储锁的key还没来得及同步到slave节点上 slave升级为master 客户端2从新的master上获取到同一个资源的锁 于是,客户端1和客户端2同事持有了同一个资源的锁,锁的安全性被打破. 如果我们不考

  • SpringBoot+Dubbo+Seata分布式事务实战详解

    前言 Seata 是 阿里巴巴开源的分布式事务中间件,以高效并且对业务0侵入的方式,解决微服务场景下面临的分布式事务问题. 事实上,官方在GitHub已经给出了多种环境下的Seata应用示例项目,地址:https://github.com/seata/seata-samples. 为什么笔者要重新写一遍呢,主要原因有两点: 官网代码示例中,依赖太多,分不清哪些有什么作用 Seata相关资料较少,笔者在搭建的过程中,遇到了一些坑,记录一下 一.环境准备 本文涉及软件环境如下: SpringBoot

  • LCN分布式事务解决方案详解

    目录 一.什么是分布式事务? 二.lcn的实现思路 2.1 本地执行的状态怎么提交给全局事务? 2.2 本地事务的提交或回滚怎么实现? 三.lcn的使用 3.1 下载lcn-manager (全局的事务管理器) 3.2 配置lcn-manager 3.3 启动lcn 3.4 模拟转账服务 3.4.1 add-service 3.4.2 decr-service 3.5 2 个微服务都需要添加依赖 3.6 需要自定义数据库的连接池 3.7 使用 3.7.1 事务的发起者 3.7.2 添加配置文件

  • 详解Spring Boot微服务如何集成fescar解决分布式事务问题

    什么是fescar? 关于fescar的详细介绍,请参阅fescar wiki. 传统的2PC提交协议,会持有一个全局性的锁,所有局部事务预提交成功后一起提交,或有一个局部事务预提交失败后一起回滚,最后释放全局锁.锁持有的时间较长,会对并发造成较大的影响,死锁的风险也较高. fescar的创新之处在于,每个局部事务执行完立即提交,释放本地锁:它会去解析你代码中的sql,从数据库中获得事务提交前的事务资源即数据,存放到undo_log中,全局事务协调器在回滚的时候直接使用undo_log中的数据覆

  • Go微服务项目配置文件的定义和读取示例详解

    目录 前言 场景 定义配置 配置文件 加载配置文件 实现原理 总结 项目地址 前言 我们在写应用时,基本都会用到配置文件,从各种 shell 到 nginx 等,都有自己的配置文件.虽然这没有太多难度,但是配置项一般相对比较繁杂,解析.校验也会比较麻烦.本文就给大家讲讲我们是怎么简化配置文件的定义和解析的. 场景 如果我们要写一个 Restful API 的服务,配置项大概有如下内容: Host,侦听的 IP,如果不填,默认用 0.0.0.0 Port,侦听的端口,必填,只能是数字,大于等于80

  • SpringCloud+RocketMQ实现分布式事务的实践

    目录 一.RocketMQ的分布式事务结构和说明 二.搭建RocketMQ 三.事务场景,然后准备工程,运行代码 随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的.所以我们来一步一步的实现基于RocketMQ的分布式事务.接下来,我们将要做的主题写出来. RocketMQ的分布式事务结构和说明 搭建RocketMQ步骤 事务场景,然后准备工程,运行代码 一.RocketMQ的分布式事务结构和说明 我们通过下图来了解一下RocketMQ实现分布式事务的结构.采用半消息机制实现分

  • 基于Pinpoint对SpringCloud微服务项目实现全链路监控的问题

    目录 1.全链路监控的概念 2.pinpoint链路监控组件的介绍 3.使用docker部署pinpoint监控组件 4.在微服务中集成pinpoint-agent 4.1.pinpoint-agent的接入方式 4.2.配置pinpoint-agent 4.3.修改每个微服务程序的Dockerfile接入pinpoint-agent 4.4.先将product商品服务接入到pinpoint观察效果 4.5.将所有的微服务接入到pinpoint系统 5.pinpoint监控系统简单使用 5.1.

  • SpringCloud 微服务最佳开发实践

    现在基于SpringCloud的微服务开发日益流行,网上各种开源项目层出不穷.我们在实际工作中可以参考开源项目实现很多开箱即用的功能,但是必须要遵守一定的约定和规范. 本文结合我们实际的开发中遇到的一些问题整理出了一份微服务开发的实践规范,欢迎各位大佬拍砖指点. Maven规范 1.所有项目必须要有一个统一的parent模块 所有微服务工程都依赖这个parent,parent用于管理依赖版本,maven仓库,jar版本的统一升级维护 在parent下层可以有 core,starter,rate-

随机推荐