Java多线程 CompletionService

目录
  • 1 CompletionService介绍
  • 2 CompletionService源码分析
  • 3 CompletionService实现任务
  • 4 CompletionService总结

1 CompletionService介绍

CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。
如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以将每个任务的Future保存进一个集合,然后循环这个集合调用Futureget()取出数据。幸运的是CompletionService帮你做了这件事情。
CompletionService整合了ExecutorBlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future
CompletionService的take返回的future是哪个先完成就先返回哪一个,而不是根据提交顺序。

2 CompletionService源码分析

首先看一下 构造方法:

   public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

构造法方法主要初始化了一个阻塞队列,用来存储已完成的task任务。

然后看一下 completionService.submit 方法:

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

可以看到,callable任务被包装成QueueingFuture,而 QueueingFutureFutureTask的子类,所以最终执行了FutureTask中的run()方法。

来看一下该方法:

 public void run() {
 //判断执行状态,保证callable任务只被运行一次
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
            //这里回调我们创建的callable对象中的call方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
            //处理执行结果
                set(result);
        }
    } finally {
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

可以看到在该 FutureTask 中执行run方法,最终回调自定义的callable中的call方法,执行结束之后,

通过 set(result) 处理执行结果:

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

继续跟进finishCompletion()方法,在该方法中找到 done()方法:

protected void done() { completionQueue.add(task); }

可以看到该方法只做了一件事情,就是将执行结束的task添加到了队列中,只要队列中有元素,我们调用take()方法时就可以获得执行的结果。
到这里就已经清晰了,异步非阻塞获取执行结果的实现原理其实就是通过队列来实现的,FutureTask将执行结果放到队列中,先进先出,线程执行结束的顺序就是获取结果的顺序。

CompletionService实际上可以看做是ExecutorBlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。CompletionService的一个实现是ExecutorCompletionServiceExecutorCompletionService把具体的计算任务交给Executor完成。

在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。

QueueingFuture的源码如下:

    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

3 CompletionService实现任务

public class CompletionServiceTest {
    public static void main(String[] args) {

        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
        for (int i = 1; i <=10; i++) {
            final int seq = i;
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {

                    Thread.sleep(new Random().nextInt(5000));

                    return seq;
                }
            });
        }
        threadPool.shutdown();
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println(
                        completionService.take().get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

    }
}

7
3
9
8
1
2
4
6
5
10

4 CompletionService总结

相比ExecutorServiceCompletionService可以更精确和简便地完成异步任务的执行
CompletionService的一个实现是ExecutorCompletionService,它是ExecutorBlockingQueue功能的融合体,Executor完成计算任务,BlockingQueue负责保存异步任务的执行结果
在执行大量相互独立和同构的任务时,可以使用CompletionService
CompletionService可以为任务的执行设置时限,主要是通过BlockingQueuepoll(long time,TimeUnit unit)为任务执行结果的取得限制时间,如果没有完成就取消任务

到此这篇关于Java多线程 CompletionService的文章就介绍到这了,更多相关Java多线程CompletionService内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java多线程基础

    目录 一.线程 二.创建多线程的方式 1.继承Thread类实现多线程 2.实现Runnable接口方式实现多线程 3.Callable接口创建线程 三.线程的生命周期与状态 四.线程的执行顺序 1.定时器 2.线程的互斥与同步通信 3.线程同步通信技术 一.线程 什么是线程: 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位. 什么是多线程: 多线程指在单个程序中可以同时运行多个不同的线程执行不同的任务. 二.创建多线程的方式 多线程的创建方式有三种:T

  • Java多线程揭秘之synchronized工作原理

    目录 一. 特性 二. 加锁过程(锁升级/锁膨胀) 1. 无锁状态 2. 偏向锁 3. 轻量级锁 4. 重量级锁 5. 总结 三. 锁优化 1. 锁消除 2. 锁粗化 在学习本篇文章时,如果有不太懂的地方,大家也可以先看看博主上一篇文章,锁的这部分内容是面试中很常见的问题,多学学对自己是非常有帮助的.同时,朋友们如果有什么问题都可以随时和我探讨,大家一起进步! 一. 特性 这部分内容在上篇文章中的 synchronized充当了哪些锁部分已经介绍过了哦,没有看的小伙伴可以去看看synchroni

  • Java多线程 原子性操作类的使用

    目录 1. 基本类型的使用 2. 数组类型的使用 3. 引用类型的使用 4.字段类型的使用 前言: 在java5以后,我们接触到了线程原子性操作,也就是在修改时我们只需要保证它的那个瞬间是安全的即可,经过相应的包装后可以再处理对象的并发修改,本文总结一下Atomic系列的类的使用方法,其中包含: 1. 基本类型的使用 public class AtomicTest { /** * 常见的方法列表 * * @see AtomicInteger#get() 直接返回值 * @see AtomicIn

  • Java多线程 Callable、Future 和FutureTask

    目录 1 Callable介绍 2 Future介绍 2.1 在Future接口中声明方法 2.2 Future提供了三种功能 3 FutureTask 4 Future和FutureTask的使用 4.1 使用Callable+Future获取执行结果 4.2 使用Callable+Future获取执行结果 前言: 创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须

  • Java 多线程之间共享数据

    目录 1.线程范围的共享变量 2.使用Map实现线程范围内数据的共享 3.ThreadLocal实现线程范围内数据的共享 4.优化 5.实例 1.线程范围的共享变量 多个业务模块针对同一个static变量的操作 要保证在不同线程中 各模块操作的是自身对应的变量对象 public class ThreadScopeSharaData { private static int data = 0 ; public static void main(String[] args) { for(int i

  • Java多线程之死锁详解

    目录 1.死锁 2.死锁经典问题--哲学家就餐问题 总结 1.死锁 出现场景:当线程A拥有了A对象的锁,想要去获取B对象的锁:线程B拥有了B对象的锁,想要拥有A对象的锁,两个线程在获取锁的时候,都不会释放已经持有的锁,于是,就造成了死锁. 示例代码: @Slf4j public class ThreadTest { private static Object objectA = new Object(); private static Object objectB = new Object();

  • Java多线程 CompletionService

    目录 1 CompletionService介绍 2 CompletionService源码分析 3 CompletionService实现任务 4 CompletionService总结 1 CompletionService介绍 CompletionService用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象. 如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果.为此你可以将每个任务的Future保存进一个集

  • 浅谈Java多线程处理中Future的妙用(附源码)

    java 中Future是一个未来对象,里面保存这线程处理结果,它像一个提货凭证,拿着它你可以随时去提取结果.在两种情况下,离开Future几乎很难办.一种情况是拆分订单,比如你的应用收到一个批量订单,此时如果要求最快的处理订单,那么需要并发处理,并发的结果如果收集,这个问题如果自己去编程将非常繁琐,此时可以使用CompletionService解决这个问题.CompletionService将Future收集到一个队列里,可以按结果处理完成的先后顺序进队.另外一种情况是,如果你需要并发去查询一

  • Java 多线程并发编程_动力节点Java学院整理

    一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav

  • 一文彻底搞懂java多线程和线程池

    目录 什么是线程 一. Java实现线程的三种方式 1.1.继承Thread类 1.2.实现Runnable接口,并覆写run方法 二. Callable接口 2.1 Callable接口 2.2 Future接口 2.3 Future实现类是FutureTask. 三. Java线程池 3.1.背景 3.2.作用 3.3.应用范围 四. Java 线程池框架Executor 4.1.类图: 4.2 核心类ThreadPoolExecutor: 4.3 ThreadPoolExecutor逻辑结

  • Java多线程提交按照时间顺序获取线程结果详解流程

    Maven依赖 其实要不要无所谓.主要是为了方便. <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.15</version> </dependency> 代码 不废话,上代码. package com.hy.csdn.tools; import cn.hutool.core.th

  • Java多线程之线程通信生产者消费者模式及等待唤醒机制代码详解

    前言 前面的例子都是多个线程在做相同的操作,比如4个线程都对共享数据做tickets–操作.大多情况下,程序中需要不同的线程做不同的事,比如一个线程对共享变量做tickets++操作,另一个线程对共享变量做tickets–操作,这就是大名鼎鼎的生产者和消费者模式. 正文 一,生产者-消费者模式也是多线程 生产者和消费者模式也是多线程的范例.所以其编程需要遵循多线程的规矩. 首先,既然是多线程,就必然要使用同步.上回说到,synchronized关键字在修饰函数的时候,使用的是"this"

  • java 多线程的几种实现方法总结

    java 多线程的几种实现方法总结 1.多线程有几种实现方法?同步有几种实现方法? 多线程有两种实现方法,分别是继承Thread类与实现Runnable接口 同步的实现方面有两种,分别是synchronized,wait与notify wait():使一个线程处于等待状态,并且释放所持有的对象的lock. sleep():使一个正在运行的线程处于睡眠状态,是一个静态方法,调用此方法要捕捉InterruptedException异常. notify():唤醒一个处于等待状态的线程,注意的是在调用此

  • java 多线程饥饿现象的问题解决方法

    java 多线程饥饿现象的问题解决方法 当有线程正在读的时候,不允许写 线程写,但是允许其他的读线程进行读.有写线程正在写的时候,其他的线程不应该读写.为了防止写线程出现饥饿现象,当线程正在读,如果写线程请求写,那么应该禁止再来的读线程进行读. 实现代码如下: File.Java package readerWriter; public class File { private String name; public File(String name) { this.name=name; } }

  • java多线程应用实现方法

    以前没有写笔记的习惯,现在慢慢的发现及时总结是多么的重要了,呵呵.虽然才大二,但是也快要毕业了,要加油了. 这一篇文章主要关于java多线程,主要还是以例子来驱动的.因为讲解多线程的书籍和文章已经很多了,所以我也不好意思多说,呵呵.大家可以去参考一些那些书籍.我这个文章主要关于实际的一些问题.同时也算是我以后复习的资料吧,.呵呵大家多多指教. 同时希望多结交一些技术上的朋友.谢谢. ---------------------------------------------------------

  • JAVA多线程实现生产者消费者的实例详解

    JAVA多线程实现生产者消费者的实例详解 下面的代码实现了生产者消费者的问题 Product.Java package consumerProducer; public class Product { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } public Product(String id) { this.id=id; } publ

随机推荐