Java线程池流程编排运用实战源码

目录
  • 引导语
  • 1、流程引擎关键代码回顾
  • 2、异步执行SpringBean
  • 3、如何区分异步的SpringBean
  • 4、mock流程引擎数据中心
  • 5、新建线程池
  • 6、测试
  • 7、总结

引导语

在线程池的面试中,面试官除了喜欢问 ThreadPoolExecutor 的底层源码外,还喜欢问你有没有在实际的工作中用过 ThreadPoolExecutor,我们在并发集合类的《场景集合:并发 List、Map 的应用场景》一文中说过一种简单的流程引擎,如果没有看过的同学,可以返回去看一下。

本章就在流程引擎的基础上运用 ThreadPoolExecutor,使用线程池实现 SpringBean 的异步执行。

1、流程引擎关键代码回顾

《场景集合:并发 List、Map 的应用场景》文中流程引擎执行 SpringBean 的核心代码为:

  // 批量执行 Spring Bean
  private void stageInvoke(String flowName, StageEnum stage, FlowContent content) {
    List<DomainAbilityBean>
        domainAbilitys =
        FlowCenter.flowMap.getOrDefault(flowName, Maps.newHashMap()).get(stage);
    if (CollectionUtils.isEmpty(domainAbilitys)) {
      throw new RuntimeException("找不到该流程对应的领域行为" + flowName);
    }
    for (DomainAbilityBean domainAbility : domainAbilitys) {
      // 执行 Spring Bean
      domainAbility.invoke(content);
    }
  }

入参是 flowName(流程名称)、stage(阶段)、content(上下文),其中 stage 中会执行很多 SpringBean,SpringBean 被执行的代码是 domainAbility.invoke(content)。

2、异步执行 SpringBean

从上述代码中,我们可以看到所有的 SpringBean 都是串行执行的,效率较低,我们在实际业务中发现,有的 SpringBean 完全可以异步执行,这样既能完成业务请求,又能减少业务处理的 rt,对于这个需求,我们条件反射的有了两个想法:

需要新开线程来异步执行 SpringBean,可以使用 Runable 或者 Callable;业务请求量很大,我们不能每次来一个请求,就开一个线程,我们应该让线程池来管理异步执行的线程。

于是我们决定使用线程池来完成这个需求。

3、如何区分异步的 SpringBean

我们的 SpringBean 都是实现 DomainAbilityBean 这个接口的,接口定义如下:

public interface DomainAbilityBean {
  /**
   * 领域行为的方法入口
   */
  FlowContent invoke(FlowContent content);
}

从接口定义上来看,没有预留任何地方来标识该 SpringBean 应该是同步执行还是异步执行,这时候我们可以采取注解的方式,我们新建一个注解,只要 SpringBean 上有该注解,表示该 SpringBean 应该异步执行,否则应该同步执行,新建的注解如下:

/**
 * 异步 SpringBean 执行注解
 * SpringBean 需要异步执行的话,就打上该注解
 */
@Target(ElementType.TYPE)// 表示该注解应该打在类上
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncComponent {

}

接着我们新建了两个 SpringBean,并在其中一个 SpringBean 上打上异步的注解,并且打印出执行 SpringBean 的线程名称,如下图:

图中实现了两个 SpringBean:BeanOne 和 BeanTwo,其中 BeanTwo 被打上了 AsyncComponent 注解,表明 BeanTwo 应该被异步执行,两个 SpringBean 都打印出执行的线程的名称。

4、mock 流程引擎数据中心

《场景集合:并发 List、Map 的应用场景》一文中,我们说可以从数据库中加载出流程引擎需要的数据,此时我们 mock 一下,mock 的代码如下:

@Component
public class FlowCenter {
  /**
   * flowMap 是共享变量,方便访问
   */
  public static final Map<String, Map<StageEnum, List<DomainAbilityBean>>> flowMap
      = Maps.newConcurrentMap();
  /**
   * PostConstruct 注解的意思就是
   * 在容器启动成功之后,初始化 flowMap
   */
  @PostConstruct
  public void init() {
      // 初始化 flowMap mock
    Map<StageEnum, List<DomainAbilityBean>> stageMap = flowMap.getOrDefault("flow1",Maps.newConcurrentMap());
    for (StageEnum value : StageEnum.values()) {
      List<DomainAbilityBean> domainAbilitys = stageMap.getOrDefault(value, Lists.newCopyOnWriteArrayList());
      if(CollectionUtils.isEmpty(domainAbilitys)){
        domainAbilitys.addAll(ImmutableList.of(
            ApplicationContextHelper.getBean(BeanOne.class),
            ApplicationContextHelper.getBean(BeanTwo.class)
        ));
        stageMap.put(value,domainAbilitys);
      }
    }
    flowMap.put("flow1",stageMap);
    // 打印出加载完成之后的数据结果
    log.info("init success,flowMap is {}", JSON.toJSONString(flowMap));
  }
}

5、新建线程池

在以上操作完成之后,只剩下最后一步了,之前我们执行 SpringBean 时,是这行代码:domainAbility.invoke(content);

现在我们需要区分 SpringBean 是否是异步的,如果是异步的,丢到线程池中去执行,如果是同步的,仍然使用原来的方法进行执行,于是我们把这些逻辑封装到一个工具类中,工具类如下:

public class ComponentExecutor {
	// 我们新建了一个线程池
  private static ExecutorService executor = new ThreadPoolExecutor(15, 15,
                                                                   365L, TimeUnit.DAYS,
                                                                   new LinkedBlockingQueue<>());
	// 如果 SpringBean 上有 AsyncComponent 注解,表示该 SpringBean 需要异步执行,就丢到线程池中去
  public static final void run(DomainAbilityBean component, FlowContent content) {
    // 判断类上是否有 AsyncComponent 注解
    if (AnnotationUtils.isAnnotationPresent(AsyncComponent.class, AopUtils.getTargetClass(component))) {
      // 提交到线程池中
      executor.submit(() -> { component.invoke(content); });
      return;
    }
    // 同步 SpringBean 直接执行。
    component.invoke(content);
  }
}

我们把原来的执行代码替换成使用组件执行器执行,如下图:

6、测试

以上步骤完成之后,简单的流程引擎就已经完成了,我们简单地在项目启动的时候加上测试,代码如下:

更严谨的做法,是会写单元测试来测试流程引擎,为了快一点,我们直接在项目启动类上加上了测试代码。

运行之后的关键结果如下:

[main] demo.sixth.SynchronizedDemo: SynchronizedDemo init begin
[main] demo.sixth.SynchronizedDemo: SynchronizedDemo init end
[main] demo.three.flow.FlowCenter : init success,flowMap is {"flow1":{"PARAM_VALID":[{},{}],"AFTER_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"BUSINESS_VALID":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"IN_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}]}}
o.s.j.e.a.AnnotationMBeanExporter  : Registering beans for JMX exposure on startup
[main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
[main] demo.DemoApplication : Started DemoApplication in 5.377 seconds (JVM running for 6.105)
[main] demo.three.flow.BeanOne : BeanOne is run,thread name is main
[main] demo.three.flow.BeanOne : BeanOne is run,thread name is main
[pool-1-thread-1] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-1
[main] demo.three.flow.BeanOne : BeanOne is run,thread name is main
[pool-1-thread-2] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-2
[pool-1-thread-3] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-3
[main] demo.three.flow.BeanOne : BeanOne is run,thread name is main
[pool-1-thread-4] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-4

从运行结果中,我们可以看到 BeanTwo 已经被多个不同的线程异步执行了。

7、总结

这是一个线程池在简单流程引擎上的运用实站,虽然这个流程引擎看起来比较简单,但在实际工作中,还是非常好用的,大家可以把代码拉下来,自己尝试一下,调试一下参数,比如当我新增 SpringBean 的时候,流程引擎的表现如何。

以上就是Java线程池流程编排运用实战源码的详细内容,更多关于Java线程池流程编排运用的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java线程池的应用实例分析

    本文实例讲述了Java线程池的应用.分享给大家供大家参考,具体如下: 一 使用Future与Callable来计算斐波那契数列 1 代码 import java.util.concurrent.*; public class FutureCallableDemo { static long fibonacci(long n) { if (n == 1 ||n == 2) return 1; else return fibonacci(n - 1) + fibonacci(n - 2); } pu

  • JAVA线程池原理实例详解

    本文实例讲述了JAVA线程池原理.分享给大家供大家参考,具体如下: 线程池的优点 1.线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用. 2.可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃. 线程池的创建 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQu

  • Java中四种线程池的使用示例详解

    在什么情况下使用线程池? 1.单个任务处理的时间比较短 2.将需处理的任务的数量大 使用线程池的好处: 1.减少在创建和销毁线程上所花的时间以及系统资源的开销 2.如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及"过度切换". 本文详细的给大家介绍了关于Java中四种线程池的使用,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: FixedThreadPool 由Executors的newFixedThreadPool方法创建.它是一种线程数量固定的线程

  • Java线程池的简单使用方法实例教程

    目录 线程池使用场景? Java线程池使用 总结 线程池使用场景? java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源.线程上下文切换问题.同时创建过多的线程也可能引发资源耗尽的风险,这个时候引入线程池比较合理,方便线程任务的管理.java中涉及到线程池的相关类均在jdk1.5开始的java.util.concurrent包中,涉及到的几个核心类及接口包括:Executor.Execut

  • Java线程池用法实战案例分析

    本文实例讲述了Java线程池用法.分享给大家供大家参考,具体如下: 一 使用newSingleThreadExecutor创建一个只包含一个线程的线程池 1 代码 import java.util.concurrent.*; public class executorDemo { public static void main( String[] args ) { ExecutorService executor = Executors.newSingleThreadExecutor(); ex

  • Java线程池流程编排运用实战源码

    目录 引导语 1.流程引擎关键代码回顾 2.异步执行SpringBean 3.如何区分异步的SpringBean 4.mock流程引擎数据中心 5.新建线程池 6.测试 7.总结 引导语 在线程池的面试中,面试官除了喜欢问 ThreadPoolExecutor 的底层源码外,还喜欢问你有没有在实际的工作中用过 ThreadPoolExecutor,我们在并发集合类的<场景集合:并发 List.Map 的应用场景>一文中说过一种简单的流程引擎,如果没有看过的同学,可以返回去看一下. 本章就在流程

  • 深入理解Java线程池从设计思想到源码解读

    线程池:从设计思想到源码解析 前言初识线程池线程池优势线程池设计思路 深入线程池构造方法任务队列拒绝策略线程池状态初始化&容量调整&关闭 使用线程池ThreadPoolExecutorExecutors封装线程池 解读线程池execute()addWorker()Worker类runWorker()processWorkerExit() 前言 各位小伙伴儿,春节已经结束了,在此献上一篇肝了一个春节假期的迟来的拜年之作,希望读者朋友们都能有收获. 根据穆氏哲学,投入越多,收获越大.我作此文时

  • 深入了解Java线程池:从设计思想到源码解读

    目录 为什么需要线程池 线程池设计思路 线程池的工作机制 线程池的参数及使用 线程池的状态 提交任务 任务队列 线程工厂 拒绝策略 关闭线程池 Executors 静态工厂 合理地配置线程池 线程池的监控 源码分析 execute addWorker Worker runWorker getTask processWorkerExit 面试题 为什么需要线程池 我们知道创建线程的常用方式就是 new Thread() ,而每一次 new Thread() 都会重新创建一个线程,而线程的创建和销毁

  • 基于线程池的工作原理与源码解读

    随着cpu核数越来越多,不可避免的利用多线程技术以充分利用其计算能力.所以,多线程技术是服务端开发人员必须掌握的技术. 线程的创建和销毁,都涉及到系统调用,比较消耗系统资源,所以就引入了线程池技术,避免频繁的线程创建和销毁. 在Java用有一个Executors工具类,可以为我们创建一个线程池,其本质就是new了一个ThreadPoolExecutor对象.线程池几乎也是面试必考问题.本节结合源代码,说说ThreadExecutor的工作原理 一.线程池创建 先看一下ThreadPoolExec

  • 一篇文章带你搞懂Java线程池实现原理

    目录 1. 为什么要使用线程池 2. 线程池的使用 3. 线程池核心参数 4. 线程池工作原理 5. 线程池源码剖析 5.1 线程池的属性 5.2 线程池状态 5.3 execute源码 5.4 worker源码 5.5 runWorker源码 1. 为什么要使用线程池 使用线程池通常由以下两个原因: 频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程. 使用线程池可以更容易管理线程,线程池可以动态管理线程个数.具有阻塞队列.定时周期执行任务.环境隔离等. 2. 线程池的使用 /** *

  • 深度源码解析Java 线程池的实现原理

    java 系统的运行归根到底是程序的运行,程序的运行归根到底是代码的执行,代码的执行归根到底是虚拟机的执行,虚拟机的执行其实就是操作系统的线程在执行,并且会占用一定的系统资源,如CPU.内存.磁盘.网络等等.所以,如何高效的使用这些资源就是程序员在平时写代码时候的一个努力的方向.本文要说的线程池就是一种对 CPU 利用的优化手段. 线程池,百度百科是这么解释的: 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的

  • Java 线程池ThreadPoolExecutor源码解析

    目录 引导语 1.整体架构图 1.1.类结构 1.2.类注释 1.3.ThreadPoolExecutor重要属性 2.线程池的任务提交 3.线程执行完任务之后都在干啥 4.总结 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可以充分利用机器资源,增加请求的处理速度,本章节我们就和大家一起来学习线程池. 本章的顺序,先说源码,弄懂原理,接着看一看面试题,最后看看实际工作中是如何运用线程池的. 1.整体架构图 我们画了线程池的整体图,如下: 本小节主要就按照这个图来进行 Thre

  • java线程池核心API源码详细分析

    目录 概述 源码分析 Executor ExecutorService ScheduledExecutorService ThreadPoolExecutor ScheduledThreadPoolExecutor 总结 概述 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有

  • Java线程池ThreadPoolExecutor源码深入分析

    1.线程池Executors的简单使用 1)创建一个线程的线程池. Executors.newSingleThreadExecutor(); //创建的源码 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new Linke

随机推荐