springboot整合rocketmq实现分布式事务

1 执行流程

(1) 发送方向 MQ 服务端发送消息。
(2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
(3) 发送方开始执行本地事务逻辑。
(4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
(5) 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后MQ Server 将对该消息发起消息回查。
(6) 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
(7) 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

2 工程

2.1 pom

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.71</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.0.RELEASE</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.2 application.yml

rocketmq:
  name-server: 192.168.38.50:9876
  producer:
    group: transcation-group

2.3 TransactionListenerImpl

@RocketMQTransactionListener(txProducerGroup = "transaction-producer-group")
@Slf4j
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();

    /**
     *  执行业务逻辑
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        try {
            System.out.println("用户A账户减500元.");
            System.out.println("用户B账户加500元.");
            STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
        }

        STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
        return RocketMQLocalTransactionState.UNKNOWN;

    }

    /**
     * 回查
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        log.info("回查消息 -> transId ={} , state = {}", transId, STATE_MAP.get(transId));
        return STATE_MAP.get(transId);
    }
}

2.4 SpringTransactionProducer

@Component
@Slf4j
public class SpringTransactionProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送消息
     *
     */
    public void sendMsg(String topic, String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        this.rocketMQTemplate.sendMessageInTransaction("transaction-producer-group", topic, message, null);
        log.info("发送成功");
    }
}

2.5 SpringTxConsumer

@Component
@RocketMQMessageListener(topic = "pay_topic",
        consumerGroup = "transaction-consumer-group",
        selectorExpression = "*")
@Slf4j
public class SpringTxConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        log.info("接收到消息 -> {}", msg);
    }
}

2.6 ProducerController

@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private SpringTransactionProducer springTransactionProducer;

    @GetMapping("/sendMsg")
    public String sendMsg() {
        springTransactionProducer.sendMsg("pay_topic", "用户A账户减500元,用户B账户加500元。");
        return "发送成功";
    }

}

2.7 RocketApplication

@SpringBootApplication
public class RocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketApplication.class);
    }

}

3 测试

3.1 正常消费测试

描述: 正常启动及可。

3.2 回查代码测试

描述: 执行本地事务时添加异常,重启测试,发现消费者没有收到消息。

到此这篇关于springboot整合rocketmq实现分布式事务的文章就介绍到这了,更多相关springboot 分布式事务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot cloud使用eureka整合分布式事务组件Seata 的方法

    前言 近期一直在忙项目,我也是打工仔.不多说,我们开始玩一玩seata. 正文 什么都不说,我们按照惯例,先上一个图(图里不规范的使用请忽略): 简单一眼就看出来, 比我们平时用的东西,多了 Seata Server 微服务 . 同样这个 Seata Server 微服务 ,也是需要注册到eureka上面去的. 那么我们首先就搞一搞这个 seata server ,那么剩下的就是一些原本的业务服务整合配置了. 该篇用的 seata server 版本,用的是1.4.1 , 可以去git下载下.当

  • 详解SpringBoot基于Dubbo和Seata的分布式事务解决方案

    1. 分布式事务初探 一般来说,目前市面上的数据库都支持本地事务,也就是在你的应用程序中,在一个数据库连接下的操作,可以很容易的实现事务的操作. 但是目前,基于SOA的思想,大部分项目都采用微服务架构后,就会出现了跨服务间的事务需求,这就称为分布式事务. 本文假设你已经了解了事务的运行机制,如果你不了解事务,那么我建议先去看下事务相关的文章,再来阅读本文. 1.1 什么是分布式事务 对于传统的单体应用而言,实现本地事务可以依赖Spring的@Transactional注解标识方法,实现事务非常简

  • Springboot-dubbo-fescar 阿里分布式事务的实现方法

    大家可以自行百度下阿里分布式事务,在这里我就不啰嗦了.下面是阿里分布式事务开源框架的一些资料,本文是springboot+dubbo+fescar的集成. 快速开始 https://github.com/alibaba/fescar/wiki/Quick-Start GIT地址 https://github.com/alibaba/fescar 1.sql CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `br

  • SpringBoot+Dubbo+Seata分布式事务实战详解

    前言 Seata 是 阿里巴巴开源的分布式事务中间件,以高效并且对业务0侵入的方式,解决微服务场景下面临的分布式事务问题. 事实上,官方在GitHub已经给出了多种环境下的Seata应用示例项目,地址:https://github.com/seata/seata-samples. 为什么笔者要重新写一遍呢,主要原因有两点: 官网代码示例中,依赖太多,分不清哪些有什么作用 Seata相关资料较少,笔者在搭建的过程中,遇到了一些坑,记录一下 一.环境准备 本文涉及软件环境如下: SpringBoot

  • springboot整合rocketmq实现分布式事务

    1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

  • SpringCloud+RocketMQ实现分布式事务的实践

    目录 一.RocketMQ的分布式事务结构和说明 二.搭建RocketMQ 三.事务场景,然后准备工程,运行代码 随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的.所以我们来一步一步的实现基于RocketMQ的分布式事务.接下来,我们将要做的主题写出来. RocketMQ的分布式事务结构和说明 搭建RocketMQ步骤 事务场景,然后准备工程,运行代码 一.RocketMQ的分布式事务结构和说明 我们通过下图来了解一下RocketMQ实现分布式事务的结构.采用半消息机制实现分

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • Springboot 整合 RocketMQ 收发消息的配置过程

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • SpringBoot整合Redisson实现分布式锁

    目录 一.添加依赖 二.redis配置文件 三.新建配置类 四.使用分布式锁 可重入锁 读写锁 信号量(Semaphore) 闭锁(CountDownLatch) Redisson是架设在redis基础上的一个Java驻内存数据网格(In-Memory Data Grid).充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类.使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低

  • SpringBoot 整合 Spring-Session 实现分布式会话项目实战

    目录 一.配置及开发 二.测试 三.Spring-Session 的缺点 文章参考: Spring 提供了处理分布式会话的解决方案:Spring-Session.Spring-Session 提供了对Redis.MongoDB.MySQL 等常用存储的支持,Spring-Session 提供与 HttpSession 的透明整合,这意味着开发人员可以使用 Spring-Session 支持的实现方式,切换 HttpSession 至 Spring-Session.本文采用 Redis 作为第三方

  • SpringBoot整合SpringSession实现分布式登录详情

    目录 Session 共享 为什么服务器 A 登录后,请求发到服务器 B,不认识该用户? 解决方案 SpringBoot整合SpringSession实现分布式登录 Session 共享 比如两个域名: aaa.yupi.com bbb.yupi.com 如果要共享 cookie,可以种一个更高层的公共域名,比如 yupi.com 为什么服务器 A 登录后,请求发到服务器 B,不认识该用户? 用户在 A 登录,所以 session(用户登录信息)存在了 A 上 结果请求 B 时,B 没有用户信息

  • SpringBoot整合RocketMQ的方法详解

    目录 一:Ubuntu安装RocketMQ 二:添加RocketMQ依赖 三:在application中添加RocketMQ配置 四:编写消费者,消息生产者,消息实体类(自定义) 五:测试Controller 一:Ubuntu安装RocketMQ 1.下载(在下面地址选择自己需要的版本的rocketmq) http://rocketmq.apache.org/release_notes/ 2.解压,更改配置 将下载的zip文件解压到自己需要安装的位置 在unbuntu系统下需要修改安装跟目录下的

  • SpringCloud微服务开发基于RocketMQ实现分布式事务管理详解

    目录 消息队列实现分布式事务原理 RocketMQ的事务消息 代码实现 基础配置 发送半消息 执行本地事务与回查 Account-Service消费消息 测试 小结 消息队列实现分布式事务原理 首先让我们来看一下基于消息队列实现分布式事务的原理方案. 柔性事务 发送消息的服务有个OUTBOX数据表,在进行INSERT.UPDATE.DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录,这样可以保证原子性,因为这是基于本地的ACID事务. OUTBOX表充当临时消息队列,然后我

随机推荐