Java并发 CompletableFuture异步编程的实现

前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化。

// 以下两个方法都是耗时操作
doBizA();
doBizB();

//创建两个子线程去执行就可以了,两个操作已经被异步化了。
new Thread(()->doBizA())
 .start();
new Thread(()->doBizB())
 .start(); 

异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程。

CompletableFuture 的核心优势

为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。

烧水泡茶分工方案

// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 =
 CompletableFuture.runAsync(()->{
 System.out.println("T1: 洗水壶...");
 sleep(1, TimeUnit.SECONDS);

 System.out.println("T1: 烧开水...");
 sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 =
 CompletableFuture.supplyAsync(()->{
 System.out.println("T2: 洗茶壶...");
 sleep(1, TimeUnit.SECONDS);

 System.out.println("T2: 洗茶杯...");
 sleep(2, TimeUnit.SECONDS);

 System.out.println("T2: 拿茶叶...");
 sleep(1, TimeUnit.SECONDS);
 return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 =
 f1.thenCombine(f2, (__, tf)->{
  System.out.println("T1: 拿到茶叶:" + tf);
  System.out.println("T1: 泡茶...");
  return " 上茶:" + tf;
 });
// 等待任务 3 执行结果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
 try {
  u.sleep(t);
 }catch(InterruptedException e){}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

从整体上来看,我们会发现

  • 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
  • 语义更清晰,例如f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
  • 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。

领略 CompletableFuture 异步编程的优势之后,下面我们详细介绍 CompletableFuture 的使用。

创建 CompletableFuture 对象

创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法,我们先看前两个。在烧水泡茶的例子中,我们已经使用了runAsync(Runnable runnable)supplyAsync(Supplier<U> supplier),它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。

前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

// 使用默认线程池
static CompletableFuture<Void>
 runAsync(Runnable runnable)
static <U> CompletableFuture<U>
 supplyAsync(Supplier<U> supplier)
// 可以指定线程池
static CompletableFuture<Void>
 runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U>
 supplyAsync(Supplier<U> supplier, Executor executor) 

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法我们该如何理解呢?

理解 CompletionStage 接口

可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。

串行关系

并行关系

汇聚关系

CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的
f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的 AND 指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有 AND 聚合关系,那就一定还有 OR 聚合关系,所谓 OR 指的是依赖的任务只要有一个完成就可以执行当前任务。

最后就是异常,CompletionStage 接口也可以方便地描述异常处理。

下面我们就来一一介绍,CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。

1. 描述串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<R>

而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer<T>,这个接口里与 CompletionStage 相关的方法是void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>

thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage<Void>

这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

通过下面的示例代码,你可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

CompletableFuture<String> f0 =
 CompletableFuture.supplyAsync(
  () -> "Hello World")   //①
 .thenApply(s -> s + " QQ") //②
 .thenApply(String::toUpperCase);//③

System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ

2. 描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

3. 描述 OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
CompletableFuture<String> f1 =
 CompletableFuture.supplyAsync(()->{
  int t = getRandom(5, 10);
  sleep(t, TimeUnit.SECONDS);
  return String.valueOf(t);
});

CompletableFuture<String> f2 =
 CompletableFuture.supplyAsync(()->{
  int t = getRandom(5, 10);
  sleep(t, TimeUnit.SECONDS);
  return String.valueOf(t);
});

CompletableFuture<String> f3 =
 f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

4. 异常处理

虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常 ,例如下面的代码,执行

CompletableFuture<Integer>
 f0 = CompletableFuture.
  .supplyAsync(()->(7/0))
  .thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。

whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。

whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

CompletableFuture<Integer>
 f0 = CompletableFuture
  .supplyAsync(()->7/0))
  .thenApply(r->r*10)
  .exceptionally(e->0);
System.out.println(f0.join());

总结

不过最近几年,伴随着 ReactiveX 的发展(Java 语言的实现版本是 RxJava),回调地狱已经被完美解决了,Java 语言也开始官方支持异步编程:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本则提供了更加完备的 Flow API,异步编程目前已经完全工业化。

CompletableFuture 已经能够满足简单的异步编程需求,如果你对异步编程感兴趣,可以重点关注 RxJava 这个项目,利用 RxJava,即便在 Java 1.6 版本也能享受异步编程的乐趣。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java8新的异步编程方式CompletableFuture实现

    一. Future JDK 5引入了Future模式.Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算. Future模式是多线程设计常用的一种设计模式.Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务.期间我自己可以去做任何想做的事情.一段时间之后,我就便可以从Future那儿取出结果. Future的接口很简单,只有五个方法. public interface Future<

  • Java并发 CompletableFuture异步编程的实现

    前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作.如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化. // 以下两个方法都是耗时操作 doBizA(); doBizB(); //创建两个子线程去执行就可以了,两个操作已经被异步化了. new Thread(()->doBizA()) .start(); new Thread(()->doBizB()) .start(); 异步化,是

  • Java 8 的异步编程利器 CompletableFuture的实例详解

    目录 一个例子回顾Future 一个例子走进CompletableFuture CompletableFuture使用场景 创建异步任务 supplyAsync方法 runAsync方法 任务异步回调 1.thenRun/thenRunAsync 2.thenAccept/thenAcceptAsync 3.thenApply/thenApplyAsync 4.exceptionally 5.whenComplete方法 6.handle方法 多个任务组合处理 AND组合关系 OR组合的关系 区

  • python并发和异步编程实例

    关于并发.并行.同步阻塞.异步非阻塞.线程.进程.协程等这些概念,单纯通过文字恐怕很难有比较深刻的理解,本文就通过代码一步步实现这些并发和异步编程,并进行比较.解释器方面本文选择python3,毕竟python3才是python的未来,并且python3用原生的库实现协程已经非常方便了. 1.准备阶段 下面为所有测试代码所需要的包 #! python3 # coding:utf-8 import socket from concurrent import futures from selecto

  • java异步编程CompletableFuture使用示例详解

    目录 一.简单介绍 二.常见操作 1.使用默认线程池 2.使用自定义线程池 3.获取线程的执行结果 三.处理异步结算的结果 四.异常处理 五.组合 CompletableFuture 六.并行运行多个 CompletableFuture 七.案例 1.从多个平台获取书价格 2.从任意一个平台获取结果就返回 一.简单介绍 CompletableFuture 同时实现了 Future 和 CompletionStage 接口. public class CompletableFuture<T> i

  • Java并发编程必备之Future机制

    前言 Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但它可以返回一个对象或者抛出一个异常. Callable接口使用泛型去定义它的返回类型.Executors类提供了一些有用的方法在线程池中执行Callable内的任务.由于Callable任务是并行的,我们必须等待它返回的结果.而线程是属于异步计算模型,所以不可能直接从别的线程中得到函数返回值. java.util.concurrent.Futur

  • java异步编程的7种实现方式小结

    目录 同步编程 一.线程 Thread 二.Future 三.FutureTask 四.异步框架 CompletableFuture 五. SpringBoot 注解 @Async 六.Spring ApplicationEvent 事件 七.消息队列 最近有很多小伙伴给我留言,能不能总结下异步编程,今天就和大家简单聊聊这个话题. 早期的系统是同步的,容易理解,我们来看个例子 同步编程 当用户创建一笔电商交易订单时,要经历的业务逻辑流程还是很长的,每一步都要耗费一定的时间,那么整体的RT就会比较

  • Java 异步编程实践_动力节点Java学院整理

    什么是异步?为什么要用它? 异步编程提供了一个非阻塞的,事件驱动的编程模型. 这种编程模型利用系统中多核执行任务来提供并行,因此提供了应用的吞吐率.此处吞吐率是指在单位时间内所做任务的数量. 在这种编程方式下, 一个工作单元将独立于主应用线程而执行, 并且会将它的状态通知调用线程:成功,处理中或者失败. 我们需要异步来消除阻塞模型.其实异步编程模型可以使用同样的线程来处理多个请求, 这些请求不会阻塞这个线程.想象一个应用正在使用的线程正在执行任务, 然后等待任务完成才进行下一步. log框架就是

  • Java并发编程Callable与Future的应用实例代码

    本文主要探究的是java并发编程callable与future的使用,分享了相关实例代码,具体介绍如下. 我们都知道实现多线程有2种方式,一种是继承Thread,一种是实现Runnable,但这2种方式都有一个缺陷,在任务完成后无法获取返回结果.要想获得返回结果,就得使用Callable,Callable任务可以有返回值,但是没法直接从Callable任务里获取返回值:想要获取Callabel任务的返回值,需要用到Future.所以Callable任务和Future模式,通常结合起来使用. 试想

  • java异步编程详解

    很多时候我们都希望能够最大的利用资源,比如在进行IO操作的时候尽可能的避免同步阻塞的等待,因为这会浪费CPU的资源.如果在有可读的数据的时候能够通知程序执行读操作甚至由操作系统内核帮助我们完成数据的拷贝,这再好不过了.从NIO到CompletableFuture.Lambda.Fork/Join,java一直在努力让程序尽可能变的异步甚至拥有更高的并行度,这一点一些函数式语言做的比较好,因此java也或多或少的借鉴了某些特性.下面介绍一种非常常用的实现异步操作的方式. 考虑有一个耗时的操作,操作

随机推荐