Java8中AbstractExecutorService与FutureTask源码详解

目录
  • 前言
  • 一、AbstractExecutorService
    • 1、定义
    • 2、submit
    • 3、invokeAll
    • 4、invokeAny
  • 二、FutureTask
    • 1、定义
    • 2、构造方法
    • 3、get
    • 4、run/ runAndReset
    • 5、 cancel
  • 三、ExecutorCompletionService
    • 1、定义
    • 2、submit
    • 3、take/ poll
  • 总结

前言

本篇博客重点讲解ThreadPoolExecutor的三个基础设施类AbstractExecutorService、FutureTask和ExecutorCompletionService的实现细节,AbstractExecutorService实现了ExecutorService的大部分接口,子类只需实现excute方法和shutdown相关方法即可;FutureTask是RunnableFuture接口的主要实现,该接口是Runnable和Future的包装类接口,会执行Runnable对应的run方法,调用方可以通过Future接口获取任务的执行状态和结果;ExecutorCompletionService是帮助获取多个RunnableFuture任务的执行结果的工具类,基于FutureTask执行完成时的回调方法done实现的。

一、AbstractExecutorService

1、定义

ThreadPoolExecutor的类继承关系如下:

其中ExecutorService的子类如下:

右上角带S的表示内部类,我们重点关注ThreadPoolExecutor,ScheduledThreadPoolExecutor和ForkJoinPool三个类的实现,后面两个类会在后面的博客中逐一探讨。

Executor包含的方法如下:

ExecutorService包含的方法如下:

上述接口方法中涉及的Callable接口的定义如下:

该接口也是表示一个执行任务,跟常见的Runnable接口的区别在于call方法有返回值而run方法没有返回值。

Future表示某个任务的执行结果,其定义的方法如下:

其子类比较多,如下:

后面会将涉及的子类逐一探讨的。 AbstractExecutorService基于Executor接口的excute方法实现了大部分的ExecutorService的接口,子类只需要重点实现excute方法和shutdown相关方法即可,下面来分析其具体的实现。

2、submit

//Runnable接口方法没有返回值,但是可以通过Future判断任务是否执行完成
public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

//因为Runnable的run方法没有返回值,所以如果run方法正常执行完成,其结果就是result
public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

//都是返回FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

3、invokeAll

//执行完成tasks中所有的任务,如果有一个抛出异常,则取消掉剩余的任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            //遍历tasks中的任务将其转换成RunnableFuture,然后提交到线程池执行
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            //遍历Future列表
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { //如果未执行完成
                    try {
                        //等待任务执行完成
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            //所有任务都执行完了
            done = true;
            return futures;
        } finally {
            if (!done)
                //出现异常,将所有的任务都取消掉
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

//逻辑同上,不过加了等待时间限制,所有的任务的累计时间不能超过指定值,如果超时直接返回Future列表
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        //转换成纳秒
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            //转换成Future
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            //计算终止时间
            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime(); //计算剩余时间
                if (nanos <= 0L) //如果超时了则直接返回
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { //任务未执行
                    if (nanos <= 0L)
                        return futures; //等待超时
                    try {
                        //等待任务执行完成
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done) //出现异常,取消掉剩余未执行的任务
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

4、invokeAny

//多个任务只要有一个执行成功就返回,并把剩余的已提交未执行的任务给取消掉
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

//多个任务只要有一个执行成功就返回,并把剩余的已提交未执行的任务给取消掉
//如果指定时间内没有执行成功的,则抛出TimeoutException 异常
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        //参数校验
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {

            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            //提交一个任务
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                //获取最新的已完成任务
                Future<T> f = ecs.poll();
                if (f == null) {
                    //没有执行完的
                    if (ntasks > 0) {
                        --ntasks;
                        //继续添加下一个任务
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0) //所有任务都执行失败了,没有执行成功的
                        break;
                    else if (timed) { //等待超时
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        //计算剩余等待时间
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        //所有任务都提交了,阻塞等待某个任务执行完成
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        //某个任务已执行完成,如果抛出异常则执行下一个任务
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            } //for循环终止

            //所有任务都执行失败了
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //返回前,将未执行完成的任务都取消掉
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

二、FutureTask

1、定义

FutureTask的类继承关系如下:

RunnableFuture接口没有新增方法,将Runnable的run方法由public改成包级访问了,如下:

该类包含的实例属性如下:

    /** 执行的任务*/
    private Callable<V> callable;

    /** 任务执行的结果或者执行过程中抛出的异常 */
    private Object outcome; // non-volatile, protected by state reads/writes

    /** 执行任务的线程 */
    private volatile Thread runner;

    /** 等待线程的链表*/
    private volatile WaitNode waiters;

    //状态
    private volatile int state;

其中WaitNode是一个简单的内部类,其定义如下:

该类包含的静态属性都是字段偏移量,通过static代码块初始化,如下:

FutureTask定义了多个表示状态的常量,如下:

    //初始状态
    private static final int NEW          = 0;

    //是一个很短暂的中间状态,表示任务已执行完成,保存完执行结果后就流转成NORMAL或者EXCEPTIONAL
    private static final int COMPLETING   = 1;

    //正常执行完成
    private static final int NORMAL       = 2;

    //异常终止
    private static final int EXCEPTIONAL  = 3;

    //任务被取消了
    private static final int CANCELLED    = 4;

    //是一个很短暂的中间状态,调用interrupt方法后,会将状态流转成INTERRUPTED
    private static final int INTERRUPTING = 5;

    //任务执行已中断
    private static final int INTERRUPTED  = 6;

可能的状态流转如下图:

2、构造方法

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       //初始状态是NEW
    }

public FutureTask(Runnable runnable, V result) {
        //将Runnable适配成Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

//Executors方法
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

其中RunnableAdapter是Executors的一个静态内部类,其实现如下:

3、get

get方法用于阻塞当前线程直到任务执行完成,如果阻塞的过程中被中断则抛出异常InterruptedException,可以限制阻塞的时间,如果等待超时还是未完成则抛出异常TimeoutException。

//阻塞当前线程等待任务执行完成
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) //如果未完成
            s = awaitDone(false, 0L);
        return report(s);
    }

//同上,可以限制等待的时间
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
             //阻塞当前线程,如果返回值还是未完成说明是等待超时了,则抛出异常
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

//timed为true表示等待指定的时间,否则是无期限等待
//该方法返回退出此方法时的状态
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        //计算等待的终止时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) { //如果当前线程被中断了,则从等待链表中移除,并抛出异常
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) { //如果任务已执行完
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) //正在状态流转的过程中,让出当前CPU时间片
                Thread.yield();
            //未开始执行
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                //修改waiters属性,插入到链表头
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            //已插入到链表中
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) { //等待超时,从链表中移除
                    removeWaiter(q);
                    return state;
                }
                //让当前线程休眠
                LockSupport.parkNanos(this, nanos);
            }
            else
                //让当前线程休眠
                LockSupport.park(this);
        }
    }

private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;//将thread置为null
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    //q.thread为null,需要被移除
                    else if (pred != null) {
                        pred.next = s; //将q从链表移除
                        if (pred.thread == null) //如果为null,则从头开始遍历
                            continue retry;
                    }
                    //q.thread为null,pred为null,之前没有有效节点,修改waiters,修改失败重试
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

//awaitDone正常返回后调用此方法,此时状态应该是COMPLETING之后了
private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL) //如果是正常结束
            return (V)x;
        if (s >= CANCELLED) //如果被取消了
            throw new CancellationException();
        throw new ExecutionException((Throwable)x); //如果出现异常了
    }

4、run / runAndReset

run方法是有线程池调用的,会执行Callable任务,保存执行的结果,如果出现异常则保存异常对象,并完成状态流转,最后将等待任务完成的阻塞中的线程唤醒。runAndReset和run类似,区别在于runAndReset正常执行完成后不会保存执行的结果,不会改变状态,状态还是NEW,如果是正常执行则返回true,该方法是子类使用的,其调用链如下:

这两方法的实现如下:

//由线程池中的某个线程调用此方法
public void run() {
        //如果不等于NEW,说明其他某个线程正在执行任务
        //如果等于NEW,则cas修改runner属性,修改失败说明其他某个线程也准备执行这个任务
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        //cas成功表示这个任务由当前线程抢占成功
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    //出现异常
                    result = null;
                    ran = false;
                    setException(ex); //保存异常对象
                }
                if (ran)
                    //执行成功保存结果
                    set(result);
            }
        } finally {
            //如果任务被cancel了,则上述setException和set方法因为状态不是NEW了会直接返回
            runner = null;
            int s = state;
            if (s >= INTERRUPTING) //如果被中断,自旋等待中断完成
                handlePossibleCancellationInterrupt(s);
        }
    }

//跟run方法相比区别就是正常执行完成不会保存结果,不会流转状态
protected boolean runAndReset() {
        //如果state不是NEW或者cas修改runner失败
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    //执行任务,但是不保存结果,状态就不会从NEW流转成NORMAL
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex); //保存异常实例
                }
            }
        } finally {
            runner = null;
            s = state;
            if (s >= INTERRUPTING) //任务被中断了,自旋等待中断完成
                handlePossibleCancellationInterrupt(s);
        }
        //返回任务是否正常完成
        return ran && s == NEW;
    }

//保存异常对象并修改状态
protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //只有原来的状态是NEW才进入下面的逻辑
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            //任务执行完成,唤醒阻塞的线程
            finishCompletion();
        }
    }

//保存执行结果并修改状态
protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //只有原来的状态是NEW才进入下面的逻辑
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
             //任务执行完成,唤醒阻塞的线程
            finishCompletion();
        }
    }

private void handlePossibleCancellationInterrupt(int s) {
        //正在中断的过程中
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); //自旋等待中断完成
    }

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                //cas将waiters置为null
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        //唤醒阻塞的新线程
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    //遍历下一个节点
                    WaitNode next = q.next;
                    if (next == null) //遍历结束,终止循环
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break; //终止外层循环
            }
        }
        //执行完成的回调方法,默认是空实现,子类可改写此方法
        done();

        callable = null;        // to reduce footprint
    }

5、 cancel

cancel方法的参数为true,则会将当前状态由NEW改成INTERRUPTING,如果此任务已经开始执行了,则将正在执行任务的线程标记为已中断,如果该线程响应中断则可能抛出异常,如果不响应中断则继续执行,最后再将状态改成INTERRUPTED;如果方法的参数为false,则将当前状态由NEW改成CANCELLED,如果此任务已经开始执行了则会继续执行。上述两种情形下,状态流转完成后都会唤醒还在阻塞中的等待线程,如果任务已经开始执行并且继续执行,因为状态已经不是NEW了,直接结果不会保存下来。

//如果mayInterruptIfRunning为true,则会将正在执行任务的线程标记为已中断,线程有可能继续执行,也有可能响应中断抛出异常
//如果为false,则标记为CANCELLED,如果任务已经开始执行了则会继续执行
//如果未执行,则标记为CANCELLED或者INTERRUPTING都会让这任务不会被执行了
public boolean cancel(boolean mayInterruptIfRunning) {
        //如果state不是NEW 或者cas修改失败,则返回false
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt(); //将正在执行任务的线程标记为已中断
                } finally {
                    //修改状态为已中断
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
             //唤醒等待的线程
            finishCompletion();
        }
        return true;
    }

三、ExecutorCompletionService

1、定义

ExecutorCompletionService是一个帮助获取多个Future执行结果的工具类,其类继承关系如下:

CompletionService包含的方法如下:

后面会讲解各方法的用途,该类包含的属性如下:

    //执行任务的线程池实现
    private final Executor executor;

    //调用其newTaskFor方法
    private final AbstractExecutorService aes;

    //已执行完成的Future阻塞队列
    private final BlockingQueue<Future<V>> completionQueue;

其构造方法实现如下:

public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        //如果executor继承自AbstractExecutorService,则aes为executor,否则为null
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        //没有指定队列,默认使用基于链表的无固定容量的LinkedBlockingQueue
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

2、submit

submit方法将Callable或者Runnable任务包装成一个RunnableFuture,然后提交到线程池中,返回RunnableFuture实例。

public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        //将其包装成RunnableFuture实现类
        RunnableFuture<V> f = newTaskFor(task);
        //提交任务到线程池
        executor.execute(new QueueingFuture(f));
        return f;
    }

public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task); //默认使用FutureTask作为RunnableFuture的实现
        else
            return aes.newTaskFor(task);//如果aes不为null,则使用该类的特定实现
    }

private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

其中QueueingFuture是一个内部类,继承自FutureTask,其实现如下:

重点改写了done方法的实现,如果任务已经执行完成,则会将该Future实例添加到阻塞队列中。

3、take / poll

这三方法就是从已完成的Future阻塞队列中获取并移除Future实例,如果队列为空,take方法会无期限阻塞阻塞,不带时间参数的poll方法不会阻塞返回null,带时间参数的poll方法会阻塞指定的时间,如果超时则返回null,其实现都是直接调用阻塞队列的方法,如下:

总结

到此这篇关于Java8中AbstractExecutorService与FutureTask源码详解的文章就介绍到这了,更多相关Java8 AbstractExecutorService与FutureTask内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • mybatis源码解读-Java中executor包的语句处理功能

    目录 1.mybatis对多语句类型的支持 2.mybatis的语句处理功能 1.mybatis对多语句类型的支持 在mybatis映射文件中传参数,主要用到#{} 或者 ${}. #{}:表示使用这种符号的变量会以预编译的形式赋值到sql片段中. ${}:表示使用这种符号的变量会以字符串的形式直接插到sql片段中. mybatis中支持三种语句类型,不同语句类型支持的变量符号不同.mybatis的三种类型如下: STATEMENT:这种语句类型中,只会对sql片段进行简单的字符串拼接.只支持使

  • java线程池ThreadPoolExecutor的八种拒绝策略示例详解

    目录 池化设计思想 线程池触发拒绝策略的时机 JDK内置4种线程池拒绝策略 拒绝策略接口定义 AbortPolicy(中止策略) DiscardPolicy(丢弃策略) DiscardOldestPolicy(弃老策略) 第三方实现的拒绝策略 Dubbo 中的线程拒绝策略 Netty 中的线程池拒绝策略 ActiveMQ 中的线程池拒绝策略 PinPoint 中的线程池拒绝策略 谈到 Java 的线程池最熟悉的莫过于 ExecutorService 接口了,jdk1.5 新增的 java.uti

  • Java并发框架:Executor API详解

    介绍 随着当今处理器中可用的核心数量的增加, 随着对实现更高吞吐量的需求的不断增长,多线程 API 变得非常流行. Java 提供了自己的多线程框架,称为 Executor 框架. 1. Executor 框架是什么? Executor 框架包含一组用于有效管理工作线程的组件.Executor API 通过 Executors 将任务的执行与要执行的实际任务解耦. 这是 生产者-消费者 模式的一种实现. java.util.concurrent.Executors 提供了用于创建工作线程的线程池

  • mybatis源码解读之executor包语句处理功能

    1.mybatis对多语句类型的支持 在mybatis映射文件中传参数,主要用到#{} 或者 ${}. #{}:表示使用这种符号的变量会以预编译的形式赋值到sql片段中. ${}:表示使用这种符号的变量会以字符串的形式直接插到sql片段中. mybatis中支持三种语句类型,不同语句类型支持的变量符号不同.mybatis的三种类型如下: STATEMENT:这种语句类型中,只会对sql片段进行简单的字符串拼接.只支持使用${}. PREPARED:这种语句中会先对sql片段进行字符串拼接,然后再

  • mybatis源码解读之executor包懒加载功能 

    ProxyFactory是创建代理类的工厂接口,其中的setProperties方法用来对工厂进行属性设置,但是mybatis内置的两个实现类都没有实现该接口,所以不支持属性设置.createProxy方法用来创建一个代理对象 public interface ProxyFactory {   // 设置工厂属性   default void setProperties(Properties properties) {   }   // 创建代理对象   Object createProxy(O

  • java executor包参数处理功能 

    sql语句中的参数赋值是有由executor包中的parameter子包完成的. parameter子包其实只有一个parameterHandler接口,它定义了2个方法: public interface ParameterHandler {   Object getParameterObject();   void setParameters(PreparedStatement ps)       throws SQLException; } ParameterHandler接口有一个默认的

  • executor包执行器功能

    Executor接口基于以下方法可以完成增,删,改查以及事务处理等操作.事实上,mybatis中的所有数据库操作是通过调用这些方法实现的. public interface Executor {     ResultHandler NO_RESULT_HANDLER = null;     // 数据更新操作,其中数据的增加.删除.更新均可由该方法实现   int update(MappedStatement ms, Object parameter) throws SQLException;

  • Java8中AbstractExecutorService与FutureTask源码详解

    目录 前言 一.AbstractExecutorService 1.定义 2.submit 3.invokeAll 4.invokeAny 二.FutureTask 1.定义 2.构造方法 3.get 4.run/ runAndReset 5. cancel 三.ExecutorCompletionService 1.定义 2.submit 3.take/ poll 总结 前言 本篇博客重点讲解ThreadPoolExecutor的三个基础设施类AbstractExecutorService.F

  • YOLOv5中SPP/SPPF结构源码详析(内含注释分析)

    目录 一.SPP的应用的背景 二.SPP结构分析 三.SPPF结构分析 四.YOLOv5中SPP/SPPF结构源码解析(内含注释分析) 总结 一.SPP的应用的背景 在卷积神经网络中我们经常看到固定输入的设计,但是如果我们输入的不能是固定尺寸的该怎么办呢? 通常来说,我们有以下几种方法: (1)对输入进行resize操作,让他们统统变成你设计的层的输入规格那样.但是这样过于暴力直接,可能会丢失很多信息或者多出很多不该有的信息(图片变形等),影响最终的结果. (2)替换网络中的全连接层,对最后的卷

  • Android实现屏幕锁定源码详解

    最近有朋友问屏幕锁定的问题,自己也在学习,网上找了下也没太详细的例子,看的资料书上也没有有关屏幕锁定程序的介绍,下个小决心,自己照着官方文档学习下,现在做好了,废话不多说,先发下截图,看下效果,需要注意的地方会加注释,有问题的朋友可以直接留言,我们共同学习交流,共同提高进步!直接看效果图: 一:未设置密码时进入系统设置的效果图如下: 二:设置密码方式预览: 三:密码解密效果图 四:九宫格解密时的效果图 下面来简单的看下源码吧,此处讲下,这个小DEMO也是临时学习下的,有讲的不明白的地方请朋友直接

  • Spring AOP底层源码详解

    ProxyFactory的工作原理 ProxyFactory是一个代理对象生产工厂,在生成代理对象之前需要对代理工厂进行配置.ProxyFactory在生成代理对象之前需要决定到底是使用JDK动态代理还是CGLIB技术. // config就是ProxyFactory对象 // optimize为true,或proxyTargetClass为true,或用户没有给ProxyFactory对象添加interface if (config.isOptimize() || config.isProxy

  • Java并发编程之ConcurrentLinkedQueue源码详解

    一.ConcurrentLinkedQueue介绍 并编程中,一般需要用到安全的队列,如果要自己实现安全队列,可以使用2种方式: 方式1:加锁,这种实现方式就是我们常说的阻塞队列. 方式2:使用循环CAS算法实现,这种方式实现队列称之为非阻塞队列. 从点到面, 下面我们来看下非阻塞队列经典实现类:ConcurrentLinkedQueue (JDK1.8版) ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全的队列.当我们添加一个元素的时候,它会添加到队列的尾部,当我们

  • python目标检测SSD算法预测部分源码详解

    目录 学习前言 什么是SSD算法 ssd_vgg_300主体的源码 学习前言 ……学习了很多有关目标检测的概念呀,咕噜咕噜,可是要怎么才能进行预测呢,我看了好久的SSD源码,将其中的预测部分提取了出来,训练部分我还没看懂 什么是SSD算法 SSD是一种非常优秀的one-stage方法,one-stage算法就是目标检测和分类是同时完成的,其主要思路是均匀地在图片的不同位置进行密集抽样,抽样时可以采用不同尺度和长宽比,然后利用CNN提取特征后直接进行分类与回归,整个过程只需要一步,所以其优势是速度

  • python目标检测SSD算法训练部分源码详解

    目录 学习前言 讲解构架 模型训练的流程 1.设置参数 2.读取数据集 3.建立ssd网络. 4.预处理数据集 5.框的编码 6.计算loss值 7.训练模型并保存 开始训练 学习前言 ……又看了很久的SSD算法,今天讲解一下训练部分的代码.预测部分的代码可以参照https://blog.csdn.net/weixin_44791964/article/details/102496765 讲解构架 本次教程的讲解主要是对训练部分的代码进行讲解,该部分讲解主要是对训练函数的执行过程与执行思路进行详

  • Django Rest Framework实现身份认证源码详解

    目录 一.Django框架 二.身份认证的两种实现方式: 三.身份认证源码解析流程 一.Django框架 Django确实是一个很强大,用起来很爽的一个框架,在Rest Framework中已经将身份认证全都封装好了,用的时候直接导入authentication.py这个模块就好了.这个模块中5个认证类.但是我们在开发中很少用自带的认证类,而是根据项目实际需要去自己实现认证类.下面是内置的认证类 BaseAuthentication(object):所有的认证相关的类都继承自这个类,我们写的认证

  • Android线程间通信Handler源码详解

    目录 前言 01. 用法 02.源码 03.结语 前言 在[Android]线程间通信 - Handler之使用篇主要讲了 Handler 的创建,发送消息,处理消息 三个步骤.那么接下来,我们也按照这三个步骤,从源码中去探析一下它们具体是如何实现的.本篇是关于创建源码的分析. 01. 用法 先回顾一下,在主线程和非主线程是如何创建 Handler 的. //主线程 private val mHandler: Handler = object : Handler(Looper.getMainLo

  • Android开发数据结构算法ArrayList源码详解

    目录 简介 ArrayList源码讲解 初始化 扩容 增加元素 一个元素 一堆元素 删除元素 一个元素 一堆元素 修改元素 查询元素 总结 ArrayList优点 ArrayList的缺点 简介 ArrayList是List接口的一个实现类,它是一个集合容器,我们通常会通过指定泛型来存储同一类数据,ArrayList默认容器大小为10,自身可以自动扩容,当容量不足时,扩大为原来的1.5倍,和上篇文章的Vector的最大区别应该就是线程安全了,ArrayList不能保证线程安全,但我们也可以通过其

随机推荐