spring整合atomikos实现分布式事务的方法示例

前言

Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器,主要用于处理跨数据库事务,比如某个指令在A库和B库都有写操作,业务上要求A库和B库的写操作要具有原子性,这时候就可以用到atomikos。笔者这里整合了一个spring和atomikos的demo,并且通过案例演示说明atomikos的作用。

准备工作

开发工具:idea

数据库:mysql , oracle

正文

源码地址: https://github.com/qw870602/atomikos

演示原理:通过在两个库的写操作之间人为制造异常来观察数据库是否回滚

演示步骤:1.正常写操作,观察数据库值的变化情况

2.在写操作语句之间制造异常,观察数据库值的变化情况

项目结构

从web.xml中可以知道,容器只加载了appliactionContext.xml,剩下的配置文件除了database.properties外都是无用文件,所以大家如果要在项目中配置的话,仅需要把appliactionContext.xml中关于atomikos的部分新增到自己项目中就OK了

appliactionContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://www.springframework.org/schema/tx
  http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
  http://www.springframework.org/schema/aop
  http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
  http://www.springframework.org/schema/context
  http://www.springframework.org/schema/context/spring-context-3.0.xsd
  http://www.springframework.org/schema/mvc
  http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd">
  <!-- 引入数据源信息的properties属性文件 -->
  <context:property-placeholder location="classpath:database.properties" />
  <!-- XA方式 -->
  <!-- MYSQL数据库配置 -->
  <bean id="mysqlDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" destroy-method="close">
    <property name="uniqueResourceName" value="dataSource1"/>
    <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
    <property name="xaProperties">
      <props>
        <prop key="URL">${mysql.qa.db.url}</prop>
        <prop key="user">${mysql.qa.db.user}</prop>
        <prop key="password">${mysql.qa.db.password}</prop>
      </props>
    </property>
    <property name="minPoolSize" value="10" />
    <property name="maxPoolSize" value="100" />
    <property name="borrowConnectionTimeout" value="30" />
    <property name="maintenanceInterval" value="60" />
  </bean>

  <!-- ORACLE数据库配置 -->
  <bean id="oracleDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" destroy-method="close">
    <property name="uniqueResourceName" value="dataSource2"/>
    <property name="xaDataSourceClassName" value="oracle.jdbc.xa.client.OracleXADataSource" />
    <property name="xaProperties">
      <props>
        <prop key="URL">${oracle.qa.db.url}</prop>
        <prop key="user">${oracle.qa.db.user}</prop>
        <prop key="password">${oracle.qa.db.password}</prop>
      </props>
    </property>
    <property name="minPoolSize" value="10" />
    <property name="maxPoolSize" value="100" />
    <property name="borrowConnectionTimeout" value="30" />
    <property name="maintenanceInterval" value="60" />
  </bean>

  <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <!--<property name="configLocation" value="classpath:mybatis-config-mysql.xml" />-->
    <property name="dataSource" ref="mysqlDataSource" />
    <property name="mapperLocations" >
      <list>
        <value>classpath*:/dao/*.xml</value>
      </list>
    </property>
  </bean>
  <bean id="sqlSessionFactoryOracle" class="org.mybatis.spring.SqlSessionFactoryBean">
    <!--<property name="configLocation" value="classpath:mybatis-config.xml" />-->
    <property name="dataSource" ref="oracleDataSource" />
    <property name="mapperLocations" >
      <list>
        <value>classpath*:/daodev/*.xml</value>
      </list>
    </property>
  </bean>

  <!-- MyBatis为不同的mapper注入sqlSessionFactory -->
  <bean id="mysqlTransactionTestDao" class="org.mybatis.spring.mapper.MapperFactoryBean">
    <property name="sqlSessionFactory" ref="sqlSessionFactory" />
    <property name="mapperInterface" value="com.xy.dao.MysqlTransactionTestDao" />
  </bean>
  <bean id="transactionTestDao" class="org.mybatis.spring.mapper.MapperFactoryBean">
    <property name="sqlSessionFactory" ref="sqlSessionFactoryOracle" />
    <property name="mapperInterface" value="com.xy.dao.TransactionTestDao" />
  </bean>

  <!-- 分布式事务 -->
  <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
    <property name="forceShutdown" value="true"/>
  </bean>
  <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300"/>
  </bean>
  <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager"/>
    <property name="userTransaction" ref="atomikosUserTransaction"/>
  </bean>

  <tx:annotation-driven transaction-manager="transactionManager"/>
  <context:annotation-config/>
  <!--<!– 自动扫描controller包下的所有类,如果@Controller注入为bean –>-->
  <!--<!–事务管理层–>-->
  <context:component-scan base-package="com.xy" />

  <!-- 注册拦截器 -->
  <!--<mvc:interceptors>
    <bean class="com.springmybatis.system.interceptor.MyInterceptor" />
  </mvc:interceptors>-->
</beans>

适用JUnit4进行单元测试

package com.xy.controller;

import com.xy.daodev.TransactionTestService;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class TransactionTestMain extends AbstractJUnit4SpringContextTests {
  @Autowired
 private TransactionTestService transactionTestService;

 /**
 * 在同一事务有多个数据源
 */
 @Test
 public void multipleDataSource2() {
 transactionTestService.updateMultipleDataSource("1","1", 100L,"1.6");
 }
}

业务实现,当前没有异常操作

@Service
public class TransactionTestServiceImpl implements TransactionTestService {
  @Autowired
 @Qualifier("mysqlTransactionTestDao")
 private MysqlTransactionTestDao mysqlTransactionTestDao;

 @Autowired
 @Qualifier("transactionTestDao")
 private TransactionTestDao transactionTestDao;

 /**
 * 在同一事务有多个数据源
 */
 @Override
 @Transactional
 public void updateMultipleDataSource(String deUserId, String inUserid, long money,String str) {
 // 账户1转出操作
 mysqlTransactionTestDao.decreaseMoney(deUserId, money);
  //Integer.parseInt(str);
 // 账户2转入操作
 transactionTestDao.increaseMoney(inUserid, money);

 } 

}

mysql模拟金额转出,oracle模拟金额转入

<update id="decreaseMoney" parameterType="java.util.Map">
  UPDATE fx1 SET amount=amount - #{1,jdbcType=BIGINT} WHERE id=#{0,jdbcType=VARCHAR}
</update>
<update id="increaseMoney">
  UPDATE fx1 SET amount=amount + #{1,jdbcType=BIGINT} WHERE id=#{0,jdbcType=VARCHAR}
</update>

mysql初始金额

oracle初始金额

执行正常操作

mysql当前金额

oracle当前金额

将被屏蔽的制造异常的代码打开

public void updateMultipleDataSource(String deUserId, String inUserid, long money,String str) {
 // 账户1转出操作
 mysqlTransactionTestDao.decreaseMoney(deUserId, money);
  Integer.parseInt("skg");
 // 账户2转入操作
 transactionTestDao.increaseMoney(inUserid, money);
} 

发现mysql和oracle的当前金额都没有变化,说明事务回滚成功,查看日志

发现控制台打印出了异常信息,并且atomikos调用了rollback()方法,从日志也证实了回滚成功。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解Spring Boot微服务如何集成fescar解决分布式事务问题

    什么是fescar? 关于fescar的详细介绍,请参阅fescar wiki. 传统的2PC提交协议,会持有一个全局性的锁,所有局部事务预提交成功后一起提交,或有一个局部事务预提交失败后一起回滚,最后释放全局锁.锁持有的时间较长,会对并发造成较大的影响,死锁的风险也较高. fescar的创新之处在于,每个局部事务执行完立即提交,释放本地锁:它会去解析你代码中的sql,从数据库中获得事务提交前的事务资源即数据,存放到undo_log中,全局事务协调器在回滚的时候直接使用undo_log中的数据覆

  • Springboot-dubbo-fescar 阿里分布式事务的实现方法

    大家可以自行百度下阿里分布式事务,在这里我就不啰嗦了.下面是阿里分布式事务开源框架的一些资料,本文是springboot+dubbo+fescar的集成. 快速开始 https://github.com/alibaba/fescar/wiki/Quick-Start GIT地址 https://github.com/alibaba/fescar 1.sql CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `br

  • spring整合atomikos实现分布式事务的方法示例

    前言 Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器,主要用于处理跨数据库事务,比如某个指令在A库和B库都有写操作,业务上要求A库和B库的写操作要具有原子性,这时候就可以用到atomikos.笔者这里整合了一个spring和atomikos的demo,并且通过案例演示说明atomikos的作用. 准备工作 开发工具:idea 数据库:mysql , oracle 正文 源码地址: https://github.com/qw870602/atomikos 演示原理:通过在

  • spring 整合JDBC和AOP事务的方法

    spring整合JDBC spring提供了很多模板整合Dao技术 ORM持久化技术 模板类 JDBC org.springframework.Jdbc.core.JdbcTemplate Hibernate3.0 org.springframework.orm.hiberate3.HibernateTemplate IBatis(MyBatis) org.springframework.orm.sqlMapClientTemplate JPA org.springframework.orm.j

  • ssm整合之Spring整合MyBatis框架配置事务的详细教程

    ssm整合之Spring整合MyBatis框架配置事务 1.在applicationContext.xml修改代码如下: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance

  • Spring整合Quartz实现定时任务调度的方法

    最近项目中需要实现定时执行任务,比如定时计算会员的积分.调用第三方接口等,由于项目采用spring框架,所以这里结合spring框架来介绍. 编写作业类 即普通的pojo,如下: package com.pcmall.task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TaskA { private static Logger logger = LoggerFactory.getLogger(Ta

  • springboot整合rocketmq实现分布式事务

    1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

  • Spring Boot集成ShedLock分布式定时任务的实现示例

    一.ShedLock是什么? 官方地址:github.com/lukas-kreca- 以下是ShedLock锁提供者,通过外部存储实现锁,由下图可知外部存储集成的库还是很丰富的 本篇教程我们基于JdbcTemplate存储为例来使用ShedLock锁. 二.落地实现 1.1 引入依赖包 shedlock所需依赖包: <dependency> <groupId>net.javacrumbs.shedlock</groupId> <artifactId>she

  • SpringBoot整合POI导出通用Excel的方法示例

    一.准备工作 1.pom依赖 在pom.xml中加入POI的依赖 <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>3.11-beta1</version> </dependency> <dependency> <groupId>org.apache.poi

  • Spring中使用atomikos+druid实现经典分布式事务的方法

    经典分布式事务,是相对互联网中的柔性分布式事务而言,其特性为ACID原则,包括原子性(Atomictiy).一致性(Consistency).隔离性(Isolation).持久性(Durabilit): 原子性:事务是一个包含一系列操作的原子操作.事务的原子性确保这些操作全部完成或者全部失败. 一致性:一旦事务的所有操作结束,事务就被提交.然后你的数据和资源将处于遵循业务规则的一直状态. 隔离性:因为同时在相同数据集上可能有许多事务处理,每个事务应该与其他事务隔离,避免数据破坏. 持久性:一旦事

  • 带你用Python实现Saga 分布式事务的方法

    目录 分布式事务 SAGA SAGA实践 处理网络异常 处理回滚 小结 银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决. 分布式事务 分布式事务在分布式环境下,为了满足可用性.性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论: 基本业务可用性( Basic Availability ) 柔性状态( Soft state ) 最终一致性( Eve

  • thinkPHP框架中执行事务的方法示例

    本文实例讲述了thinkPHP框架中执行事务的方法.分享给大家供大家参考,具体如下: function tran(){ //定义事务成功失败的标志 $mark = true; //1. 实例化模型 $model = D('student'); //2. 开启事务处理 $model->startTrans(); //3. ls减少2000 $sql = "update student set money=money-2000 where uname='ls'"; $result =

随机推荐