Java Fork/Join框架

Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程。Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能。

和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程。Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法。通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行。

Fork/Join框架的核心是ForkJoinPool类,一个AbstractExecutorService类的子类。ForkJoinPool实现了核心的work-stealing算法并可以执行ForkJoinTask处理。

基础用法

使用Fork/Join框架的第一步是编写执行碎片任务的代码。要编写的代码类似如下伪代码:

if 任务足够小:
 直接执行任务
else:
 将任务切成两个小任务
 执行两个小任务并等待结果

使用ForkJoinTask子类来封装如上的代码,通常会使用一些JDK提供的类,使用的有RecursiveTask(这个类会返回一个结果)和RecursiveAction两个类。

在准备好ForkJoinTask子类后,创建一个代表所有任务的对象,并将之传递给一个ForkJoinPool实例的invoke()方法。

由模糊到清晰

为了辅助理解Fork/Join框架是如何工作的,我们使用一个案例来进行说明:比如对一张图片进行模糊处理。我们用一个整型数组表示图片,其中的每个数值代表一个像素的颜色。被模糊的图片也用一个同等长度的数组来表示。

执行模糊是通过对代表图片的每个像素进行处理实现的。计算每个像素与其周围像素的均值(红黄蓝三原色的均值),计算生成的结果数组就是模糊后的图片。由于代表图像的通常都是一个大数组,整个处理过程需要通常会需要很多时间。可以使用Fork/Join框架利用多处理器系统上的并发处理优势来进行提速。下面是一个可能的实现:

package com.zhyea.robin;

import java.util.concurrent.RecursiveAction;

public class ForkBlur extends RecursiveAction {

  private int[] mSource;
  private int mStart;
  private int mLength;
  private int[] mDestination;

  // 处理窗口大小; 需要是一个奇数.
  private int mBlurWidth = 15;

  public ForkBlur(int[] src, int start, int length, int[] dst) {
    mSource = src;
    mStart = start;
    mLength = length;
    mDestination = dst;
  }

  protected void computeDirectly() {
    int sidePixels = (mBlurWidth - 1) / 2;
    for (int index = mStart; index < mStart + mLength; index++) {
      // 计算平均值.
      float rt = 0, gt = 0, bt = 0;
      for (int mi = -sidePixels; mi <= sidePixels; mi++) {

        int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);

        int pixel = mSource[mindex];
        rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
        gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
        bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
      }

      // 重组目标像素.
      int dpixel = (0xff000000) |
          (((int) rt) << 16) |
          (((int) gt) << 8) |
          (((int) bt) << 0);
      mDestination[index] = dpixel;
    }
  }

  ....
}

现在实现抽象方法compute(),在这个方法中既实现了模糊操作,也实现了将一个任务拆分成两个小任务。这里仅是简单依据数组长度来决定是直接执行任务还是将之拆分成两个小任务:

  protected static int sThreshold = 100000;

  protected void compute() {
    if (mLength < sThreshold) {
      computeDirectly();
      return;
    }

    int split = mLength / 2;

    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
        new ForkBlur(mSource, mStart + split, mLength - split,
            mDestination));
  }

因为上面这些方法的实现是定义在RecursiveAction的一个子类中,可以直接在一个ForkJoinPool中创建并运行任务。具体步骤如下:

1. 创建一个代表要执行的任务的对象:

// src 表示源图片像素的数组
// dst 表示生成的图片的像素
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

2. 创建一个运行任务的ForkJoinPool实例:

ForkJoinPool pool = new ForkJoinPool();

3. 运行任务:

pool.invoke(fb);

在源代码中还包含了一些创建目标图片的代码。具体参考ForkBlur示例。

标准实现

要使用Fork/Join框架按自定义的算法在多核系统上执行并发任务当然需要实现自定义的类了(比如之前我们实现的ForkBlur类)。除此之外,在JavaSE中已经在广泛使用Fork/Join框架的一些特性了。比如Java8中的java.util.Arrays类的parallelSort()方法就使用了Fork/Join框架。具体可以参考Java API文档。

Fork/Join框架的另一个实现在java.util.streams包下,这也是java8的Lambda特性的一部分。

(0)

相关推荐

  • Java7之forkjoin简介_动力节点Java学院整理

    Java7引入了Fork Join的概念,来更好的支持并行运算.顾名思义,Fork Join类似与流程语言的分支,合并的概念.也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套. 有两个核心类ForkJoinPool和ForkJoinTask. ForkJoinPool实现了ExecutorService接口,起到线程池的作用.所以

  • 在Java8与Java7中HashMap源码实现的对比

    一.HashMap的原理介绍 此乃老生常谈,不作仔细解说. 一句话概括之:HashMap是一个散列表,它存储的内容是键值对(key-value)映射. 二.Java 7 中HashMap的源码分析 首先是HashMap的构造函数代码块1中,根据初始化的Capacity与loadFactor(加载因子)初始化HashMap. //代码块1 public HashMap(int initialCapacity, float loadFactor) { if (initialCapacity < 0)

  • Java通过Fork/Join优化并行计算

    本文实例为大家分享了Java通过Fork/Join优化并行计算的具体代码,供大家参考,具体内容如下 Java代码: package Threads; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; /** * Created by Frank */ public class RecursiveActionDemo extends RecursiveAction { sta

  • 详解Java5、Java6、Java7的新特性

    Java5: 1.泛型 Generics: 引用泛型之后,允许指定集合里元素的类型,免去了强制类型转换,并且能在编译时刻进行类型检查的好处.Parameterized Type作为参数和返回值,Generic是vararg.annotation.enumeration.collection的基石. A.类型安全 抛弃List.Map,使用List.Map给它们添加元素或者使用Iterator遍历时,编译期就可以给你检查出类型错误 B.方法参数和返回值加上了Type 抛弃List.Map,使用Li

  • 深入Java7的一些新特性以及对脚本语言支持API的介绍

    1.switch条件语句中可以加入字符串了,实现方法是利用了字符串的hashcode()值作业真正的值2.增加了一种可以在字面量中使用的进制,二进制,通过在数字前面加"0b"或"0B"3.在数字字面量中使用下划线来分隔数字方便阅读,不影响数值大小.基本原则是前后都是数字的才可以出现下划线4.java7对异常做了两个改动: 4.1.支持在一个catch子句中同时捕获多个异常,另外一个是在捕获并重新抛出异常时的异常类型更加精确.java7中Throwable类增加add

  • Java Fork/Join框架

    Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程.Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能. 和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程.Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法.通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行. Fork/Join框架

  • 轻轻松松吃透Java并发fork/join框架

    目录 一.概述 二.说一说 RecursiveTask 三. Fork/Join框架基本使用 四.工作顺序图 1.ForkJoinPool构造函数 2.fork方法和join方法 五.使用Fork/Join解决实际问题 1.使用归并算法解决排序问题 2.使用Fork/Join运行归并算法 Fork / Join 是一个工具框架 , 其核心思想在于将一个大运算切成多个小份 , 最大效率的利用资源 , 其主要涉及到三个类 : ForkJoinPool / ForkJoinTask / Recursi

  • Java并发中的Fork/Join 框架机制详解

    什么是 Fork/Join 框架 Fork/Join 框架是一种在 JDk 7 引入的线程池,用于并行执行把一个大任务拆成多个小任务并行执行,最终汇总每个小任务结果得到大任务结果的特殊任务.通过其命名也很容易看出框架主要分为 Fork 和 Join 两个阶段,第一阶段 Fork 是把一个大任务拆分为多个子任务并行的执行,第二阶段 Join 是合并这些子任务的所有执行结果,最后得到大任务的结果. 这里不难发现其执行主要流程:首先判断一个任务是否足够小,如果任务足够小,则直接计算,否则,就拆分成几个

  • Java多线程高并发中的Fork/Join框架机制详解

    1.Fork/Join框架简介 Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出.Fork/Join 框架要完成两件事情: Fork:把一个复杂任务进行分拆,大事化小 :把一个复杂任务进行分拆,大事化小 Join:把分拆任务的结果进行合并 在 Java 的 Fork/Join 框架中,使用两个类完成上述操作: ForkJoinTask: 我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务.该类提供了

  • 浅谈Java Fork/Join并行框架

    初步了解Fork/Join框架 Fork/Join 框架是java7中加入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行join,将小任务的结果组装成大任务的结果.下面的图片展示了这种框架的工作模型: 使用Fork/Join并行框架的前提是我们的任务可以拆分成足够小的任务,而且可以根据小任务的结果来组装出大任务的结果,一个最简单的例子是使用Fork/Join框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是

  • Java并发编程之Fork/Join框架的理解

    一.Fork/Join框架的理解 ForkJoinTask类属于java.util.concurrent 包下: ForkJoinTask类下有2个子类,分别为RecursiveTask和RecursiveAction类:(lz示例中使用RecursiveTask类进行重写compute()方法进行实现数值的累加计算) ForkJoinTask类 将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出. 二.Fork/Join框架使用示例 示例场景:对数值进

  • 剖析Fork join并发框架工作窃取算法

    目录 什么是Fork/Join框架 工作窃取算法 Fork/Join框架的介绍 使用Fork/Join框架 Fork/Join框架的异常处理 Fork/Join框架的实现原理 Fork/Join源码剖析与算法解析 与ThreadPool的区别 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,F

  • Golang的Fork/Join实现代码

    目录 1.Fork/Join是什么 2.Golang中的Fork/Join实现 3.测试验证 4.小优化 5.后续计划 做过Java开发的同学肯定知道,JDK7加入的Fork/Join是一个非常优秀的设计,到了JDK8,又结合并行流中进行了优化和增强,是一个非常好的工具. 1.Fork/Join是什么 Fork/Join本质上是一种任务分解,即:将一个很大的任务分解成若干个小任务,然后再对小任务进一步分解,直到最小颗粒度,然后并发执行. 这么做的优点很明显,就是可以大幅提升计算性能,缺点嘛,也有

  • java持久层框架mybatis防止sql注入的方法

    sql注入大家都不陌生,是一种常见的攻击方式,攻击者在界面的表单信息或url上输入一些奇怪的sql片段,例如"or '1'='1'"这样的语句,有可能入侵参数校验不足的应用程序.所以在我们的应用中需要做一些工作,来防备这样的攻击方式.在一些安全性很高的应用中,比如银行软件,经常使用将sql语句全部替换为存储过程这样的方式,来防止sql注入,这当然是一种很安全的方式,但我们平时开发中,可能不需要这种死板的方式. mybatis框架作为一款半自动化的持久层框架,其sql语句都要我们自己来手

随机推荐