Java7之forkjoin简介_动力节点Java学院整理

Java7引入了Fork Join的概念,来更好的支持并行运算。顾名思义,Fork Join类似与流程语言的分支,合并的概念。也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套。

有两个核心类ForkJoinPool和ForkJoinTask。

ForkJoinPool实现了ExecutorService接口,起到线程池的作用。所以他的用法和Executor框架的使用时一样的,当然Fork Join本身就是Executor框架的扩展。ForkJoinPool有3个关键的方法,来启动线程,execute(…),invoke(…),submit(…)。具体描述如下:


<SPAN style='FONT-SIZE: 9pt;"微软雅黑","sans-serif"; COLOR: #333333;"BORDER-TOP: windowtext 1pt solid; BORDER-RIGHT: windowtext 1pt solid; BORDER-BOTTOM: windowtext 1pt solid; PADDING-BOTTOM: 0cm; PADDING-TOP: 0cm; PADDING-LEFT: 0cm; BORDER-LEFT: windowtext 1pt solid; PADDING-RIGHT: 0cm; BACKGROUND-COLOR: transparent;"> <P style="BACKGROUND: white; TEXT-ALIGN: left; LINE-HEIGHT: normal;" align=left><SPAN style='FONT-SIZE: 9pt;"微软雅黑","sans-serif"; COLOR: #333333;

ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(…),invoke(…),submit(…)),共同决定着线程是否阻塞,具体请看下面的测试用例。

首先,用户需要创建一个自己的ForkJoinTask。代码如下:

public class MyForkJoinTask extends ForkJoinTask {

 /**
  *
  */
 private static final long serialVersionUID = 1L;
 private V value;
 private boolean success = false;
 @Override
 public V getRawResult() {
  return value;
 }
 @Override
 protected void setRawResult(V value) {
  this.value = value;
 }
 @Override
 protected boolean exec() {
  System.out.println("exec");
  return this.success;
 }
 public boolean isSuccess() {
  return success;
 }
 public void setSuccess(boolean isSuccess) {
  this.success = isSuccess;
 }
}

测试ForkJoinPool.invoke(…):

 @Test
 public void testForkJoinInvoke() throws InterruptedException, ExecutionException {
  ForkJoinPool forkJoinPool = new ForkJoinPool();
  MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true);
  task.setRawResult("test");
  String invokeResult = forkJoinPool.invoke(task);
  assertEquals(invokeResult, "test");
 }
 @Test
 public void testForkJoinInvoke2() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  new Thread(new Runnable() {
   public void run() {
    try {
     Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    task.complete("test");
   }
  }).start();
  // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
  String result = forkJoinPool.invoke(task);
  System.out.println(result);
 }
 @Test
 public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
  ForkJoinTask result = forkJoinPool.submit(task);
  result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
 }

测试ForkJoinPool.submit(…):

@Test
 public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
  ForkJoinTask result = forkJoinPool.submit(task);
  result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
 }
 @Test
 public void testForkJoinSubmit2() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  forkJoinPool.submit(task);
  Thread.sleep(1000);
 }
 @Test
 public void testForkJoinSubmit3() throws InterruptedException, ExecutionException {
  final ForkJoinPool forkJoinPool = new ForkJoinPool();
  final MyForkJoinTask task = new MyForkJoinTask();
  new Thread(new Runnable() {
   public void run() {
    try {
     Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    task.complete("test");
   }
  }).start();
  ForkJoinTask result = forkJoinPool.submit(task);
  // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
  result.get();
  Thread.sleep(1000);
 }

测试ForkJoinPool.execute(…):

 @Test
 public void testForkJoinExecute() throws InterruptedException, ExecutionException {
  ForkJoinPool forkJoinPool = new ForkJoinPool();
  MyForkJoinTask task = new MyForkJoinTask();
  forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。
 }

在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。ForkJoinTask天然就是为了支持“分治”问题的。

分支/合并的完整过程如下:

下面列举一个分治算法的实例。

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MaximumFinder extends RecursiveTask<Integer> {
 private static final int SEQUENTIAL_THRESHOLD = 5;
 private final int[] data;
 private final int start;
 private final int end;
 public MaximumFinder(int[] data, int start, int end) {
 this.data = data;
 this.start = start;
 this.end = end;
 }
 public MaximumFinder(int[] data) {
 this(data, 0, data.length);
 }
 @Override
 protected Integer compute() {
 final int length = end - start;
 if (length < SEQUENTIAL_THRESHOLD) {
  return computeDirectly();
 }
 final int split = length / 2;
 final MaximumFinder left = new MaximumFinder(data, start, start + split);
 left.fork();
 final MaximumFinder right = new MaximumFinder(data, start + split, end);
 return Math.max(right.compute(), left.join());
 }
 private Integer computeDirectly() {
 System.out.println(Thread.currentThread() + ' computing: ' + start
      + ' to ' + end);
 int max = Integer.MIN_VALUE;
 for (int i = start; i < end; i++) {
  if (data[i] > max) {
  max = data[i];
  }
 }
 return max;
 }
 public static void main(String[] args) {
 // create a random data set
 final int[] data = new int[1000];
 final Random random = new Random();
 for (int i = 0; i < data.length; i++) {
  data[i] = random.nextInt(100);
 }
 // submit the task to the pool
 final ForkJoinPool pool = new ForkJoinPool(4);
 final MaximumFinder finder = new MaximumFinder(data);
 System.out.println(pool.invoke(finder));
 }
}

以上所示是小编给大家介绍的Java7之forkjoin简介_动力节点Java学院整理,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的,在此也非常感谢大家对我们网站的支持!

(0)

相关推荐

  • 详解Java5、Java6、Java7的新特性

    Java5: 1.泛型 Generics: 引用泛型之后,允许指定集合里元素的类型,免去了强制类型转换,并且能在编译时刻进行类型检查的好处.Parameterized Type作为参数和返回值,Generic是vararg.annotation.enumeration.collection的基石. A.类型安全 抛弃List.Map,使用List.Map给它们添加元素或者使用Iterator遍历时,编译期就可以给你检查出类型错误 B.方法参数和返回值加上了Type 抛弃List.Map,使用Li

  • Java通过Fork/Join优化并行计算

    本文实例为大家分享了Java通过Fork/Join优化并行计算的具体代码,供大家参考,具体内容如下 Java代码: package Threads; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; /** * Created by Frank */ public class RecursiveActionDemo extends RecursiveAction { sta

  • Java Fork/Join框架

    Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程.Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能. 和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程.Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法.通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行. Fork/Join框架

  • 在Java8与Java7中HashMap源码实现的对比

    一.HashMap的原理介绍 此乃老生常谈,不作仔细解说. 一句话概括之:HashMap是一个散列表,它存储的内容是键值对(key-value)映射. 二.Java 7 中HashMap的源码分析 首先是HashMap的构造函数代码块1中,根据初始化的Capacity与loadFactor(加载因子)初始化HashMap. //代码块1 public HashMap(int initialCapacity, float loadFactor) { if (initialCapacity < 0)

  • 深入Java7的一些新特性以及对脚本语言支持API的介绍

    1.switch条件语句中可以加入字符串了,实现方法是利用了字符串的hashcode()值作业真正的值2.增加了一种可以在字面量中使用的进制,二进制,通过在数字前面加"0b"或"0B"3.在数字字面量中使用下划线来分隔数字方便阅读,不影响数值大小.基本原则是前后都是数字的才可以出现下划线4.java7对异常做了两个改动: 4.1.支持在一个catch子句中同时捕获多个异常,另外一个是在捕获并重新抛出异常时的异常类型更加精确.java7中Throwable类增加add

  • Java7之forkjoin简介_动力节点Java学院整理

    Java7引入了Fork Join的概念,来更好的支持并行运算.顾名思义,Fork Join类似与流程语言的分支,合并的概念.也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套. 有两个核心类ForkJoinPool和ForkJoinTask. ForkJoinPool实现了ExecutorService接口,起到线程池的作用.所以

  • Nginx简介_动力节点Java学院整理

    1.什么是Nginx Nginx来自俄罗斯的Igor Sysoev在为Rambler Media(http://www.rambler.ru/)工作期间,使用C语言开发了Nginx.Nginx作为Web服务器,一直为俄罗斯著名的门户网站Rambler Media提供着出色.稳定的服务. Igor Sysoev将Nginx的代码开源,并且赋予其最自由的2-clause BSD-like license许可证.由于Nginx使用基于事件驱动的架构能够并发处理百万级别的TCP连接,高度模块化的设计和自

  • Java Map简介_动力节点Java学院整理

    Map简介 将键映射到值的对象.一个映射不能包含重复的键:每个键最多只能映射到一个值.此接口取代 Dictionary 类,后者完全是一个抽象类,而不是一个接口. Map 接口提供三种collection 视图,允许以键集.值集或键-值映射关系集的形式查看某个映射的内容.映射顺序 定义为迭代器在映射的 collection 视图上返回其元素的顺序.某些映射实现可明确保证其顺序,如 TreeMap 类:另一些映射实现则不保证顺序,如HashMap 类. 注:将可变对象用作映射键时必须格外小心.当对

  • Java中Object toString方法简介_动力节点Java学院整理

    一.Object类介绍  Object类在Java里面是一个比较特殊的类,JAVA只支持单继承,子类只能从一个父类来继承,如果父类又是从另外一个父类继承过来,那他也只能有一个父类,父类再有父类,那也只能有一个,JAVA为了组织这个类组织得比较方便,它提供了一个最根上的类,相当于所有的类都是从这个类继承,这个类就叫Object.所以Object类是所有JAVA类的根基类,是所有JAVA类的老祖宗.所有的类,不管是谁,都是从它继承下来的. 二.toString方法介绍  一个字符串和另外一种类型连接

  • Java Set简介_动力节点Java学院整理

    1. 概述   Java 中的Set和正好和数学上直观的集(set)的概念是相同的.Set最大的特性就是不允许在其中存放的元素是重复的.根据这个特点,我们就可以使用Set 这个接口来实现前面提到的关于商品种类的存储需求.Set 可以被用来过滤在其他集合中存放的元素,从而得到一个没有包含重复新的集合. 2. 常用方法 按照定义,Set 接口继承 Collection 接口,而且它不允许集合中存在重复项.所有原始方法都是现成的,没有引入新方法.具体的 Set 实现类依赖添加的对象的 equals()

  • RandomAccessFile简介_动力节点Java学院整理

    RandomAccessFile RandomAccessFile 是随机访问文件(包括读/写)的类.它支持对文件随机访问的读取和写入,即我们可以从指定的位置读取/写入文件数据. 需要注意的是,RandomAccessFile 虽然属于java.io包,但它不是InputStream或者OutputStream的子类:它也不同于FileInputStream和FileOutputStream. FileInputStream 只能对文件进行读操作,而FileOutputStream 只能对文件进

  • Java字符编码简介_动力节点Java学院整理

    1. 概述 本文主要包括以下几个方面:编码基本知识,Java,系统软件,url,工具软件等. 在下面的描述中,将以"中文"两个字为例,经查表可以知道其GB2312编码是"d6d0 cec4",Unicode编码为"4e2d 6587",UTF编码就是"e4b8ad e69687".注意,这两个字没有iso8859-1编码,但可以用iso8859-1编码来"表示". 2. 编码基本知识 最早的编码是iso88

  • Java List简介_动力节点Java学院整理

    Java中可变数组的原理就是不断的创建新的数组,将原数组加到新的数组中,下文对Java List用法做了详解.  List:元素是有序的(怎么存的就怎么取出来,顺序不会乱),元素可以重复(角标1上有个3,角标2上也可以有个3)因为该集合体系有索引  ArrayList:底层的数据结构使用的是数组结构(数组长度是可变的百分之五十延长)(特点是查询很快,但增删较慢)线程不同步  LinkedList:底层的数据结构是链表结构(特点是查询较慢,增删较快)  Vector:底层是数组数据结构 线

  • jQuery Autocomplete简介_动力节点Java学院整理

    jQuery UI Autocomplete是jQuery UI的自动完成组件,是我用过的最强大.最灵活的Autocomplete,它支持本地的Array/JSON数组.通过ajax请求的Array/JSON数组.JSONP.以及Function(最灵活)等方式来获取数据. 支持的数据源 jQuery UI Autocomplete主要支持字符串Array.JSON两种数据格式. 普通的Array格式没有什么特殊的,如下: ["bjpowernode","动力节点"

  • JDBC简介_动力节点Java学院整理

    前言:什么是JDBC 维基百科的简介: Java 数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法.JDBC也是Sun Microsystems的商标.它JDBC是面向关系型数据库的. 简单地说,就是用于执行SQL语句的一类Java API,通过JDBC使得我们可以直接使用Java编程来对关系数据库进行操作.通过封装,可以使开发人员使用纯Java API完成S

随机推荐