java 线程池封装及拒绝策略示例详解

目录
  • 前文
    • 线程池的封装实现
    • ThreadPoolExecutor
    • 线程池拒绝策略
    • 任务队列

前文

提到线程的使用以及线程间通信方式,通常情况下我们通过new Thread或者new Runnable创建线程,这种情况下,需要开发者手动管理线程的创建和回收,线程对象没有复用,大量的线程对象创建与销毁会引起频繁GC,那么事否有机制自动进行线程的创建,管理和回收呢?线程池可以实现该能力。

线程池的优点:

  • 线程池中线程重用,避免线程创建和销毁带来的性能开销
  • 能有效控制线程数量,避免大量线程抢占资源造成阻塞
  • 对线程进行简单管理,提供定时执行预计指定间隔执行等策略

线程池的封装实现

在java.util.concurrent包中提供了一系列的工具类以方便开发者创建和使用线程池,这些类的继承关系及说明如下:

类名 说明 备注
Executor Executor接口提供了一种任务提交后的执行机制,包括线程的创建与运行,线程调度等,通常不直接使用该类 /
ExecutorService ExecutorService接口,提供了创建,管理,终止Future执行的方法,用于跟踪一个或多个异步任务的进度,通常不直接使用该类 /
ScheduledExecutorService ExecutorService的实现接口,提供延时,周期性执行Future的能力,同时具备ExecutorService的基础能力,通常不直接使用该类 /
AbstractExecutorService AbstractExecutorService是个虚类,对ExecutorService中方法进行了默认实现,其提供了newTaskFor函数,用于获取RunnableFuture对象,该对象实现了submit,invokeAny和invokeAll方法,通常不直接使用该类 /
ThreadPoolExecutor 通过创建该类对象就可以构建一个线程池,通过调用execute方法可以向该线程池提交任务。通常情况下,开发者通过自定义参数,构造该类对象就来获得一个符合业务需求的线程池 /
ScheduledThreadPoolExecutor 通过创建该类对象就可以构建一个可以周期性执行任务的线程池,通过调用schedule,scheduleWithFixedDelay等方法可以向该线程池提交任务并在指定时间节点运行。通常情况下,开发者通过构造该类对象就来获得一个符合业务需求的可周期性执行任务的线程池 /

由上表可知,对于开发者而言,通常情况下我们可以通过构造ThreadPoolExecutor对象来获取一个线程池对象,通过其定义的execute方法来向该线程池提交任务并执行,那么怎么创建线程池呢?让我们一起看下

ThreadPoolExecutor

ThreadPoolExecutor完整参数的构造函数如下所示:

     /**
      * Creates a new {@code ThreadPoolExecutor} with the given initial
      * parameters.
      *
      * @param corePoolSize the number of threads to keep in the pool, even
      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
      * @param maximumPoolSize the maximum number of threads to allow in the
      *        pool
      * @param keepAliveTime when the number of threads is greater than
      *        the core, this is the maximum time that excess idle threads
      *        will wait for new tasks before terminating.
      * @param unit the time unit for the {@code keepAliveTime} argument
      * @param workQueue the queue to use for holding tasks before they are
      *        executed.  This queue will hold only the {@code Runnable}
      *        tasks submitted by the {@code execute} method.
      * @param threadFactory the factory to use when the executor
      *        creates a new thread
      * @param handler the handler to use when execution is blocked
      *        because the thread bounds and queue capacities are reached
      * @throws IllegalArgumentException if one of the following holds:<br>
      *         {@code corePoolSize < 0}<br>
      *         {@code keepAliveTime < 0}<br>
      *         {@code maximumPoolSize <= 0}<br>
      *         {@code maximumPoolSize < corePoolSize}
      * @throws NullPointerException if {@code workQueue}
      *         or {@code threadFactory} or {@code handler} is null
      */
     public ThreadPoolExecutor(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory,
                               RejectedExecutionHandler handler) {
         if (corePoolSize < 0 ||
             maximumPoolSize <= 0 ||
             maximumPoolSize < corePoolSize ||
             keepAliveTime < 0)
             throw new IllegalArgumentException();
         if (workQueue == null || threadFactory == null || handler == null)
             throw new NullPointerException();
         this.acc = System.getSecurityManager() == null ?
                 null :
                 AccessController.getContext();
         this.corePoolSize = corePoolSize;
         this.maximumPoolSize = maximumPoolSize;
         this.workQueue = workQueue;
         this.keepAliveTime = unit.toNanos(keepAliveTime);
         this.threadFactory = threadFactory;
         this.handler = handler;
     }

从上述代码可以看出,在构建ThreadPoolExecutor时,主要涉及以下参数:

  • corePoolSize:核心线程个数,一般情况下可以使用 处理器个数/2 作为核心线程数的取值,可以通过Runtime.getRuntime().availableProcessors()来获取处理器个数
  • maximumPoolSize:最大线程个数,该线程池支持同时存在的最大线程数量
  • keepAliveTime:非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收,我们也可以通过allowCoreThreadTimeOut(true)来设置核心线程闲置时,在超时时间到达后回收
  • unit:keepAliveTime的时间单位
  • workQueue:线程池中的任务队列,当核心线程数满或最大线程数满时,通过线程池的execute方法提交的Runnable对象存储在这个参数中,遵循先进先出原则
  • threadFactory:创建线程的工厂 ,用于批量创建线程,统一在创建线程时进行一些初始化设置,如是否守护线程、线程的优先级等。不指定时,默认使用Executors.defaultThreadFactory() 来创建线程,线程具有相同的NORM_PRIORITY优先级并且是非守护线程
  • handler:任务拒绝处理策略,当线程数量等于最大线程数且等待队列已满时,就会采用拒绝处理策略处理新提交的任务,不指定时,默认的处理策略是AbortPolicy,即抛弃该任务

综上,我们可以看出创建一个线程池最少需要明确核心线程数,最大线程数,超时时间及单位,等待队列这五个参数,下面我们创建一个核心线程数为1,最大线程数为3,5s超时回收,等待队列最多能存放5个任务的线程池,代码如下:

 ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5));

随后我们使用for循环向该executor中提交任务,代码如下:

 public static void main(String[] args) {
     // 创建线程池
     ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5));
     for (int i=0;i<10;i++) {
         int finalI = i;
         System.out.println("put runnable "+ finalI +"to executor");
         // 向线程池提交任务
         executor.execute(new Runnable() {
             @Override
             public void run() {
                 System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"start");
                 try {
                     Thread.sleep(5000);
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
                 System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"executed");
             }
         });
     }
 }

输出如下:

从输出可以看到,当提交一个任务到线程池时,其执行流程如下:

线程池拒绝策略

线程池拒绝策略有四类,定义在ThreadPoolExecutor中,分别是:

  • AbortPolicy:默认拒绝策略,丢弃提交的任务并抛出RejectedExecutionException,在该异常输出信息中,可以看到当前线程池状态
  • DiscardPolicy:丢弃新来的任务,但是不抛出异常
  • DiscardOldestPolicy:丢弃队列头部的旧任务,然后尝试重新执行,如果再次失败,重复该过程
  • CallerRunsPolicy:由调用线程处理该任务

当然,如果上述拒绝策略不能满足需求,我们也可以自定义异常,实现RejectedExecutionHandler接口,即可创建自己的线程池拒绝策略,下面是使用自定义拒绝策略的示例代码:

 public static void main(String[] args) {
     RejectedExecutionHandler handler = new RejectedExecutionHandler() {
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
             System.out.println("runnable " + r +" in executor "+executor+" is refused");
         }
     };
     ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5),handler);
     for (int i=0;i<10;i++) {
         int finalI = i;
         Runnable runnable = new Runnable() {
             @Override
             public void run() {
                 System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"start");
                 try {
                     Thread.sleep(5000);
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
                 System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"executed");
             }
         };
         System.out.println("put runnable "+ runnable+"  index:"+finalI +" to executor:"+executor);
         executor.execute(runnable);
     }
 }

输出如下:

任务队列

对于线程池而言,任务队列需要是BlockingQueue的实现类,BlockingQueue接口的实现类类图如下:

下面我们针对常用队列做简单了解:

ArrayBlockingQueue:ArrayBlockingQueue是基于数组的阻塞队列,在其内部维护一个定长数组,所以使用ArrayBlockingQueue时必须指定任务队列长度,因为不论对数据的写入或者读取都使用的是同一个锁对象,所以没有实现读写分离,同时在创建时我们可以指定锁内部是否采用公平锁,默认实现是非公平锁。

非公平锁与公平锁

公平锁:多个任务阻塞在同一锁时,等待时长长的优先获取锁

非公平锁:多个任务阻塞在同一锁时,锁可获取时,一起抢锁,谁先抢到谁先执行

LinkedBlockingQueue:LinkedBlockingQueue是基于链表的阻塞队列,在创建时可不指定任务队列长度,默认值是Integer.MAX_VALUE,在LinkedBlockingQueue中读锁和写锁实现了分支,相对ArrayBlockingQueue而言,效率提升明显。

SynchronousQueue:SynchronousQueue是一个不存储元素的阻塞队列,也就是说当需要插入元素时,必须等待上一个元素被移出,否则不能插入,其适用于任务多但是执行比较快的场景。

PriorityBlockingQueue:PriorityBlockingQueue是一个支持指定优先即的阻塞队列,默认初始化长度为11,最大长度为Integer.MAX_VALUE - 8,可以通过让装入队列的对象实现Comparable接口,定义对象排序规则来指定队列中元素优先级,优先级高的元素会被优先取出。

DelayQueue:DelayQueue是一个带有延迟时间的阻塞队列,队列中的元素,只有等待延时时间到了才可以被取出,由于其内部用PriorityBlockingQueue维护数据,故其长度与PriorityBlockingQueue一致。一般用于定时调度类任务。

下表从一些角度对上述队列进行了比较:

队列名称 底层数据结构 默认长度 最大长度 是否读写分离 适用场景
ArrayBlockingQueue 数组 0 开发者指定大小 任务数量较少时使用
LinkedBlockingQueue 链表 Integer.MAX_VALUE Integer.MAX_VALUE 大量任务时使用
SynchronousQueue 公平锁-队列/非公平锁-栈 0 / 任务多但是执行速度快的场景
PriorityBlockingQueue 对象数组 11 Integer.MAX_VALUE-8 有任务需要优先处理的场景
DelayQueue 对象数组 11 Integer.MAX_VALUE-8 定时调度类场景

以上就是java 线程池封装及拒绝策略示例详解的详细内容,更多关于java 线程池封装拒绝策略的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java线程池队列LinkedTransferQueue示例详解

    目录 正文 LinkedTransferQueue 正文 public enum QueueTypeEnum { ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"), LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"), DELAY_QUEUE(3, "DelayQueue"), PRIORITY_BLOCKING_QUEUE(4, "PriorityB

  • java高并发ThreadPoolExecutor类解析线程池执行流程

    目录 摘要 核心逻辑概述 execute(Runnable)方法 addWorker(Runnable, boolean)方法 addWorkerFailed(Worker)方法 拒绝策略 摘要 ThreadPoolExecutor是Java线程池中最核心的类之一,它能够保证线程池按照正常的业务逻辑执行任务,并通过原子方式更新线程池每个阶段的状态. 今天,我们通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程,小伙伴们最好是打开IDEA,按照冰河说的步骤,调试下Th

  • java线程池中Worker线程执行流程原理解析

    目录 引言 Worker类分析 runWorker(Worker)方法 getTask()方法 beforeExecute(Thread, Runnable)方法 afterExecute(Runnable, Throwable)方法 processWorkerExit(Worker, boolean)方法 tryTerminate()方法 terminated()方法 引言 在<[高并发]别闹了,这样理解线程池执行任务的核心流程才正确!!>一文中我们深度分析了线程池执行任务的核心流程,在Th

  • Java线程池队列PriorityBlockingQueue和SynchronousQueue详解

    目录 正文 PriorityBlockingQueue阻塞优先队列 SynchronousQueue 正文 public enum QueueTypeEnum { ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"), LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"), DELAY_QUEUE(3, "DelayQueue"), PRIORITY_BLOCKING

  • Java线程池队列LinkedBlockingDeque

    目录 正文 LinkedBlockingDeque LinkedBlockingDeque和LinkedBlockingQueue的区别 正文 public enum QueueTypeEnum { ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"), LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"), DELAY_QUEUE(3, "DelayQueue"),

  • java线程池ThreadPoolExecutor的八种拒绝策略示例详解

    目录 池化设计思想 线程池触发拒绝策略的时机 JDK内置4种线程池拒绝策略 拒绝策略接口定义 AbortPolicy(中止策略) DiscardPolicy(丢弃策略) DiscardOldestPolicy(弃老策略) 第三方实现的拒绝策略 Dubbo 中的线程拒绝策略 Netty 中的线程池拒绝策略 ActiveMQ 中的线程池拒绝策略 PinPoint 中的线程池拒绝策略 谈到 Java 的线程池最熟悉的莫过于 ExecutorService 接口了,jdk1.5 新增的 java.uti

  • java ThreadPoolExecutor线程池拒绝策略避坑

    目录 1.场景 2. 原因分析 3.总结 4.思考 1.场景 线程池使用DiscardOldestPolicy拒绝策略,阻塞队列使用ArrayBlockingQueue,发现在某些情形下对于得到的Future,调用get()方法当前线程会一直阻塞. 为了便于理解,将实际情景抽象为下面的代码: ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 1, 1, TimeUnit.SECONDS, new ArrayBlo

  • Java线程中的关键字和方法示例详解

    目录 一.volatile关键字 1,volatile能保证内存可见性 2,编译器优化问题 二.wait和notify 1,wait()方法 2,notify()方法 3,notifyAll()方法 一.volatile关键字 1,volatile 能保证内存可见性 代码在写入 volatile 修饰的变量的时候 改变线程工作内存中volatile变量副本的值 将改变后的副本的值从工作内存刷新到主内存 代码在读取 volatile 修饰的变量的时候 从主内存中读取volatile变量的最新值到线

  • Java线程池的拒绝策略实现详解

    一.简介 jdk1.5 版本新增了JUC并发编程包,大大的简化了传统的多线程开发. Java线程池,是典型的池化思想的产物,类似的还有数据库的连接池.redis的连接池等.池化思想,就是在初始的时候去申请资源,创建一批可使用的连接,这样在使用的时候,就不必再进行创建连接信息的开销了.举个生活中鲜明的例子,在去著名洋快餐某基或者某劳的时候,配餐人员是字节从一个中间的保温箱里面直接取,然后打包就好了.不用再临时的来了一个单子,又要去拿原材料,又要去进行加工.效率明显的就是提高了很多. 既然是池子,那

  • java线程池不同场景下使用示例经验总结

    目录 引导语 1.coreSize == maxSize 2.maxSize 无界 + SynchronousQueue 3.maxSize 有界 + Queue 无界 4.maxSize 有界 + Queue 有界 5.keepAliveTime 设置无穷大 6.线程池的公用和独立 7.如何算线程大小和队列大小 8.总结 引导语 ThreadPoolExecutor 初始化时,主要有如下几个参数: public ThreadPoolExecutor(int corePoolSize, int

  • epoll封装reactor原理剖析示例详解

    目录 reactor是什么? reactor模型三个重要组件与流程分析 组件 流程 将epoll封装成reactor事件驱动 封装每一个连接sockfd变成ntyevent 封装epfd和ntyevent变成ntyreactor 封装读.写.接收连接等事件对应的操作变成callback 给每个客户端的ntyevent设置属性 将ntyevent加入到epoll中由内核监听 将ntyevent从epoll中去除 读事件回调函数 写事件回调函数 接受新连接事件回调函数 reactor运行 react

  • java线程中start和run的区别详解

    这篇文章主要介绍了java线程中start和run的区别详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 public class Test1 extends Thread { @Override public void run() { while (true) { System.out.println(Thread.currentThread().getName()); } } public static void main(String[

  • java开发ShardingSphere的路由引擎类型示例详解

    目录 ShardingSphere的路由引擎类型 路由引擎类型 标准路由 路由逻辑 总结 ShardingSphere的路由引擎类型 本篇文章源码基于4.0.1版本 上篇文章我们了解到了ShardingSphere在路由流程过程中,根据不同类型的SQL会现在不同的路由引擎,而ShardingSphere支持的路由规则也很多了,包括广播(broadcast)路由.混合(complex)路由.默认数据库(defaultdb)路由.无效(ignore)路由.标准(standard)路由以及单播(uni

  • Java结构型设计模式中代理模式示例详解

    目录 代理模式 分类 主要角色 作用 静态代理与动态代理的区别 静态代理的基本使用 创建抽象主题 创建真实主题 创建代理主题 客户端调用 JDK动态代理的基本使用 创建抽象主题 创建真实主题 创建代理主题 客户端调用 小优化 CGLIB动态代理的基本使用 创建抽象主题 创建真实主题 创建代理主题 客户端调用 小优化 CGLIB与JDK动态代理区别 1.执行条件 2.实现机制 3.性能 代理模式 代理模式(Proxy Pattern)属于结构型模式. 它是指为其他对象提供一种代理以控制对这个对象的

  • Java结构型设计模式中建造者模式示例详解

    目录 建造者模式 概述 角色 优缺点 应用场景 基本使用 创建产品类 创建建造者类 使用 链式写法 创建产品类与建造者类 使用 建造者模式 概述 建造者模式(Builder Pattern)属于创建型模式. 它是将一个复杂的构建与其表示相分离,使得同样的构建过程可以创建不同的表示. 简而言之:建造者模式就是使用多个简单的对象一步一步构建成一个复杂的对象. 建造者模式适用于创建对象需要很多步骤,但是步骤的顺序不一定固定.如果一个对象有非常复杂的内部结构(很多属性),可以将复杂对象的创建和使用进行分

  • java 线程公平锁与非公平锁详解及实例代码

    java 线程公平锁与非公平锁详解 在ReentrantLock中很明显可以看到其中同步包括两种,分别是公平的FairSync和非公平的NonfairSync.公平锁的作用就是严格按照线程启动的顺序来执行的,不允许其他线程插队执行的:而非公平锁是允许插队的. 默认情况下ReentrantLock是通过非公平锁来进行同步的,包括synchronized关键字都是如此,因为这样性能会更好.因为从线程进入了RUNNABLE状态,可以执行开始,到实际线程执行是要比较久的时间的.而且,在一个锁释放之后,其

随机推荐