Spring 事务事件监控及实现原理解析

前面我们讲到了Spring在进行事务逻辑织入的时候,无论是事务开始,提交或者回滚,都会触发相应的事务事件。本文首先会使用实例进行讲解Spring事务事件是如何使用的,然后会讲解这种使用方式的实现原理。

1. 示例

对于事务事件,Spring提供了一个注解@TransactionEventListener,将这个注解标注在某个方法上,那么就将这个方法声明为了一个事务事件处理器,而具体的事件类型则是由TransactionalEventListener.phase属性进行定义的。如下是TransactionalEventListener的声明:

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
  // 指定当前标注方法处理事务的类型
	TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
  // 用于指定当前方法如果没有事务,是否执行相应的事务事件监听器
	boolean fallbackExecution() default false;
  // 与classes属性一样,指定了当前事件传入的参数类型,指定了这个参数之后就可以在监听方法上
  // 直接什么一个这个参数了
	@AliasFor(annotation = EventListener.class, attribute = "classes")
	Class<?>[] value() default {};
  // 作用于value属性一样,用于指定当前监听方法的参数类型
	@AliasFor(annotation = EventListener.class, attribute = "classes")
	Class<?>[] classes() default {};
  // 这个属性使用Spring Expression Language对目标类和方法进行匹配,对于不匹配的方法将会过滤掉
	String condition() default "";
}

关于这里的classes属性需要说明一下,如果指定了classes属性,那么当前监听方法的参数类型就可以直接使用所发布的事件的参数类型,如果没有指定,那么这里监听的参数类型可以使用两种:ApplicationEvent和PayloadApplicationEvent。对于ApplicationEvent类型的参数,可以通过其getSource()方法获取发布的事件参数,只不过其返回值是一个Object类型的,如果想获取具体的类型还需要进行强转;对于PayloadApplicationEvent类型,其可以指定一个泛型参数,该泛型参数必须与发布的事件的参数类型一致,这样就可以通过其getPayload()方法获取事务事件发布的数据了。关于上述属性中的TransactionPhase,其可以取如下几个类型的值:

public enum TransactionPhase {
  // 指定目标方法在事务commit之前执行
	BEFORE_COMMIT,
  // 指定目标方法在事务commit之后执行
	AFTER_COMMIT,
  // 指定目标方法在事务rollback之后执行
	AFTER_ROLLBACK,
  // 指定目标方法在事务完成时执行,这里的完成是指无论事务是成功提交还是事务回滚了
	AFTER_COMPLETION
}

这里我们假设数据库有一个user表,对应的有一个UserService和User的model,用于往该表中插入数据,并且插入动作时使用注解标注目标方法。如下是这几个类的声明:

public class User {
 private long id;
 private String name;
 private int age;
 // getter and setter...
}
@Service
@Transactional
public class UserServiceImpl implements UserService {
 @Autowired
 private JdbcTemplate jdbcTemplate;
 @Autowired
 private ApplicationEventPublisher publisher;
 @Override
 public void insert(User user) {
  jdbcTemplate.update("insert into user (id, name, age) value (?, ?, ?)",
    user.getId(), user.getName(), user.getAge());
  publisher.publishEvent(user);
 }
}

上述代码中有一点需要注意的是,对于需要监控事务事件的方法,在目标方法执行的时候需要使用

ApplicationEventPublisher发布相应的事件消息。如下是对上述消息进行监控的程序:

@Component
public class UserTransactionEventListener {
 @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
 public void beforeCommit(PayloadApplicationEvent<User> event) {
  System.out.println("before commit, id: " + event.getPayload().getId());
 }
 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
 public void afterCommit(PayloadApplicationEvent<User> event) {
  System.out.println("after commit, id: " + event.getPayload().getId());
 }
 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
 public void afterCompletion(PayloadApplicationEvent<User> event) {
  System.out.println("after completion, id: " + event.getPayload().getId());
 }
 @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
 public void afterRollback(PayloadApplicationEvent<User> event) {
  System.out.println("after rollback, id: " + event.getPayload().getId());
 }
}

这里对于事件的监控,只需要在监听方法上添加@TransactionalEventListener注解即可。这里需要注意的一个问题,在实际使用过程中,对于监听的事务事件,需要使用其他的参数进行事件的过滤,因为这里的监听还是会监听所有事件参数为User类型的事务,而无论其是哪个位置发出来的。如果需要对事件进行过滤,这里可以封装一个UserEvent对象,其内保存一个类似EventType的属性和一个User对象,这样在发布消息的时候就可以指定EventType属性,而在监听消息的时候判断当前方法监听的事件对象的EventType是否为目标type,如果是,则对其进行处理,否则直接略过。下面是上述程序的xml文件配置和驱动程序:

<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource">
  <property name="url" value="jdbc:mysql://localhost/test?useUnicode=true"/>
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="username" value="******"/>
  <property name="password" value="******"/>
</bean>

<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
  <property name="dataSource" ref="dataSource"/>
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
  <property name="dataSource" ref="dataSource"/>
</bean>

<context:component-scan base-package="com.transaction"/>
<tx:annotation-driven/>
public class TransactionApp {
 @Test
 public void testTransaction() {
  ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
  UserService userService = context.getBean(UserService.class);
  User user = getUser();
  userService.insert(user);
 }

 private User getUser() {
  int id = new Random()
   .nextInt(1000000);
  User user = new User();
  user.setId(id);
  user.setName("Mary");
  user.setAge(27);
  return user;
 }
}

运行上述程序,其执行结果如下:

before commit, id: 935052
after commit, id: 935052
after completion, id: 935052

可以看到,这里确实成功监听了目标程序的相关事务行为。

2. 实现原理

关于事务的实现原理,这里其实是比较简单的,在前面的文章中,我们讲解到,Spring对事务监控的处理逻辑在TransactionSynchronization中,如下是该接口的声明:

public interface TransactionSynchronization extends Flushable {

  // 在当前事务挂起时执行
	default void suspend() {
	}

  // 在当前事务重新加载时执行
	default void resume() {
	}

  // 在当前数据刷新到数据库时执行
	default void flush() {
	}

  // 在当前事务commit之前执行
	default void beforeCommit(boolean readOnly) {
	}

  // 在当前事务completion之前执行
	default void beforeCompletion() {
	}

  // 在当前事务commit之后实质性
	default void afterCommit() {
	}

  // 在当前事务completion之后执行
	default void afterCompletion(int status) {
	}
}

很明显,这里的TransactionSynchronization接口只是抽象了一些行为,用于事务事件发生时触发,这些行为在Spring事务中提供了内在支持,即在相应的事务事件时,其会获取当前所有注册的TransactionSynchronization对象,然后调用其相应的方法。那么这里TransactionSynchronization对象的注册点对于我们了解事务事件触发有至关重要的作用了。这里我们首先回到事务标签的解析处,在前面讲解事务标签解析时,我们讲到Spring会注册一个TransactionalEventListenerFactory类型的bean到Spring容器中,这里关于标签的解析读者可以阅读本人前面的文章Spring事务用法示例与实现原理。这里注册的TransactionalEventListenerFactory实现了EventListenerFactory接口,这个接口的主要作用是先判断目标方法是否是某个监听器的类型,然后为目标方法生成一个监听器,其会在某个bean初始化之后由Spring调用其方法用于生成监听器。如下是该类的实现:

ublic class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {

  // 指定当前监听器的顺序
  private int order = 50;

  public void setOrder(int order) {
    this.order = order;
  }

  @Override
  public int getOrder() {
    return this.order;
  }

  // 指定目标方法是否是所支持的监听器的类型,这里的判断逻辑就是如果目标方法上包含有
  // TransactionalEventListener注解,则说明其是一个事务事件监听器
  @Override
  public boolean supportsMethod(Method method) {
    return (AnnotationUtils.findAnnotation(method, TransactionalEventListener.class) != null);
  }

  // 为目标方法生成一个事务事件监听器,这里ApplicationListenerMethodTransactionalAdapter实现了
  // ApplicationEvent接口
  @Override
  public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
    return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
  }

}  

这里关于事务事件监听的逻辑其实已经比较清楚了。ApplicationListenerMethodTransactionalAdapter本质上是实现了ApplicationListener接口的,也就是说,其是Spring的一个事件监听器,这也就是为什么进行事务处理时需要使用ApplicationEventPublisher.publish()方法发布一下当前事务的事件。

ApplicationListenerMethodTransactionalAdapter在监听到发布的事件之后会生成一个TransactionSynchronization对象,并且将该对象注册到当前事务逻辑中,如下是监听事务事件的处理逻辑:

@Override
public void onApplicationEvent(ApplicationEvent event) {
  // 如果当前TransactionManager已经配置开启事务事件监听,
  // 此时才会注册TransactionSynchronization对象
  if (TransactionSynchronizationManager.isSynchronizationActive()) {
    // 通过当前事务事件发布的参数,创建一个TransactionSynchronization对象
    TransactionSynchronization transactionSynchronization =
      createTransactionSynchronization(event);
    // 注册TransactionSynchronization对象到TransactionManager中
    TransactionSynchronizationManager
      .registerSynchronization(transactionSynchronization);
  } else if (this.annotation.fallbackExecution()) {
    // 如果当前TransactionManager没有开启事务事件处理,但是当前事务监听方法中配置了
    // fallbackExecution属性为true,说明其需要对当前事务事件进行监听,无论其是否有事务
    if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK
      && logger.isWarnEnabled()) {
      logger.warn("Processing "
            + event + " as a fallback execution on AFTER_ROLLBACK phase");
    }
    processEvent(event);
  } else {
    // 走到这里说明当前是不需要事务事件处理的,因而直接略过
    if (logger.isDebugEnabled()) {
      logger.debug("No transaction is active - skipping " + event);
    }
  }
}

这里需要说明的是,上述annotation属性就是在事务监听方法上解析的TransactionalEventListener注解中配置的属性。可以看到,对于事务事件的处理,这里创建了一个TransactionSynchronization对象,其实主要的处理逻辑就是在返回的这个对象中,而createTransactionSynchronization()方法内部只是创建了一个TransactionSynchronizationEventAdapter对象就返回了。这里我们直接看该对象的源码:

private static class TransactionSynchronizationEventAdapter
    extends TransactionSynchronizationAdapter {

  private final ApplicationListenerMethodAdapter listener;
  private final ApplicationEvent event;
  private final TransactionPhase phase;

  public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter
    listener, ApplicationEvent event, TransactionPhase phase) {
    this.listener = listener;
    this.event = event;
    this.phase = phase;
  }

  @Override
  public int getOrder() {
    return this.listener.getOrder();
  }

  // 在目标方法配置的phase属性为BEFORE_COMMIT时,处理before commit事件
  public void beforeCommit(boolean readOnly) {
    if (this.phase == TransactionPhase.BEFORE_COMMIT) {
      processEvent();
    }
  }

  // 这里对于after completion事件的处理,虽然分为了三个if分支,但是实际上都是执行的processEvent()
  // 方法,因为after completion事件是事务事件中一定会执行的,因而这里对于commit,
  // rollback和completion事件都在当前方法中处理也是没问题的
  public void afterCompletion(int status) {
    if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
      processEvent();
    } else if (this.phase == TransactionPhase.AFTER_ROLLBACK
          && status == STATUS_ROLLED_BACK) {
      processEvent();
    } else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
      processEvent();
    }
  }

  // 执行事务事件
  protected void processEvent() {
    this.listener.processEvent(this.event);
  }
}

可以看到,对于事务事件的处理,最终都是委托给了ApplicationListenerMethodAdapter.processEvent()方法进行的。如下是该方法的源码:

public void processEvent(ApplicationEvent event) {
  // 处理事务事件的相关参数,这里主要是判断TransactionalEventListener注解中是否配置了value
  // 或classes属性,如果配置了,则将方法参数转换为该指定类型传给监听的方法;如果没有配置,则判断
  // 目标方法是ApplicationEvent类型还是PayloadApplicationEvent类型,是则转换为该类型传入
  Object[] args = resolveArguments(event);
  // 这里主要是获取TransactionalEventListener注解中的condition属性,然后通过
  // Spring expression language将其与目标类和方法进行匹配
  if (shouldHandle(event, args)) {
    // 通过处理得到的参数借助于反射调用事务监听方法
    Object result = doInvoke(args);
    if (result != null) {
      // 对方法的返回值进行处理
      handleResult(result);
    } else {
      logger.trace("No result object given - no result to handle");
    }
  }
}

// 处理事务监听方法的参数
protected Object[] resolveArguments(ApplicationEvent event) {
  // 获取发布事务事件时传入的参数类型
  ResolvableType declaredEventType = getResolvableType(event);
  if (declaredEventType == null) {
    return null;
  }

  // 如果事务监听方法的参数个数为0,则直接返回
  if (this.method.getParameterCount() == 0) {
    return new Object[0];
  }

  // 如果事务监听方法的参数不为ApplicationEvent或PayloadApplicationEvent,则直接将发布事务
  // 事件时传入的参数当做事务监听方法的参数传入。从这里可以看出,如果事务监听方法的参数不是
  // ApplicationEvent或PayloadApplicationEvent类型,那么其参数必须只能有一个,并且这个
  // 参数必须与发布事务事件时传入的参数一致
  Class<?> eventClass = declaredEventType.getRawClass();
  if ((eventClass == null || !ApplicationEvent.class.isAssignableFrom(eventClass)) &&
    event instanceof PayloadApplicationEvent) {
    return new Object[] {((PayloadApplicationEvent) event).getPayload()};
  } else {
    // 如果参数类型为ApplicationEvent或PayloadApplicationEvent,则直接将其传入事务事件方法
    return new Object[] {event};
  }
}

// 判断事务事件方法方法是否需要进行事务事件处理
private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
  if (args == null) {
    return false;
  }
  String condition = getCondition();
  if (StringUtils.hasText(condition)) {
    Assert.notNull(this.evaluator, "EventExpressionEvaluator must no be null");
    EvaluationContext evaluationContext = this.evaluator.createEvaluationContext(
      event, this.targetClass, this.method, args, this.applicationContext);
    return this.evaluator.condition(condition, this.methodKey, evaluationContext);
  }
  return true;
}

// 对事务事件方法的返回值进行处理,这里的处理方式主要是将其作为一个事件继续发布出去,这样就可以在
// 一个统一的位置对事务事件的返回值进行处理
protected void handleResult(Object result) {
  // 如果返回值是数组类型,则对数组元素一个一个进行发布
  if (result.getClass().isArray()) {
    Object[] events = ObjectUtils.toObjectArray(result);
    for (Object event : events) {
      publishEvent(event);
    }
  } else if (result instanceof Collection<?>) {
    // 如果返回值是集合类型,则对集合进行遍历,并且发布集合中的每个元素
    Collection<?> events = (Collection<?>) result;
    for (Object event : events) {
      publishEvent(event);
    }
  } else {
    // 如果返回值是一个对象,则直接将其进行发布
    publishEvent(result);
  }
}

对于事务事件的处理,总结而言,就是为每个事务事件监听方法创建了一个TransactionSynchronizationEventAdapter对象,通过该对象在发布事务事件的时候,会在当前线程中注册该对象,这样就可以保证每个线程每个监听器中只会对应一个TransactionSynchronizationEventAdapter对象。在Spring进行事务事件的时候会调用该对象对应的监听方法,从而达到对事务事件进行监听的目的。

3. 小结

本文首先对事务事件监听程序的使用方式进行了讲解,然后在源码层面讲解了Spring事务监听器是如何实现的。在Spring事务监听器使用过程中,需要注意的是要对当前接收到的事件类型进行判断,因为不同的事务可能会发布同样的消息对象过来。

总结

以上所述是小编给大家介绍的Spring 事务事件监控及实现原理,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!

(0)

相关推荐

  • 使用Spring事件机制实现异步的方法

    当把一个事件发布到Spring提供的ApplicationContext中,被监听器侦测到,就会执行对应的处理方法. 事件本身 事件是一个自定义的类,需要继承Spring提供的ApplicationEvent. @Data public class MyEvent extends ApplicationEvent { private String msg; public MyEvent(Object source, String msg) { super(source); this.msg =

  • Spring事务事件监控的实现

    前面我们讲到了Spring在进行事务逻辑织入的时候,无论是事务开始,提交或者回滚,都会触发相应的事务事件.本文首先会使用实例进行讲解Spring事务事件是如何使用的,然后会讲解这种使用方式的实现原理. 1.示例 对于事务事件,Spring提供了一个注解@TransactionEventListener,将这个注解标注在某个方法上,那么就将这个方法声明为了一个事务事件处理器,而具体的事件类型则是由TransactionalEventListener.phase属性进行定义的.如下是Transact

  • Tomcat和Spring中的事件机制深入讲解

    引言 最近在看tomcat源码,源码中出现了大量事件消息,可以说整个tomcat的启动流程都可以通过事件派发机制串起来,研究透了tomcat的各种事件消息,基本上对tomcat的启动流程也就有了一个整体的认识.在这一基础上,联想到之前在看spring源码过程中也存在不少事件相关知识,于是想对这两个框架中的事件派发机制做一个简单的总结,加深理解. 事件机制原理其实比较简单,抽象来看的话,设计模式中的观察者模式可以说是最经典的事件驱动机制的体现了,观察者和被观察者就体现了事件监听和事件派发的角色.还

  • Spring的事件监听机制示例详解

    前言 最近公司在重构广告系统,其中核心的打包功由广告系统调用,即对apk打包的调用和打包完成之后的回调,需要提供相应的接口给广告系统.因此,为了将apk打包的核心流程和对接广告系统的业务解耦,利用了spring的事件监听特性来满足需求.以下说明spring的事件机制的相关内容. 首先spring事件分为事件发布者(EventPublisher).事件监听者(EventListener),还包括一个事件广播者(这个是spring实现相关,这一节不讨论).使用spring事件机制,需要自定义事件发布

  • 基于spring如何实现事件驱动实例代码

    干货点 通过阅读该篇博客,你可以了解了解java的反射机制.可以了解如何基于spring生命周期使用自定义注解解决日常研发问题.具体源码可以点击链接. 问题描述 在日常研发中,经常会遇见业务A的某个action被触发后,同时触发业务B的action的行为,这种单对单的形式可以直接在业务A的action执行结束后直接调用业务B的action,那么如果是单对多的情况呢? 方案解决 这里提供一种在日常研发中经常使用到的机制,基于spring实现的事件驱动,即在业务A的action执行完,抛出一个事件,

  • Spring 事务事件监控及实现原理解析

    前面我们讲到了Spring在进行事务逻辑织入的时候,无论是事务开始,提交或者回滚,都会触发相应的事务事件.本文首先会使用实例进行讲解Spring事务事件是如何使用的,然后会讲解这种使用方式的实现原理. 1. 示例 对于事务事件,Spring提供了一个注解@TransactionEventListener,将这个注解标注在某个方法上,那么就将这个方法声明为了一个事务事件处理器,而具体的事件类型则是由TransactionalEventListener.phase属性进行定义的.如下是Transac

  • Spring事务隔离级别简介及实例解析

    本文研究的主要是Spring事务隔离级别(solation level)介绍及例子,具体如下. 当两个事务对同一个数据库的记录进行操作时,那么,他们之间的影响是怎么样的呢?这就出现了事务隔离级别的概念.数据库的隔离性与并发控制有很大关系.数据库的隔离级别是数据库的事务特性ACID的一部分.ACID,即原子性(atomicity).一致性(consistency).隔离性(isolation)和持久性(durability).Spring的事务隔离级别有四个:READ_UNCOMMITTED.RE

  • Spring整合Dubbo框架过程及原理解析

    这篇文章主要介绍了Spring整合Dubbo框架过程及原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Dubbo作为一个RPC框架,其最核心的功能就是要实现跨网络的远程调用.演示过程创建两个小工程,一个作为服务的提供者,一个作为服务的消费者.通过Dubbo来实现服务消费者远程调用服务提供者的方法. dubbo 的使用需要一个注册中心,这里以Zookeeper为例来演示 1.Dubbo架构 Dubbo架构图(Dubbo官方提供)如下: 节

  • PyQt5 closeEvent关闭事件退出提示框原理解析

    这篇文章主要介绍了PyQt5 closeEvent关闭事件退出提示框原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 如果关闭QWidget,就会产生一个QCloseEvent,并且把它传入到closeEvent函数的event参数中.改变控件的默认行为,就是替换掉默认的事件处理. 原生的英文提示框: # 添加一个退出的提示事件 def closeEvent(self, event): """我们创建了一个消息框,上面

  • Spring占位符Placeholder的实现原理解析

    占位符Placeholder的使用 xml中的配置: <?xml version="1.0" encoding="utf-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http:/

  • Spring boot事件监听实现过程解析

    事件监听其实我们并不陌生,简单来讲,当程序达到了某个特定的条件,程序就会自动执行一段指令.在spring 中也一样,我们可以使用spring中的事件监听来实现某些特定的需求. 发布事件 既然要监听事件,首先要发布我们的事件嘛.在spring中发布事件我们可以通过继承ApplicationEvent 来发布我们的事件类. @Data public class SendEvent extends ApplicationEvent { public SendEvent(Object source) {

  • Spring Cloud负载均衡组件Ribbon原理解析

    目录 前言 一个问题引发的思考 Ribbon的简单使用 Ribbon 原理分析 LoadBalancerAutoConfiguration 自动装配 RestTemplateCustomizer LoadBalancerInterceptor RibbonLoadBalancerClient#execute ZoneAwareLoadBalancer 负载均衡器 如何获取所有服务 如何判断服务是否可用 Ribbon 的负载均衡算法 总结 微服务体系下的 Spring Cloud Netflix

  • Spring mvc JSON数据交换格式原理解析

    什么是JSON JSON(JavaScript Object Notation, JS 对象标记) 是一种轻量级的数据交换格式,目前使用特别广泛. 采用完全独立于编程语言的文本格式来存储和表示数据. 简洁和清晰的层次结构使得 JSON 成为理想的数据交换语言. 易于人阅读和编写,同时也易于机器解析和生成,并有效地提升网络传输效率. 在 JavaScript 语言中,一切都是对象.因此,任何JavaScript 支持的类型都可以通过 JSON 来表示,例如字符串.数字.对象.数组等.看看他的要求和

  • Spring Data JPA分页复合查询原理解析

    Spring Data JPA是Spring Data家族的一部分,可以轻松实现基于JPA的存储库. 此模块处理对基于JPA的数据访问层的增强支持. 它使构建使用数据访问技术的Spring驱动应用程序变得更加容易. 在相当长的一段时间内,实现应用程序的数据访问层一直很麻烦. 必须编写太多样板代码来执行简单查询以及执行分页和审计. Spring Data JPA旨在通过减少实际需要的工作量来显著改善数据访问层的实现. 作为开发人员,您编写repository接口,包括自定义查找器方法,Spring

随机推荐