RocketMQ事务消息图文示例讲解

RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息

MQ 的事务流程(本地代码正常执行)

MQ 的消息补偿过程(当本地代码执行失败时)

MQ 消息的三种状态

  • 提交状态:允许进入队列,此消息与非事务消息无区别
  • 回滚状态:不允许进入队列,此消息等同于未发送过
  • 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态)

注意:事务消息仅与生产者有关,与消费者无关

生产者代码(提交状态、回滚状态):

public class Producer {
    public static void main(String[] args) throws Exception{
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // 此处写本地事务处理业务
                // 如果成功,消息改为提交,如果失败改为 回滚,如果是多线程处理状态未知,就提交为未知等待事务补偿过程
                //事务提交状态
                return LocalTransactionState.COMMIT_MESSAGE;// 类似于msql 的 commit
                //return LocalTransactionState.ROLLBACK_MESSAGE;回滚状态
            }
            //事务补偿过程
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();
        Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果:"+result);
        producer.shutdown();
    }
}

生产者(中间状态):

public class Producer {
    public static void main(String[] args) throws Exception{
        //事务消息使用的生产者是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        //添加本地事务对应的监听
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return LocalTransactionState.UNKNOW;
            }
            //事务补偿过程
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("事务补偿过程执行");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8"));
        SendResult result = producer.sendMessageInTransaction(msg,null);
        System.out.println("返回结果:"+result);
        //事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿
        //producer.shutdown();
    }
}

到此这篇关于RocketMQ事务消息图文示例讲解的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RocketMQ生产消息与消费消息超详细讲解

    目录 1 RocketMQ简介 2 MQ的常见产品 3 环境搭建 4 单生产者单消费者模式 5 单生产者多消费者模式 5.1默认模式(负载均衡) 5.2广播模式 6 多生产者多消费者模式 1 RocketMQ简介 RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目.并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的

  • RocketMq深入分析讲解两种削峰方式

    目录 何时需要削峰 通过消息队列的削峰方法有两种 消费延时控流 总结 何时需要削峰 当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求 通过消息队列的削峰方法有两种 控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度 通过消费者参数控制消费速度 先分析那些参数对控制消费速度有作用 1.PullInterval: 设置消费端,拉取mq消息的间隔时间. 注意:该时间算起时间是rocketMq消费者从broker消息后算起.经过PullInt

  • 一文彻底掌握RocketMQ 的存储模型

    目录 RocketMQ简介 1 整体概览 2 数据文件 3 消费文件 4 索引文件 5 写到最后 RocketMQ简介 RocketMQ有Producer.Consumer.NameSrv.Broker四个部分.其中Broker用于存储消息,维护消息队列和订阅关系,是RocketMQ四个部分中最重要的一个部分,并且RocketMQ的高性能就是依赖于Broker模块的底层存储模型实现的.所以搞清楚Broker的存储模型是学习RocketMQ最重要的一步. RocketMQ 优异的性能表现,必然绕不

  • 图文并茂讲解RocketMQ消息类别

    目录 1.同步消息 2.异步消息 3.单向消息 1.同步消息 即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功) 生产者: public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("19

  • RocketMQ事务消息图文示例讲解

    RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息 MQ 的事务流程(本地代码正常执行) MQ 的消息补偿过程(当本地代码执行失败时) MQ 消息的三种状态 提交状态:允许进入队列,此消息与非事务消息无区别 回滚状态:不允许进入队列,此消息等同于未发送过 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态) 注意:事务消息仅与生产者有关,与消费者无关 生产者代码(提交状态.回滚状态): public class Producer { public s

  • RocketMQ延迟消息超详细讲解

    目录 一.什么是延时消息 二.延时消息等级 三.延时消息使用场景 四.延时消息示例 五.延时消息实现原理 一.什么是延时消息 当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息. 二.延时消息等级 RocketMQ延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的.默认支持18个等级的延迟消息,延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中: // MessageStoreConf

  • RocketMQ事务消息保证消息的可靠性和一致性

    这篇讲解一下rocketMq的事务消息的原理 在发送事务消息的时候,会加一个标识,表示这个消息是事务消息.broker接收到消息后,在我们之前看的代码里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage会判断是否是事务消息. if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransac

  • RocketMQ事务消息原理与使用详解

    目录 一.RocketMQ事务消息概要 二.RocketMQ事务消息使用案例 (1).定义消息监听器 (2).定义消息生产者 (3).定义消息消费者 (4).观察生产者控制台输出 (5).观察消费者控制台输出 三.RocketMQ事务消息原理 四.RocketMQ事务消息使用限制 一.RocketMQ事务消息概要 RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ的事务消息提供类

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • C语言单链表的图文示例讲解

    目录 一.单链表的结构 二.单链表的函数接口 1. 申请结点及打印单链表 2. 尾插尾删 3. 头插头删 4. 中间插入和删除 1. 在 pos 指向的结点之后插入结点 2. 在 pos 指向的结点之前插入结点 3. 删除 pos 指向的结点的后一个结点 4. 删除 pos 指向的结点 6. 查找 7. 销毁单链表 在上一篇所讲述的 动态顺序表 中存在一些缺陷 1.当空间不够时需要扩容,扩容是有一定的消耗的 如果每次空间扩大一点,可能会造成空间的浪费,而空间扩小了,又会造成频繁的扩容2.在顺序表

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • MySQL数据库事务transaction示例讲解教程

    目录 1.什么是事务? 2.和事务相关的语句只有这3个DML语句:insert.delete.update 3.假设所有的业务都能使用1条DML语句搞定,还需要事务机制吗? 4.事务的原理 5.事务的四大特性:ACID 6.关于事务之间的隔离性 1)第一级别:读未提交(read uncommitted) 2)第二级别:读已提交(read committed) 3)第三级别:可重复读(repeatable read) 4)第四级别:序列化读/串行化读(serializable) 7.演示事务的隔离

  • MySQL transaction事务安全示例讲解

    目录 事务安全 transaction 事务基本原理 自动事务 手动事务 事务的使用 回滚点 事务的特点 事务安全 transaction 事务 transaction 访问可能更新数据库中各种数据项的一个程序执行单元unit 事务由事务开始(begin transaction)和事务结束(end transaction)之间执行的全体操作组成 事务基本原理 MySQL允许将事务统一进行管理(存储引擎innodb),将用户所做的操作,暂时保存起来,不直接放到数据表(更新),等到用户确认结果之后再

随机推荐