线程池中execute与submit的区别说明

目录
  • 线程池execute与submit区别
    • execute
    • submit
    • 例1
    • 例2
  • 线程池submit和execute方法原理
    • 线程池的作用
    • 线程池的原理

线程池execute与submit区别

在使用线程池的时候,看到execute()与submit()方法。都可以使用线程池执行一个任务,但是两者有什么区别呢?

execute

void execute(Runnable command);

submit

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

相同点:

1 都可以执行任务

2 参数都支持runnable

不同点:

1 submit支持接收返回值 详见例1。

2 execute 任务里面的异常必须捕获,不能向上抛出;submit支持的Callable支持向上抛出异常,需要由返回值.get()来进行接收。详见例2。

例1

public class ExecutorTest { 
    public static ExecutorService executorService = Executors.newFixedThreadPool(3); 
    public static void main(String[] args) {
        Future<?> result1 = executorService.submit(new Callable() {
            @Override
            public Object call() throws Exception {
                TimeUnit.SECONDS.sleep(10);
                System.out.println("Thread1 sleep 10 seconds");
                return true;
            }
        });
        Future<?> result2 = executorService.submit(new Callable() {
            @Override
            public Object call() throws Exception {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("Thread2 sleep 5 seconds");
                return false;
            }
        });
        try {
            System.out.println("Thread1 return:"+result1.get());
            System.out.println("Thread2 return:"+result2.get());
            System.out.println("finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } 
    }
}

该例子的返回结果:

Thread2 sleep 5 seconds
Thread1 sleep 10 seconds
Thread1 return:true
Thread2 return:false
finished

解释:可以看到接收到了两个线程的返回结果。利用thread.sleep来模拟耗时操作,直到两个线程执行完毕之后,才会输出finished。利用Callable的返回阻塞,来等待这n个线程的执行完毕,然后将这n个线程的结果响应回去;其执行时间,基本上取决于最耗时的那个线程。

适用场景:在某些情况下,需要获得一组线程的结果,给调用端。

例2

而execute不支持向上抛出异常,必须将异常捕获。

线程池submit和execute方法原理

线程池的作用

1. 避免大量的线程强占资源

2. 避免大量的线程创建和销毁带来的开销

线程池的原理

创建线程池的时候,可以使用executors的静态方法,也可以使用new ThreadPoolExecutor的方式手动创建线程池,通过在线程池中指定参数达到创建不同类型的线程池的效果

其中,executors底层其实也是调用的new ThreadPoolExecutor()的方式创建的,是对不同线程池的封装,

线程的执行有两种方式,一种是submit(runnable v)的形式,一种是execute(runnable b) 的形式,不同的是submit可以返回一个future的实现类,相同的一点是submit底层其实也是调用的execute

调用execut方法,首先判断传入的参数是否为空,如果为空,抛出异常,如果不为空,使用获取ctl值,计算出当前线程状态码,通过状态码计算出当前线程池工作线程是否小于核心线程数量

如果小于,判断添加工作线程操作是否正常,如果正常,直接返回,如果不正常,继续执行获取ctl值,在添加工作线程的过程中,首先通过循环的方式保证ctl在加1的情况下状态同步,如果不同步,一直循环到同步为止,添加完成后,创建线程工作对象,把工作线程添加到set集合中,并执行.start,如果执行不成功,从set中删除添加的worker对象,并且ctl回滚到之前没有自增的值.

如果上述中添加工作线程失败,或者当前线程池中工作线程数量操作和信息数量,执行下列逻辑

判断当前线程池状态是否是running状态:

如果不是running状态,或者是running状态,并且添加到线程队列失败,重新添加个工作线程,此时入参中第二个参数用于添加工作线程的逻辑中当前工作线程数量与最大线程数量做对比,如果添加失败,执行reject处理类处理

如果是running状态,并且添加队列成功,重新获取ctl值,判断当前线程池状态如果是不是running状态,并且从对象中删除成功,则当前线程交给拒绝线程处理器处理,如果不满足上面条件,判断当前线程池的工作线程数如果为0,重新添加一个不带任务的线程.

//AbstractExecutorService.java文件
    // executorService 中的 submit 方法
    public Future<?> submit(Runnable task) {
        // 首先判断传入的runnable 对象是否为空
        if (task == null) throw new NullPointerException();
        // 创建一个 futuretask 对象
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    // 根据runnable 创建一个futuretask对象
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    // ThreadPoolExecutor.java文件
    // 执行创建线程池的方法
    public void execute(Runnable command) {
        // 首先判断传入的线程是否为空
        if (command == null)
            // 为空,抛出异常
            throw new NullPointerException();
        // 获取线程池的状态码, 这个状态码是自增的,原子类型的自增, 在执行addworker后ctl会加1
        int c = ctl.get();
        // 通过状态码,获取线程池中的线程的数量,如果小于核心数量
        if (workerCountOf(c) < corePoolSize) {
            // 添加线程到线程池,并且为true时使用核心线程数作为边界,如果false ,使用最大数量线程数作为边界
            if (addWorker(command, true))
                // 添加完成后,返回
                return;
            // 如果添加失败,重新获取状态值
            c = ctl.get();
        }
        // 执行下面逻辑有两种情况
        //      1. 工作线程数大于核心线程
        //      2. 添加线程时出错
        // 如果线程池中线程的数量大于核心的数量, 判断如果是运行状态, 并且也把线程加进了阻塞队列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
            // 重新获取 线程池 状态值
            int recheck = ctl.get();
            // 判断当前线程池如果不是运行状态,并且成功从队列中移除(从workQueue中移除线程, 并尝试终止线程池)
            if (! isRunning(recheck) && remove(command))
                // 执行拒绝执行线程的处理
                reject(command);
                // 如果工作线程数为0
            else if (workerCountOf(recheck) == 0)
                // 添加一个null的工作包装对象
                addWorker(null, false);
        } else if (!addWorker(command, false))
            // 如果添加到线程池中出错,执行拒接的线程
            reject(command);
    }
    // 创建一个原子类对象用于计算线程的中状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // integer.size 为 32
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // 即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 即高3位为100,低29位为0,terminated()方法调用完成后变成此状态  
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 用户计算线程的状态 32位中 高3位为1 低29位为0 
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 用于计算线程池中线程的数量 32位中 高3位为0  低29位为1
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // rs 为 runState, wc 为 workerCount 通过工作状态和线程数量来计算出 ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 添加工作线程的方法
    private boolean addWorker(Runnable firstTask, boolean core) {
        // 设置循环跳出点,如果执行到某个位置,使用break,直接跳出的是这个标签范围内的所有循环
        retry:
        for (;;) {
            // 获取线程状态
            int c = ctl.get();
            int rs = runStateOf(c);
            // 判断线程池状态是否在shutdown上以及 状态不是关闭并且添加的线程不为空,并且线程队列中的线程不是空的
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                // 如果满足上面条件,说明线程池已经不适合添加新的线程了, 直接返回false
                return false;
            // 如果不满足上面条件,说明线程池可以添加线程, 下面这个循环主要是对ctl进行操作,保证在增1后线程状态保持同步
            for (;;) {
                // 获取工作线程数量
                int wc = workerCountOf(c);
                // 判断当前线程池中工作线程数量是否大于线程容量,大于核心线程数或最大线程数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 满足条件,说明当前线程不是适合添加新的线程的
                    return false;
                // 如果工作数量少于最大量或者核心线程数或最大线程数, 工作线程数加1,即操作ctl,通过cas的方式
                if (compareAndIncrementWorkerCount(c))
                    // 如果添加成功,跳出内循环,
                    break retry;
                // 如果添加失败,重新获取ctl
                c = ctl.get();  // Re-read ctl
                // 判断此时线程池状态是否已经改变
                if (runStateOf(c) != rs)
                    //如果状态不一致,跳过,重新循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        // 创建一个线程包装对象,用于包装线程
        Worker w = null;
        try {
            w = new Worker(firstTask);
            // 创建一个worker 工作线程
            final Thread t = w.thread;
            // 判断创建的线程是否为空
            if (t != null) {
                 // 如果不为空,获取锁对象
                final ReentrantLock mainLock = this.mainLock;
                // 开始加锁
                mainLock.lock();
                try {
                    // 获取线程池状态
                    int rs = runStateOf(ctl.get());
                    // 如果线程池状态是running或者线程池状态关闭并且传入的线程是空的
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 判断创建的工作线程是否是活动状态(已经开始还没有死掉)
                        if (t.isAlive()) // precheck that t is startable
                            // 如果是活动状态,抛出 非法线程状态异常 
                            throw new IllegalThreadStateException();
                        // 如果不是活动状态, 添加到set集合中,这个set集合只有持有mainlock才可以访问
                        workers.add(w);
                        // 获取集合长度
                        int s = workers.size();
                        // 如果存放刚才创建的workers工作线程的集合中的线程数超过最大的池的大小
                        if (s > largestPoolSize)
                            // 把set集合中的数量代替原线程池最大值
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 根据前面的判断是否需要开启线程,如果线程已经是活动的,不需要开启,如果不是活动线程,开启线程
                if (workerAdded) {
                    t.start();
                    // 开启成功,设置workerStarted 为 true
                    workerStarted = true;
                }
            }
        } finally {
            // 如果工作线程开启失败,调用添加到失败的线程中
            if (! workerStarted)
                // 从set中移除失败的线程,并且ctl减1, 并且尝试终止线程池
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    // 线程开启失败后的方法
    private void addWorkerFailed(Worker w) {
        // 获取锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                // 如果线程不为空,从set集合中移除没有开启成功的线程
                workers.remove(w);
            // 减去之前ctl增加的1
            decrementWorkerCount();
            // 尝试中断线程
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    // 通过cas方式ctl加1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // 移除线程
    public boolean remove(Runnable task) {
        // 从等待队列中一尺线程
        boolean removed = workQueue.remove(task);
        // 尝试终止线程池
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }
    // 使用拒绝处理对象执行拒接指定线程
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Java线程池execute()方法源码全面解析

    先看作者给出的注释来理解线程池到底有什么作用 * Thread pools address two different problems: they usually * provide improved performance when executing large numbers of * asynchronous tasks, due to reduced per-task invocation overhead, * and they provide a means of boundin

  • 通过代码示例了解submit与execute的区别

    (1)可以接受的任务类型 submit: execute: 可以看出: execute只能接受Runnable类型的任务 submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null (2)返回值 由Callable和Runnable的区别可知: execute没有返回值 submit有返回值,所以需要返回值的时候必须使用submit (3)异常 1.execute中抛出异常 execute

  • 简单解析execute和submit有什么区别

    1.execute 方法位于 java.util.concurrent.Executor 中 void execute(Runnable command); 2.execute 的具体实现 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize t

  • 线程池中execute与submit的区别说明

    目录 线程池execute与submit区别 execute submit 例1 例2 线程池submit和execute方法原理 线程池的作用 线程池的原理 线程池execute与submit区别 在使用线程池的时候,看到execute()与submit()方法.都可以使用线程池执行一个任务,但是两者有什么区别呢? execute void execute(Runnable command); submit <T> Future<T> submit(Callable<T&g

  • 解决线程池中ThreadGroup的坑

    目录 线程池中ThreadGroup的坑 ThreadGroup是否可行 Executors内部类DefaultThreadFactory ThreadGroup的使用及手写线程池 监听线程异常关闭 如何拿到Thread线程中异常 ThreadGroup 线程池使用 线程池中ThreadGroup的坑 在Java中每一个线程都归属于某个线程组管理的一员,例如在主函数main()主工作流程中产生一个线程,则产生的线程属于main这个线程组管理的一员.简单地说,线程组(ThreadGroup)就是由

  • Java如何自定义线程池中队列

    目录 背景 问题分析 问题解决 总结 两个队列的UML关系图 SynchronousQueue的定义 ArrayBlockingQueue的定义 分析 jdk源码中关于线程池队列的说明 背景 业务交互的过程中涉及到了很多关于SFTP下载的问题,因此在代码中定义了一些线程池,使用中发现了一些问题, 代码类似如下所示: public class ExecutorTest { private static ExecutorService es = new ThreadPoolExecutor(2, 1

  • 线程池中使用spring aop事务增强

    这篇文章主要介绍了线程池中使用spring aop事务增强,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 问题描述: 在项目里使用了线程池运行同一个类的实例方法,代码大致如下,运行时发现job方法的事务不生效 @Transactional public void doJob() { EXECOTOR.execute(() ->job()); } @Transactional public void job(){ //db operation }

  • 解决python ThreadPoolExecutor 线程池中的异常捕获问题

    问题 最近写了涉及线程池及线程的 python 脚本,运行过程中发现一个有趣的现象,线程池中的工作线程出现问题,引发了异常,但是主线程没有捕获异常,还在发现 BUG 之前一度以为线程池代码正常返回. 先说重点 这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeou

  • Java线程池中的各个参数如何合理设置

    一.前言 在开发过程中,好多场景要用到线程池.每次都是自己根据业务场景来设置线程池中的各个参数. 这两天又有需求碰到了,索性总结一下方便以后再遇到可以直接看着用. 虽说根据业务场景来设置各个参数的值,但有些万变不离其宗,掌握它的原理对如何用好线程池起了至关重要的作用. 那我们接下来就来进行线程池的分析. 二.ThreadPoolExecutor的重要参数 我们先来看下ThreadPoolExecutor的带的那些重要参数的构造器. public ThreadPoolExecutor(int co

  • java线程池中线程数量到底是几

    目录 线程池配置 线程池里的业务线程数量小于最小数量(5) 第一个请求 第二个请求 第三个请求 第五个请求 小于阻塞队列容量(10) 第六个请求 第七个请求 第15个请求 小于最大数量(20) 第16个请求 第35个请求 拒绝策略 第36个请求 复用线程 线程池配置 线程池配置,假设是: 1.最小数量是5 2.阻塞队列容量是10 3.最大数量是20 线程池里的业务线程数量小于最小数量(5) 第一个请求 第一个请求进来的时候,这个时候,线程池没有线程,就创建新的工作线程(即Worker线程). 然

  • C#线程处理系列之线程池中的I/O线程

    一.I/O线程实现对文件的异步  1.1  I/O线程介绍: 对于线程所执行的任务来说,可以把线程分为两种类型:工作者线程和I/O线程. 工作者线程用来完成一些计算的任务,在任务执行的过程中,需要CPU不间断地处理,所以,在工作者线程的执行过程中,CPU和线程的资源是充分利用的. I/O线程主要用来完成输入和输出的工作的,在这种情况下, 计算机需要I/O设备完成输入和输出的任务,在处理过程中,CPU是不需要参与处理过程的,此时正在运行的线程将处于等待状态,只有等任务完成后才会有事可做, 这样就造

  • Java线程池中多余的线程是如何回收的

    最近阅读了JDK线程池ThreadPoolExecutor的源码,对线程池执行任务的流程有了大体了解,实际上这个流程也十分通俗易懂,就不再赘述了,别人写的比我好多了. 不过,我倒是对线程池是如何回收工作线程比较感兴趣,所以简单分析了一下,加深对线程池的理解吧. 那么,就以JDK1.8为例分析吧. 1.runWorker(Worker w) 工作线程启动后,就进入runWorker(Worker w)方法. 里面是一个while循环,循环判断任务是否为空,若不为空,执行任务:若取不到任务,或发生异

  • java线程池中Worker线程执行流程原理解析

    目录 引言 Worker类分析 runWorker(Worker)方法 getTask()方法 beforeExecute(Thread, Runnable)方法 afterExecute(Runnable, Throwable)方法 processWorkerExit(Worker, boolean)方法 tryTerminate()方法 terminated()方法 引言 在<[高并发]别闹了,这样理解线程池执行任务的核心流程才正确!!>一文中我们深度分析了线程池执行任务的核心流程,在Th

随机推荐