ThreadPoolExecutor参数含义及源码执行流程详解

目录
  • 背景
  • 典型回答
  • 考点分析
  • 知识拓展
    • execute() VS submit()
    • 线程池的拒绝策略
    • 自定义拒绝策略
    • ThreadPoolExecutor 扩展
  • 小结

背景

线程池是为了避免线程频繁的创建和销毁带来的性能消耗,而建立的一种池化技术,它是把已创建的线程放入“池”中,当有任务来临时就可以重用已有的线程,无需等待创建的过程,这样就可以有效提高程序的响应速度。但如果要说线程池的话一定离不开 ThreadPoolExecutor ,在阿里巴巴的《Java 开发手册》中是这样规定线程池的:

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

其实当我们去看 Executors 的源码会发现,Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor() 和 Executors.newCachedThreadPool() 等方法的底层都是通过 ThreadPoolExecutor 实现的,所以本课时我们就重点来了解一下 ThreadPoolExecutor 的相关知识,比如它有哪些核心的参数?它是如何工作的?

典型回答

ThreadPoolExecutor 的核心参数指的是它在构建时需要传递的参数,其构造方法如下所示:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        // maximumPoolSize 必须大于 0,且必须大于 corePoolSize
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

第 1 个参数:corePoolSize 表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。

第 2 个参数:maximumPoolSize 表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。

第 3 个参数:keepAliveTime 表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。

第 4 个参数:unit 表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。

第 5 个参数:workQueue 表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。

第 6 个参数:threadFactory 表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程,源代码如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    // Executors.defaultThreadFactory() 为默认的线程创建工厂
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}
// 默认的线程创建工厂,需要实现 ThreadFactory 接口
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    // 创建线程
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false); // 创建一个非守护线程
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY); // 线程优先级设置为默认值
        return t;
    }
}

我们也可以自定义一个线程工厂,通过实现 ThreadFactory 接口来完成,这样就可以自定义线程的名称或线程执行的优先级了。

第 7 个参数:RejectedExecutionHandler 表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制。

线程池的工作流程要从它的执行方法 execute() 说起,源码如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 当前工作的线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 创建新的线程执行此任务
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 检查线程池是否处于运行状态,如果是则把任务添加到队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再出检查线程池是否处于运行状态,防止在第一次校验通过后线程池关闭
        // 如果是非运行状态,则将刚加入队列的任务移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池的线程数为 0 时(当 corePoolSize 设置为 0 时会发生)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 新建线程执行任务
    }
    // 核心线程都在忙且队列都已爆满,尝试新启动一个线程执行失败
    else if (!addWorker(command, false))
        // 执行拒绝策略
        reject(command);
}

其中 addWorker(Runnable firstTask, boolean core) 方法的参数说明如下:

  • firstTask,线程应首先运行的任务,如果没有则可以设置为 null;
  • core,判断是否可以创建线程的阀值(最大值),如果等于 true 则表示使用 corePoolSize 作为阀值,false 则表示使用maximumPoolSize 作为阀值。

考点分析

线程池任务执行的主要流程,可以参考以下流程图:

与 ThreadPoolExecutor 相关的面试题还有以下几个:

  • ThreadPoolExecutor 的执行方法有几种?它们有什么区别?
  • 什么是线程的拒绝策略?
  • 拒绝策略的分类有哪些?
  • 如何自定义拒绝策略?
  • ThreadPoolExecutor 能不能实现扩展?如何实现扩展?

知识拓展

execute() VS submit()

execute() 和 submit() 都是用来执行线程池任务的,它们最主要的区别是,submit() 方法可以接收线程池执行的返回值,而 execute() 不能接收返回值。

来看两个方法的具体使用:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,
        TimeUnit.SECONDS, new LinkedBlockingQueue(20));
// execute 使用
executor.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Hello, execute.");
    }
});
// submit 使用
Future<String> future = executor.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        System.out.println("Hello, submit.");
        return "Success";
    }
});
System.out.println(future.get());

以上程序执行结果如下:

Hello, submit.
Hello, execute.
Success

从以上结果可以看出 submit() 方法可以配合 Futrue 来接收线程执行的返回值。它们的另一个区别是 execute() 方法属于 Executor 接口的方法,而 submit() 方法则是属于 ExecutorService 接口的方法,它们的继承关系如下图所示:

线程池的拒绝策略

当线程池中的任务队列已经被存满,再有任务添加时会先判断当前线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的拒绝策略。

Java 自带的拒绝策略有 4 种:

  • AbortPolicy,终止策略,线程池会抛出异常并终止执行,它是默认的拒绝策略;
  • CallerRunsPolicy,把任务交给当前线程来执行;
  • DiscardPolicy,忽略此任务(最新的任务);
  • DiscardOldestPolicy,忽略最早的任务(最先加入队列的任务)。

例如,我们来演示一个 AbortPolicy 的拒绝策略,代码如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new ThreadPoolExecutor.AbortPolicy()); // 添加 AbortPolicy 拒绝策略
for (int i = 0; i < 6; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}

以上程序的执行结果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.lagou.interview.ThreadPoolExample$$Lambda$1/1096979270@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
 at com.lagou.interview.ThreadPoolExample.rejected(ThreadPoolExample.java:35)
 at com.lagou.interview.ThreadPoolExample.main(ThreadPoolExample.java:26)

可以看出当第 6 个任务来的时候,线程池则执行了AbortPolicy 拒绝策略,抛出了异常。因为队列最多存储 2 个任务,最大可以创建 3 个线程来执行任务(2+3=5),所以当第 6 个任务来的时候,此线程池就“忙”不过来了。

自定义拒绝策略

自定义拒绝策略只需要新建一个 RejectedExecutionHandler 对象,然后重写它的 rejectedExecution() 方法即可,如下代码所示:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
        new RejectedExecutionHandler() {  // 添加自定义拒绝策略
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 业务处理方法
                System.out.println("执行自定义拒绝策略");
            }
        });
for (int i = 0; i < 6; i++) {
    executor.execute(() -> {
        System.out.println(Thread.currentThread().getName());
    });
}

以上代码执行的结果如下:

执行自定义拒绝策略
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2

可以看出线程池执行了自定义的拒绝策略,我们可以在 rejectedExecution 中添加自己业务处理的代码。

ThreadPoolExecutor 扩展

ThreadPoolExecutor 的扩展主要是通过重写它的 beforeExecute() 和 afterExecute() 方法实现的,我们可以在扩展方法中添加日志或者实现数据统计,比如统计线程的执行时间,如下代码所示:

public class ThreadPoolExtend {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 线程池扩展调用
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(2, 4, 10,
                TimeUnit.SECONDS, new LinkedBlockingQueue());
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> {
                Thread.currentThread().getName();
            });
        }
    }
   /**
     * 线程池扩展
     */
    static class MyThreadPoolExecutor extends ThreadPoolExecutor {
        // 保存线程执行开始时间
        private final ThreadLocal<Long> localTime = new ThreadLocal<>();
        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
        /**
         * 开始执行之前
         * @param t 线程
         * @param r 任务
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            Long sTime = System.nanoTime(); // 开始时间 (单位:纳秒)
            localTime.set(sTime);
            System.out.println(String.format("%s | before | time=%s",
                    t.getName(), sTime));
            super.beforeExecute(t, r);
        }
        /**
         * 执行完成之后
         * @param r 任务
         * @param t 抛出的异常
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            Long eTime = System.nanoTime(); // 结束时间 (单位:纳秒)
            Long totalTime = eTime - localTime.get(); // 执行总时间
            System.out.println(String.format("%s | after | time=%s | 耗时:%s 毫秒",
                    Thread.currentThread().getName(), eTime, (totalTime / 1000000.0)));
            super.afterExecute(r, t);
        }
    }
}

以上程序的执行结果如下所示:

pool-1-thread-1 | before | time=4570298843700
pool-1-thread-2 | before | time=4570298840000
pool-1-thread-1 | after | time=4570327059500 | 耗时:28.2158 毫秒
pool-1-thread-2 | after | time=4570327138100 | 耗时:28.2981 毫秒
pool-1-thread-1 | before | time=4570328467800
pool-1-thread-1 | after | time=4570328636800 | 耗时:0.169 毫秒

小结

最后我们总结一下:线程池的使用必须要通过 ThreadPoolExecutor 的方式来创建,这样才可以更加明确线程池的运行规则,规避资源耗尽的风险。同时,也介绍了 ThreadPoolExecutor 的七大核心参数,包括核心线程数和最大线程数之间的区别,当线程池的任务队列没有可用空间且线程池的线程数量已经达到了最大线程数时,则会执行拒绝策略,Java 自动的拒绝策略有 4 种,用户也可以通过重写 rejectedExecution() 来自定义拒绝策略,我们还可以通过重写 beforeExecute() 和 afterExecute() 来实现 ThreadPoolExecutor 的扩展功能。

以上就是ThreadPoolExecutor参数含义及源码执行流程详解的详细内容,更多关于ThreadPoolExecutor执行流程的资料请关注我们其它相关文章!

(0)

相关推荐

  • java ThreadPoolExecutor线程池拒绝策略避坑

    目录 1.场景 2. 原因分析 3.总结 4.思考 1.场景 线程池使用DiscardOldestPolicy拒绝策略,阻塞队列使用ArrayBlockingQueue,发现在某些情形下对于得到的Future,调用get()方法当前线程会一直阻塞. 为了便于理解,将实际情景抽象为下面的代码: ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 1, 1, TimeUnit.SECONDS, new ArrayBlo

  • java高并发ThreadPoolExecutor类解析线程池执行流程

    目录 摘要 核心逻辑概述 execute(Runnable)方法 addWorker(Runnable, boolean)方法 addWorkerFailed(Worker)方法 拒绝策略 摘要 ThreadPoolExecutor是Java线程池中最核心的类之一,它能够保证线程池按照正常的业务逻辑执行任务,并通过原子方式更新线程池每个阶段的状态. 今天,我们通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程,小伙伴们最好是打开IDEA,按照冰河说的步骤,调试下Th

  • python3线程池ThreadPoolExecutor处理csv文件数据

    目录 背景 知识点 拓展 库 流程 实现代码 解释 背景 由于不同乙方对服务商业务接口字段理解不一致,导致线上上千万数据量数据存在问题,为了修复数据,通过 Python 脚本进行修改 知识点 Python3.线程池.pymysql.CSV 文件操作.requests 拓展 当我们程序在使用到线程.进程或协程的时候,以下三个知识点可以先做个基本认知 CPU 密集型.IO 密集型.GIL 全局解释器锁 库 pip3 install requests pip3 install pymysql 流程 实

  • java高并发ScheduledThreadPoolExecutor与Timer区别

    目录 正文 二者的区别 线程角度 系统时间敏感度 是否捕获异常 任务是否具备优先级 是否支持对任务排序 能否获取返回的结果 二者简单的示例 Timer类简单示例 ScheduledThreadPoolExecutor类简单示例 正文 JDK 1.5开始提供ScheduledThreadPoolExecutor类,ScheduledThreadPoolExecutor类继承ThreadPoolExecutor类重用线程池实现了任务的周期性调度功能.在JDK 1.5之前,实现任务的周期性调度主要使用

  • Java 线程池ThreadPoolExecutor源码解析

    目录 引导语 1.整体架构图 1.1.类结构 1.2.类注释 1.3.ThreadPoolExecutor重要属性 2.线程池的任务提交 3.线程执行完任务之后都在干啥 4.总结 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可以充分利用机器资源,增加请求的处理速度,本章节我们就和大家一起来学习线程池. 本章的顺序,先说源码,弄懂原理,接着看一看面试题,最后看看实际工作中是如何运用线程池的. 1.整体架构图 我们画了线程池的整体图,如下: 本小节主要就按照这个图来进行 Thre

  • java线程池ThreadPoolExecutor的八种拒绝策略示例详解

    目录 池化设计思想 线程池触发拒绝策略的时机 JDK内置4种线程池拒绝策略 拒绝策略接口定义 AbortPolicy(中止策略) DiscardPolicy(丢弃策略) DiscardOldestPolicy(弃老策略) 第三方实现的拒绝策略 Dubbo 中的线程拒绝策略 Netty 中的线程池拒绝策略 ActiveMQ 中的线程池拒绝策略 PinPoint 中的线程池拒绝策略 谈到 Java 的线程池最熟悉的莫过于 ExecutorService 接口了,jdk1.5 新增的 java.uti

  • ThreadPoolExecutor参数含义及源码执行流程详解

    目录 背景 典型回答 考点分析 知识拓展 execute() VS submit() 线程池的拒绝策略 自定义拒绝策略 ThreadPoolExecutor 扩展 小结 背景 线程池是为了避免线程频繁的创建和销毁带来的性能消耗,而建立的一种池化技术,它是把已创建的线程放入“池”中,当有任务来临时就可以重用已有的线程,无需等待创建的过程,这样就可以有效提高程序的响应速度.但如果要说线程池的话一定离不开 ThreadPoolExecutor ,在阿里巴巴的<Java 开发手册>中是这样规定线程池的

  • Django CBV模型源码运行流程详解

    在view文件中编写一个类,并配置好路由 class Test(View): def get(self, request, *args, **kwargs): return HttpResponse('cbv_get') def post(self, request, *args, **kwargs): return HttpResponse('cbv_post') url(r'test/',views.Test.as_view()), 通过as_view()进入查看源码 实际上views.Te

  • Django DRF APIView源码运行流程详解

    首先写一个简单的drf接口 from rest_framework.views import APIView from rest_framework.response import Response # 基于drf写接口,cbv class DrfTest(APIView): def get(self, request,*args,**kwargs): print(type(request._request)) print(type(request)) print(request.POST) p

  • Gradle 依赖切换源码实践示例详解

    目录 引言 1.一般的修改办法 2.通过 Gradle 脚本动态修改依赖 2.1 配置文件和工作流程抽象 2.2 为项目动态添加子工程 2.3 使用子工程替换依赖 2.4 注意事项 总结 引言 最近,因为开发的时候经改动依赖的库,所以,我想对 Gradle 脚本做一个调整,用来动态地将依赖替换为源码.这里以 android-mvvm-and-architecture 这个工程为例.该工程以依赖的形式引用了我的另一个工程 AndroidUtils.在之前,当我需要对 AndroidUtils 这个

  • vue从使用到源码实现教程详解

    搭建环境 项目github地址 项目中涉及了json-server模拟get请求,用了vue-router: 关于Vue生命周期以及vue-router钩子函数详解 生命周期 1.0版本 1.哪些生命周期接口 init Created beforeCompile Compiled Ready Attatched Detached beforeDestory destoryed 2.执行顺序 1. 不具有keep-alive 进入: init->create->beforeCompile->

  • Java struts2请求源码分析案例详解

    Struts2是Struts社区和WebWork社区的共同成果,我们甚至可以说,Struts2是WebWork的升级版,他采用的正是WebWork的核心,所以,Struts2并不是一个不成熟的产品,相反,构建在WebWork基础之上的Struts2是一个运行稳定.性能优异.设计成熟的WEB框架. 我这里的struts2源码是从官网下载的一个最新的struts-2.3.15.1-src.zip,将其解压即可.里面的目录页文件非常的多,我们只需要定位到struts-2.3.15.1\src\core

  • Java源码解析之详解ReentrantLock

    ReentrantLock ReentrantLock是一种可重入的互斥锁,它的行为和作用与关键字synchronized有些类似,在并发场景下可以让多个线程按照一定的顺序访问同一资源.相比synchronized,ReentrantLock多了可扩展的能力,比如我们可以创建一个名为MyReentrantLock的类继承ReentrantLock,并重写部分方法使其更加高效. 当一个线程调用ReentrantLock.lock()方法时,如果ReentrantLock没有被其他线程持有,且不存在

  • Android 网络html源码查看器详解及实例

    Android 网络html源码查看器详解及实例 IO字节流的数据传输了解 Handler的基本使用 1.作品展示 2.需要掌握的知识 FileInputStream,FIleOutputStream,BufferInputStream,BufferOutStream的读写使用与区别 //进行流的读写 byte[] buffer = new byte[1024 * 8]; //创建一个写到内存的字节数组输出流 ByteArrayOutputStream byteArrayOutputStream

  • Java源码解析之详解ImmutableMap

    一.案例场景 遇到过这样的场景,在定义一个static修饰的Map时,使用了大量的put()方法赋值,就类似这样-- public static final Map<String,String> dayMap= new HashMap<>(); static { dayMap.put("Monday","今天上英语课"); dayMap.put("Tuesday","今天上语文课"); dayMap.p

  • SpringMvc框架的简介与执行流程详解

    目录 一.SpringMvc框架简介 1.Mvc设计理念 2.SpringMvc简介 二.SpringMvc执行流程 1.流程图解 2.步骤描述 3.核心组件 三.整合Spring框架配置 1.spring-mvc配置 2.Web.xml配置 3.测试接口 4.常用注解说明 四.常见参数映射 1.普通映射 2.指定参数名 3.数组参数 4.Map参数 5.包装参数 6.Rest风格参数 五.源代码地址 一.SpringMvc框架简介 1.Mvc设计理念 M:代表模型Model 模型就是数据,应用

随机推荐