CompletableFuture 异步编排示例详解

目录
  • 从Future聊起
  • CompletableFuture
    • 创建异步任务
    • 异步回调
    • 异步编排
    • 串行
    • AND
    • OR
    • Future 机制扩展
  • CompletableFuture 实践

从Future聊起

Future是java 1.5引入的异步编程api,它表示一个异步计算结果,提供了获取异步结果的能力,解决了多线程场景下Runnable线程任务无法获取结果的问题。

但是其获取异步结果的方式并不够优雅,我们必须使用Future.get的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。

public interface Future<V> {

    //任务是否完成
    boolean isDone();

    //阻塞调用线程获取异步结果
    V get() throws InterruptedException, ExecutionException;

   //在指定时间内阻塞线程获取异步结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

假如存在多个异步任务相互依赖,一个或多个异步线程任务需要依赖上一个异步线程任务结果,并且多个异步任务能够组合结果,显然这种阻塞线程的方式并不能优雅解决。

我们更希望能够提供一种异步回调的方式,组合各种异步任务,而无需开发者对多个异步任务结果的监听编排。

为了解决优化上述问题,java8 新增了CompletableFutureAPI ,其大大扩展了Future能力,并提供了异步任务编排能力。

CompletableFuture

CompletableFuture实现了新的接口CompletionStage,并扩展了Future接口。查看类图

创建异步任务

CompletableFuture 提供了四种方法去创建一个异步任务。

  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):创建一个有返回值的异步任务实例
  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor):创建一个有返回值的异步任务实例,可以指定线程池
  • static CompletableFuture<Void> runAsync(Runnable runnable):创建一个无返回值的任务实例
  • static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor):创建一个无返回值的任务实例,允许指定线程池

着几个方法本质上是有返回值和无返回值两种类型方法,supply方法可以获取异步结果,而run方法则无返回值,根据需要使用。

同时两种类型的方法均提供了指定线程池的重载,如果不指定线程池会默认使用ForkJoinPool.commonPool(),默认线程数为cpu核心数,建议使用自定义线程池的方式,避免线程资源竞争

一个简单样例

        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println("无返回值任务"); });
        runAsync.get();
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "hello completableFuture");
        String result = supplyAsync.get();
        System.out.println(result);

我们依然可以通过get()方法阻塞获取异步结果任务,但是CompletableFuture主要还是用于异步回调及异步任务编排使用。

异步回调

在任务执行结束后我们希望能够自动触发回调方法,CompletableFuture提供了两种方法实现。

  • CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action):当上一阶段任务执行结束后,回调方法接受上一阶段结果或者异常,返回上一阶段任务结果
  • <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn):当上一阶段任务执行结束后,回调方法接受上一阶段结果或者异常,并最终返回回调方法处理结果
  • CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn):上一阶段任务出现异常后的回调,返回结果是回调函数的返回结果。

whenComplete 与 handle 区别:两者均接受上一阶段任务结果或异常,但是whenComplete 回调中没有返回值,所以其结果是上一阶段任务,而handle 最终返回的是其回调方法方法,其主要是BiConsumerBiFunction的区别。

异步编排

CompletionStage表示异步计算的一个阶段,当一个计算处理完成后会触发其他依赖的阶段。当然一个阶段的触发也可以是由多个阶段的完成触发或者多个中的任意一个完成触发。该接口定义了异步任务编排的各种场景,CompletableFuture则实现了这些场景。

可以把这些场景大致分为三类:串行、AND和OR。下面会逐个分析各个场景,接口中定义的以Async结尾的方法,指下一阶段任务会被单独提交到线程池中执行,后面不在赘述。

串行

当上一阶段任务执行完毕后,继续提交执行其他任务

  • <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn):接收上一阶段任务结果,并可获取返回值。
  • CompletableFuture<Void> thenAccept(Consumer<? super T> action):接收上一阶段任务结果,无返回值。
  • CompletableFuture<Void> thenRun(Runnable action):不接收上一阶段任务结果,并且无返回值。

T:上一个任务返回结果的类型 U:当前任务的返回值类型

AND

组合多个异步任务,当多个任务执行完毕继续执行其他任务

  • <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn):上一阶段任务与other任务均执行结束,接收两个任务的结果,并可获取返回值
  • <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn): 使用上一阶段任务的结果,返回一个新的CompletableFuture实例
  • <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action):上一阶段任务与other任务均执行结束,接收两个任务的结果,无返回值
  • CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action):上一阶段任务与other任务均执行结束,不接收两个任务的结果,无返回值
  • static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs):等待所有异步任务执行结束

T:上一个任务返回结果的类型 U:上一个other任务的返回值类型 V:当前任务返回值

OR

当多个任务中任意任务执行完成则继续执行其他任务。

  • <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn): 接收上一阶段任务与other任务最快执行完成的结果,并可获取返回值
  • CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action):接收上一阶段任务与other任务最快执行完成的结果,无返回值
  • CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action):上一阶段任务与other任务任意任务完成执行,不接收结果,无返回值
  • static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs):组合多个任务,返回最快执行结束的任务结果

Future 机制扩展

CompletableFuture不仅实现了Future接口,同时对其进行了扩展,提供了更加优雅的实现。

  • T join() :与get()方法用法一致,阻塞调用线程获取结果,但是不会抛出具体异常,简化了使用上下文
  • T getNow(T valueIfAbsent):当任务结束返回任务结果,否则返回给定的结果valueIfAbsent。
  • boolean complete(T value):当任务未结束时设置给定的结果value并结束任务,已结束的任务不会生效。
  • boolean completeExceptionally(Throwable ex):当任务未结束时设置异常结果并结束任务,已结束的任务不会生效

CompletableFuture 实践

我们通过CompletableFuture实现一个经典的烧水程序。

我们可以把这个流程分为三个异步任务。

任务1:洗水壶->烧水

任务2:洗水壶->洗茶杯->拿茶叶

任务3:泡茶,需要等待任务1与任务2结束。

通过代码模拟实现

        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗水壶");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "水壶";
        }).thenApply(e->{
            System.out.println("烧水");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "热水";
        });
        //洗水壶->洗水杯->拿茶叶
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("洗茶壶");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            return "茶壶";
        }).thenApply(e->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            System.out.println("洗水杯");
            return "水杯";
        }).thenApply(e->{
            System.out.println("拿茶叶");
            return "茶叶";
        });
        //泡茶
        CompletableFuture<String> task3 = task1.thenCombine(task2, (a, b) -> {
            System.out.println("泡茶");
            return "茶";
        });
        String tea = task3.join();
        System.out.println(tea);

以上就是CompletableFuture 异步编排示例详解的详细内容,更多关于CompletableFuture 异步编排的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java8 自定义CompletableFuture的原理解析

    目录 Java8 自定义CompletableFuture原理 CompleteFuture简单使用 下面简单介绍用法 Java8 自定义CompletableFuture原理 Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好? public class FutureInAction3 { public static void main(String[] args) { Future<String> future = i

  • 详解Java8 CompletableFuture的并行处理用法

    目录 前言 场景 用法 1.在线API 2.编写在线API查询 3.编写查询服务 4.编写测试接口 5.效果 6.CompletableFuture并行查询 7.编写测试接口 8.CompletableFuture效果 思考 前言 工作中你可能会遇到很多这样的场景,一个接口,要从其他几个service调用查询方法,分别获取到需要的值之后再封装数据返回. 还可能在微服务中遇到类似的情况,某个服务的接口,要使用好几次feign去调用其他服务的方法获取数据,最后拿到想要的值并封装返回给前端. 这样的场

  • Java8通过CompletableFuture实现异步回调

    目录 1 什么是CompletableFuture? 2 为什么会有CompletableFuture ? 3 CompletableFuture 简单使用 4 CompletableFuture 源码分析 4.1 创建异步任务 4.2 异步任务回调 4.3 异步任务组合 前言: java5为我们提供了Callable和Future,使我们可以很容易的完成异步任务结果的获取,但是通过Future的get获取异步任务结果会导致主线程的阻塞,这样在某些场景下是非常消耗CPU资源的,进而Java8为我

  • Java8中CompletableFuture使用场景与实现原理

    目录 1.概述 2.为什么引入CompletableFuture 3.功能 3.源码追踪 4.总结 1.概述 CompletableFuture是jdk1.8引入的实现类.扩展了Future和CompletionStage,是一个可以在任务完成阶段触发一些操作Future.简单的来讲就是可以实现异步回调. 2.为什么引入CompletableFuture 对于jdk1.5的Future,虽然提供了异步处理任务的能力,但是获取结果的方式很不优雅,还是需要通过阻塞(或者轮训)的方式.如何避免阻塞呢?

  • 详解Java CompletableFuture使用方法以及与FutureTask的区别

    目录 futureTask 创建异步任务 创建任务 1. .supplyAsync 2. .runAsync 异步回调 1. .thenApply 2. .thenAccept 3. .exceptionally 4. .whenComplete 组合处理 总的来说简洁了FutureTask与线程池的配合使用 没啥太大区别吧我觉得, 使用方法不一样, 多了一些方法 ??? futureTask 创建异步任务 FutureTask<String> stringFutureTask = new F

  • Java8 使用工厂方法supplyAsync创建CompletableFuture实例

    目录 使用工厂方法 supplyAsync创建 CompletableFuture 对比 对CompletableFuture async的理解 目前为止我们已经了解了如何通过编程创建 CompletableFuture 对象以及如何获取返回值,虽然看起来这些操作已经比较方便,但还有进一步提升的空间, CompletableFuture 类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节. 可以看到我们使用new Thread的方式,显然是不恰当的. 使用工

  • CompletableFuture 异步编排示例详解

    目录 从Future聊起 CompletableFuture 创建异步任务 异步回调 异步编排 串行 AND OR Future 机制扩展 CompletableFuture 实践 从Future聊起 Future是java 1.5引入的异步编程api,它表示一个异步计算结果,提供了获取异步结果的能力,解决了多线程场景下Runnable线程任务无法获取结果的问题. 但是其获取异步结果的方式并不够优雅,我们必须使用Future.get的方式阻塞调用线程,或者使用轮询方式判断 Future.isDo

  • java CompletableFuture异步任务编排示例详解

    目录 前言 同步串行 异步串行 并行任务 多任务结果合并计算 任一任务完成 快速失败 注意 前言 在之前的项目开发中,都没怎么使用过CompletableFuture的功能,只听说过和异步编程有关.为了能够在将来有需要的时候用得上,这两天花了点时间学习了一下,并简单地总结一下如何使用CompletableFuture完成异步任务编排. 先创建一个自定义的线程池,后续所有代码都会使用到: private static final ThreadPoolExecutor THREAD_POOL_EXE

  • C#异步调用示例详解

    本文实例为大家分享了C#异步调用的具体代码,供大家参考,具体内容如下 using System; using System.Collections.Generic; using System.Text; using System.Runtime.InteropServices; using System.Threading.Tasks; namespace AsyncAppTest { ////异步调用示例详解 /// 第1步:定义委托:此委托的返回值.参数类型必须与要调用的异步方法一致: //

  • Vue3中如何使用异步请求示例详解

    目录 1.前言 2.快速开始 2.1.思路 2.2.安装&封装axios 2.3.设计接口 2.4.设计视图 2.5.最终效果 总结 1.前言 接上节,我们初步体验了layui-vue的用法.相比其他ui框架,layui-vue的数据结构显得不是非常友好,但是经过数据拼凑也是能够成功运行的. 今天我们就主要介绍下在实际开发中最常用到的前后端接口交互.因为大多数时候前端为了高性能,对于后端接口的调用都会采用异步的方式.那该如何在vue3中使用异步请求渲染页面呢? 2.快速开始 2.1.思路 预期:

  • java异步编程CompletableFuture使用示例详解

    目录 一.简单介绍 二.常见操作 1.使用默认线程池 2.使用自定义线程池 3.获取线程的执行结果 三.处理异步结算的结果 四.异常处理 五.组合 CompletableFuture 六.并行运行多个 CompletableFuture 七.案例 1.从多个平台获取书价格 2.从任意一个平台获取结果就返回 一.简单介绍 CompletableFuture 同时实现了 Future 和 CompletionStage 接口. public class CompletableFuture<T> i

  • Python网络爬虫中的同步与异步示例详解

    一.同步与异步 #同步编程(同一时间只能做一件事,做完了才能做下一件事情) <-a_url-><-b_url-><-c_url-> #异步编程 (可以近似的理解成同一时间有多个事情在做,但有先后) <-a_url-> <-b_url-> <-c_url-> <-d_url-> <-e_url-> <-f_url-> <-g_url-> <-h_url-> <--i_ur

  • Kotlin Coroutines执行异步加载示例详解

    前言 Kotlin Coroutines是Kotlin推出的新的异步API.并不是解决所有问题的最优方案,但是希望在许多情况下它会使事情变得更容易一些.这里只简单的展示一下这个库在安卓中的具体使用方案.下面话不多说了,来一起看看详细的介绍吧. 引入Coroutines //在application的build.gradle文件中的android节点添加如下的代码 kotlin { experimental { coroutines 'enable' } } //添加下面两行到依赖中 implem

  • Python异步爬虫多线程与线程池示例详解

    目录 背景 异步爬虫方式 多线程,多进程(不建议) 线程池,进程池(适当使用) 单线程+异步协程(推荐) 多线程 线程池 背景 当对多个url发送请求时,只有请求完第一个url才会接着请求第二个url(requests是一个阻塞的操作),存在等待的时间,这样效率是很低的.那我们能不能在发送请求等待的时候,为其单独开启进程或者线程,继续请求下一个url,执行并行请求 异步爬虫方式 多线程,多进程(不建议) 好处:可以为相关阻塞的操作单独开启线程或者进程,阻塞操作就可以异步会执行 弊端:不能无限制开

  • Awaitility同步异步工具实战示例详解

    目录 引言 1. awaitility入门 1.1 静态导入 1.2 简单例子 2. awaitility在RocketMQ中的实战 3. 总结 引言 在编写测试用例的时候遇到有异步或者队列处理的时候经常会用到 Thread.sleep() 等待来进行测试.例如:DLedger 测试选举的过程.当DLedger Leader下线.此时DLedger会重新发起选举,这个选举的过程是需要一定时间.很多时候在测试代码中就会使用 Thread.sleep . 由于选举需要的时间多少不确定所以sleep时

  • UE4 Unlua 调用异步蓝图节点AIMoveTo函数示例详解

    目录 引言 源码分析 Unlua代码实现 引言 异步蓝图节点:在蓝图节点的右上角有时钟图标. 注意:异步节点可以在EventGraph/Macros中使用,但是无法在蓝图函数中使用. AIMoveTo节点:实现AI自主寻路,且能异步回调执行成功或失败的委托函数,且返回移动结果枚举值. 源码分析 AIMoveTo 蓝图节点对应C++的基类为 UK2Node_AIMoveToUK2Node_AIMoveTo 继承至异步Task节点基类 UK2Node_BaseAsyncTask,并在构造函数中完成了

随机推荐