一文彻底搞懂java多线程和线程池
目录
- 什么是线程
- 一. Java实现线程的三种方式
- 1.1、继承Thread类
- 1.2、实现Runnable接口,并覆写run方法
- 二. Callable接口
- 2.1 Callable接口
- 2.2 Future接口
- 2.3 Future实现类是FutureTask。
- 三. Java线程池
- 3.1、背景
- 3.2、作用
- 3.3、应用范围
- 四. Java 线程池框架Executor
- 4.1、类图:
- 4.2 核心类ThreadPoolExecutor:
- 4.3 ThreadPoolExecutor逻辑结构
- 五.Executor线程池实例
- 5.1、使用Runnable接口创建线程,并由executor调度执行:
- 5.2、使用Callable接口创建线程,并由executor调度执行:
- 六.深入剖析线程池实现原理
- 6.1、线程池状态
- 6.2、线程池中的线程初始化
- 6.3、任务的执行
- 6.4、任务缓存队列及排队策略
- 6.5、任务拒绝策略
- 6.6、线程池的关闭
- 6.7、线程池容量的动态调整
- 六、线程池的配置策略
- 七、线程池创建注意
什么是线程
传统的程序设计语言同一时刻只能执行单任务操作,效率非常低,如果网络程序在接收数据时发生阻塞,只能等到程序接收数据之后才能继续运行。随着 Internet 的飞速发展,这种单任务运行的状况越来越不被接受。如果网络接收数据阻塞,后台服务程序就会一直处于等待状态而不能继续任何操作。 这种阻塞情况经常发生, 这时的 CPU资源完全处于闲置状态。
多线程实现后台服务程序可以同时处理多个任务,并不发生阻塞现象。多线程是 Java 语言的一个很重要的特征。 多线程程序设计最大的特点就是能够提高程序执行效率和处理速度。Java 程序可同时并行运行多个相对独立的线程。例如创建一个线程来接收数据,另一个线程发送数据,既使发送线程在接收数据时被阻塞,接受数据线程仍然可以运行。 线程(Thread)是控制线程(Thread of Control)的缩写,它是具有一定顺序的指令序列(即所编写的程序代码)、存放方法中定义局部变量的栈和一些共享数据。线程是相互独立的,每个方法的局部变量和其他线程的局部变量是分开的,因此,任何线程都不能访问除自身之外的其他线程的局部变量。如果两个线程同时访问同一个方法,那每个线程将各自得到此方法的一个拷贝。
Java 提供的多线程机制使一个程序可同时执行多个任务。线程有时也被称为小进程,它是从一个大进程里分离出来的小的独立的线程。由于实现了多线程技术,Java 显得更健壮。多线程带来的好处是更好的交互性能和实时控制性能。多线程是强大而灵巧的编程工具,但要用好它却不是件容易的事。在多线程编程中,每个线程都通过代码实现线程的行为,并将数据供给代码操作。编码和数据有时是相当独立的,可分别向线程提供。多个线程可以同时处理同一代码和同一数据,不同的线程也可以处理各自不同的编码和数据。
一. Java实现线程的三种方式
先简单看看java多线程如何实现的:
1.1、继承Thread类
让自己的类继承 Thread 类:
public class Test extends Thread { public static void main(String[] args) { Thread t = new Test(); t.start(); } @Override public void run() { System.out.println("Override run() ..."); } }
用Thread类的方式创建多线程的特点:
1、因为线程已经继承Thread类,所以不可以再继承其它类。
2、如果需要访问当前线程,直接使用this即可。
1.2、实现Runnable接口,并覆写run方法
实现 Runnable 接口:
public class RunnableTest implements Runnable { public static void main(String[] args) throws Exception{ Thread t = new Thread(new RunnableTest()); t.start(); } @Override public void run(){ System.out.println(123); } }
1.3、实现Callable接口,并覆写call方法
- 实现Callable接口的类TCallable
- 以TCallable为参数创建FutureTask对象
- 将FutureTask作为参数创建Thread对象
- 调用线程对象的start()方法
public class TCallable implements Callable{ public static void main() { FutureTask futureTask = new FutureTask(new TCallable()); Thread thread = new Thread(futureTask); thread.start(); try { Object o = futureTask.get(); System.out.println(o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override public Object call() throws Exception { return "Override call() ..."; } }
用Runnable与Callable接口的方式创建多线程的特点:
1、线程类只是实现了Runnable接口或Callable接口,还可以继承其它类。
2、在这种方式下,多个线程可以共享一个target对象,所以非常适合多个线程来处理同一份资源情况。
3、如果需要访问当前线程,需要使用Thread.currentThread方法。
4、Callable接口与Runnable接口相比,只是Callable接口可以返回值而已。
主要介绍第三种方式Callable,因为讲到线程池的时候,大多时候使用Callable接口。
二. Callable接口
Callable用于产生结果,Future用于获取结果。
2.1 Callable接口
Callable接口代表一段可以调用并返回结果的代码;
Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但是 Runnable 不会返回结果,并且无法抛出返回结果的异常。 但Callable可以返回一个对象或者抛出一个异常。
Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的,我们必须等待它返回的结果。
Callable接口的源码如下:
public interface Callable<V> { V call() throws Exception; // 计算结果 }
2.2 Future接口
Future用于表示异步计算的结果。
由于Callable任务是并行的,我们必须等待它返回的结果。那如何获取返回结果?java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用Future可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供get()方法让我们可以等待Callable结束并获取它的执行结果。
Future接口的源码如下:从源码我们可以知道,Future可以对异步运算的任务的结果进行等待获取、判断是否已经完成、取消任务等操作。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning);// 试图取消对此任务的执行 boolean isCancelled(); // 如果在任务正常完成前将其取消,则返回 true boolean isDone(); // 如果任务已完成,则返回 true V get() throws InterruptedException, ExecutionException; // 如有必要,等待计算完成,然后获取其结果 // 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2.3 Future实现类是FutureTask。
如果不想分支线程阻塞主线程,又想取得分支线程的执行结果,就用FutureTask , FutureTask实现了RunnableFuture接口,这个接口的定义如下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
可以看到这个接口实现了Runnable和Future接口,接口中的具体实现由FutureTask来实现。这个类的构造方法如下 :
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // state记录FutureTask的状态 this.state = NEW; // ensure visibility of callable }
FutureTask 构造函数传入一个 Callable 的具体实现类,一个 FutureTask 对象可以对调用了 Callable 和 Runnable 的对象进行包装,由于 FutureTask 也是Runnable 接口的实现类,所以 FutureTask 也可以放入线程池中。
在web容器使用这种多线程的方式,要记住记得shutdown关闭,否则可能导致线程没有被关闭回收,结果线程数一直增加
当线程数太多时,肯定会导致内存溢出或者影响服务器性能等。
三. Java线程池
既然单个线程的创建和销毁都很简单,我们为什么要使用线程池?
3.1、背景
JAVA是面向对象,JAVA虚拟机每创建一个对象都要获取内存资源或者其他很多其他资源,同时试图跟踪每个对象,以便可以在对象销毁后进行垃圾回收。因此创建和销毁对象是非常费时间的。所以为了尽可能提高应用程序资源使用效率,就是要降低就是尽可能降低创建和销毁对象的次数,特别是一些非常耗资源的对象创建和销毁,这就是一些"池化资源"技术产生的原因,比如大家熟悉的数据库连接池。
针对线程而言:
线程占用系统内存:虽然线程相对于进程而言是轻量级的,创建线程依然需要占用系统的内存资源。如果无限制的创建线程,对应虚拟机垃圾回收而言也是很有压力的,毕竟线程也是对象。
线程的创建和关闭是需要花费时间的:如果任务非常多,频繁的创建和销毁线程也是需要占用系统的时间片的。
3.2、作用
1)降低资源消耗,重复利用已经创建好的线程,降低线程创建和销毁造成的资源消耗
2)提高响应速度,当任务到达时,任务可以不需要等到线程创建就可以立即执行
3)提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
3.3、应用范围
1)、需要大量的线程来完成任务。且完成任务的时间比较短。
http请求这种任务,使用线程池技术是很合适的。由于单个任务小,而任务数量巨大,你能够想象一个热门站点的点击次数。
如果一个线程的任务执行时间非常长,就没必要用线程池,比如一个telnet连接请求,线程池的优势就不明显了。
2、对性能要求苛刻的应用,比方要求server迅速响应客户请求。
3、接受突发性的大量请求,但不至于使server因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,尽管理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。
四. Java 线程池框架Executor
一个线程池包含下面四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池。包含 创建线程池,销毁线程池,加入新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态。能够循环的运行任务;
3、任务接口(Task):每一个任务必须实现的接口,以供工作线程调度任务的运行。它主要规定了任务的入口。任务运行完后的收尾工作,任务的运行状态等。
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
下面我们主要官方提供的线程池管理工具Executor框架。
Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架。无限制的创建线程会引起应用程序内存溢出。所以创建一个线程池是个更好的的解决方案,因为可以限制线程的数量并且可以回收再利用这些线程。Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。
4.1、类图:
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
然后ThreadPoolExecutor继承了类AbstractExecutorService。
4.2 核心类ThreadPoolExecutor:
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler); ... }
ThreadPoolExecutor提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。下面解释下一下构造器中各个参数的含义:
public ThreadPoolExecutor(int corePoolSize, //线程池核心线程数量 int maximumPoolSize, //线程池最大线程数量 long keepAliveTime, //线程KeepAlive时间,当线程池数量超过核心线程数量以后,idle时间超过这个值的线程会被终止 TimeUnit unit, //线程KeepAlive时间单位 BlockingQueue<Runnable> workQueue, //任务队列 ThreadFactory threadFactory, //创建线程的工厂对象 RejectedExecutionHandler handler)
corePoolSize: 核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:ArrayBlockingQueue;LinkedBlockingQueue;SynchronousQueue;
1. ArrayBlockingQueue : 有界的数组队列
2. LinkedBlockingQueue : 可支持有界/无界的队列,使用链表实现
3. PriorityBlockingQueue : 优先队列,可以针对任务排序
4. SynchronousQueue : 队列长度为1的队列,和Array有点区别就是:client thread提交到block queue会是一个阻塞过程,直到有一个worker thread连接上来poll task。
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
在ThreadPoolExecutor类中有几个非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法: 实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法: 是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
shutdown()和shutdownNow()是用来关闭线程池的。
还有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。
4.3 ThreadPoolExecutor逻辑结构
ThreadPoolExecutor线程池的逻辑结构:
第一步、初始的poolSize < corePoolSize,提交的runnable任务,会直接做为new一个Thread的参数,立马执行
第二步、当提交的任务数超过了corePoolSize,就进入了第二步操作。会将当前的runable提交到一个block queue中
第三步,如果block queue是个有界队列,当队列满了之后就进入了第三步。如果poolSize < maximumPoolsize时,会尝试new 一个Thread的进行救急处理,立马执行对应的runnable任务
第四步,如果第三步救急方案也无法处理了,就会走到第四步执行reject操作
五.Executor线程池实例
比如我们要并发调用远程http接口:
5.1、使用Runnable接口创建线程,并由executor调度执行:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class TExecutor { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { RunnableTask runnable = new RunnableTask("http://xxxx.com/api/v2", Integer.toString(i)); executorService.execute(runnable); } System.out.println("线程任务开始执行"); executorService.shutdown(); } /** * 线程池提交Runnable任务 */ public void runnableTask() { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<?> futureTask = null; for (int i = 0; i < 5; i++) { RunnableTask runnable = new RunnableTask("http://xxxx.com/api/v2", Integer.toString(1)); futureTask = executorService.submit(runnable); try { futureTask.get(10, TimeUnit.SECONDS); //等待超时,等待给定的时间之后,获取其结果 } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } finally{ futureTask.cancel(true); } } executorService.shutdown(); } } class RunnableTask implements Runnable { String url; String param; public RunnableTask(String url, String param) { this.url = url; this.param = param; } //http调用 public String requestHttp() { String result = ""; return result; } @Override public void run() { System.out.println("正在执行task "+param); try { requestHttp(); } catch (Exception e) { e.printStackTrace(); } System.out.println("task "+param+"执行完毕"); } }
5.2、使用Callable接口创建线程,并由executor调度执行:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.TimeUnit; public class TestPoolThread { private ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100)); public String main(String gid) { List<Callable<String>> tasks = new ArrayList<>(); tasks.add(new CallableTask("http://xxxx.com/api/v2", "1")); tasks.add(new CallableTask("http://xxxx.com/api/v2", "2")); try { List<Future<String>> mFuture = executorService.invokeAll(tasks, 10, TimeUnit.SECONDS); String result1 = mFuture.get(0).get(); System.out.println( "result1:" + result1); String result2 = mFuture.get(0).get(); System.out.println( "result2:" + result2); } catch (Exception e) { e.printStackTrace(); } return ""; } } class CallableTask implements Callable<String> { String url; String param; public CallableTask(String url, String param) { this.url = url; this.param = param; } //http调用 public String requestHttp() { String result = ""; return result; } @Override public String call() { try { String param = "{}"; String result = requestHttp( ); return result; } catch (Exception e) { e.printStackTrace(); return ""; } } }
不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool : 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
Executors.newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
Executors.newFixedThreadPool :创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
Executors.newScheduledThreadPool :创建一个定长线程池,支持定时及周期性任务执行。
Executors.newSingleThreadExecutor :创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
静态方法的具体实现:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。
六.深入剖析线程池实现原理
在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:
1.线程池状态
2.线程池中的线程初始化
3.任务的执行
4.任务缓存队列及排队策略
5.任务拒绝策略
6.线程池的关闭
7.线程池容量的动态调整
6.1、线程池状态
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;下面的几个static final变量表示runState可能的几个取值。
当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
6.2、线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
- prestartCoreThread():初始化一个核心线程;
- prestartAllCoreThreads():初始化所有核心线程
下面是这2个方法的实现:
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意传进去的参数是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意传进去的参数是null,等待任务队列中有任务 ++n; return n; }
6.3、任务的执行
任务提交基本逻辑如下:
如果当前线程池中的线程poolSize数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程poolSize数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程poolSize数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;如果线程池中的线程poolSize数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;
如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
下面我们分析一下从任务提交给线程池到任务执行完毕整个过程。
我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务 private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小 //、runState等)的改变都要使用这个锁 private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集 private volatile long keepAliveTime; //线程存活时间 private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间 private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int maximumPoolSize; //线程池最大能容忍的线程数 private volatile int poolSize; //线程池中当前的线程数 private volatile RejectedExecutionHandler handler; //任务拒绝策略 private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程 private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数,跟线程池的容量没有任何关系 private long completedTaskCount; //用来记录已经执行完毕的任务个数
每个变量的作用都已经标明出来了。
下面我们我们看看任务从提交到最终执行完毕经历了哪些过程。
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
在源码添加相关注解来解释逻辑了:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //如果线程池中当前线程数不小于核心池大小,创建线程来执行任务 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } } /** * 简单拆解execute: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //如果线程池中当前线程数不小于核心池大小, if (poolSize >= corePoolSize ) { //创建新线程来执行任务 boolean isCreate = addIfUnderCorePoolSize(command); if (!isCreate) { //创建线程失败:即poolSize>=corePoolSize或者runState不等于RUNNING) if (runState == RUNNING && workQueue.offer(command)) {//如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列 if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command);//保证 添加到任务缓存队列中的任务得到处理 } //创建新线程来执行任务 boolean isUnderMaximumPoolSize = addIfUnderMaximumPoolSize(command); if (!isUnderMaximumPoolSize) reject(command); // is shutdown or saturated } } //创建线程来执行任务 } } */
我们看看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
1、addIfUnderCorePoolSize:从名字可以看出意思就是当线程池的线程低于corePoolSize大小时创建线程执行任务。
2、addIfUnderMaximumPoolSize方法的实现和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行
/** * 当线程池的线程低于corePoolSize大小时创建线程执行任务 * @param firstTask * @return */ private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; //首先获取到锁 final ReentrantLock mainLock = this.mainLock; //通过加锁保证创建的线程是唯一 mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //创建线程去执行firstTask任务 } finally { mainLock.unlock(); } if (t == null)//创建线程失败 return false; t.start();//启动线程 return true; } /** * 和addIfUnderCorePoolSize方法的实现思想非常相似 * @param firstTask * @return */ private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
创建新线程:首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中
/** * 创建新线程 * @param firstTask * @return */ private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); //创建一个线程,执行任务,实际就是Thread t = new Thread(w); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; //将创建的线程的引用赋值为w的成员变量 workers.add(w); int nt = ++poolSize; //当前线程数加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
Worker执行任务: Worker实现了Runnable接口,最核心的方法是run()方法:
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据 //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * Worker实现了Runnable接口 */ public void run() { try { Runnable task = firstTask; firstTask = null; //从ThreadPoolExecutor任务缓存队列里面去取 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //当任务队列中没有任务时,进行清理工作 } } } }
6.4、任务缓存队列及排队策略
workQueue任务缓存队列是用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
6.5、任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
6.6、线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
6.7、线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:设置核心池大小setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。
六、线程池的配置策略
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
- 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
- 任务的优先级:高,中和低。
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
任务性质不同的任务可以用不同规模的线程池分开处理。
CPU密集型任务:配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。cpu密集型任务需要大量的运算,而且没有阻塞,需要CPU一直全速运行,CPU密集任务只有在真正的多核CPU上才可能得到加速。可以使用Runtime.availableProcessors方法获取可用处理器的个数。
一般计算公式:CPU核数 + 1个线程的线程池
ExecutorService THREAD_POOL = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(Runtime.getRuntime().availableProcessors() * 20), new ThreadPoolExecutor.CallerRunsPolicy());
《并发编程实战》一书中对于IO密集型任务建议线程池大小设为Ncpu+1Ncpu+1,原因是当计算密集型线程偶尔由于页缺失故障或其他原因而暂停时,这个“额外的”线程也能确保这段时间内的CPU始终周期不会被浪费。
对于计算密集型任务,不要创建过多的线程,由于线程有执行栈等内存消耗,创建过多的线程不会加快计算速度,反而会消耗更多的内存空间;另一方面线程过多,频繁切换线程上下文也会影响线程池的性能。
IO密集型任务:IO操作包括读写磁盘文件、读写数据库、网络请求等阻塞操作,执行这些操作,线程将处于等待状态。
io密集型由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。即该任务需要大量的IO,即大量的阻塞,这种类型分以下两种情况设置
1、有等待io任务:如果IO密集型任务线程并非一直在执行任务,则应配置尽可能多的线程,如CPU核数 * 2
2、需要一直运行的任务:参考公式:CPU核数 /(1 - 阻塞系数 ) 阻塞系数在0.8~0.9之间
比如:8核CPU:8/(1 - 0.9) = 80个线程数
用sleep方式模拟IO阻塞:
public class IOThreadPoolTest { // 使用无限线程数的CacheThreadPool线程池 static ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool(); static List<Callable<Object>> tasks; // 仍然是5000个任务 static int taskNum = 5000; static { tasks = new ArrayList<>(taskNum); for (int i = 0; i < taskNum; i++) { tasks.add(Executors.callable(new IOTask())); } } public static void main(String[] args) throws InterruptedException { cachedThreadPool.invokeAll(tasks);// warm up all thread testExecutor(cachedThreadPool, tasks); // 看看执行过程中创建了多少个线程 int largestPoolSize = cachedThreadPool.getLargestPoolSize(); System.out.println("largestPoolSize:" + largestPoolSize); cachedThreadPool.shutdown(); } private static void testExecutor(ExecutorService executor, List<Callable<Object>> tasks) throws InterruptedException { long start = System.currentTimeMillis(); executor.invokeAll(tasks); long end = System.currentTimeMillis(); System.out.println(end - start); } static class IOTask implements Runnable { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }
这里使用无线程数限制的CachedThreadPool线程池,也就是说这里的5000个任务会被5000个线程同时处理,由于所有的线程都只是阻塞而不消耗CPU资源,所以5000个任务在不到2秒的时间内就执行完了。
很明显使用CachedThreadPool能有效提高IO密集型任务的吞吐量,而且由于CachedThreadPool中的线程会在空闲60秒自动回收,所以不会消耗过多的资源。
但是打开任务管理器你会发现执行任务的同时内存会飙升到接近400M,因为每个线程都消耗了一部分内存,在5000个线程创建之后,内存消耗达到了峰值。
所以使用CacheThreadPool的时候应该避免提交大量长时间阻塞的任务,以防止内存溢出;另一种替代方案是,使用固定大小的线程池,并给一个较大的线程数(不会内存溢出),同时为了在空闲时节省内存资源,调用allowCoreThreadTimeOut允许核心线程超时。
线程执行栈的大小可以通过-Xss*size*或-XX:ThreadStackSize参数调整
混合型的任务:如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
混合型任务要根据任务等待阻塞时间与CPU计算时间的比重来决定线程数量:
threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)
threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)
比如一个任务包含一次数据库读写(0.1ms),并在内存中对读取的数据进行分组过滤等操作(5μs),那么线程数应该为80左右。
线程数与阻塞比例的关系图大致如下:
当阻塞比例为0,也就是纯计算任务,线程数等于核心数(这里是4);阻塞比例越大,线程池的线程数应该更多。
《Java并发编程实战》中最原始的公式是这样的:
>Nthreads=Ncpu∗Ucpu∗(1+WC)>
>Nthreads=Ncpu∗Ucpu∗(1+WC)>
NcpuNcpu代表CPU的个数,UcpuUcpu代表CPU利用率的期望值(0<Ucpu<10<Ucpu<1),WCWC仍然是等待时间与计算时间的比例。
我上面提供的公式相当于目标CPU利用率为100%。
通常系统中不止一个线程池,所以实际配置线程数应该将目标CPU利用率计算进去。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。
总结:线程池的大小取决于任务的类型以及系统的特性,避免“过大”和“过小”两种极端。线程池过大,大量的线程将在相对更少的CPU和有限的内存资源上竞争,这不仅影响并发性能,还会因过高的内存消耗导致OOM;线程池过小,将导致处理器得不到充分利用,降低吞吐率。
要想正确的设置线程池大小,需要了解部署的系统中有多少个CPU,多大的内存,提交的任务是计算密集型、IO密集型还是两者兼有。
七、线程池创建注意
阿里巴巴编码规范里面提到:
线程池最好不要使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
如果使用阿里巴巴的java编程规范插件,就会有这个提示:
ExecutorService newFixedThreadPool(int nThreads):固定大小线程池。
源码可以看到,corePoolSize和maximumPoolSize的大小是一样的(如果使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService newSingleThreadExecutor():单线程。
可以看到,与fixedThreadPool很像,只不过fixedThreadPool中的入参直接退化为1
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
ExecutorService newCachedThreadPool():无界线程池,可以进行自动线程回收。
这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中,每个插入操作必须等待另一个
线程的对应移除操作。比如,我先添加一个元素,接下来如果继续想尝试添加则会阻塞,直到另一个线程取走一个元素,反之亦然。(想到什么?就是缓冲区为1的生产者消费者模式^_^)
注意到介绍中的自动回收线程的特性吗,为什么呢?先不说,但注意到该实现中corePoolSize和maximumPoolSize的大小不同。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
总结:
- ThreadPoolExecutor的使用还是很有技巧的。
- 使用无界queue可能会耗尽系统资源。
- 使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小
- 线程数自然也有开销,所以需要根据不同应用进行调节。
通常来说对于静态任务可以归为:
- 数量大,但是执行时间很短
- 数量小,但是执行时间较长
- 数量又大执行时间又长
- 除了以上特点外,任务间还有些内在关系
总结2:
- 对于需要保证所有提交的任务都要被执行的情况,使用FixedThreadPool
- 如果限定只能使用一个线程进行任务处理,使用SingleThreadExecutor
- 如果希望提交的任务尽快分配线程执行,使用CachedThreadPool
- 如果业务上允许任务执行失败,或者任务执行过程可能出现执行时间过长进而影响其他业务的应用场景,可以通过使用限定线程数量的线程池以及限定长度的队列进行容错处理。
到此这篇关于java多线程和线程池的文章就介绍到这了,更多相关java多线程和线程池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!