Spring TransactionalEventListener事务未提交读取不到数据的解决

目录
  • 一、背景
  • 二、问题分析
    • 2.1、mysql隔离级别
    • 2.2、问题原因分析
  • 三、解决问题方案
    • 3.1、方式一
    • 3.2、方式二
  • 四、使用案例

一、背景

业务处理过程,发现了以下问题,代码一是原代码能正常执行,代码二是经过迭代一次非正常执行代码

  • 代码一:以下代码开启线程后,代码正常执行
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));

@Transactional
public Long test() {
  // ......
  // 插入记录
  Long studentId = studentService.insert(student);
  // 异步线程
  writeStatisticsData(studentId);
  return studentId;
}

private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}
  • 代码二:以下代码开启线程后,代码不正常执行
@Transactional
public Long test() {
  // ......
  // 插入记录
  Long studentId = studentService.insert(student);
   // 异步线程
  writeStatisticsData(studentId);
  // 插入学生地址记录
  Long addressId = addressService.insert(address);
  return studentId;
}

private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}

二、问题分析

这里使用了spring事务,显然需要考虑事务的隔离级别

2.1、mysql隔离级别

查看mysql隔离级别

SELECT @@tx_isolation;
READ-COMMITTED

读提交,即在事务A插入数据过程中,事务B在A提交之前读取A插入的数据读取不到,而B在A提交之后再去读就会读取到A插入的数据,也即Read Committed不能保证在一个事务中每次读都能读到相同的数据,因为在每次读数据之后其他并发事务可能会对刚才读到的数据进行修改。

2.2、问题原因分析

  • 代码一正常运行的原因

由于mysql事务的隔离级别是读提交,test方法在开启异步线程后,异步线程也开启了事务,同时以读者身份去读 test 方法中插入的 student 记录,但此时 test 方法已经提交了事务,所以可以读取到 student 记录(即在异步方法中可以读取到 student 记录),但此代码有风险,若事务提交的时间晚一点,异步线程也有可能读取不到 student 记录。

  • 代码二不能正常运行的原因

经过上面分析,很明显异步方法中不能读取到 student 记录,由于代码二在异步线程下面又执行了其他操作,延时了test方法中事务的提交,所以代码二不能正常运行。

三、解决问题方案

解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
  // 获取事务属性
  final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  //加载配置中配置的TransactionManager
  final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

  // 声明式事务的处理
  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    Object retVal = null;
    //......
    retVal = invocation.proceedWithInvocation();
    //......
    commitTransactionAfterReturning(txInfo);
    return retVal;
  } else {
    // 编程式事务的处理......
  }
  //......
}

这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  if (txInfo != null && txInfo.hasTransaction()) {
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:

public final void commit(TransactionStatus status) throws TransactionException {
    // 这里省略很多代码,如事务回滚......
		processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			try {
        prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				boolean globalRollbackOnly = false;
				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
					globalRollbackOnly = status.isGlobalRollbackOnly();
				}
				if (status.hasSavepoint()) {
					status.releaseHeldSavepoint();
				} else if (status.isNewTransaction()) {
          // 提交事务
					doCommit(status);
				}
				//......
			} catch (......) {
				// 事务异常处理......
			}

			try {
        // 事务提交成功后的处理-----这里是重点
				triggerAfterCommit(status);
			} finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}
		}
		finally {
			cleanupAfterCompletion(status);
    }
}

private void triggerAfterCommit(DefaultTransactionStatus status) {
  if (status.isNewSynchronization()) {
    TransactionSynchronizationUtils.triggerAfterCommit();
  }
}

最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中

public static void triggerAfterCommit() {
  invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}

public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
  if (synchronizations != null) {
      for (TransactionSynchronization synchronization : synchronizations) {
    synchronization.afterCommit();
      }
  }
}

上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。

3.1、方式一

经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:

private void writeStatisticsData(Long studentId) {
  if(TransactionSynchronizationManager.isActualTransactionActive()) {
            // 当前存在事务
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
              @Override
              public void afterCommit() {
                executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
              }});
        } else {
            // 当前不存在事务
            executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
        }
}

3.2、方式二

使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:

// 事件
public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
        super(studentId);
    }
}

// 监听器
public class StudentEventListener{
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  public void writeStatisticsData(StudentEvent studentEvent) {
    executor.execute(() -> {
      Student student = studentService.findById(studentEvent.getSource());
      //........
    });
  }
}

@Service
public class StudentService {
  // Spring4.2之后,ApplicationEventPublisher自动被注入到容器中,采用Autowired即可获取
  @Autowired
  private ApplicationEventPublisher applicationEventPublisher;

  @Transactional
  public Long test() {
    // ......
    // 插入记录
    Long studentId = studentService.insert(student);
    // 发布事件
    applicationEventPublisher.publishEvent(new StudentEvent(studentId));
    // 插入学生地址记录
    Long addressId = addressService.insert(address);
    return studentId;
  }
}

原理分析

Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser 来解析 annotation-driven 标签,如下:

public class TxNamespaceHandler extends NamespaceHandlerSupport {
  //......
	@Override
	public void init() {
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
	}
}
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {

  @Override
	public BeanDefinition parse(Element element, ParserContext parserContext) {
    // 重点——将TransactionalEventListenerFactory加入到容器中
		registerTransactionalEventListenerFactory(parserContext);
		String mode = element.getAttribute("mode");
		if ("aspectj".equals(mode)) {
			// mode="aspectj"
			registerTransactionAspect(element, parserContext);
		}
		else {
			// mode="proxy"
			AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
		}
		return null;
	}

  private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
		RootBeanDefinition def = new RootBeanDefinition();
		def.setBeanClass(TransactionalEventListenerFactory.class);
		parserContext.registerBeanComponent(new BeanComponentDefinition(def,
				TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
	}
}
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {

  //省略部分代码......

	@Override
	public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
		return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
	}
}
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {

   //省略部分代码......

	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
      // 事务存在时,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存集合中
			TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
			TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
		} else if (this.annotation.fallbackExecution()) {
			//.......
			}
			processEvent(event);
		} else {
			// 当前不存在事务什么也不做
		}
	}

上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 结合 Spring事件监听机制,并使用到异步方式感觉有点别扭,这里是为了说明问题)。

四、使用案例

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5,
        new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build());

@Override
@Transactional(rollbackFor = Exception.class)
public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) {

    // 数据库代码...

    // 异步代码
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            log.warn("afterCommit...");
            executorService.execute(() -> {
                // 异步业务
                execApiCache(api);
            });
    }});

    return ResultUtil.buildSucc();
}

Ps:setDaemon(false) 注意这里守护线程标记必须设置为 false,否则主线程执行完,异步线程没执行完的话,异步线程会马上被中断、关闭,所以这里不能设置成守护(用户)线程。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 详解在SpringBoot中@Transactional事物操作和事物无效问题排查

    目录 1.spring事务管理简述 2.SpringBoot中使用@Transactional注解 2.1.开启事务注解 2.2.在目标类.方法上添加注解@Transactional 2.3.细化事务配置 3.@Transactional事务实现机制 3.1.整体事务控制流程 3.2.Spring AOP的两种代理 3.3.事务操作的底层实现 4.@Transactional使用注释实现及问题排查 4.1.数据库引擎是否支持事务? 4.3.注解所在的类是否被加载成Bean? 4.2.注解所在方法

  • springboot中使用@Transactional注解事物不生效的坑

    一:在springboot中使用事物遇到的坑 1.我们知道spring中的事物分为两种:一种是编程式事物,一种是声明式事物.顾名思义,编程式事物是指通过代码去实现事物管理,这里不做过多说明.另一种是声明式事物,分为两种情况01:一种是通过传统xml方式配置,02:使用@Transaction注解方式配置,这是主要讲解的是通过注解方式配置.因为在springboot项目中,会自动配置DataSourceTransactionManager,我们只需要在对应的方法上或者类上加上@Transactio

  • 聊聊spring @Transactional 事务无法使用的可能原因

    spring transaction 建议 Spring团队的建议是你在具体的类(或类的方法)上使用 @Transactional 注解, 而不要使用在类所要实现的任何接口上.你当然可以在接口上使用 @Transactional 注解, 但是这将只能当你设置了基于接口的代理时它才生效. 因为注解是不能继承的, 这就意味着如果你正在使用基于类的代理时,那么事务的设置将不能被基于类的代理所识别, 而且对象也将不会被事务代理所包装(将被确认为严重的). 因此请接受Spring团队的建议并且在具体的类上

  • Spring TransactionalEventListener事务未提交读取不到数据的解决

    目录 一.背景 二.问题分析 2.1.mysql隔离级别 2.2.问题原因分析 三.解决问题方案 3.1.方式一 3.2.方式二 四.使用案例 一.背景 业务处理过程,发现了以下问题,代码一是原代码能正常执行,代码二是经过迭代一次非正常执行代码 代码一:以下代码开启线程后,代码正常执行 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQ

  • 解决plsql因事务未提交造成的锁表问题

    1.执行以下语句可查询被锁的表 select b.owner,b.object_name,a.session_id,a.locked_mode from v$locked_object a,dba_objects b where b.object_id = a.object_id; 2.执行以下语句可查询被锁的session和serial# select b.username,b.sid,b.serial#,logon_time from v$locked_object a,v$session

  • Spring中事务几个常见的问题解决

    目录 前言 Spring如何处理事务 1.编程式事务,可以使用TransactionTemplate 2.声明式事务 Spring事务传播机制 Spring事务隔离级别 前言 首先,事务这个概念是数据库层面的,Spring只是基于数据库中的事务进行扩展,以及提供了一些能让程序员更新方便操作事务的方式 Spring如何处理事务 Spring中支持编程式事务和声明式事务管理两种方式 1.编程式事务,可以使用TransactionTemplate public class B { @Autowired

  • 关于navicat事务自动提交问题

    最近在使用navicat编辑数据库表的时候遇到了一些问题,记录下~~ 问题:编辑完数据之后点击"✔"提交的时候navicat卡了,然后提示"MySql Lock wait timeout exceeded",截图现在没有了,总之意思就是超时了. 原因:后来查了好多资料,原来是navicat的自动提交事务关闭了,导致了修改数据之后事务一直未提交,所以再修改数据的时候就得等之前的事务,但是事务一直未提交,所以就超时了 解决原因:可以先看一下事务是否是自动提交的. sho

  • Spring中事务管理的四种方法(银行转账为例)

    前言 本文配套示例代码下载地址(完整可运行,含sql文件,下载后请修改数据库配置):点击这里下载 一.事务的作用 将若干的数据库操作作为一个整体控制,一起成功或一起失败. 原子性:指事务是一个不可分割的工作单位,事务中的操作要么都发生,要么都不发生. 一致性:指事务前后数据的完整性必须保持一致. 隔离性:指多个用户并发访问数据库时,一个用户的事务不能被其他用户的事务所干扰,多个并发事务之间数据要相互隔离. 持久性:指一个事务一旦被提交,它对数据库中数据的改变就是永久性的,即时数据库发生故障也不应

  • Spring强大事务兼容数据库多种组合解决业务需求

    目录 事物的由来 事物特性 什么事脏读.不可重复读.幻读 查询 spring事物 spring事物有哪些可配项 传播属性 事物的由来 在mysql中只有innodb存储引擎才支持事物,所以我们后续都是基于innodb来展开的 事物特性 事物是用来保证数据的完整性的,保证批量sql执行的统一性:事物具有四个特性: A(Atomicity).C(Consistency).I(Isolation).D(Durability) 原子性 一个事务(transaction)中的所有操作,要么全部完成,要么全

  • Spring框架事务属性中事务隔离级别与传播行为全面讲解

    目录 一.事务隔离级别 ①介绍 ②使用方式 二.事务传播行为 ①介绍 ②测试 一.事务隔离级别 ①介绍 数据库系统必须具有隔离并发运行各个事务的能力,使它们不会相互影响,避免各种并发问题.一个事 务与其他事务隔离的程度称为隔离级别.SQL标准中规定了多种事务隔离级别,不同隔离级别对应不同 的干扰程度,隔离级别越高,数据一致性就越好,但并发性越弱. 隔离级别一共有四种: 读未提交:READ UNCOMMITTED 允许Transaction01读取Transaction02未提交的修改. 读已提交

  • php使用PDO事务配合表格读取大量数据插入操作实现方法

    本文实例讲述了php使用PDO事务配合表格读取大量数据插入操作实现方法.分享给大家供大家参考,具体如下: 在处理大量数据的时候,或者同时对几个表操作,而这几个表的操作要求,要么都成功,要么都失败的时候,就需要用到事物,而PDO中提供的事物,一般可以满足需求. 关于事务的具体讲解,http://www.jb51.net/article/105744.htm 本篇文章,只涉及一个小例子. 在向数据库导入一个表格的时候,难免excel文件中存在部分错误,如果用常规方法,将会导致,一部分插入了数据库,一

  • Spring中事务用法示例及实现原理详解

    前言 Spring并不直接管理事务,而是提供了多种事务管理器,他们将事务管理的职责委托给Hibernate或者JTA等持久化机制所提供的相关平台框架的事务来实现. 关于事务,简单来说,就是为了保证数据完整性而存在的一种工具,其主要有四大特性:原子性,一致性,隔离性和持久性.对于Spring事务,其最终还是在数据库层面实现的,而Spring只是以一种比较优雅的方式对其进行封装支持.本文首先会通过一个简单的示例来讲解Spring事务是如何使用的,然后会讲解Spring是如何解析xml中的标签,并对事

  • Spring Boot事务配置操作

    1.在启动主类添加注解:@EnableTransactionManagement 来启用注解式事务管理,相当于之前在xml中配置的<tx:annotation-driven />注解驱动. 2.在需要事务的类或者方法上面添加@Transactional() 注解,里面可以配置需要的粒度: 这么多东西提供配置: Isolation :隔离级别 隔离级别是指若干个并发的事务之间的隔离程度,与我们开发时候主要相关的场景包括:脏读取.重复读.幻读. 我们可以看 org.springframework.

随机推荐