支持生产阻塞的Java线程池

通常来说,生产任务的速度要大于消费的速度。一个细节问题是,队列长度,以及如何匹配生产和消费的速度。

一个典型的生产者-消费者模型如下:








 

在并发环境下利用J.U.C提供的Queue实现可以很方便地保证生产和消费过程中的线程安全。这里需要注意的是,Queue必须设置初始容量,防止生产者生产过快导致队列长度暴涨,最终触发OutOfMemory。

对于一般的生产快于消费的情况。当队列已满时,我们并不希望有任何任务被忽略或得不到执行,此时生产者可以等待片刻再提交任务,更好的做法是,把生产者阻塞在提交任务的方法上,待队列未满时继续提交任务,这样就没有浪费的空转时间了。阻塞这一点也很容易,BlockingQueue就是为此打造的,ArrayBlockingQueue和LinkedBlockingQueue在构造时都可以提供容量做限制,其中LinkedBlockingQueue是在实际操作队列时在每次拿到锁以后判断容量。

更进一步,当队列为空时,消费者拿不到任务,可以等一会儿再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,当有任务时便可以立即获得执行,建议调用take的带超时参数的重载方法,超时后线程退出。这样当生产者事实上已经停止生产时,不至于让消费者无限等待。

于是一个高效的支持阻塞的生产消费模型就实现了。

等一下,既然J.U.C已经帮我们实现了线程池,为什么还要采用这一套东西?直接用ExecutorService不是更方便?

我们来看一下ThreadPoolExecutor的基本结构:








 

可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已经帮我们实现好了,并且直接采用线程池的实现还有很多优势,例如线程数的动态调整等。

但问题在于,即便你在构造ThreadPoolExecutor时手动指定了一个BlockingQueue作为队列实现,事实上当队列满时,execute方法并不会阻塞,原因在于ThreadPoolExecutor调用的是BlockingQueue非阻塞的offer方法:

代码如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

这时候就需要做一些事情来达成一个结果:当生产者提交任务,而队列已满时,能够让生产者阻塞住,等待任务被消费。

关键在于,在并发环境下,队列满不能由生产者去判断,不能调用ThreadPoolExecutor.getQueue().size()来判断队列是否满。

线程池的实现中,当队列满时会调用构造时传入的RejectedExecutionHandler去拒绝任务的处理。默认的实现是AbortPolicy,直接抛出一个RejectedExecutionException。

几种拒绝策略在这里就不赘述了,这里和我们的需求比较接近的是CallerRunsPolicy,这种策略会在队列满时,让提交任务的线程去执行任务,相当于让生产者临时去干了消费者干的活儿,这样生产者虽然没有被阻塞,但提交任务也会被暂停。

代码如下:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a <tt>CallerRunsPolicy</tt>.
     */
    public CallerRunsPolicy() { }

/**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

代码如下:

new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                        try {
                                executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                                // should not be interrupted
                        }
                }
        }
};

这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。

(0)

相关推荐

  • Java线程池使用与原理详解

    线程池是什么? 我们可以利用java很容易创建一个新线程,同时操作系统创建一个线程也是一笔不小的开销.所以基于线程的复用,就提出了线程池的概念,我们使用线程池创建出若干个线程,执行完一个任务后,该线程会存在一段时间(用户可以设定空闲线程的存活时间,后面会介绍),等到新任务来的时候就直接复用这个空闲线程,这样就省去了创建.销毁线程损耗.当然空闲线程也会是一种资源的浪费(所有才有空闲线程存活时间的限制),但总比频繁的创建销毁线程好太多. 下面是我的测试代码 /* * @TODO 线程池测试 */ @

  • 四种Java线程池用法解析

    本文为大家分析四种Java线程池用法,供大家参考,具体内容如下 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } } ).start(); 那你就out太多了,new Thread的弊端如下: a. 每次new Thread新建对象性能差. b. 线程缺乏统一管理,可能无限

  • java中通用的线程池实例代码

    复制代码 代码如下: package com.smart.frame.task.autoTask; import java.util.Collection;import java.util.Vector; /** * 任务分发器 */public class TaskManage extends Thread{    protected Vector<Runnable> tasks = new Vector<Runnable>();    protected boolean run

  • Java 线程池ExecutorService详解及实例代码

    Java 线程池ExecutorService 1.线程池 1.1什么情况下使用线程池 单个任务处理的时间比较短. 将需处理的任务的数量大. 1.2使用线程池的好处 减少在创建和销毁线程上所花的时间以及系统资源的开销. 如果不使用线程池,有可能造成系统创建大量线程而导致消耗系统内存以及"过度切换"; 2.ExecutorService和Executors 2.1简介 ExecutorService是一个接口,继承了Executor, public interface ExecutorS

  • 详谈Java几种线程池类型介绍及使用方法

    一.线程池使用场景 •单个任务处理时间短 •将需处理的任务数量大 二.使用Java线程池好处 1.使用new Thread()创建线程的弊端: •每次通过new Thread()创建对象性能不佳. •线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom. •缺乏更多功能,如定时执行.定期执行.线程中断. 2.使用Java线程池的好处: •重用存在的线程,减少对象创建.消亡的开销,提升性能. •可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞

  • Java 线程池详解及实例代码

    线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收. 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因. 例如Android中常见到的很多通用组件一般都离不开"池"的概念,如各种图片

  • Java代码构建一个线程池

    在现代的操作系统中,有一个很重要的概念――线程,几乎所有目前流行的操作系统都支持线程,线程来源于操作系统中进程的概念,进程有自己的虚拟地址空间以及正文段.数据段及堆栈,而且各自占有不同的系统资源(例如文件.环境变量等等).与此不同,线程不能单独存在,它依附于进程,只能由进程派生.如果一个进程派生出了两个线程,那这两个线程共享此进程的全局变量和代码段,但每个线程各拥有各自的堆栈,因此它们拥有各自的局部变量,线程在UNIX系统中还被进一步分为用户级线程(由进程自已来管理)和系统级线程(由操作系统的调

  • Java Socket编程实例(三)- TCP服务端线程池

    一.服务端回传服务类: import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; public class EchoProtocol implements Runnable { private static f

  • 深入java线程池的使用详解

    在Java 5.0之前启动一个任务是通过调用Thread类的start()方法来实现的,任务的提于交和执行是同时进行的,如果你想对任务的执行进行调度或是控制 同时执行的线程数量就需要额外编写代码来完成.5.0里提供了一个新的任务执行架构使你可以轻松地调度和控制任务的执行,并且可以建立一个类似数据库连接 池的线程池来执行任务.这个架构主要有三个接口和其相应的具体类组成.这三个接口是Executor, ExecutorService.ScheduledExecutorService,让我们先用一个图

  • Java线程池的几种实现方法和区别介绍

    Java线程池的几种实现方法和区别介绍 import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.E

随机推荐