Java CompletableFuture实现多线程异步编排

目录
  • 一 :问题背景
  • 二 :CompletableFuture介绍
  • 三 :具体场景
    • 1.0 单个任务
      • 1.0.1 runAsync:无返回值
      • 1.0.2 supplyAsync:有返回值
      • 1.0.3 supplyAsync:有返回值
    • 2.0 两个任务编排
      • 2.0.1 thenRunAsync
      • 2.0.2 thenAcceptAsync
      • 2.0.3 thenApplyAsync
    • 3.0 三任务编排
      • 3.0.1 三任务组合
      • 3.0.2 三任务组合二
    • 4.0 多任务的编排
      • 4.0.1、allOf:所有任务都执行完
      • 4.0.2、anyOf:其中有一个任务执行完就可以
  • 四: 一个实际的例子

一 :问题背景

问题:当查询接口较复杂时候,数据的获取都需要[远程调用],必然需要花费更多的时间。 假如查询文章详情页面,需要如下标注的时间才能完成,比如如下场景:

1. 查询文章详情 0.5s

2. 查询文章博主个人信息 0.5s

3. 查询文章评论 1s

4. 查询博主相关文章分类 1s

5. 相关推荐文章 1s

上面的描述只是举个例子不要在意这里的查询描述,看实际情况使用,有些相关的查询我们可以拆分接口实现,上面的描述只是为了举例子。

那么,用户需要4s后才能统计的数据。很显然是不能接受的。 如果有多个线程同时完成这4步操作,也许只需要1s左右即可完成响应。

二 :CompletableFuture介绍

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。 CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

三 :具体场景

1.0 单个任务

1.0.1 runAsync:无返回值

/**
 * runAsync无返回值
 */
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
}, executor);

1.0.2 supplyAsync:有返回值

whenComplete:能感知异常,能感知结果,但没办法给返回值

exceptionally:能感知异常,不能感知结果,能给返回值。相当于,如果出现异常就返回这个值

/**
 * supplyAsync有返回值
 * whenComplete能感知异常,能感知结果,但没办法给返回值
 * exceptionally能感知异常,不能感知结果,能给返回值。相当于,如果出现异常就返回这个值
 */
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
}, executor).whenComplete((res,excption)->{
    //whenComplete虽然能得到异常信息,但是没办法修改返回值
    System.out.println("异步任务成功完成...结果是:"+res+";异常是:"+excption);
}).exceptionally(throwable -> {
    //exceptionally能感知异常,而且能返回一个默认值,相当于,如果出现异常就返回这个值
    return 10;
});

1.0.3 supplyAsync:有返回值

handle能拿到返回结果,也能得到异常信息,也能修改返回值

/**
* supplyAsync有返回值
* handle能拿到返回结果,也能得到异常信息,也能修改返回值
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程" + Thread.currentThread().getId());
    int i = 10 / 4;
    System.out.println("运行结果:" + i);
    return i;
}, executor).handle((res,excption)->{
    if(excption!=null){
        return 0;
    }else {
        return res * 2;
    }
});

2.0 两个任务编排

两任务组合(线程串行化)

可以是两任务的串行化,就是一个任务执行完了再执行下一个

也可以是多个任务的串行化,就是按照顺序一个个的执行

2.0.1 thenRunAsync

不能接收上一次的执行结果,也没返回值

        /**
         * thenRunXXX 不能接收上一次的执行结果,也没返回值
         * .thenRunAsync(() -> {
         *      System.out.println("任务2启动了...");
         * }, executor);
         */
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenRunAsync(() -> {
            System.out.println("任务2启动了...");
        }, executor);

2.0.2 thenAcceptAsync

能接收上一次的执行结果,但没返回值

    /**
     * thenAcceptXXX 能接收上一次的执行结果,但没返回值
     * .thenAcceptAsync(res->{
     *      System.out.println("任务2启动了..."+res);
     *  },executor);
     */
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 4;
        System.out.println("运行结果:" + i);
        return i;
    }, executor).thenAcceptAsync(res -> {
        System.out.println("任务2启动了..." + res);
    }, executor);

2.0.3 thenApplyAsync

能接收上一次的执行结果,又可以有返回值

        /**
         * thenApplyXXX 能接收上一次的执行结果,又可以有返回值
         * .thenApplyAsync(res -> {
         *      System.out.println("任务2启动了..." + res);
         *      return "hello " + res;
         *  }, executor);
         */
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->         {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任务2启动了..." + res);
            return "hello " + res;
        }, executor);

3.0 三任务编排

先准备两个任务

       CompletableFuture<Object> future01 =CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程" + Thread.currentThread().getId());
            int i = 10 / 4;
            System.out.println("任务1结束:");
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程" + Thread.currentThread().getId());
            try {
                Thread.sleep(3000);
                System.out.println("任务2结束:");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, executor);

3.0.1 三任务组合

前两个任务都完成,才执行任务3

3.0.1-1、runAfterBothAsync:任务01 任务02都完成了,再开始执行任务3,不感知任务1、2的结果的,也没返回值

CompletableFuture<Void> future = future01.runAfterBothAsync(future02, () -> {
    System.out.println("任务3开始");
}, executor);

3.0.1-2、thenAcceptBothAsync:任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,但没返回值

CompletableFuture<Void> future = future01.thenAcceptBothAsync(future02, (f1, f2) -> {
            System.out.println("任务3开始...得到之前的结果:f1:" + f1 + ", f2:" + f2);
        }, executor);

3.0.1-3、 thenCombineAsync:任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,而且自己可以带返回值

 CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
            return f1+":"+f2+":哈哈";
        }, executor);

3.0.2 三任务组合二

前两个任务只要有一个完成,就执行任务3

3.0.2-1、runAfterEitherAsync:两个任务只要有一个完成,就执行任务3,不感知结果,自己没返回值

CompletableFuture<Void> future = future01.runAfterEitherAsync(future02, () -> {
            System.out.println("任务3开始...");
        }, executor);

3.0.2-2、 acceptEitherAsync:两个任务只要有一个完成,就执行任务3,感知结果,自己没返回值

CompletableFuture<Void> future = future01.acceptEitherAsync(future02, (res) -> {
        System.out.println("任务3开始...之前的结果" + res);
    }, executor);

3.0.2-3、applyToEitherAsync:两个任务只要有一个完成,就执行任务3,感知结果,自己有返回值

CompletableFuture<String> future = future01.applyToEitherAsync(future02, (res) -> {
        System.out.println("任务3开始...之前的结果" + res);
        return "任务3的结果...";
    }, executor);

4.0 多任务的编排

 /**
         * 多任务组合
         */
        CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品图片信息");
            return "hello.jpg";
        },executor);
        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品属性信息");
            return "黑色+256G";
        },executor);
        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("查询商品介绍信息");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "华为...";
        },executor);

4.0.1、allOf:所有任务都执行完

        /**
         * allOf 所有任务都执行完
         */
        CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
        allOf.get();//等待所有结果完成

4.0.2、anyOf:其中有一个任务执行完就可以

        /**
         * anyOf 其中有一个任务执行完就可以
         */
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
        anyOf.get();

四: 一个实际的例子

public SkuItemVo item(Long skuId) {
        SkuItemVo skuItemVo = new SkuItemVo();
        //1、sku详细信息 sku_info
        SkuInfoEntity skuInfo = getById(skuId);
        skuItemVo.setInfo(skuInfo);
        //2、sku 图片信息 sku_img
        List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
        skuItemVo.setImages(images);
        //3、spu 销售属性组合
        List<SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId());
        skuItemVo.setSaleAttr(saleAttr);
        //4、spu 的介绍
        SpuInfoDescEntity spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId());
        skuItemVo.setDesc(spuInfoDesc);
        //5、spu 规格参数信息
        List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(),skuInfo.getCatalogId());
        skuItemVo.setGroupAttrs(groupAttrs);
        return skuItemVo;
    }

使用CompletableFuture异步编排后

private SkuItemVo item(Long skuId) {
        SkuItemVo skuItemVo = new SkuItemVo();
        /**
         * 3、4、5需要依赖1的运行结果,需要返回skuInfo后从中获取spuId和catalogId
         * 而2不需要依赖1的运行结果
         */
        //1、sku详细信息 sku_info
        CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
                SkuInfoEntity skuInfo = getById(skuId);
                skuItemVo.setInfo(skuInfo);
                return skuInfo;
        }, executor);
        //2、sku 图片信息 sku_img  2不需要等待上边1的执行结果
        CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
                List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
                skuItemVo.setImages(images);
        }, executor);
        //下边的3、4、5都需要上边1的执行结果
        //所以下边的3、4、5都是基于上边1的执行结果 infoFuture 开始的
        //都是以infoFuture.thenAcceptAsync(skuInfo -> {})开始的
        CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(skuInfo -> {
                //3、spu 销售属性组合  3
                List<SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(skuInfo.getSpuId());
                skuItemVo.setSaleAttr(saleAttr);
                System.out.println(saleAttr);
        }, executor);
        CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(skuInfo -> {
                //4、spu 的介绍
                SpuInfoDescEntity spuInfoDesc = spuInfoDescService.getById(skuInfo.getSpuId());
                skuItemVo.setDesc(spuInfoDesc);
        }, executor);
        CompletableFuture<Void> attrGroupFuture = infoFuture.thenAcceptAsync(skuInfo -> {
                //5、spu 规格参数信息
                List<SpuItemAttrGroupVo> groupAttrs = attrGroupService.getAttrGroupWithAttrsBySpuId(skuInfo.getSpuId(),skuInfo.getCatalogId());
                System.out.println(groupAttrs);
                skuItemVo.setGroupAttrs(groupAttrs);
        }, executor);
        //等待所有任务完成
        try {
                CompletableFuture.allOf(saleAttrFuture,descFuture,attrGroupFuture,imageFuture).get() ;
        } catch (InterruptedException e) {
                log.error("查询商品详情异步编排错误: ");
                log.error(e.getMessage() );
        } catch (ExecutionException e) {
                log.error(e.getMessage() );
        }
        return skuItemVo;
}

以上就是Java CompletableFuture实现多线程异步编排的详细内容,更多关于Java CompletableFuture的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java中CompletableFuture 的详细介绍

    目录 1.概述 1.0 创建 CompletableFuture 的对象的工厂方法 1.1 non-async 和 async 区别 1.1.1 non-async 示例:注册 action 的时候任务可能已经结束 1.1.2 non-async 示例:注册 action 的时候任务未完成 1.2 Run 类方法 1.3 Accept 类方法 1.4 Apply 类方法 2 单个任务执行完成后执行一个动作(action) 2.0 示例 exceptionally 3 两个任务执行编排 4 多个任

  • Java8通过CompletableFuture实现异步回调

    目录 1 什么是CompletableFuture? 2 为什么会有CompletableFuture ? 3 CompletableFuture 简单使用 4 CompletableFuture 源码分析 4.1 创建异步任务 4.2 异步任务回调 4.3 异步任务组合 前言: java5为我们提供了Callable和Future,使我们可以很容易的完成异步任务结果的获取,但是通过Future的get获取异步任务结果会导致主线程的阻塞,这样在某些场景下是非常消耗CPU资源的,进而Java8为我

  • Java8中CompletableFuture使用场景与实现原理

    目录 1.概述 2.为什么引入CompletableFuture 3.功能 3.源码追踪 4.总结 1.概述 CompletableFuture是jdk1.8引入的实现类.扩展了Future和CompletionStage,是一个可以在任务完成阶段触发一些操作Future.简单的来讲就是可以实现异步回调. 2.为什么引入CompletableFuture 对于jdk1.5的Future,虽然提供了异步处理任务的能力,但是获取结果的方式很不优雅,还是需要通过阻塞(或者轮训)的方式.如何避免阻塞呢?

  • 详解Java8 CompletableFuture的并行处理用法

    目录 前言 场景 用法 1.在线API 2.编写在线API查询 3.编写查询服务 4.编写测试接口 5.效果 6.CompletableFuture并行查询 7.编写测试接口 8.CompletableFuture效果 思考 前言 工作中你可能会遇到很多这样的场景,一个接口,要从其他几个service调用查询方法,分别获取到需要的值之后再封装数据返回. 还可能在微服务中遇到类似的情况,某个服务的接口,要使用好几次feign去调用其他服务的方法获取数据,最后拿到想要的值并封装返回给前端. 这样的场

  • 深入学习java8 中的CompletableFuture

    目录 1 前言 2 简单使用 3 异步处理 3.1 thenApply 3.2 thenAccept 和 thenRun 3.3 exceptionally 异常处理 3.4 whenComplete 方法完成之后 3.5 handle 4 处理组合 4.1 任务均完成后组合 4.2 任一任务完成 4.3 任务处理结果 4.4 所有或者任何 5 总结 1 前言 在项目开发中,异步化处理是非常常见的解决问题的手段,异步化处理除了使用线程池之外,还可以使用 CompletableFuture 来实现

  • Java8并发新特性CompletableFuture

    目录 1.CompletableFuture是什么? 2.CompletableFuture的方法使用说明 2.1 CompletableFuture类提供几个静态方法来进行异步操作 2.2 获取异步任务执行结果的方法 get()/join() 3.CompletionStage的方法使用说明 3.1 纯消费类型 3.2 有返回值类型 3.3 不消费也不返回类型 3.4 组合类型 3.5 任务事件类型 4.CompletionStage异常处理方法 5.方法类型总结 1.CompletableF

  • Java CompletableFuture实现多线程异步编排

    目录 一 :问题背景 二 :CompletableFuture介绍 三 :具体场景 1.0 单个任务 1.0.1 runAsync:无返回值 1.0.2 supplyAsync:有返回值 1.0.3 supplyAsync:有返回值 2.0 两个任务编排 2.0.1 thenRunAsync 2.0.2 thenAcceptAsync 2.0.3 thenApplyAsync 3.0 三任务编排 3.0.1 三任务组合 3.0.2 三任务组合二 4.0 多任务的编排 4.0.1.allOf:所有

  • java CompletableFuture异步任务编排示例详解

    目录 前言 同步串行 异步串行 并行任务 多任务结果合并计算 任一任务完成 快速失败 注意 前言 在之前的项目开发中,都没怎么使用过CompletableFuture的功能,只听说过和异步编程有关.为了能够在将来有需要的时候用得上,这两天花了点时间学习了一下,并简单地总结一下如何使用CompletableFuture完成异步任务编排. 先创建一个自定义的线程池,后续所有代码都会使用到: private static final ThreadPoolExecutor THREAD_POOL_EXE

  • CompletableFuture 异步编排示例详解

    目录 从Future聊起 CompletableFuture 创建异步任务 异步回调 异步编排 串行 AND OR Future 机制扩展 CompletableFuture 实践 从Future聊起 Future是java 1.5引入的异步编程api,它表示一个异步计算结果,提供了获取异步结果的能力,解决了多线程场景下Runnable线程任务无法获取结果的问题. 但是其获取异步结果的方式并不够优雅,我们必须使用Future.get的方式阻塞调用线程,或者使用轮询方式判断 Future.isDo

  • Java CompletableFuture 异步超时实现深入研究

    目录 前言 常见使用方式 存在的问题 分析 现有做法 解决方式 JDK 9 JDK 8 前言 作者:京东科技 张天赐 JDK 8 是一次重大的版本升级,新增了非常多的特性,其中之一便是 CompletableFuture.自此从 JDK 层面真正意义上的支持了基于事件的异步编程范式,弥补了 Future 的缺陷. 在我们的日常优化中,最常用手段便是多线程并行执行.这时候就会涉及到 CompletableFuture 的使用. 常见使用方式 下面举例一个常见场景. 假如我们有两个 RPC 远程调用

  • Java使用多线程异步执行批量更新操作方法

    写在前面: 相信不少开发者在遇到项目对数据进行批量操作的时候,都会有不少的烦恼,尤其是针对数据量极大的情况下,效率问题就直接提上了菜板.因此,开多线程来执行批量任务是十分重要的一种批量操作思路,其实这种思路实现起来也十分简单,就拿批量更新的操作举例: 整体流程图 步骤 获取需要进行批量更新的大集合A,对大集合进行拆分操作,分成N个小集合A-1 ~ A-N . 开启线程池,针对集合的大小进行调参,对小集合进行批量更新操作. 对流程进行控制,控制线程执行顺序. 按照指定大小拆分集合的工具类 impo

  • java 中同步、异步、阻塞和非阻塞区别详解

    java 中同步.异步.阻塞和非阻塞区别详解 简单点说: 阻塞就是干不完不准回来,一直处于等待中,直到事情处理完成才返回: 非阻塞就是你先干,我先看看有其他事没有,一发现事情被卡住,马上报告领导. 我们拿最常用的send和recv两个函数来说吧... 比如你调用send函数发送一定的Byte,在系统内部send做的工作其实只是把数据传输(Copy)到TCP/IP协议栈的输出缓冲区,它执行成功并不代表数据已经成功的发送出去了,如果TCP/IP协议栈没有足够的可用缓冲区来保存你Copy过来的数据的话

  • Java中实现多线程关键词整理(总结)

    Java中的Runable,Callable,Future,FutureTask,ExecutorService,Excetor,Excutors,ThreadPoolExcetor在这里对这些关键词,以及它们的用法做一个总结. 首先将它们分个类: Runable,Callable Future,FutureTask ExecutorService,Excetor,Excutors,ThreadPoolExcetor 1. 关于Ranable和Callable 首先Java中创建线程的方法有三种

  • Springboot集成定时器和多线程异步处理操作

    需求:用@schedule标签进行定时处理逻辑,由于业务处理速度慢,需要每次执行逻辑放在不同的线程里异步执行 springboot集成多线程异步,直接上配置: /** * 线程池异步配置 */ @Configuration @EnableAsync public class ThreadExecutorConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskE

  • Java CompletableFuture的使用详解

    CompletableFuture​ 它代表某个同步或异步计算的一个阶段.你可以把它理解为是一个为了产生有价值最终结果的计算的流水线上的一个单元.这意味着多个指令可以链接起来从而一个阶段的完成可以触发下一个阶段的执行. 任务开启 supplyAsync 开启一个子线程去执行有返回结果 开启一个子线程用来执行执行事务,可以通过返回值的join来得到返回值. 例如: print("去煮饭了"); CompletableFuture<String> completableFutu

  • 详解Java CompletableFuture使用方法以及与FutureTask的区别

    目录 futureTask 创建异步任务 创建任务 1. .supplyAsync 2. .runAsync 异步回调 1. .thenApply 2. .thenAccept 3. .exceptionally 4. .whenComplete 组合处理 总的来说简洁了FutureTask与线程池的配合使用 没啥太大区别吧我觉得, 使用方法不一样, 多了一些方法 ??? futureTask 创建异步任务 FutureTask<String> stringFutureTask = new F

随机推荐