基于ThreadPoolTaskExecutor的使用说明

目录
  • ThreadPoolTaskExecutor的使用
    • springboot 配置
    • 提交任务
  • ThreadPoolTaskExecutor配置问题
    • 有关spring中ThreadPoolTaskExecutor具体如下
    • 回忆一下线程池工作原理
      • 测试场景1
      • 测试场景2

ThreadPoolTaskExecutor的使用

当我们需要实现并发、异步等操作时,通常都会使用到ThreadPoolTaskExecutor,现对其使用稍作总结。

springboot 配置

提交任务

  • 无返回值的任务使用execute(Runnable)
  • 有返回值的任务使用submit(Runnable)

处理流程

当一个任务被提交到线程池时,首先查看线程池的核心线程是否都在执行任务,否就选择一条线程执行任务,是就执行第二步。

查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第三步。

查看任务队列是否已满,不满就将任务存储在任务队列中,否则执行第四步。

查看线程池是否已满,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。

在ThreadPoolExecutor中表现为:

如果当前运行的线程数小于corePoolSize,那么就创建线程来执行任务(执行时需要获取全局锁)。

如果运行的线程大于或等于corePoolSize,那么就把task加入BlockQueue。

如果创建的线程数量大于BlockQueue的最大容量,那么创建新线程来执行该任务。

如果创建线程导致当前运行的线程数超过maximumPoolSize,就根据饱和策略来拒绝该任务。

关闭线程池

调用shutdown或者shutdownNow,两者都不会接受新的任务,而且通过调用要停止线程的interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于shutdownNow会首先将线程池的状态设置为STOP,然后尝试停止所有线程(有可能导致部分任务没有执行完)然后返回未执行任务的列表。而shutdown则只是将线程池的状态设置为shutdown,然后中断所有没有执行任务的线程,并将剩余的任务执行完。

配置线程个数

如果是CPU密集型任务,那么线程池的线程个数应该尽量少一些,一般为CPU的个数+1条线程。

如果是IO密集型任务,那么线程池的线程可以放的很大,如2*CPU的个数。

对于混合型任务,如果可以拆分的话,通过拆分成CPU密集型和IO密集型两种来提高执行效率;如果不能拆分的的话就可以根据实际情况来调整线程池中线程的个数。

监控线程池状态

常用状态

taskCount:线程需要执行的任务个数。

completedTaskCount:线程池在运行过程中已完成的任务数。

largestPoolSize:线程池曾经创建过的最大线程数量。

getPoolSize:获取当前线程池的线程数量。

getActiveCount:获取活动的线程的数量

通过继承线程池,重写beforeExecute,afterExecute和terminated方法来在线程执行任务前,线程执行任务结束,和线程终结前获取线程的运行情况,根据具体情况调整线程池的线程数量。

ThreadPoolTaskExecutor配置问题

最近线上出现一个奇葩问题,使用的是ThreadPoolTaskExecutor来处理后续服务调用,刚开始运行ThreadPoolTaskExecutor处理后续服务调用是没有问题的,但是一段时间之后,发现后续服务一直没有被调用,导致了极其严重的后果

有关spring中ThreadPoolTaskExecutor具体如下

<bean id="threadPoolTaskExecutor"
            class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <!-- 核心线程数,默认为1 -->
    <property name="corePoolSize" value="5" />
    <!-- 最大线程数,默认为Integer.MAX_VALUE -->
    <property name="maxPoolSize" value="16" />
    <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
    <!--<property name="queueCapacity" value="10" />-->
    <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
    <property name="keepAliveSeconds" value="300" />
    <!-- 线程池对拒绝任务(无线程可用)的处理策略,
        目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
    -->
    <property name="rejectedExecutionHandler">
        <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
        <!-- CallerRunsPolicy:
            主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,
        -->
        <!-- DiscardOldestPolicy:
            抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行
             -->
        <!-- DiscardPolicy:
            抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行
        -->
        <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
    </property>
</bean>

那就不得不了解一下java.util.concurrent包下Executor构架了

回忆一下线程池工作原理

如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁)

如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue

如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)

如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用

RejectedExecutionHandler.rejectedExecution()方法

测试场景1

首先,注释queueCapacity的一行

任务:

public class CustomRunnable implements Runnable {
    private int id;
    public CustomRunnable(int id) {
        this.id = id;
    }
    @Override
    public void run() {
        try {
            System.out.println("begin execute "+ Thread.currentThread().getName()
                    + "-- task id: "+ id);
            String rs =  ClientUtil.get("http://www.****.com");
            System.out.println("end execute task: "+ id);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

测试案例:

@Test
public void threadTest() throws InterruptedException {
    for (int i=0; i< 35; i++){
        Thread t= new Thread(new CustomRunnable(i));
        executor.execute(t);
    }
    Thread.sleep(1800000);
}

测试结果:

七月 09, 2018 5:46:47 下午 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor initialize
信息: Initializing ExecutorService 'threadPoolTaskExecutor'
begin execute threadPoolTaskExecutor-1-- task id: 0
begin execute threadPoolTaskExecutor-2-- task id: 1
begin execute threadPoolTaskExecutor-3-- task id: 2
begin execute threadPoolTaskExecutor-4-- task id: 3
begin execute threadPoolTaskExecutor-5-- task id: 4
end execute task: 4
begin execute threadPoolTaskExecutor-5-- task id: 5
end execute task: 1
begin execute threadPoolTaskExecutor-2-- task id: 6
end execute task: 0
begin execute threadPoolTaskExecutor-1-- task id: 7
end execute task: 2
begin execute threadPoolTaskExecutor-3-- task id: 8
end execute task: 3
begin execute threadPoolTaskExecutor-4-- task id: 9
...

可以发现,一开始线程池就创建了corePoolSize大小的线程,对于之后的新加进的任务,就放到BlockingQueue中,默认是使用LinkedBlockingQueue,大小是Integer.MAX_VALUE,因为队列大小太大,所以就不会创建maxPoolSize大小的线程数量,因此,只有线程处理完当前任务,才会去处理下一个任务,所以,刚加进去的任务得不到立即处理

测试场景2

只需要打开queueCapacity的一行,其他不变

测试结果:

七月 09, 2018 6:07:13 下午 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor initialize
信息: Initializing ExecutorService 'threadPoolTaskExecutor'
begin execute threadPoolTaskExecutor-1-- task id: 0
begin execute threadPoolTaskExecutor-2-- task id: 1
begin execute threadPoolTaskExecutor-3-- task id: 2
begin execute threadPoolTaskExecutor-4-- task id: 3
begin execute threadPoolTaskExecutor-5-- task id: 4
begin execute threadPoolTaskExecutor-6-- task id: 15
begin execute threadPoolTaskExecutor-7-- task id: 16
begin execute threadPoolTaskExecutor-8-- task id: 17
begin execute threadPoolTaskExecutor-9-- task id: 18
begin execute threadPoolTaskExecutor-10-- task id: 19
begin execute threadPoolTaskExecutor-11-- task id: 20
begin execute threadPoolTaskExecutor-12-- task id: 21
begin execute threadPoolTaskExecutor-14-- task id: 23
begin execute threadPoolTaskExecutor-15-- task id: 24
begin execute main-- task id: 26
begin execute threadPoolTaskExecutor-13-- task id: 22
begin execute threadPoolTaskExecutor-16-- task id: 25
begin execute threadPoolTaskExecutor-11-- task id: 5
end execute task: 15
begin execute threadPoolTaskExecutor-6-- task id: 6
end execute task: 23
begin execute threadPoolTaskExecutor-14-- task id: 7
end execute task: 4
begin execute threadPoolTaskExecutor-5-- task id: 8
end execute task: 17
begin execute threadPoolTaskExecutor-8-- task id: 9
....

可以发现,因为初始任务数量大于corePoolSize大小,所以线程池初始化就创建了maxPoolSize大小数量的纯种,对于后续新加进的任务会入到BlockingQueue队列中去,之后等待线程处理完一个任务之后再处理队列中的任务

猜想

线上出现这种原因可能就是因为queueCapacity被设置成了默认(Integer.MAX_VALUE),而且初始化纯种的corePoolSize数量过少,并且线程处理速度较慢(业务逻辑,网络请求等等原因),导致后续任务会一直填加到队列中去,迟迟得不到立即处理。

解决方案

手动设置queueCapacity大小,网络请求原因的话,可以设置超时时间;业务逻辑的话,另辟蹊径。。。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Spring线程池ThreadPoolTaskExecutor配置详情

    本文介绍了Spring线程池ThreadPoolTaskExecutor配置,分享给大家,具体如下: 1. ThreadPoolTaskExecutor配置 <!-- spring thread pool executor --> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 线

  • 线程池ThreadPoolExecutor使用简介与方法实例

    一.简介 线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) corePoolSize: 线程池维护线程的最少数量 maximumPool

  • 如何基于ThreadPoolExecutor创建线程池并操作

    日常工作中很多地方很多效率极低的操作,往往可以改串行为并行,执行效率往往提高数倍,废话不多说先上代码 1.用到的guava坐标 <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency> 2.创建一个枚举保证线程池是单例 package

  • 基于ThreadPoolTaskExecutor的使用说明

    目录 ThreadPoolTaskExecutor的使用 springboot 配置 提交任务 ThreadPoolTaskExecutor配置问题 有关spring中ThreadPoolTaskExecutor具体如下 回忆一下线程池工作原理 测试场景1 测试场景2 ThreadPoolTaskExecutor的使用 当我们需要实现并发.异步等操作时,通常都会使用到ThreadPoolTaskExecutor,现对其使用稍作总结. springboot 配置 提交任务 无返回值的任务使用exe

  • 基于JQuery 选择器使用说明介绍

    jQuery 元素选择器和属性选择器允许您通过标签名.属性名或内容对 HTML 元素进行选择. jQuery 元素选择器:jQuery 使用 CSS 选择器来选取 HTML 元素. $("p") 选取 <p> 元素. $("p.intro") 选取所有 class="intro" 的 <p> 元素. $("p#demo") 选取 id="demo" 的第一个 <p> 元素

  • 基于Bootstrap的标签页组件及bootstrap-tab使用说明

    bootstrap-tab bootstrap-tab组件是对原生的bootstrap-tab组件的封装,方便开发者更方便地使用,主要包含以下功能: tab页初始化 关闭tab页 新增tab 显示tab页 获取tab页ID 使用 Step1 :引入样式 <link rel="stylesheet" href="bootstrap/css/bootstrap.min.css" rel="external nofollow" > <

  • 基于Vue 2.0 监听文本框内容变化及ref的使用说明介绍

    如下所示: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <link rel="stylesheet" href="css/bootstrap.css" rel="external nofollow" >

  • 基于Vue2-Calendar改进的日历组件(含中文使用说明)

    一,前言 我是刚学Vue的菜鸟,在使用过程中需要用到日历控件,由于项目中原来是用jQuery写的,因此用了bootstarp的日历控件,但是配合Vue实在有点蛋疼,不够优雅-- 于是网上搜了好久找到了Vue2-Calendar,不用说,挺好用的,但是同时也发现这个组件有些问题,有些功能挺不符合我们的要求,于是着手改了一版 二,改进的功能 在Vue2-Calendar v2.2.4 版基础上作了优化. 1.改进原控件无法切换语言的BUG,支持 lang='zh-CN'和'en'. 2.日历面板增加

  • 基于PHP Web开发MVC框架的Smarty使用说明

    一.Smarty简明教程 1.安装演示 下载最新版本的Smarty-3.1.12,然后解压下载的文件.接下来演示Smarty自带的demo例子. (1)下载地址:http://www.smarty.net/download(2)在你的WEB服务器根目录下建立新目录,这里我在/var/www下创建yqting/目录,然后将解压之后的目录中的demo/和libs/目录复制到/var/www/yqting/目录下. (3)这里要特别注意demo/目录下cache/和template_c/两个目录,一定

  • 基于PostgreSQL pg_hba.conf 配置参数的使用说明

    pg_hba.conf 配置详解 该文件位于初始化安装的数据库目录下 编辑 pg_hba.conf 配置文件 postgres@clw-db1:/pgdata/9.6/poc/data> vi pg_hba.conf TYPE 参数设置 TYPE 表示主机类型,值可能为: 若为 `local` 表示是unix-domain的socket连接, 若为 `host` 是TCP/IP socket 若为 `hostssl` 是SSL加密的TCP/IP socket DATABASE 参数设置 DATA

  • 基于redis setIfAbsent的使用说明

    如果为空就set值,并返回1 如果存在(不为空)不进行操作,并返回0 很明显,比get和set要好.因为先判断get,再set的用法,有可能会重复set值. setIfAbsent 和 setnx setIfAbsent 是java中的方法 setnx 是 redis命令中的方法 setnx 例子 redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 r

  • 基于json解析神器 jsonpath的使用说明

    如果项目需求是从某些复杂的json里面取值进行计算,用jsonpath+IK(ik-expression)来处理十分方便,jsonpath用来取json里面的值然后用IK自带的函数进行计算,如果是特殊的计算那就自定义IK方法搞定,配置化很方便. 下面简单介绍下jsonpath的使用方法,主要测试都在JsonPathDemo类里面: 下面是一个简单的java项目demo: 注意: 其中他的max,min,avg,stddev函数只能类似于如下处理: //正确写法 但是感觉很鸡肋 context.r

  • 基于SpringAop中JoinPoint对象的使用说明

    JoinPoint 对象 JoinPoint对象封装了SpringAop中切面方法的信息,在切面方法中添加JoinPoint参数,就可以获取到封装了该方法信息的JoinPoint对象. 常用api: 方法名 功能 Signature getSignature(); 获取封装了署名信息的对象,在该对象中可以获取到目标方法名,所属类的Class等信息 Object[] getArgs(); 获取传入目标方法的参数对象 Object getTarget(); 获取被代理的对象 Object getTh

随机推荐