浅谈Java Fork/Join并行框架

初步了解Fork/Join框架

Fork/Join 框架是java7中加入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行join,将小任务的结果组装成大任务的结果。下面的图片展示了这种框架的工作模型:

使用Fork/Join并行框架的前提是我们的任务可以拆分成足够小的任务,而且可以根据小任务的结果来组装出大任务的结果,一个最简单的例子是使用Fork/Join框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是寻找一个大数组中的最大/最小值,我们可以将一个大数组拆成很多小数组,然后分别求解每个小数组中的最大/最小值,然后根据这些任务的结果组装出最后的最大最小值,下面的代码展示了如何通过Fork/Join求解数组的最大值:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Created by hujian06 on 2017/9/28.
*
* fork/join demo
*/
public class ForkJoinDemo {

 /**
  * how to find the max number in array by Fork/Join
  */
 private static class MaxNumber extends RecursiveTask<Integer> {

   private int threshold = 2;

   private int[] array; // the data array

   private int index0 = 0;
   private int index1 = 0;

   public MaxNumber(int[] array, int index0, int index1) {
     this.array = array;
     this.index0 = index0;
     this.index1 = index1;
   }

   @Override
   protected Integer compute() {
     int max = Integer.MIN_VALUE;
     if ((index1 - index0) <= threshold) {

       for (int i = index0;i <= index1; i ++) {
         max = Math.max(max, array[i]);
       }

     } else {
       //fork/join
       int mid = index0 + (index1 - index0) / 2;
       MaxNumber lMax = new MaxNumber(array, index0, mid);
       MaxNumber rMax = new MaxNumber(array, mid + 1, index1);

       lMax.fork();
       rMax.fork();

       int lm = lMax.join();
       int rm = rMax.join();

       max = Math.max(lm, rm);

     }

     return max;
   }
 }

 public static void main(String ... args) throws ExecutionException, InterruptedException, TimeoutException {

   ForkJoinPool pool = new ForkJoinPool();

   int[] array = {100,400,200,90,80,300,600,10,20,-10,30,2000,1000};

   MaxNumber task = new MaxNumber(array, 0, array.length - 1);

   Future<Integer> future = pool.submit(task);

   System.out.println("Result:" + future.get(1, TimeUnit.SECONDS));

 }

}

可以通过设置不同的阈值来拆分成小任务,阈值越小代表拆出来的小任务越多。

工作窃取算法

Fork/Join在实现上,大任务拆分出来的小任务会被分发到不同的队列里面,每一个队列都会用一个线程来消费,这是为了获取任务时的多线程竞争,但是某些线程会提前消费完自己的队列。而有些线程没有及时消费完队列,这个时候,完成了任务的线程就会去窃取那些没有消费完成的线程的任务队列,为了减少线程竞争,Fork/Join使用双端队列来存取小任务,分配给这个队列的线程会一直从头取得一个任务然后执行,而窃取线程总是从队列的尾端拉取task。

Frok/Join框架的实现细节

在上面的示例代码中,我们发现Fork/Join的任务是通过ForkJoinPool来执行的,所以框架的一个核心是任务的fork和join,然后就是这个ForkJoinPool。关于任务的fork和join,我们可以想象,而且也是由我们的代码自己控制的,所以要分析Fork/Join,那么ForkJoinPool最值得研究。

上面的图片展示了ForkJoinPool的类关系图,可以看到本质上它就是一个Executor。在ForkJoinPool里面,有两个特别重要的成员如下:

  volatile WorkQueue[] workQueues;
  final ForkJoinWorkerThreadFactory factory;

workQueues 用于保存向ForkJoinPool提交的任务,而具体的执行有ForkJoinWorkerThread执行,而ForkJoinWorkerThreadFactory可以用于生产出ForkJoinWorkerThread。可以看一些ForkJoinWorkerThread,可以发现每一个ForkJoinWorkerThread会有一个pool和一个workQueue,和我们上面描述的是一致的,每个线程都被分配了一个任务队列,而执行这个任务队列的线程由pool提供。

下面我们看一下当我们fork的时候发生了什么:

  public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
      ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
      ForkJoinPool.common.externalPush(this);
    return this;
  }

看上面的fork代码,可以看到首先取到了当前线程,然后判断是否是我们的ForkJoinPool专用线程,如果是,则强制类型转换(向下转换)成ForkJoinWorkerThread,然后将任务push到这个线程负责的队列里面去。如果当前线程不是ForkJoinWorkerThread类型的线程,那么就会走else之后的逻辑,大概的意思是首先尝试将任务提交给当前线程,如果不成功,则使用例外的处理方法,关于底层实现较为复杂,和我们使用Fork/Join关系也不太大,如果希望搞明白具体原理,可以看源码。

下面看一下join的流程:

 public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
      reportException(s);
    return getRawResult();
  }

    private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
      ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
      (w = (wt = (ForkJoinWorkerThread)t).workQueue).
      tryUnpush(this) && (s = doExec()) < 0 ? s :
      wt.pool.awaitJoin(w, this, 0L) :
      externalAwaitDone();
  }

    final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
      try {
        completed = exec();
      } catch (Throwable rex) {
        return setExceptionalCompletion(rex);
      }
      if (completed)
        s = setCompletion(NORMAL);
    }
    return s;
  }

   /**
   * Implements execution conventions for RecursiveTask.
   */
  protected final boolean exec() {
    result = compute();
    return true;
  }

上面展示了主要的调用链路,我们发现最后落到了我们在代码里编写的compute方法,也就是执行它,所以,我们需要知道的一点是,fork仅仅是分割任务,只有当我们执行join的时候,我们的额任务才会被执行。

如何使用Fork/Join并行框架

前文首先展示了一个求数组中最大值得例子,然后介绍了“工作窃取算法”,然后分析了Fork/Join框架的一些细节,下面才是我们最关心的,怎么使用Fork/Join框架呢?

为了使用Fork/Join框架,我们只需要继承类RecursiveTask或者RecursiveAction。前者适用于有返回值的场景,而后者适合于没有返回值的场景。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java中Lambda表达式并行与组合行为

    从串行到并行 串行指一个步骤一个步骤地处理,也就是通常情况下,代码一行一行地执行. 如果将我们常用的迭代器式的循环展开的话,就是串行执行了循环体内所定义的操作: sum += arr.get(0); sum += arr.get(1); sum += arr.get(2); //... 在书的一开始,就提到Java需要支持集合的并行计算(而Lambda为这个需求提供了可能). 这些功能将全部被实现于库代码中,对于我们使用者,实现并行的复杂性被大大降低(最低程度上只需要调用相关方法). 另外,关于

  • Java通过Fork/Join优化并行计算

    本文实例为大家分享了Java通过Fork/Join优化并行计算的具体代码,供大家参考,具体内容如下 Java代码: package Threads; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; /** * Created by Frank */ public class RecursiveActionDemo extends RecursiveAction { sta

  • 浅谈Java Fork/Join并行框架

    初步了解Fork/Join框架 Fork/Join 框架是java7中加入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行join,将小任务的结果组装成大任务的结果.下面的图片展示了这种框架的工作模型: 使用Fork/Join并行框架的前提是我们的任务可以拆分成足够小的任务,而且可以根据小任务的结果来组装出大任务的结果,一个最简单的例子是使用Fork/Join框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是

  • 浅谈java多线程 join方法以及优先级方法

    join: 当A线程执行到了B线程的.join()方法时,A就会等待.等B线程都执行完,A才会执行. join可以用来临时加入线程执行. 1.线程使用join方法,主线程就停下,等它执行完,那么如果该线程冻结了,主线程就挂了,这也是为什么线程要抛异常的原因 2.当两个或以上线程开启了,这个A线程才使用join方法,那么主线程还是停下,这几个个线程交替进行,直到A执行完,主线程才复活 1. tostring(),方法,获取线程具体的名字,优先级 2. 优先级代表抢资源的频率 3. java中设置有

  • 浅谈java线程join方法使用方法

    本博客简介介绍一下java线程的join方法,join方法是实现线程同步,可以将原本并行执行的多线程方法变成串行执行的 如图所示代码,是并行执行的 public class ThreadTest { //private static final Long count = 10000L; public static void main(String[] args){ long base = System.currentTimeMillis(); try { ThreadJoin t1 = new

  • 浅谈Java基准性能测试之JMH

    目录 一.JMH vs JMeter 二.JMH基本用法 2.1.创建JMH项目 2.2.编写基准测试代码 2.3.JMH打包.运行 2.4.JMH与Springboot 三.JMH注解 3.1.JMH Benchmark Modes 3.2.Benchmark Time Units 3.3.Benchmark State 3.4.State Object @Setup @TearDown 3.5.Fork 3.6.Thread 3.7.Warmup 3.8.Measurement 四.输出测试

  • 剖析Fork join并发框架工作窃取算法

    目录 什么是Fork/Join框架 工作窃取算法 Fork/Join框架的介绍 使用Fork/Join框架 Fork/Join框架的异常处理 Fork/Join框架的实现原理 Fork/Join源码剖析与算法解析 与ThreadPool的区别 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,F

  • 浅谈Java中Collection和Collections的区别

    1.java.util.Collection 是一个集合接口.它提供了对集合对象进行基本操作的通用接口方法.Collection接口在Java 类库中有很多具体的实现.Collection接口的意义是为各种具体的集合提供了最大化的统一操作方式. Collection ├List │├LinkedList │├ArrayList │└Vector │ └Stack └Set 2.java.util.Collections 是一个包装类.它包含有各种有关集合操作的静态多态方法.此类不能实例化,就像一

  • 浅谈java指令重排序的问题

    指令重排序是个比较复杂.觉得有些不可思议的问题,同样是先以例子开头(建议大家跑下例子,这是实实在在可以重现的,重排序的概率还是挺高的),有个感性的认识 /** * 一个简单的展示Happen-Before的例子. * 这里有两个共享变量:a和flag,初始值分别为0和false.在ThreadA中先给 a=1,然后flag=true. * 如果按照有序的话,那么在ThreadB中如果if(flag)成功的话,则应该a=1,而a=a*1之后a仍然为1,下方的if(a==0)应该永远不会为 * 真,

  • 浅谈Java代理(jdk静态代理、动态代理和cglib动态代理)

    一.代理是Java常用的设计模式,代理类通过调用被代理类的相关方法,并对相关方法进行增强.加入一些非业务性代码,比如事务.日志.报警发邮件等操作. 二.jdk静态代理 1.业务接口 /** * 业务接口 * @author pc * */ public interface UserService { // 增加一个用户 public void addUser(); // 编辑账户 public void editUser(); } 2.业务实现类 /** * 业务实现类 * @author pc

  • 浅谈java中集合的由来,以及集合和数组的区别详解

    对象多了用集合存,数据多了用数组存. 数组是固定长度的,集合是可变长度的. 集合是:只要是对象就可以存,不管是不是同一种对象 而数组只能存储一种类型的对象 下面是集合的框架: 以上就是小编为大家带来的浅谈java中集合的由来,以及集合和数组的区别详解的全部内容了,希望对大家有所帮助,多多支持我们~

  • 浅谈Java垃圾回收的实现过程

    本教程是为了理解基本的Java垃圾回收以及它是如何工作的.这是垃圾回收教程系列的第二部分.希望你已经读过了第一部分:<简单介绍Java垃圾回收机制>. Java垃圾回收是一项自动化的过程,用来管理程序所使用的运行时内存.通过这一自动化过程,JVM解除了程序员在程序中分配和释放内存资源的开销. 启动Java垃圾回收 作为一个自动的过程,程序员不需要在代码中显示地启动垃圾回收过程.System.gc()和Runtime.gc()用来请求JVM启动垃圾回收. 虽然这个请求机制提供给程序员一个启动GC

随机推荐