Java线程池并发执行多个任务方式

目录
  • Java线程池并发执行多个任务
  • Java线程池的正确使用
    • 1.Executors存在什么问题
    • 2.Executors为什么存在缺陷
    • 3. 线程池参数详解
    • 4. 线程池正确打开方式

Java线程池并发执行多个任务

Java在语言层面提供了多线程的支持,线程池能够避免频繁的线程创建和销毁的开销,因此很多时候在项目当中我们是使用的线程池去完成多线程的任务。

Java提供了Executors 框架提供了一些基础的组件能够轻松的完成多线程异步的操作,Executors提供了一系列的静态工厂方法能够获取不同的ExecutorService实现,ExecutorService扩展了Executors接口,Executors相当简单:

public interface Executor {
    void execute(Runnable command);
}

把任务本身和任务的执行解耦了,如果说Runnable是可异步执行任务的抽象,那Executor就是如何执行可异步执行任务的抽象,说起来比较绕口。

本文不讲解线程的一些基础知识,因为网上的其他文章已经写的足够详细和泛滥。我写写多个异步任务的并发执行与结果的获取问题。

假设这样一个场景:我们要组装一个对象,这个对象由大量小的内容组成,这些内容是无关联无依赖关系的,如果我们串行的去执行,如果每个任务耗时10秒钟,一共有10个任务,那我们就需要100秒才能获取到结果。显然我们可以采用线程池,每个任务起一个线程,忽略线程启动时间,我们只需要10秒钟就能获取到结果。这里还有两种选择,这10秒钟我们可以去做其他事,也可以等待结果。

我们来完成这样的操作:

// 这是任务的抽象
class GetContentTask implements Callable<String> {
        
        private String name;
        
        private Integer sleepTimes;
        
        public GetContentTask(String name, Integer sleepTimes) {
            this.name = name;
            this.sleepTimes = sleepTimes;
        }
        public String call() throws Exception {
            // 假设这是一个比较耗时的操作
            Thread.sleep(sleepTimes * 1000);
            return "this is content : hello " + this.name;
        }
        
    }

采用completionService :

// 方法一
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletionService<String> completionService = new ExecutorCompletionService(executorService);
        ExecuteServiceDemo executeServiceDemo = new ExecuteServiceDemo();
        // 十个
        long startTime = System.currentTimeMillis();
        int count = 0;
        for (int i = 0;i < 10;i ++) {
            count ++;
            GetContentTask getContentTask = new ExecuteServiceDemo.GetContentTask("micro" + i, 10);
            completionService.submit(getContentTask);
        }
        System.out.println("提交完任务,主线程空闲了, 可以去做一些事情。");
        // 假装做了8秒种其他事情
        try {
            Thread.sleep(8000);
            System.out.println("主线程做完了,等待结果");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            // 做完事情要结果
            for (int i = 0;i < count;i ++) {
                Future<String> result = completionService.take();
                System.out.println(result.get());
            }
            long endTime = System.currentTimeMillis();
            System.out.println("耗时 : " + (endTime - startTime) / 1000);
        }  catch (Exception ex) {
            System.out.println(ex.getMessage());
        }

执行结果为:

提交完任务,主线程空闲了, 可以去做一些事情。
主线程做完了,等待结果
this is content : hello micro9
this is content : hello micro7
this is content : hello micro2
this is content : hello micro5
this is content : hello micro4
this is content : hello micro8
this is content : hello micro1
this is content : hello micro3
this is content : hello micro0
this is content : hello micro6
耗时 : 10

如果多个不想一个一个提交,可以采用 invokeAll一并提交,但是会同步等待这些任务

// 方法二
        ExecutorService executeService = Executors.newCachedThreadPool();
        List<GetContentTask> taskList = new ArrayList<GetContentTask>();
        long startTime = System.currentTimeMillis();
        for (int i = 0;i < 10;i ++) {
            taskList.add(new GetContentTask("micro" + i, 10));
        }
        try {
            System.out.println("主线程发起异步任务请求");
            List<Future<String>> resultList = executeService.invokeAll(taskList);
            // 这里会阻塞等待resultList获取到所有异步执行的结果才会执行 
            for (Future<String> future : resultList) {
                System.out.println(future.get());
            }
            // 主线程假装很忙执行8秒钟
            Thread.sleep(8);
            long endTime = System.currentTimeMillis();
            System.out.println("耗时 : " + (endTime - startTime) / 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }

主线程发起异步任务请求
this is content : hello micro0
this is content : hello micro1
this is content : hello micro2
this is content : hello micro3
this is content : hello micro4
this is content : hello micro5
this is content : hello micro6
this is content : hello micro7
this is content : hello micro8
this is content : hello micro9
耗时 : 10

如果一系列请求,我们并不需要等待每个请求,我们可以invokeAny,只要某一个请求返回即可。

ExecutorService executorService = Executors.newCachedThreadPool();
        ArrayList<GetContentTask> taskList = new ArrayList<GetContentTask>();
        taskList.add(new GetContentTask("micro1",3));
        taskList.add(new GetContentTask("micro2", 6));
        try {
            List<Future<String>> resultList = executorService.invokeAll(taskList);// 等待6秒 
//            String result2 = executorService.invokeAny(taskList); // 等待3秒
            // invokeAll 提交一堆任务并行处理并拿到结果
            // invokeAny就是提交一堆并行任务拿到一个结果即可
            for (Future<String> result : resultList) {
                System.out.println(result.get());
            }
//            System.out.println(result2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程等待");

如果我虽然发送了一堆异步的任务,但是我只等待一定的时间,在规定的时间没有返回我就不要了,例如很多时候的网络请求其他服务器如果要数据,由于网络原因不能一直等待,在规定时间内去拿,拿不到就我使用一个默认值。

这样的场景,我们可以使用下面的写法:

try {
            ExecutorService executorService = Executors.newCachedThreadPool();
            List<Callable<String>> taskList = new ArrayList<Callable<String>>();
            taskList.add(new GetContentTask("micro1", 4));
            taskList.add(new GetContentTask("micro2", 6));
            // 等待五秒
            List<Future<String>> resultList = executorService.invokeAll(taskList, 5, TimeUnit.SECONDS);
            for (Future<String> future : resultList) {
                System.out.println(future.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } 

this is content : hello micro1
java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.micro.demo.spring.ExecuteServiceDemo.main(ExecuteServiceDemo.java:105)

因为只等待5秒,6秒的那个任务自然获取不到,抛出异常,如果将等待时间设置成8秒,就都能获取到。

Java线程池的正确使用

线程可认为是操作系统可调度的最小的程序执行序列,一般作为进程的组成部分,同一进程中多个线程可共享该进程的资源(如内存等)。JVM线程跟内核轻量级进程有一对一的映射关系,所以JVM中的线程是很宝贵的。

一般在工程上多线程的实现是基于线程池的。因为相比自己创建线程,多线程具有以下优点:

  • 线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。
  • 可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。

1.Executors存在什么问题

看阿里巴巴开发手册并发编程这块有一条:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式。

2.Executors为什么存在缺陷

2.1 线程池工作原理

当一个任务通过execute(Runnable)方法欲添加到线程池时:

  • 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的务。
  • 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
  • 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  • 那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
  • 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

2.2 newFixedThreadPool分析

Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。

ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。

LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。

这里的问题就出在:不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,如果我们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。而newFixedThreadPool中创建LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来说,是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题。

2.3 newCachedThreadPool分析

结合上述流程图,核心线程数=0,最大线程无限大,由于SynchronousQueue是一个缓存值为1的阻塞队列。当有大量任务请求时,线程池会创建大量线程,造成OOM。

3. 线程池参数详解

3.1 构造方法

3.2 线程池拒绝策略

RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。。以下是JDK1.5提供的四种策略。

  • AbortPolicy:直接抛出异常
  • CallerRunsPolicy:只用调用者所在线程来运行任务。
  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。DiscardPolicy:不处理,丢弃掉。
  • 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

4. 线程池正确打开方式

4.1 创建线程池

避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。

4.2 向线程池提交任务

我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务知否被线程池执行成功。通过以下代码可知execute方法输入的任务是一个Runnable类的实例。

我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

4.3 关闭线程池

shutdown关闭线程池

方法定义:public void shutdown()

  • 线程池的状态变成SHUTDOWN状态,此时不能再往线程池中添加新的任务,否则会抛出RejectedExecutionException异常。
  • 线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

注意:这个函数不会等待提交的任务执行完成,要想等待全部任务完成,可以调用:

public boolean awaitTermination(longtimeout, TimeUnit unit)

shutdownNow关闭线程池并中断任务

方法定义:public List shutdownNow()

  • 线程池的状态立刻变成STOP状态,此时不能再往线程池中添加新的任务。
  • 终止等待执行的线程,并返回它们的列表;
  • 试图停止所有正在执行的线程,试图终止的方法是调用Thread.interrupt(),但是大家知道,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

4.4 如何配置线程池大小

CPU密集型任务

  • 该任务需要大量的运算,并且没有阻塞,CPU一直全速运行,CPU密集任务只有在真正的多核CPU上才可能通过多线程加速 CPU密集型任务配置尽可能少的线程数量:
  • CPU核数+1个线程的线程池。
  • 例如: CPU 16核,内存32G。线程数=16

IO密集型任务

  • IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如:CPU核数*2
  • 某大厂设置策略:IO密集型时,大部分线程都阻塞,故需要多配置线程数:
  • CPU核数/(1-阻塞系数)
  • 例如: CPU 16核, 阻塞系数 0.9 ------------->16/(1-0.9) = 160 个线程数。
  • 此时非阻塞线程=16

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Java多线程工具CompletableFuture的使用教程

    目录 前言 Future的问题 CompletableFuture应运而生 使用方式 基本使用-提交异步任务 处理上个异步任务结果 对两个结果进行选用-acceptEither 对两个结果进行合并-thenCombine, thenAcceptBoth 异常处理 案例 大量用户发送短信|消息 并发获取商品不同信息 问题 thenRun和thenRunAsync有什么区别 handle和exceptional有什么区别 最后 前言 Future的问题 写多线程程序的时候,可以使用Future从一个

  • Java实现手写线程池的示例代码

    目录 前言 线程池给我们提供的功能 工具介绍 Worker设计 线程池设计 总结 前言 在我们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可规避的问题.多线程可以提高我们并发程序的效率,可以让我们不去频繁的申请和释放线程,这是一个很大的花销,而在线程池当中就不需要去频繁的申请线程,他的主要原理是申请完线程之后并不中断,而是不断的去队列当中领取任务,然后执行,反复这样的操作.在本篇文章当中我们主要是介绍线程池的原理,因此我们会自己写一个非常非常简单的线程池,主要帮

  • Java多线程ThreadPoolExecutor详解

    目录 1 newFixedThreadPool 2 newCachedThreadPool 3 newSingleThreadExecutor 4 提交任务 5 关闭线程池 前言: 根据ThreadPoolExecutor的构造方法,JDK提供了很多工厂方法来创建各种用途的线程池. 1 newFixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolE

  • Java查看线程运行状态的方法详解

    目录 一.查看线程的运行状态 二.解题思路 三.代码详解 一.查看线程的运行状态 题目 线程有以下6种状态:新建.运行.阻塞.等待.计时等待和终止. new新线程时,线程处于新建 状态. 调用start()方法时,线程处于运行状态. 当线程需要获得对象的内置锁,而该锁正被其他线程拥有,线程处于阻塞状态. 线程等待其他线程通知调度表可以运行时,该线程处于等待状态. 对于一些含有时间参数的方法,如 Thread 类的 sleep()方法,可以使线程处于计时等待状态. 当run()方法运行完毕或出现异

  • java线程组构造方法源码解析

    目录 线程组构造方法 为啥开启线程,就能统计到呢? 线程组下面可以有线程组吗? 线程组构造方法 我们看这个线程组,线程组名字是system,设置优先级,然后指定父线程是空,可以看出这个是根线程组,这个方法是私有的,不是给我们调用的. 把当前线程加入线程组中 我们试试上述代码逻辑,对叭,没指定线程组就加入当前创建的main线程的线程组,如果指定线程组就是当前线程组. 我们看源码也是一样: 活跃线程和活跃线程组数量都是0: 我们这是没开启线程,所以我们需要开启线程,这里就看到活跃线程数量了: 为啥开

  • Java线程的五种状态介绍

    目录 1. 线程的5种状态 2. Java线程的6种状态 3. Java线程状态的转换 1. 线程的5种状态 从操作系统层面上,任何线程一般都具有五种状态,即创建.就绪.运行.阻塞.终止. (1) 新建状态(NEW) 在程序中用构造方法创建一个新线程时,如new Thread(),该线程就是创建状态,此时它已经有了相应的内存空间和其它资源,但是还没有开始执行. (2) 就绪状态(READ) 新建线程对象后,调用该线程的start()方法就可以启动线程.当线程启动时,线程就进入就绪状态(runna

  • Java多线程实现第三方数据同步

    本文实例为大家分享了Java多线程实现第三方数据同步的具体代码,供大家参考,具体内容如下 一.场景 最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+.在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换:增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换.在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!! 二.获取数据的方式 2.1 递归方式

  • Java创建线程的五种写法总结

    目录 通过继承Thread类并实现run方法创建一个线程 通过实现Runnable接口,并实现run方法的方法创建一个线程 通过Thread匿名内部类创建一个线程 通过Runnable匿名内部类创建一个线程 通过Lambda表达式的方式创建一个线程 通过继承Thread类并实现run方法创建一个线程 // 定义一个Thread类,相当于一个线程的模板 class MyThread01 extends Thread { // 重写run方法// run方法描述的是线程要执行的具体任务@Overri

  • Java查看和修改线程优先级操作详解

    目录 查看和修改线程优先级 1.题目 2.解题思路 3.代码详解 查看和修改线程优先级 1.题目 JAVA中每个线程都有优化级属性,默认情况下,新建的线程和创建该线程的线程优先级是一样的.当线程调度器选择要运行的线程时,会选择优先级高的线程. 实现:查看和修改线程的优先级 2.解题思路 创建一个类:ThreadPriorityTest,继承了JFrame.用来界面显示当前线程组中运行的线程. 定义2个方法: do_this_windowActivated():用来监听窗体激活事件 do_butt

  • Java线程池并发执行多个任务方式

    目录 Java线程池并发执行多个任务 Java线程池的正确使用 1.Executors存在什么问题 2.Executors为什么存在缺陷 3. 线程池参数详解 4. 线程池正确打开方式 Java线程池并发执行多个任务 Java在语言层面提供了多线程的支持,线程池能够避免频繁的线程创建和销毁的开销,因此很多时候在项目当中我们是使用的线程池去完成多线程的任务. Java提供了Executors 框架提供了一些基础的组件能够轻松的完成多线程异步的操作,Executors提供了一系列的静态工厂方法能够获

  • java 线程池如何执行策略又拒绝哪些策略

    目录 线程池执行流程 线程池拒绝策略 DiscardPolicy拒绝策略 AbortPolicy拒绝策略 自定义拒绝策略 总结 前言: 聊到线程池就一定会聊到线程池的执行流程,也就是当有一个任务进入线程池之后,线程池是如何执行的?我们今天就来聊聊这个话题.线程池是如何执行的?线程池的拒绝策略有哪些? 线程池执行流程 想要真正的了解线程池的执行流程,就得先从线程池的执行方法 execute() 说起,execute() 实现源码如下: public void execute(Runnable co

  • 详解Java线程池和Executor原理的分析

    详解Java线程池和Executor原理的分析 线程池作用与基本知识 在开始之前,我们先来讨论下"线程池"这个概念."线程池",顾名思义就是一个线程缓存.它是一个或者多个线程的集合,用户可以把需要执行的任务简单地扔给线程池,而不用过多的纠结与执行的细节.那么线程池有哪些作用?或者说与直接用Thread相比,有什么优势?我简单总结了以下几点: 减小线程创建和销毁带来的消耗 对于Java Thread的实现,我在前面的一篇blog中进行了分析.Java Thread与内

  • Java 线程池_动力节点Java学院整理

    线程池 系统启动一个新线程的成本是比较高的,因为它涉及到与操作系统的交互.在这种情况下,使用线程池可以很好的提供性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象传给线程池,线程池就会启动一条线程来执行该对象的run方法,当run方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个Runnable对象的run方法. 除此之外,使用线程池可以有效地

  • Java线程池的几种实现方法和区别介绍实例详解

    下面通过实例代码为大家介绍Java线程池的几种实现方法和区别: import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.

  • Java 线程池ExecutorService详解及实例代码

    Java 线程池ExecutorService 1.线程池 1.1什么情况下使用线程池 单个任务处理的时间比较短. 将需处理的任务的数量大. 1.2使用线程池的好处 减少在创建和销毁线程上所花的时间以及系统资源的开销. 如果不使用线程池,有可能造成系统创建大量线程而导致消耗系统内存以及"过度切换"; 2.ExecutorService和Executors 2.1简介 ExecutorService是一个接口,继承了Executor, public interface ExecutorS

  • Java线程池的几种实现方法及常见问题解答

    工作中,经常会涉及到线程.比如有些任务,经常会交与线程去异步执行.抑或服务端程序为每个请求单独建立一个线程处理任务.线程之外的,比如我们用的数据库连接.这些创建销毁或者打开关闭的操作,非常影响系统性能.所以,"池"的用处就凸显出来了. 1. 为什么要使用线程池 在3.6.1节介绍的实现方式中,对每个客户都分配一个新的工作线程.当工作线程与客户通信结束,这个线程就被销毁.这种实现方式有以下不足之处: •服务器创建和销毁工作的开销( 包括所花费的时间和系统资源 )很大.这一项不用解释,可以

  • Java 线程池详解及创建简单实例

    Java 线程池 最近在改进项目的并发功能,但开发起来磕磕碰碰的.看了好多资料,总算加深了认识.于是打算配合查看源代码,总结并发编程的原理. 准备从用得最多的线程池开始,围绕创建.执行.关闭认识线程池整个生命周期的实现原理.后续再研究原子变量.并发容器.阻塞队列.同步工具.锁等等主题.java.util.concurrent里的并发工具用起来不难,但不能仅仅会用,我们要read the fucking source code,哈哈.顺便说声,我用的JDK是1.8. Executor框架 Exec

  • Java 线程池详解及实例代码

    线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收. 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因. 例如Android中常见到的很多通用组件一般都离不开"池"的概念,如各种图片

  • 四种Java线程池用法解析

    本文为大家分析四种Java线程池用法,供大家参考,具体内容如下 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } } ).start(); 那你就out太多了,new Thread的弊端如下: a. 每次new Thread新建对象性能差. b. 线程缺乏统一管理,可能无限

随机推荐