Java ThreadPoolExecutor 线程池的使用介绍

Executors

Executors 是一个Java中的工具类. 提供工厂方法来创建不同类型的线程池.

从上图中也可以看出, Executors的创建线程池的方法, 创建出来的线程池都实现了 ExecutorService接口. 常用方法有以下几个:

  • newFixedThreadPool(int Threads): 创建固定数目线程的线程池, 超出的线程会在队列中等待.
  • newCachedThreadPool(): 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程(60秒), 若无可回收,则新建线程.
  • newSingleThreadExecutor(): 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行. 如果某一个任务执行出错, 将有另一个线程来继续执行.
  • newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池, 多数情况下可用来替代Timer类.

Executors 例子

newCachedThreadPool

线程最大数为 Integer.MAX_VALUE, 当我们往线程池添加了 n 个任务, 这 n 个任务都是一起执行的.

  ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(1000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(1000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

newFixedThreadPool

  ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(1000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(1000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

newScheduledThreadPool

三秒执行一次, 只有执行完这一次后, 才会执行.

 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
    scheduledExecutorService.schedule(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            Thread.currentThread().sleep(2000);
            System.out.println(Thread.currentThread().getName());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

顺序执行各个任务, 第一个任务执行完, 才会执行下一个.

ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            System.out.println(Thread.currentThread().getName());
            Thread.currentThread().sleep(10000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

    executorService.execute(new Runnable() {
      @Override
      public void run() {
        for (;;) {
          try {
            System.out.println(Thread.currentThread().getName());
            Thread.currentThread().sleep(2);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    });

Executors存在什么问题

在阿里巴巴Java开发手册中提到,使用Executors创建线程池可能会导致OOM(OutOfMemory ,内存溢出),但是并没有说明为什么,那么接下来我们就来看一下到底为什么不允许使用Executors?

我们先来一个简单的例子,模拟一下使用Executors导致OOM的情况.

/**
 * @author Hollis
 */
public class ExecutorsDemo {
  private static ExecutorService executor = Executors.newFixedThreadPool(15);
  public static void main(String[] args) {
    for (int i = 0; i < Integer.MAX_VALUE; i++) {
      executor.execute(new SubThread());
    }
  }
}

class SubThread implements Runnable {
  @Override
  public void run() {
    try {
      Thread.sleep(10000);
    } catch (InterruptedException e) {
      //do nothing
    }
  }
}

通过指定JVM参数:-Xmx8m -Xms8m 运行以上代码,会抛出OOM:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

以上代码指出,ExecutorsDemo.java 的第16行,就是代码中的 executor.execute(new SubThread());

Java中的 BlockingQueue 主要有两种实现, 分别是 ArrayBlockingQueueLinkedBlockingQueue.

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列, 必须设置容量.

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull = lock.newCondition();
}

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列, 容量可以选择进行设置, 不设置的话, 将是一个无边界的阻塞队列, 最大长度为 Integer.MAX_VALUE.

public LinkedBlockingQueue() {
  this(Integer.MAX_VALUE);
}

这里的问题就出在如果我们不设置 LinkedBlockingQueue 的容量的话, 其默认容量将会是 Integer.MAX_VALUE.

newFixedThreadPool 中创建 LinkedBlockingQueue 时, 并未指定容量. 此时, LinkedBlockingQueue 就是一个无边界队列, 对于一个无边界队列来说, 是可以不断的向队列中加入任务的, 这种情况下就有可能因为任务过多而导致内存溢出问题.

newCachedThreadPoolnewScheduledThreadPool 这两种方式创建的最大线程数可能是Integer.MAX_VALUE, 而创建这么多线程, 必然就有可能导致OOM.

ThreadPoolExecutor 创建线程池

避免使用 Executors 创建线程池, 主要是避免使用其中的默认实现, 那么我们可以自己直接调用 ThreadPoolExecutor 的构造函数来自己创建线程池. 在创建的同时, 给 BlockQueue 指定容量就可以了.

ExecutorService executor = new ThreadPoolExecutor(10, 10,
    60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue(10));

这种情况下, 一旦提交的线程数超过当前可用线程数时, 就会抛出 java.util.concurrent.RejectedExecutionException, 这是因为当前线程池使用的队列是有边界队列, 队列已经满了便无法继续处理新的请求.

除了自己定义 ThreadPoolExecutor 外. 还有其他方法. 如apache和guava等.

四个构造函数

  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue)

  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory)

  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               RejectedExecutionHandler handler)

  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler)

int corePoolSize => 该线程池中核心线程数最大值

线程池新建线程的时候,如果当前线程总数小于corePoolSize, 则新建的是核心线程, 如果超过corePoolSize, 则新建的是非核心线程

核心线程默认情况下会一直存活在线程池中, 即使这个核心线程啥也不干(闲置状态).

如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 这个属性为 true, 那么核心线程如果不干活(闲置状态)的话, 超过一定时间(时长下面参数决定), 就会被销毁掉

很好理解吧, 正常情况下你不干活我也养你, 因为我总有用到你的时候, 但有时候特殊情况(比如我自己都养不起了), 那你不干活我就要把你干掉了

int maximumPoolSize
该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数.

long keepAliveTime
该线程池中非核心线程闲置超时时长

一个非核心线程, 如果不干活(闲置状态)的时长超过这个参数所设定的时长, 就会被销毁掉

如果设置 allowCoreThreadTimeOut = true, 则会作用于核心线程

TimeUnit unit

keepAliveTime的单位, TimeUnit是一个枚举类型, 其包括:

TimeUnit.DAYS;        //天
TimeUnit.HOURS;       //小时
TimeUnit.MINUTES;      //分钟
TimeUnit.SECONDS;      //秒
TimeUnit.MILLISECONDS;   //毫秒
TimeUnit.MICROSECONDS;   //微妙
TimeUnit.NANOSECONDS;    //纳秒

BlockingQueue workQueue

一个阻塞队列, 用来存储等待执行的任务. 也就是说现在有10个任务, 核心线程 有四个, 非核心线程有六个, 那么这六个线程会被添加到 workQueue 中, 等待执行.

这个参数的选择也很重要, 会对线程池的运行过程产生重大影响, 一般来说, 这里的阻塞队列有以下几种选择:

SynchronousQueue: 这个队列接收到任务的时候, 会直接提交给线程处理, 而不保留它, 如果所有线程都在工作怎么办? 那就*新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误, 使用这个类型队列的时候, maximumPoolSize 一般指定成 Integer.MAX_VALUE, 即无限大.

LinkedBlockingQueue: 这个队列接收到任务的时候, 如果当前线程数小于核心线程数, 则核心线程处理任务; 如果当前线程数等于核心线程数, 则进入队列等待. 由于这个队列最大值为 Integer.MAX_VALUE , 即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效, 因为总线程数永远不会超过 corePoolSize.

ArrayBlockingQueue: 可以限定队列的长度, 接收到任务的时候, 如果没有达到 corePoolSize 的值, 则核心线程执行任务, 如果达到了, 则入队等候, 如果队列已满, 则新建线程(非核心线程)执行任务, 又如果总线程数到了maximumPoolSize, 并且队列也满了, 则发生错误.

DelayQueue: 队列内元素必须实现 Delayed 接口, 这就意味着你传进去的任务必须先实现Delayed接口. 这个队列接收到任务时, 首先先入队, 只有达到了指定的延时时间, 才会执行任务.

ThreadFactory threadFactory

它是ThreadFactory类型的变量, 用来创建新线程.

默认使用 Executors.defaultThreadFactory() 来创建线程. 使用默认的 ThreadFactory 来创建线程时, 会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程, 同时也设置了线程的名称.

RejectedExecutionHandler handler

表示当拒绝处理任务时的策略, 有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认).
ThreadPoolExecutor.DiscardPolicy:直接丢弃任务, 但是不抛出异常.
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务, 然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:用调用者所在的线程来执行任务.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • java线程池对象ThreadPoolExecutor的深入讲解

    使用线程池的好处 1.降低资源消耗 可以重复利用已创建的线程降低线程创建和销毁造成的消耗. 2.提高响应速度 当任务到达时,任务可以不需要等到线程创建就能立即执行. 3.提高线程的可管理性 线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配.调优和监控 ThreadPoolExecutor 介绍: java 提供的线程池类: ThreadPoolExecutor 作用: 两个作用: 1,用于分离执行任务和当前线程: 2,主要设计初衷:重复利用T

  • java 中ThreadPoolExecutor原理分析

    java 中ThreadPoolExecutor原理分析 线程池简介 Java线程池是开发中常用的工具,当我们有异步.并行的任务要处理时,经常会用到线程池,或者在实现一个服务器时,也需要使用线程池来接收连接处理请求. 线程池使用 JDK中提供的线程池实现位于java.util.concurrent.ThreadPoolExecutor.在使用时,通常使用ExecutorService接口,它提供了submit,invokeAll,shutdown等通用的方法. 在线程池配置方面,Executor

  • java ThreadPoolExecutor使用方法简单介绍

    java  ThreadPoolExecutor 前言: 在项目中如果使用发短信这个功能,一般会把发短信这个动作变成异步的,因为大部分情况下,短信到底是发送成功或者失败,都不能影响主流程.当然像发送MQ消息等操作也是可以封装成异步操作的. 使用基本的New Thread 如果想一个操作变成异步的,可以直接new thread,然后在run方法中实现业务操作即可.例如: new Thread(new Runnable() { public void run() { //发短信.发MQ消息等 } }

  • java ThreadPoolExecutor 并发调用实例详解

    java ThreadPoolExecutor 并发调用实例详解 概述 通常为了提供任务的处理速度,会使用一些并发模型,ThreadPoolExecutor中的invokeAll便是一种. 代码 package test.current; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util

  • Java ThreadPoolExecutor的参数深入理解

    Java ThreadPoolExecutor的参数深入理解 一.使用Executors创建线程池    之前创建线程的时候都是用的Executors的newFixedThreadPool(),newSingleThreadExecutor(),newCachedThreadPool()这三个方法.当然Executors也是用不同的参数去new ThreadPoolExecutor     1. newFixedThreadPool() 创建线程数固定大小的线程池. 由于使用了LinkedBlo

  • Java自带定时任务ScheduledThreadPoolExecutor实现定时器和延时加载功能

    java.util.concurrent.ScheduledThreadPoolExecutor 是JDK1 .6之后自带的包,功能强大,能实现定时器和延时加载的功能 各类功能和处理方面优于Timer 1.定时器: ScheduledThreadPoolExecutor  有个scheduleAtFixedRate(command, initialDelay, period, unit) ;方法 command: 执行的线程(可自己New一个) initialDelay:初始化执行的延时时间 p

  • java中Executor,ExecutorService,ThreadPoolExecutor详解

    java中Executor,ExecutorService,ThreadPoolExecutor详解 1.Excutor 源码非常简单,只有一个execute(Runnable command)回调接口 public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thre

  • Java ThreadPoolExecutor 线程池的使用介绍

    Executors Executors 是一个Java中的工具类. 提供工厂方法来创建不同类型的线程池. 从上图中也可以看出, Executors的创建线程池的方法, 创建出来的线程池都实现了 ExecutorService接口. 常用方法有以下几个: newFixedThreadPool(int Threads): 创建固定数目线程的线程池, 超出的线程会在队列中等待. newCachedThreadPool(): 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程(60

  • Java ThreadPoolExecutor线程池有关介绍

    目录 为什么要有线程池? 线程池状态 ThreadPoolExecutor核心参数 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactory handler 关闭线程池的方式 为什么不推荐使用Executors去创建线程池 为什么要有线程池? 在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,所以要尽可能减少创建和销毁线程的次数.由于没有线程创建和销毁时的消耗,可以提高系统响应速度可以对

  • java ThreadPoolExecutor线程池拒绝策略避坑

    目录 1.场景 2. 原因分析 3.总结 4.思考 1.场景 线程池使用DiscardOldestPolicy拒绝策略,阻塞队列使用ArrayBlockingQueue,发现在某些情形下对于得到的Future,调用get()方法当前线程会一直阻塞. 为了便于理解,将实际情景抽象为下面的代码: ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 1, 1, TimeUnit.SECONDS, new ArrayBlo

  • Java创建线程池为什么一定要用ThreadPoolExecutor

    目录 先说结论 OOM风险演示 内存溢出原因分析 使用ThreadPoolExecutor来改进 其他创建线程池的问题 总结 前言: 在 Java 语言中,并发编程都是依靠线程池完成的,而线程池的创建方式又有很多,但从大的分类来说,线程池的创建总共分为两大类:手动方式使用ThreadPoolExecutor创建线程池和使用 Executors 执行器自动创建线程池. 那究竟要使用哪种方式来创建线程池呢?我们今天就来详细的聊一聊. 先说结论 在 Java 语言中,一定要使用 ThreadPoolE

  • 深入理解Java编程线程池的实现原理

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

  • java 定时器线程池(ScheduledThreadPoolExecutor)的实现

    前言 定时器线程池提供了定时执行任务的能力,即可以延迟执行,可以周期性执行.但定时器线程池也还是线程池,最底层实现还是ThreadPoolExecutor,可以参考我的另外一篇文章多线程–精通ThreadPoolExecutor. 特点说明 1.构造函数 public ScheduledThreadPoolExecutor(int corePoolSize) { // 对于其他几个参数在ThreadPoolExecutor中都已经详细分析过了,所以这里,将不再展开 // 这里我们可以看到调用基类

  • Java常用线程池原理及使用方法解析

    一.简介 什么是线程池? 池的概念大家也许都有所听闻,池就是相当于一个容器,里面有许许多多的东西你可以即拿即用.java中有线程池.连接池等等.线程池就是在系统启动或者实例化池时创建一些空闲的线程,等待工作调度,执行完任务后,线程并不会立即被销毁,而是重新处于空闲状态,等待下一次调度. 线程池的工作机制? 在线程池的编程模式中,任务提交并不是直接提交给线程,而是提交给池.线程池在拿到任务之后,就会寻找有没有空闲的线程,有则分配给空闲线程执行,暂时没有则会进入等待队列,继续等待空闲线程.如果超出最

  • java ThreadPool线程池的使用,线程池工具类用法说明

    实际上java已经提供线程池的实现 ExecutorService. 为了更方便的使用和管理.这里提供一个线程池工具类,方便大家的使用. 直接看看代码: 使用 public static void main(String[] args) { //实例化一个固定数目的线程池.具体参考类的构造方法 ThreadPool threadPool=new ThreadPool(ThreadPool.FixedThread,5); //线程池执行线程 threadPool.execute(new Runna

  • java中线程池最实用的创建与关闭指南

    目录 前言 线程池创建 只需要执行shutdown就可以优雅关闭 执行shutdownNow关闭的测试 总结 前言 在日常的开发工作当中,线程池往往承载着一个应用中最重要的业务逻辑,因此我们有必要更多地去关注线程池的执行情况,包括异常的处理和分析等. 线程池创建 避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池.在创建的同时,给BlockQueue指定容量就可以了. private stat

  • 浅谈Java关闭线程池shutdown和shutdownNow的区别

    目录 前言 项目环境 1.线程池示例 2.shutdown 3.isShutdown 4.isTerminated 5.awaitTermination 6.shutdownNow 7.shutdown 和 shutdownNow 的区别? 前言 本章分为两个议题 如何正确关闭线程池 shutdown 和 shutdownNow 的区别 项目环境 jdk 1.8 github 地址:https://github.com/huajiexiewenfeng/java-concurrent 本章模块:

随机推荐