java中的forkjoin框架的使用

fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力。

fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一个任务的结果join起来,生成最后的结果。如果第一步中并没有任何返回值,join将会等到所有的小任务都结束。

还记得之前的文章我们讲到了thread pool的基本结构吗?

  • ExecutorService - ForkJoinPool 用来调用任务执行。
  • workerThread - ForkJoinWorkerThread 工作线程,用来执行具体的任务。
  • task - ForkJoinTask 用来定义要执行的任务。

下面我们从这三个方面来详细讲解fork join框架。

ForkJoinPool

ForkJoinPool是一个ExecutorService的一个实现,它提供了对工作线程和线程池的一些便利管理方法。

public class ForkJoinPool extends AbstractExecutorService

一个work thread一次只能处理一个任务,但是ForkJoinPool并不会为每个任务都创建一个单独的线程,它会使用一个特殊的数据结构double-ended queue来存储任务。这样的结构可以方便的进行工作窃取(work-stealing)。

什么是work-stealing呢?

默认情况下,work thread从分配给自己的那个队列头中取出任务。如果这个队列是空的,那么这个work thread会从其他的任务队列尾部取出任务来执行,或者从全局队列中取出。这样的设计可以充分利用work thread的性能,提升并发能力。

下面看下怎么创建一个ForkJoinPool。

最常见的方法就是使用ForkJoinPool.commonPool()来创建,commonPool()为所有的ForkJoinTask提供了一个公共默认的线程池。

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

另外一种方式是使用构造函数:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

这里的参数是并行级别,2指的是线程池将会使用2个处理器核心。

ForkJoinWorkerThread

ForkJoinWorkerThread是使用在ForkJoinPool的工作线程。

public class ForkJoinWorkerThread extends Thread
}

和一般的线程不一样的是它定义了两个变量:

 final ForkJoinPool pool;    // the pool this thread works in
 final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

一个是该worker thread所属的ForkJoinPool。 另外一个是支持 work-stealing机制的Queue。

再看一下它的run方法:

 public void run() {
  if (workQueue.array == null) { // only run once
   Throwable exception = null;
   try {
    onStart();
    pool.runWorker(workQueue);
   } catch (Throwable ex) {
    exception = ex;
   } finally {
    try {
     onTermination(exception);
    } catch (Throwable ex) {
     if (exception == null)
      exception = ex;
    } finally {
     pool.deregisterWorker(this, exception);
    }
   }
  }
 }

简单点讲就是从Queue中取出任务执行。

ForkJoinTask

ForkJoinTask是ForkJoinPool中运行的任务类型。通常我们会用到它的两个子类:RecursiveActionRecursiveTask<V>。

他们都定义了一个需要实现的compute()方法用来实现具体的业务逻辑。不同的是RecursiveAction只是用来执行任务,而RecursiveTask<V>可以有返回值。

既然两个类都带了Recursive,那么具体的实现逻辑也会跟递归有关,我们举个使用RecursiveAction来打印字符串的例子:

public class CustomRecursiveAction extends RecursiveAction {

 private String workload = "";
 private static final int THRESHOLD = 4;

 private static Logger logger =
   Logger.getAnonymousLogger();

 public CustomRecursiveAction(String workload) {
  this.workload = workload;
 }

 @Override
 protected void compute() {
  if (workload.length() > THRESHOLD) {
   ForkJoinTask.invokeAll(createSubtasks());
  } else {
   processing(workload);
  }
 }

 private List<CustomRecursiveAction> createSubtasks() {
  List<CustomRecursiveAction> subtasks = new ArrayList<>();

  String partOne = workload.substring(0, workload.length() / 2);
  String partTwo = workload.substring(workload.length() / 2, workload.length());

  subtasks.add(new CustomRecursiveAction(partOne));
  subtasks.add(new CustomRecursiveAction(partTwo));

  return subtasks;
 }

 private void processing(String work) {
  String result = work.toUpperCase();
  logger.info("This result - (" + result + ") - was processed by "
    + Thread.currentThread().getName());
 }
}

上面的例子使用了二分法来打印字符串。

我们再看一个RecursiveTask<V>的例子:

public class CustomRecursiveTask extends RecursiveTask<Integer> {
 private int[] arr;

 private static final int THRESHOLD = 20;

 public CustomRecursiveTask(int[] arr) {
  this.arr = arr;
 }

 @Override
 protected Integer compute() {
  if (arr.length > THRESHOLD) {
   return ForkJoinTask.invokeAll(createSubtasks())
     .stream()
     .mapToInt(ForkJoinTask::join)
     .sum();
  } else {
   return processing(arr);
  }
 }

 private Collection<CustomRecursiveTask> createSubtasks() {
  List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
  dividedTasks.add(new CustomRecursiveTask(
    Arrays.copyOfRange(arr, 0, arr.length / 2)));
  dividedTasks.add(new CustomRecursiveTask(
    Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
  return dividedTasks;
 }

 private Integer processing(int[] arr) {
  return Arrays.stream(arr)
    .filter(a -> a > 10 && a < 27)
    .map(a -> a * 10)
    .sum();
 }
}

和上面的例子很像,不过这里我们需要有返回值。

在ForkJoinPool中提交Task

有了上面的两个任务,我们就可以在ForkJoinPool中提交了:

int[] intArray= {12,12,13,14,15};
  CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);

  int result = forkJoinPool.invoke(customRecursiveTask);
  System.out.println(result);

上面的例子中,我们使用invoke来提交,invoke将会等待任务的执行结果。

如果不使用invoke,我们也可以将其替换成fork()和join():

customRecursiveTask.fork();
  int result2= customRecursiveTask.join();
  System.out.println(result2);

fork() 是将任务提交给pool,但是并不触发执行, join()将会真正的执行并且得到返回结果。

本文的例子可以参考https://github.com/ddean2009/learn-java-concurrency/tree/master/forkjoin

到此这篇关于java中的fork join框架的使用的文章就介绍到这了,更多相关java fork join框架内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 浅谈Java Fork/Join并行框架

    初步了解Fork/Join框架 Fork/Join 框架是java7中加入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行join,将小任务的结果组装成大任务的结果.下面的图片展示了这种框架的工作模型: 使用Fork/Join并行框架的前提是我们的任务可以拆分成足够小的任务,而且可以根据小任务的结果来组装出大任务的结果,一个最简单的例子是使用Fork/Join框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是

  • Java Fork/Join框架

    Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程.Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能. 和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程.Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法.通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行. Fork/Join框架

  • Java多线程ForkJoinPool实例详解

    引言 java 7提供了另外一个很有用的线程池框架,Fork/Join框架 理论 Fork/Join框架主要有以下两个类组成. * ForkJoinPool 这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm).它管理工作者线程,并提供任务的状态信息,以及任务的执行信息 * ForkJoinTask 这个类是一个将在ForkJoinPool执行的任务的基类. Fork/Join框架提供了在一个任务里执行fork()和join()操作的机制

  • Java ForkJoin框架的原理及用法

    这篇文章主要介绍了Java ForkJoin框架的原理及用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 ForkJoin分析 一.ForkJoin ForkJoin是由JDK1.7后提供多线并发处理框架.ForkJoin的框架的基本思想是分而治之.什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总.相应的ForkJoin将复杂的计算当做一个任务.而分解的多个计算则是当做一个子任务. 二

  • Java线程池ForkJoinPool实例解析

    这篇文章主要介绍了Java线程池ForkJoinPool实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 背景:ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个"小任务",把多个"小任务"放到多个处理器核心上并行执行:当多个"小任务"执行完成之后,再将这些执行结果合并起来即可.这种思想值得学习. import java.io.IOExcept

  • Java通过Fork/Join优化并行计算

    本文实例为大家分享了Java通过Fork/Join优化并行计算的具体代码,供大家参考,具体内容如下 Java代码: package Threads; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; /** * Created by Frank */ public class RecursiveActionDemo extends RecursiveAction { sta

  • java8中forkjoin和optional框架使用

    并行流与串行流 并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流. java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作.Stream API 可以声明性地通过 parallel()与 sequential()在并行流与顺序流之间进行切换. 了解 Fork/Join 框架 Fork/Join 框架:就是在必要的情况下,将一个大任务,进形拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运行的结果进行join汇总. Fork/Join 框架

  • Java7之forkjoin简介_动力节点Java学院整理

    Java7引入了Fork Join的概念,来更好的支持并行运算.顾名思义,Fork Join类似与流程语言的分支,合并的概念.也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套. 有两个核心类ForkJoinPool和ForkJoinTask. ForkJoinPool实现了ExecutorService接口,起到线程池的作用.所以

  • java中的forkjoin框架的使用

    fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力. fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一个任务的结果join起来,生成最后的结果.如果第一步中并没有任何返回值,join将会等到所有的小任务都结束. 还记得之前的文章我们讲到了thread pool的基本结构吗? ExecutorService - ForkJoinPool 用来调用任务执行. workerThread - ForkJoi

  • 深入剖析java中的集合框架

    解析:如果并不知道程序运行时会需要多少对象,或者需要更复杂方式存储对象,那么可以使用Java集合框架. 如果启用集合的删除方法,那么集合中所有元素的索引会自动维护. 集合完全弥补了数组的缺陷. 02.集合框架的内容 集合框架都包含三大块内容:对外的接口,接口的实现和对集合运算的算法 01.接口:表示集合的抽象数据类型 02.实现:集合框架中接口的具体实现 03.算法:在一个实现了某个集合框架的接口的对象身上完成某种有用的计算方法 java集合框架简图: 01.Collection接口存在储存一组

  • 在Java中使用日志框架log4j的方法

    日志就是记录程序的运行轨迹,方便快速定位问题 如果用System.out.println(),信息是打印在控制台.等到产品上线后没有控制台,如果有报错信息,根本不知道去哪里看,就不知道是哪里出错. 而且开发的时候希望打印输出的内容多.方便排查,上线后只希望打印容易出错的部分.System.out.println()满足不了这个需求 而日志框架可以让错误信息输出到多个指定文件,不同的文件有不同的输出内容.方便排错,定位错误 一.log4j介绍 Log4j有三个主要的组件/对象:Loggers(记录

  • 一文带你了解Java中的ForkJoin

    目录 什么是ForkJoin? ForkJoinTask 任务 ForkJoinPool 线程池 工作窃取算法 构造方法 提交方法 创建工人(线程) 例:ForkJoinTask实现归并排序 ForkJoin计算流程 前言: ForkJoin是在Java7中新加入的特性,大家可能对其比较陌生,但是Java8中Stream的并行流parallelStream就是依赖于ForkJoin.在ForkJoin体系中最为关键的就是ForkJoinTask和ForkJoinPool,ForkJoin就是利用

  • java中的SpringBoot框架

    目录 适合人群 背景 为什么不讲Spring? SpringBoot是啥 ? 项目搭建 创建项目 & 配置依赖 application.yml 入口类 控制器 Controller 适合人群 学完Java基础 想通过Java快速构建web应用程序 想学习或了解SpringBoot 背景 本节给大家讲讲 Java的SpringBoot框架,.在我们在产品开发中,一般我们都会选择比较稳定的框架来帮我们加速开发,不会自己去造轮子,而在java众多框架中,spring框架表现的非常好,大部分公司都会首选

  • Java中的集合框架

    概念 Java中的集合类:是一种工具类,就像是容器,储存任意数量的具有共同属性的对象 集合的作用 集合框架的类型: collection和map 都是接口,不能实例化 List和Queue有序.可重复,Set无序.不可重复 list添加元素两种add方法 1.直接添加,元素添加在队尾: 对象存入集合都变成object类型,取出时需要类型转换 2.指定位置添加,指定的位置(从0开始)不能超过队列的长度,否则报错(数组下标越界). list的两种addAll方法:添加类的数组 public void

  • Java中的Struts2框架拦截器之实例代码

    本文实例为大家分享了Struts2框架拦截器实例的示例代码,供大家参考,具体内容如下 在看拦截器的小例子的前我们先来看看sturts2的原理 struts2自己是有拦截器的,通过拦截器可以拦截用户请求,并作出处理 拦截器作用有很多,譬如: 1.Action里面有个属性,这个属性我想在action执行之前改成别的值,可以用拦截器解决. 2.比如每个人执行action之前,我可以查看他们有没有这个权限执行这个action. 如果不设置拦截器,你要在每种action方法之前设置判定程序,非常繁琐. 拦

  • 分析JAVA中几种常用的RPC框架

    RPC是远程过程调用的简称,广泛应用在大规模分布式应用中,作用是有助于系统的垂直拆分,使系统更易拓展.Java中的RPC框架比较多,各有特色,广泛使用的有RMI.Hessian.Dubbo等.RPC还有一个特点就是能够跨语言,本文只以JAVA语言里的RPC为例. 对于RPC有一个逻辑关系图,以RMI为例: 其他的框架结构也类似,区别在于对象的序列化方法,传输对象的通讯协议,以及注册中心的管理与failover设计(利用zookeeper). 客户端和服务端可以运行在不同的JVM中,Client只

  • Java中dubbo+zookeeper微服务架构简介

    目录 1.Apache Dubbo概述 1.1.Dubbo简介 1.2.Dubbo的服务架构 2.服务注册中心 Zookeeper 2.1.ZooKeeper介绍 2.2.ZooKeeper安装 2.3.启动 ZooKeeper 3.ZooKeeper快速入门 3.1.服务提供方 3.2.服务消费方 3.3.问题思考 4. Dubbo管理控制台 4.1.安装 5. Dubbo相关配置 5.1.包扫描 5.2.Dubbo 协议 5.3.负载均衡 5.4. Dubbo无法发布被事务代理的服务 1.A

随机推荐