一篇文章带你搞懂Java线程池实现原理

目录
  • 1. 为什么要使用线程池
  • 2. 线程池的使用
  • 3. 线程池核心参数
  • 4. 线程池工作原理
  • 5. 线程池源码剖析
    • 5.1 线程池的属性
    • 5.2 线程池状态
    • 5.3 execute源码
    • 5.4 worker源码
    • 5.5 runWorker源码

1. 为什么要使用线程池

使用线程池通常由以下两个原因:

  • 频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程。
  • 使用线程池可以更容易管理线程,线程池可以动态管理线程个数、具有阻塞队列、定时周期执行任务、环境隔离等。

2. 线程池的使用

/**
 * @author 一灯架构
 * @apiNote 线程池示例
 **/
public class ThreadPoolDemo {

    public static void main(String[] args) {
        // 1. 创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3,
                3,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        // 2. 往线程池中提交3个任务
        for (int i = 0; i < 3; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 关注公众号:一灯架构");
            });
        }

        // 3. 关闭线程池
        threadPoolExecutor.shutdown();
    }
}

线程池的使用非常简单:

  • 调用new ThreadPoolExecutor()构造方法,指定核心参数,创建线程池。
  • 调用execute()方法提交Runnable任务
  • 使用结束后,调用shutdown()方法,关闭线程池。

再看一下线程池构造方法中核心参数的作用。

3. 线程池核心参数

线程池共有七大核心参数:

参数名称 参数含义
int corePoolSize 核心线程数
int maximumPoolSize 最大线程数
long keepAliveTime 线程存活时间
TimeUnit unit 时间单位
BlockingQueue workQueue 阻塞队列
ThreadFactory threadFactory 线程创建工厂
RejectedExecutionHandler handler 拒绝策略

1.corePoolSize 核心线程数

当往线程池中提交任务,会创建线程去处理任务,直到线程数达到corePoolSize,才会往阻塞队列中添加任务。默认情况下,空闲的核心线程并不会被回收,除非配置了allowCoreThreadTimeOut=true。

2.maximumPoolSize 最大线程数

当线程池中的线程数达到corePoolSize,阻塞队列又满了之后,才会继续创建线程,直到达到maximumPoolSize,另外空闲的非核心线程会被回收。

3.keepAliveTime 线程存活时间

非核心线程的空闲时间达到了keepAliveTime,将会被回收。

4.TimeUnit 时间单位

线程存活时间的单位,默认是TimeUnit.MILLISECONDS(毫秒),可选择的有:

  • TimeUnit.NANOSECONDS(纳秒)
  • TimeUnit.MICROSECONDS(微秒)
  • TimeUnit.MILLISECONDS(毫秒)
  • TimeUnit.SECONDS(秒)
  • TimeUnit.MINUTES(分钟)
  • TimeUnit.HOURS(小时)
  • TimeUnit.DAYS(天)

5.workQueue 阻塞队列

当线程池中的线程数达到corePoolSize,再提交的任务就会放到阻塞队列的等待,默认使用的是LinkedBlockingQueue,可选择的有:

  • LinkedBlockingQueue(基于链表实现的阻塞队列)
  • ArrayBlockingQueue(基于数组实现的阻塞队列)
  • SynchronousQueue(只有一个元素的阻塞队列)
  • PriorityBlockingQueue(实现了优先级的阻塞队列)
  • DelayQueue(实现了延迟功能的阻塞队列)

6.threadFactory 线程创建工厂

用来创建线程的工厂,默认的是Executors.defaultThreadFactory(),可选择的还有Executors.privilegedThreadFactory()实现了线程优先级。当然也可以自定义线程创建工厂,创建线程的时候最好指定线程名称,便于排查问题。

7.RejectedExecutionHandler 拒绝策略

当线程池中的线程数达到maximumPoolSize,阻塞队列也满了之后,再往线程池中提交任务,就会触发执行拒绝策略,默认的是AbortPolicy(直接终止,抛出异常),可选择的有:

  • AbortPolicy(直接终止,抛出异常)
  • DiscardPolicy(默默丢弃,不抛出异常)
  • DiscardOldestPolicy(丢弃队列中最旧的任务,执行当前任务)
  • CallerRunsPolicy(返回给调用者执行)

4. 线程池工作原理

线程池的工作原理,简单理解如下:

  • 当往线程池中提交任务的时候,会先判断线程池中线程数是否核心线程数,如果小于,会创建核心线程并执行任务。
  • 如果线程数大于核心线程数,会判断阻塞队列是否已满,如果没有满,会把任务添加到阻塞队列中等待调度执行。
  • 如果阻塞队列已满,会判断线程数是否小于最大线程数,如果小于,会继续创建最大线程数并执行任务。
  • 如果线程数大于最大线程数,会执行拒绝策略,然后结束。

5. 线程池源码剖析

5.1 线程池的属性

public class ThreadPoolExecutor extends AbstractExecutorService {

    // 线程池的控制状态,Integer长度是32位,前3位用来存储线程池状态,后29位用来存储线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 线程个数所占的位数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 线程池的最大容量,2^29-1,约5亿个线程
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // 独占锁,用来控制多线程下的并发操作
    private final ReentrantLock mainLock = new ReentrantLock();
    // 工作线程的集合
    private final HashSet<Worker> workers = new HashSet<>();
    // 等待条件,用来响应中断
    private final Condition termination = mainLock.newCondition();
    // 是否允许回收核心线程
    private volatile boolean allowCoreThreadTimeOut;
    // 线程数的历史峰值
    private int largestPoolSize;

    /**
     * 以下是线程池的七大核心参数
     */
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private volatile long keepAliveTime;
    private final BlockingQueue<Runnable> workQueue;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;

}

线程池的控制状态ctl用来存储线程池状态和线程个数,前3位用来存储线程池状态,后29位用来存储线程数量。

设计者多聪明,用一个变量存储了两块内容。

5.2 线程池状态

线程池共有5种状态:

状态名称 状态含义 状态作用
RUNNING 运行中 线程池创建后默认状态,接收新任务,并处理阻塞队列中的任务。
SHUTDOWN 已关闭 调用shutdown方法后处于该状态,不再接收新任务,处理阻塞队列中任务。
STOP 已停止 调用shutdownNow方法后处于该状态,不再新任务,并中断所有线程,丢弃阻塞队列中所有任务。
TIDYING 处理中 所有任务已完成,所有工作线程都已回收,等待调用terminated方法。
TERMINATED 已终止 调用terminated方法后处于该状态,线程池的最终状态。

5.3 execute源码

看一下往线程池中提交任务的源码,这是线程池的核心逻辑:

// 往线程池中提交任务
public void execute(Runnable command) {
    // 1. 判断提交的任务是否为null
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 2. 判断线程数是否小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 3. 把任务包装成worker,添加到worker集合中
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 4. 判断如果线程数不小于corePoolSize,并且可以添加到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 5. 重新检查线程池状态,如果线程池不是运行状态,就移除刚才添加的任务,并执行拒绝策略
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 6. 判断如果线程数是0,就创建非核心线程(任务是null,会从阻塞队列中拉取任务)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 7. 如果添加阻塞队列失败,就创建一个Worker
    else if (!addWorker(command, false))
        // 8. 如果创建Worker失败说明已经达到最大线程数了,则执行拒绝策略
        reject(command);
}

execute方法的逻辑也很简单,最终就是调用addWorker方法,把任务添加到worker集合中,再看一下addWorker方法的源码:

// 添加worker
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 1. 检查是否允许提交任务
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        firstTask == null &&
                        !workQueue.isEmpty()))
            return false;
        // 2. 使用死循环保证添加线程成功
        for (; ; ) {
            int wc = workerCountOf(c);
            // 3. 校验线程数是否超过容量限制
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 4. 使用CAS修改线程数
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            // 5. 如果线程池状态变了,则从头再来
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 6. 把任务和新线程包装成一个worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 7. 加锁,控制并发
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 8. 再次校验线程池状态是否异常
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    // 9. 如果线程已经启动,就抛出异常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // 10. 添加到worker集合中
                    workers.add(w);
                    int s = workers.size();
                    // 11. 记录线程数历史峰值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 12. 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

方法虽然很长,但是逻辑很清晰。就是把任务和线程包装成worker,添加到worker集合,并启动线程。

5.4 worker源码

再看一下worker类的结构:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    // 工作线程
    final Thread thread;
    // 任务
    Runnable firstTask;

    // 创建worker,并创建一个新线程(用来执行任务)
    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

5.5 runWorker源码

再看一下run方法的源码:

// 线程执行入口
public void run() {
    runWorker(this);
}

// 线程运行核心方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 1. 如果当前worker中任务是null,就从阻塞队列中获取任务
        while (task != null || (task = getTask()) != null) {
            // 加锁,保证thread不被其他线程中断(除非线程池被中断)
            w.lock();
            // 2. 校验线程池状态,是否需要中断当前线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 3. 执行run方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 4. 从worker集合删除当前worker
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法逻辑也很简单,就是不断从阻塞队列中拉取任务并执行。

再看一下从阻塞队列中拉取任务的逻辑:

// 从阻塞队列中拉取任务
private Runnable getTask() {
    boolean timedOut = false;
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 1. 如果线程池已经停了,或者阻塞队列是空,就回收当前线程
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 2. 再次判断是否需要回收线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 3. 从阻塞队列中拉取任务
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

以上就是一篇文章带你搞懂Java线程池实现原理的详细内容,更多关于Java线程池的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java线程池使用与原理详解

    线程池是什么? 我们可以利用java很容易创建一个新线程,同时操作系统创建一个线程也是一笔不小的开销.所以基于线程的复用,就提出了线程池的概念,我们使用线程池创建出若干个线程,执行完一个任务后,该线程会存在一段时间(用户可以设定空闲线程的存活时间,后面会介绍),等到新任务来的时候就直接复用这个空闲线程,这样就省去了创建.销毁线程损耗.当然空闲线程也会是一种资源的浪费(所有才有空闲线程存活时间的限制),但总比频繁的创建销毁线程好太多. 下面是我的测试代码 /* * @TODO 线程池测试 */ @

  • java自定义线程池的原理简介

    线程池的相关概念就不在这里说明了,百度一下有很多,这里简单表述一下如何实现一个自定义的线程池就行线程管理,我们如果要实现一个线程池对线程的管理,那么需要实现一下几点的思路: 1.如何管理线程 2.如何定义工作线程以及工作线程如何持续的保持运行状态 3.如何定义线程池大小及队列大小 4.如何提供接口给调用者使用 5.如何关闭线程池中的线程 接下来我们就一一的实现这几个问题. 1.我们需要定义一个队列来来管理线程,这里使用了LinkedBlockingQueue // 1.定义一个存储线程队列 pr

  • Java常用线程池原理及使用方法解析

    一.简介 什么是线程池? 池的概念大家也许都有所听闻,池就是相当于一个容器,里面有许许多多的东西你可以即拿即用.java中有线程池.连接池等等.线程池就是在系统启动或者实例化池时创建一些空闲的线程,等待工作调度,执行完任务后,线程并不会立即被销毁,而是重新处于空闲状态,等待下一次调度. 线程池的工作机制? 在线程池的编程模式中,任务提交并不是直接提交给线程,而是提交给池.线程池在拿到任务之后,就会寻找有没有空闲的线程,有则分配给空闲线程执行,暂时没有则会进入等待队列,继续等待空闲线程.如果超出最

  • Java线程池实现原理总结

    目录 一.线程池参数 二.线程池执行流程 三.四种现成的线程池 要理解实现原理,必须把线程池的几个参数彻底搞懂,不要死记硬背 一.线程池参数 1.corePoolSize(必填):核心线程数. 2.maximumPoolSize(必填):最大线程数. 3.keepAliveTime(必填):线程空闲时长.如果超过该时长,非核心线程就会被回收. 4.unit(必填):指定keepAliveTime的时间单位.常用的有:TimeUnit.MILLISECONDS(毫秒).TimeUnit.SECON

  • 详解Java线程池的使用及工作原理

    一.什么是线程池? 线程池是一种用于实现计算机程序并发执行的软件设计模式.线程池维护多个线程,等待由调度程序分配任务以并发执行,该模型提高了性能,并避免了由于为短期任务频繁创建和销毁线程而导致的执行延迟. 二.线程池要解决什么问题? 说到线程池就一定要从线程的生命周期讲起. 从图中可以了解无论任务执行多久,每个线程都要经历从生到死的状态.而使用线程池就是为了避免线程的重复创建,从而节省了线程的New至Runnable, Running至Terminated的时间:同时也会复用线程,最小化的节省系

  • 了解Java线程池执行原理

    前言 上一篇已经对线程池的创建进行了分析,了解线程池既有预设的模板,也提供多种参数支撑灵活的定制. 本文将会围绕线程池的生命周期,分析线程池执行任务的过程. 线程池状态 首先认识两个贯穿线程池代码的参数: runState:线程池运行状态 workerCount:工作线程的数量 线程池用一个32位的int来同时保存runState和workerCount,其中高3位是runState,其余29位是workerCount.代码中会反复使用runStateOf和workerCountOf来获取run

  • 一篇文章带你搞懂Java线程池实现原理

    目录 1. 为什么要使用线程池 2. 线程池的使用 3. 线程池核心参数 4. 线程池工作原理 5. 线程池源码剖析 5.1 线程池的属性 5.2 线程池状态 5.3 execute源码 5.4 worker源码 5.5 runWorker源码 1. 为什么要使用线程池 使用线程池通常由以下两个原因: 频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程. 使用线程池可以更容易管理线程,线程池可以动态管理线程个数.具有阻塞队列.定时周期执行任务.环境隔离等. 2. 线程池的使用 /** *

  • 一篇文章带你深入了解Java线程池

    目录 线程池模型 常用线程池 ThreadPoolExecutor 构造函数参数说明 线程池默认工作行为 ForkJoinPool FutureTask 线程数量分析 CPU密集型 IO密集型 总结 线程池模型 一般的池化模型会有两个方法,用于获取资源和释放资源,就像这样: public interface XXPool{ XX acquire(); void release(); } 但是,工程中的线程池一般是生产者和消费者模型,线程池是消费者,任务的提交者是生产者,下面是一个简化的线程池模型

  • 一篇文章带你搞懂Java restful 接口开发

    目录 1.RESTful 简介 a>资源 b>资源的表述 c>状态转移 2.RESTful 的实现 3.HiddenHttpMethodFilter 4.RESTful 案例 4.1.需求 4.2.git 代码位置 4.3.UserController 4.4.添加 HiddenHttpMethodFilter 4.5.测试效果 总结 1.RESTful 简介 REST:Representational State Transfer,表现层资源状态转移. a>资源 资源是一种看待服

  • 一篇文章带你搞懂Python类的相关知识

    一.什么是类 类(class),作为代码的父亲,可以说它包裹了很多有趣的函数和方法以及变量,下面我们试着简单创建一个吧. 这样就算创建了我们的第一个类了.大家可以看到这里面有一个self,其实它指的就是类aa的实例.每个类中的函数只要你不是类函数或者静态函数你都得加上这个self,当然你也可以用其他的代替这个self,只不过这是python中的写法,就好比Java 中的this. 二.类的方法 1.静态方法,类方法,普通方法 类一般常用有三种方法,即为static method(静态方法),cl

  • 一篇文章带你搞懂Vue虚拟Dom与diff算法

    前言 使用过Vue和React的小伙伴肯定对虚拟Dom和diff算法很熟悉,它扮演着很重要的角色.由于小编接触Vue比较多,React只是浅学,所以本篇主要针对Vue来展开介绍,带你一步一步搞懂它. 虚拟DOM 什么是虚拟DOM? 虚拟DOM(Virtual   Dom),也就是我们常说的虚拟节点,是用JS对象来模拟真实DOM中的节点,该对象包含了真实DOM的结构及其属性,用于对比虚拟DOM和真实DOM的差异,从而进行局部渲染来达到优化性能的目的. 真实的元素节点: <div id="wr

  • 搞懂Java线程池

    身为程序员我们对线程是再熟悉不过了,多线程并发算是Java进阶的知识,用好多线程不容易有太多的坑.创建线程也算是一个"重"操作.创建线程的语句是new Thread()咋一看好像就是new了一个对象. 没错是new了个对象,但是不仅仅是普通对象那样在堆中分配了一块内存,它还需要调用操作系统内核API,然后操作系统再为线程分配一些资源.所以较普通对象,线程就比较"重了".所以我们要避免频繁的创建和销毁线程,还得控制一下线程的数量.线程池就是用来完成这一项使命的. 所以

  • 一篇文章带你搞懂JavaScript的变量与数据类型

    目录 前言: 温馨提示: 变量 1.声明 2.赋值 3.二个语法小细节 变量的命名规范 为什么需要数据类型? 简单数据类型(基本数据类型) 数字型 字符串型 String 什么是数据类型的转换 1.转换为字符串 2.转换为数字型(重点) 转化为布尔型 总结 前言: 我不是搞前端,而是搞后端的.本命编程语言是java.学习js的嘛,因为看到室友能做出动态网页,而我只能做出静态网页,再加上下个学期要学所以提前来学习学习. 温馨提示: java和javsScript没有半毛钱关系,只是javaScri

  • 一篇文章带你搞定JAVA反射

    目录 1.反射的概念 1.概念 2.获取字节码文件对象的方式 2.1 元数据的概念 2.2 获取class对象的方式 1.访问权限 2.获取方法 2.1 访问静态方法 2.2 访问类方法 3.获取字段,读取字段的值 4.获取实现的接口 5.获取构造函数,创建实例 6.获取继承的父类 7.获取注解 4.反射实例 5.总结 1.反射的概念 1.概念 反射,指在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法,对任意一个对象,都能调用它的任意一个方法.这种动态获取信息,以及动态调用对象方法

  • 一篇文章带你搞定JAVA泛型

    目录 1.泛型的概念 2.泛型的使用 3.泛型原理,泛型擦除 3.1 IDEA 查看字节码 3.2 泛型擦除原理 4.?和 T 的区别 5.super extends 6.注意点 1.静态方法无法访问类的泛型 2.创建之后无法修改类型 3.类型判断问题 4.创建类型实例 7.总结 1.泛型的概念 泛型的作用就是把类型参数化,也就是我们常说的类型参数 平时我们接触的普通方法的参数,比如public void fun(String s):参数的类型是String,是固定的 现在泛型的作用就是再将St

  • 一篇文章带你搞定JAVA注解

    目录 1.注解是什么 2.jdk支持的注解有哪些 2.1 三种常用的注解: 2.2 元注解 3.注解实例 1.自定义注解 2.在对应的方法上增加注解 3.在项目启动的时候检查注解的枚举 4.总结 1.注解是什么 Java 注解用于为 Java 代码提供元数据,看完这句话也许你还是一脸懵逼,用人话说就是注解不直接影响你的代码执行,仅提供信息.接下我将从注解的定义.元注解.注解属性.自定义注解.注解解析JDK 提供的注解这几个方面再次了解注解(Annotation) 2.jdk支持的注解有哪些 2.

随机推荐