深入理解Java编程线程池的实现原理

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

以下是本文的目录大纲:

一.Java中的ThreadPoolExecutor类

二.深入剖析线程池实现原理

三.使用示例

四.如何合理配置线程池的大小

若有不正之处请多多谅解,并欢迎批评指正。

一.Java中的ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
  .....
  public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
      BlockingQueue<Runnable> workQueue);

  public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
      BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

  public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
      BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

  public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
  ...
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释下一下构造器中各个参数的含义:

corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;        //天
TimeUnit.HOURS;       //小时
TimeUnit.MINUTES;      //分钟
TimeUnit.SECONDS;      //秒
TimeUnit.MILLISECONDS;   //毫秒
TimeUnit.MICROSECONDS;   //微妙
TimeUnit.NANOSECONDS;    //纳秒

workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

具体参数的配置与线程池的关系将在下一节讲述。

从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

public abstract class AbstractExecutorService implements ExecutorService {

  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
  public Future<?> submit(Runnable task) {};
  public <T> Future<T> submit(Runnable task, T result) { };
  public <T> Future<T> submit(Callable<T> task) { };
  private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
              boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
  };
  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
  };
  public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
              long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
  };
  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
  };
  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                     long timeout, TimeUnit unit)
    throws InterruptedException {
  };
}

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

我们接着看ExecutorService接口的实现:

public interface ExecutorService extends Executor {

  void shutdown();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                 long timeout, TimeUnit unit)
    throws InterruptedException;

  <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks,
          long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
}

而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

public interface Executor {
  void execute(Runnable command);
}

到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

然后ThreadPoolExecutor继承了类AbstractExecutorService。

在ThreadPoolExecutor类中有几个非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

shutdown()和shutdownNow()是用来关闭线程池的。

还有很多其他的方法:

比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

二.深入剖析线程池实现原理

在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:

1.线程池状态

  2.任务的执行

  3.线程池中的线程初始化

  4.任务缓存队列及排队策略

  5.任务拒绝策略

  6.线程池的关闭

  7.线程池容量的动态调整

1.线程池状态

在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

volatile int runState;
static final int RUNNING  = 0;
static final int SHUTDOWN  = 1;
static final int STOP    = 2;
static final int TERMINATED = 3;

runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

下面的几个static final变量表示runState可能的几个取值。

当创建线程池后,初始时,线程池处于RUNNING状态;

如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2.任务的执行

在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

private final BlockingQueue<Runnable> workQueue;       //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();  //线程池的主要状态锁,对线程池状态(比如线程池大小
                               //、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集

private volatile long keepAliveTime;  //线程存货时间
private volatile boolean allowCoreThreadTimeOut;  //是否允许为核心线程设置存活时间
private volatile int  corePoolSize;   //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int  maximumPoolSize;  //线程池最大能容忍的线程数

private volatile int  poolSize;    //线程池中当前的线程数

private volatile RejectedExecutionHandler handler; //任务拒绝策略

private volatile ThreadFactory threadFactory;  //线程工厂,用来创建线程

private int largestPoolSize;  //用来记录线程池中曾经出现过的最大线程数

private long completedTaskCount;  //用来记录已经执行完毕的任务个数

每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:

假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

然后就将任务也分配给这4个临时工人做;

如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。

当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。

largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
    if (runState == RUNNING && workQueue.offer(command)) {
      if (runState != RUNNING || poolSize == 0)
        ensureQueuedTaskHandled(command);
    }
    else if (!addIfUnderMaximumPoolSize(command))
      reject(command); // is shutdown or saturated
  }
}

上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:

首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;

接着是这句,这句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

由于是或条件运算符,所以先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。

如果线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行

addIfUnderCorePoolSize(command)

如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。

如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:

if (runState == RUNNING && workQueue.offer(command))

如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:

addIfUnderMaximumPoolSize(command)

如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。

回到前面:

if (runState == RUNNING && workQueue.offer(command))

这句的执行,如果说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:

if (runState != RUNNING || poolSize == 0)

这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:

ensureQueuedTaskHandled(command)

进行应急处理,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。

我们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
  Thread t = null;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    if (poolSize < corePoolSize && runState == RUNNING)
      t = addThread(firstTask);    //创建线程去执行firstTask任务
    } finally {
    mainLock.unlock();
  }
  if (t == null)
    return false;
  t.start();
  return true;
}

这个是addIfUnderCorePoolSize方法的具体实现,从名字可以看出它的意图就是当低于核心吃大小时执行的方法。下面看其具体实现,首先获取到锁,因为这地方涉及到线程池状态的变化,先通过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为何这地方还要继续判断?原因很简单,前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown或者shutdownNow方法。然后就是执行

t = addThread(firstTask);

这个方法也非常关键,传进去的参数为提交的任务,返回值为Thread类型。然后接着在下面判断t是否为空,为空则表明创建线程失败(即poolSize>=corePoolSize或者runState不等于RUNNING),否则调用t.start()方法启动线程。

我们来看一下addThread方法的实现:

private Thread addThread(Runnable firstTask) {
  Worker w = new Worker(firstTask);
  Thread t = threadFactory.newThread(w); //创建一个线程,执行任务
  if (t != null) {
    w.thread = t;      //将创建的线程的引用赋值为w的成员变量
    workers.add(w);
    int nt = ++poolSize;   //当前线程数加1
    if (nt > largestPoolSize)
      largestPoolSize = nt;
  }
  return t;
}

在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。

下面我们看一下Worker类的实现:

private final class Worker implements Runnable {
  private final ReentrantLock runLock = new ReentrantLock();
  private Runnable firstTask;
  volatile long completedTasks;
  Thread thread;
  Worker(Runnable firstTask) {
    this.firstTask = firstTask;
  }
  boolean isActive() {
    return runLock.isLocked();
  }
  void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {
      try {
    if (thread != Thread.currentThread())
    thread.interrupt();
      } finally {
        runLock.unlock();
      }
    }
  }
  void interruptNow() {
    thread.interrupt();
  }

  private void runTask(Runnable task) {
    final ReentrantLock runLock = this.runLock;
    runLock.lock();
    try {
      if (runState < STOP &&
        Thread.interrupted() &&
        runState >= STOP)
      boolean ran = false;
      beforeExecute(thread, task);  //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
      //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等
      try {
        task.run();
        ran = true;
        afterExecute(task, null);
        ++completedTasks;
      } catch (RuntimeException ex) {
        if (!ran)
          afterExecute(task, ex);
        throw ex;
      }
    } finally {
      runLock.unlock();
    }
  }

  public void run() {
    try {
      Runnable task = firstTask;
      firstTask = null;
      while (task != null || (task = getTask()) != null) {
        runTask(task);
        task = null;
      }
    } finally {
      workerDone(this);  //当任务队列中没有任务时,进行清理工作
    }
  }
}

它实际上实现了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本一样:

Thread t = new Thread(w);

相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。

既然Worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:

public void run() {
  try {
    Runnable task = firstTask;
    firstTask = null;
    while (task != null || (task = getTask()) != null) {
      runTask(task);
      task = null;
    }
  } finally {
    workerDone(this);
  }
}

从run方法的实现可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:

Runnable getTask() {
  for (;;) {
    try {
      int state = runState;
      if (state > SHUTDOWN)
        return null;
      Runnable r;
      if (state == SHUTDOWN) // Help drain queue
        r = workQueue.poll();
      else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
        //则通过poll取任务,若等待一定的时间取不到任务,则返回null
        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
      else
        r = workQueue.take();
      if (r != null)
        return r;
      if (workerCanExit()) {  //如果没取到任务,即r为null,则判断当前的worker是否可以退出
        if (runState >= SHUTDOWN) // Wake up others
          interruptIdleWorkers();  //中断处于空闲状态的worker
        return null;
      }
      // Else retry
    } catch (InterruptedException ie) {
      // On interruption, re-check runState
    }
  }

在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。

如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:

private boolean workerCanExit() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  boolean canExit;
  //如果runState大于等于STOP,或者任务缓存队列为空了
  //或者 允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
  try {
    canExit = runState >= STOP ||
      workQueue.isEmpty() ||
      (allowCoreThreadTimeOut &&
       poolSize > Math.max(1, corePoolSize));
  } finally {
    mainLock.unlock();
  }
  return canExit;
}

也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:

void interruptIdleWorkers() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) //实际上调用的是worker的interruptIfIdle()方法
      w.interruptIfIdle();
  } finally {
    mainLock.unlock();
  }
}

从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {
  final ReentrantLock runLock = this.runLock;
  if (runLock.tryLock()) {  //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的
                //如果成功获取了锁,说明当前worker处于空闲状态
    try {
  if (thread != Thread.currentThread())
  thread.interrupt();
    } finally {
      runLock.unlock();
    }
  }
}

这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。

我们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
  Thread t = null;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    if (poolSize < maximumPoolSize && runState == RUNNING)
      t = addThread(firstTask);
  } finally {
    mainLock.unlock();
  }
  if (t == null)
    return false;
  t.start();
  return true;
}

看到没有,其实它和addIfUnderCorePoolSize方法的实现基本一模一样,只是if语句判断条件中的poolSize < maximumPoolSize不同而已。

到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:

1)首先,要清楚corePoolSize和maximumPoolSize的含义;

2)其次,要知道Worker是用来起到什么作用的;

3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:

如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

3.线程池中的线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
下面是这2个方法的实现:

public boolean prestartCoreThread() {
  return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}

public int prestartAllCoreThreads() {
  int n = 0;
  while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
    ++n;
  return n;
}

注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的

r = workQueue.take();

即等待任务队列中有任务。

4.任务缓存队列及排队策略

在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

5.任务拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

6.线程池的关闭

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
7.线程池容量的动态调整

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

三.使用示例

前面我们讨论了关于线程池的实现原理,这一节我们来看一下它的具体使用:

public class Test {
   public static void main(String[] args) {
     ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
         new ArrayBlockingQueue<Runnable>(5));

     for(int i=0;i<15;i++){
       MyTask myTask = new MyTask(i);
       executor.execute(myTask);
       System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
       executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
     }
     executor.shutdown();
   }
}

class MyTask implements Runnable {
  private int taskNum;

  public MyTask(int num) {
    this.taskNum = num;
  }

  @Override
  public void run() {
    System.out.println("正在执行task "+taskNum);
    try {
      Thread.currentThread().sleep(4000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("task "+taskNum+"执行完毕");
  }
}

执行结果:

正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
task 3执行完毕
task 0执行完毕
task 2执行完毕
task 1执行完毕
正在执行task 8
正在执行task 7
正在执行task 6
正在执行task 5
task 4执行完毕
task 10执行完毕
task 11执行完毕
task 13执行完毕
task 12执行完毕
正在执行task 9
task 14执行完毕
task 8执行完毕
task 5执行完毕
task 7执行完毕
task 6执行完毕
task 9执行完毕

从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();    //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();  //创建容量为1的缓冲池
Executors.newFixedThreadPool(int);  //创建固定容量大小的缓冲池

下面是这三个静态方法的具体实现;

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

四.如何合理配置线程池的大小

本节来讨论一个比较重要的话题:如何合理配置线程池大小,仅供参考。

一般需要根据任务的类型来配置线程池大小:

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

如果是IO密集型任务,参考值可以设置为2*NCPU

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

总结

以上就是本文关于深入理解Java编程线程池的实现原理的全部内容,希望对大家有所帮助。有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

(0)

相关推荐

  • 详解Java线程池和Executor原理的分析

    详解Java线程池和Executor原理的分析 线程池作用与基本知识 在开始之前,我们先来讨论下"线程池"这个概念."线程池",顾名思义就是一个线程缓存.它是一个或者多个线程的集合,用户可以把需要执行的任务简单地扔给线程池,而不用过多的纠结与执行的细节.那么线程池有哪些作用?或者说与直接用Thread相比,有什么优势?我简单总结了以下几点: 减小线程创建和销毁带来的消耗 对于Java Thread的实现,我在前面的一篇blog中进行了分析.Java Thread与内

  • java的线程池框架及线程池的原理

    java 线程池详解 什么是线程池? 提供一组线程资源用来复用线程资源的一个池子 为什么要用线程池? 线程的资源是有限的,当处理一组业务的时候,我们需要不断的创建和销毁线程,大多数情况下,我们需要反复的进行大量的创建和销毁工作,这个动作对于服务器而言,也是很浪费的一种情况,这时候我们可以利用线程池来复用这一部分已经创建过的线程资源,避免不断的创建和销毁的动作. 线程池的原理 创建好固定数量的线程,吧线程先存下来,有任务提交的时候,把资源放到等待队列中,等待线程池中的任务队列不断的去消费处理这个队

  • Java 线程池原理深入分析

    Java 线程池原理 Executor框架的两级调度模型 在HotSpot VM的模型中,Java线程被一对一映射为本地操作系统线程.JAVA线程启动时会创建一个本地操作系统线程,当JAVA线程终止时,对应的操作系统线程也被销毁回收,而操作系统会调度所有线程并将它们分配给可用的CPU. 在上层,JAVA程序会将应用分解为多个任务,然后使用应用级的调度器(Executor)将这些任务映射成固定数量的线程:在底层,操作系统内核将这些线程映射到硬件处理器上. Executor框架类图 在前面介绍的JA

  • Java concurrency线程池之线程池原理(二)_动力节点Java学院整理

    线程池示例 在分析线程池之前,先看一个简单的线程池示例. import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; public class ThreadPoolDemo1 { public static void main(String[] args) { // 创建一个可重用固定线程数的线程池 ExecutorService pool = Executors.newFixedThre

  • java 打造阻塞式线程池的实例详解

    java 打造阻塞式线程池的实例详解 原来以为tiger已经自带了这种线程池,就是在任务数量超出时能够阻塞住投放任务的线程,主要想用在JMS消息监听. 开始做法: 在ThreadPoolExcecutor中代入new ArrayBlockingQueue(MAX_TASK). 在任务超出时报错:RejectedExecutionException. 后来不用execute方法加入任务,直接getQueue().add(task), 利用其阻塞特性.但是发现阻塞好用了,但是任务没有被处理.一看Qu

  • 详谈Java几种线程池类型介绍及使用方法

    一.线程池使用场景 •单个任务处理时间短 •将需处理的任务数量大 二.使用Java线程池好处 1.使用new Thread()创建线程的弊端: •每次通过new Thread()创建对象性能不佳. •线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom. •缺乏更多功能,如定时执行.定期执行.线程中断. 2.使用Java线程池的好处: •重用存在的线程,减少对象创建.消亡的开销,提升性能. •可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞

  • Java concurrency线程池之线程池原理(四)_动力节点Java学院整理

    拒绝策略介绍 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施. 当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭.第二,任务数量超过线程池的最大限制. 线程池共包括4种拒绝策略,它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy和DiscardPolicy. AbortPolicy         -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException

  • java线程池:获取运行线程数并控制线程启动速度的方法

    在java里, 我们可以使用Executors.newFixedThreadPool 来创建线程池, 然后就可以不停的创建新任务,并用线程池来执行了. 在提交任务时,如果线程池已经被占满,任务会进到一个队列里等待执行. 这种机制在一些特定情况下会有些问题.今天我就遇到一种情况:创建线程比线程执行的速度要快的多,而且单个线程占用的内存又多,所以很快内存就爆了. 想了一个办法,就是在提交任务之前,先检查目前正在执行的线程数目,只有没把线程池占满的时候在去提交任务. 代码很简单: int thread

  • 深入理解Java编程线程池的实现原理

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

  • Java线程池FutureTask实现原理详解

    前言 线程池可以并发执行多个任务,有些时候,我们可能想要跟踪任务的执行结果,甚至在一定时间内,如果任务没有执行完成,我们可能还想要取消任务的执行,为了支持这一特性,ThreadPoolExecutor提供了 FutureTask 用于追踪任务的执行和取消.本篇介绍FutureTask的实现原理. 类视图 为了更好的理解FutureTask的实现原理,这里先提供几个重要接口和类的结构,如下图所示: RunnableAdapter ThreadPoolExecutor提供了submit接口用于提交任

  • java线程池使用及原理面试题

    目录 引导语 1.说说你对线程池的理解? 2.ThreadPoolExecutor.Executor.ExecutorService.Runnable.Callable.FutureTask 之间的关系? 3.说一说队列在线程池中起的作用? 4.结合请求不断增加时,说一说线程池构造器参数的含义和表现? 5.coreSize 和 maxSize 可以动态设置么,有没有规则限制? 6.说一说对于线程空闲回收的理解,源码中如何体现的? 7.如果我想在线程池任务执行之前和之后,做一些资源清理的工作,可以

  • Java线程池使用与原理详解

    线程池是什么? 我们可以利用java很容易创建一个新线程,同时操作系统创建一个线程也是一笔不小的开销.所以基于线程的复用,就提出了线程池的概念,我们使用线程池创建出若干个线程,执行完一个任务后,该线程会存在一段时间(用户可以设定空闲线程的存活时间,后面会介绍),等到新任务来的时候就直接复用这个空闲线程,这样就省去了创建.销毁线程损耗.当然空闲线程也会是一种资源的浪费(所有才有空闲线程存活时间的限制),但总比频繁的创建销毁线程好太多. 下面是我的测试代码 /* * @TODO 线程池测试 */ @

  • Java编程线程间通信与信号量代码示例

    1.信号量Semaphore 先说说Semaphore,Semaphore可以控制某个资源可被同时访问的个数,通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可.一般用于控制并发线程数,及线程间互斥.另外重入锁ReentrantLock也可以实现该功能,但实现上要复杂些. 功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了.另外等待的

  • Java常用线程池原理及使用方法解析

    一.简介 什么是线程池? 池的概念大家也许都有所听闻,池就是相当于一个容器,里面有许许多多的东西你可以即拿即用.java中有线程池.连接池等等.线程池就是在系统启动或者实例化池时创建一些空闲的线程,等待工作调度,执行完任务后,线程并不会立即被销毁,而是重新处于空闲状态,等待下一次调度. 线程池的工作机制? 在线程池的编程模式中,任务提交并不是直接提交给线程,而是提交给池.线程池在拿到任务之后,就会寻找有没有空闲的线程,有则分配给空闲线程执行,暂时没有则会进入等待队列,继续等待空闲线程.如果超出最

  • Java使用线程池的优势有哪些

    池化技术相比大家已经屡见不鲜了,线程池.数据库连接池.Http 连接池等等都是对这个思想的应用.池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率. 线程池提供了一种限制和管理资源(包括执行一个任务). 每个线程池还维护一些基本统计信息,例如已完成任务的数量. 这里借用<Java 并发编程的艺术>提到的来说一下使用线程池的好处: 降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执行. 提高线程

  • 深度源码解析Java 线程池的实现原理

    java 系统的运行归根到底是程序的运行,程序的运行归根到底是代码的执行,代码的执行归根到底是虚拟机的执行,虚拟机的执行其实就是操作系统的线程在执行,并且会占用一定的系统资源,如CPU.内存.磁盘.网络等等.所以,如何高效的使用这些资源就是程序员在平时写代码时候的一个努力的方向.本文要说的线程池就是一种对 CPU 利用的优化手段. 线程池,百度百科是这么解释的: 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的

  • Java创建线程池为什么一定要用ThreadPoolExecutor

    目录 先说结论 OOM风险演示 内存溢出原因分析 使用ThreadPoolExecutor来改进 其他创建线程池的问题 总结 前言: 在 Java 语言中,并发编程都是依靠线程池完成的,而线程池的创建方式又有很多,但从大的分类来说,线程池的创建总共分为两大类:手动方式使用ThreadPoolExecutor创建线程池和使用 Executors 执行器自动创建线程池. 那究竟要使用哪种方式来创建线程池呢?我们今天就来详细的聊一聊. 先说结论 在 Java 语言中,一定要使用 ThreadPoolE

  • Java使用线程池执行定时任务

    目录 1.schedule 2.scheduleAtFixedRate 3.scheduleWithFixedDelay 总结 前言: 在 Java 语言中,有两个线程池可以执行定时任务:ScheduledThreadPool 和 SingleThreadScheduledExecutor,其中 SingleThreadScheduledExecutor 可以看做是 ScheduledThreadPool 的单线程版本,它的用法和 ScheduledThreadPool 是一样的,所以本文重点来

随机推荐