Java实现FutureTask的示例详解

目录
  • 前言
  • FutureTask
  • 自己实现FutureTask
    • 工具准备
    • FutureTask设计与实现
  • 总结

前言

在并发编程当中我们最常见的需求就是启动一个线程执行一个函数去完成我们的需求,而在这种需求当中,我们常常需要函数有返回值。比如我们需要同一个非常大的数组当中数据的和,让每一个线程求某一个区间内部的和,最终将这些和加起来,那么每个线程都需要返回对应区间的和。而在Java当中给我们提供了这种机制,去实现这一个效果——FutureTask

FutureTask

在自己写FutureTask之前我们首先写一个例子来回顾一下FutureTask的编程步骤:

写一个类实现Callable接口。

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

实现接口就实现call即可,可以看到这个函数是有返回值的,而FutureTask返回给我们的值就是这个函数的返回值。

new一个FutureTask对象,并且new一个第一步写的类,new FutureTask<>(callable实现类)

最后将刚刚得到的FutureTask对象传入Thread类当中,然后启动线程即可new Thread(futureTask).start();

然后我们可以调用FutureTaskget方法得到返回的结果futureTask.get();

假如有一个数组data,长度为100000,现在有10个线程,第i个线程求数组[i * 10000, (i + 1) * 10000)所有数据的和,然后将这十个线程的结果加起来。

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    int[] data = new int[100000];
    Random random = new Random();
    for (int i = 0; i < 100000; i++) {
      data[i] = random.nextInt(10000);
    }
    @SuppressWarnings("unchecked")
    FutureTask<Integer>[] tasks = (FutureTask<Integer>[]) Array.newInstance(FutureTask.class, 10);
    // 设置10个 futuretask 任务计算数组当中数据的和
    for (int i = 0; i < 10; i++) {
      int idx = i;
      tasks[i] = new FutureTask<>(() -> {
        int sum = 0;
        for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {
          sum += data[k];
        }
        return sum;
      });
    }
    // 开启线程执行 futureTask 任务
    for (FutureTask<Integer> futureTask : tasks) {
      new Thread(futureTask).start();
    }
    int threadSum = 0;
    for (FutureTask<Integer> futureTask : tasks) {
      threadSum += futureTask.get();
    }
    int sum = Arrays.stream(data).sum();
    System.out.println(sum == threadSum); // 结果始终为 true
  }
}

可能你会对FutureTask的使用方式感觉困惑,或者不是很清楚,现在我们来仔细捋一下思路。

首先启动一个线程要么是继承自Thread类,然后重写Thread类的run方法,要么是给Thread类传递一个实现了Runnable的类对象,当然可以用匿名内部类实现。

既然我们的FutureTask对象可以传递给Thread类,说明FutureTask肯定是实现了Runnable接口,我们现在来看一下FutureTask的继承体系。

​ 可以发现的是FutureTask确实实现了Runnable接口,同时还实现了Future接口,这个Future接口主要提供了后面我们使用FutureTask的一系列函数比如get

看到这里你应该能够大致想到在FutureTask中的run方法会调用Callable当中实现的call方法,然后将结果保存下来,当调用get方法的时候再将这个结果返回。

自己实现FutureTask

工具准备

经过上文的分析你可能已经大致了解了FutureTask的大致执行过程了,但是需要注意的是,如果你执行FutureTaskget方法是可能阻塞的,因为可能Callablecall方法还没有执行完成。因此在get方法当中就需要有阻塞线程的代码,但是当call方法执行完成之后需要将这些线程都唤醒。

在本篇文章当中使用锁ReentrantLock和条件变量Condition进行线程的阻塞和唤醒,在我们自己动手实现FutureTask之前,我们先熟悉一下上面两种工具的使用方法。

ReentrantLock主要有两个方法:

  • lock对临界区代码块进行加锁。
  • unlock对临界区代码进行解锁。

Condition主要有三个方法:

  • await阻塞调用这个方法的线程,等待其他线程唤醒。
  • signal唤醒一个被await方法阻塞的线程。
  • signalAll唤醒所有被await方法阻塞的线程。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {

  private ReentrantLock lock;
  private Condition condition;

  LockDemo() {
    lock = new ReentrantLock();
    condition = lock.newCondition();
  }

  public void blocking() {
    lock.lock();
    try {
      System.out.println(Thread.currentThread() + " 准备等待被其他线程唤醒");
      condition.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }finally {
      lock.unlock();
    }
  }

  public void inform() throws InterruptedException {
    // 先休眠两秒 等他其他线程先阻塞
    TimeUnit.SECONDS.sleep(2);
    lock.lock();
    try {
      System.out.println(Thread.currentThread() + " 准备唤醒其他线程");
      condition.signal(); // 唤醒一个被 await 方法阻塞的线程
      // condition.signalAll(); // 唤醒所有被 await 方法阻塞的线程
    }finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    LockDemo lockDemo = new LockDemo();
    Thread thread = new Thread(() -> {
      lockDemo.blocking(); // 执行阻塞线程的代码
    }, "Blocking-Thread");
    Thread thread1 = new Thread(() -> {
      try {
        lockDemo.inform(); // 执行唤醒线程的代码
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }, "Inform-Thread");
    thread.start();
    thread1.start();
  }
}

上面的代码的输出:

Thread[Blocking-Thread,5,main] 准备等待被其他线程唤醒
Thread[Inform-Thread,5,main] 准备唤醒其他线程

FutureTask设计与实现

在前文当中我们已经谈到了FutureTask的实现原理,主要有以下几点:

  • 构造函数需要传入一个实现了Callable接口的类对象,这个将会在FutureTaskrun方法执行,然后得到函数的返回值,并且将返回值存储起来。
  • 当线程调用get方法的时候,如果这个时候Callable当中的call已经执行完成,直接返回call函数返回的结果就行,如果call函数还没有执行完成,那么就需要将调用get方法的线程挂起,这里我们可以使用condition.await()将线程挂起。
  • call函数执行完成之后,需要将之前被get方法挂起的线程唤醒继续执行,这里使用condition.signalAll()将所有挂起的线程唤醒。
  • 因为是我们自己实现FutureTask,功能不会那么齐全,只需要能够满足我们的主要需求即可,主要是帮助大家了解FutureTask原理。

实现代码如下(分析都在注释当中):

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// 这里需要实现 Runnable 接口,因为需要将这个对象放入 Thread 类当中
// 而 Thread 要求传入的对象实现了 Runnable 接口
public class MyFutureTask<V> implements Runnable {

  private final Callable<V> callable;
  private Object returnVal; // 这个表示我们最终的返回值
  private final ReentrantLock lock;
  private final Condition condition;

  public MyFutureTask(Callable<V> callable) {
    // 将传入的 callable 对象存储起来 方便在后面的 run 方法当中调用
    this.callable = callable;
    lock = new ReentrantLock();
    condition = lock.newCondition();
  }

  @SuppressWarnings("unchecked")
  public V get(long timeout, TimeUnit unit) {
    if (returnVal != null) // 如果符合条件 说明 call 函数已经执行完成 返回值已经不为 null 了
      return (V) returnVal; // 直接将结果返回即可 这样不用竞争锁资源 提高程序执行效率
    lock.lock();
    try {
      // 这里需要进行二次判断 (双重检查)
      // 因为如果一个线程在第一次判断 returnVal 为空
      // 然后这个时候它可能因为获取锁而被挂起
      // 而在被挂起的这段时间,call 可能已经执行完成
      // 如果这个时候不进行判断直接执行 await方法
      // 那后面这个线程将无法被唤醒
      if (returnVal == null)
        condition.await(timeout, unit);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
    return (V) returnVal;
  }

  @SuppressWarnings("unchecked")
  public V get() {
    if (returnVal != null)
      return (V) returnVal;
    lock.lock();
    try {
      // 同样的需要进行双重检查
      if (returnVal == null)
      	condition.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
    return (V) returnVal;
  }

  @Override
  public void run() {
    if (returnVal != null)
      return;
    try {
      // 在 Runnable 的 run 方法当中
      // 执行 Callable 方法的 call 得到返回结果
      returnVal = callable.call();
    } catch (Exception e) {
      e.printStackTrace();
    }
    lock.lock();
    try {
      // 因为已经得到了结果
      // 因此需要将所有被 await 方法阻塞的线程唤醒
      // 让他们从 get 方法返回
      condition.signalAll();
    }finally {
      lock.unlock();
    }
  }
	// 下面是测试代码
  public static void main(String[] args) {
    MyFutureTask<Integer> ft = new MyFutureTask<>(() -> {
      TimeUnit.SECONDS.sleep(2);
      return 101;
    });
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get(100, TimeUnit.MILLISECONDS)); // 输出为 null
    System.out.println(ft.get()); // 输出为 101
  }
}

我们现在用我们自己写的MyFutureTask去实现在前文当中数组求和的例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
  int[] data = new int[100000];
  Random random = new Random();
  for (int i = 0; i < 100000; i++) {
    data[i] = random.nextInt(10000);
  }
  @SuppressWarnings("unchecked")
  MyFutureTask<Integer>[] tasks = (MyFutureTask<Integer>[]) Array.newInstance(MyFutureTask.class, 10);
  for (int i = 0; i < 10; i++) {
    int idx = i;
    tasks[i] = new MyFutureTask<>(() -> {
      int sum = 0;
      for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {
        sum += data[k];
      }
      return sum;
    });
  }
  for (MyFutureTask<Integer> MyFutureTask : tasks) {
    new Thread(MyFutureTask).start();
  }
  int threadSum = 0;
  for (MyFutureTask<Integer> MyFutureTask : tasks) {
    threadSum += MyFutureTask.get();
  }
  int sum = Arrays.stream(data).sum();
  System.out.println(sum == threadSum); // 输出结果为 true
}

总结

在本篇文章当中主要给大家介绍了FutureTask的内部原理,并且我们自己通过使用ReentrantLockCondition实现了我们自己的FutureTask,本篇文章的主要内容如下:

FutureTask的内部原理:

  • FutureTask首先会继承Runnable接口,这样就可以将FutureTask的对象直接放入Thread类当中,作为构造函数的参数。
  • 我们在使用FutureTask的时候需要传入一个Callable实现类的对象,在函数call当中实现我们需要执行的函数,执行完成之后,将call函数的返回值保存下来,当有线程调用get方法时候将保存的返回值返回。

我们使用条件变量进行对线程的阻塞和唤醒。

  • 当有线程调用get方法时,如果call已经执行完成,那么可以直接将结果返回,否则需要使用条件变量将线程挂起。
  • call函数执行完成的时候,需要使用条件变量将所有阻塞在get方法的线程唤醒。

双重检查:

  • 我们在get方法当中首先判断returnVal是否为空,如果不为空直接将结果返回,这就可以不用去竞争锁资源了,可以提高程序执行的效率。
  • 但是我们在使用锁保护的临界区还需要进行判断,判断returnVal是否为空,因为如果一个线程在第一次判断 returnVal 为空,然后这个时候它可能因为获取锁而被挂起, 而在被挂起的这段时间,call 可能已经执行完成,如果这个时候不进行判断直接执行 await方法,那后面这个线程将无法被唤醒,因为在call函数执行完成之后调用了condition.signalAll(),如果线程在这之后执行await方法,那么将来再没有线程去将这些线程唤醒。

到此这篇关于Java实现FutureTask的示例详解的文章就介绍到这了,更多相关Java FutureTask内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java FutureTask类使用案例解析

    FutureTask一个可取消的异步计算,FutureTask 实现了Future的基本方法,提空 start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果.结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成,那么计算就不能再次启动或是取消. 一个FutureTask 可以用来包装一个 Callable 或是一个runnable对象.因为FurtureTask实现了Runnable方法,所以一个 FutureTask可以提交(submit

  • Java多线程之FutureTask的介绍及使用

    一.FutureTask的理解 FutureTask属于java.util.concurrent 包:FutureTask表示可取消的异步计算.FutureTask类提供了一个Future的基本实现 ,具有启动和取消计算的方法,查询计算是否完整,并检索计算结果.结果只能在计算完成后才能检索; 如果计算尚未完成,则get方法将阻止. 一旦计算完成,则无法重新启动或取消计算(除非使用runAndReset()调用计算 ). 二.FutureTask类图 从上面的FutureTask类图中可以看出,F

  • Java线程池FutureTask实现原理详解

    前言 线程池可以并发执行多个任务,有些时候,我们可能想要跟踪任务的执行结果,甚至在一定时间内,如果任务没有执行完成,我们可能还想要取消任务的执行,为了支持这一特性,ThreadPoolExecutor提供了 FutureTask 用于追踪任务的执行和取消.本篇介绍FutureTask的实现原理. 类视图 为了更好的理解FutureTask的实现原理,这里先提供几个重要接口和类的结构,如下图所示: RunnableAdapter ThreadPoolExecutor提供了submit接口用于提交任

  • Java多线程并发FutureTask使用详解

    目录 基本使用 代码分析 继承关系 Future RunnableFuture FutureTask 状态 属性 内部类 构造方法 检索 FutureTask 状态 取消操作 计算结果 立刻获取结果或异常 run 方法组 本文基于最新的 OpenJDK 代码,预计发行版本为 19 . Java 的多线程机制本质上能够完成两件事情,异步计算和并发.并发问题通过解决线程安全的一系列 API 来解决:而异步计算,常见的使用是 Runnable 和 Callable 配合线程使用. FutureTask

  • 简谈java并发FutureTask的实现

    概述 在使用java多线程解决问题的时候,为了提高效率,我们常常会异步处理一些计算任务并在最后异步的获取计算结果,这个过程的实现离不开Future接口及其实现类FutureTask.FutureTask类实现了Runnable, Future接口,接下来我会通过源码对该类的实现进行详解. 使用 我们先看下FutureTask中的主要方法如下,可以看出FutureTask实现了任务及异步结果的集合功能.看到这块的方法,大家肯定会有疑问,Runnable任务的run方法返回空,FutureTask如

  • Java中Future和FutureTask的示例详解及使用

    目录 一.Future 接口 二.FutureTask 三.使用 Callable 和 Future 四.小结(FutureTask核心原理) 总结 一.Future 接口 当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果.为此,可以使用 Future 对象. 将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦Callable 返回).Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式.要实现此接口,必

  • Java实现FutureTask的示例详解

    目录 前言 FutureTask 自己实现FutureTask 工具准备 FutureTask设计与实现 总结 前言 在并发编程当中我们最常见的需求就是启动一个线程执行一个函数去完成我们的需求,而在这种需求当中,我们常常需要函数有返回值.比如我们需要同一个非常大的数组当中数据的和,让每一个线程求某一个区间内部的和,最终将这些和加起来,那么每个线程都需要返回对应区间的和.而在Java当中给我们提供了这种机制,去实现这一个效果——FutureTask. FutureTask 在自己写FutureTa

  • Java反射框架Reflections示例详解

    MAVEN 坐标 <dependency> <groupId>org.reflections</groupId> <artifactId>reflections</artifactId> <version>0.9.10</version> </dependency> Reflections 的作用 Reflections通过扫描classpath,索引元数据,并且允许在运行时查询这些元数据. 获取某个类型的所有

  • Java设计模式之单例模式示例详解

    目录 0.概述 1.饿汉式 1.1 饿汉式单例实现 1.2 破坏单例的几种情况 1.3 预防单例的破坏 2.枚举饿汉式 2.1 枚举单例实现 2.2 破坏单例 3.懒汉式 4.双检锁懒汉式 5.内部类懒汉式 6.JDK中单例的体现 0.概述 为什么要使用单例模式? 在我们的系统中,有一些对象其实我们只需要一个,比如说:线程池.缓存.对话框.注册表.日志对象.充当打印机.显卡等设备驱动程序的对象.事实上,这一类对象只能有一个实例,如果制造出多个实例就可能会导致一些问题的产生,比如:程序的行为异常.

  • java基础之注解示例详解

    目录 定义 作用 注解与注释的区别 JDK内置的标准注解 自定义注解 @Target 属性 定义 注解也叫原数据,它是JDK1.5及之后版本引入的一个特性,它可以声明在类.方法.变量等前面,用来对这些元素进行说明. 作用 生成文档:通过代码里标识的注解生成doc文档[生成doc文档] 代码分析:通过代码里标识的注解对代码进行分析[反射] 编译检查:通过代码里标识的注解让编译器能够实现基本的编译检查[Override] 注解与注释的区别 注解是给编译器看的,注释是给程序员看的. JDK内置的标准注

  • Java代理模式的示例详解

    目录 定义 案例 需求 方案:静态代理模式 总结 定义 代理模式(Proxy Parttern) 为一个对象提供一个替身,来控制这个对象的访问,即通过代理对象来访问目标对象,这样做的话好处是可以在目标对象实现的基础上,进行额外的功能的扩展. 案例 需求 苹果公司通过苹果代理商来卖手机 方案:静态代理模式 定义抽象接口类,该类在代理模式中扮演的是一个抽象功能的角色,该案例中就是把出售手机抽象为了一个接口 /** * 售卖手机的接口(代理模式--抽象角色) * @author:liyajie * @

  • Java动态代理的示例详解

    目录 定义 分类 案例 需求 方案一:jdk动态代理 方案二:cglib动态代理 分析 总结 定义 动态代理指的是,代理类和目标类的关系在程序运行的时候确定的,客户通过代理类来调用目标对象的方法,是在程序运行时根据需要动态的创建目标类的代理对象. 分类 jdk动态代理 cglib动态代理 案例 需求 苹果公司通过苹果代理商来卖手机 方案一:jdk动态代理 定义抽象接口 /** * 售卖手机的接口(代理模式--抽象角色) * @author:liyajie * @createTime:2022/2

  • Java实现无向图的示例详解

    目录 基本概念 图的定义 无向图的定义 无向图的 API 无向图的实现方式 邻接矩阵 边的数组 邻接表数组 无向图的遍历 深度优先搜索 广度优先搜索 后记 基本概念 图的定义 一个图是由点集V={vi} 和 VV 中元素的无序对的一个集合E={ek} 所构成的二元组,记为G=(V,E),V中的元素vi叫做顶点,E中的元素 ek叫做边. 对于V中的两个点 u,v,如果边(u,v) 属于E,则称 u,v两点相邻,u,v称为边(u,v)的端点. 我们可以用m(G)=|E| 表示图G中的边数,用n(G)

  • Java Stream流语法示例详解

    目录 如何使用Stream? Stream的操作分类 1.创建流 2.操作流 1)过滤 2)映射 3)匹配 4)组合 3.转换流 如何使用Stream? 聚合操作是Java 8针对集合类,使编程更为便利的方式,可以与Lambda表达式一起使用,达到更加简洁的目的. 前面例子中,对聚合操作的使用可以归结为3个部分: 1)  创建Stream:通过stream()方法,取得集合对象的数据集. 2)  Intermediate:通过一系列中间(Intermediate)方法,对数据集进行过滤.检索等数

  • Java HttpClient用法的示例详解

    目录 1.导入依赖 2.使用工具类 3.扩展 1.导入依赖 <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.3</version> </dependency> <dependency> <groupId>com.alibab

  • Go&java算法之最大数示例详解

    目录 最大数 方法一:排序(java) 方法一:排序(go) 最大数 给定一组非负整数 nums,重新排列每个数的顺序(每个数不可拆分)使之组成一个最大的整数. 注意:输出结果可能非常大,所以你需要返回一个字符串而不是整数. 示例 1: 输入:nums = [10,2] 输出:"210" 示例 2: 输入:nums = [3,30,34,5,9] 输出:"9534330" 提示: 1 <= nums.length <= 100 0 <= nums[

随机推荐