SpringCloud+RocketMQ实现分布式事务的实践

目录
  • 一、RocketMQ的分布式事务结构和说明
  • 二、搭建RocketMQ
  • 三、事务场景,然后准备工程,运行代码

  随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的。所以我们来一步一步的实现基于RocketMQ的分布式事务。接下来,我们将要做的主题写出来。

  • RocketMQ的分布式事务结构和说明
  • 搭建RocketMQ步骤
  • 事务场景,然后准备工程,运行代码

一、RocketMQ的分布式事务结构和说明

  我们通过下图来了解一下RocketMQ实现分布式事务的结构。采用半消息机制实现分布式事务,半消息顾名思义,就是发送方将消息发送到MQ中的Broker端,这个消息被标记为“暂不投递”状态,这个时间订阅方是不能收到这个消息的,当发送方将提交了Commit后,这个消息才可以被订阅方收到。如果发送方提交了Rollback,那么订阅方就不会收到以该消息。

  RocketMQ采用的是最终一致性事务,因为它使用了半消息机制,订阅方可能收不到消息,但是最终状态是能保持一致的。

  我们在讲这个流程前,我们需要先看下图的红色部分,当发送方发给MQ的Commit/Rollback没有收到的时候,这个时候需要启用一个线程,从本地事务库里边查找到当前的状态,根据当前的状态来决定是否重新发送Commit/Rollback,相当于重试。

  接下来,我们说一下整个执行的过程。

发送方将半消息发送到MQ中,成功后MQ将消息保持好后将结果同步给发送方,即半消息发送成功。当MQ告诉发送方成功后,我们需要记录下发送到MQ的相关的事情,比如发送的消息内容,发送时间,发送的相关ID,比如事务ID,都要做好相应记录。当本地事务库,也可以理解为日志库,我们可以针对发生的问题进行追溯。本地库记录好事务后,发送方就可以将Commit/Rollback提交到MQ了。当MQ收到了Commit命令后,这个时候就会将该条消息发送给订阅方。收到了Rollback后,那该条消息不会投递给订阅方,消息保存3天后删除。

  整个过程就是这些,大家如果觉得有遗漏的,可以告诉我。

二、搭建RocketMQ

  我使用的是windos10,所以需要下载windows版本。

   进入到RocketMQ的官方地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

   下载二进制文件压缩包。如下图:

  先解压,然后进行环境变量配置,打开菜单,直接输入环境变量。

    变量名:ROCKETMQ_HOME

    值:解压的release的路径,如D:\rocketmq-all-4.3.0-bin-release

  然后打开CMD命令,进入到解压路径中的bin目标,进行nameserver启动。

start mqnameserv.cmd

  启动后的效果为:

  再进行broker的启动,启动需要连到的nameserver为127.0.0.1:9876,打开自动创建Topic,这个时候当用到某个Topic没有的情况下会自动创建一个。

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

  这个时候Windows的rocketmq就下载好并且启动成功了。

三、事务场景,然后准备工程,运行代码

  因为我们使用的RocketMq是使用的分布式事务是最终一致性的,柔性的,所以我们使用的场景也要考虑到。我们用的场景就是下订单充话费,用户支付了订单,那么直接给用户的进行充值。购买的订单不会立马充值到手机的,需要等一会才到账,这就是最终一致性。

  我们使用的如下代码版本号,SpringCloud的版本号要和SpringBoot保持一致,否则会出现类找不到的情况。

SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4.RELEASE + RocketMQ4.3 +MySQL + lombok(插件)

  我们使用SpringCloud的几个组件:

  Euerka Server :用于提供服务注册的能力和发现

  Euerka Client : 用于进行服务注册

  Feign:用于服务间的调用

  Config :用于进行配置

  接下来,我们创建需要进行配置的表。

DROP TABLE IF EXISTS `spring_cloud_config`;

CREATE TABLE `spring_cloud_config` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `akey` varchar(30) DEFAULT NULL,
  `avalue` varchar(128) DEFAULT NULL,
  `application` varchar(30) DEFAULT NULL,
  `aprofile` varchar(30) DEFAULT NULL,
  `label` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

INSERT INTO `spring_cloud_config` (`id`, `akey`, `avalue`, `application`, `aprofile`, `label`)
VALUES
    (2,'name_server','127.0.0.1:9876','product-service','dev','dev'),
    (3,'name_server','127.0.0.1:9876','order-service','dev','dev'),
    (4,'order_topic','order_topic','order-service','dev','dev'),
    (5,'order_topic','order_topic','product-service','dev','dev');

  我们构建了4个模块,eureka模块用于服务注册和发现,springcloud-config将数据库创建的配置加载供其他工程使用。charge-order-service用于下充值订单,phone-charge-service用于给手机进行充值业务。

  主要的流程为,通过charge-order-service产生了订单,然后将订单和数量到phone-charge-service进行二次确认看是否还有足够的数量可以使用,如果数量不够的话直接将事务进行rollback,这样消息就不会到消费端。如果下过去的订单号已经充值过了,那么该消息将会被直接丢掉,这也是消息端的幂等设计。

  接下来,我们看一下生产者以及事务的核心代码。主要用于连接RocketMQ, 执行本地事务,在本地事务中进行二次确认,根据结果进行Commit、Rollback。当连接RocketMQ出现问题的时候可能会收到UNKNOWN,这个时候会调用checkLocalTransaction()方法,用于检查是否将消息发送给RocketMQ了。

package com.hqs.chargeorderservice.mqservice;

import com.alibaba.fastjson.JSONObject;
import com.hqs.chargeorderservice.config.Jms;
import com.hqs.chargeorderservice.service.ProduceOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

/**
 *
 * @Description: 分布式事务RocketMQ 生产者
 */
@Slf4j
@Component
public class TransactionProducer {

    /**
     * 需要自定义事务监听器 用于 事务的二次确认 和 事务回查
     */
    private TransactionListener transactionListener ;

    /**
     * 这里的生产者和之前的不一样
     */
    private TransactionMQProducer producer = null;

    /**
     * 官方建议自定义线程 给线程取自定义名称 发现问题更好排查
     */
    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }

    });

    public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) {
        transactionListener = new TransactionListenerImpl(produceOrderService);
        // 初始化 事务生产者
        producer = new TransactionMQProducer(jms.getOrderTopic());
        // 添加服务器地址
        producer.setNamesrvAddr(jms.getNameServer());
        // 添加事务监听器
        producer.setTransactionListener(transactionListener);
        // 添加自定义线程池
        producer.setExecutorService(executorService);

        start();
    }

    public TransactionMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

/**
 * @Description: 自定义事务监听器
 */
@Slf4j
class TransactionListenerImpl implements TransactionListener {

    @Autowired
    private ProduceOrderService produceOrderService ;

    public TransactionListenerImpl( ProduceOrderService produceOrderService) {
        this.produceOrderService = produceOrderService;
    }

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("=========本地事务开始执行=============");
        String message = new String(msg.getBody());
        JSONObject jsonObject = JSONObject.parseObject(message);
        Integer productId = jsonObject.getInteger("productId");
        Integer total = jsonObject.getInteger("total");
        Integer tradeNo = jsonObject.getInteger("tradeNo");
        int userId = Integer.parseInt(arg.toString());
        //模拟执行本地事务begin=======
        /**
         * 本地事务执行会有三种可能
         * 1、commit 成功
         * 2、Rollback 失败
         * 3、网络等原因服务宕机收不到返回结果
         */
        log.info("本地事务执行参数,用户id={},商品ID={},销售库存={}",userId,productId,total);
        int result = produceOrderService.save(userId, productId, total, tradeNo);
        //模拟执行本地事务end========
        //TODO 实际开发下面不需要我们手动返回,而是根据本地事务执行结果自动返回
        //1、二次确认消息,然后消费者可以消费
        if (result == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        //2、回滚消息,Broker端会删除半消息
        if (result == 1) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        //3、Broker端会进行回查消息
        if (result == 2) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * 只有上面接口返回 LocalTransactionState.UNKNOW 才会调用查接口被调用
     *
     * @param msg 消息
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        log.info("==========回查接口=========");
        String key = msg.getKeys();
        //TODO 1、必须根据key先去检查本地事务消息是否完成。
        /**
         * 因为有种情况就是:上面本地事务执行成功了,但是return LocalTransactionState.COMMIT_MESSAG的时候
         * 服务挂了,那么最终 Brock还未收到消息的二次确定,还是个半消息 ,所以当重新启动的时候还是回调这个回调接口。
         * 如果不先查询上面本地事务的执行情况 直接在执行本地事务,那么就相当于成功执行了两次本地事务了。
         */
        // TODO 2、这里返回要么commit 要么rollback。没有必要在返回 UNKNOW
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

  我们启动一下程序。我们登录localhost:7001注册中心,看到3个服务都成功注册了。

  然后我们进行下订单调用接口,userId用户编码,productId产品号,total购买量,tradeNo交易订单号

http://localhost:9001/api/v1/order/chargeOrder?userId=1&productId=1&total=1&tradeNo=4

  charge-order-service显示出来本地执行事务的参数。

  phone-charge-service消费事务执行的日志。

  当我们在创建charge-order-service和phone-charge-service的时候一定要注意,将工程里边的application.properties变为bootstrap.yml,要不然程序启动的时候会从8888找config的配置内容。因为SpringCloud里面有个“启动上下文”,主要是用于加载远端的配置,也就是加载ConfigServer里面的配置,默认加载顺序为:加载bootstrap.*里面的配置 --> 链接configserver,加载远程配置 --> 加载application.*里面的配置。

  好了,代码也放到git地址了,https://github.com/stonehqs/springcloud-rocketmq-transaction,

到此这篇关于SpringCloud+RocketMQ实现分布式事务的实践的文章就介绍到这了,更多相关SpringCloud RocketMQ分布式事务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springboot RocketMq实现过程详解

    首先,在虚拟机上安装rocketmq和rocketMq可视化控制,安装不做描述. 1.pom.xml文件添加依赖 mq的版本与连接的rocketmq版本保持一致 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>4.4.0</version> </depende

  • SpringBoot中使用RocketMQ的示例代码

    目录 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 1.2 添加配置 1.3 编写测试代码 1.4 测试 2 用户微服务订阅消息 2.1 用户微服务增加rocketmq依赖 2.2 修改主类,启动nacos客户端 2.3 修改配置文件 2.4 编写消息接收服务 2.5 测试 接下来我们模拟一种场景:商品下单成功之后,向下单用户发送短信.以此来示例SpringBoot中RocketMQ的使用方式. 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 <!-

  • Spring Boot优雅使用RocketMQ的方法实例

    前言 MQ,是一种跨进程的通信机制,用于上下游传递消息.在传统的互联网架构中通常使用MQ来对上下游来做解耦合. 举例:当A系统对B系统进行消息通讯,如A系统发布一条系统公告,B系统可以订阅该频道进行系统公告同步,整个过程中A系统并不关系B系统会不会同步,由订阅该频道的系统自行处理. 什么是RocketMQ?# 官方说明: 随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈.我们尽力通过节流,断路器或降级来解决此问题,但效果不佳.因此,我们那时开始关注流行的消息传递解决方案Ka

  • 解决SpringBoot整合RocketMQ遇到的坑

    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

  • 浅谈Springboot整合RocketMQ使用心得

    一.阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh 按照官网步骤,创建Topic.申请发布(生产者).申请订阅(消费者) 二.代码 1.配置: public class MqConfig { /** * 启动测试之前请替换如下 XXX 为您的配置 */ public static final String PUBLIC_TOPIC = "test"

  • springBoot整合RocketMQ及坑的示例代码

    版本: JDK:1.8 springBoot:1.5.10 rocketMQ:4.2.0 pom 配置: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> </parent> <d

  • springboot整合rocketmq实现分布式事务

    1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

  • 解决springboot集成rocketmq关于tag的坑

    springboot集成rocketmq关于tag的坑 新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收. 1.客户端发送代码 此处回调方法里有些业务不用关注,只关心发送方法 @Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private

  • SpringCloud+RocketMQ实现分布式事务的实践

    目录 一.RocketMQ的分布式事务结构和说明 二.搭建RocketMQ 三.事务场景,然后准备工程,运行代码 随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的.所以我们来一步一步的实现基于RocketMQ的分布式事务.接下来,我们将要做的主题写出来. RocketMQ的分布式事务结构和说明 搭建RocketMQ步骤 事务场景,然后准备工程,运行代码 一.RocketMQ的分布式事务结构和说明 我们通过下图来了解一下RocketMQ实现分布式事务的结构.采用半消息机制实现分

  • SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

    目录 消息队列实现分布式事务原理 RocketMQ的事务消息 代码实现 基础配置 发送半消息 执行本地事务与回查 Account-Service消费消息 测试 小结 消息队列实现分布式事务原理 首先让我们来看一下基于消息队列实现分布式事务的原理方案. 柔性事务 发送消息的服务有个OUTBOX数据表,在进行INSERT.UPDATE.DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录,这样可以保证原子性,因为这是基于本地的ACID事务. OUTBOX表充当临时消息队列,然后我

  • springboot整合shardingsphere和seata实现分布式事务的实践

    各个框架版本信息 springboot: 2.1.3 springcloud: Greenwich.RELEASE seata: 1.0.0 shardingsphere:4.0.1 maven 依赖        <dependency>         <!--<groupId>io.shardingsphere</groupId>-->         <groupId>org.apache.shardingsphere</group

  • 详解Java TCC分布式事务实现原理

    概述 之前网上看到很多写分布式事务的文章,不过大多都是将分布式事务各种技术方案简单介绍一下.很多朋友看了还是不知道分布式事务到底怎么回事,在项目里到底如何使用. 所以这篇文章,就用大白话+手工绘图,并结合一个电商系统的案例实践,来给大家讲清楚到底什么是 TCC 分布式事务. 业务场景介绍 咱们先来看看业务场景,假设你现在有一个电商系统,里面有一个支付订单的场景. 那对一个订单支付之后,我们需要做下面的步骤: 更改订单的状态为"已支付" 扣减商品库存 给会员增加积分 创建销售出库单通知仓

  • SpringCloud Alibaba使用Seata处理分布式事务的技巧

    Seata简介 在传统的单体项目中,我们使用@Transactional注解就能实现基本的ACID事务了. 但是前提是: 1) 数据库支持事务(如:MySQL的innoDB引擎) 2) 所有业务都在同一个数据库中执行 随着微服务架构的引入,需要对数据库进行分库分表,每个服务拥有自己的数据库,这样传统的事务就不起作用了,那么我们如何保证多个服务中数据的一致性呢? 这样就出现了分布式事务,而Seata就是为微服务架构而生的一种高性能.易于使用的分布式事务解决方案. Seata 中有三个基础组件: T

  • Springcloud seata分布式事务实现代码解析

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务.本篇不涉及其原理,只用代码构建项目简单试用一下其回滚的机制. 大致上seata分为TC,TM,RM三大构建成整体.它们之间的包含关系如下.即一(xid主键编码,记录信息)带三(TC,TM,RM) 下面之间构建项目进行测试. 1.下载seata并解压,然后改动配置文件. http://seata.io/zh-cn/blog/download.html官网下载. 解压之后到conf中修改file和registry

  • springboot cloud使用eureka整合分布式事务组件Seata 的方法

    前言 近期一直在忙项目,我也是打工仔.不多说,我们开始玩一玩seata. 正文 什么都不说,我们按照惯例,先上一个图(图里不规范的使用请忽略): 简单一眼就看出来, 比我们平时用的东西,多了 Seata Server 微服务 . 同样这个 Seata Server 微服务 ,也是需要注册到eureka上面去的. 那么我们首先就搞一搞这个 seata server ,那么剩下的就是一些原本的业务服务整合配置了. 该篇用的 seata server 版本,用的是1.4.1 , 可以去git下载下.当

  • 详解Java分布式事务的 6 种解决方案

    介绍 在分布式系统.微服务架构大行其道的今天,服务间互相调用出现失败已经成为常态.如何处理异常,如何保证数据一致性,成为微服务设计过程中,绕不开的一个难题. 在不同的业务场景下,解决方案会有所差异,常见的方式有: 阻塞式重试: 2PC.3PC 传统事务: 使用队列,后台异步处理: TCC 补偿事务: 本地消息表(异步确保): MQ 事务. 本文侧重于其他几项,关于 2PC.3PC 传统事务,网上资料已经非常多了,这里不多做重复. 阻塞式重试 在微服务架构中,阻塞式重试是比较常见的一种方式.伪代码

  • 关于MySQL与Golan分布式事务经典的七种解决方案

    目录 1.基础理论 1.1 事务 1.2 分布式事务 2.分布式事务的解决方案 2.1 两阶段提交/XA 2.2 SAGA 2.3 TCC 2.4 本地消息表 2.5 事务消息 2.6 最大努力通知 2.7 AT事务模式 3.异常处理 3.1 异常情况 3.2 子事务屏障 3.3 子事务屏障原理 3.4 子事务屏障小结 4.分布式事务实践 4.1 一个SAGA事务 4.2 处理网络异常 4.3 处理回滚 5.总结 前言: 随着业务的快速发展.业务复杂度越来越高,几乎每个公司的系统都会从单体走向分

随机推荐