Java8 使用CompletableFuture 构建异步应用方式
目录
- 概述
- 同步API VS 异步API
- 同步API
- 异步API
- 同步的困扰
- 实现异步API
- 将同步方法改为异步方法
- 处理异常错误
概述
为了展示 CompletableFuture 的强大特性, 创建一个名为 best-price-finder 的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。
这个过程中,会学到几个重要的技能。
- 如何提供异步API
- 如何让你使用了同步API的代码变为非阻塞代码
我们将共同学习如何使用流水线将两个接续的异步操作合并为一个异步计算操作。 比如,在线商店返回了你想要购买的商品的原始价格,并附带着一个折扣代码——最终,要计算出该商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率
- 如何以响应式的方式处理异步操作的完成事件,以及随着各个商品返回它的商品价格,最佳价格查询器如何持续的更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格(这种方式存在着一定的风险,一旦某家商店的服务中断,用户可能遭遇白屏)。
同步API VS 异步API
同步API
是对传统方法的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取的了被调用方的返回值并继续运行。
即使调用方和被调用方在不同的线程中运行,调用方还是需要等被调用方结束运行,这就是 阻塞式调用。
异步API
与同步API相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的。 这就是非阻塞调用。
执行剩余的计算任务的线程将他的计算结果返回给调用方。 返回的方式要么通过回调函数,要么由调用方再此执行一个“等待,指导计算完成”的方法调用。
同步的困扰
为了实现最佳价格查询器应用,让我们从每个商店都应该提供的API定义入手。
首先,商店应该声明依据指定产品名称返回价格的方法:
public class Shop { public double getPrice(String product) { // TODO } }
该方法的内部实现会查询商店的数据库,但也有可能执行一些其他耗时的任务,比如联系其他外部服务。
用 delay 方法模拟这些长期运行的方法的执行,模拟执行1S ,方法声明如下。
public static void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } }
getPrice 方法会调用 delay 方法,并返回一个随机计算的值
public double getPrice(String product) { return calculatePrice(product); } private double calculatePrice(String product) { delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); }
很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1S,这是无法接受的,尤其是考虑到最佳价格查询器对网络中的所有商店都要重复这种操作。
接下来我们会了解如何以异步方式使用同步API解决这个问题。但是,出于学习如何设计异步API的考虑, 你希望以异步API的方式重写这段代码, 假装我们还在深受这一困难的烦恼,如何以异步API的方式重写这段代码,让用户更流畅地访问呢?
实现异步API
将同步方法改为异步方法
为了实现这个目标,你首先需要将 getPrice 转换为 getPriceAsync 方法,并修改它的返回值:
public Future<Double> getPriceAsync(String product) { ... }
我们知道 ,Java 5引入了 java.util.concurrent.Future 接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果 。
这意味着 Future 是一个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的 get 方法取得。因为这样的设计, getPriceAsync 方法才能立刻返回,给调用线程一个机会,能在同一时间去执行其他有价值的计算任务。
新的 CompletableFuture 类提供了大量的方法,让我们有机会以多种可能的方式轻松地实现这个方法,比如下面就是这样一段实现代码
【getPriceAsync方法的实现】
在这段代码中,创建了一个代表异步计算的 CompletableFuture 对象实例,它在计算完成时会包含计算的结果。
接着,调用 fork 创建了另一个线程去执行实际的价格计算工作,不等该耗时计算任务结束,直接返回一个 Future 实例。
当请求的产品价格最终计算得出时,你可以使用它的 complete 方法,结束completableFuture 对象的运行,并设置变量的值。
很显然,这个新版 Future 的名称也解释了它所具有的特性。使用这个API的客户端,可以通过下面的这段代码对其进行调用。
【使用异步的API】
我们看到这段代码中,客户向商店查询了某种商品的价格。由于商?提供了异步API,该次调用立刻返回了一个 Future 对象,通过该对象客户可以在将来的某个时刻取得商品的价格。
这种方式下,客户在进行商品价格查询的同时,还能执行一些其他的任务,比如查询其他家商店中商品的价格,不会呆呆的阻塞在那里等待第一家商店返回请求的结果。
最后,如果所有有意义的工作都已经完成,客户所有要执行的工作都依赖于商品价格时,再调用 Future 的 get 方法。执行了这个操作后,客户要么获得 Future 中封装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问。
输出
你一定已经发现 getPriceAsync 方法的调用返回远远早于最终价格计算完成的时间。
我们有可能避免发生客户端被住阻塞的风险。实际上这非常简单, Future 执行完毕可以发出一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回
调函数。
不过,我们当下不会对此进行讨论,现在我们要解决的是另一个问题:如何正确地管理
异步任务执行过程中可能出现的错误。
处理异常错误
如果没有意外,我们目前开发的代码工作得很正常。但是,如果价格计算过程中产生了错误会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待 get 方法返回结果的客户端永久的被阻塞。
客户端可以使用重载版本的 get 方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,你应该尽量在你的代码中添加超时判断断的逻辑,避免发生类似的问题。
使用这种方法至少能防止程序永远的等待下去,超时发生时,程序会得到通知发生了 Timeout-Exception 。
不过,也因为如此,你不会有机会发现计算商品价格的线程内到底发生了什么问题才引发了这样的失效。
为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用
CompletableFuture 的 completeExceptionally 方法将导致 CompletableFuture 内发生问题的异常抛出。
代码如下
【抛出CompletableFuture内的异常】
客户端现在会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的Exception 参数,即价格计算方法最初抛出的异常。
所以,举例来说,如果该方法抛出了一个运行时异常“product not available”,客户端就会得到像下面这样一段 ExecutionException :
java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
not available at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
... 5 more
Caused by: java.lang.RuntimeException: product not available
at lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
at java.lang.Thread.run(Thread.java:744)
以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。