Java多线程之异步Future机制的原理和实现

项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:

 import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AddTask implements Callable<Integer> {

 private int a,b;

 public AddTask(int a, int b) {
 this.a = a;
 this.b = b;
 }

 @Override
 public Integer call throws Exception {
 Integer result = a + b;
 return result;
 }

 public static void main(String[] args) throws InterruptedException, ExecutionException {
 ExecutorService executor = Executors.newSingleThreadExecutor;
 //JDK目前为止返回的都是FutureTask的实例
 Future<Integer> future = executor.submit(new AddTask(1, 2));
 Integer result = future.get;// 只有当future的状态是已完成时(future.isDone = true),get方法才会返回
 }
}

虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future 的接口方法:

 public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled;
  boolean isDone;
  V get throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
}

由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:

 package future;

import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * The result of an asynchronous operation.
 *
 * @author lixiaohui
 * @param <V> 执行结果的类型参数
 */
public interface IFuture<V> extends Future<V> {
 boolean isSuccess; // 是否成功
 V getNow; //立即返回结果(不管Future是否处于完成状态)
 Throwable cause; //若执行失败时的原因
    boolean isCancellable; //是否可以取消
 IFuture<V> await throws InterruptedException; //等待future的完成
 boolean await(long timeoutMillis) throws InterruptedException; // 超时等待future的完成
 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;
    IFuture<V> awaitUninterruptibly; //等待future的完成,不响应中断
    boolean awaitUninterruptibly(long timeoutMillis);//超时等待future的完成,不响应中断
 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit);
 IFuture<V> addListener(IFutureListener<V> l); //当future完成时,会通知这些加进来的监听器
 IFuture<V> removeListener(IFutureListener<V> l);

}

接下来就一起来实现这个IFuture,在这之前要说明下Object.wait,Object.notifyAll方法,因为整个Future实现的原���的核心就是这两个方法.看看JDK里面的解释:

 public class Object {
  /**
   * Causes the current thread to wait until another thread invokes the
   * {@link java.lang.Object#notify} method or the
   * {@link java.lang.Object#notifyAll} method for this object.
   * In other words, this method behaves exactly as if it simply
   * performs the call {@code wait(0)}.
   * 调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify/notifyAll
   */
  public final void wait throws InterruptedException {
    wait(0);
  }

  /**
   * Wakes up all threads that are waiting on this object's monitor. A
   * thread waits on an object's monitor by calling one of the
   * {@code wait} methods.
   * <p>
   * The awakened threads will not be able to proceed until the current
   * thread relinquishes the lock on this object. The awakened threads
   * will compete in the usual manner with any other threads that might
   * be actively competing to synchronize on this object; for example,
   * the awakened threads enjoy no reliable privilege or disadvantage in
   * being the next thread to lock this object.
   */
  public final native void notifyAll;
}

知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await等一系列的方法时,如果Future还未完成,那么就调用future.wait 方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll方法来唤醒之前因为调用过wait方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):

 package future;

import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * <pre>
 * 正常结束时, 若执行的结果不为null, 则result为执行结果; 若执行结果为null, 则result = {@link AbstractFuture#SUCCESS_SIGNAL}
 * 异常结束时, result为 {@link CauseHolder} 的实例;若是被取消而导致的异常结束, 则result为 {@link CancellationException} 的实例, 否则为其它异常的实例
 * 以下情况会使异步操作由未完成状态转至已完成状态, 也就是在以下情况发生时调用notifyAll方法:
 * <ul>
 * <li>异步操作被取消时(cancel方法)</li>
 * <li>异步操作正常结束时(setSuccess方法)</li>
 * <li>异步操作异常结束时(setFailure方法)</li>
 * </ul>
 * </pre>
 *
 * @author lixiaohui
 *
 * @param <V>
 * 异步执行结果的类型
 */
public class AbstractFuture<V> implements IFuture<V> {

 protected volatile Object result; // 需要保证其可见性
    /**
     * 监听器集
     */
 protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>;

 /**
 * 当任务正常执行结果为null时, 即客户端调用{@link AbstractFuture#setSuccess(null)}时,
 * result引用该对象
 */
 private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal;

 @Override
 public boolean cancel(boolean mayInterruptIfRunning) {
 if (isDone) { // 已完成了不能取消
  return false;
 }

 synchronized (this) {
  if (isDone) { // double check
  return false;
  }
  result = new CauseHolder(new CancellationException);
  notifyAll; // isDone = true, 通知等待在该对象的wait的线程
 }
 notifyListeners; // 通知监听器该异步操作已完成
 return true;
 }

 @Override
 public boolean isCancellable {
 return result == null;
 }

 @Override
 public boolean isCancelled {
 return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
 }

 @Override
 public boolean isDone {
 return result != null;
 }

 @Override
 public V get throws InterruptedException, ExecutionException {
 await; // 等待执行结果

 Throwable cause = cause;
 if (cause == null) { // 没有发生异常,异步操作正常结束
  return getNow;
 }
 if (cause instanceof CancellationException) { // 异步操作被取消了
  throw (CancellationException) cause;
 }
 throw new ExecutionException(cause); // 其他异常
 }

 @Override
 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 if (await(timeout, unit)) {// 超时等待执行结果
  Throwable cause = cause;
  if (cause == null) {// 没有发生异常,异步操作正常结束
  return getNow;
  }
  if (cause instanceof CancellationException) {// 异步操作被取消了
  throw (CancellationException) cause;
  }
  throw new ExecutionException(cause);// 其他异常
 }
 // 时间到了异步操作还没有结束, 抛出超时异常
 throw new TimeoutException;
 }

 @Override
 public boolean isSuccess {
 return result == null ? false : !(result instanceof CauseHolder);
 }

 @SuppressWarnings("unchecked")
 @Override
 public V getNow {
 return (V) (result == SUCCESS_SIGNAL ? null : result);
 }

 @Override
 public Throwable cause {
 if (result != null && result instanceof CauseHolder) {
  return ((CauseHolder) result).cause;
 }
 return null;
 }

 @Override
 public IFuture<V> addListener(IFutureListener<V> listener) {
 if (listener == null) {
  throw new NullPointerException("listener");
 }
 if (isDone) { // 若已完成直接通知该监听器
  notifyListener(listener);
  return this;
 }
 synchronized (this) {
  if (!isDone) {
  listeners.add(listener);
  return this;
  }
 }
 notifyListener(listener);
 return this;
 }

 @Override
 public IFuture<V> removeListener(IFutureListener<V> listener) {
 if (listener == null) {
  throw new NullPointerException("listener");
 }

 if (!isDone) {
  listeners.remove(listener);
 }

 return this;
 }

 @Override
 public IFuture<V> await throws InterruptedException {
 return await0(true);
 }

 private IFuture<V> await0(boolean interruptable) throws InterruptedException {
 if (!isDone) { // 若已完成就直接返回了
  // 若允许终端且被中断了则抛出中断异常
  if (interruptable && Thread.interrupted) {
  throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted.");
  }

  boolean interrupted = false;
  synchronized (this) {
  while (!isDone) {
   try {
   wait; // 释放锁进入waiting状态,等待其它线程调用本对象的notify/notifyAll方法
   } catch (InterruptedException e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
   }
   }
  }
  }
  if (interrupted) {
  // 为什么这里要设中断标志位?因为从wait方法返回后, 中断标志是被clear了的,
  // 这里重新设置以便让其它代码知道这里被中断了。
  Thread.currentThread.interrupt;
  }
 }
 return this;
 }

 @Override
 public boolean await(long timeoutMillis) throws InterruptedException {
 return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
 }

 @Override
 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
 return await0(unit.toNanos(timeout), true);
 }

 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
 if (isDone) {
  return true;
 }

 if (timeoutNanos <= 0) {
  return isDone;
 }

 if (interruptable && Thread.interrupted) {
  throw new InterruptedException(toString);
 }

 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime;
 long waitTime = timeoutNanos;
 boolean interrupted = false;

 try {
  synchronized (this) {
  if (isDone) {
   return true;
  }

  if (waitTime <= 0) {
   return isDone;
  }

  for (;;) {
   try {
   wait(waitTime / 1000000, (int) (waitTime % 1000000));
   } catch (InterruptedException e) {
   if (interruptable) {
    throw e;
   } else {
    interrupted = true;
   }
   }

   if (isDone) {
   return true;
   } else {
   waitTime = timeoutNanos - (System.nanoTime - startTime);
   if (waitTime <= 0) {
    return isDone;
   }
   }
  }
  }
 } finally {
  if (interrupted) {
  Thread.currentThread.interrupt;
  }
 }
 }

 @Override
 public IFuture<V> awaitUninterruptibly {
 try {
  return await0(false);
 } catch (InterruptedException e) { // 这里若抛异常了就无法处理了
  throw new java.lang.InternalError;
 }
 }

 @Override
 public boolean awaitUninterruptibly(long timeoutMillis) {
 try {
  return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
 } catch (InterruptedException e) {
  throw new java.lang.InternalError;
 }
 }

 @Override
 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
 try {
  return await0(unit.toNanos(timeout), false);
 } catch (InterruptedException e) {
  throw new java.lang.InternalError;
 }
 }

 protected IFuture<V> setFailure(Throwable cause) {
 if (setFailure0(cause)) {
  notifyListeners;
  return this;
 }
 throw new IllegalStateException("complete already: " + this);
 }

 private boolean setFailure0(Throwable cause) {
 if (isDone) {
  return false;
 }

 synchronized (this) {
  if (isDone) {
  return false;
  }
  result = new CauseHolder(cause);
  notifyAll;
 }

 return true;
 }

 protected IFuture<V> setSuccess(Object result) {
 if (setSuccess0(result)) { // 设置成功后通知监听器
  notifyListeners;
  return this;
 }
 throw new IllegalStateException("complete already: " + this);
 }

 private boolean setSuccess0(Object result) {
 if (isDone) {
  return false;
 }

 synchronized (this) {
  if (isDone) {
  return false;
  }
  if (result == null) { // 异步操作正常执行完毕的结果是null
  this.result = SUCCESS_SIGNAL;
  } else {
  this.result = result;
  }
  notifyAll;
 }
 return true;
 }

 private void notifyListeners {
 for (IFutureListener<V> l : listeners) {
  notifyListener(l);
 }
 }

 private void notifyListener(IFutureListener<V> l) {
 try {
  l.operationCompleted(this);
 } catch (Exception e) {
  e.printStackTrace;
 }
 }

 private static class SuccessSignal {

 }

 private static final class CauseHolder {
 final Throwable cause;

 CauseHolder(Throwable cause) {
  this.cause = cause;
 }
 }
}

那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:

 package future.test;

import future.IFuture;
import future.IFutureListener;

/**
 * 延时加法
 * @author lixiaohui
 *
 */
public class DelayAdder {

 public static void main(String[] args) {
 new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer> {

  @Override
  public void operationCompleted(IFuture<Integer> future) throws Exception {
  System.out.println(future.getNow);
  }

 });
 }
 /**
 * 延迟加
 * @param delay 延时时长 milliseconds
 * @param a 加数
 * @param b 加数
 * @return 异步结果
 */
 public DelayAdditionFuture add(long delay, int a, int b) {
 DelayAdditionFuture future = new DelayAdditionFuture;
 new Thread(new DelayAdditionTask(delay, a, b, future)).start;
 return future;
 }

 private class DelayAdditionTask implements Runnable {

 private long delay;

 private int a, b;

 private DelayAdditionFuture future;

 public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {
  super;
  this.delay = delay;
  this.a = a;
  this.b = b;
  this.future = future;
 }

 @Override
 public void run {
  try {
  Thread.sleep(delay);
  Integer i = a + b;
  // TODO 这里设置future为完成状态(正常执行完毕)
  future.setSuccess(i);
  } catch (InterruptedException e) {
  // TODO 这里设置future为完成状态(异常执行完毕)
  future.setFailure(e.getCause);
  }
 }

 }
} package future.test;

import future.AbstractFuture;
import future.IFuture;
//只是把两个方法对外暴露
public class DelayAdditionFuture extends AbstractFuture<Integer> {

 @Override
 public IFuture<Integer> setSuccess(Object result) {
 return super.setSuccess(result);
 }

 @Override
 public IFuture<Integer> setFailure(Throwable cause) {
 return super.setFailure(cause);
 }

}

可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 浅析Java和Scala中的Future

    随着CPU的核数的增加,异步编程模型在并发领域中的得到了越来越多的应用,由于Scala是一门函数式语言,天然的支持异步编程模型,今天主要来看一下Java和Scala中的Futrue,带你走入异步编程的大门. Future 很多同学可能会有疑问,Futrue跟异步编程有什么关系?从Future的表面意思是未来,一个Future对象可以看出一个将来得到的结果,这就和异步执行的概念很像,你只管自己去执行,只要将最终的结果传达给我就行,线程不必一直暂停等待结果,可以在具体异步任务执行的时候去执行其他操作

  • Java8 CompletableFuture详解

    Java 8来了,是时候学一下新的东西了.Java 7和Java 6只不过是稍作修改的版本,而Java 8将会发生重大的改进.或许是Java 8太大了吧?今天我会给你彻底地解释JDK 8中的新的抽象 – CompletableFuture.众所周知,Java 8不到一年就会发布,因此这篇文章是基于JDK 8 build 88 with lambda support的.CompletableFuture extends Future提供了方法,一元操作符和促进异步性以及事件驱动编程模型,它并不止步

  • 简单讲解Java的Future编程模式

    用过Java并发包的朋友或许对Future (interface) 已经比较熟悉了,其实Future 本身是一种被广泛运用的并发设计模式,可在很大程度上简化需要数据流同步的并发应用开发.在一些领域语言(如Alice ML )中甚至直接于语法层面支持Future. 这里就以java.util.concurrent.Future 为例简单说一下Future的具体工作方式.Future对象本身可以看作是一个显式的引用,一个对异步处理结果的引用.由于其异步性质,在创建之初,它所引用的对象可能还并不可用(

  • java自定义任务类定时执行任务示例 callable和future接口使用方法

    Callable 和 Future接口Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务. Callable和Runnable有几点不同: (1)Callable规定的方法是call(),而Runnable规定的方法是run().(2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的. (3)call()方法可抛出异常,而run()方法是不能抛出异常的.(4)运行Callable任务可拿到一

  • java多线程返回值使用示例(callable与futuretask)

    Callable接口类似于Runnable,从名字就可以看出来了,但是Runnable不会返回结果,并且无法抛出返回结果的异常,而Callable功能更强大一些,被线程执行后,可以返回值,这个返回值可以被Future拿到,也就是说,Future可以拿到异步执行任务的返回值,下面来看一个简单的例子 复制代码 代码如下: package com.future.test; import java.io.FileNotFoundException;import java.io.IOException;i

  • Java多线程之异步Future机制的原理和实现

    项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码: import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurren

  • Java多线程实现异步调用的方法

    在JAVA平台,实现异步调用的角色有如下三个角色:调用者 提货单   真实数据 一个调用者在调用耗时操作,不能立即返回数据时,先返回一个提货单.然后在过一断时间后凭提货单来获取真正的数据. 去蛋糕店买蛋糕,不需要等蛋糕做出来(假设现做要很长时间),只需要领个提货单就可以了(去干别的事情),等到蛋糕做好了,再拿提货单取蛋糕就可以了. public class Main { public static void main(String[] args) { System.out.println("ma

  • java多线程编程同步器Future和FutureTask解析及代码示例

    publicinterfaceFuture<V>Future表示异步计算的结果.它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果.计算完成后只能使用get方法来获取结果,如有必要,计算完成前可以阻塞此方法.取消则由cancel方法来执行.还提供了其他方法,以确定任务是正常完成还是被取消了.一旦计算完成,就不能再取消计算.如果为了可取消性而使用Future但又不提供可用的结果,则可以声明Future<?>形式类型.并返回null作为底层任务的结果. Future主要

  • Java多线程 Callable、Future 和FutureTask

    目录 1 Callable介绍 2 Future介绍 2.1 在Future接口中声明方法 2.2 Future提供了三种功能 3 FutureTask 4 Future和FutureTask的使用 4.1 使用Callable+Future获取执行结果 4.2 使用Callable+Future获取执行结果 前言: 创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须

  • 浅谈Java多线程处理中Future的妙用(附源码)

    java 中Future是一个未来对象,里面保存这线程处理结果,它像一个提货凭证,拿着它你可以随时去提取结果.在两种情况下,离开Future几乎很难办.一种情况是拆分订单,比如你的应用收到一个批量订单,此时如果要求最快的处理订单,那么需要并发处理,并发的结果如果收集,这个问题如果自己去编程将非常繁琐,此时可以使用CompletionService解决这个问题.CompletionService将Future收集到一个队列里,可以按结果处理完成的先后顺序进队.另外一种情况是,如果你需要并发去查询一

  • java多线程中的异常处理机制简析

    在java多线程程序中,所有线程都不允许抛出未捕获的checked exception,也就是说各个线程需要自己把自己的checked exception处理掉.这一点是通过java.lang.Runnable.run()方法声明(因为此方法声明上没有throw exception部分)进行了约束.但是线程依然有可能抛出unchecked exception,当此类异常跑抛出时,线程就会终结,而对于主线程和其他线程完全不受影响,且完全感知不到某个线程抛出的异常(也是说完全无法catch到这个异常

  • Java多线程Callable和Future接口区别

    Runnable是执行工作的独立任务,但是不返回任何值.如果我们希望任务完成之后有返回值,可以实现Callable接口.在JavaSE5中引入的Callable是一个具有类型参数的范型,他的类型参数方法表示为方法call()而不是run()中返回的值,并且必须使用ExecutorService.submint()方法进行调用. 代码如下 import java.util.concurrent.Callable; import java.util.concurrent.ExecutionExcep

  • Android异步消息处理机制实现原理详解

    消息处理机制主要对象:Looper,Handler,Message(还有MessageQueue和Runnable) Looper不断从MessageQueue消息队列中取出一个Message,然后传给Handle,如此循环往复,如果队列为空,那么它会进入休眠. 这些类的主要变量 Looper.java static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>(); private static

  • Java多线程编程小实例模拟停车场系统

    下面分享的是一个Java多线程模拟停车场系统的小实例(Java的应用还是很广泛的,哈哈),具体代码如下: Park类 public class Park { boolean []park=new boolean[3]; public boolean equals() { return true; } } Car: public class Car { private String number; private int position=0; public Car(String number)

  • Java多线程编程安全退出线程方法介绍

    线程停止 Thread提供了一个stop()方法,但是stop()方法是一个被废弃的方法.为什么stop()方法被废弃而不被使用呢?原因是stop()方法太过于暴力,会强行把执行一半的线程终止.这样会就不会保证线程的资源正确释放,通常是没有给与线程完成资源释放工作的机会,因此会导致程序工作在不确定的状态下 那我们该使用什么来停止线程呢 Thread.interrupt(),我们可以用他来停止线程,他是安全的,可是使用他的时候并不会真的停止了线程,只是会给线程打上了一个记号,至于这个记号有什么用呢

随机推荐