Java中Future、FutureTask原理以及与线程池的搭配使用

Java中的Future和Future通常和线程池搭配使用,用来获取线程池返回执行后的返回值。我们假设通过Executors工厂方法构建一个线程池es ,es要执行某个任务有两种方式,一种是执行 es.execute(runnable) ,这种情况是没有返回值的; 另外一种情况是执行 es.submit(runnale)或者 es.submit(callable) ,这种情况会返回一个Future的对象,然后调用Future的get()来获取返回值。

Future

public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);

  boolean isCancelled();

  boolean isDone();

  V get() throws InterruptedException, ExecutionException;

  V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
}

Future是一个接口,他提供给了我们方法来检测当前的任务是否已经结束,还可以等待任务结束并且拿到一个结果,通过调用Future的get()方法可以当任务结束后返回一个结果值,如果工作没有结束,则会阻塞当前线程,直到任务执行完毕,我们可以通过调用cancel()方法来停止一个任务,如果任务已经停止,则cancel()方法会返回true;如果任务已经完成或者已经停止了或者这个任务无法停止,则cancel()会返回一个false。当一个任务被成功停止后,他无法再次执行。isDone()和isCancel()方法可以判断当前工作是否完成和是否取消。

线程池中有以下几个方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
  }
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
  }
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
  }

都返回一个Future对象,仔细查看发现,所有的方法最终都将runnable或者callable转变成一个RunnableFuture的对象,这个RunnableFutre的对象是一个同时继承了Runnable和Future的接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
  void run();
}

然后调用executor(runnable)方法,关于executor(runnable)的具体实现我们后面再讲。最后返回一个RunnableFuture对象。RunnableFuture这个接口直有一个具体的实现类,那就时我们接下来要讲的FutureTask。

FutureTask

public class FutureTask<V> implements RunnableFuture<V>

FutureTask实现了RunnableFuture的接口,既然我们知道最终返回的是一个FutureTask对象ftask,而且我们可以通过ftask.get()可以的来得到execute(task)的返回值,这个过程具体事怎么实现的了??这个也是本篇文章的所要讲的。

我们可以先来猜测一下它的实现过程,首先Runnable的run()是没有返回值的,所以当es.submit()的参数只有一个Runnable对象的时候,通过ftask.get()得到的也是一个null值,当参数还有一个result的时候,就返回这个result;如果参数是一个Callable的对象的时候,Callable的call()是有返回值的,同时这个call()方法会在转换的Runable对象ftask的run()方法中被调用,然后将它的返回值赋值给一个全局变量,最后在ftask的get()方法中得到这个值。猜想对不对了? 我们直接看源码。

将Runnable对象转为RunnableFuture的方法:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
  }
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;    // ensure visibility of callable
  }

Executors::callable()

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
      throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
  }

Executors的内部类RunnableAdapter

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
      this.task = task;
      this.result = result;
    }
    public T call() {
      task.run();
      return result;
    }
  }

将Callable对象转为RunnableFuture的方法:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
  }
public FutureTask(Callable<V> callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;    // ensure visibility of callable
  }

接下来我们来看execute(runnbale)的执行过程:

execute(runnable)最终的实现是在ThreadPoolExecutor,基本上所有的线程池都是通过ThreadPoolExecutor的构造方法传入不同的参数来构造的。

ThreadPoolExecutor::executor(runnable) :

public void execute(Runnable command) {
    if (command == null)
      throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))
        return;
      c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      if (! isRunning(recheck) && remove(command))
        reject(command);
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
    }
    else if (!addWorker(command, false))
      reject(command);
  }

如上所示,这个过程分为三步:

Step 1:

如果当前线程池的的线程小于核心线程的数量的时候,就会调用addWorker检查运行状态和正在运行的线程数量,通过返回false来防止错误地添加线程,然后执行当前任务。

Step 2:

否则当前线程池的的线程大于核心线程的数量的时候,我们仍然需要先判断是否需要添加一个新的线程来执行这个任务,因为可能已经存在的线程此刻任务执行完毕处于空闲状态,这个时候可以直接复用。否则创建一个新的线程来执行此任务。

Step 3:

如果不能再添加新的任务,就拒绝。

执行execute(runnable)最终会回调runnable的run()方法,也就是FutureTask的对象ftask的run()方法,源码如下:

public void run() {
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                     null, Thread.currentThread()))
      return;
    try {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          setException(ex);
        }
        if (ran)
          set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
  }

通过执行result = c.call()拿到返回值,然后set(result) ,因此get()方法获得的值正是这个result。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java中的Runnable,Callable,Future,FutureTask的比较

    Java中的Runnable,Callable,Future,FutureTask的比较 Java中存在Runnable.Callable.Future.FutureTask这几个与线程相关的类或者接口,在Java中也是比较重要的几个概念,我们通过下面的简单示例来了解一下它们的作用于区别. Runnable 其中Runnable应该是我们最熟悉的接口,它只有一个run()函数,用于将耗时操作写在其中, 该函数没有返回值 .然后使用某个线程去执行该runnable即可实现多线程,Thread类在调

  • java多线程编程同步器Future和FutureTask解析及代码示例

    publicinterfaceFuture<V>Future表示异步计算的结果.它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果.计算完成后只能使用get方法来获取结果,如有必要,计算完成前可以阻塞此方法.取消则由cancel方法来执行.还提供了其他方法,以确定任务是正常完成还是被取消了.一旦计算完成,就不能再取消计算.如果为了可取消性而使用Future但又不提供可用的结果,则可以声明Future<?>形式类型.并返回null作为底层任务的结果. Future主要

  • Java线程池FutureTask实现原理详解

    前言 线程池可以并发执行多个任务,有些时候,我们可能想要跟踪任务的执行结果,甚至在一定时间内,如果任务没有执行完成,我们可能还想要取消任务的执行,为了支持这一特性,ThreadPoolExecutor提供了 FutureTask 用于追踪任务的执行和取消.本篇介绍FutureTask的实现原理. 类视图 为了更好的理解FutureTask的实现原理,这里先提供几个重要接口和类的结构,如下图所示: RunnableAdapter ThreadPoolExecutor提供了submit接口用于提交任

  • java多线程返回值使用示例(callable与futuretask)

    Callable接口类似于Runnable,从名字就可以看出来了,但是Runnable不会返回结果,并且无法抛出返回结果的异常,而Callable功能更强大一些,被线程执行后,可以返回值,这个返回值可以被Future拿到,也就是说,Future可以拿到异步执行任务的返回值,下面来看一个简单的例子 复制代码 代码如下: package com.future.test; import java.io.FileNotFoundException;import java.io.IOException;i

  • 简谈java并发FutureTask的实现

    概述 在使用java多线程解决问题的时候,为了提高效率,我们常常会异步处理一些计算任务并在最后异步的获取计算结果,这个过程的实现离不开Future接口及其实现类FutureTask.FutureTask类实现了Runnable, Future接口,接下来我会通过源码对该类的实现进行详解. 使用 我们先看下FutureTask中的主要方法如下,可以看出FutureTask实现了任务及异步结果的集合功能.看到这块的方法,大家肯定会有疑问,Runnable任务的run方法返回空,FutureTask如

  • 比较java中Future与FutureTask之间的关系

    Future与FutureTask都是用于获取线程执行的返回结果.下面我们就对两者之间的关系与使用进行一个大致的介绍与分析 一.Future与FutureTask介绍: Future位于java.util.concurrent包下,它是一个接口 public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() t

  • Java中Future、FutureTask原理以及与线程池的搭配使用

    Java中的Future和Future通常和线程池搭配使用,用来获取线程池返回执行后的返回值.我们假设通过Executors工厂方法构建一个线程池es ,es要执行某个任务有两种方式,一种是执行 es.execute(runnable) ,这种情况是没有返回值的: 另外一种情况是执行 es.submit(runnale)或者 es.submit(callable) ,这种情况会返回一个Future的对象,然后调用Future的get()来获取返回值. Future public interfac

  • Java中该如何优雅的使用线程池详解

    目录 为什么要用线程池? 线程池 线程池基本概念 线程池接口定义和实现类 ExecutorService ScheduledExecutorService 线程池工具类 newFixedThreadPool(int nThreads) newCachedThreadPool() newSingleThreadExecutor() newScheduledThreadPool(int corePoolSize) 任务线程池执行过程 如何确认合适的线程数量? 线程池的使用分析 合理配置线程池大小 线

  • Java中Future和FutureTask的示例详解及使用

    目录 一.Future 接口 二.FutureTask 三.使用 Callable 和 Future 四.小结(FutureTask核心原理) 总结 一.Future 接口 当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可以知道该线程返回的结果.为此,可以使用 Future 对象. 将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦Callable 返回).Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式.要实现此接口,必

  • 关于Java 中 Future 的 get 方法超时问题

    目录 一.背景 二.模拟 2.1 常见写法 2.2 尝试取消 2.2.1 cancel(false) 2.2.2 cancel(true) 三.回归源码 四.总结 一.背景 很多 Java 工程师在准备面试时,会刷很多八股文,线程和线程池这一块通常会准备线程的状态.线程的创建方式,Executors 里面的一些工厂方法和为什么不推荐使用这些工厂方法,ThreadPoolExecutor 构造方法的一些参数和执行过程等. 工作中,很多人会使用线程池的 submit 方法 获取 Future 类型的

  • Java中synchronized实现原理详解

    记得刚刚开始学习Java的时候,一遇到多线程情况就是synchronized,相对于当时的我们来说synchronized是这么的神奇而又强大,那个时候我们赋予它一个名字"同步",也成为了我们解决多线程情况的百试不爽的良药.但是,随着我们学习的进行我们知道synchronized是一个重量级锁,相对于Lock,它会显得那么笨重,以至于我们认为它不是那么的高效而慢慢摒弃它. 诚然,随着Javs SE 1.6对synchronized进行的各种优化后,synchronized并不会显得那么

  • Java中死锁的原理实战分析

    本文实例讲述了Java中死锁的原理.分享给大家供大家参考,具体如下: 一 点睛 当两个线程相互等待对方释放同步监视器时就会发生死锁,Java虚拟机没有监测.也没有采用措施来处理死锁情况,所以多线程编程时应该采取措施避免死锁的出现. 一旦出现死锁,整个程序既不会发生任何异常,也不会给出任何提示,只是所有线程处于阻塞状态,无法继续. 二 代码 class A { public synchronized void foo( B b ) { System.out.println("当前线程名: &quo

  • java中fork-join的原理解析

    ForkJoinTask就是ForkJoinPool里面的每一个任务.他主要有两个子类:RecursiveAction和RecursiveTask.然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果, 这就是整个过程的运用.他有两个子类,使用这两个子类都可以实现我们的任务分配和计算. (1)RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值) (2)RecursiveTask 一个递归有结果的ForkJoinTask(有返回值) For

  • GC参考手册二java中垃圾回收原理解析

    内存碎片整理 每次执行清除(sweeping), JVM 都必须保证不可达对象占用的内存能被回收重用.但这(最终)有可能会产生内存碎片(类似于磁盘碎片), 进而引发两个问题: 写入操作越来越耗时, 因为寻找一块足够大的空闲内存会变得非常麻烦. 在创建新对象时, JVM在连续的块中分配内存.如果碎片问题很严重, 直至没有空闲片段能存放下新创建的对象,就会发生内存分配错误(allocation error). 要避免这类问题,JVM 必须确保碎片问题不失控.因此在垃圾收集过程中, 不仅仅是标记和清除

  • 浅析Java中的SPI原理

    在面向对象的程序设计中,模块之间交互采用接口编程,通常情况下调用方不需要知道被调用方的内部实现细节,因为一旦涉及到了具体实现,如果需要换一种实现就需要修改代码,这违反了程序设计的"开闭原则".所以我们一般有两种选择:一种是使用API(Application Programming Interface),另一种是SPI(Service Provider Interface),API通常被应用程序开发人员使用,而SPI通常被框架扩展人员使用. 在进入下面学习之前,我们先来再加深一下API和

随机推荐