Java CompletableFuture实现原理分析详解

目录
  • 简介
  • CompletableFuture类结构
  • CompletableFuture回调原理
  • CompletableFuture异步原理
  • 总结

简介

前面的一篇文章你知道Java8并发新特性CompletableFuture吗?介绍了CompletableFuture的特性以及一些使用方法,今天我们主要来聊一聊CompletableFuture的回调功能以及异步工作原理是如何实现的。

CompletableFuture类结构

1.CompletableFuture类结构主要有两个属性

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    volatile Object result;       // Either the result or boxed AltResult
    volatile Completion stack;    // Top of Treiber stack of dependent actions
    ...
}
  • result:存储CompletableFuture的返回,或者存储异常对象AltResult。
  • stack:是CompletableFuture.Completion对象,表示操作数栈栈顶,在进行CompletableFuture链式调用的过程中,所有链式调用的CompletableFuture任务都会被压入该stack中,在任务调用的过程按后进先出的顺序出栈执行完所有任务。

2.stack属性栈结构

abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // Treiber stack link
    ...
}

next:存储下一个任务链式调用栈。

3.UniCompletion的内部结构

abstract static class UniCompletion<T,V> extends Completion {
    Executor executor;                 // executor to use (null if none)
    CompletableFuture<V> dep;          // the dependent to complete
    CompletableFuture<T> src;          // source for action
    ...
}

UniCompletion继承Completion类,包含以下几个参数:

  • executor:异步任务执行器,如果为空则有主线程执行任务不进行异步执行。
  • dep:指向当前任务构建的CompletabueFuture
  • src:指向源CompletableFuture任务

CompletableFuture回调原理

这里为了方便讲解,我们用以下简短的代码来进行分析:

public static void main(String[] args) {
    CompletableFuture<String> baseFuture = CompletableFuture.completedFuture("Base Future");
    log.info(baseFuture.thenApply((r) -> r + " Then Apply").join());
    baseFuture.thenAccept((r) -> log.info(r)).thenAccept((Void) -> log.info("Void"));
}

上面的代码我们通过创建一个简单的CompletableFuture对象,再执行baseFuture.thenApply()调用后会进行一个入栈操作,如下图baseFuture引用的CompletableFuturestack属性将会指向baseFuture.thenApply()结果返回的新CompletableFuture对象,而新CompletableFuture对象的src属性将指向来源CompletableFuturebaseFuture所引用的对象。

2.在上一步的基础上再执行下一行代码,结果的引用关系图如下图:

在执行完baseFuture.thenAccept()的时候,thenAccept返回的任务将被压入栈顶,next指向上一个代码段的返回对象,在thenAccept返回的新CompletableFuture对象中在进行一次thenAccept的调用,就再产生一个新的CompletableFuture对象,dept属性就指向最新的CompletableFuture对象。

thenApply实现源码分析

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    // 创建一个新的CompletableFuture对象
    CompletableFuture<V> d =  new CompletableFuture<V>();
    // e:如果是异步调用直接执行代码块
    // !d.uniApply:执行任务,如果返回false则任务未执行需入栈
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        // 创建出新的UniApply对象进行入栈
        push(c);
        // 尝试执行任务
        c.tryFire(SYNC);
    }
    return d;
}

final <S> boolean uniApply(CompletableFuture<S> a,
                           Function<? super S,? extends T> f,
                           UniApply<S,T> c) {
    Object r; Throwable x;
    // 任务未完成结果为null直接返回false
    if (a == null || (r = a.result) == null || f == null)
        return false;
    // 验证是否出现异常结果,如有则任务执行结束
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            // 异步执行任务
            if (c != null && !c.claim())
                // 任务未执行返回false
                return false;
            @SuppressWarnings("unchecked") S s = (S) r;
            // 任务执行完成将结果写入result
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

以上代码片段主要描述了CompletableFuture在执行任务时会创建出新的CompletableFuture对象,使用新对象执行任务并获取结果使用CAS写入到result属性,如果任务未执行将压入栈顶,再重新尝试任务执行,在CompletableFuture其他方法的调用也都大同小异,这里不在逐一分析,可自行打开源码阅读便于理解。

CompletableFuture异步原理

需要进行CompletableFuture异步调用则要使用Async结尾的方法执行任务,这里我们就拿thenAcceptAsync()的源码进行分析。

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // 如果是异步任务,这里的参数e不会为空,也就是会将任务执行压入栈顶
    if (e != null || !d.uniAccept(this, f, null)) {
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        push(c);
        // 重点还是这个入口
        c.tryFire(SYNC);
    }
    return d;
}

static final class UniAccept<T> extends UniCompletion<T,Void> {
    Consumer<? super T> fn;
    UniAccept(Executor executor, CompletableFuture<Void> dep,
              CompletableFuture<T> src, Consumer<? super T> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d; CompletableFuture<T> a;
        // dep为空即任务已被执行过,直接返回null
        // uniAccept()结果为false,可能是任务执行中未完成,也可能是由线程池中的其他任务执行完成
        if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;
        // 说明当前线程执行了该任务,返回结果继续执行前一个任务
        return d.postFire(a, mode);
    }
}

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
    if (a != null && a.stack != null) {
        // postComplete调用过来的,或者上一个任务执行完成,清空栈数据,否则调用postComplete完成任务
        if (mode < 0 || a.result == null)
            a.cleanStack();
        else
            // 完成任务执行并进行出栈
            a.postComplete();
    }
    if (result != null && stack != null) {
        if (mode < 0)
            // postComplete调用过来的任务已完成
            return this;
        else
            // 完成任务执行并进行出栈
            postComplete();
    }
    return null;
}

CompletableFuture进行异步主要是通过将任务压入栈顶后tryFire方法进行异步处理,如果任务未被执行则会通过postFire方法有线程池中的线程进行任务执行,任务执行结果再使用CAS将结果返回到result,其他线程即可得知任务是否被执行过,如果当前现场判断当前任务为被执行,则调用postComplete()执行完成任务。

总结

CompletableFuture通过异步回调的方式,解决了开发过程中异步调用获取结果的难点。开发人员只需接触到CompletableFuture对象,以及CompletableFuture任务的执行结果,无需设计具体异步回调的实现,并可通过自定义线程池进一步优化任务的异步调用。

到此这篇关于Java CompletableFuture实现原理分析详解的文章就介绍到这了,更多相关Java CompletableFuture内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java中CompletableFuture 的详细介绍

    目录 1.概述 1.0 创建 CompletableFuture 的对象的工厂方法 1.1 non-async 和 async 区别 1.1.1 non-async 示例:注册 action 的时候任务可能已经结束 1.1.2 non-async 示例:注册 action 的时候任务未完成 1.2 Run 类方法 1.3 Accept 类方法 1.4 Apply 类方法 2 单个任务执行完成后执行一个动作(action) 2.0 示例 exceptionally 3 两个任务执行编排 4 多个任

  • Java CompletableFuture的使用详解

    CompletableFuture​ 它代表某个同步或异步计算的一个阶段.你可以把它理解为是一个为了产生有价值最终结果的计算的流水线上的一个单元.这意味着多个指令可以链接起来从而一个阶段的完成可以触发下一个阶段的执行. 任务开启 supplyAsync 开启一个子线程去执行有返回结果 开启一个子线程用来执行执行事务,可以通过返回值的join来得到返回值. 例如: print("去煮饭了"); CompletableFuture<String> completableFutu

  • Java8并发新特性CompletableFuture

    目录 1.CompletableFuture是什么? 2.CompletableFuture的方法使用说明 2.1 CompletableFuture类提供几个静态方法来进行异步操作 2.2 获取异步任务执行结果的方法 get()/join() 3.CompletionStage的方法使用说明 3.1 纯消费类型 3.2 有返回值类型 3.3 不消费也不返回类型 3.4 组合类型 3.5 任务事件类型 4.CompletionStage异常处理方法 5.方法类型总结 1.CompletableF

  • Java8 自定义CompletableFuture的原理解析

    目录 Java8 自定义CompletableFuture原理 CompleteFuture简单使用 下面简单介绍用法 Java8 自定义CompletableFuture原理 Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好? public class FutureInAction3 { public static void main(String[] args) { Future<String> future = i

  • Java8中CompletableFuture使用场景与实现原理

    目录 1.概述 2.为什么引入CompletableFuture 3.功能 3.源码追踪 4.总结 1.概述 CompletableFuture是jdk1.8引入的实现类.扩展了Future和CompletionStage,是一个可以在任务完成阶段触发一些操作Future.简单的来讲就是可以实现异步回调. 2.为什么引入CompletableFuture 对于jdk1.5的Future,虽然提供了异步处理任务的能力,但是获取结果的方式很不优雅,还是需要通过阻塞(或者轮训)的方式.如何避免阻塞呢?

  • Java CompletableFuture实现原理分析详解

    目录 简介 CompletableFuture类结构 CompletableFuture回调原理 CompletableFuture异步原理 总结 简介 前面的一篇文章你知道Java8并发新特性CompletableFuture吗?介绍了CompletableFuture的特性以及一些使用方法,今天我们主要来聊一聊CompletableFuture的回调功能以及异步工作原理是如何实现的. CompletableFuture类结构 1.CompletableFuture类结构主要有两个属性 pub

  • java方法及this关键字原理分析详解

    目录 步骤1 .给顾客增加一个吃饭的方法 步骤 2 . 没有加static的属性和方法,一定需要先new对象 步骤 3 . 用new出来的对象去执行eat方法 步骤 4 . 怎么理解c.eat() 步骤 5 . 消息接受器 步骤 6 . 如果有两个顾客? 步骤 7 . 答案 步骤 8 .其实有个this 步骤 9 . 在eat方法里面直接使用this 步骤 10 . 构造方法 步骤 11 . 总结:this的意义是什么? 步骤 12 . 道理我都懂,那static又是什么? 步骤 13 . 本节

  • Java中注解与原理分析详解

    目录 一.注解基础 二.注解原理 三.常用注解 1.JDK注解 2.Lombok注解 四.自定义注解 1.同步控制 2.类型引擎 一.注解基础 注解即标注与解析,在Java的代码工程中,注解的使用几乎是无处不在,甚至多到被忽视: 无论是在JDK源码或者框架组件,都在使用注解能力完成各种识别和解析动作:在对系统功能封装时,也会依赖注解能力简化各种逻辑的重复实现: 基础接口 在Annotation的源码注释中有说明:所有的注解类型都需要继承该公共接口,本质上看注解是接口,但是代码并没有显式声明继承关

  • Java 递归重难点分析详解与练习

    目录 递归是什么 分析递归的过程 递归练习 按顺序打印一个数的每一位 递归是什么 就是一个方法在执行的时候,自己调用自己. 递归的要求: 1 有一个趋近于终止的条件 2 实现递归要去推导出一个递推公式 递归就是递下去,归上来.求 5 的阶乘,代码举例: public static int fact(int n){ if(n == 1){ return n; } return n*fact(n - 1); } public static void main(String[] args) { int

  • JAVA线程池原理实例详解

    本文实例讲述了JAVA线程池原理.分享给大家供大家参考,具体如下: 线程池的优点 1.线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用. 2.可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃. 线程池的创建 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQu

  • MyBatis Plus插件机制与执行流程原理分析详解

    MyBatis Plus插件 MyBatis Plus提供了分页插件PaginationInterceptor.执行分析插件SqlExplainInterceptor.性能分析插件PerformanceInterceptor以及乐观锁插件OptimisticLockerInterceptor. Mybatis 通过插件 (Interceptor) 可以做到拦截四大对象相关方法的执行 ,根据需求完成相关数据的动态改变. 四大对象是: Executor StatementHandler Parame

  • 深入java内存查看与分析详解

    1:gc日志输出在jvm启动参数中加入 -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimestamps -XX:+PrintGCApplicationStopedTime,jvm将会按照这些参数顺序输出gc概要信息,详细信息,gc时间信息,gc造成的应用暂停时间.如果在刚才的参数后面加入参数 -Xloggc:文件路径,gc信息将会输出到指定的文件中.其他参数还有-verbose:gc和-XX:+PrintTenuringDistribution等.

  • MySQL主从延迟现象及原理分析详解

    一.现象 凌晨对线上一张表添加索引,表数据量太大(1亿+数据,数据量50G以上),造成主从延迟几个小时,各个依赖从库的系统无法查询数据,最终影响业务. 现在就梳理下主从延迟的原理. 二.原理 根据 MySQL 官方文档 MySQL Replication Implementation Details 中的描述,MySQL 主从复制依赖于三个线程:master一个线程(Binlog dump thread),slave两个线程(I/O thread和SQL thread).主从复制流程如下图: m

  • java多态机制原理特点详解

    java多态机制是什么 java中实现多态的机制是依靠父类或接口的引用指向子类.从而实现了一个对象多种形态的特性.其中父类的引用是在程序运行时动态的指向具体的实例,调用该引用的方法时,不是根据引用变量的类型中定义的方法来运行,而是根据具体的实例的方法. 概念 多态就是指一个引用变量倒底会指向哪个类的实例对象,该引用变量发出的方法调用到底是哪个类中实现的方法,必须在由程序运行期间才能决定. 因为在程序运行时才确定具体的类,这样,不用修改源程序代码,就可以让引用变量绑定到各种不同的类实现上,从而导致

  • Java逃逸分析详解及代码示例

    概念引入 我们都知道,Java 创建的对象都是被分配到堆内存上,但是事实并不是这么绝对,通过对Java对象分配的过程分析,可以知道有两个地方会导致Java中创建出来的对象并一定分别在所认为的堆上.这两个点分别是Java中的逃逸分析和TLAB(Thread Local Allocation Buffer)线程私有的缓存区. 基本概念介绍 逃逸分析,是一种可以有效减少Java程序中同步负载和内存堆分配压力的跨函数全局数据流分析算法.通过逃逸分析,Java Hotspot编译器能够分析出一个新的对象的

随机推荐