详解java中的阻塞队列

阻塞队列简介

阻塞队列(BlockingQueue)首先是一个支持先进先出的队列,与普通的队列完全相同;
其次是一个支持阻塞操作的队列,即:

  • 当队列满时,会阻塞执行插入操作的线程,直到队列不满。
  • 当队列为空时,会阻塞执行获取操作的线程,直到队列不为空。

阻塞队列用在多线程的场景下,因此阻塞队列使用了锁机制来保证同步,这里使用的可重入锁;
而对于阻塞与唤醒机制则有与锁绑定的Condition实现

应用场景:生产者消费者模式

java中的阻塞队列

java中的阻塞队列根据容量可以分为有界队列和无界队列:

  • 有界队列:队列中只能存储有限个元素,超出后存放元素线程会被阻塞或者失败。
  • 无界队列:队列中可以存储无限个元素。

java8中提供了7种阻塞队列阻塞队列供开发者使用,如下表:

类名 描述
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue 由链表结构组成的有界阻塞队列(默认大小Integer.MAX_VALUE)
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
DelayQueue 使用优先级队列实现的延迟无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列,即单个元素的队列
LinkedTransferQueue 由链表结构组成的无界阻塞队列
LinkedBlockingDeque 由链表结构组成的双向阻塞队列

另外还有一个在ScheduledThreadPoolExecutor中实现的DelayedWorkQueue阻塞队列,
但这个阻塞队列开发者不能使用。它们之间的UML类图如下图:

BlockingQueue接口是阻塞队列对外的访问接口,所有的阻塞队列都实现了BlockQueue中的方法

BlockQueue中方法

作为一个队列的核心方法就是入队和出队。由于存在阻塞策略,BlockQueue将出队入队的情况分为了四组,每组提供不同的方法:

  • 抛出异常:当队列满时,如果再往队列中插入元素,则抛出IllegalStateException异常;                                       当队列为空时,从队列中获取元素则抛出NoSuchElementException异常。
  • 返回特定值(布尔值):当队列满时,如果再往队列中插入元素,则返回false;当队列为空时,从队列中获取元素则返回null。
  • 一直阻塞:当队列满时,如果再往队列中插入元素,阻塞当前线程直到队列中至少一个被移除或者响应中断退出;       当队列为空时,则阻塞当前线程直到至少一个元素元素入队或者响应中断退出。
  • 超时退出:当队列满时,如果再往队列中插入元素,阻塞当前线程直到队列中至少一个被移除或者达到指定的等待时间退出或者响应中断退出;                                                                                                                                                   当队列为空时,则阻塞当前线程直到至少一个元素元素入队或者达到指定的等待时间退出或者响应中断退出。

对于每种情况BlockingQueue提供的方法如下表:

方法\处理方式 抛出异常 返回特定值(布尔值)  一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time.unit)
检查 element() peek() 不可用 不可用

上述方法一般用于生产者-消费者模型中,是其中的生产和消费操作队列的核心方法。
除了这些方法,BlockingQueue还提供了一些其他的方法如下表:

方法名称 描述
remove(Object o) 从队列中移除一个指定值
size() 获取队列中元素的个数
contains(Object o) 判断队列是否包含指定的元素,但是这个元素在这次判断完可能就会被消费
drainTo(Collection<? super E> c) 将队列中元素放在给定的集合中,并返回添加的元素个数
drainTo(Collection<? super E> c, int maxElements) 将队列中元素取maxElements(不超过队列中元素个数)个放在给定的集合中,并返回添加的元素个数
remainingCapacity() 计算队列中还可以存放的元素个数
toArray() 以objetc数组的形式获取队列中所有的元素
toArray(T[] a) 以给定类型数组的方式获取队列中所有的元素
clear() 清空队列,危险的操作

阻塞队列的实现原理

阻塞队列的实现依靠通知模式实现:当生产者向满了的队列中添加元素时,会阻塞住生产者,
直到消费者消费了一个队列中的元素后会通知消费者队列可用,此时再由生产者向队列中添加元素。反之亦然。

阻塞队列的阻塞唤醒依靠Condition——条件队列来实现。

以ArrayBlockingQueue为例说明:

ArrayBlockingQueue的定义:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  implements BlockingQueue<E>, java.io.Serializable {

 /** The queued items */
 //以数组的结构存储队列的元素,采用的是循环数组
 final Object[] items;

 /** items index for next take, poll, peek or remove */
 //队列的队头索引
 int takeIndex;

 /** items index for next put, offer, or add */
 //队列的队尾索引
 int putIndex;

 /** Number of elements in the queue */
 //队列中元素的个数
 int count;

 /** Main lock guarding all access */
 //对于ArrayBlockingQueue所有的操作都需要加锁,
 final ReentrantLock lock;

 /** Condition for waiting takes */
 //条件队列,当队列为空时阻塞消费者并在生产者生产后唤醒消费者
 private final Condition notEmpty;

 /** Condition for waiting puts */
 //条件队列,当队列满时阻塞生产者,并在消费者消费队列后唤醒生产者
 private final Condition notFull;
}

根据类的定义字段可以看到,有两个Condition条件队列,猜测以下过程

  • 当队列为空,消费者试图消费时应该调用notEmpty.await()方法阻塞,并在生产者生产后调用notEmpty.single()方法
  • 当队列已满,生产者试图放入元素应调用notFull.await()方法阻塞,并在消费者消费队列后调用notFull.single()方法向队

向队列中添加元素put()方法的添加过程。

 /**
 * 向队列中添加元素
 * 当队列已满时需要阻塞当前线程
 * 放入元素后唤醒因队列为空阻塞的消费者
 */
 public void put(E e) throws InterruptedException {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
   //当队列已满时需要notFull.await()阻塞当前线程
   //offer(e,time,unit)方法就是阻塞的时候加了超时设定
   while (count == items.length)
    notFull.await();
   //放入元素的过程
   enqueue(e);
  } finally {
   lock.unlock();
  }
 }

 /**enqueue实际添加元素的方法*/
 private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  items[putIndex] = x;
  if (++putIndex == items.length)
   putIndex = 0;
  count++;
  //如果条件队列中存在等待的线程
  //唤醒
  notEmpty.signal();
 }

从队列中获取元素take()方法的获取过程。

 /**
 * 从队列中获取元素
 * 当队列已空时阻塞当前线程
 * 从队列中消费元素后唤醒等待的生产线程
 */
 public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
   //队列为空需要阻塞当前线程
   while (count == 0)
    notEmpty.await();
   //获取元素的过程
   return dequeue();
  } finally {
   lock.unlock();
  }
 }

 /**dequeue实际消费元素的方法*/
 private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  if (++takeIndex == items.length)
   takeIndex = 0;
  count--;
  if (itrs != null)
   itrs.elementDequeued();
  //消费元素后从唤醒阻塞的生产者线程
  notFull.signal();
  return x;
 }

总结

阻塞队列提供了不同于普通队列的增加、删除元素的方法,核心在与队列满时阻塞生产者和队列空时阻塞消费者。
这一阻塞过程依靠与锁绑定的Condition对象实现。Condition接口的实现在AQS中实现,具体的实现类是
ConditionObject

以上就是详解java中的阻塞队列的详细内容,更多关于java 阻塞队列的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java wait和notifyAll实现简单的阻塞队列

    wait,会使调用的线程进入等待状态,会释放所持有的对象锁(调用的时候也必须先获取到锁,否则会抛出异常 IllegalMonitorStateException) notifyAll.notify,会去唤醒应当前对象而等待的线程,(调用的时候也必须先获取到锁,否则会抛出异常 IllegalMonitorStateException) 顺便也记录一下join方法,调用join方法,会使当前线程进入等待,如果没有设置等待时间,就会等待另一个线程执行完成才返回(ps:调用join方法并不一定立刻执行另

  • Java源码解析阻塞队列ArrayBlockingQueue介绍

    Java的阻塞队列,在实现时,使用到了lock和condition,下面是对其主要方法的介绍. 首先看一下,阻塞队列中使用到的锁. /** Main lock guarding all access **/ final ReentrantLock lock;​ /** Condition for waiting takes **/ private final Condition notEmpty;​ /** Condition for waiting puts **/ private final

  • Java源码解析阻塞队列ArrayBlockingQueue功能简介

    本文基于jdk1.8进行分析. 阻塞队列是java开发时常用的一个数据结构.首先看一下阻塞队列的作用是什么.阻塞队列的作用,从源码中类的注释中来了解,是最清晰准确的. ArrayBlockingQueue是一个用数组实现的有界阻塞队列.提供FIFO的功能.队列头上的元素是在队列中呆了最长时间的元素,队列尾上的元素是在队列中呆了时间最短的元素.新元素会插入在队列尾部,从队列获取元素时会从队列头上获取. 这是一个传统的有界队列,在这个有界队列里,一个固定大小的数组用来保存生产者产生的元素和消费者获取

  • Java源码解析阻塞队列ArrayBlockingQueue常用方法

    本文基于jdk1.8进行分析 ArrayBlockingQueue的功能简介参考https://www.jb51.net/article/154211.htm. 首先看一下ArrayBlockingQueue的成员变量.如下图.最主要的成员变量是items,它是一个Object类型的数组用于保存阻塞队列中的元素.其次是takeIndex,putIndex,count,分别表示了从队列获取元素的位置,往队列里放元素的位置和队列中元素的个数.然后是lock,notEmpty和notFull三个和锁相

  • Java中使用阻塞队列控制线程集实例

    队列以一种先进先出的方式管理数据.如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞.在多线程进行合作时,阻塞队列是很有用的工具.工作者线程可以定期的把中间结果存到阻塞队列中.而其他工作者线程把中间结果取出并在将来修改它们.队列会自动平衡负载.如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞.如果第一个线程集运行的快,那么它将等待第二个线程集赶上来. 下面的程序展示了如何使用阻塞队列来控制线程集.程序在一个目录及它的所有子目

  • 详解Java阻塞队列(BlockingQueue)的实现原理

    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满:从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空.并发包下很多高级同步类的实现都是基于BlockingQueue实现的. BlockingQueue 的操作方法 BlockingQueue 具有 4 组不同的方法用于插入.移除以及对队列中的元素进行检查.如果

  • 剖析Java中阻塞队列的实现原理及应用场景

    我们平时使用的一些常见队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦.但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动

  • Java阻塞队列四组API介绍(小结)

    通过前面几篇文章的学习,我们已经知道了Java中的队列分为阻塞队列和非阻塞队列以及常用的七个阻塞队列.如下图: 本文来源:凯哥Java(kaigejava)讲解Java并发系列之阻塞队列教程.系列文章,欢迎大家从第一篇文章开始看起. 在查看以上七个队列的API的时候,我们可以很明显的看到以下四组API: add()/remove()/remove offer()/poll()/peek() put/take() offer(e,time,unit)/poll(time,unit). 分别对应的是

  • Java使用阻塞队列控制线程通信的方法实例详解

    本文实例讲述了Java使用阻塞队列控制线程通信的方法.分享给大家供大家参考,具体如下: 一 点睛 阻塞队列主要用在生产者/消费者的场景,下面这幅图展示了一个线程生产.一个线程消费的场景: 负责生产的线程不断的制造新对象并插入到阻塞队列中,直到达到这个队列的上限值.队列达到上限值之后生产线程将会被阻塞,直到消费的线程对这个队列进行消费.同理,负责消费的线程不断的从队列中消费对象,直到这个队列为空,当队列为空时,消费线程将会被阻塞,除非队列中有新的对象被插入. BlockingQueue的核心方法:

  • java阻塞队列实现原理及实例解析

    这篇文章主要介绍了java阻塞队列实现原理及实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 阻塞队列与普通队列的不同在于.当队列是空的时候,从队列中获取元素的操作将会被阻塞,或者当队列满时,往队列里面添加元素将会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完

  • 深入理解Java线程编程中的阻塞队列容器

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Q

随机推荐