Java多线程工具CompletableFuture的使用教程

目录
  • 前言
    • Future的问题
    • CompletableFuture应运而生
  • 使用方式
    • 基本使用-提交异步任务
    • 处理上个异步任务结果
    • 对两个结果进行选用-acceptEither
    • 对两个结果进行合并-thenCombine, thenAcceptBoth
    • 异常处理
  • 案例
    • 大量用户发送短信|消息
    • 并发获取商品不同信息
  • 问题
    • thenRun和thenRunAsync有什么区别
    • handle和exceptional有什么区别
  • 最后

前言

Future的问题

写多线程程序的时候,可以使用Future从一个异步线程中拿到结果,但是如果使用过程中会发现一些问题:

  • 如果想要对Future的结果做进一步的操作,需要阻塞当前线程
  • 多个Future不能被链式的执行,每个Future的结果都是独立的,期望对一个Future的结果做另外一件异步的事情;
  • 没有异常处理策略,如果Future执行失败了,需要手动捕捉

CompletableFuture应运而生

为了解决Future问题,JDK在1.8的时候给我们提供了一个好用的工具类CompletableFuture;

它实现了Future和CompletionStage接口,针对Future的不足之处给出了相应的处理方式。

  • 在异步线程执行结束后可以自动回调我们新的处理逻辑,无需阻塞
  • 可以对多个异步任务进行编排,组合或者排序
  • 异常处理

CompletableFuture的核心思想是将每个异步任务都可以看做一个步骤(CompletionStage),然后其他的异步任务可以根据这个步骤做一些想做的事情。

CompletionStage定义了许多步骤处理的方法,功能非常强大,这里就只列一下日常中常用到的一些方法供大家参考。

使用方式

基本使用-提交异步任务

简单的使用方式

异步执行,无需结果:

// 可以执行Executors异步执行,如果不指定,默认使用ForkJoinPool
CompletableFuture.runAsync(() -> System.out.println("Hello CompletableFuture!"));

异步执行,同时返回结果:

// 同样可以指定线程池
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!");
System.out.println(stringCompletableFuture.get());

处理上个异步任务结果

  • thenRun: 不需要上一步的结果,直接直接新的操作
  • thenAccept:获取上一步异步处理的内容,进行新的操作
  • thenApply: 获取上一步的内容,然后产生新的内容

所有加上Async后缀的,代表新的处理操作仍然是异步的。Async的操作都可以指定Executors进行处理

// Demo
       CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                // 针对上一步的结果做处理,产生新的结果
                .thenApplyAsync(s -> s.toUpperCase())
                // 针对上一步的结果做处理,不返回结果
                .thenAcceptAsync(s -> System.out.println(s))
                // 不需要上一步返回的结果,直接进行操作
                .thenRunAsync(() -> System.out.println("end"));
        ;

对两个结果进行选用-acceptEither

当我们有两个回调在处理的时候,任何完成都可以使用,两者结果没有关系,那么使用acceptEither。

两个异步线程谁先执行完成,用谁的结果,其余类型的方法也是如此。

// 返回abc
CompletableFuture
                .supplyAsync(() -> {
                    SleepUtils.sleep(100);
                    return "Hello CompletableFuture!";
                })
                .acceptEither(CompletableFuture.supplyAsync(() -> "abc"), new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                });
// 返回Hello CompletableFuture!
CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                .acceptEither(CompletableFuture.supplyAsync(() -> {
                    SleepUtils.sleep(100);
                    return "abc";
                }), new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println(s);
                    }
                });

对两个结果进行合并-thenCombine, thenAcceptBoth

thenCombine

当我们有两个CompletionStage时,需要对两个的结果进行整合处理,然后计算得出一个新的结果。

  • thenCompose是对上一个CompletionStage的结果进行处理,返回结果,并且返回类型必须是CompletionStage。
  • thenCombine是得到第一个CompletionStage的结果,然后拿到当前的CompletionStage,两者的结果进行处理。
        CompletableFuture<Integer> heightAsync = CompletableFuture.supplyAsync(() -> 172);

        CompletableFuture<Double> weightAsync = CompletableFuture.supplyAsync(() -> 65)
                .thenCombine(heightAsync, new BiFunction<Integer, Integer, Double>() {
                    @Override
                    public Double apply(Integer wight, Integer height) {
                        return wight * 10000.0 / (height * height);
                    }
                })
                ;

thenAcceptBoth

需要两个异步CompletableFuture的结果,两者都完成的时候,才进入thenAcceptBoth回调。

// thenAcceptBoth案例:
        CompletableFuture
                .supplyAsync(() -> "Hello CompletableFuture!")
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "abc"), new BiConsumer<String, String>() {
                		// 参数一为我们刚开始运行时的CompletableStage,新传入的作为第二个参数
                    @Override
                    public void accept(String s, String s2) {
                        System.out.println("param1=" + s + ", param2=" + s2);
                    }
                });
// 结果:param1=Hello CompletableFuture!, param2=abc

异常处理

当我们使用CompleteFuture进行链式调用的时候,多个异步回调中,如果有一个执行出现问题,那么接下来的回调都会停止,所以需要一种异常处理策略。

exceptionally

exceptionally是当出现错误时,给我们机会进行恢复,自定义返回内容。

        CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("发生错误");
        }).exceptionally(throwable -> {
            log.error("调用错误 {}", throwable.getMessage(), throwable);
            return "异常处理内容";
        });

handle

exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。

CompletableFuture.supplyAsync(() -> {
    return "abc";
})
.handle((r,err) -> {
    log.error("调用错误 {}", err.getMessage(), err);
    // 对结果做额外的处理
    return r;
})
;

案例

大量用户发送短信|消息

需求为对某个表中特定条件的用户进行短信通知,但是短信用户有成百上千万,如果使用单线程读取效率会很慢。这个时候可以考虑使用多线程的方式进行读取;

1、将读取任务拆分为多个不同的子任务,指定读取的偏移量和个数

  // 假设有500万条记录
        long recordCount = 500 * 10000;
        int subTaskRecordCount = 10000;
        // 对记录进行分片
        List<Map> subTaskList = new LinkedList<>();
        for (int i = 0; i < recordCount / 500; i++) {
            // 如果子任务结构复杂,建议使用对象
            HashMap<String, Integer> subTask = new HashMap<>();
            subTask.put("index", i);
            subTask.put("offset", i * subTaskRecordCount);
            subTask.put("count", subTaskRecordCount);
            subTaskList.add(subTask);
        }

2、使用多线程进行批量读取

  // 进行subTask批量处理,拆分为不同的任务
        subTaskList.stream()
                .map(subTask -> CompletableFuture.runAsync(()->{
                    // 读取数据,然后处理
                    // dataTunel.read(subTask);
                },excuturs))   // 使用应用的通用任务线程池
                .map(c -> ((CompletableFuture<?>) c).join());

3、进行业务逻辑处理,或者直接在读取完进行业务逻辑处理也是可以;

并发获取商品不同信息

在系统拆分比较细的时候,价格,优惠券,库存,商品详情等信息分散在不同的系统中,有时候需要同时获取商品的所有信息, 有时候可能只需要获取商品的部分信息。

当然问题点在于要调用多个不同的系统,需要将RT降低下来,那么需要进行并发调用;

     List<Task> taskList = new ArrayList<>();
        List<Object> result = taskList.stream()
                .map(task -> CompletableFuture.supplyAsync(()->{
//                    handlerMap.get(task).query();
                    return "";
                }, executorService))
                .map(c -> c.join())
                .collect(Collectors.toList());

问题

thenRun和thenRunAsync有什么区别

  • 如果不使用传入的线程池,大家用默认的线程池ForkJoinPool
  • thenRun用的默认和上一个任务使用相同的线程池
  • thenRunAsync在执行新的任务的时候可以接受传入一个新的线程池,使用新的线程池执行任务;

handle和exceptional有什么区别

exceptionally是只有发生异常时才会执行,而handle则是不管是否发生错误都会执行。

最后

一般情况下上述简单的API已经满足绝大部分的场景了,如果有更复杂的诉求,可继续深入研究。

到此这篇关于Java多线程工具CompletableFuture的使用教程的文章就介绍到这了,更多相关Java CompletableFuture内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java CompletableFuture的使用详解

    CompletableFuture​ 它代表某个同步或异步计算的一个阶段.你可以把它理解为是一个为了产生有价值最终结果的计算的流水线上的一个单元.这意味着多个指令可以链接起来从而一个阶段的完成可以触发下一个阶段的执行. 任务开启 supplyAsync 开启一个子线程去执行有返回结果 开启一个子线程用来执行执行事务,可以通过返回值的join来得到返回值. 例如: print("去煮饭了"); CompletableFuture<String> completableFutu

  • Java8中CompletableFuture使用场景与实现原理

    目录 1.概述 2.为什么引入CompletableFuture 3.功能 3.源码追踪 4.总结 1.概述 CompletableFuture是jdk1.8引入的实现类.扩展了Future和CompletionStage,是一个可以在任务完成阶段触发一些操作Future.简单的来讲就是可以实现异步回调. 2.为什么引入CompletableFuture 对于jdk1.5的Future,虽然提供了异步处理任务的能力,但是获取结果的方式很不优雅,还是需要通过阻塞(或者轮训)的方式.如何避免阻塞呢?

  • 深入学习java8 中的CompletableFuture

    目录 1 前言 2 简单使用 3 异步处理 3.1 thenApply 3.2 thenAccept 和 thenRun 3.3 exceptionally 异常处理 3.4 whenComplete 方法完成之后 3.5 handle 4 处理组合 4.1 任务均完成后组合 4.2 任一任务完成 4.3 任务处理结果 4.4 所有或者任何 5 总结 1 前言 在项目开发中,异步化处理是非常常见的解决问题的手段,异步化处理除了使用线程池之外,还可以使用 CompletableFuture 来实现

  • Java8 自定义CompletableFuture的原理解析

    目录 Java8 自定义CompletableFuture原理 CompleteFuture简单使用 下面简单介绍用法 Java8 自定义CompletableFuture原理 Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好? public class FutureInAction3 { public static void main(String[] args) { Future<String> future = i

  • Java中CompletableFuture 的详细介绍

    目录 1.概述 1.0 创建 CompletableFuture 的对象的工厂方法 1.1 non-async 和 async 区别 1.1.1 non-async 示例:注册 action 的时候任务可能已经结束 1.1.2 non-async 示例:注册 action 的时候任务未完成 1.2 Run 类方法 1.3 Accept 类方法 1.4 Apply 类方法 2 单个任务执行完成后执行一个动作(action) 2.0 示例 exceptionally 3 两个任务执行编排 4 多个任

  • Java8 CompletableFuture详解

    Java 8来了,是时候学一下新的东西了.Java 7和Java 6只不过是稍作修改的版本,而Java 8将会发生重大的改进.或许是Java 8太大了吧?今天我会给你彻底地解释JDK 8中的新的抽象 – CompletableFuture.众所周知,Java 8不到一年就会发布,因此这篇文章是基于JDK 8 build 88 with lambda support的.CompletableFuture extends Future提供了方法,一元操作符和促进异步性以及事件驱动编程模型,它并不止步

  • Java8中CompletableFuture的用法全解

    目录 前言 一.创建异步任务 1.Future.submit 2.supplyAsync / runAsync 二.异步回调 1.thenApply / thenApplyAsync 2.thenAccept / thenRun 3. exceptionally 4.whenComplete 5.handle 三.组合处理 1.thenCombine / thenAcceptBoth / runAfterBoth 2.applyToEither / acceptEither / runAfter

  • Java多线程工具CompletableFuture的使用教程

    目录 前言 Future的问题 CompletableFuture应运而生 使用方式 基本使用-提交异步任务 处理上个异步任务结果 对两个结果进行选用-acceptEither 对两个结果进行合并-thenCombine, thenAcceptBoth 异常处理 案例 大量用户发送短信|消息 并发获取商品不同信息 问题 thenRun和thenRunAsync有什么区别 handle和exceptional有什么区别 最后 前言 Future的问题 写多线程程序的时候,可以使用Future从一个

  • windows环境下java开发工具maven的安装教程图解

    Maven是什么? Maven是一个项目管理和综合工具.Maven提供了开发人员构建一个完整的生命周期框架.开发团队可以自动完成项目的基础工具建设,Maven使用标准的目录结构和默认构建生命周期. 在多个开发团队环境时,Maven可以设置按标准在非常短的时间里完成配置工作.由于大部分项目的设置都很简单,并且可重复使用,Maven让开发人员的工作更轻松,同时创建报表,检查,构建和测试自动化设置 maven是java项目依赖管理和构建工具,有了maven可以让java项目开发和构建更加便捷和高效,是

  • Java多线程工具篇BlockingQueue的详解

    前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出: 常用的队列主要有以下两种:(当

  • Java多线程开发工具之CompletableFuture的应用详解

    做Java编程,难免会遇到多线程的开发,但是JDK8这个CompletableFuture类很多开发者目前还没听说过,但是这个类实在是太好用了,了解它的一些用法后相信你会对它爱不释手(呸渣男,咋对谁都爱不释手呢),好了我先简单举个列子,告诉你用它有多好.Single Dog拿一个Appointment来举个列子,如下: /** * 女神化完妆之后,还需要一小会选衣服,不过分吧. * 也就是说我们现在有2个异步任务,第一个是化妆,第二个是选衣服. * 选衣服要在化妆完成之后进行,这两个任务是串行

  • 教你如何使用Java多线程编程LockSupport工具类

    LockSupport类 用于创建锁和其他同步类的基本线程阻塞原语,此类与使用它的每个线程关联一个许可.如果获得许可,将立即返回对park的调用,并在此过程中消耗掉它:否则may会被阻止.调用unpark可使许可证可用(如果尚不可用).(不过与信号量不同,许可证不会累积.最多只能有一个.) 方法park和unpark提供了有效的阻塞和解阻塞线程的方法,这些线程不会遇到导致已弃用的方法Thread.suspend和Thread.resume无法用于以下问题:由于许可,在调用park的一个线程与试图

  • Java多线程之同步工具类CyclicBarrier

    目录 1 CyclicBarrier方法说明 2 CyclicBarrier实例 3 CyclicBarrier源码解析 CyclicBarrier构造函数 await方法 nextGeneration的源码 breakBarrier源码 isBroken方法 reset方法 getNumberWaiting方法 前言: CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到达到某个公共屏障点.与CountDownLatch不同的是该barrier在释放线程等待后可以重用,所以

  • Java多线程之同步工具类Exchanger

    目录 1 Exchanger 介绍 2 Exchanger 实例 exchange等待超时 3 实现原理 1 Exchanger 介绍 前面分别介绍了CyclicBarrier.CountDownLatch.Semaphore,现在介绍并发工具类中的最后一个Exchange. Exchanger 是一个用于线程间协作的工具类,Exchanger用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据.这两个线程通过exchange 方法交换数据,如果第一个线程先执行e

  • Java多线程之同步工具类CountDownLatch

    前言: CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行.例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行. 1 CountDownLatch主要方法 void await():如果当前count大于0,当前线程将会wait,直到count等于0或者中断. PS:当count等于0的时候,再去调用await() , 线程将不会阻塞,而是立即运行.后面可以通过源码分析得到. boolean await(long t

  • Java多线程教程之如何利用Future实现携带结果的任务

    目录 Future 介绍 Runnable Callable Future 总结 Future 介绍 Future表示异步计算的结果,它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果.Future的cancel方法可以取消任务的执行,它有一布尔参数,参数为 true 表示立即中断任务的执行,参数为 false 表示允许正在运行的任务运行完成.Future的 get 方法等待计算完成,获取计算结果. Runnable Runnable 是我们多线程开发过程中常用的接口. Exec

  • Java多线程同步工具类CountDownLatch详解

    目录 简介 核心方法 CountDownLatch如何使用 CountDownLatch运行流程 运用场景 总结 简介 CountDownLatch是一个多线程同步工具类,在多线程环境中它允许多个线程处于等待状态,直到前面的线程执行结束.从类名上看CountDown既是数量递减的意思,我们可以把它理解为计数器. 核心方法 countDown():计数器递减方法. await():使调用此方法的线程进入等待状态,直到计数器计数为0时主线程才会被唤醒. await(long, TimeUnit):在

随机推荐