RocketMq事务消息发送代码流程详解

一、RocketMq事务消息流程:

1、首先会向broker发送一个预请求消息,消费者不可见

2、回调执行本地事务(比如操作数据库)

3、事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见。如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功。

二、RocketMq事务消息实例:

1、引入rocketMq相关的依赖:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.4.0</version>
</dependency>

2、创建一个TransactionProducer类:

public class TransactionProducer {

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
    //创建生产者并制定组名
    TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
    //2.指定Nameserver地址
    producer.setNamesrvAddr("192.168.***.***:9876");
    //3、指定消息监听对象用于执行本地事务和消息回查
    TransactionListener listener = new TransactionListenerImol();
    producer.setTransactionListener(listener);
    //4、线程池
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread thread = newThread(r);
        thread.setName("client-tanscation-msg-check-thread");
        return thread;
      }
    });
    producer.setExecutorService(executorService);
    //5、启动producer
    producer.start();

    //6.创建消息对象,指定主题Topic、Tag和消息体 String topic, String tags, String keys, byte[] body
    Message message = new Message("Topic_transaction_demo", //主题
        "Tags", //主要用于消息过滤
        "Key_1", //消息唯一值
        ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

    //7、发送事务消息
    TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

    producer.shutdown();
  }
}

3、发送事务消息还需要一个事务监听对象,它实现TransactionListener 接口,其中有两个方法作用分别是执行本地事务和消息回查:

public class TransactionListenerImol implements TransactionListener {
  //存储事务状态信息 key:事务id value:当前事务执行的状态
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  //执行本地事务
  @Override
  public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    //事务id
    String transactionId = message.getTransactionId();
    //0:执行中,状态未知 1:执行成功 2:执行失败
    localTrans.put(transactionId, 0);
    //业务执行,本地事务,service
    System.out.println("hello-demo-transaction");
    try {
      System.out.println("正在执行本地事务---");
      Thread.sleep(60000*2);
      System.out.println("本地事务执行成功---");
      localTrans.put(transactionId, 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
      localTrans.put(transactionId, 2);
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.COMMIT_MESSAGE;
  }

  //消息回查
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    //获取对应事务的状态信息
    String transactionId = messageExt.getTransactionId();
    //获取对应事务id执行状态
    Integer status = localTrans.get(transactionId);
    //消息回查
    System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status);
    switch (status) {
      case 0:
        return LocalTransactionState.UNKNOW;
      case 1:
        return LocalTransactionState.COMMIT_MESSAGE;
      case 2:
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Boot优雅使用RocketMQ的方法实例

    前言 MQ,是一种跨进程的通信机制,用于上下游传递消息.在传统的互联网架构中通常使用MQ来对上下游来做解耦合. 举例:当A系统对B系统进行消息通讯,如A系统发布一条系统公告,B系统可以订阅该频道进行系统公告同步,整个过程中A系统并不关系B系统会不会同步,由订阅该频道的系统自行处理. 什么是RocketMQ?# 官方说明: 随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈.我们尽力通过节流,断路器或降级来解决此问题,但效果不佳.因此,我们那时开始关注流行的消息传递解决方案Ka

  • Docker中RocketMQ的安装与使用详解

    搜索RocketMQ的镜像,可以通过docker的hub.docker.com上进行搜索,也可以在Linux下通过docker的search命令进行搜索,不过最近防火墙升级后,导致国外的网站打开都很慢,通过命令搜索反而会更加方便,操作Docker命令一定要是root用户或者具有root权限的用户.查询操作如下: docker search rocketmq 可以得到如下的结果: 镜像倒是蛮多的,不过看来看去没有一个是官方发布的,我就随便选一个吧,如foxiswho/rocketmq,以下是一个查

  • springBoot整合RocketMQ及坑的示例代码

    版本: JDK:1.8 springBoot:1.5.10 rocketMQ:4.2.0 pom 配置: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> </parent> <d

  • 使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消. 在电商系统中,用户七天内没有评价商品,则默认好评. 这些场景对应的解决方案,包括: 轮询遍历数据库记录 JDK 的 DelayQueue ScheduledExecutorService 基于 Quartz 的定时任务 基于 Redis 的 zset 实现延时队列. 除此之外,

  • Window搭建部署RocketMQ步骤详解

    序 以前简单用过ActiveMQ但是公司项目上使用的是RocketMQ,所以准备多花点时间在这上面,搞懂项目的配置使用. 看了很多资料,先说说我自己对RocketMQ的简单理解.不管是我们写的消费者还是生产者都属于客户端,而我们需要安装RocketMQ,这是属于服务端.和ActivieMQ.zookeeper类似,消费者.生成者.服务端(NameServer)之间是采取观察者模式实现. 在操作系统上安装RocketMQ,启动服务端NameServer.启动Broker,书写Consumer代码,

  • java RocketMQ快速入门基础知识

    如何使用 1.引入 rocketmq-client <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.1.0-incubating</version> </dependency> 2.编写Producer DefaultMQProducer produce

  • java rocketmq--消息的产生(普通消息)

    前言 与消息发送紧密相关的几行代码: 1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); 2. producer.start(); 3. Message msg = new Message(...) 4. SendResult sendResult = producer.send(msg); 5. producer.shutdown(); 那这几行代码执行时,背后都做了什么? 一. 首先

  • 浅谈Springboot整合RocketMQ使用心得

    一.阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh 按照官网步骤,创建Topic.申请发布(生产者).申请订阅(消费者) 二.代码 1.配置: public class MqConfig { /** * 启动测试之前请替换如下 XXX 为您的配置 */ public static final String PUBLIC_TOPIC = "test"

  • rocketmq消费负载均衡--push消费详解

    前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalanceByTopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述. 介绍之前首先抛出几个问题: 1. 要做负载均衡,首先要解决的一个问题是什么? 2. 负载均衡是Client端处理还是Broker端处理? 个人理解: 1. 要做负载均衡

  • RocketMq事务消息发送代码流程详解

    一.RocketMq事务消息流程: 1.首先会向broker发送一个预请求消息,消费者不可见 2.回调执行本地事务(比如操作数据库) 3.事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见.如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功. 二.RocketMq事务消息实例: 1.引入rocketMq相关的依赖: <dependency> <groupId>org.apache.ro

  • RocketMQ事务消息原理与使用详解

    目录 一.RocketMQ事务消息概要 二.RocketMQ事务消息使用案例 (1).定义消息监听器 (2).定义消息生产者 (3).定义消息消费者 (4).观察生产者控制台输出 (5).观察消费者控制台输出 三.RocketMQ事务消息原理 四.RocketMQ事务消息使用限制 一.RocketMQ事务消息概要 RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ的事务消息提供类

  • C++调用libcurl开源库实现邮件的发送功能流程详解

    目录 1.为啥要选择libcurl库去实现邮件的发送 2.调用libcurl库的API接口实现邮件发送 3.构造待发送的邮件内容 4.开通163发送邮件账号的SMTP服务 5.排查接收的邮件内容为空的问题 libcurl中封装了支持这些协议的网络通信模块,支持跨平台,支持Windows,Unix,Linux等多个操作系统.libcurl提供了一套统一样式的API接口,我们不用关注各种协议下网络通信的实现细节,只需要调用这些API就能轻松地实现基于这些协议的数据通信.本文将简单地讲述一下使用lib

  • MySQL执行事务的语法与流程详解

    摘要:MySQL 提供了多种存储引擎来支持事务. MySQL 提供了多种存储引擎来支持事务.支持事务的存储引擎有 InnoDB 和 BDB,其中,InnoDB 存储引擎事务主要通过 UNDO 日志和 REDO 日志实现,MyISAM 存储引擎不支持事务. 拓展:任何一种数据库,都会拥有各种各样的日志,用来记录数据库的运行情况.日常操作.错误信息等,MySQL 也不例外.例如,当用户 root 登录到 MySQL 服务器,就会在日志文件里记录该用户的登录时间.执行操作等. 为了维护 MySQL 服

  • Docker阿里云RocketMQ 4.5.1部署流程详解

    搜索镜像 docker search rocketmq 查看镜像版本 如果要查看其它的镜像,只需要将其中的镜像名称foxiswho/rocketmq替换为其它镜像即可 curl https://registry.hub.docker.com/v1/repositories/foxiswho/rocketmq/tags\ | tr -d '[\[\]" ]' | tr '}' '\n'\ | awk -F: -v image='foxiswho/rocketmq' '{if(NR!=NF &

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • OpenDataV低代码平台新增组件流程详解

    目录 正文 创建组件目录和文件 初始化组件文件 组件配置项 样式配置 属性配置 属性使用 总结 正文 OpenDataV计划采用子库的方式添加子组件,即每一个组件都当做一个子库,子库有自己的依赖,而项目本身的依赖只针对框架,因此每一个组件我们都当做一个子库来开发.下面我带着大家一步步详细的开发一个数字展示组件. 创建组件目录和文件 进入组件库目录下 所有的可拖拽组件都存放在src/resource/components目录下 cd src/resource/components 根据组件名称创建

  • Git的代码合入流程详解

    目录 总述 Rebase解决冲突 适用情况 操作方式 Squash解决冲突 适用情况 操作方式 Merge执行合入 总述 代码合入流程用于减轻代码合入复杂度.简化主分支历史(具有线性的历史).保证合入代码对主分支的HEAD有效.代码合入分为两步 解决冲突:将需要合入的分支变基到目标分支之上.保证合入代码对目标分支的HEAD有效.此时会解决所有代码冲突. 执行合入:向目标分支提交merge请求,执行合入CI后完成合入. 其中第一步“解决冲突”的方法分为两种种情况: Squash.如 feature

  • Handler消息传递机制类引入及执行流程详解

    目录 提要 1.学习路线图: 2.Handler类的引入 3.Handler的执行流程图 4.Handler的相关方法 5.Handler的使用示例 1)Handler写在主线程中 2)Handler写在子线程中 提要 本节给大家讲解的是Activity中UI组件中的信息传递Handler,相信很多朋友都知道,Android为了线程安全,并不允许我们在UI线程外操作UI:很多时候我们做界面刷新都需要通过Handler来通知UI组件更新!除了用Handler完成界面更新外,还可以使用runOnUi

  • 微信小程序支付及退款流程详解

    首先说明一下,微信小程序支付的主要逻辑集中在后端,前端只需携带支付所需的数据请求后端接口然后根据返回结果做相应成功失败处理即可.我在后端使用的是php,当然在这篇博客里我不打算贴一堆代码来说明支付的具体实现,而主要会侧重于整个支付的流程和一些细节方面的东西.所以使用其他后端语言的朋友有需要也是可以看一下的.很多时候开发的需求和相应问题的解决真的要跳出语言语法层面,去从系统和流程的角度考虑.好的,也不说什么废话了.进入正题. 一. 支付 支付主要分为几个步骤: 前端携带支付需要的数据(商品id,购

随机推荐