详细分析Java并发集合ArrayBlockingQueue的用法

在上一章中,我们介绍了阻塞队列BlockingQueue,下面我们介绍它的常用实现类ArrayBlockingQueue。

一. 用数组来实现队列

因为队列这种数据结构的特殊要求,所以它天然适合用链表的方式来实现,用两个变量分别记录链表头和链表尾,当删除或插入队列时,只要改变链表头或链表尾就可以了,而且链表使用引用的方式链接的,所以它的容量几乎是无限的。
那么怎么使用数组来实现队列,我们需要四个变量:Object[] array来存储队列中元素,headIndex和tailIndex分别记录队列头和队列尾,count记录队列的个数。

  1. 因为数组的长度是固定,所以当count==array.length时,表示队列已经满了,当count==0的时候,表示队列是空的。
  2. 当添加元素的时候,将array[tailIndex] = e将tailIndex位置设置成新元素,之后将tailIndex++自增,然后将count++自增。但是有两点需要注意,在添加之前必须先判断队列是否已满,不然会出现覆盖已有元素。当tailIndex的值等于数组最后一个位置的时候,需要将tailIndex=0,循环利用数组
  3. 当删除元素的时候,将先记录下array[headIndex] 元素,之后将headIndex++自增,然后将count--自减。但是有两点需要注意要注意,在删除之前,必须先判断队列是否为空,不然可能会删除已删除的元素。

这里用了一个很巧妙的方式,我们知道当向队列中插入一个元素,那么就占用了数组的一个位置,当删除一个元素的时候,那么其实数组的这个位置就空闲出来了,表示这个位置又可以插入新元素了。

所以我们插入新元素前,必须检查队列是否已满,删除元素之前,必须检查队列是否为空。

二. ArrayBlockingQueue中重要成员变量

 /** 储存队列的中元素 */
  final Object[] items;

  /** 队列头的位置 */
  int takeIndex;

  /** 队列尾的位置 */
  int putIndex;

  /** 当前队列拥有的元素个数 */
  int count;

  /** 用来保证多线程操作共享变量的安全问题 */
  final ReentrantLock lock;

  /** 当队列为空时,就会调用notEmpty的wait方法,让当前线程等待 */
  private final Condition notEmpty;

  /** 当队列为满时,就会调用notFull的wait方法,让当前线程等待 */
  private final Condition notFull;

就是多了lock、notEmpty、notFull变量来实现多线程安全和线程等待条件的,它们三个是怎么操作的呢?

2.1 lock的作用

因为ArrayBlockingQueue是在多线程下操作的,所以修改items、takeIndex、putIndex和count这些成员变量时,必须要考虑多线程安全问题,所以这里使用lock独占锁,来保证并发操作的安全。

2.2 notEmpty与notFull的作用

因为阻塞队列必须实现,当队列为空或队列已满的时候,队列的读取或插入操作要等待。所以我们想到了并发框架下的Condition对象,使用它来控制。

在AQS中,我们介绍了这个类的作用:

  1. await系列方法,会释放当前锁,并让当前线程等待。
  2. signal与signalAll方法,会唤醒当前线程。其实它并不是唤醒当前线程,而是将在这个Condition条件上等待的线程,添加到lock锁上的等待线程池中,所以当锁被释放时,会唤醒lock锁上的等待线程池中一个线程。具体在AQS中有源码分析。

三. 添加元素方法

3.1 add(E e)与offer(E e)方法:

  // 调用AbstractQueue父类中的方法。
  public boolean add(E e) {
    // 通过调用offer来时实现
    if (offer(e))
      return true;
    else
      throw new IllegalStateException("Queue full");
  }

  //向队列末尾新添加元素。返回true表示添加成功,false表示添加失败,不会抛出异常
  public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 使用lock来保证,多线程修改成员属性的安全
    lock.lock();
    try {
      // 队列已满,添加元素失败,返回false。
      if (count == items.length)
        return false;
      else {
        // 调用enqueue方法将元素插入队列中
        enqueue(e);
        return true;
      }
    } finally {
      lock.unlock();
    }
  }

add方法是调用offer方法实现的。在offer方法中,必须先判断队列是否已满,如果已满就直接返回false,而不会阻塞当前线程。如果不满就调用enqueue方法将元素插入队列中。

注意:这里使用lock.lock()是保证同一时间只有一个线程修改成员变量,防止出现并发操作问题。虽然它也会阻塞当前线程,但是它并不是条件等待,只是因为锁被其他线程持有,而ArrayBlockingQueue中方法操作时间都不长,这里相当于不阻塞线程。

3.2 put方法

  // 向队列末尾新添加元素,如果队列已满,当前线程就等待。响应中断异常
  public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 使用lock来保证,多线程修改成员属性的安全
    lock.lockInterruptibly();
    try {
      // 队列已满,则调用notFull.await()方法,让当前线程等待,直到队列不是满的
      while (count == items.length)
        notFull.await();
      // 调用enqueue方法将元素插入队列中
      enqueue(e);
    } finally {
      lock.unlock();
    }
  }

与offer方法大体流程一样,只是当队列已满的时候,会调用notFull.await()方法,让当前线程阻塞等待,直到队列被别的线程移除了元素,队列不满的时候,会唤醒这个等待线程。

3.3 offer(E e, long timeout, TimeUnit unit)方法

/**
   * 向队列末尾新添加元素,如果队列中没有可用空间,当前线程就等待,
   * 如果等待时间超过timeout了,那么返回false,表示添加失败
   */
  public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    // 计算一共最多等待的时间值nanos
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 使用lock来保证,多线程修改成员属性的安全
    lock.lockInterruptibly();
    try {
      // 队列已满
      while (count == items.length) {
        // nanos <= 0表示最大等待时间已到,那么不用再等待了,返回false,表示添加失败。
        if (nanos <= 0)
          return false;
        // 调用notFull.awaitNanos(nanos)方法,超时nanos时间会被自动唤醒,
        // 如果被提前唤醒,那么返回剩余的时间
        nanos = notFull.awaitNanos(nanos);
      }
      // 调用enqueue方法将元素插入队列中
      enqueue(e);
      return true;
    } finally {
      lock.unlock();
    }
  }

与put的方法大体流程一样,只不过是调用notFull.awaitNanos(nanos)方法,让当前线程等待,并设置等待时间。

四. 删除元素方法

4.1 remove()和poll()方法:

  // 调用AbstractQueue父类中的方法。
  public E remove() {
    // 通过调用poll来时实现
    E x = poll();
    if (x != null)
      return x;
    else
      throw new NoSuchElementException();
  }

// 删除队列第一个元素(即队列头),并返回它。如果队列是空的,它不会抛出异常,而是会返回null。
  public E poll() {
    final ReentrantLock lock = this.lock;
    // 使用lock来保证,多线程修改成员属性的安全
    lock.lock();
    try {
      // 如果count == 0,列表为空,就返回null,否则调用dequeue方法,返回列表头元素
      return (count == 0) ? null : dequeue();
    } finally {
      lock.unlock();
    }
  }

remove方法是调用poll()方法实现的。在 poll()方法中,如果列表为空,就返回null,否则调用dequeue方法,返回列表头元素。

4.2 take()方法

  /**
   * 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。响应中断异常
   */
  public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 使用lock来保证,多线程修改成员属性的安全
    lock.lockInterruptibly();
    try {
      // 如果队列为空,就调用notEmpty.await()方法,让当前线程等待。
      // 直到有别的线程向队列中插入元素,那么这个线程会被唤醒。
      while (count == 0)
        notEmpty.await();
      // 调用dequeue方法,返回列表头元素
      return dequeue();
    } finally {
      lock.unlock();
    }
  }

take()方法当队列为空的时候,当前线程必须等待,直到有别的线程向队列中插入新元素,那么这个线程会被唤醒。

4.3 poll(long timeout, TimeUnit unit)方法

  /**
   * 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。
   * 如果等待时间超过timeout了,那么返回false,表示获取元素失败
   */
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 计算一共最多等待的时间值nanos
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 使用lock来保证,多线程修改成员属性的安全
    lock.lockInterruptibly();
    try {
      // 队列为空
      while (count == 0) {
        // nanos <= 0表示最大等待时间已到,那么不用再等待了,返回null。
        if (nanos <= 0)
          return null;
        // 调用notEmpty.awaitNanos(nanos)方法让档期线程等待,并设置超时时间。
        nanos = notEmpty.awaitNanos(nanos);
      }
      // 调用dequeue方法,返回列表头元素
      return dequeue();
    } finally {
      lock.unlock();
    }
  }

与take()方法流程一样,只不过调用awaitNanos(nanos)方法,让当前线程等待,并设置等待时间。

五. 查看元素的方法

5.1 element()与peek() 方法

  // 调用AbstractQueue父类中的方法。
  public E element() {
    E x = peek();
    if (x != null)
      return x;
    else
      throw new NoSuchElementException();
  }

  // 查看队列头元素
  public E peek() {
    final ReentrantLock lock = this.lock;
    // 使用lock来保证,多线程修改成员属性的安全
    lock.lock();
    try {
      // 返回当前队列头的元素
      return itemAt(takeIndex); // null when queue is empty
    } finally {
      lock.unlock();
    }
  }

六. 其他重要方法

6.1 enqueue和dequeue方法

  // 将元素x插入队列尾
  private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null; //当前putIndex位置元素一定是null
    final Object[] items = this.items;
    items[putIndex] = x;
    // 如果putIndex等于items.length,那么将putIndex重新设置为0
    if (++putIndex == items.length)
      putIndex = 0;
    // 队列数量加一
    count++;
    // 因为插入一个元素,那么当前队列肯定不为空,那么唤醒在notEmpty条件下等待的一个线程
    notEmpty.signal();
  }

  // 删除队列头的元素,返回它
  private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;

    final Object[] items = this.items;
    // 得到当前队列头的元素
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    // 将当前队列头位置设置为null
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    // 队列数量减一
    count--;
    if (itrs != null)
      itrs.elementDequeued();
    // 因为删除了一个元素,那么队列肯定不满了,那么唤醒在notFull条件下等待的一个线程
    notFull.signal();
    return x;
  }

这两个方法分别代表,向队列中插入元素和从队列中删除元素。而且它们要唤醒等待的线程。当插入元素后,那么队列一定不为空,那么就要唤醒在notEmpty条件下等待的线程。当删除元素后,那么队列一定不满,那么就要唤醒在notFull条件下等待的线程。

6.2 remove(Object o)方法

  // 删除队列中对象o元素,最多删除一条
  public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    // 使用lock来保证,多线程修改成员属性的安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      // 当队列中有值的时候,才进行删除。
      if (count > 0) {
        // 队列尾下一个位置
        final int putIndex = this.putIndex;
        // 队列头的位置
        int i = takeIndex;
        do {
          // 当前位置元素与被删除元素相同
          if (o.equals(items[i])) {
            // 删除i位置元素
            removeAt(i);
            // 返回true
            return true;
          }
          if (++i == items.length)
            i = 0;
          // 当i==putIndex表示遍历完所有元素
        } while (i != putIndex);
      }
      return false;
    } finally {
      lock.unlock();
    }
  }

从队列中删除指定对象o,那么就要遍历队列,删除第一个与对象o相同的元素,如果队列中没有对象o元素,那么返回false删除失败。

这里有两点需要注意:

如何遍历队列,就是从队列头遍历到队列尾。就要靠takeIndex和putIndex两个变量了。

为什么Object[] items = this.items;这句代码没有放到同步锁lock代码块内。items是成员变量,那么多线程操作的时候,不会有并发问题么?

这个是因为items是个引用变量,不是基本数据类型,而且我们对队列的插入和删除操作,都是针对这一个items数组,没有改变数组的引用,所以在lock代码中,items会得到其他线程对它最新的修改。但是如果这里将int putIndex = this.putIndex;方法lock代码块外面,就会产生问题。

removeAt(final int removeIndex)方法

  // 删除队列removeIndex位置的元素
  void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    // 表示删除元素是列表头,就容易多了,与dequeue方法流程差不多
    if (removeIndex == takeIndex) {
      // 移除removeIndex位置元素
      items[takeIndex] = null;
      // 到了数组末尾,就要转到数组头位置
      if (++takeIndex == items.length)
        takeIndex = 0;
      // 队列数量减一
      count--;
      if (itrs != null)
        itrs.elementDequeued();
    } else {
      // an "interior" remove

      final int putIndex = this.putIndex;
      for (int i = removeIndex;;) {
        int next = i + 1;
        if (next == items.length)
          next = 0;
        // 还没有到队列尾,那么就将后一个位置元素覆盖前一个位置的元素
        if (next != putIndex) {
          items[i] = items[next];
          i = next;
        } else {
          // 将队列尾元素置位null
          items[i] = null;
          // 重新设置putIndex的值
          this.putIndex = i;
          break;
        }
      }
      // 队列数量减一
      count--;
      if (itrs != null)
        itrs.removedAt(removeIndex);
    }
    // 因为删除了一个元素,那么队列肯定不满了,那么唤醒在notFull条件下等待的一个线程
    notFull.signal();
  }

在队列中删除指定位置的元素。需要注意的是删除之后的数组还能保持队列形式,分为两种情况:

  1. 如果删除位置是队列头,那么简单,只需要将队列头的位置元素设置为null,将将队列头位置加一。
  2. 如果删除位置不是队列头,那么麻烦了,这个时候,我们就要将从removeIndex位置后的元素全部左移一位,覆盖前一个元素。最后将原来队列尾的元素置位null。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理
  • java并发之ArrayBlockingQueue详细介绍
  • java中LinkedBlockingQueue与ArrayBlockingQueue的异同
  • java集合框架 arrayblockingqueue应用分析
(0)

相关推荐

  • java并发之ArrayBlockingQueue详细介绍

    java并发之ArrayBlockingQueue详细介绍 ArrayBlockingQueue是常用的线程集合,在线程池中也常常被当做任务队列来使用.使用频率特别高.他是维护的是一个循环队列(基于数组实现),循环结构在数据结构中比较常见,但是在源码实现中还是比较少见的. 线程安全的实现 线程安全队列,基本是离不开锁的.ArrayBlockingQueue使用的是ReentrantLock,配合两种Condition,实现了集合的线程安全操作.这里稍微说一个好习惯,下面是成员变量的声明. pri

  • java中LinkedBlockingQueue与ArrayBlockingQueue的异同

    相同: 1.LinkedBlockingQueue和ArrayBlockingQueue都实现了BlockingQueue接口: 2.LinkedBlockingQueue和ArrayBlockingQueue都是可阻塞的队列 内部都是使用ReentrantLock和Condition来保证生产和消费的同步: 当队列为空,消费者线程被阻塞:当队列装满,生产者线程被阻塞: 使用Condition的方法来同步和通信:await()和signal() 不同: 1.由上图可以看出,他们的锁机制不同 Li

  • java集合框架 arrayblockingqueue应用分析

    Queue ------------ 1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue, (基于数组的并发阻塞队列) 6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列) 7.LinkedBlockingDeque, (

  • Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理

    ArrayBlockingQueue介绍 ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列. 线程安全是指,ArrayBlockingQueue内部通过"互斥锁"保护竞争资源,实现了多线程对竞争资源的互斥访问.而有界,则是指ArrayBlockingQueue对应的数组是有界限的. 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待:而且,ArrayBlockingQueue是按 FIFO(先进先出)原则对元素进行

  • 详细分析Java并发集合ArrayBlockingQueue的用法

    在上一章中,我们介绍了阻塞队列BlockingQueue,下面我们介绍它的常用实现类ArrayBlockingQueue. 一. 用数组来实现队列 因为队列这种数据结构的特殊要求,所以它天然适合用链表的方式来实现,用两个变量分别记录链表头和链表尾,当删除或插入队列时,只要改变链表头或链表尾就可以了,而且链表使用引用的方式链接的,所以它的容量几乎是无限的. 那么怎么使用数组来实现队列,我们需要四个变量:Object[] array来存储队列中元素,headIndex和tailIndex分别记录队列

  • 详细分析Java并发集合LinkedBlockingQueue的用法

    在上一章我们讲解了ArrayBlockingQueue,用数组形式实现的阻塞队列. 数组的长度在创建时就必须确定,如果数组长度小了,那么ArrayBlockingQueue队列很容易就被阻塞,如果数组长度大了,就容易浪费内存. 而队列这个数据结构天然适合用链表这个形式,而LinkedBlockingQueue就是使用链表方式实现的阻塞队列. 一. 链表实现 1.1 Node内部类 /** * 链表的节点,同时也是通过它来实现一个单向链表 */ static class Node<E> { E

  • Java 并发编程ArrayBlockingQueue的实现

    一.简介 ArrayBlockingQueue 顾名思义:基于数组的阻塞队列.数组是要指定长度的,所以使用 ArrayBlockingQueue 时必须指定长度,也就是它是一个有界队列.它实现了 BlockingQueue 接口,有着队列.集合以及阻塞队列的所有方法. ArrayBlockingQueue 是线程安全的,内部使用 ReentrantLock 来保证.ArrayBlockingQueue 支持对生产者线程和消费者线程进行公平的调度.当然默认情况下是不保证公平性的,因为公平性通常会降

  • 分析Java并发编程之信号量Semaphore

    目录 一.认识Semaphore 1.1.Semaphore 的使用场景 1.2.Semaphore 使用 1.3.Semaphore 信号量的模型 二.Semaphore 深入理解 2.1.Semaphore 基本属性 2.2.Semaphore 的公平性和非公平性 2.3.其他 Semaphore 方法 一.认识Semaphore 1.1.Semaphore 的使用场景 Semaphore 的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的

  • 详细分析JAVA 线程池

    系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互.在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,程序将一个 Runnable 对象或 Callable 对象传给线程池,线程池就会启动一个线程来执行它们的 run() 或 call() 方法,当 run() 或 call() 方法执行结束后,该线程并不会死亡,而是再次返回线程池成为空闲状态,等待执行下一个

  • 详细分析Java 泛型的使用

    一.泛型的简介 1.为什么要使用泛型? 一般使用在集合上,比如现在把一个字符串类型的值放入到集合里面,这个时候,这个值放到集合之后,失去本身的类型,只能是object类型.这时,如果想要对这个值进行类型转换,很容易出现类型转换错误,怎么解决这个问题,可以使用泛型来解决. 2.在泛型里面写是一个对象,String 不能写基本的数据类型 比如int,要写基本的数据类型对应的包装类 基本数据类型 对应包装类 基本数据类型 对应包装类 byte Byte short Short int Integer

  • 分析java并发中的wait notify notifyAll

    一.前言 java 面试是否有被问到过,sleep 和 wait 方法的区别,关于这个问题其实不用多说,大多数人都能回答出最主要的两点区别: sleep 是线程的方法, wait / notify / notifyAll 是 Object 类的方法: sleep 不会释放当前线程持有的锁,到时间后程序会继续执行,wait 会释放线程持有的锁并挂起,直到通过 notify 或者 notifyAll 重新获得锁. 另外还有一些参数.异常等区别,不细说了.本文重点记录一下 wait / notify

  • 详细分析Java内存模型

    目录 一.为什么要学习并发编程 二.为什么需要并发编程 三.从物理机中得到启发 四.Java 内存模型 五.原子性 5.1.什么是原子性 5.2.如何保证原子性 六.可见性 6.1.什么是可见性 6.2.如何保证可见性 七.有序性 7.1.什么是有序性 7.2.如何保证有序性 一.为什么要学习并发编程 对于 "我们为什么要学习并发编程?" 这个问题,就好比 "我们为什么要学习政治?" 一样,我们(至少作为学生党是这样)平常很少接触到,然后背了一堆 "正确且

  • 详细分析java 动态代理

    1.动态代理的特点: 字节码随用随创建,随用随加载 2.作用: 不修改源码的基础上对源码进行加强 3.分类: (1)基于接口的动态代理: 涉及到的类:Proxy,由JDK官方提供,使用Proxy类中的newProxyInstance方法创建对象.创建代理对象时要求被代理对象至少实现一个接口,否则无法使用 参数: ClassLoader:类加载器,他是用于加载对象字节码的,和被代理对象使用相同的类加载器,为固定写法 class[]:字节码数组,他是用于让代理对象和被代理对象具有相同的方法,也是固定

  • 详细分析java并发之volatile关键字

    Java面试中经常会涉及关于volatile的问题.本文梳理下volatile关键知识点. volatile字意为"易失性",在Java中用做修饰对象变量.它不是Java特有,在C,C++,C#等编程语言也存在,只是在其它编程语言中使用有所差异,但总体语义一致.比如使用volatile 能阻止编译器对变量的读写优化.简单说,如果一个变量被修饰为volatile,相当于告诉系统说我容易变化,编译器你不要随便优化(重排序,缓存)我. Happens-before 规范上,Java内存模型遵

随机推荐