futuretask源码分析(推荐)

FutureTask只实现RunnableFuture接口:

该接口继承了java.lang.Runnable和Future接口,也就是继承了这两个接口的特性。

1.可以不必直接继承Thread来生成子类,只要实现run方法,且把实例传入到Thread构造函数,Thread就可以执行该实例的run方法了( Thread(Runnable) )。

2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成。也可以中断执行,判断执行状态等。

FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。

如: 1. 取消任务执行

2. 查询任务是否执行完成

3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。

注意:一旦任务执行完成,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)
FutureTask支持执行两种任务, Callable 或者 Runnable的实现类。且可把FutureTask实例交由Executor执行。

源码部分(很简单):

public class FutureTask<V> implements RunnableFuture<V> {
  /*
   * Revision notes: This differs from previous versions of this
   * class that relied on AbstractQueuedSynchronizer, mainly to
   * avoid surprising users about retaining interrupt status during
   * cancellation races. Sync control in the current design relies
   * on a "state" field updated via CAS to track completion, along
   * with a simple Treiber stack to hold waiting threads.
   *
   * Style note: As usual, we bypass overhead of using
   * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
   */
  /**
   * The run state of this task, initially NEW. The run state
   * transitions to a terminal state only in methods set,
   * setException, and cancel. During completion, state may take on
   * transient values of COMPLETING (while outcome is being set) or
   * INTERRUPTING (only while interrupting the runner to satisfy a
   * cancel(true)). Transitions from these intermediate to final
   * states use cheaper ordered/lazy writes because values are unique
   * and cannot be further modified.
   *
   * Possible state transitions:
   * NEW -> COMPLETING -> NORMAL
   * NEW -> COMPLETING -> EXCEPTIONAL
   * NEW -> CANCELLED
   * NEW -> INTERRUPTING -> INTERRUPTED
   */
  private volatile int state;
  private static final int NEW     = 0;
  private static final int COMPLETING  = 1;
  private static final int NORMAL    = 2;
  private static final int EXCEPTIONAL = 3;
  private static final int CANCELLED  = 4;
  private static final int INTERRUPTING = 5;
  private static final int INTERRUPTED = 6;
  /** The underlying callable; nulled out after running */
  private Callable<V> callable;
  /** 用来存储任务执行结果或者异常对象,根据任务state在get时候选择返回执行结果还是抛出异常 */
  private Object outcome; // non-volatile, protected by state reads/writes
  /** 当前运行Run方法的线程 */
  private volatile Thread runner;
  /** Treiber stack of waiting threads */
  private volatile WaitNode waiters;
  /**
   * Returns result or throws exception for completed task.
   *
   * @param s completed state value
   */
  @SuppressWarnings("unchecked")
  private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
      return (V)x;
    if (s >= CANCELLED)
      throw new CancellationException();
    throw new ExecutionException((Throwable)x);
  }
  /**
   * Creates a {@code FutureTask} that will, upon running, execute the
   * given {@code Callable}.
   *
   * @param callable the callable task
   * @throws NullPointerException if the callable is null
   */
  public FutureTask(Callable<V> callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;    // ensure visibility of callable
  }
  /**
   * Creates a {@code FutureTask} that will, upon running, execute the
   * given {@code Runnable}, and arrange that {@code get} will return the
   * given result on successful completion.
   *
   * @param runnable the runnable task
   * @param result the result to return on successful completion. If
   * you don't need a particular result, consider using
   * constructions of the form:
   * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
   * @throws NullPointerException if the runnable is null
   */
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;    // ensure visibility of callable
  }
  //判断任务是否已取消(异常中断、取消等)
  public boolean isCancelled() {
    return state >= CANCELLED;
  }
  /**
  判断任务是否已结束(取消、异常、完成、NORMAL都等于结束)
  **
  public boolean isDone() {
    return state != NEW;
  }
  /**
  mayInterruptIfRunning用来决定任务的状态。
          true : 任务状态= INTERRUPTING = 5。如果任务已经运行,则强行中断。如果任务未运行,那么则不会再运行
          false:CANCELLED  = 4。如果任务已经运行,则允许运行完成(但不能通过get获取结果)。如果任务未运行,那么则不会再运行
  **/
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
      return false;
    if (mayInterruptIfRunning) {
      if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
        return false;
      Thread t = runner;
      if (t != null)
        t.interrupt();
      UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
      return false;
    finishCompletion();
    return true;
  }
  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //如果任务未彻底完成,那么则阻塞直至任务完成后唤醒该线程
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }
  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
      throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
      (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
      throw new TimeoutException();
    return report(s);
  }
  /**
   * Protected method invoked when this task transitions to state
   * {@code isDone} (whether normally or via cancellation). The
   * default implementation does nothing. Subclasses may override
   * this method to invoke completion callbacks or perform
   * bookkeeping. Note that you can query status inside the
   * implementation of this method to determine whether this task
   * has been cancelled.
   */
  protected void done() { }
  /**
  该方法在FutureTask里只有run方法在任务完成后调用。
  主要保存任务执行结果到成员变量outcome 中,和切换任务执行状态。
  由该方法可以得知:
  COMPLETING : 任务已执行完成(也可能是异常完成),但还未设置结果到成员变量outcome中,也意味着还不能get
  NORMAL  : 任务彻底执行完成
  **/
  protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = v;
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      finishCompletion();
    }
  }
  /**
   * Causes this future to report an {@link ExecutionException}
   * with the given throwable as its cause, unless this future has
   * already been set or has been cancelled.
   *
   * <p>This method is invoked internally by the {@link #run} method
   * upon failure of the computation.
   *
   * @param t the cause of failure
   */
  protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = t;
      UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
      finishCompletion();
    }
  }
  /**
  由于实现了Runnable接口的缘故,该方法可由执行线程所调用。
  **/
  public void run() {
    //只有当任务状态=new时才被运行继续执行
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                     null, Thread.currentThread()))
      return;
    try {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          //调用Callable的Call方法
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          setException(ex);
        }
        if (ran)
          set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
  }
  /**
  如果该任务在执行过程中不被取消或者异常结束,那么该方法不记录任务的执行结果,且不修改任务执行状态。
  所以该方法可以重复执行N次。不过不能直接调用,因为是protected权限。
  **/
  protected boolean runAndReset() {
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                     null, Thread.currentThread()))
      return false;
    boolean ran = false;
    int s = state;
    try {
      Callable<V> c = callable;
      if (c != null && s == NEW) {
        try {
          c.call(); // don't set result
          ran = true;
        } catch (Throwable ex) {
          setException(ex);
        }
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
  }
  /**
   * Ensures that any interrupt from a possible cancel(true) is only
   * delivered to a task while in run or runAndReset.
   */
  private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us. Let's spin-wait patiently.
    if (s == INTERRUPTING)
      while (state == INTERRUPTING)
        Thread.yield(); // wait out pending interrupt
    // assert state == INTERRUPTED;
    // We want to clear any interrupt we may have received from
    // cancel(true). However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
  }
  /**
   * Simple linked list nodes to record waiting threads in a Treiber
   * stack. See other classes such as Phaser and SynchronousQueue
   * for more detailed explanation.
   */
  static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
  }
  /**
  该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable.
  **/
  private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
      if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
        for (;;) {
          Thread t = q.thread;
          if (t != null) {
            q.thread = null;
            LockSupport.unpark(t);
          }
          WaitNode next = q.next;
          if (next == null)
            break;
          q.next = null; // unlink to help gc
          q = next;
        }
        break;
      }
    }
    done();
    callable = null;    // to reduce footprint
  }
  /**
  阻塞等待任务执行完成(中断、正常完成、超时)
  **/
  private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      /**
      这里的if else的顺序也是有讲究的。
      1.先判断线程是否中断,中断则从队列中移除(也可能该线程不存在于队列中)
      2.判断当前任务是否执行完成,执行完成则不再阻塞,直接返回。
      3.如果任务状态=COMPLETING,证明该任务处于已执行完成,正在切换任务执行状态,CPU让出片刻即可
      4.q==null,则证明还未创建节点,则创建节点
      5.q节点入队
      6和7.阻塞
      **/
      if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
      }
      int s = state;
      if (s > COMPLETING) {
        if (q != null)
          q.thread = null;
        return s;
      }
      else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
      else if (q == null)
        q = new WaitNode();
      else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                           q.next = waiters, q);
      else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
          removeWaiter(q);
          return state;
        }
        LockSupport.parkNanos(this, nanos);
      }
      else
        LockSupport.park(this);
    }
  }
  /**
   * Tries to unlink a timed-out or interrupted wait node to avoid
   * accumulating garbage. Internal nodes are simply unspliced
   * without CAS since it is harmless if they are traversed anyway
   * by releasers. To avoid effects of unsplicing from already
   * removed nodes, the list is retraversed in case of an apparent
   * race. This is slow when there are a lot of nodes, but we don't
   * expect lists to be long enough to outweigh higher-overhead
   * schemes.
   */
  private void removeWaiter(WaitNode node) {
    if (node != null) {
      node.thread = null;
      retry:
      for (;;) {     // restart on removeWaiter race
        for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
          s = q.next;
          if (q.thread != null)
            pred = q;
          else if (pred != null) {
            pred.next = s;
            if (pred.thread == null) // check for race
              continue retry;
          }
          else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                             q, s))
            continue retry;
        }
        break;
      }
    }
  }
  // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  private static final long stateOffset;
  private static final long runnerOffset;
  private static final long waitersOffset;
  static {
    try {
      UNSAFE = sun.misc.Unsafe.getUnsafe();
      Class<?> k = FutureTask.class;
      stateOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("state"));
      runnerOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("runner"));
      waitersOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("waiters"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }
}

总结

以上就是本文关于futuretask源码分析(推荐)的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:Java利用future及时获取多线程运行结果、浅谈Java多线程处理中Future的妙用(附源码)、futuretask用法及使用场景介绍等,有什么问题可以随时留言,欢迎大家一起交流讨论。

(0)

相关推荐

  • futuretask用法及使用场景介绍

    FutureTask可用于异步获取执行结果或取消执行任务的场景.通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果.另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTa

  • Java中的Runnable,Callable,Future,FutureTask的比较

    Java中的Runnable,Callable,Future,FutureTask的比较 Java中存在Runnable.Callable.Future.FutureTask这几个与线程相关的类或者接口,在Java中也是比较重要的几个概念,我们通过下面的简单示例来了解一下它们的作用于区别. Runnable 其中Runnable应该是我们最熟悉的接口,它只有一个run()函数,用于将耗时操作写在其中, 该函数没有返回值 .然后使用某个线程去执行该runnable即可实现多线程,Thread类在调

  • java多线程返回值使用示例(callable与futuretask)

    Callable接口类似于Runnable,从名字就可以看出来了,但是Runnable不会返回结果,并且无法抛出返回结果的异常,而Callable功能更强大一些,被线程执行后,可以返回值,这个返回值可以被Future拿到,也就是说,Future可以拿到异步执行任务的返回值,下面来看一个简单的例子 复制代码 代码如下: package com.future.test; import java.io.FileNotFoundException;import java.io.IOException;i

  • futuretask源码分析(推荐)

    FutureTask只实现RunnableFuture接口: 该接口继承了java.lang.Runnable和Future接口,也就是继承了这两个接口的特性. 1.可以不必直接继承Thread来生成子类,只要实现run方法,且把实例传入到Thread构造函数,Thread就可以执行该实例的run方法了( Thread(Runnable) ). 2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成.也可以中断执行,判断执行状态等. FutureTask是一个支持取消行为的异

  • java使用websocket,并且获取HttpSession 源码分析(推荐)

    一:本文使用范围 此文不仅仅局限于spring boot,普通的spring工程,甚至是servlet工程,都是一样的,只不过配置一些监听器的方法不同而已. 本文经过作者实践,确认完美运行. 二:Spring boot使用websocket 2.1:依赖包 websocket本身是servlet容器所提供的服务,所以需要在web容器中运行,像我们所使用的tomcat,当然,spring boot中已经内嵌了tomcat. websocket遵循了javaee规范,所以需要引入javaee的包 <

  • Java并发编程之Condition源码分析(推荐)

    Condition介绍 上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充.ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活".Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现. 常用方法 Condition中主要的

  • Android AsyncTask源码分析

    Android中只能在主线程中进行UI操作,如果是其它子线程,需要借助异步消息处理机制Handler.除此之外,还有个非常方便的AsyncTask类,这个类内部封装了Handler和线程池.本文先简要介绍AsyncTask的用法,然后分析具体实现. 基本用法 AsyncTask是一个抽象类,我们需要创建子类去继承它,并且重写一些方法.AsyncTask接受三个泛型参数: Params: 指定传给任务执行时的参数的类型 Progress: 指定后台任务执行时将任务进度返回给UI线程的参数类型 Re

  • jQuery源码分析-03构造jQuery对象-工具函数

    作者:nuysoft/高云 QQ:47214707 EMail:nuysoft@gmail.com 声明:本文为原创文章,如需转载,请注明来源并保留原文链接. 读读写写,不对的地方请告诉我,多多交流共同进步,本章的的PDF等本章写完了发布. jQuery源码分析系列的目录请查看 http://nuysoft.iteye.com/blog/1177451,想系统的好好写写,目前还是从我感兴趣的部分开始,如果大家有对哪个模块感兴趣的,建议优先分析的,可以告诉我,一起学习. 3.4 其他静态工具函数

  • jQuery 1.9.1源码分析系列(十五)之动画处理

    首先需要有队列(queue)的基本知识.见上一章. 相关教程:jQuery下的动画处理总结: http://www.jb51.net/article/42000.htm jQuery 1.9.1源码分析系列(十五)动画处理之缓动动画核心Tween  :http://www.jb51.net/article/75821.htm a.动画入口jQuery.fn.animate函数执行流程详解 ------------------------------------------------------

  • jQuery 1.9.1源码分析系列(十)事件系统之绑定事件

    事件绑定的方法有很多种,使用了jquery那么原理那种绑定方式(elem.click = function(){...}))就不太想推荐给大家了.最主要的原因是elem.click=fn这种方式只能绑定一个事件处理,多次绑定的只会保留最后一次绑定的结果. 下面给大家介绍jquery绑定事件的方式有哪些吧. 复制代码 代码如下: jQuery.fn.eventType([[data,] fn]) 比如eventType指的是事件类型,比如click: $("#chua").click(f

  • jQuery插件-jRating评分插件源码分析及使用方法

    该插件被广泛应用于各种需要评分的页面当中,今天作为学习,把源码拿出来分析一下,顺便学习其使用方法. 一.插件使用一览. 复制代码 代码如下: <div> <div>第一个例子</div> <div id="16_1" class="myRating"></div> </div> 复制代码 代码如下: <link href="Script/jRating/jRating.jquer

  • JDK源码分析之String、StringBuilder和StringBuffer

    前言 本文主要介绍了关于JDK源码分析之String.StringBuilder和StringBuffer的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧 String类的申明 public final class String implements java.io.Serializable, Comparable<String>, CharSequence {-} String类用了final修饰符,表示它不可以被继承,同时还实现了三个接口, 实现Serializa

  • 如何让Nodejs支持H5 History模式(connect-history-api-fallback源码分析)

    导读 本文主要是对connect-history-api-fallback库进行一次源码分析.connect-history-api-fallback是一个用于支持SPA History路由模式的nodejs库.阅读本文前,应对HTML5 History模式有一定程度的了解! 源码分析 /** * 前端需要开启history模式,而后端根据url并不知道前端在请求api还是在请求页面,如localhost:4200/home这种url,前端理所当然认为"我需要得到html,并跳转到首页"

随机推荐