详解Java并发包中线程池ThreadPoolExecutor

一、线程池简介

线程池的使用主要是解决两个问题:①当执行大量异步任务的时候线程池能够提供更好的性能,在不使用线程池时候,每当需要执行异步任务的时候直接new一个线程来运行的话,线程的创建和销毁都是需要开销的。而线程池中的线程是可复用的,不需要每次执行异步任务的时候重新创建和销毁线程;②线程池提供一种资源限制和管理的手段,比如可以限制线程的个数,动态的新增线程等等。

在下面的分析中,我们可以看到,线程池使用一个Integer的原子类型变量来记录线程池状态和线程池中的线程数量,通过线程池状态来控制任务的执行,每个工作线程Worker线程可以处理多个任务。

二、ThreadPoolExecutor类

2.1、ThreadPoolExecutor成员变量以含义

ThreadPoolExecutor继承了AbstractExecutorService,其中的成员变量ctl是一个Integer类型的原子变量,用来记录线程池的状态和线程池中的线程的个数。这里(Integer看做32位)ctl高三位表示线程池的状态,后面的29位表示线程池中的线程个数。如下所示是ThreadPoolExecutor源码中的成员变量

//(高3位)表示线程池状态,(低29位)表示线程池中线程的个数;
// 默认状态是RUNNING,线程池中线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//表示具体平台下Integer的二进制位数-3后的剩余位数表示的数才是线程的个数;
//其中Integer.SIZE=32,-3之后的低29位表示的就是线程的个数了
private static final int COUNT_BITS = Integer.SIZE - 3;

//线程最大个数(低29位)00011111111111111111111111111111(1<<29-1)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//线程池状态(高3位表示线程池状态)
//111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

//获取高3位(运行状态)==> c & 11100000000000000000000000000000
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//获取低29位(线程个数)==> c &  00011111111111111111111111111111
private static int workerCountOf(int c)  { return c & CAPACITY; }

//计算原子变量ctl新值(运行状态和线程个数)
private static int ctlOf(int rs, int wc) { return rs | wc; }

下面我们简单解释一下上面的线程状态的含义:

①RUNNING:接受新任务并处理阻塞队列中的任务

②SHUTDOWN:拒绝新任务但是处理阻塞队列中的任务

③STOP:拒绝新任务并抛弃阻塞队列中的任务,同时会中断当前正在执行的任务

④TIDYING:所有任务执行完之后(包含阻塞队列中的任务)当前线程池中活跃的线程数量为0,将要调用terminated方法

⑥TERMINATED:终止状态。terminated方法调用之后的状态

2.2、ThreadPoolExecutor的参数以及实现原理

①corePoolSize:线程池核心现车个数

②workQueue:用于保存等待任务执行的任务的阻塞队列(比如基于数组的有界阻塞队列ArrayBlockingQueue、基于链表的无界阻塞队列LinkedBlockingQueue等等)

③maximumPoolSize:线程池最大线程数量

④ThreadFactory:创建线程的工厂

⑤RejectedExecutionHandler:拒绝策略,表示当队列已满并且线程数量达到线程池最大线程数量的时候对新提交的任务所采取的策略,主要有四种策略:AbortPolicy(抛出异常)、CallerRunsPolicy(只用调用者所在线程来运行该任务)、DiscardOldestPolicy(丢掉阻塞队列中最近的一个任务来处理当前提交的任务)、DiscardPolicy(不做处理,直接丢弃掉)

⑥keepAliveTime:存活时间,如果当前线程池中的数量比核心线程数量多,并且当前线程是闲置状态,该变量就是这些线程的最大生存时间

⑦TimeUnit:存活时间的时间单位。

根据上面的参数介绍,简单了解一下线程池的实现原理,以提交一个新任务为开始点,分析线程池的主要处理流程

2.3、关于一些线程池的使用类型

①newFixedThreadPool:创建一个核心线程个数和最大线程个数均为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keepAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲即回收。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

②newSingleThreadExecutor:创建一个核心线程个数和最大线程个数都为1 的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keepAliveTime=0说明只要线程个数比核心线程个数多并且当前线程空闲即回收该线程。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

③newCachedThreadPoolExecutor:创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列(最多只有一个元素),keepAliveTime=60说明只要当前线程在60s内空闲则回收。这个类型的线程池的特点就是:加入同步队列的任务会被马上执行,同步队列中最多只有一个任务

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

2.4、ThreadPoolExecutor中的其他成员

//独占锁,用来控制新增工作线程Worker操作的原子性
private final ReentrantLock mainLock = new ReentrantLock();

//工作线程集合,Worker继承了AQS接口和Runnable接口,是具体处理任务的线程对象
//Worker实现AQS,并自己实现了简单不可重入独占锁,其中state=0表示当前锁未被获取状态,state=1表示锁被获取,
//state=-1表示Work创建时候的默认状态,创建时候设置state=-1是为了防止runWorker方法运行前被中断
private final HashSet<Worker> workers = new HashSet<Worker>();

//termination是该锁对应的条件队列,在线程调用awaitTermination时候用来存放阻塞的线程
private final Condition termination = mainLock.newCondition();

三、execute(Runnable command)方法实现

executor方法的作用是提交任务command到线程池执行,可以简单的按照下面的图进行理解,ThreadPoolExecutor的实现类似于一个生产者消费者模型,当用户添加任务到线程池中相当于生产者生产元素,workers工作线程则直接执行任务或者从任务队列中获取任务,相当于消费之消费元素。

public void execute(Runnable command) {
    //(1)首先检查任务是否为null,为null抛出异常,否则进行下面的步骤
    if (command == null)
        throw new NullPointerException();
    //(2)ctl值中包含了当前线程池的状态和线程池中的线程数量
    int c = ctl.get();
    //(3)workerCountOf方法是获取低29位,即获取当前线程池中的线程个数,如果小于corePoolSize,就开启新的线程运行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //(4)如果线程池处理RUNNING状态,就添加任务到阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        //(4-1)二次检查,获取ctl值
        int recheck = ctl.get();
        //(4-2)如果当前线程池不是出于RUNNING状态,就从队列中删除任务,并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //(4-3)否则,如果线程池为空,就添加一个线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //(5)如果队列满,则新增线程,如果新增线程失败,就执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

我们在看一下上面代码的执行流程,按照标记的数字进行分析:

①步骤(3)判断当前线程池中的线程个数是否小于corePoolSize,如果小于核心线程数,会向workers里面新增一个核心线程执行任务。

②如果当前线程池中的线程数量大于核心线程数,就执行(4)。(4)首先判断当前线程池是否处于RUNNING状态,如果处于该状态,就添加任务到任务队列中,这里需要判断线程池的状态是因为线程池可能已经处于非RUNNING状态,而在非RUNNING状态下是需要抛弃新任务的。

③如果想任务队列中添加任务成功,需要进行二次校验,因为在添加任务到任务队列后,可能线程池的状态发生了变化,所以这里需要进行二次校验,如果当前线程池已经不是RUNNING状态了,需要将任务从任务队列中移除,然后执行拒绝策略;如果二次校验通过,则执行4-3代码重新判断当前线程池是否为空,如果线程池为空没有线程,那么就需要新创建一个线程。

④如果上面的步骤(4)创建添加任务失败,说明队列已满,那么(5)会尝试再开启新的线程执行任务(类比上图中的thread3和thread4,即不是核心线程的那些线程),如果当前线程池中的线程个数已经大于最大线程数maximumPoolSize,表示不能开启新的线程。这就属于线程池满并且任务队列满,就需要执行拒绝策略了。

下面我们在看看addWorker方法的实现

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //(6)检查队列是否只在必要时候为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //(7)使用CAS增加线程个数
        for (;;) {
            //根据ctl值获得当前线程池中的线程数量
            int wc = workerCountOf(c);
            //(7-1)如果线程数量超出限制,返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //(7-2)CAS增加线程数量,同时只有一个线程可以成功
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // 重新读取ctl值
            //(7-3)CAS失败了,需要查看当前线程池状态是否发生变化,如果发生变化需要跳转到外层循环尝试重新获取线程池状态,否则内层循环重新进行CAS增加线程数量
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //(8)执行到这里说明CAS增加新线程个数成功了,我们需要开始创建新的工作线程Worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //(8-1)创建新的worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //(8-2)加独占锁,保证workers的同步,可能线程池中的多个线程调用了线程池的execute方法
            mainLock.lock();
            try {
                // (8-3)重新检查线程池状态,以免在获取锁之前调用shutdown方法改变线程池状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //(8-4)添加新任务
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //(8-6)添加新任务成功之后,启动任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

简单再分析说明一下上面的代码,addWorker方法主要分为两部分,第一部分是使用CAS线程安全的添加线程数量,第二部分则是创建新的线程并且将任务并发安全的添加到新的workers之中,然后启动线程执行。

①代码(6)中检查队列是否只在必要时候为空,只有线程池状态符合条件才能够进行下面的步骤,从(6)中的判断条件来看,下面的集中情况addWorker会直接返回false

( I )当前线程池状态为STOP,TIDYING或者TERMINATED ; (I I)当前线程池状态为SHUTDOWN并且已经有了第一个任务; (I I I)当前线程池状态为SHUTDOWN并且任务队列为空

②外层循环中判断条件通过之后,在内层循环中使用CAS增加线程数,当CAS成功就退出双重循环进行(8)步骤代码的执行,如果失败需要查看当前线程池的状态是否发生变化,如果发生变化需要进行外层循环重新判断线程池状态然后在进入内层循环重新进行CAS增加线程数,如果线程池状态没有发生变化但是上一次CAS失败就继续进行CAS尝试。

③执行到(8)代码处,表明当前已经成功增加 了线程数,但是还没有线程执行任务。ThreadPoolExecutor中使用全局独占锁mainLock来控制将新增的工作线程Worker线程安全的添加到工作者线程集合workers中。

④(8-2)获取了独占锁,但是在获取到锁之后,还需要进行重新检查线程池的状态,这是为了避免在获取全局独占锁之前其他线程调用了shutDown方法关闭了线程池。如果线程池已经关闭需要释放锁。否则将新增的线程添加到工作集合中,释放锁启动线程执行任务。

上面的addWorker方法最后几行中,会判断添加工作线程是否成功,如果失败,会执行addWorkerFailed方法,将任务从workers中移除,并且workerCount做-1操作。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    //获取锁
    mainLock.lock();
    try {
      //如果worker不为null
      if (w != null)
          //workers移除worker
          workers.remove(w);
      //通过CAS操作,workerCount-1
      decrementWorkerCount();
      tryTerminate();
    } finally {
      //释放锁
      mainLock.unlock();
    }
}

四、工作线程Worker的执行

4.1、工作线程Worker类源码分析

上面查看addWorker方法在CAS更新线程数成功之后,下面就是创建新的Worker线程执行任务,所以我们这里先查看Worker类,下面是Worker类的源码,我们可以看出,Worker类继承了AQS并实现了Runnable接口,所以他既是一个自定义的同步组件,也是一个执行任务的线程类。下面我们分析Worker类的执行

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    /** 使用线程工厂创建的线程,执行任务 */
    final Thread thread;
    /** 初始化执行任务 */
    Runnable firstTask;
    /** 计数 */
    volatile long completedTasks;

    /**
     * 给出初始firstTask,线程创建工厂创建新的线程
     */
    Worker(Runnable firstTask) {
        setState(-1); // 防止在调用runWorker之前被中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); //使用threadFactory创建线程
    }

    /** run方法实际上执行的是runWorker方法  */
    public void run() {
        runWorker(this);
    }

    // 关于同步状态(锁)
    //
    // 同步状态state=0表示锁未被获取
    // 同步状态state=1表示锁被获取

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    //下面都是重写AQS的方法,Worker为自定义的同步组件
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

在构造函数中我们可以看出,首先将同步状态state置为-1,而Worker这个同步组件的state有三个值,其中state=-1表示Work创建时候的默认状态,创建时候设置state=-1是为了防止runWorker方法运行前被中断前面说到过这个结论,这里置为-1是为了避免当前Worker在调用runWorker方法之前被中断(当其他线程调用线程池的shutDownNow时候,如果Worker的state>=0则会中断线程),设置为-1就不会被中断了。而Worker实现Runnable接口,那么需要重写run方法,在run方法中,我们可以看到,实际上执行的是runWorker方法,在runWorker方法中,会首先调用unlock方法,该方法会将state置为0,所以这个时候调用shutDownNow方法就会中断当前线程,而这个时候已经进入了runWork方法了,就不会在还没有执行runWorker方法的时候就中断线程。

4.2、runWorker方法的源码分析

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 这个时候调用unlock方法,将state置为0,就可以被中断了
    boolean completedAbruptly = true;
    try {
        //(10)如果当前任务为null,或者从任务队列中获取到的任务为null,就跳转到(11)处执行清理工作
        while (task != null || (task = getTask()) != null) {
            //task不为null,就需要线程执行任务,这个时候,需要获取工作线程内部持有的独占锁
            w.lock();
            /**如果线程池已被停止(STOP)(至少大于STOP状态),要确保线程都被中断
             * 如果状态不对,检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于STOP
             * 如果上述满足,检查该对象是否处于中断状态,不清除中断标记
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                //中断该对象
                wt.interrupt();
            try {
                //执行任务之前要做的事情
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); //执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务之后的方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //更新当前已完成任务数量
                w.completedTasks++;
                //释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //执行清理工作:处理并退出当前worker
        processWorkerExit(w, completedAbruptly);
    }
}

我们梳理一下runWorker方法的执行流程

①首先先执行unlock方法,将Worker的state置为0,这样工作线程就可以被中断了(后续的操作如果线程池关闭就需要线程被中断)

②首先判断判断当前的任务(当前工作线程中的task,或者从任务队列中取出的task)是否为null,如果不为null就往下执行,为null就执行processWorkerExit方法。

③获取工作线程内部持有的独占锁(避免在执行任务期间,其他线程调用shutdown后正在执行的任务被中断,shutdown只会中断当前被阻塞挂起的没有执行任务的线程)

④然后执行beforeExecute()方法,该方法为扩展接口代码,表示在具体执行任务之前所做的一些事情,然后执行task.run()方法执行具体任务,执行完之后会调用afterExecute()方法,用以处理任务执行完毕之后的工作,也是一个扩展接口代码。

⑤更新当前线程池完成的任务数,并释放锁

4.3、执行清理工作的方法processWorkerExit

下面是方法processWorkerExit的源码,在下面的代码中

①首先(1-1)处统计线程池完成的任务个数,并且在此之前获取全局锁,然后更新当前的全局计数器,然后从工作线程集合中移除当前工作线程,完成清理工作。

②代码(1-2)调用了tryTerminate方法,在该方法中,判断了当前线程池状态是SHUTDOWN并且队列不为空或者当前线程池状态为STOP并且当前线程池中没有活动线程,则置线程池状态为TERMINATED。如果设置称为了TERMINATED状态,还需要调用全局条件变量termination的signalAll方法唤醒所有因为调用线程池的awaitTermination方法而被阻塞住的线程,使得线程池中的所有线程都停止,从而使得线程池为TERMINATED状态。

③代码(1-3)处判断当前线程池中的线程个数是否小于核心线程数,如果是,需要新增一个线程保证有足够的线程可以执行任务队列中的任务或者提交的任务。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /*
    *completedAbruptly:是由runWorker传过来的参数,表示是否突然完成的意思
    *当在就是在执行任务过程当中出现异常,就会突然完成,传true
    *
    *如果是突然完成,需要通过CAS操作,更新workerCount(-1操作)
    *不是突然完成,则不需要-1,因为getTask方法当中已经-1(getTask方法中执行了decrementWorkerCount()方法)
    */
    if (completedAbruptly)
        decrementWorkerCount();
    //(1-1)在统计完成任务个数之前加上全局锁,然后统计线程池中完成的任务个数并更新全局计数器,并从工作集中删除当前worker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //获得全局锁
    try {
        completedTaskCount += w.completedTasks; //更新已完成的任务数量
        workers.remove(w); //将完成该任务的线程worker从工作线程集合中移除
    } finally {
        mainLock.unlock(); //释放锁
    }
    /**(1-2)
     * 这一个方法调用完成了下面的事情:
     * 判断如果当前线程池状态是SHUTDOWN并且工作队列为空,
     * 或者当前线程池状态是STOP并且当前线程池里面没有活动线程,
     * 则设置当前线程池状态为TERMINATED,如果设置成了TERMINATED状态,
     * 还需要调用条件变量termination的signAll方法激活所有因为调用线程池的awaitTermination方法而被阻塞的线程
     */
    tryTerminate();

    //(1-3)如果当前线程池中线程数小于核心线程,则增加核心线程数
    int c = ctl.get();
    //判断当前线程池的状态是否小于STOP(RUNNING或者SHUTDOWN)
    if (runStateLessThan(c, STOP)) {
        //如果任务忽然完成,执行后续的代码
        if (!completedAbruptly) {
            //allowCoreThreadTimeOut表示是否允许核心线程超时,默认为false
            //min这里当默认为allowCoreThreadTimeOut默认为false的时候,min置为coorPoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //这里说明:如果允许核心线程超时,那么allowCoreThreadTimeOut可为true,那么min值为0,不需要维护核心线程了
            //如果min为0并且任务队列不为空
            if (min == 0 && ! workQueue.isEmpty())
                min = 1; //这里表示如果min为0,且队列不为空,那么至少需要一个核心线程存活来保证任务的执行
            //如果工作线程数大于min,表示当前线程数满足,直接返回
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

在tryTerminate方法中,我们简单说明了该方法的作用,下面是该方法的源码,可以看出源码实现上和上面所总结的功能是差不多的

final void tryTerminate() {
    for (;;) {
        //获取线程池状态
        int c = ctl.get();
        //如果线程池状态为RUNNING
        //或者状态大于TIDYING
        //或者状态==SHUTDOWN并未任务队列不为空
        //直接返回,不能调用terminated方法
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //如果线程池中工作线程数不为0,需要中断线程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //获得线程池的全局锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //通过CAS操作,将线程池状态设置为TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; }
                try {
                    //调用terminated方法
                    terminated();
                } finally {
                    //最终将线程状态设置为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    //调用条件变量termination的signaAll方法唤醒所有因为
                    //调用线程池的awaitTermination方法而被阻塞的线程
                    //private final Condition termination = mainLock.newCondition();
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

五、补充(shutdown、shutdownNow、awaitTermination方法)

5.1、shutdown操作

我们在使用线程池的时候知道,调用shutdown方法之后线程池就不会再接受新的任务了,但是任务队列中的任务还是需要执行完的。调用该方法会立刻返回,并不是等到线程池的任务队列中的所有任务执行完毕在返回的。

public void shutdown() {
    //获得线程池的全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //进行权限检查
        checkShutdownAccess();

        //设置当前线程池的状态的SHUTDOWN,如果线程池状态已经是该状态就会直接返回,下面我们会分析这个方法的源码
        advanceRunState(SHUTDOWN);

        //设置中断 标志
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //尝试将状态变为TERMINATED,上面已经分析过该方法的源码
    tryTerminate();
}

该方法的源码比较简短,首先检查了安全管理器,是查看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限还需要看调用线程是否有中断工作线程的权限,如果没有权限将会抛出SecurityException异常或者空指针异常。下面我们查看一下advanceRunState 方法的源码。

private void advanceRunState(int targetState) {
    for (;;) {
        //下面的方法执行的就是:
        //首先获取线程的ctl值,然后判断当前线程池的状态如果已经是SHUTDOWN,那么if条件第一个为真就直接返回
        //如果不是SHUTDOWN状态,就需要CAS的设置当前状态为SHUTDOWN
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            //private static int ctlOf(int rs, int wc) { return rs | wc; }
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

我们可以看出advanceRunState 方法实际上就是判断当前线程池的状态是否为SHUTDWON,如果是那么就返回,否则就需要设置当前状态为SHUTDOWN。

我们再来看看shutdown方法中调用线程中断的方法interruptIdleWorkers源码

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //如果工作线程没有被中断,并且没有正在运行设置中断标志
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    //需要中断当前线程
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

上面的代码中,需要设置所有空闲线程的中断标志。首先获取线程池的全局锁,同时只有一个线程可以调用shutdown方法设置中断标志。然后尝试获取工作线程Worker自己的锁,获取成功则可以设置中断标志(这是由于正在执行任务的线程需要获取自己的锁,并且不可重入,所以正在执行的任务没有被中断),这里要中断的那些线程是阻塞到getTask()方法并尝试从任务队列中获取任务的线程即空闲线程。

5.2、shutdownNow操作

在使用线程池的时候,如果我们调用了shutdownNow方法,线程池不仅不会再接受新的任务,还会将任务队列中的任务丢弃,正在执行的任务也会被中断,然后立刻返回该方法,不会等待激活的任务完成,返回值为当前任务队列中被丢弃的任务列表

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess(); //还是进行权限检查
        advanceRunState(STOP); //设置线程池状态台STOP
        interruptWorkers(); //中断所有线程
        tasks = drainQueue(); //将任务队列中的任务移动到task中
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks; //返回tasks
}

从上面的代码中,我们可以可以发现,shutdownNow方法也是首先需要检查调用该方法的线程的权限,之后不同于shutdown方法之处在于需要即刻设置当前线程池状态为STOP,然后中断所有线程(空闲线程+正在执行任务的线程),移除任务队列中的任务

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) //不需要判断当前线程是否在执行任务(即不需要调用w.tryLock方法),中断所有线程
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

5.3、awaitTermination操作

当线程调用该方法之后,会阻塞调用者线程,直到线程池状态为TERMINATED状态才会返回,或者等到超时时间到之后会返回,下面是该方法的源码。

//调用该方法之后,会阻塞调用者线程,直到线程池状态为TERMINATED状态才会返回,或者等到超时时间到之后会返回
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //阻塞当前线程,(获取了Worker自己的锁),那么当前线程就不会再执行任务(因为获取不到锁)
        for (;;) {
            //当前线程池状态为TERMINATED状态,会返回true
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            //超时时间到返回false
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

在上面的代码中,调用者线程需要首先获取线程Worker自己的独占锁,然后在循环判断当前线程池是否已经是TERMINATED状态,如果是则直接返回,否则说明当前线程池中还有线程正在执行任务,这时候需要查看当前设置的超时时间是否小于0,小于0说明不需要等待就直接返回,如果大于0就需要调用条件变量termination的awaitNanos方法等待设置的时间,并在这段时间之内等待线程池的状态变为TERMINATED。

我们在前面说到清理线程池的方法processWorkerExit的时候,需要调用tryTerminated方法,在该方法中会查看当前线程池状态是否为TERMINATED,如果是该状态也会调用termination.signalAll()方法唤醒所有线程池中因调用awaitTermination而被阻塞住的线程。

如果是设置了超时时间,那么termination的awaitNanos方法也会返回,这时候需要重新检查线程池状态是否为TERMINATED,如果是则返回,不是就继续阻塞自己。

以上就是Java并发包中线程池ThreadPoolExecutor原理探究的详细内容,更多关于Java 并发包 线程池 ThreadPoolExecutor的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java自带定时任务ScheduledThreadPoolExecutor实现定时器和延时加载功能

    java.util.concurrent.ScheduledThreadPoolExecutor 是JDK1 .6之后自带的包,功能强大,能实现定时器和延时加载的功能 各类功能和处理方面优于Timer 1.定时器: ScheduledThreadPoolExecutor  有个scheduleAtFixedRate(command, initialDelay, period, unit) ;方法 command: 执行的线程(可自己New一个) initialDelay:初始化执行的延时时间 p

  • java 定时器线程池(ScheduledThreadPoolExecutor)的实现

    前言 定时器线程池提供了定时执行任务的能力,即可以延迟执行,可以周期性执行.但定时器线程池也还是线程池,最底层实现还是ThreadPoolExecutor,可以参考我的另外一篇文章多线程–精通ThreadPoolExecutor. 特点说明 1.构造函数 public ScheduledThreadPoolExecutor(int corePoolSize) { // 对于其他几个参数在ThreadPoolExecutor中都已经详细分析过了,所以这里,将不再展开 // 这里我们可以看到调用基类

  • java中ThreadPoolExecutor常识汇总

    线程池技术在并发时经常会使用到,java中的线程池的使用是通过调用ThreadPoolExecutor来实现的.ThreadPoolExecutor提供了四个构造函数,最后都会归结于下面这个构造方法: // 七个参数的构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,

  • java中Executor,ExecutorService,ThreadPoolExecutor详解

    java中Executor,ExecutorService,ThreadPoolExecutor详解 1.Excutor 源码非常简单,只有一个execute(Runnable command)回调接口 public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thre

  • Java线程池ThreadPoolExecutor原理及使用实例

    引导 要求:线程资源必须通过线程池提供,不允许在应用自行显式创建线程: 说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题.如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗内存或者"过度切换"的问题. 线程池介绍线程池概述   线程池,顾名思义是一个放着线程的池子,这个池子的线程主要是用来执行任务的.当用户提交任务时,线程池会创建线程去执行任务,若任务超过了核心线程数的时候,会在一个任务队列里进行排队等待,这个详细流程,我们会后面细

  • java ThreadPoolExecutor 并发调用实例详解

    java ThreadPoolExecutor 并发调用实例详解 概述 通常为了提供任务的处理速度,会使用一些并发模型,ThreadPoolExecutor中的invokeAll便是一种. 代码 package test.current; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util

  • Java ThreadPoolExecutor 线程池的使用介绍

    Executors Executors 是一个Java中的工具类. 提供工厂方法来创建不同类型的线程池. 从上图中也可以看出, Executors的创建线程池的方法, 创建出来的线程池都实现了 ExecutorService接口. 常用方法有以下几个: newFixedThreadPool(int Threads): 创建固定数目线程的线程池, 超出的线程会在队列中等待. newCachedThreadPool(): 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程(60

  • java ThreadPoolExecutor使用方法简单介绍

    java  ThreadPoolExecutor 前言: 在项目中如果使用发短信这个功能,一般会把发短信这个动作变成异步的,因为大部分情况下,短信到底是发送成功或者失败,都不能影响主流程.当然像发送MQ消息等操作也是可以封装成异步操作的. 使用基本的New Thread 如果想一个操作变成异步的,可以直接new thread,然后在run方法中实现业务操作即可.例如: new Thread(new Runnable() { public void run() { //发短信.发MQ消息等 } }

  • Java ThreadPoolExecutor的参数深入理解

    Java ThreadPoolExecutor的参数深入理解 一.使用Executors创建线程池    之前创建线程的时候都是用的Executors的newFixedThreadPool(),newSingleThreadExecutor(),newCachedThreadPool()这三个方法.当然Executors也是用不同的参数去new ThreadPoolExecutor     1. newFixedThreadPool() 创建线程数固定大小的线程池. 由于使用了LinkedBlo

  • 详解Java并发包中线程池ThreadPoolExecutor

    一.线程池简介 线程池的使用主要是解决两个问题:①当执行大量异步任务的时候线程池能够提供更好的性能,在不使用线程池时候,每当需要执行异步任务的时候直接new一个线程来运行的话,线程的创建和销毁都是需要开销的.而线程池中的线程是可复用的,不需要每次执行异步任务的时候重新创建和销毁线程:②线程池提供一种资源限制和管理的手段,比如可以限制线程的个数,动态的新增线程等等. 在下面的分析中,我们可以看到,线程池使用一个Integer的原子类型变量来记录线程池状态和线程池中的线程数量,通过线程池状态来控制任

  • 详解Java多线程编程中线程的启动、中断或终止操作

    线程启动: 1.start() 和 run()的区别说明 start() : 它的作用是启动一个新线程,新线程会执行相应的run()方法.start()不能被重复调用. run() : run()就和普通的成员方法一样,可以被重复调用.单独调用run()的话,会在当前线程中执行run(),而并不会启动新线程! 下面以代码来进行说明. class MyThread extends Thread{ public void run(){ ... } }; MyThread mythread = new

  • 详解Java并发包基石AQS

    一.概述 AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的.当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器. 本章我们就一起探究下这个神奇的东东,并对其实现原理进行剖析理解 二.基本实现原理 AQS使用一个int成员变量来表示同步状态,通

  • 详解java各种集合的线程安全

    线程安全 首先要明白线程的工作原理,jvm有一个main memory,而每个线程有自己的working memory,一个线程对一个variable进行操作时,都要在自己的working memory里面建立一个copy,操作完之后再写入main memory.多个线程同时操作同一个variable,就可能会出现不可预知的结果.根据上面的解释,很容易想出相应的scenario. 而用synchronized的关键是建立一个monitor,这个monitor可以是要修改的variable也可以其

  • 详解Java删除Map中元素java.util.ConcurrentModificationException”异常解决

    今天在使用map并需要根据某些条件删除map元素时,自然而然想到调用Map中的remove(Object key)函数进行删除,代码如下: //遍历map,如果key<5,那么就删除此元素. Map<Integer, Integer> users = new LinkedHashMap<Integer, Integer>(); for (Map.Entry<Integer,Integer> entry : users.entrySet()){ for (int i

  • 详解java面试题中的i++和++i

    代码如下所示: public class TestPlusPlus{ public static void main(String[] args){ int k = addAfterReturn(10); System.out.println(k); //输出 10 int k1 = addbeforeReturn(10); System.out.println(k1); //输出11 } public static int addbeforeReturn(int i){ return ++i;

  • 详解Java关于JDK中时间日期的API

    目录 JDK 8中关于日期和时间的API测试 JDK 8 之前日期和时间的API测试 //1.System类中的currentTimeMillis() public void test1(){ long time = System.currentTimeMillis(); //返回当前时间与1970年1月1日0时0分0秒之间以毫秒为时间为单位的时间差. //称为时间戳 System.out.println(time);//1628416744054 } /* java.util.Date类 |-

  • 详解Java多线程编程中CountDownLatch阻塞线程的方法

    直译过来就是倒计数(CountDown)门闩(Latch).倒计数不用说,门闩的意思顾名思义就是阻止前进.在这里就是指 CountDownLatch.await() 方法在倒计数为0之前会阻塞当前线程. CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作.例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工

  • 详解Java在redis中进行对象的缓存

    Java在redis中进行对象的缓存一般有两种方法,这里介绍序列化的方法,个人感觉比较方便,不需要转来转去. 一.首先,在存储的对象上实现序列化的接口 package com.cy.example.entity.system; import java.util.List; import com.baomidou.mybatisplus.annotations.TableField; import com.baomidou.mybatisplus.annotations.TableName; im

  • java多线程CountDownLatch与线程池ThreadPoolExecutor/ExecutorService案例

    1.CountDownLatch: 一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行. 2.ThreadPoolExecutor/ExecutorService: 线程池,使用线程池可以复用线程,降低频繁创建线程造成的性能消耗,同时对线程的创建.启动.停止.销毁等操作更简便. 3.使用场景举例: 年末公司组织团建,要求每一位员工周六上午8点到公司门口集合,统一乘坐公司所租大巴前往目的地. 在这个案例中,公司作为主线程,员工作为子线程. 4.代码示例: package

随机推荐