Java线程池ForkJoinPool(工作窃取算法)的使用

目录
  • 概述
  • 工作窃取算法
  • 工作窃取算法的优缺点
  • 使用 ForkJoinPool 进行分叉和合并
  • ForkJoinPool使用
    • RecursiveAction
    • RecursiveTask
  • Fork/Join 案例Demo

概述

Fork 就是把一个大任务切分为若干个子任务并行地执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。Fork/Join 框架使用的是工作窃取算法。

工作窃取算法

工作窃取算法是指某个线程从其他队列里窃取任务来执行。对于一个比较大的任务,可以把它分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务需要处理,于是它就去其他线程的队列里窃取一个任务来执行。由于此时它们访问同一个队列,为了减小竞争,通常会使用双端队列。被窃取任务的线程永远从双端队列的头部获取任务,窃取任务的线程永远从双端队列的尾部获取任务。

工作窃取算法的优缺点

优点:充分利用线程进行并行计算,减少了线程间的竞争。

缺点:双端队列只存在一个任务时会导致竞争,会消耗更多的系统资源,因为需要创建多个线程和多个双端队列。

使用 ForkJoinPool 进行分叉和合并

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一点不同。ForkJoinPool 让我们可以很方便地把任务分成几个更小的任务,这些分出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。可能听起来有些抽象,因此本节中我们将会解释 ForkJoinPool 是如何工作的,还有任务分割是如何进行的。

分叉和合并解释

在我们开始看 ForkJoinPool 之前我们先来简要解释一下分叉和合并的原理。

分叉和合并原理包含两个递归进行的步骤。两个步骤分别是分叉步骤和合并步骤。

分叉

一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。如下图所示:

通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。

只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。

什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阀值。这要看每个任务对有意义阀值的决定。很大程度上取决于它要做的工作的种类。

合并

当一个任务将自己分割成若干子任务之后,该任务将进入等待所有子任务的结束之中。一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。图示如下:

当然,并非所有类型的任务都会返回一个结果。如果这个任务并不返回一个结果,它只需等待所有子任务执行完毕。也就不需要结果的合并啦。

ForkJoinPool使用

ForkJoinPool 是一个特殊的线程池,它的设计是为了更好的配合 分叉-和-合并 任务分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整类名为 java.util.concurrent.ForkJoinPool。

创建一个 ForkJoinPool

你可以通过其构造子创建一个 ForkJoinPool。作为传递给 ForkJoinPool 构造子的一个参数,你可以定义你期望的并行级别。并行级别表示你想要传递给 ForkJoinPool 的任务所需的线程或 CPU 数量。以下是一个 ForkJoinPool 示例:

// 这个示例创建了一个并行级别为 4 的 ForkJoinPool。   如果是默认构造会自动识别当前电脑的cup核数进行并行
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

提交任务到 ForkJoinPool

就像提交任务到 ExecutorService 那样,把任务提交到 ForkJoinPool。你可以提交两种类型的任务。一种是没有任何返回值的(一个 “行动”),另一种是有返回值的(一个"任务")。这两种类型分别由 RecursiveAction 和 RecursiveTask 表示。接下来介绍如何使用这两种类型的任务,以及如何对它们进行提交。

RecursiveAction

RecursiveAction 是一种没有任何返回值的任务。它只是做一些工作,比如写数据到磁盘,然后就退出了。一个 RecursiveAction 可以把自己的工作分割成更小的几块,这样它们可以由独立的线程或者 CPU 执行。

你可以通过继承来实现一个 RecursiveAction。示例如下:

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MyRecursiveAction extends RecursiveAction {

    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {

        //如果工作超出阈值,将任务分解成更小的任务
        if(this.workLoad > 10) {
            System.out.println("将工作负载 : " + this.workLoad);
            //将工作负载分成多个子任务
            List<MyRecursiveAction> subtasks =
                    new ArrayList<MyRecursiveAction>();
            subtasks.addAll(createSubtasks());
            //将子任务加入到任务队列中
            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }
        } else {
            System.out.println("自己完成工作量: " + this.workLoad);
        }
    }
    //将工作负载分成多个子任务
    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>();
        //将工作负载分成两个子任务   24/2=12  12/2=6
        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
        forkJoinPool.invoke(myRecursiveAction);

    }

}

RecursiveTask

RecursiveTask 是一种会返回结果的任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。用法和RecursiveAction一样唯一不同的就是可以返回值

以下是一个 RecursiveTask 示例:

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

//配置RecursiveTask,返回值为Long
public class MyRecursiveTask  extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask (long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected Long compute() {

        //如果工作超出阈值,将任务分解成更小的任务
        if(this.workLoad > 10) {
            System.out.println("将工作负载 : " + this.workLoad);
            //将工作负载分成多个子任务
            List<MyRecursiveTask > subtasks = new ArrayList<MyRecursiveTask >();
            subtasks.addAll(createSubtasks());
            //将子任务加入到任务队列中
            for(RecursiveTask subtask : subtasks){
                subtask.fork();
            }
            //等待子任务执行完,并得到其结果,并将结果相加
            long result = 0;
            for(MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
            }
            return result;
        } else {
            System.out.println("自己完成工作量: " + this.workLoad);
            return 1L ;//返回计算结果
        }
    }
    //将工作负载分成多个子任务
    private List<MyRecursiveTask > createSubtasks() {
        List<MyRecursiveTask > subtasks = new ArrayList<MyRecursiveTask >();
        //将工作负载分成两个子任务   24/2=12  12/2=6
        MyRecursiveTask  subtask1 = new MyRecursiveTask (this.workLoad / 2);
        MyRecursiveTask  subtask2 = new MyRecursiveTask (this.workLoad / 2);
        subtasks.add(subtask1);
        subtasks.add(subtask2);
        return subtasks;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        MyRecursiveTask myRecursiveAction = new MyRecursiveTask(24);
        Long invoke = forkJoinPool.invoke(myRecursiveAction);
        System.out.println("最终结果: " + invoke);//4 从结果可以看出,任务被分成了4个子任务,每个子任务都是一个线程

    }

}

MyRecursiveTask 类继承自 RecursiveTask,这也就意味着它将返回一个 Long 类型的结果。MyRecursiveTask 示例也会将工作分割为子任务,并通过 fork() 方法对这些子任务计划执行。此外,本示例还通过调用每个子任务的 join() 方法收集它们返回的结果。子任务的结果随后被合并到一个更大的结果,并最终将其返回。对于不同级别的递归,这种子任务的结果合并可能会发生递归。

Fork/Join 案例Demo

需求:使用 Fork/Join 计算 1-10000的和,当一个任务的计算数量大于3000时拆分任务,数量小于3000时计算。

因为1~10000求和,耗时较少。下面我们将数据调大,求和1 ~ 59999999999(599亿),然后来对比一下使用 Fork/Join求和 和 普通求和之间的效率差异。

普通求和

    public static void main(String[] args) {
        //开始时间
        Long start = System.currentTimeMillis();
        long sum = 0l;
        for (long i = 1; i <= 59999999999L; i++) {
            sum+=i;
        }
        System.out.println(sum); //结果为负数,因为超出了long的最大值了   ,平均消耗时间:16秒
        //结束时间
        Long end = System.currentTimeMillis();
        System.out.println("消耗时间:"+(end-start));
    }

Fork/Join求和

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

//配置RecursiveTask,返回值为Long
public class SumRecursiveTask   extends RecursiveTask<Long> {

    //大于3000要拆分(创建一个变量)
    //是否要拆分的临界值
    private static final long THRESHOLD = 3000L;

    //起始值
    private final long start;
    //结束值
    private final long end;

    //构造方法(传递起始值、结束值)
    public SumRecursiveTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    //任务编写完成
    @Override
    protected Long compute() {
        long length = end - start;
        //计算
        if(length < THRESHOLD){
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum +=i;
            }
            return sum;
        }else{
            //拆分
            long middle = (start + end) /2;
            SumRecursiveTask left = new SumRecursiveTask(start,middle);//从小到大
            left.fork();
            SumRecursiveTask right = new SumRecursiveTask(middle+1,end);//从大到小
            right.fork();
            return left.join() +right.join();
        }
    }

    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //放入线程池
        ForkJoinPool pool = new ForkJoinPool();
        SumRecursiveTask task = new SumRecursiveTask(1, 59999999999L);
        Long result = pool.invoke(task);
        System.out.println("result="+result); //结果为负数,因为超出了long的最大值了   ,平均消耗时间:4秒
        Long end = System.currentTimeMillis();
        System.out.println("消耗时间:"+(end-start));
    }

}

总结: 可以发现使用工作窃取算法能大大的提高我们计算的速度,理论上只要你电脑足够快这个提升是没有上限的 ,前提是任务是可拆分的

到此这篇关于Java线程池ForkJoinPool(工作窃取算法)的使用的文章就介绍到这了,更多相关Java线程池ForkJoinPool内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java使用线程池执行定时任务

    目录 1.schedule 2.scheduleAtFixedRate 3.scheduleWithFixedDelay 总结 前言: 在 Java 语言中,有两个线程池可以执行定时任务:ScheduledThreadPool 和 SingleThreadScheduledExecutor,其中 SingleThreadScheduledExecutor 可以看做是 ScheduledThreadPool 的单线程版本,它的用法和 ScheduledThreadPool 是一样的,所以本文重点来

  • java线程池使用及原理面试题

    目录 引导语 1.说说你对线程池的理解? 2.ThreadPoolExecutor.Executor.ExecutorService.Runnable.Callable.FutureTask 之间的关系? 3.说一说队列在线程池中起的作用? 4.结合请求不断增加时,说一说线程池构造器参数的含义和表现? 5.coreSize 和 maxSize 可以动态设置么,有没有规则限制? 6.说一说对于线程空闲回收的理解,源码中如何体现的? 7.如果我想在线程池任务执行之前和之后,做一些资源清理的工作,可以

  • Java详解使用线程池处理任务方法

    什么是线程池? 线程池就是一个可以复用线程的技术. 不使用线程池的问题: 如果用户每发起一个请求,后台就创建一个新线程来处理,下次新任务来了又要创建新线程,而创建新线程的开销是很大的,这样会严重影响系统的性能. 线程池常见面试题: 1.临时线程什么时候创建? 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程. 2.什么时候会开始拒绝任务? 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始任务拒绝. 1.线程池处理Runnable任务

  • Java使用线程池实现socket编程的方法详解

    目录 前言 一.一个简单的C/S模型实现 1.服务器: 2.客户端: 二.线程池使用方法 1.新建一个线程池 2.用Runnable接口实现线程 3.创建线程对象并提交至线程池执行 三.结合起来 四.使用新的输入输出流 总结 前言 以多个客户端和一个服务端的socket通信为例,服务端启动时创建一个固定大小的线程池.服务端每接收到一个连接请求后(通信任务),交给线程池执行,任务类实现了Runnable接口,用于跟客户端进行读写操作,该类的对象作为任务通过execute(Runnable task

  • 详解Java线程池的使用(7种创建方法)

    目录 1. 固定数量的线程池 a.  线程池返回结果 b. ⾃定义线程池名称或优先级 2. 带缓存的线程池 3. 执⾏定时任务 a. 延迟执⾏(⼀次) b. 固定频率执⾏ 4. 定时任务单线程 5. 单线程线程池 6. 根据当前CPU⽣成线程池 7. ThreadPoolExecutor (1). Executors ⾃动创建线程池可能存在的问题 a. OOM 代码演示 b. 关于参数设置 (2).  ThreadPoolExecutor 使⽤ a. ThreadPoolExecutor 参数说

  • 一文了解Java 线程池的正确使用姿势

    目录 概述 线程池介绍 线程池创建 ThreadPoolExecutor创建 Executors创建 newFixedThreadPool newCachedThreadPool newSingleThreadExecutor newScheduledThreadPool newWorkStealingPool 线程池关键API和例子 提交执行任务API 关闭线程池API 线程池监控API 扩展API 使用注意事项 概述 线程池在平时的工作中出场率非常高,基本大家多多少少都要了解过,可能不是很全

  • Java线程池使用AbortPolicy策略

    目录 线程池ThreadPoolExecutor的拒绝策略 AbortPolicy策略 线程池ThreadPoolExecutor的拒绝策略 线程池中的线程资源全部被占用时,对新添加的Task任务有不同的处理策略,在默认的情况下, ThreadPoolExecutor类中有4种不同的处理方式: AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出RejectExecutionException异常. CallerRunsPolicy:当任务添加到线程池中被拒绝时,会使用调用线程池的Th

  • Java线程池ForkJoinPool实例解析

    这篇文章主要介绍了Java线程池ForkJoinPool实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 背景:ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个"小任务",把多个"小任务"放到多个处理器核心上并行执行:当多个"小任务"执行完成之后,再将这些执行结果合并起来即可.这种思想值得学习. import java.io.IOExcept

  • 一篇文章带你了解如何正确使用java线程池

    目录 1.线程是不是越多越好? 2.如何正确使用多线程? 3.Java线程池的工作原理 4.掌握JUC线程池API 总结 1.线程是不是越多越好? 在学习多线程之前,读者可能会有疑问?如果单线程跑得太慢,那么是否就能多创建多个线程来跑任务?并发的情况,线程是不是创建越多越好?这是一个很经典的问题,画图表示一下创建很多线程的情况,然后进行情况分析. 创建线程和销毁线程都是需要时间的,如果创建时间+销毁时间>执行任务时间就很不划算 创建后的线程是需要内存去存放的,创建的线程对应一个Thread对象,

  • 以实例简介Java中线程池的工作特点

    什么原因使我们不得不使用线程池? 个人认为主要原因是:短时间内需要处理的任务数量很多 使用线程池的好处: 1.减少在创建和销毁线程上所花的时间以及系统资源的开销 2.如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存 以下是Java自带的几种线程池: 1.newFixedThreadPool  创建一个指定工作线程数量的线程池. 每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中. 2.newCachedThreadPool 创建

  • 详解Java线程池的使用及工作原理

    一.什么是线程池? 线程池是一种用于实现计算机程序并发执行的软件设计模式.线程池维护多个线程,等待由调度程序分配任务以并发执行,该模型提高了性能,并避免了由于为短期任务频繁创建和销毁线程而导致的执行延迟. 二.线程池要解决什么问题? 说到线程池就一定要从线程的生命周期讲起. 从图中可以了解无论任务执行多久,每个线程都要经历从生到死的状态.而使用线程池就是为了避免线程的重复创建,从而节省了线程的New至Runnable, Running至Terminated的时间:同时也会复用线程,最小化的节省系

  • 剖析Fork join并发框架工作窃取算法

    目录 什么是Fork/Join框架 工作窃取算法 Fork/Join框架的介绍 使用Fork/Join框架 Fork/Join框架的异常处理 Fork/Join框架的实现原理 Fork/Join源码剖析与算法解析 与ThreadPool的区别 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,F

  • JAVA线程池原理实例详解

    本文实例讲述了JAVA线程池原理.分享给大家供大家参考,具体如下: 线程池的优点 1.线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用. 2.可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃. 线程池的创建 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQu

  • 一篇文章带你深入了解Java线程池

    目录 线程池模型 常用线程池 ThreadPoolExecutor 构造函数参数说明 线程池默认工作行为 ForkJoinPool FutureTask 线程数量分析 CPU密集型 IO密集型 总结 线程池模型 一般的池化模型会有两个方法,用于获取资源和释放资源,就像这样: public interface XXPool{ XX acquire(); void release(); } 但是,工程中的线程池一般是生产者和消费者模型,线程池是消费者,任务的提交者是生产者,下面是一个简化的线程池模型

  • java线程池合理设置最大线程数和核心线程数方式

    目录 线程池合理设置最大线程数和核心线程数 一开始是这么配置的 后来网上查询线程池核心数配置 最后我是这么配置的 线程池核心线程数与最大线程数的区别 线程池策略 饱和策略 线程池合理设置最大线程数和核心线程数 工作中有这样一个场景,需要处理千万级别的数据的一个算法,大部分是增删查的操作.这个时候就需要使用多线程去处理. 一开始是这么配置的 @Configuration @EnableAsync(proxyTargetClass = true)//利用@EnableAsync注解开启异步任务支持

  • 详解Java线程池和Executor原理的分析

    详解Java线程池和Executor原理的分析 线程池作用与基本知识 在开始之前,我们先来讨论下"线程池"这个概念."线程池",顾名思义就是一个线程缓存.它是一个或者多个线程的集合,用户可以把需要执行的任务简单地扔给线程池,而不用过多的纠结与执行的细节.那么线程池有哪些作用?或者说与直接用Thread相比,有什么优势?我简单总结了以下几点: 减小线程创建和销毁带来的消耗 对于Java Thread的实现,我在前面的一篇blog中进行了分析.Java Thread与内

随机推荐