Java8 CompletableFuture详解

Java 8来了,是时候学一下新的东西了。Java 7和Java 6只不过是稍作修改的版本,而Java 8将会发生重大的改进。或许是Java 8太大了吧?今天我会给你彻底地解释JDK 8中的新的抽象 – CompletableFuture。众所周知,Java 8不到一年就会发布,因此这篇文章是基于JDK 8 build 88 with lambda support的。CompletableFuture extends Future提供了方法,一元操作符和促进异步性以及事件驱动编程模型,它并不止步于旧版本的Java中。如果你打开JavaDoc of CompletableFuture你一定会感到震惊。大约有五十种方法(!),而且它们中的一些非常有意思而且不好理解,例如:


代码如下:

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletableFuture<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn,
    Executor executor)

不必担心,继续读下去。CompletableFuture收集了所有ListenableFuture in Guava 和 SettableFuture的特征。此外,内置的lambda表达式使它更接近于Scala/Akka futures。这听起来好得令人难以置信,但是请继续读下去。CompletableFuture有两个主要的方面优于ol中的Future – 异步回调/转换,这能使得从任何时刻的任何线程都可以设置CompletableFuture的值。

一、提取、修改包装的值

通常futures代表其它线程中运行的代码,但事实并非总是如此。有时你想要创造一个Future来表示你知道将会发生什么,例如JMS message arrival。所以你有Future但是未来并没有潜在的异步工作。你只是想在未来JMS消息到达时简单地完成(解决),这是由一个事件驱动的。在这种情况下,你可以简单地创建CompletableFuture来返还给你的客户端,只要你认为你的结果是可用的,仅仅通过complete()就能解锁所有等待Future的客户端。

首先你可以简单地创建新的CompletableFuture并且给你的客户端:


代码如下:

public CompletableFuture<String> ask() {
    final CompletableFuture<String> future = new CompletableFuture<>();
    //...
    return future;
}

注意这个future和Callable没有任何联系,没有线程池也不是异步工作。如果现在客户端代码调用ask().get()它将永远阻塞。如果寄存器完成回调,它们就永远不会生效了。所以关键是什么?现在你可以说:


代码如下:

future.complete("42")

…此时此刻所有客户端Future.get()将得到字符串的结果,同时完成回调以后将会立即生效。当你想代表Future的任务时是非常方便的,而且没有必要去计算一些执行线程的任务上。CompletableFuture.complete()只能调用一次,后续调用将被忽略。但也有一个后门叫做CompletableFuture.obtrudeValue(…)覆盖一个新Future之前的价值,请小心使用。

有时你想要看到信号发生故障的情况,如你所知Future对象可以处理它所包含的结果或异常。如果你想进一步传递一些异常,可以用CompletableFuture.completeExceptionally(ex) (或者用obtrudeException(ex)这样更强大的方法覆盖前面的异常)。 completeExceptionally()也能解锁所有等待的客户端,但这一次从get()抛出异常。说到get(),也有CompletableFuture.join()方法在错误处理方面有着细微的变动。但总体上,它们都是一样的。最后也有CompletableFuture.getNow(valueIfAbsent)方法没有阻塞但是如果Future还没完成将返回默认值,这使得当构建那种我们不想等太久的健壮系统时非常有用。

最后static的方法是用completedFuture(value)来返回已经完成Future的对象,当测试或者写一些适配器层时可能非常有用。

二、创造和获取CompletableFuture

好了,那么手动地创建CompletableFuture是我们唯一的选择吗?不一定。就像一般的Futures,我们可以关联存在的任务,同时CompletableFuture使用工厂方法:


代码如下:

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

无参方法Executor是以…Async结尾同时将会使用ForkJoinPool.commonPool()(全局的,在JDK8中介绍的通用池),这适用于CompletableFuture类中的大多数的方法。runAsync()易于理解,注意它需要Runnable,因此它返回CompletableFuture<Void>作为Runnable不返回任何值。如果你需要处理异步操作并返回结果,使用Supplier<U>:


代码如下:

final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //...long running...
        return "42";
    }
}, executor);

但是别忘了,Java 8里面还有lambdas表达式呢!


代码如下:

finalCompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //...long running...
    return "42";
}, executor);

或者:

代码如下:

final CompletableFuture<String> future =
    CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);

虽然这篇文章不是关于Lambda的,但是我会相当频繁地使用lambda表达式。

三、转换和作用于CompletableFuture(thenApply)

我说过CompletableFuture优于Future但是你还不知道为什么吗?简单说,因为CompletableFuture是一个原子也是一个因子。我说的这句话没什么帮助吗?Scala和JavaScript都允许future完成时允许注册异步回调,直到它准备好我们才要等待和阻止它。我们可以简单地说:运行这个函数时就出现了结果。此外,我们可以叠加这些功能,把多个future组合在一起等。例如如果我们从String转为Integer,我们可以转为在不关联的前提下从CompletableFuture到 CompletableFuture<Integer。这是通过thenApply()的方法:


代码如下:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);<p></p>

<p>如前所述...Async版本提供对CompletableFuture的大多数操作,因此我将在后面的部分中跳过它们。记住,第一个方法将在future完成的相同线程中调用该方法,而剩下的两个将在不同的线程池中异步地调用它。
让我们来看看thenApply()的工作流程:</p>

<p><pre class="brush: java; gutter: true; first-line: 1; highlight: []; html-script: false">
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
</p>

或在一个声明中:


代码如下:

CompletableFuture<Double> f3 =
    f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

这里,你会看到一个序列的转换,从String到Integer再到Double。但最重要的是,这些转换既不立即执行也不停止。这些转换既不立即执行也不停止。他们只是记得,当原始f1完成他们所执行的程序。如果某些转换非常耗时,你可以提供你自己的Executor来异步地运行他们。注意,此操作相当于Scala中的一元map。

四、运行完成的代码(thenAccept/thenRun)


代码如下:

CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);

在future的管道里有两种典型的“最终”阶段方法。他们在你使用future的值的时候做好准备,当 thenAccept()提供最终的值时,thenRun执行 Runnable,这甚至没有方法去计算值。例如:


代码如下:

future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");

…Async变量也可用两种方法,隐式和显式执行器,我不会过多强调这个方法。
thenAccept()/thenRun()方法并没有发生阻塞(即使没有明确的executor)。它们像一个事件侦听器/处理程序,你连接到一个future时,这将执行一段时间。”Continuing”消息将立即出现,尽管future甚至没有完成。

五、单个CompletableFuture的错误处理

到目前为止,我们只讨论计算的结果。那么异常呢?我们可以异步地处理它们吗?当然!


代码如下:

CompletableFuture<String> safe =
    future.exceptionally(ex -> "We have a problem: " + ex.getMessage());

exceptionally()接受一个函数时,将调用原始future来抛出一个异常。我们会有机会将此异常转换为和Future类型的兼容的一些值来进行恢复。safe进一步的转换将不再产生一个异常而是从提供功能的函数返回一个String值。
一个更加灵活的方法是handle()接受一个函数,它接收正确的结果或异常:


代码如下:

CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
    if (ok != null) {
        return Integer.parseInt(ok);
    } else {
        log.warn("Problem", ex);
        return -1;
    }
});

handle()总是被调用,结果和异常都非空,这是个一站式全方位的策略。

六、一起结合两个CompletableFuture

异步处理过程之一的CompletableFuture非常不错但是当多个这样的futures以各种方式组合在一起时确实显示了它的强大。

七、结合(链接)这两个futures(thenCompose())

有时你想运行一些future的值(当它准备好了),但这个函数也返回了future。CompletableFuture足够灵活地明白我们的函数结果现在应该作为顶级的future,对比CompletableFuture<CompletableFuture>。方法 thenCompose()相当于Scala的flatMap:


代码如下:

<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);

…Async变化也是可用的,在下面的事例中,仔细观察thenApply()(map)和thenCompose()(flatMap)的类型和差异,当应用calculateRelevance()方法返回CompletableFuture:

代码如下:

CompletableFuture<Document> docFuture = //...

CompletableFuture<CompletableFuture<Double>> f =
    docFuture.thenApply(this::calculateRelevance);

CompletableFuture<Double> relevanceFuture =
    docFuture.thenCompose(this::calculateRelevance);

//...

private CompletableFuture<Double> calculateRelevance(Document doc)  //...

thenCompose()是一个重要的方法允许构建健壮的和异步的管道,没有阻塞和等待的中间步骤。

八、两个futures的转换值(thenCombine())

当thenCompose()用于链接一个future时依赖另一个thenCombine,当他们都完成之后就结合两个独立的futures:


代码如下:

<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

…Async变量也是可用的,假设你有两个CompletableFuture,一个加载Customer另一个加载最近的Shop。他们彼此完全独立,但是当他们完成时,您想要使用它们的值来计算Route。这是一个可剥夺的例子:

代码如下:

CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =
    customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));

//...

private Route findRoute(Customer customer, Shop shop) //...

请注意,在Java 8中可以用(cust, shop) -> findRoute(cust, shop)简单地代替this::findRoute方法的引用:


代码如下:

customerFuture.thenCombine(shopFuture, this::findRoute);

你也知道,我们有customerFuture 和 shopFuture。那么routeFuture包装它们然后“等待”它们完成。当他们准备好了,它会运行我们提供的函数来结合所有的结果(findRoute())。当两个基本的futures完成并且 findRoute()也完成时,这样routeFuture将会完成。

九、等待所有的 CompletableFutures 完成

如果不是产生新的CompletableFuture连接这两个结果,我们只是希望当完成时得到通知,我们可以使用thenAcceptBoth()/runAfterBoth()系列的方法,(…Async 变量也是可用的)。它们的工作方式与thenAccept() 和 thenRun()类似,但是是等待两个futures而不是一个:


代码如下:

<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)

想象一下上面的例子,这不是产生新的 CompletableFuture,你只是想要立刻发送一些事件或刷新GUI。这可以很容易地实现:thenAcceptBoth():

代码如下:

customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
    final Route route = findRoute(cust, shop);
    //refresh GUI with route
});

我希望我是错的,但也许有些人会问自己一个问题:为什么我不能简单地阻塞这两个futures呢? 就像:


代码如下:

Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());

好了,你当然可以这么做。但是最关键的一点是CompletableFuture是允许异步的,它是事件驱动的编程模型而不是阻塞并急切地等待着结果。所以在功能上,上面两部分代码是等价的,但后者没有必要占用一个线程来执行。

十、等待第一个 CompletableFuture 来完成任务

另一个有趣的事是CompletableFutureAPI可以等待第一个(与所有相反)完成的future。当你有两个相同类型任务的结果时就显得非常方便,你只要关心响应时间就行了,没有哪个任务是优先的。API方法(…Async变量也是可用的):

代码如下:

CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)

作为一个例子,你有两个系统可以集成。一个具有较小的平均响应时间但是拥有高的标准差,另一个一般情况下较慢,但是更加容易预测。为了两全其美(性能和可预测性)你可以在同一时间调用两个系统并等着谁先完成。通常这会是第一个系统,但是在进度变得缓慢时,第二个系统就可以在可接受的时间内完成:

代码如下:

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
    System.out.println("Result: " + s);
});

s代表了从fetchFast()或是fetchPredictably()得到的String。我们不必知道也无需关心。

十一、完整地转换第一个系统

applyToEither()算是 acceptEither()的前辈了。当两个futures快要完成时,后者只是简单地调用一些代码片段,applyToEither()将会返回一个新的future。当这两个最初的futures完成时,新的future也会完成。API有点类似于(…Async 变量也是可用的):

代码如下:

<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)

这个额外的fn功能在第一个future被调用时能完成。我不确定这个专业化方法的目的是什么,毕竟一个人可以简单地使用:fast.applyToEither(predictable).thenApply(fn)。因为我们坚持用这个API,但我们的确不需要额外功能的应用程序,我会简单地使用Function.identity()占位符:


代码如下:

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =
    fast.applyToEither(predictable, Function.<String>identity());

第一个完成的future可以通过运行。请注意,从客户的角度来看,两个futures实际上是在firstDone的后面而隐藏的。客户端只是等待着future来完成并且通过applyToEither()使得当最先的两个任务完成时通知客户端。

十二、多种结合的CompletableFuture

我们现在知道如何等待两个future来完成(使用thenCombine())并第一个完成(applyToEither())。但它可以扩展到任意数量的futures吗?的确,使用static辅助方法:


代码如下:

static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs)
static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)

allOf()当所有的潜在futures完成时,使用了一个futures数组并且返回一个future(等待所有的障碍)。另一方面anyOf()将会等待最快的潜在futures,请看一下返回futures的一般类型,这不是你所期望的吗?我们会在接下来的文章中关注一下这个问题。

总结

我们探索了整个CompletableFuture API。我确信这样就能战无不胜了,所以在下一篇文章中我们将研究另一个简单的web爬虫程序的实现,使用CompletableFuture方法和Java 8 lambda表达式,我们也会看看CompletableFuture的

(0)

相关推荐

  • 浅析Java和Scala中的Future

    随着CPU的核数的增加,异步编程模型在并发领域中的得到了越来越多的应用,由于Scala是一门函数式语言,天然的支持异步编程模型,今天主要来看一下Java和Scala中的Futrue,带你走入异步编程的大门. Future 很多同学可能会有疑问,Futrue跟异步编程有什么关系?从Future的表面意思是未来,一个Future对象可以看出一个将来得到的结果,这就和异步执行的概念很像,你只管自己去执行,只要将最终的结果传达给我就行,线程不必一直暂停等待结果,可以在具体异步任务执行的时候去执行其他操作

  • java自定义任务类定时执行任务示例 callable和future接口使用方法

    Callable 和 Future接口Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务. Callable和Runnable有几点不同: (1)Callable规定的方法是call(),而Runnable规定的方法是run().(2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的. (3)call()方法可抛出异常,而run()方法是不能抛出异常的.(4)运行Callable任务可拿到一

  • 简单讲解Java的Future编程模式

    用过Java并发包的朋友或许对Future (interface) 已经比较熟悉了,其实Future 本身是一种被广泛运用的并发设计模式,可在很大程度上简化需要数据流同步的并发应用开发.在一些领域语言(如Alice ML )中甚至直接于语法层面支持Future. 这里就以java.util.concurrent.Future 为例简单说一下Future的具体工作方式.Future对象本身可以看作是一个显式的引用,一个对异步处理结果的引用.由于其异步性质,在创建之初,它所引用的对象可能还并不可用(

  • java多线程返回值使用示例(callable与futuretask)

    Callable接口类似于Runnable,从名字就可以看出来了,但是Runnable不会返回结果,并且无法抛出返回结果的异常,而Callable功能更强大一些,被线程执行后,可以返回值,这个返回值可以被Future拿到,也就是说,Future可以拿到异步执行任务的返回值,下面来看一个简单的例子 复制代码 代码如下: package com.future.test; import java.io.FileNotFoundException;import java.io.IOException;i

  • Java多线程之异步Future机制的原理和实现

    项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码: import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurren

  • Java8 CompletableFuture详解

    Java 8来了,是时候学一下新的东西了.Java 7和Java 6只不过是稍作修改的版本,而Java 8将会发生重大的改进.或许是Java 8太大了吧?今天我会给你彻底地解释JDK 8中的新的抽象 – CompletableFuture.众所周知,Java 8不到一年就会发布,因此这篇文章是基于JDK 8 build 88 with lambda support的.CompletableFuture extends Future提供了方法,一元操作符和促进异步性以及事件驱动编程模型,它并不止步

  • Java 8 的异步编程利器 CompletableFuture的实例详解

    目录 一个例子回顾Future 一个例子走进CompletableFuture CompletableFuture使用场景 创建异步任务 supplyAsync方法 runAsync方法 任务异步回调 1.thenRun/thenRunAsync 2.thenAccept/thenAcceptAsync 3.thenApply/thenApplyAsync 4.exceptionally 5.whenComplete方法 6.handle方法 多个任务组合处理 AND组合关系 OR组合的关系 区

  • 详解Java8 CompletableFuture的并行处理用法

    目录 前言 场景 用法 1.在线API 2.编写在线API查询 3.编写查询服务 4.编写测试接口 5.效果 6.CompletableFuture并行查询 7.编写测试接口 8.CompletableFuture效果 思考 前言 工作中你可能会遇到很多这样的场景,一个接口,要从其他几个service调用查询方法,分别获取到需要的值之后再封装数据返回. 还可能在微服务中遇到类似的情况,某个服务的接口,要使用好几次feign去调用其他服务的方法获取数据,最后拿到想要的值并封装返回给前端. 这样的场

  • 详解Java8与Runtime.getRuntime().availableProcessors()

    lambda表达式以及并行流.官方承诺你写出来的代码更运行得更快.流会自动通过Fork/Join池并行地执行.我听过一些关于Java 8的主题的演讲,不过在这个非常关键的点上它们都说的有点问题.我计划在后续的文章中对并行流进行下深入的讲解,在这之前我先花点时间仔细地分析下它.关于这个问题,我只想问你们一个非常简单的问题,不过也是一个非常重要的问题,因为它是很多问题的关键所在.这个问题是: 这些并行操作的线程都是从哪来的? 在Java 8里,我们有一个通用的Fork/Join池,我们可以通过For

  • CompletableFuture 异步编排示例详解

    目录 从Future聊起 CompletableFuture 创建异步任务 异步回调 异步编排 串行 AND OR Future 机制扩展 CompletableFuture 实践 从Future聊起 Future是java 1.5引入的异步编程api,它表示一个异步计算结果,提供了获取异步结果的能力,解决了多线程场景下Runnable线程任务无法获取结果的问题. 但是其获取异步结果的方式并不够优雅,我们必须使用Future.get的方式阻塞调用线程,或者使用轮询方式判断 Future.isDo

  • java异步编程CompletableFuture使用示例详解

    目录 一.简单介绍 二.常见操作 1.使用默认线程池 2.使用自定义线程池 3.获取线程的执行结果 三.处理异步结算的结果 四.异常处理 五.组合 CompletableFuture 六.并行运行多个 CompletableFuture 七.案例 1.从多个平台获取书价格 2.从任意一个平台获取结果就返回 一.简单介绍 CompletableFuture 同时实现了 Future 和 CompletionStage 接口. public class CompletableFuture<T> i

  • Java CompletableFuture实现原理分析详解

    目录 简介 CompletableFuture类结构 CompletableFuture回调原理 CompletableFuture异步原理 总结 简介 前面的一篇文章你知道Java8并发新特性CompletableFuture吗?介绍了CompletableFuture的特性以及一些使用方法,今天我们主要来聊一聊CompletableFuture的回调功能以及异步工作原理是如何实现的. CompletableFuture类结构 1.CompletableFuture类结构主要有两个属性 pub

  • Java8新特性Stream流实例详解

    什么是Stream流? Stream流是数据渠道,用于操作数据源(集合.数组等)所生成的元素序列. Stream的优点:声明性,可复合,可并行.这三个特性使得stream操作更简洁,更灵活,更高效. Stream的操作有两个特点:可以多个操作链接起来运行,内部迭代. Stream可分为并行流与串行流,Stream API 可以声明性地通过 parallel() 与sequential() 在并行流与顺序流之间进行切换.串行流就不必再细说了,并行流主要是为了为了适应目前多核机器的时代,提高系统CP

  • Java8 新特性Lambda表达式实例详解

    Java8 新特性Lambda表达式实例详解 在介绍Lambda表达式之前,我们先来看只有单个方法的Interface(通常我们称之为回调接口): public interface OnClickListener { void onClick(View v); } 我们是这样使用它的: button.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { v.setText("

  • 详解Java8 Collect收集Stream的方法

    Collection, Collections, collect, Collector, Collectos Collection是Java集合的祖先接口. Collections是java.util包下的一个工具类,内涵各种处理集合的静态方法. java.util.stream.Stream#collect(java.util.stream.Collector<? super T,A,R>)是Stream的一个函数,负责收集流. java.util.stream.Collector 是一个收

随机推荐