Java并发编程必备之Future机制

前言

Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但它可以返回一个对象或者抛出一个异常。

Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的,我们必须等待它返回的结果。而线程是属于异步计算模型,所以不可能直接从别的线程中得到函数返回值。

java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

Future的作用

当做一定运算的时候,运算过程可能比较耗时,有时会去查数据库,或是繁重的计算,比如压缩、加密等,在这种情况下,如果我们一直在原地等待方法返回,显然是不明智的,整体程序的运行效率会大大降低。

我们可以把运算的过程放到子线程去执行,再通过 Future 去控制子线程执行的计算过程,最后获取到计算结果。

这样一来就可以把整个程序的运行效率提高,是一种异步的思想。

同时在JDK 1.8的doc中,对Future的描述如下:

A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.

大概意思就是Future是一个用于异步计算的接口。

举个例子:

比如去吃早点时,点了包子和凉菜,包子需要等3分钟,凉菜只需1分钟,如果是串行的一个执行,在吃上早点的时候需要等待4分钟,但是如果你在准备包子的时候,可以同时准备凉菜,这样只需要等待3分钟。

Future就是后面这种执行模式。

创建Future

线程池

class Task implements Callable<String> {
  public String call() throws Exception {
    return longTimeCalculation();
  }
}
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞

当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。

在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果

FutureTask

除了用线程池的 submit 方法会返回一个 future 对象之外,同样还可以用 FutureTask 来获取 Future 类和任务的结果。

我们来看一下 FutureTask 的代码实现:

public class FutureTask<V> implements RunnableFuture<V>{
 ...
}

可以看到,它实现了一个接口,这个接口叫作 RunnableFuture。

我们再来看一下 RunnableFuture 接口的代码实现:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

既然 RunnableFuture 继承了 Runnable 接口和 Future 接口,而 FutureTask 又实现了 RunnableFuture 接口,所以 FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

典型用法是,把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果。

下面我们就用代码来演示一下:

public class FutureTaskDemo {

    public static void main(String[] args) {
        Task task = new Task();
        FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
        new Thread(integerFutureTask).start();

        try {
            System.out.println("task运行结果:"+integerFutureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("子线程正在计算");
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
}

在这段代码中可以看出,首先创建了一个实现了 Callable 接口的 Task,然后把这个 Task 实例传入到 FutureTask 的构造函数中去,创建了一个 FutureTask 实例,并且把这个实例当作一个 Runnable 放到 new Thread() 中去执行,最后再用 FutureTask 的 get 得到结果,并打印出来。

Future常用方法

方法名 返回值 入参 备注 总结
cancel boolean (boolean mayInterruptIfRunning) 用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。 也就是说Future提供了三种功能:判断任务是否完成,能够中断任务,能够获取任务执行结果
isCancelled boolean 方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
isDone boolean 方法表示任务是否已经完成,若任务完成,则返回true;
get V 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
get V (long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null

get()方法

get方法最主要的作用就是获取任务执行的结果

我们来看一个代码示例:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
        Future<Integer> future = service.submit(new CallableTask());
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }

    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(3000);
            return new Random().nextInt();
        }
    }
}

在这段代码中,main 方法新建了一个 10 个线程的线程池,并且用 submit 方法把一个任务提交进去。

这个任务它所做的内容就是先休眠三秒钟,然后返回一个随机数。

接下来我们就直接把future.get结果打印出来,其结果是正常打印出一个随机数,比如 9527 等。

isDone()方法

该方法是用来判断当前这个任务是否执行完毕了。

需要注意的是,这个方法如果返回 true 则代表执行完成了;如果返回 false 则代表还没完成。

但这里如果返回 true,并不代表这个任务是成功执行的,比如说任务执行到一半抛出了异常。那么在这种情况下,对于这个 isDone 方法而言,它其实也是会返回 true 的,因为对它来说,虽然有异常发生了,但是这个任务在未来也不会再被执行,它确实已经执行完毕了。

所以 isDone 方法在返回 true 的时候,不代表这个任务是成功执行的,只代表它执行完毕了。

我们用一个代码示例来看一看,代码如下所示:

public class GetException {

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(20);
        Future<Integer> future = service.submit(new CallableTask());

        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(i);
                Thread.sleep(500);
            }
            System.out.println(future.isDone());
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            throw new IllegalArgumentException("Callable抛出异常");
        }
    }
}

在这段代码中,可以看到有一个线程池,并且往线程池中去提交任务,这个任务会直接抛出一个异常。

那么接下来我们就用一个 for 循环去休眠,同时让它慢慢打印出 0 ~ 4 这 5 个数字,这样做的目的是起到了一定的延迟作用。

在这个执行完毕之后,再去调用 isDone() 方法,并且把这个结果打印出来,然后再去调用 future.get()

cancel方法

如果不想执行某个任务了,则可以使用 cancel 方法,会有以下三种情况:

  • 第一种情况最简单,那就是当任务还没有开始执行时,一旦调用 cancel,这个任务就会被正常取消,未来也不会被执行,那么 cancel 方法返回 true。
  • 第二种情况也比较简单。如果任务已经完成,或者之前已经被取消过了,那么执行 cancel 方法则代表取消失败,返回 false。因为任务无论是已完成还是已经被取消过了,都不能再被取消了。
  • 第三种情况就是这个任务正在执行,这个时候会根据我们传入的参数mayInterruptIfRunning做判断,如果传入的参数是 true,执行任务的线程就会收到一个中断的信号,正在执行的任务可能会有一些处理中断的逻辑,进而停止,如果传入的是 false 则就代表不中断正在运行的任务

isCancelled()方法

判断是否被取消,它和 cancel 方法配合使用,比较简单。

应用场景

目前对于Future方式,我们经常使用的有这么几类:

Guava

ListenableFutrue,通过增加监听器的方式,计算完成时立即得到结果,而无需一直循环查询

CompletableFuture

Java8的CompletableFuture,使用thenApply,thenApplyAsync可以达到和Guava类似的链式调用效果。

不同的是,对于Java8,如果thenApplyAsync不传入线程池,则会使用ForkJoinPools线程池来执行对应的方法,如此可以避免对其他线程产生影响。

Netty

Netty解决的问题:

  • 原生Future的isDone()方法判断一个异步操作是否完成,但是定义比较模糊:正常终止、抛出异常、用户取消都会使isDone方法返回true。
  • 对于一个异步操作,我们有些时候更关注的是这个异步操作触发或者结束后能否再执行一系列的动作。

与JDK相比,增加了完成状态的细分,增加了监听者,异步线程结束之后能够触发一系列的动作。

注意事项

添加超时机制

假设一共有四个任务需要执行,我们都把它放到线程池中,然后它获取的时候是按照从 1 到 4 的顺序,也就是执行 get() 方法来获取的

代码如下所示:

public class FutureDemo {

    public static void main(String[] args) {
        //创建线程池
        ExecutorService service = Executors.newFixedThreadPool(10);
        //提交任务,并用 Future 接收返回结果
        ArrayList<Future> allFutures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            Future<String> future;
            if (i == 0 || i == 1) {
                future = service.submit(new SlowTask());
            } else {
                future = service.submit(new FastTask());
            }
            allFutures.add(future);
        }

        for (int i = 0; i < 4; i++) {
            Future<String> future = allFutures.get(i);
            try {
                String result = future.get();
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        service.shutdown();
    }

    static class SlowTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep(5000);
            return "速度慢的任务";
        }
    }

    static class FastTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            return "速度快的任务";
        }
    }
}

可以看出,在代码中我们新建了线程池,并且用一个 list 来保存 4 个 Future。

其中,前两个 Future 所对应的任务是慢任务,也就是代码下方的 SlowTask,而后两个 Future 对应的任务是快任务。

慢任务在执行的时候需要 5 秒钟的时间才能执行完毕,而快任务很快就可以执行完毕,几乎不花费时间。

在提交完这 4 个任务之后,我们用 for 循环对它们依次执行 get 方法,来获取它们的执行结果,然后再把这个结果打印出来。

实际上在执行的时候会先等待 5 秒,然后再很快打印出这 4 行语句。

所以问题是:

第三个的任务量是比较小的,它可以很快返回结果,紧接着第四个任务也会返回结果。

但是由于前两个任务速度很慢,所以我们在利用 get 方法执行时,会卡在第一个任务上。也就是说,虽然此时第三个和第四个任务很早就得到结果了,但我们在此时使用这种 for 循环的方式去获取结果,依然无法及时获取到第三个和第四个任务的结果。直到 5 秒后,第一个任务出结果了,我们才能获取到,紧接着也可以获取到第二个任务的结果,然后才轮到第三、第四个任务。

假设由于网络原因,第一个任务可能长达 1 分钟都没办法返回结果,那么这个时候,我们的主线程会一直卡着,影响了程序的运行效率。

此时我们就可以用 Future 的带超时参数的get(long timeout, TimeUnit unit)方法来解决这个问题。

这个方法的作用是,如果在限定的时间内没能返回结果的话,那么便会抛出一个 TimeoutException 异常,随后就可以把这个异常捕获住,或者是再往上抛出去,这样就不会一直卡着了。

源码分析

超时实现原理

具体实现类:FutureTask

get()方法可以分为两步:

  • 判断当前任务的执行状态,如果不是COMPLETING,就调用awaitDone()方法开始进行死循环轮旋,如果任务还没有执行完成会使用nanos = deadline - System.nanoTime()检查是否超时,如果方法已经超时,则会返回,在返回后如果任务的状态仍然<=COMPLETING,就会抛出TimeoutException()。
  • 如果调用时任务没有执行完成,会调用parkNanos(),调用线程会阻塞在这里。

接下来分两种情况:

  • 在阻塞时间完以后任务的执行状态仍然没有改变为完成,进入下一次循环,直接返回。
  • 如果在轮询中状态已经改变,任务完成,则会中断死循环,返回任务执行的返回值。

到此这篇关于Java并发编程必备之Future机制的文章就介绍到这了,更多相关Java Future机制内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java FutureTask类使用案例解析

    FutureTask一个可取消的异步计算,FutureTask 实现了Future的基本方法,提空 start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果.结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成,那么计算就不能再次启动或是取消. 一个FutureTask 可以用来包装一个 Callable 或是一个runnable对象.因为FurtureTask实现了Runnable方法,所以一个 FutureTask可以提交(submit

  • Java CompletableFuture的使用详解

    CompletableFuture​ 它代表某个同步或异步计算的一个阶段.你可以把它理解为是一个为了产生有价值最终结果的计算的流水线上的一个单元.这意味着多个指令可以链接起来从而一个阶段的完成可以触发下一个阶段的执行. 任务开启 supplyAsync 开启一个子线程去执行有返回结果 开启一个子线程用来执行执行事务,可以通过返回值的join来得到返回值. 例如: print("去煮饭了"); CompletableFuture<String> completableFutu

  • Java8新的异步编程方式CompletableFuture实现

    一. Future JDK 5引入了Future模式.Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算. Future模式是多线程设计常用的一种设计模式.Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务.期间我自己可以去做任何想做的事情.一段时间之后,我就便可以从Future那儿取出结果. Future的接口很简单,只有五个方法. public interface Future<

  • 解析Java异步之call future

    一.概述 我们大家都知道,在 Java 中创建线程主要有三种方式: 继承 Thread 类: 实现 Runnable 接口: 实现 Callable 接口. 而后两者的区别在于 Callable 接口中的 call() 方法可以异步地返回一个计算结果 Future,并且一般需要配合ExecutorService 来执行.这一套操作在代码实现上似乎也并不难,可是对于call()方法具体怎么(被ExecutorService)执行的,以及 Future 这个结果是怎么获取的,却又不是很清楚了. 那么

  • Java中Future、FutureTask原理以及与线程池的搭配使用

    Java中的Future和Future通常和线程池搭配使用,用来获取线程池返回执行后的返回值.我们假设通过Executors工厂方法构建一个线程池es ,es要执行某个任务有两种方式,一种是执行 es.execute(runnable) ,这种情况是没有返回值的: 另外一种情况是执行 es.submit(runnale)或者 es.submit(callable) ,这种情况会返回一个Future的对象,然后调用Future的get()来获取返回值. Future public interfac

  • JAVA Future类的使用详解

    前言 在高性能编程中,并发编程已经成为了极为重要的一部分.在单核CPU性能已经趋于极限时,我们只能通过多核来进一步提升系统的性能,因此就催生了并发编程. 由于并发编程比串行编程更困难,也更容易出错,因此,我们就更需要借鉴一些前人优秀的,成熟的设计模式,使得我们的设计更加健壮,更加完美. 而Future模式,正是其中使用最为广泛,也是极为重要的一种设计模式.今天就跟阿丙了解一手Future模式! 生活中的Future模式 为了更快的了解Future模式,我们先来看一个生活中的例子. 场景1: 午饭

  • 了解JAVA Future类

    1. Future的应用场景 在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承thread类还是实现runnable接口,都无法保证获取到之前的执行结果.通过实现Callback接口,并用Future可以来接收多线程的执行结果. Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作. 举个例子:比如去吃早点时,点了包子和凉菜,包子需要等3分钟,凉菜只需1分钟,如果是串行的一个执行,在吃上早点

  • java通过Callable和Future来接收线程池的执行结果

    在Java的线程执行中,不管是直接继承Thread的方式,还是实现Runnable接口的方式,都不会获取到线程执行的返回结果.这样如果线程在执行过程中出现了错误,那么主线程也不会感知到.即使打印了日志,也不能立即抛出异常.事后查看日志才能发现出现了bug.而且到那时发生问题的代码点距离真正的问题点可能会相差很远.如果在线程池执行的过程中出现了bug能及时地抛出异常,那么这将会是一个很好的实现.解决上述问题的办法是使用Callable接口,其可以获取到线程的返回结果,通过Future的get方法来

  • Java并发编程必备之Future机制

    前言 Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但它可以返回一个对象或者抛出一个异常. Callable接口使用泛型去定义它的返回类型.Executors类提供了一些有用的方法在线程池中执行Callable内的任务.由于Callable任务是并行的,我们必须等待它返回的结果.而线程是属于异步计算模型,所以不可能直接从别的线程中得到函数返回值. java.util.concurrent.Futur

  • Java并发编程Callable与Future的应用实例代码

    本文主要探究的是java并发编程callable与future的使用,分享了相关实例代码,具体介绍如下. 我们都知道实现多线程有2种方式,一种是继承Thread,一种是实现Runnable,但这2种方式都有一个缺陷,在任务完成后无法获取返回结果.要想获得返回结果,就得使用Callable,Callable任务可以有返回值,但是没法直接从Callable任务里获取返回值:想要获取Callabel任务的返回值,需要用到Future.所以Callable任务和Future模式,通常结合起来使用. 试想

  • Java多线程之并发编程的基石CAS机制详解

    目录 一.CAS机制简介 1.1.悲观锁和乐观锁更新数据方式 1.2.什么是CAS机制 1.3.CAS与sychronized比较 1.4.Java中都有哪些地方应用到了CAS机制呢? 1.5.CAS 实现自旋锁 1.6.CAS机制优缺点 1>ABA问题 2>可能会消耗较高的CPU 3>不能保证代码块的原子性 二.Java提供的CAS操作类--Unsafe类 2.1.Unsafe类简介 2.2.Unsafe类的使用 三.CAS使用场景 3.1.使用一个变量统计网站的访问量 3.2.现在我

  • Java并发编程之线程之间的共享和协作

    一.线程间的共享 1.1 ynchronized内置锁 用处 Java支持多个线程同时访问一个对象或者对象的成员变量 关键字synchronized可以修饰方法或者以同步块的形式来进行使用 它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中 它保证了线程对变量访问的可见性和排他性(原子性.可见性.有序性),又称为内置锁机制. 对象锁和类锁 对象锁是用于对象实例方法,或者一个对象实例上的 类锁是用于类的静态方法或者一个类的class对象上的 类的对象实例可以有很多个,但是每个类只有

  • java多线程编程必备volatile与synchronized深入理解

    目录 Volatile概述 Synchronized概述 Volatile与Synchronized的区别 使用场景 1 Volatile的使用场景 2 Synchronized的使用场景 注意事项 相关面试问题 Volatile概述 Volatile是Java中的一种轻量级同步机制,用于保证变量的可见性和禁止指令重排.当一个变量被声明为Volatile类型时,任何修改该变量的操作都会立即被所有线程看到.也就是说,Volatile修饰的变量在每次修改时都会强制将修改刷新到主内存中,具有很好的可见

  • java并发编程_线程池的使用方法(详解)

    一.任务和执行策略之间的隐性耦合 Executor可以将任务的提交和任务的执行策略解耦 只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题 1.线程饥饿死锁 类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁:表现为池不够 定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁 2.线程池大小 注意:线程池的大小还受其他的限制,如其他资源池:

  • Java并发编程总结——慎用CAS详解

    一.CAS和synchronized适用场景 1.对于资源竞争较少的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源:而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能. 2.对于资源竞争严重的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized.以java.util.concurrent.atomic包中AtomicInteger类为例,其getAn

  • Java 并发编程学习笔记之Synchronized简介

    一.Synchronized的基本使用 Synchronized是Java中解决并发问题的一种最常用的方法,也是最简单的一种方法.Synchronized的作用主要有三个:(1)确保线程互斥的访问同步代码(2)保证共享变量的修改能够及时可见(3)有效解决重排序问题.从语法上讲,Synchronized总共有三种用法: (1)修饰普通方法 (2)修饰静态方法 (3)修饰代码块 接下来我就通过几个例子程序来说明一下这三种使用方式(为了便于比较,三段代码除了Synchronized的使用方式不同以外,

  • Java 并发编程之线程挂起、恢复与终止

    挂起和恢复线程 Thread 的API中包含两个被淘汰的方法,它们用于临时挂起和重启某个线程,这些方法已经被淘汰,因为它们是不安全的,不稳定的.如果在不合适的时候挂起线程(比如,锁定共享资源时),此时便可能会发生死锁条件--其他线程在等待该线程释放锁,但该线程却被挂起了,便会发生死锁.另外,在长时间计算期间挂起线程也可能导致问题. 下面的代码演示了通过休眠来延缓运行,模拟长时间运行的情况,使线程更可能在不适当的时候被挂起: public class DeprecatedSuspendResume

  • Java 并发编程学习笔记之核心理论基础

    并发编程是Java程序员最重要的技能之一,也是最难掌握的一种技能.它要求编程者对计算机最底层的运作原理有深刻的理解,同时要求编程者逻辑清晰.思维缜密,这样才能写出高效.安全.可靠的多线程并发程序.本系列会从线程间协调的方式(wait.notify.notifyAll).Synchronized及Volatile的本质入手,详细解释JDK为我们提供的每种并发工具和底层实现机制.在此基础上,我们会进一步分析java.util.concurrent包的工具类,包括其使用方式.实现源码及其背后的原理.本

随机推荐