Java 线程池详解

系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包含大量并发的线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过次数。

一、Executors 工厂类用来产生线程池,该工厂类包含以下几个静态工厂方法来创建对应的线程池。创建的线程池是一个ExecutorService对象,使用该对象的submit方法或者是execute方法执行相应的Runnable或者是Callable任务。线程池本身在不再需要的时候调用shutdown()方法停止线程池,调用该方法后,该线程池将不再允许任务添加进来,但是会直到已添加的所有任务执行完成后才死亡。

1、newCachedThreadPool(),创建一个具有缓存功能的线程池,提交到该线程池的任务(Runnable或Callable对象)创建的线程,如果执行完成,会被缓存到CachedThreadPool中,供后面需要执行的任务使用。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CacheThreadPool {
  static class Task implements Runnable {
    @Override
    public void run() {
      System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
          + Thread.currentThread().getAllStackTraces().size());
    }
  }

  public static void main(String[] args) {
    ExecutorService cacheThreadPool = Executors.newCachedThreadPool();

    //先添加三个任务到线程池
    for(int i = 0 ; i < 3; i++) {
      cacheThreadPool.execute(new Task());
    }

    //等三个线程执行完成后,再次添加三个任务到线程池
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    for(int i = 0 ; i < 3; i++) {
      cacheThreadPool.execute(new Task());
    }
  }

}

执行结果如下:

CacheThreadPool$Task@2d312eb9 pool-1-thread-1 AllStackTraces map size: 7
CacheThreadPool$Task@59522b86 pool-1-thread-3 AllStackTraces map size: 7
CacheThreadPool$Task@73dbb89f pool-1-thread-2 AllStackTraces map size: 7
CacheThreadPool$Task@5795cedc pool-1-thread-3 AllStackTraces map size: 7
CacheThreadPool$Task@256d5600 pool-1-thread-1 AllStackTraces map size: 7
CacheThreadPool$Task@7d1c5894 pool-1-thread-2 AllStackTraces map size: 7

线程池中的线程对象进行了缓存,当有新任务执行时进行了复用。但是如果有特别多的并发时,缓存线程池还是会创建很多个线程对象。

2、newFixedThreadPool(int nThreads) 创建一个指定线程个数,线程可复用的线程池。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {
  static class Task implements Runnable {
    @Override
    public void run() {
      System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
          + Thread.currentThread().getAllStackTraces().size());
    }
  }

  public static void main(String[] args) {
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

    // 先添加三个任务到线程池
    for (int i = 0; i < 5; i++) {
      fixedThreadPool.execute(new Task());
    }

    // 等三个线程执行完成后,再次添加三个任务到线程池
    try {
      Thread.sleep(3);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    for (int i = 0; i < 3; i++) {
      fixedThreadPool.execute(new Task());
    }
  }

}

执行结果:

FixedThreadPool$Task@7045c12d pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@50fa0bef pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@ccb1870 pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@7392b4e3 pool-1-thread-1 AllStackTraces map size: 7
FixedThreadPool$Task@5bdeff18 pool-1-thread-2 AllStackTraces map size: 7
FixedThreadPool$Task@7d5554e1 pool-1-thread-1 AllStackTraces map size: 7
FixedThreadPool$Task@24468092 pool-1-thread-3 AllStackTraces map size: 7
FixedThreadPool$Task@fa7b978 pool-1-thread-2 AllStackTraces map size: 7

3、newSingleThreadExecutor(),创建一个只有单线程的线程池,相当于调用newFixedThreadPool(1)

4、newSheduledThreadPool(int corePoolSize),创建指定线程数的线程池,它可以在指定延迟后执行线程。也可以以某一周期重复执行某一线程,知道调用shutdown()关闭线程池。

示例如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
  static class Task implements Runnable {
    @Override
    public void run() {
      System.out.println("time " + System.currentTimeMillis() + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
          + Thread.currentThread().getAllStackTraces().size());
    }
  }

  public static void main(String[] args) {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

    scheduledExecutorService.schedule(new Task(), 3, TimeUnit.SECONDS);

    scheduledExecutorService.scheduleAtFixedRate(new Task(), 3, 5, TimeUnit.SECONDS);

    try {
      Thread.sleep(30 * 1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    scheduledExecutorService.shutdown();
  }

}

运行结果如下:

time 1458921795240 pool-1-thread-1 AllStackTraces map size: 6
time 1458921795241 pool-1-thread-2 AllStackTraces map size: 6
time 1458921800240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921805240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921810240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921815240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921820240 pool-1-thread-1 AllStackTraces map size: 7

由运行时间可看出,任务是按照5秒的周期执行的。

5、newSingleThreadScheduledExecutor() 创建一个只有一个线程的线程池,同调用newScheduledThreadPool(1)。

二、ForkJoinPool和ForkJoinTask

ForkJoinPool是ExecutorService的实现类,支持将一个任务划分为多个小任务并行计算,在把多个小任务的计算结果合并成总的计算结果。它有两个构造函数

ForkJoinPool(int parallelism)创建一个包含parallelism个并行线程的ForkJoinPool。

ForkJoinPool(),以Runtime.availableProcessors()方法返回值作为parallelism参数来创建ForkJoinPool。

ForkJoinTask 代表一个可以并行,合并的任务。它是实现了Future<T>接口的抽象类,它有两个抽象子类,代表无返回值任务的RecuriveAction和有返回值的RecursiveTask。可根据具体需求继承这两个抽象类实现自己的对象,然后调用ForkJoinPool的submit 方法执行。

RecuriveAction 示例如下,实现并行输出0-300的数字。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ActionForkJoinTask {
  static class PrintTask extends RecursiveAction {
    private static final int THRESHOLD = 50;
    private int start;
    private int end;

    public PrintTask(int start, int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    protected void compute() {
      if (end - start < THRESHOLD) {
        for(int i = start; i < end; i++) {
          System.out.println(Thread.currentThread().getName() + " " + i);
        }
      } else {
        int middle = (start + end) / 2;
        PrintTask left = new PrintTask(start, middle);
        PrintTask right = new PrintTask(middle, end);
        left.fork();
        right.fork();
      }
    }

  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();

    pool.submit(new PrintTask(0, 300));
    try {
      pool.awaitTermination(2, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    pool.shutdown();
  }

}

在拆分小任务后,调用任务的fork()方法,加入到ForkJoinPool中并行执行。

RecursiveTask示例,实现并行计算100个整数求和。拆分为每20个数求和后获取结果,在最后合并为最后的结果。

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class TaskForkJoinTask {
  static class CalTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 20;

    private int arr[];
    private int start;
    private int end;

    public CalTask(int[] arr, int start, int end) {
      this.arr = arr;
      this.start = start;
      this.end = end;
    }

    @Override
    protected Integer compute() {
      int sum = 0;

      if (end - start < THRESHOLD) {
        for (int i = start; i < end; i++) {
          sum += arr[i];
        }
        System.out.println(Thread.currentThread().getName() + " sum:" + sum);
        return sum;
      } else {
        int middle = (start + end) / 2;
        CalTask left = new CalTask(arr, start, middle);
        CalTask right = new CalTask(arr, middle, end);

        left.fork();
        right.fork();

        return left.join() + right.join();
      }
    }

  }

  public static void main(String[] args) {
    int arr[] = new int[100];
    Random random = new Random();
    int total = 0;

    for (int i = 0; i < arr.length; i++) {
      int tmp = random.nextInt(20);
      total += (arr[i] = tmp);
    }
    System.out.println("total " + total);

    ForkJoinPool pool = new ForkJoinPool(4);

    Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
    try {
      System.out.println("cal result: " + future.get());
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
    pool.shutdown();
  }

}

执行结果如下:

total 912
ForkJoinPool-1-worker-2 sum:82
ForkJoinPool-1-worker-2 sum:123
ForkJoinPool-1-worker-2 sum:144
ForkJoinPool-1-worker-3 sum:119
ForkJoinPool-1-worker-2 sum:106
ForkJoinPool-1-worker-2 sum:128
ForkJoinPool-1-worker-2 sum:121
ForkJoinPool-1-worker-3 sum:89
cal result: 912

子任务执行完后,调用任务的join()方法获取子任务执行结果,再相加获得最后的结果。

(0)

相关推荐

  • Java 线程池详解及实例代码

    线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收. 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因. 例如Android中常见到的很多通用组件一般都离不开"池"的概念,如各种图片

  • Java线程池的几种实现方法及常见问题解答

    工作中,经常会涉及到线程.比如有些任务,经常会交与线程去异步执行.抑或服务端程序为每个请求单独建立一个线程处理任务.线程之外的,比如我们用的数据库连接.这些创建销毁或者打开关闭的操作,非常影响系统性能.所以,"池"的用处就凸显出来了. 1. 为什么要使用线程池 在3.6.1节介绍的实现方式中,对每个客户都分配一个新的工作线程.当工作线程与客户通信结束,这个线程就被销毁.这种实现方式有以下不足之处: •服务器创建和销毁工作的开销( 包括所花费的时间和系统资源 )很大.这一项不用解释,可以

  • Java 线程池ExecutorService详解及实例代码

    Java 线程池ExecutorService 1.线程池 1.1什么情况下使用线程池 单个任务处理的时间比较短. 将需处理的任务的数量大. 1.2使用线程池的好处 减少在创建和销毁线程上所花的时间以及系统资源的开销. 如果不使用线程池,有可能造成系统创建大量线程而导致消耗系统内存以及"过度切换"; 2.ExecutorService和Executors 2.1简介 ExecutorService是一个接口,继承了Executor, public interface ExecutorS

  • Java Socket编程实例(三)- TCP服务端线程池

    一.服务端回传服务类: import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; public class EchoProtocol implements Runnable { private static f

  • Java 线程池_动力节点Java学院整理

    线程池 系统启动一个新线程的成本是比较高的,因为它涉及到与操作系统的交互.在这种情况下,使用线程池可以很好的提供性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象传给线程池,线程池就会启动一条线程来执行该对象的run方法,当run方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个Runnable对象的run方法. 除此之外,使用线程池可以有效地

  • 四种Java线程池用法解析

    本文为大家分析四种Java线程池用法,供大家参考,具体内容如下 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } } ).start(); 那你就out太多了,new Thread的弊端如下: a. 每次new Thread新建对象性能差. b. 线程缺乏统一管理,可能无限

  • java线程池详解及代码介绍

    目录 一.线程池简介 二.四种常见的线程池详解 三.缓冲队列BlockingQueue和自定义线程池ThreadPoolExecutor 总结 一.线程池简介 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池,使用线程池可以很好的提高性能,线程池在系统启动时既创建大量空闲的线程,程序将一个任务传给线程池.线程池就会启动一条线程来执行这个任务,执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务. 线程池的工作机制 在线程池的编程模式下,任务是提交给整个

  • Java 线程池详解及创建简单实例

    Java 线程池 最近在改进项目的并发功能,但开发起来磕磕碰碰的.看了好多资料,总算加深了认识.于是打算配合查看源代码,总结并发编程的原理. 准备从用得最多的线程池开始,围绕创建.执行.关闭认识线程池整个生命周期的实现原理.后续再研究原子变量.并发容器.阻塞队列.同步工具.锁等等主题.java.util.concurrent里的并发工具用起来不难,但不能仅仅会用,我们要read the fucking source code,哈哈.顺便说声,我用的JDK是1.8. Executor框架 Exec

  • Java 线程池详解

    系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包含大量并发的线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过次数. 一.Executors 工厂类用来产生线程池,该工厂类包含以下几个静态工厂方法来创建对应的线程池.创建的线程池是一个ExecutorService对象,使用该对象的submit方法或者是execute方法执行相应的Runnable或者是Callable任务.线程池本身在不

  • Java 数据库连接池详解及简单实例

    Java 数据库连接池详解 数据库连接池的原理是: 连接池基本的思想是在系统初始化的时候,将数据库连接作为对象存储在内存中,当用户需要访问数据库时,并非建立一个新的连接,而是从连接池中取出一个已建立的空闲连接对象.使用完毕后,用户也并非将连接关闭,而是将连接放回连接池中,以供下一个请求访问使用.而连接的建立.断开都由连接池自身来管理.同时,还可以通过设置连接池的参数来控制连接池中的初始连接数.连接的上下限数以及每个连接的最大使用次数.最大空闲时间等等.也可以通过其自身的管理机制来监视数据库连接的

  • 基于tomcat的连接数与线程池详解

    前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xml 中写到过:Connector的主要功能,是接收连接请求,创建Request和Response对象用于和请求端交换数据:然后分配线程让Engine(也就是Servlet容器)来处理这个请求,并把产生的Request和Response对象传给Engine.当Engine处理完请求后,也会通过Conn

  • Springboot 配置线程池创建线程及配置 @Async 异步操作线程池详解

    目录 前言 一.创建一个Springboot Web项目 二.新建ThreadPoolConfig 三.新建controller测试 四.演示结果 前言 众所周知,创建显示线程和直接使用未配置的线程池创建线程,都会被阿里的大佬给diss,所以我们要规范的创建线程. 至于 @Async 异步任务的用处是不想等待方法执行完就返回结果,提高软件前台响应速度,一个程序中会用到很多异步方法,所以需要使用线程池管理,防止影响性能. 一.创建一个Springboot Web项目 需要一个Springboot项

  • nginx源码分析线程池详解

    nginx源码分析线程池详解 一.前言 nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响.但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA).其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的.下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误.不足还请大家指出,谢谢) 二.thread_

  • Java 线程优先级详解及实例

    Java 线程优先级详解及实例 操作系统基本采用时分的调度运行线程,操作系统会分出一个个时间片,线程会被分配到若干个时间片,当线程的时间片用完了就会发生线程调度,并且等待着下次调度,线程被分配到的时间片多少也就决定了线程使用处理器资源的多少,而线程优先级就是决定线程能够分配多少处理器资源的线程属性. 在Java多线程中,通过一个整形变量priority来控制优先级,优先级的范围从1-10.默认是5,优先级越高越好. public class Priority { public static vo

  • Java 常量池详解之字符串常量池实现代码

    目录 1.字符串常量池(String Constant Pool) 1.1:字符串常量池在Java内存区域的哪个位置? 1.2:字符串常量池是什么? 1.3 字符串常量池生成的时机? 如何将String对象放入到常量池 String 对象代码案例解析 new string(“abc”)创建了几个对象 解析public native String intern() 方法 Integer 对象代码案例解析 为啥Integer i1 =10 跟Integer.valueOf(10) 是相等的? 为啥I

随机推荐