Java8 使用CompletableFuture 构建异步应用方式

目录
  • 概述
  • 同步API VS 异步API
    • 同步API
    • 异步API
  • 同步的困扰
  • 实现异步API
    • 将同步方法改为异步方法
    • 处理异常错误

概述

为了展示 CompletableFuture 的强大特性, 创建一个名为 best-price-finder 的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。

这个过程中,会学到几个重要的技能。

  • 如何提供异步API
  • 如何让你使用了同步API的代码变为非阻塞代码

我们将共同学习如何使用流水线将两个接续的异步操作合并为一个异步计算操作。 比如,在线商店返回了你想要购买的商品的原始价格,并附带着一个折扣代码——最终,要计算出该商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率

  • 如何以响应式的方式处理异步操作的完成事件,以及随着各个商品返回它的商品价格,最佳价格查询器如何持续的更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格(这种方式存在着一定的风险,一旦某家商店的服务中断,用户可能遭遇白屏)。

同步API VS 异步API

同步API

是对传统方法的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取的了被调用方的返回值并继续运行。

即使调用方和被调用方在不同的线程中运行,调用方还是需要等被调用方结束运行,这就是 阻塞式调用。

异步API

与同步API相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的。 这就是非阻塞调用。

执行剩余的计算任务的线程将他的计算结果返回给调用方。 返回的方式要么通过回调函数,要么由调用方再此执行一个“等待,指导计算完成”的方法调用。

同步的困扰

为了实现最佳价格查询器应用,让我们从每个商店都应该提供的API定义入手。

首先,商店应该声明依据指定产品名称返回价格的方法:

public class Shop {
	public double getPrice(String product) {
	// TODO
	}
}

该方法的内部实现会查询商店的数据库,但也有可能执行一些其他耗时的任务,比如联系其他外部服务。

用 delay 方法模拟这些长期运行的方法的执行,模拟执行1S ,方法声明如下。

public static void delay() {
	try {
		Thread.sleep(1000L);
	} catch (InterruptedException e) {
		throw new RuntimeException(e);
	}
}

getPrice 方法会调用 delay 方法,并返回一个随机计算的值

public double getPrice(String product) {
	return calculatePrice(product);
}
private double calculatePrice(String product) {
	delay();
	return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1S,这是无法接受的,尤其是考虑到最佳价格查询器对网络中的所有商店都要重复这种操作。

接下来我们会了解如何以异步方式使用同步API解决这个问题。但是,出于学习如何设计异步API的考虑, 你希望以异步API的方式重写这段代码, 假装我们还在深受这一困难的烦恼,如何以异步API的方式重写这段代码,让用户更流畅地访问呢?

实现异步API

将同步方法改为异步方法

为了实现这个目标,你首先需要将 getPrice 转换为 getPriceAsync 方法,并修改它的返回值:

public Future<Double> getPriceAsync(String product) { ... }

我们知道 ,Java 5引入了 java.util.concurrent.Future 接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果 。

这意味着 Future 是一个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的 get 方法取得。因为这样的设计, getPriceAsync 方法才能立刻返回,给调用线程一个机会,能在同一时间去执行其他有价值的计算任务。

新的 CompletableFuture 类提供了大量的方法,让我们有机会以多种可能的方式轻松地实现这个方法,比如下面就是这样一段实现代码

【getPriceAsync方法的实现】

在这段代码中,创建了一个代表异步计算的 CompletableFuture 对象实例,它在计算完成时会包含计算的结果。

接着,调用 fork 创建了另一个线程去执行实际的价格计算工作,不等该耗时计算任务结束,直接返回一个 Future 实例。

当请求的产品价格最终计算得出时,你可以使用它的 complete 方法,结束completableFuture 对象的运行,并设置变量的值。

很显然,这个新版 Future 的名称也解释了它所具有的特性。使用这个API的客户端,可以通过下面的这段代码对其进行调用。

【使用异步的API】

我们看到这段代码中,客户向商店查询了某种商品的价格。由于商?提供了异步API,该次调用立刻返回了一个 Future 对象,通过该对象客户可以在将来的某个时刻取得商品的价格。

这种方式下,客户在进行商品价格查询的同时,还能执行一些其他的任务,比如查询其他家商店中商品的价格,不会呆呆的阻塞在那里等待第一家商店返回请求的结果。

最后,如果所有有意义的工作都已经完成,客户所有要执行的工作都依赖于商品价格时,再调用 Future 的 get 方法。执行了这个操作后,客户要么获得 Future 中封装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问。

输出

你一定已经发现 getPriceAsync 方法的调用返回远远早于最终价格计算完成的时间。

我们有可能避免发生客户端被住阻塞的风险。实际上这非常简单, Future 执行完毕可以发出一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回
调函数。

不过,我们当下不会对此进行讨论,现在我们要解决的是另一个问题:如何正确地管理
异步任务执行过程中可能出现的错误。

处理异常错误

如果没有意外,我们目前开发的代码工作得很正常。但是,如果价格计算过程中产生了错误会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待 get 方法返回结果的客户端永久的被阻塞。

客户端可以使用重载版本的 get 方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,你应该尽量在你的代码中添加超时判断断的逻辑,避免发生类似的问题。

使用这种方法至少能防止程序永远的等待下去,超时发生时,程序会得到通知发生了 Timeout-Exception 。

不过,也因为如此,你不会有机会发现计算商品价格的线程内到底发生了什么问题才引发了这样的失效。

为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用
CompletableFuture 的 completeExceptionally 方法将导致 CompletableFuture 内发生问题的异常抛出。

代码如下

【抛出CompletableFuture内的异常】

客户端现在会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的Exception 参数,即价格计算方法最初抛出的异常。

所以,举例来说,如果该方法抛出了一个运行时异常“product not available”,客户端就会得到像下面这样一段 ExecutionException :

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
not available at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
... 5 more
Caused by: java.lang.RuntimeException: product not available
at lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
at java.lang.Thread.run(Thread.java:744)

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Java8新的异步编程方式CompletableFuture实现

    一. Future JDK 5引入了Future模式.Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算. Future模式是多线程设计常用的一种设计模式.Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务.期间我自己可以去做任何想做的事情.一段时间之后,我就便可以从Future那儿取出结果. Future的接口很简单,只有五个方法. public interface Future<

  • Java并发 CompletableFuture异步编程的实现

    前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作.如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化. // 以下两个方法都是耗时操作 doBizA(); doBizB(); //创建两个子线程去执行就可以了,两个操作已经被异步化了. new Thread(()->doBizA()) .start(); new Thread(()->doBizB()) .start(); 异步化,是

  • Java8 CompletableFuture详解

    Java 8来了,是时候学一下新的东西了.Java 7和Java 6只不过是稍作修改的版本,而Java 8将会发生重大的改进.或许是Java 8太大了吧?今天我会给你彻底地解释JDK 8中的新的抽象 – CompletableFuture.众所周知,Java 8不到一年就会发布,因此这篇文章是基于JDK 8 build 88 with lambda support的.CompletableFuture extends Future提供了方法,一元操作符和促进异步性以及事件驱动编程模型,它并不止步

  • Java8 CompletableFuture 异步执行操作

    目录 1.简介 2.异步执行 3.守护线程 4.处理执行结果 1.简介 CompletableFuture 是 JDK8 提供的一个异步执行工具. 示例1: public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { for (int i

  • Java8 使用CompletableFuture 构建异步应用方式

    目录 概述 同步API VS 异步API 同步API 异步API 同步的困扰 实现异步API 将同步方法改为异步方法 处理异常错误 概述 为了展示 CompletableFuture 的强大特性, 创建一个名为 best-price-finder 的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格. 这个过程中,会学到几个重要的技能. 如何提供异步API 如何让你使用了同步API的代码变为非阻塞代码 我们将共同学习如何使用流水线将两个接续的异步操作合并为一个异步计算操作. 比如,在

  • Java8通过CompletableFuture实现异步回调

    目录 1 什么是CompletableFuture? 2 为什么会有CompletableFuture ? 3 CompletableFuture 简单使用 4 CompletableFuture 源码分析 4.1 创建异步任务 4.2 异步任务回调 4.3 异步任务组合 前言: java5为我们提供了Callable和Future,使我们可以很容易的完成异步任务结果的获取,但是通过Future的get获取异步任务结果会导致主线程的阻塞,这样在某些场景下是非常消耗CPU资源的,进而Java8为我

  • Java8 自定义CompletableFuture的原理解析

    目录 Java8 自定义CompletableFuture原理 CompleteFuture简单使用 下面简单介绍用法 Java8 自定义CompletableFuture原理 Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好? public class FutureInAction3 { public static void main(String[] args) { Future<String> future = i

  • Java8中CompletableFuture使用场景与实现原理

    目录 1.概述 2.为什么引入CompletableFuture 3.功能 3.源码追踪 4.总结 1.概述 CompletableFuture是jdk1.8引入的实现类.扩展了Future和CompletionStage,是一个可以在任务完成阶段触发一些操作Future.简单的来讲就是可以实现异步回调. 2.为什么引入CompletableFuture 对于jdk1.5的Future,虽然提供了异步处理任务的能力,但是获取结果的方式很不优雅,还是需要通过阻塞(或者轮训)的方式.如何避免阻塞呢?

  • Java8中CompletableFuture的用法全解

    目录 前言 一.创建异步任务 1.Future.submit 2.supplyAsync / runAsync 二.异步回调 1.thenApply / thenApplyAsync 2.thenAccept / thenRun 3. exceptionally 4.whenComplete 5.handle 三.组合处理 1.thenCombine / thenAcceptBoth / runAfterBoth 2.applyToEither / acceptEither / runAfter

  • 深入理解spring boot异步调用方式@Async

    本文主要给大家介绍了关于spring boot异步调用方式@Async的相关内容,分享出来供大家参考学习,下面来一起看看详细的介绍: 1.使用背景 在日常开发的项目中,当访问其他人的接口较慢或者做耗时任务时,不想程序一直卡在耗时任务上,想程序能够并行执行,我们可以使用多线程来并行的处理任务,也可以使用spring提供的异步处理方式@Async. 2.异步处理方式 调用之后,不返回任何数据. 调用之后,返回数据,通过Future来获取返回数据 3.@Async不返回数据 使用@EnableAsyn

  • java8新特性 stream流的方式遍历集合和数组操作

    前言: 在没有接触java8的时候,我们遍历一个集合都是用循环的方式,从第一条数据遍历到最后一条数据,现在思考一个问题,为什么要使用循环,因为要进行遍历,但是遍历不是唯一的方式,遍历是指每一个元素逐一进行处理(目的),而并不是从第一个到最后一个顺次处理的循环,前者是目的,后者是方式. 所以为了让遍历的方式更加优雅,出现了流(stream)! 1.流的目的在于强掉做什么 假设一个案例:将集合A根据条件1过滤为子集B,然后根据条件2过滤为子集C 在没有引入流之前我们的做法可能为: public cl

  • vue中异步数据获取方式(确保数据被获取)

    目录 vue中异步数据获取 1.获取异步数据,通过async/await限制 2.将一个方法的返回值 vue处理数据(同步,异步)问题简单记录 情况介绍 最终解决方法:new Promise() vue中异步数据获取 1.获取异步数据,通过async/await限制 import { fetchList } from '@/api/article' //在created中即开始获取 created() {     this.getList() } methods:{     async getL

  • extJS中常用的4种Ajax异步提交方式

    /** 复制代码 代码如下: * 第一种Ajax提交方式 * 这种方式需要直接使用ext Ajax方法进行提交 * 使用这种方式,需要将待传递的参数进行封装 * @return */ function saveUser_ajaxSubmit1() { Ext.Ajax.request( { url : 'user_save.action', method : 'post', params : { userName : document.getElementById('userName').val

随机推荐