Java CompletableFuture 异步超时实现深入研究

目录
  • 前言
  • 常见使用方式
  • 存在的问题
    • 分析
    • 现有做法
  • 解决方式
    • JDK 9
    • JDK 8

前言

作者:京东科技 张天赐

JDK 8 是一次重大的版本升级,新增了非常多的特性,其中之一便是 CompletableFuture。自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了 Future 的缺陷。

在我们的日常优化中,最常用手段便是多线程并行执行。这时候就会涉及到 CompletableFuture 的使用。

常见使用方式

下面举例一个常见场景。

假如我们有两个 RPC 远程调用服务,我们需要获取两个 RPC 的结果后,再进行后续逻辑处理。

public static void main(String[] args) {
    // 任务 A,耗时 2 秒
    int resultA = compute(1);
    // 任务 B,耗时 2 秒
    int resultB = compute(2);
    // 后续业务逻辑处理
    System.out.println(resultA + resultB);
}

可以预估到,串行执行最少耗时 4 秒,并且 B 任务并不依赖 A 任务结果。

对于这种场景,我们通常会选择并行的方式优化,Demo 代码如下:

public static void main(String[] args) {
    // 仅简单举例,在生产代码中可别这么写!
    // 统计耗时的函数
    time(() -> {
        CompletableFuture<Integer> result = Stream.of(1, 2)
                                                  // 创建异步任务
                                                  .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                  // 聚合
                                                  .reduce(CompletableFuture.completedFuture(0), (x, y) -> x.thenCombineAsync(y, Integer::sum, executor));
        // 等待结果
        try {
            System.out.println("结果:" + result.get());
        } catch (ExecutionException | InterruptedException e) {
            System.err.println("任务执行异常");
        }
    });
}
输出:
[async-1]: 任务执行开始:1
[async-2]: 任务执行开始:2
[async-1]: 任务执行完成:1
[async-2]: 任务执行完成:2
结果:3
耗时:2 秒

可以看到耗时变成了 2 秒。

存在的问题

分析

看上去 CompletableFuture 现有功能可以满足我们诉求。但当我们引入一些现实常见情况时,一些潜在的不足便暴露出来了。

compute(x) 如果是一个根据入参查询用户某类型优惠券列表的任务,我们需要查询两种优惠券并组合在一起返回给上游。假如上游要求我们 2 秒内处理完毕并返回结果,但 compute(x) 耗时却在 0.5 秒 ~ 无穷大波动。这时候我们就需要把耗时过长的 compute(x) 任务结果放弃,仅处理在指定时间内完成的任务,尽可能保证服务可用。

那么以上代码的耗时由耗时最长的服务决定,无法满足现有诉求。通常我们会使用 get(long timeout, TimeUnit unit) 来指定获取结果的超时时间,并且我们会给 compute(x) 设置一个超时时间,达到后自动抛异常来中断任务。

public static void main(String[] args) {
    // 仅简单举例,在生产代码中可别这么写!
    // 统计耗时的函数
    time(() -> {
        List<CompletableFuture<Integer>> result = Stream.of(1, 2)
                                                        // 创建异步任务,compute(x) 超时抛出异常
                                                        .map(x -> CompletableFuture.supplyAsync(() -> compute(x), executor))
                                                        .toList();
        // 等待结果
        int res = 0;
        for (CompletableFuture<Integer> future : result) {
            try {
                res += future.get(2, SECONDS);
            } catch (ExecutionException | InterruptedException | TimeoutException e) {
                System.err.println("任务执行异常或超时");
            }
        }
        System.out.println("结果:" + res);
    });
}
输出:
[async-2]: 任务执行开始:2
[async-1]: 任务执行开始:1
[async-1]: 任务执行完成:1
任务执行异常或超时
结果:1
耗时:2 秒

可以看到,只要我们能够给 compute(x) 设置一个超时时间将任务中断,结合 getgetNow 等获取结果的方式,就可以很好地管理整体耗时。

那么问题也就转变成了,如何给任务设置异步超时时间呢

现有做法

当异步任务是一个 RPC 请求时,我们可以设置一个 JSF 超时,以达到异步超时效果。

当请求是一个 R2M 请求时,我们也可以控制 R2M 连接的最大超时时间来达到效果。

这么看好像我们都是在依赖三方中间件的能力来管理任务超时时间?那么就存在一个问题,中间件超时控制能力有限,如果异步任务是中间件 IO 操作 + 本地计算操作怎么办?

用 JSF 超时举一个具体的例子,反编译 JSF 的获取结果代码如下:

public V get(long timeout, TimeUnit unit) throws InterruptedException {
    // 配置的超时时间
    timeout = unit.toMillis(timeout);
    // 剩余等待时间
    long remaintime = timeout - (this.sentTime - this.genTime);
    if (remaintime <= 0L) {
        if (this.isDone()) {
            // 反序列化获取结果
            return this.getNow();
        }
    } else if (this.await(remaintime, TimeUnit.MILLISECONDS)) {
        // 等待时间内任务完成,反序列化获取结果
        return this.getNow();
    }
    this.setDoneTime();
    // 超时抛出异常
    throw this.clientTimeoutException(false);
}

当这个任务刚好卡在超时边缘完成时,这个任务的耗时时间就变成了超时时间 + 获取结果时间。而获取结果(反序列化)作为纯本地计算操作,耗时长短受 CPU 影响较大。

某些 CPU 使用率高的情况下,就会出现异步任务没能触发抛出异常中断,导致我们无法准确控制超时时间。对上游来说,本次请求全部失败。

解决方式

JDK 9

这类问题非常常见,如大促场景,服务器 CPU 瞬间升高就会出现以上问题。

那么如何解决呢?其实 JDK 的开发大佬们早有研究。在 JDK 9,CompletableFuture 正式提供了 orTimeoutcompleteTimeout 方法,来准确实现异步超时控制。

public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
    if (unit == null)
        throw new NullPointerException();
    if (result == null)
        whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit)));
    return this;
}

JDK 9 orTimeout 其实现原理是通过一个定时任务,在给定时间之后抛出异常。如果任务在指定时间内完成,则取消抛异常的操作。

以上代码我们按执行顺序来看下:

首先执行 new Timeout(this)

static final class Timeout implements Runnable {
    final CompletableFuture<?> f;
    Timeout(CompletableFuture<?> f) { this.f = f; }
    public void run() {
        if (f != null && !f.isDone())
            // 抛出超时异常
            f.completeExceptionally(new TimeoutException());
    }
}

通过源码可以看到,Timeout 是一个实现 Runnable 的类,run() 方法负责给传入的异步任务通过 completeExceptionally CAS 赋值异常,将任务标记为异常完成。

那么谁来触发这个 run() 方法呢?我们看下 Delayer 的实现。

static final class Delayer {
    static ScheduledFuture<?> delay(Runnable command, long delay,
                                    TimeUnit unit) {
        // 到时间触发 command 任务
        return delayer.schedule(command, delay, unit);
    }
    static final class DaemonThreadFactory implements ThreadFactory {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("CompletableFutureDelayScheduler");
            return t;
        }
    }
    static final ScheduledThreadPoolExecutor delayer;
    static {
        (delayer = new ScheduledThreadPoolExecutor(
            1, new DaemonThreadFactory())).
            setRemoveOnCancelPolicy(true);
    }
}

Delayer 其实就是一个单例定时调度器,Delayer.delay(new Timeout(this), timeout, unit) 通过 ScheduledThreadPoolExecutor 实现指定时间后触发 Timeout 的 run() 方法。

到这里就已经实现了超时抛出异常的操作。但当任务完成时,就没必要触发 Timeout 了。因此我们还需要实现一个取消逻辑。

static final class Canceller implements BiConsumer<Object, Throwable> {
    final Future<?> f;
    Canceller(Future<?> f) { this.f = f; }
    public void accept(Object ignore, Throwable ex) {
        if (ex == null && f != null && !f.isDone())
        // 3 未触发抛异常任务则取消
            f.cancel(false);
    }
}

当任务执行完成,或者任务执行异常时,我们也就没必要抛出超时异常了。因此我们可以把 delayer.schedule(command, delay, unit) 返回的定时超时任务取消,不再触发 Timeout。 当我们的异步任务完成,并且定时超时任务未完成的时候,就是我们取消的时机。因此我们可以通过 whenComplete(BiConsumer<? super T, ? super Throwable> action) 来完成。

Canceller 就是一个 BiConsumer 的实现。其持有了 delayer.schedule(command, delay, unit) 返回的定时超时任务,accept(Object ignore, Throwable ex) 实现了定时超时任务未完成后,执行 cancel(boolean mayInterruptIfRunning) 取消任务的操作。

JDK 8

如果我们使用的是 JDK 9 或以上,我们可以直接用 JDK 的实现来完成异步超时操作。那么 JDK 8 怎么办呢?

其实我们也可以根据上述逻辑简单实现一个工具类来辅助。

以下是我们营销自己的工具类以及用法,贴出来给大家作为参考,大家也可以自己写的更优雅一些~

调用方式:

CompletableFutureExpandUtils.orTimeout(异步任务, 超时时间, 时间单位);

工具类源码:

package com.jd.jr.market.reduction.util;
import com.jdpay.market.common.exception.UncheckedException;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
 * CompletableFuture 扩展工具
 *
 * @author zhangtianci7
 */
public class CompletableFutureExpandUtils {
    /**
     * 如果在给定超时之前未完成,则异常完成此 CompletableFuture 并抛出 {@link TimeoutException} 。
     *
     * @param timeout 在出现 TimeoutException 异常完成之前等待多长时间,以 {@code unit} 为单位
     * @param unit    一个 {@link TimeUnit},结合 {@code timeout} 参数,表示给定粒度单位的持续时间
     * @return 入参的 CompletableFuture
     */
    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        if (null == unit) {
            throw new UncheckedException("时间的给定粒度不能为空");
        }
        if (null == future) {
            throw new UncheckedException("异步任务不能为空");
        }
        if (future.isDone()) {
            return future;
        }
        return future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
    }
    /**
     * 超时时异常完成的操作
     */
    static final class Timeout implements Runnable {
        final CompletableFuture<?> future;
        Timeout(CompletableFuture<?> future) {
            this.future = future;
        }
        public void run() {
            if (null != future && !future.isDone()) {
                future.completeExceptionally(new TimeoutException());
            }
        }
    }
    /**
     * 取消不需要的超时的操作
     */
    static final class Canceller implements BiConsumer<Object, Throwable> {
        final Future<?> future;
        Canceller(Future<?> future) {
            this.future = future;
        }
        public void accept(Object ignore, Throwable ex) {
            if (null == ex && null != future && !future.isDone()) {
                future.cancel(false);
            }
        }
    }
    /**
     * 单例延迟调度器,仅用于启动和取消任务,一个线程就足够
     */
    static final class Delayer {
        static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }
        static final class DaemonThreadFactory implements ThreadFactory {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureExpandUtilsDelayScheduler");
                return t;
            }
        }
        static final ScheduledThreadPoolExecutor delayer;
        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }
    }
}

参考资料 JEP 266: JDK 9 并发包更新提案

以上就是Java CompletableFuture 异步超时实现深入研究的详细内容,更多关于Java CompletableFuture 异步超时的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解Java中Callable和Future的区别

    目录 Java中为什么需要Callable Callable和Runnable的区别 Future和RunnableFuture 不使用Callable和Future,仅使用Runnable实现相同功能 Java中为什么需要Callable 在java中有两种创建线程的方法: 一种是继承Thread类,重写run方法: public class TestMain { public static void main(String[] args) { MyThread t1 = new MyThre

  • java异步编程CompletableFuture使用示例详解

    目录 一.简单介绍 二.常见操作 1.使用默认线程池 2.使用自定义线程池 3.获取线程的执行结果 三.处理异步结算的结果 四.异常处理 五.组合 CompletableFuture 六.并行运行多个 CompletableFuture 七.案例 1.从多个平台获取书价格 2.从任意一个平台获取结果就返回 一.简单介绍 CompletableFuture 同时实现了 Future 和 CompletionStage 接口. public class CompletableFuture<T> i

  • java CompletableFuture异步任务编排示例详解

    目录 前言 同步串行 异步串行 并行任务 多任务结果合并计算 任一任务完成 快速失败 注意 前言 在之前的项目开发中,都没怎么使用过CompletableFuture的功能,只听说过和异步编程有关.为了能够在将来有需要的时候用得上,这两天花了点时间学习了一下,并简单地总结一下如何使用CompletableFuture完成异步任务编排. 先创建一个自定义的线程池,后续所有代码都会使用到: private static final ThreadPoolExecutor THREAD_POOL_EXE

  • Java CompletableFuture 异步超时实现深入研究

    目录 前言 常见使用方式 存在的问题 分析 现有做法 解决方式 JDK 9 JDK 8 前言 作者:京东科技 张天赐 JDK 8 是一次重大的版本升级,新增了非常多的特性,其中之一便是 CompletableFuture.自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了 Future 的缺陷. 在我们的日常优化中,最常用手段便是多线程并行执行.这时候就会涉及到 CompletableFuture 的使用. 常见使用方式 下面举例一个常见场景. 假如我们有两个 RPC 远程调用

  • 浅谈java中异步多线程超时导致的服务异常

    在项目中为了提高大并发量时的性能稳定性,经常会使用到线程池来做多线程异步操作,多线程有2种,一种是实现runnable接口,这种没有返回值,一种是实现Callable接口,这种有返回值. 当其中一个线程超时的时候,理论上应该不 影响其他线程的执行结果,但是在项目中出现的问题表明一个线程阻塞,其他线程返回的接口都为空.其实是个很简单的问题,但是由于第一次碰到,还是想了一些时间的.很简单,就是因为阻塞的那个线 程没有释放,并发量一大,线程池数量就满了,所以其他线程都处于等待状态. 附上一段自己写的调

  • Java并发 CompletableFuture异步编程的实现

    前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作.如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化. // 以下两个方法都是耗时操作 doBizA(); doBizB(); //创建两个子线程去执行就可以了,两个操作已经被异步化了. new Thread(()->doBizA()) .start(); new Thread(()->doBizB()) .start(); 异步化,是

  • Java CompletableFuture实现多线程异步编排

    目录 一 :问题背景 二 :CompletableFuture介绍 三 :具体场景 1.0 单个任务 1.0.1 runAsync:无返回值 1.0.2 supplyAsync:有返回值 1.0.3 supplyAsync:有返回值 2.0 两个任务编排 2.0.1 thenRunAsync 2.0.2 thenAcceptAsync 2.0.3 thenApplyAsync 3.0 三任务编排 3.0.1 三任务组合 3.0.2 三任务组合二 4.0 多任务的编排 4.0.1.allOf:所有

  • Java Servlet异步请求开启的简单步骤

    目录 1. 背景 2. Servlet同步请求 3. Servlet异步请求 4. 异步Servlet使用方法 5. Spring中的实现例子 附:异步对象监听器 总结 1. 背景 在研究长轮询的实现过程,有使用到Servlet3的异步请求.下面就来学习一下Servlet3的异步请求 现在Servlet的版本已经到了5 2. Servlet同步请求 以Tomcat服务器为例: Http请求到达Tomcat Tomcat从线程池中取出线程处理到达Tomcat的请求 将请求Http解析为HttpSe

  • Java实现任务超时处理方法

    任务超时处理是比较常见的需求,比如在进行一些比较耗时的操作(如网络请求)或者在占用一些比较宝贵的资源(如数据库连接)时,我们通常需要给这些操作设置一个超时时间,当执行时长超过设置的阈值的时候,就终止操作并回收资源.Java中对超时任务的处理有两种方式:一种是基于异步任务结果的超时获取,一种则是使用延时任务来终止超时操作.下文将详细说明. 一.基于异步任务结果的超时获取 基于异步任务结果的获取通常是跟线程池一起使用的,我们向线程池提交任务时会返回一个Future对象,在调用Future的get方法

  • Java CompletableFuture的使用详解

    CompletableFuture​ 它代表某个同步或异步计算的一个阶段.你可以把它理解为是一个为了产生有价值最终结果的计算的流水线上的一个单元.这意味着多个指令可以链接起来从而一个阶段的完成可以触发下一个阶段的执行. 任务开启 supplyAsync 开启一个子线程去执行有返回结果 开启一个子线程用来执行执行事务,可以通过返回值的join来得到返回值. 例如: print("去煮饭了"); CompletableFuture<String> completableFutu

  • 详解Java CompletableFuture使用方法以及与FutureTask的区别

    目录 futureTask 创建异步任务 创建任务 1. .supplyAsync 2. .runAsync 异步回调 1. .thenApply 2. .thenAccept 3. .exceptionally 4. .whenComplete 组合处理 总的来说简洁了FutureTask与线程池的配合使用 没啥太大区别吧我觉得, 使用方法不一样, 多了一些方法 ??? futureTask 创建异步任务 FutureTask<String> stringFutureTask = new F

  • 详解Java中异步转同步的六种方法

    目录 一.问题 应用场景 二.分析 三.实现方法 1.轮询与休眠重试机制 2.wait/notify 3.Lock Condition 4.CountDownLatch 5.CyclicBarrier 6.LockSupport 一.问题 应用场景 应用中通过框架发送异步命令时,不能立刻返回命令的执行结果,而是异步返回命令的执行结果. 那么,问题来了,针对应用中这种异步调用,能不能像同步调用一样立刻获取到命令的执行结果,如何实现异步转同步? 二.分析 首先,解释下同步和异步 同步,就是发出一个调

随机推荐