Java 并发编程ArrayBlockingQueue的实现

一、简介

ArrayBlockingQueue 顾名思义:基于数组的阻塞队列。数组是要指定长度的,所以使用 ArrayBlockingQueue 时必须指定长度,也就是它是一个有界队列。它实现了 BlockingQueue 接口,有着队列、集合以及阻塞队列的所有方法。

ArrayBlockingQueue 是线程安全的,内部使用 ReentrantLock 来保证。ArrayBlockingQueue 支持对生产者线程和消费者线程进行公平的调度。当然默认情况下是不保证公平性的,因为公平性通常会降低吞吐量,但是可以减少可变性和避免线程饥饿问题。

二、数据结构

通常,队列的实现方式有数组和链表两种方式。对于数组这种实现方式来说,我们可以通过维护一个队尾指针,使得在入队的时候可以在 O(1)O(1) 的时间内完成;但是对于出队操作,在删除队头元素之后,必须将数组中的所有元素都往前移动一个位置,这个操作的复杂度达到了 O(n)O(n),效果并不是很好。如下图所示:

为了解决这个问题,我们可以使用另外一种逻辑结构来处理数组中各个位置之间的关系。假设现在我们有一个数组 A[1…n],我们可以把它想象成一个环型结构,即 A[n] 之后是 A[1],相信了解过一致性 Hash 算法的童鞋应该很容易能够理解。

如下图所示:我们可以使用两个指针,分别维护队头和队尾两个位置,使入队和出队操作都可以在 O(1O(1 )的时间内完成。当然,这个环形结构只是逻辑上的结构,实际的物理结构还是一个普通的数组。

讲完 ArrayBlockingQueue 的数据结构,接下来我们从源码层面看看它是如何实现阻塞的。

三、源码分析

3.1 属性

// 队列的底层结构
final Object[] items;
// 队头指针
int takeIndex;
// 队尾指针
int putIndex;
// 队列中的元素个数
int count;

final ReentrantLock lock;

// 并发时的两种状态
private final Condition notEmpty;
private final Condition notFull;

items 是一个数组,用来存放入队的数据;count 表示队列中元素的个数;takeIndex 和 putIndex 分别代表队头和队尾指针。

3.2 构造方法

public ArrayBlockingQueue(int capacity) {
  this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull = lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
  this(capacity, fair);

  final ReentrantLock lock = this.lock;
  lock.lock(); // Lock only for visibility, not mutual exclusion
  try {
    int i = 0;
    try {
      for (E e : c) {
        checkNotNull(e);
        items[i++] = e;
      }
    } catch (ArrayIndexOutOfBoundsException ex) {
      throw new IllegalArgumentException();
    }
    count = i;
    putIndex = (i == capacity) ? 0 : i;
  } finally {
    lock.unlock();
  }
}

第一个构造函数只需要指定队列大小,默认为非公平锁;第二个构造函数可以手动指定公平性和队列大小;第三个构造函数里面使用了 ReentrantLock 来加锁,然后把传入的集合元素按顺序一个个放入 items 中。这里加锁目的不是使用它的互斥性,而是让 items 中的元素对其他线程可见(参考 AQS 里的 state 的 volatile 可见性)。

3.3 方法

3.3.1 入队

ArrayBlockingQueue 提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:

  • boolean add(E e)
  • void put(E e)
  • boolean offer(E e)
  • boolean offer(E e, long timeout, TimeUnit unit)

(1)add(E e)

public boolean add(E e) {
  return super.add(e);
}

//super.add(e)
public boolean add(E e) {
  if (offer(e))
    return true;
  else
    throw new IllegalStateException("Queue full");
}

可以看到 add 方法调用的是父类,也就是 AbstractQueue 的 add 方法,它实际上调用的就是 offer 方法。

(2)offer(E e)

我们接着上面的 add 方法来看 offer 方法:

public boolean offer(E e) {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    if (count == items.length)
      return false;
    else {
      enqueue(e);
      return true;
    }
  } finally {
    lock.unlock();
  }
}

offer 方法在队列满了的时候返回 false,否则调用 enqueue 方法插入元素,并返回 true。

private void enqueue(E x) {
  final Object[] items = this.items;
  items[putIndex] = x;
  // 圆环的index操作
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  notEmpty.signal();
}

enqueue 方法首先把元素放在 items 的 putIndex 位置,接着判断在 putIndex+1 等于队列的长度时把 putIndex 设置为0,也就是上面提到的圆环的 index 操作。最后唤醒等待获取元素的线程。

(3)offer(E e, long timeout, TimeUnit unit)

该方法在 offer(E e) 的基础上增加了超时的概念。

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

  checkNotNull(e);
  // 把超时时间转换成纳秒
  long nanos = unit.toNanos(timeout);
  final ReentrantLock lock = this.lock;
  // 获取一个可中断的互斥锁
  lock.lockInterruptibly();
  try {
    // while循环的目的是防止在中断后没有到达传入的timeout时间,继续重试
    while (count == items.length) {
      if (nanos <= 0)
        return false;
      // 等待nanos纳秒,返回剩余的等待时间(可被中断)
      nanos = notFull.awaitNanos(nanos);
    }
    enqueue(e);
    return true;
  } finally {
    lock.unlock();
  }
}

利用了 Condition 的 awaitNanos 方法,等待指定时间,因为该方法可中断,所以这里利用 while 循环来处理中断后还有剩余时间的问题,等待时间到了以后调用 enqueue 方法放入队列。

(4)put(E e)

public void put(E e) throws InterruptedException {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == items.length)
      notFull.await();
    enqueue(e);
  } finally {
    lock.unlock();
  }
}

put 方法在 count 等于 items 长度时,一直等待,直到被其他线程唤醒。唤醒后调用 enqueue 方法放入队列。

3.3.2 出队

入队列的方法说完后,我们来说说出队列的方法。ArrayBlockingQueue 提供了多种出队操作的实现来满足不同情况下的需求,如下:

  • E poll()
  • E poll(long timeout, TimeUnit unit)
  • E take()
  • drainTo(Collection<? super E> c, int maxElements)

(1)poll()

public E poll() {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    return (count == 0) ? null : dequeue();
  } finally {
    lock.unlock();
  }
}

poll 方法是非阻塞方法,如果队列没有元素返回 null,否则调用 dequeue 把队首的元素出队列。

private E dequeue() {
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  notFull.signal();
  return x;
}

dequeue 会根据 takeIndex 获取到该位置的元素,并把该位置置为 null,接着利用圆环原理,在 takeIndex 到达列表长度时设置为0,最后唤醒等待元素放入队列的线程。

(2)poll(long timeout, TimeUnit unit)

该方法是 poll() 的可配置超时等待方法,和上面的 offer 一样,使用 while 循环配合 Condition 的 awaitNanos 来进行等待,等待时间到后执行 dequeue 获取元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  long nanos = unit.toNanos(timeout);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == 0) {
      if (nanos <= 0)
        return null;
      nanos = notEmpty.awaitNanos(nanos);
    }
    return dequeue();
  } finally {
    lock.unlock();
  }
}

(3)take()

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    while (count == 0)
      notEmpty.await();
    return dequeue();
  } finally {
    lock.unlock();
  }
}

取走队列里排在首位的对象,不同于 poll() 方法,若BlockingQueue为空,就阻塞等待直到有新的数据被加入。
(4)drainTo()

public int drainTo(Collection<? super E> c) {
  return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
  checkNotNull(c);
  if (c == this)
    throw new IllegalArgumentException();
  if (maxElements <= 0)
    return 0;
  final Object[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    int n = Math.min(maxElements, count);
    int take = takeIndex;
    int i = 0;
    try {
      while (i < n) {
        @SuppressWarnings("unchecked")
        E x = (E) items[take];
        c.add(x);
        items[take] = null;
        if (++take == items.length)
          take = 0;
        i++;
      }
      return n;
    } finally {
      // Restore invariants even if c.add() threw
      if (i > 0) {
        count -= i;
        takeIndex = take;
        if (itrs != null) {
          if (count == 0)
            itrs.queueIsEmpty();
          else if (i > take)
            itrs.takeIndexWrapped();
        }
        for (; i > 0 && lock.hasWaiters(notFull); i--)
          notFull.signal();
      }
    }
  } finally {
    lock.unlock();
  }
}

drainTo 相比于其他获取方法,能够一次性从队列中获取所有可用的数据对象(还可以指定获取数据的个数)。通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁。

3.3.3 获取元素

public E peek() {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    return itemAt(takeIndex); // null when queue is empty
  } finally {
    lock.unlock();
  }
}

final E itemAt(int i) {
  return (E) items[i];
}

这里获取元素时上锁是为了避免脏数据的产生。

3.3.4 删除元素

我们可以想象一下,队列中删除某一个元素时,是不是要遍历整个数据找到该元素,并把该元素后的所有元素往前移一位,也就是说,该方法的时间复杂度为 O(n)O(n)。

public boolean remove(Object o) {
  if (o == null) return false;
  final Object[] items = this.items;
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    if (count > 0) {
      final int putIndex = this.putIndex;
      int i = takeIndex;
       // 从takeIndex一直遍历到putIndex,直到找到和元素o相同的元素,调用removeAt进行删除
      do {
        if (o.equals(items[i])) {
          removeAt(i);
          return true;
        }
        if (++i == items.length)
          i = 0;
      } while (i != putIndex);
    }
    return false;
  } finally {
    lock.unlock();
  }
}

remove 方法比较简单,它从 takeIndex 一直遍历到 putIndex,直到找到和元素 o 相同的元素,调用 removeAt 进行删除。我们重点来看一下 removeAt 方法。

void removeAt(final int removeIndex) {
  final Object[] items = this.items;
  if (removeIndex == takeIndex) {
    // removing front item; just advance
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;
    if (itrs != null)
      itrs.elementDequeued();
  } else {
    // an "interior" remove
    // slide over all others up through putIndex.
    final int putIndex = this.putIndex;
    for (int i = removeIndex;;) {
      int next = i + 1;
      if (next == items.length)
        next = 0;
      if (next != putIndex) {
        items[i] = items[next];
        i = next;
      } else {
        items[i] = null;
        this.putIndex = i;
        break;
      }
    }
    count--;
    if (itrs != null)
      itrs.removedAt(removeIndex);
  }
  notFull.signal();
}

removeAt 的处理方式和我想的稍微有一点出入,它内部分为两种情况来考虑:

  • removeIndex == takeIndex
  • removeIndex != takeIndex

也就是我考虑的时候没有考虑边界问题。当 removeIndex == takeIndex 时就不需要后面的元素整体往前移了,而只需要把 takeIndex的指向下一个元素即可(类比圆环);当 removeIndex != takeIndex 时,通过 putIndex 将 removeIndex 后的元素往前移一位。

四、总结

ArrayBlockingQueue 是一个阻塞队列,内部由 ReentrantLock 来实现线程安全,由 Condition 的 await 和 signal 来实现等待唤醒的功能。它的数据结构是数组,准确的说是一个循环数组(可以类比一个圆环),所有的下标在到达最大长度时自动从 0 继续开始。

到此这篇关于Java 并发编程ArrayBlockingQueue的实现的文章就介绍到这了,更多相关Java 并发编程ArrayBlockingQueue内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java源码解析阻塞队列ArrayBlockingQueue常用方法

    本文基于jdk1.8进行分析 ArrayBlockingQueue的功能简介参考https://www.jb51.net/article/154211.htm. 首先看一下ArrayBlockingQueue的成员变量.如下图.最主要的成员变量是items,它是一个Object类型的数组用于保存阻塞队列中的元素.其次是takeIndex,putIndex,count,分别表示了从队列获取元素的位置,往队列里放元素的位置和队列中元素的个数.然后是lock,notEmpty和notFull三个和锁相

  • java并发之ArrayBlockingQueue详细介绍

    java并发之ArrayBlockingQueue详细介绍 ArrayBlockingQueue是常用的线程集合,在线程池中也常常被当做任务队列来使用.使用频率特别高.他是维护的是一个循环队列(基于数组实现),循环结构在数据结构中比较常见,但是在源码实现中还是比较少见的. 线程安全的实现 线程安全队列,基本是离不开锁的.ArrayBlockingQueue使用的是ReentrantLock,配合两种Condition,实现了集合的线程安全操作.这里稍微说一个好习惯,下面是成员变量的声明. pri

  • Java源码解析阻塞队列ArrayBlockingQueue介绍

    Java的阻塞队列,在实现时,使用到了lock和condition,下面是对其主要方法的介绍. 首先看一下,阻塞队列中使用到的锁. /** Main lock guarding all access **/ final ReentrantLock lock;​ /** Condition for waiting takes **/ private final Condition notEmpty;​ /** Condition for waiting puts **/ private final

  • java集合框架 arrayblockingqueue应用分析

    Queue ------------ 1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 7.LinkedBlockingDeque, (

  • java中LinkedBlockingQueue与ArrayBlockingQueue的异同

    相同: 1.LinkedBlockingQueue和ArrayBlockingQueue都实现了BlockingQueue接口: 2.LinkedBlockingQueue和ArrayBlockingQueue都是可阻塞的队列 内部都是使用ReentrantLock和Condition来保证生产和消费的同步: 当队列为空,消费者线程被阻塞:当队列装满,生产者线程被阻塞: 使用Condition的方法来同步和通信:await()和signal() 不同: 1.由上图可以看出,他们的锁机制不同 Li

  • Java源码解析阻塞队列ArrayBlockingQueue功能简介

    本文基于jdk1.8进行分析. 阻塞队列是java开发时常用的一个数据结构.首先看一下阻塞队列的作用是什么.阻塞队列的作用,从源码中类的注释中来了解,是最清晰准确的. ArrayBlockingQueue是一个用数组实现的有界阻塞队列.提供FIFO的功能.队列头上的元素是在队列中呆了最长时间的元素,队列尾上的元素是在队列中呆了时间最短的元素.新元素会插入在队列尾部,从队列获取元素时会从队列头上获取. 这是一个传统的有界队列,在这个有界队列里,一个固定大小的数组用来保存生产者产生的元素和消费者获取

  • 详细分析Java并发集合ArrayBlockingQueue的用法

    在上一章中,我们介绍了阻塞队列BlockingQueue,下面我们介绍它的常用实现类ArrayBlockingQueue. 一. 用数组来实现队列 因为队列这种数据结构的特殊要求,所以它天然适合用链表的方式来实现,用两个变量分别记录链表头和链表尾,当删除或插入队列时,只要改变链表头或链表尾就可以了,而且链表使用引用的方式链接的,所以它的容量几乎是无限的. 那么怎么使用数组来实现队列,我们需要四个变量:Object[] array来存储队列中元素,headIndex和tailIndex分别记录队列

  • java ArrayBlockingQueue的方法及缺点分析

    提到ArrayBlockingQueue的方法,想必大家都不陌生,我们在入队和出队的时候,接触了不少这方面的函数.当然ArrayBlockingQueue中的方法也不止于此,今天我们就全部为大家罗列出来,也算是做一个学习方向上的指引.然后就其中的peek方法带来实例介绍,并说明ArrayBlockingQueue使用的不足之处. 1.ArrayBlockingQueue函数列表 // 创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue. ArrayBlockin

  • Java 并发编程ArrayBlockingQueue的实现

    一.简介 ArrayBlockingQueue 顾名思义:基于数组的阻塞队列.数组是要指定长度的,所以使用 ArrayBlockingQueue 时必须指定长度,也就是它是一个有界队列.它实现了 BlockingQueue 接口,有着队列.集合以及阻塞队列的所有方法. ArrayBlockingQueue 是线程安全的,内部使用 ReentrantLock 来保证.ArrayBlockingQueue 支持对生产者线程和消费者线程进行公平的调度.当然默认情况下是不保证公平性的,因为公平性通常会降

  • java并发编程工具类JUC之ArrayBlockingQueue

    Java BlockingQueue接口java.util.concurrent.BlockingQueue表示一个可以存取元素,并且线程安全的队列.换句话说,当多线程同时从 JavaBlockingQueue中插入元素.获取元素的时候,不会导致任何并发问题(元素被插入多次.处理多次等问题). 从java BlockingQueue可以引申出一个概念:阻塞队列,是指队列本身可以阻塞线程向队列里面插入元素,或者阻塞线程从队列里面获取元素.比如:当一个线程尝试去从一个空队列里面获取元素的时候,这个线

  • java并发编程工具类JUC之LinkedBlockingQueue链表队列

    java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的.范围任意的(其实是有界的).FIFO阻塞队列.访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁. 队列是否为空.是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头.队尾并发地进行访问.添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger,这就决定了它

  • java并发编程包JUC线程同步CyclicBarrier语法示例

    目录 1.创建CyclicBarrier障碍 2.在CyclicBarrier障碍处等待 3.CyclicBarrierAction 4.CyclicBarrier例子 在之前的文章中已经为大家介绍了java并发编程的工具:BlockingQueue接口.ArrayBlockingQueue.DelayQueue.LinkedBlockingQueue.PriorityBlockingQueue.SynchronousQueue.BlockingDeque接口.ConcurrentHashMap

  • java并发编程工具类PriorityBlockingQueue优先级队列

    目录 前言 1.PriorityBlockingQueue特性 2.PriorityBlockingQueue应用实例 3.使用Java8Comparator做优先级排序的实例 前言 在之前的文章中已经为大家介绍了java并发编程的工具:BlockingQueue接口.ArrayBlockingQueue.DelayQueue.LinkedBlockingQueue,本文为系列文章第五篇. Java PriorityBlockingQueue队列是BlockingQueue接口的实现类,它根据p

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • JAVA并发编程有界缓存的实现详解

    JAVA并发编程有界缓存的实现 1.有界缓存的基类 package cn.xf.cp.ch14; /** * *功能:有界缓存实现基类 *时间:下午2:20:00 *文件:BaseBoundedBuffer.java *@author Administrator * * @param <V> */ public class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head

  • Java 并发编程:volatile的使用及其原理解析

    Java并发编程系列[未完]: •Java 并发编程:核心理论 •Java并发编程:Synchronized及其实现原理 •Java并发编程:Synchronized底层优化(轻量级锁.偏向锁) •Java 并发编程:线程间的协作(wait/notify/sleep/yield/join) •Java 并发编程:volatile的使用及其原理 一.volatile的作用 在<Java并发编程:核心理论>一文中,我们已经提到过可见性.有序性及原子性问题,通常情况下我们可以通过Synchroniz

  • Java并发编程之重入锁与读写锁

    重入锁 重入锁,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁.重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁阻塞,该特性的实现需要解决以下两个问题. 1.线程再次获取锁.锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取. 2.锁的最终释放.线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁.锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放

  • Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解

    Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法. 以下是本文目录大纲: 一.CountDownLatch用法 二.CyclicBarrier用法 三.Semaphore用法 若有不正之处请多多谅解,并欢迎批评指正. 一.CountDownLatch

随机推荐