JDK数组阻塞队列源码深入分析总结

目录
  • 前言
  • 阻塞队列的功能
  • 数组阻塞队列设计
    • 数组的循环使用
    • 字段设计
    • 构造函数
    • put函数
    • take函数
    • offer函数
    • add函数
    • poll函数
  • 总结

前言

在前面一篇文章从零开始自己动手写阻塞队列当中我们仔细介绍了阻塞队列提供给我们的功能,以及他的实现原理,并且基于谈到的内容我们自己实现了一个低配版的数组阻塞队列。在这篇文章当中我们将仔细介绍JDK具体是如何实现数组阻塞队列的。

阻塞队列的功能

而在本篇文章所谈到的阻塞队列当中,是在并发的情况下使用的,上面所谈到的是队列是并发不安全的,但是阻塞队列在并发下情况是安全的。阻塞队列的主要的需求如下:

  • 队列基础的功能需要有,往队列当中放数据,从队列当中取数据。
  • 所有的队列操作都要是并发安全的。
  • 当队列满了之后再往队列当中放数据的时候,线程需要被挂起,当队列当中的数据被取出,让队列当中有空间的时候线程需要被唤醒。
  • 当队列空了之后再往队列当中取数据的时候,线程需要被挂起,当有线程往队列当中加入数据的时候被挂起的线程需要被唤醒。
  • 在我们实现的队列当中我们使用数组去存储数据,因此在构造函数当中需要提供数组的初始大小,设置用多大的数组。

上面就是数组阻塞队列给我们提供的最核心的功能,其中将线程挂起和唤醒就是阻塞队列的核心,挂起和唤醒体现了“阻塞”这一核心思想。

数组阻塞队列设计

阅读这部分内容你需要熟悉可重入锁ReentrantLock和条件变量Condition的使用。

数组的循环使用

因为我们是使用数组存储队列当中的数据,从下表为0的位置开始,当我们往队列当中加入一些数据之后,队列的情况可能如下,其中head表示队头,tail表示队尾。

在上图的基础之上我们在进行四次出队操作,结果如下:

在上面的状态下,我们继续加入8个数据,那么布局情况如下:

我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。

字段设计

在JDK当中数组阻塞队列的实现是ArrayBlockingQueue类,在他的内部是使用数组实现的,我们现在来看一下它的主要的字段,为了方便阅读将所有的解释说明都写在的注释当中:

    /** The queued items */
    final Object[] items; // 这个就是具体存储数据的数组

    /** items index for next take, poll, peek or remove */
    int takeIndex; // 因为是队列 因此我们需要知道下一个出队的数据的下标 这个就是表示下一个将要出队的数据的下标

    /** items index for next put, offer, or add */
    int putIndex; // 我们同时也需要下一个入队的数据的下标

    /** Number of elements in the queue */
    int count; // 统计队列当中一共有多少个数据

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock; // 因为阻塞队列是一种可以并发使用的数据结构

    /** Condition for waiting takes */
    private final Condition notEmpty; // 这个条件变量主要用于唤醒被 take 函数阻塞的线程 也就是从队列当中取数据的线程

    /** Condition for waiting puts */
    private final Condition notFull; // 这个条件变量主要用于唤醒被 put 函数阻塞的线程 也就是从队列当中放数据的线程

构造函数

构造函数的主要功能是申请指定大小的内存空间,并且对类的成员变量进行赋值操作。

public ArrayBlockingQueue(int capacity) {
  // capacity 表示用与存储数据的数组的长度
  this(capacity, false);
}
// fair 这个参数主要是用于说明 是否使用公平锁
// 如果为 true 表示使用公平锁 执行效率低 但是各个线程进入临界区的顺序是先来后到的顺序 更加公平
// 如果为 false 表示使用非公平锁 执行效率更高
public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  // 对变量进行赋值操作
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}

put函数

这个函数是阻塞队列对核心的函数之一了,首先我们需要了解的是,如果一个线程调用了这个函数往队列当中加入数据,如果此时队列已经满了则线程需要被挂起,如果没有满则需要将数据加入到队列当中,也就是将数据存储到数组当中。注意还有一个很重要的一点是,当我们往队列当中加入一个数据之后需要发一个信号给其他被take函数阻塞的线程,因为这些线程在取数据的时候可能队列当中已经空了,因此需要将这些线程唤醒。

public void put(E e) throws InterruptedException {
  checkNotNull(e); // 保证输入的数据不为 null 代码在下方
  final ReentrantLock lock = this.lock;
  // 进行加锁操作,因为下面是临界区
  lock.lockInterruptibly();
  try {
    while (count == items.length) // 如果队列已经满了 也就是队列当中数据的个数 count == 数组的长度的话 就需要将线程挂起
      notFull.await();
    // 当队列当中有空间的之后将数据加入到队列当中 这个函数在下面仔细分析 代码在下方
    enqueue(e);
  } finally {
    lock.unlock();
  }
}

private static void checkNotNull(Object v) {
  if (v == null)
    throw new NullPointerException();
}

private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  // 进入这个函数的线程已经在 put 函数当中加上锁了 因此这里不需要加锁
  final Object[] items = this.items;
  items[putIndex] = x;
  if (++putIndex == items.length) // 因为这个数据是循环使用的 因此可以回到下标为0的位置
    // 因为队列当中的数据可以出队 因此下标为 0 的位置不存在数据可以使用
    putIndex = 0;
  count++;
  // 在这里需要将一个被 take 函数阻塞的线程唤醒 如果调用这个方法的时候没有线程阻塞
  // 那么调用这个方法相当于没有调用 如果有线程阻塞那么将会唤醒一个线程
  notEmpty.signal();
}

注意:这里有一个地方非常容易被忽略,那就是在将线程挂起的时候使用的是while循环而不是if条件语句,代码:

final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
  while (count == items.length)
    notFull.await();
  enqueue(e);
} finally {
  lock.unlock();
}

这是因为,线程被唤醒之后并不会立马执行,因为线程在调用await方法的之后会释放锁,他想再次执行还需要再次获得锁,然后就在他获取锁之前的这段时间里面,可能其他的线程也会从数组当中放数据,因此这个线程执行的时候队列可能还是满的,因此需要再次判断,否则就会覆盖数据,像这种唤醒之后并没有满足线程执行条件的现象叫做虚假唤醒,因此大家在写程序的时候要格外注意,当需要将线程挂起或者唤醒的之后,最好考虑清楚,如果不确定可以使用while替代if,这样的话更加保险。

take函数

这个函数主要是从队列当中取数据,但是当队列为空的时候需要将调用这个方法的线程阻塞。当队列当中有数据的时候,就可以从队列当中取出数据,但是有一点很重要的就是当从队列当中取出数据之后,需要调用signal方法,用于唤醒被 put 函数阻塞的线程,因为从队列当中取出数据了,队列肯定已经不满了,因此可以唤醒被 put 函数阻塞的线程了。

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  // 因为取数据的代码涉及到数据竞争 也就是说多个线程同时竞争 数组数据items 因此需要用锁保护起来
  lock.lockInterruptibly();
  try {
    // 当 count == 0 说明队列当中没有数据
    while (count == 0)
      notEmpty.await();
    // 当队列当中还有数据的时候可以将数据出队
    return dequeue();
  } finally {
    lock.unlock();
  }
}

private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  // 取出数据
  E x = (E) items[takeIndex];
  items[takeIndex] = null; // 将对应的位置设置为 null 数据就可以被垃圾收集器回收了
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  // 迭代器也需要出队 如果不了
  if (itrs != null)
    itrs.elementDequeued();
  // 调用 signal 函数 将被 put 函数阻塞的线程唤醒 如果调用这个方法的时候没有线程阻塞
  // 那么调用这个方法相当于没有调用 如果有线程阻塞那么将会唤醒一个线程
  notFull.signal();
  return x;
}

同样的道理这里也需要使用while循环去进行阻塞,否则可能存在虚假唤醒,可能队列当中没有数据返回的数据为 null,而且会破坏队列的结构因为会涉及队列的两个端点的值的改变,也就是takeIndex和putIndex的改变。

offer函数

这个函数的作用和put函数一样,只不过当队列满了的时候,这个函数返回false,加入数据成功之后这个函数返回true,下面的代码就比较简单了。

public boolean offer(E e) {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 如果队列当中的数据个数和数组的长度相等 说明队列满了 直接返回 false 即可
    if (count == items.length)
      return false;
    else {
      enqueue(e);
      return true;
    }
  } finally {
    lock.unlock();
  }
}

add函数

这个函数和上面两个函数的意义也是一样的,只不过当队列满了之后这个函数会抛出异常。

public boolean add(E e) {
  if (offer(e))
    return true;
  else
    throw new IllegalStateException("Queue full");
}

poll函数

这个函数和take函数的作用差不多,但是这个函数不会阻塞,当队列当中没有数据的时候直接返回null,有数据的话返回数据。

public E poll() {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    return (count == 0) ? null : dequeue();
  } finally {
    lock.unlock();
  }
}

总结

在本篇文章当中主要介绍了JDK内部是如何实现ArrayBlockingQueue的,如果你对锁和队列的使用有一定的了解本篇文章应该还是比较容易理解的。在实现ArrayBlockingQueue当中有以下需要注意的点:

put函数,如果在往队列当中加入数据的时候队列满了,则需要将线程挂起。在队列当中有空间之后,线程被唤醒继续执行,在往队列当中加入了数据之后,需要调用signal方法,唤醒被take函数阻塞的线程。

take函数,如果在往队列当中取出数据的时候队列空了,则需要将线程挂起。在队列当中有数据之后,线程被唤醒继续执行,在从队列当中取出数据之后,需要调用signal方法,唤醒被put函数阻塞的线程。

在调用await函数的时候,需要小心虚假唤醒现象。

到此这篇关于JDK数组阻塞队列源码深入分析总结的文章就介绍到这了,更多相关JDK数组阻塞队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java阻塞队列BlockingQueue详解

    目录 队列的类型 数据结构 阻塞队列 BlockingQueue 常见的阻塞队列 BlockingQueue API ArrayBlockingQueue 源码简解 生产者消费者模式 延迟队列 DelayQueue 队列的类型 无限队列(unbounded queue) 无容量限定,只随存储变化 有限队列(bounded queue) 定义了最大容量 向无限队列添加元素的所有操作都将永远不会阻塞(也是线程安全的),因此它可以增长到非常大的容量. 使用无限阻塞队列 BlockingQueue 设计

  • Java阻塞队列的实现及应用

    目录 1.手写生产者消费者模型 2.手写定时器 总结 1.手写生产者消费者模型 所谓生产者消费者模型,可以用我们生活中的例子来类比:我去一个小摊儿买吃的,老板把已经做好的小吃都放在摆盘上,供我挑选.那么,老板就是生产者:我就是消费者:摆盘就是阻塞队列,用来当做生产与消费的缓冲区.因此,阻塞队列在生产者与消费者模型中起着至关重要的缓冲作用. 此次先演示如何手写阻塞队列(也可以使用Java库中自带的阻塞队列). 手写的阻塞队列只实现最基础的两个功能:入队和出队.之所以叫阻塞队列,是因为当队空或者队满

  • Java 阻塞队列详解及简单使用

     Java 阻塞队列详解 概要: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,

  • Java中的阻塞队列详细介绍

    Java中的阻塞队列 1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是: 在队列为空时,获取元素的线程会等待队列变为非空. 当队列满时,存储元素的线程会等待队列可用. 阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 2.Java里的阻塞队列 JDK中提供了七个阻塞队列: ArrayBlockingQueue :一个由数组结

  • 利用Java手写阻塞队列的示例代码

    目录 前言 需求分析 阻塞队列实现原理 线程阻塞和唤醒 数组循环使用 代码实现 成员变量定义 构造函数 put函数 offer函数 add函数 take函数 重写toString函数 完整代码 总结 前言 在我们平时编程的时候一个很重要的工具就是容器,在本篇文章当中主要给大家介绍阻塞队列的原理,并且在了解原理之后自己动手实现一个低配版的阻塞队列. 需求分析 在前面的两篇文章ArrayDeque(JDK双端队列)源码深度剖析和深入剖析(JDK)ArrayQueue源码当中我们仔细介绍了队列的原理,

  • 详解java中的阻塞队列

    阻塞队列简介 阻塞队列(BlockingQueue)首先是一个支持先进先出的队列,与普通的队列完全相同: 其次是一个支持阻塞操作的队列,即: 当队列满时,会阻塞执行插入操作的线程,直到队列不满. 当队列为空时,会阻塞执行获取操作的线程,直到队列不为空. 阻塞队列用在多线程的场景下,因此阻塞队列使用了锁机制来保证同步,这里使用的可重入锁: 而对于阻塞与唤醒机制则有与锁绑定的Condition实现 应用场景:生产者消费者模式 java中的阻塞队列 java中的阻塞队列根据容量可以分为有界队列和无界队

  • JDK数组阻塞队列源码深入分析总结

    目录 前言 阻塞队列的功能 数组阻塞队列设计 数组的循环使用 字段设计 构造函数 put函数 take函数 offer函数 add函数 poll函数 总结 前言 在前面一篇文章从零开始自己动手写阻塞队列当中我们仔细介绍了阻塞队列提供给我们的功能,以及他的实现原理,并且基于谈到的内容我们自己实现了一个低配版的数组阻塞队列.在这篇文章当中我们将仔细介绍JDK具体是如何实现数组阻塞队列的. 阻塞队列的功能 而在本篇文章所谈到的阻塞队列当中,是在并发的情况下使用的,上面所谈到的是队列是并发不安全的,但是

  • Java线程池ThreadPoolExecutor源码深入分析

    1.线程池Executors的简单使用 1)创建一个线程的线程池. Executors.newSingleThreadExecutor(); //创建的源码 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new Linke

  • Java HashMap源码深入分析讲解

    1.HashMap是数组+链表(红黑树)的数据结构. 数组用来存放HashMap的Key,链表.红黑树用来存放HashMap的value. 2.HashMap大小的确定: 1) HashMap的初始大小是16,在下面的源码分析中会看到. 2)如果创建时给定大小,HashMap会通过计算得到1.2.4.8.16.32.64....这样的二进制位作为HashMap数组的大小. //如何做到的呢?通过右移和或运算,最终n = xxx11111.n+1 = xx100000,2的n次方,即为数组大小 s

  • React Fiber源码深入分析

    目录 前言 React架构前世今生 React@15及之前 React@16及之后 Fiber Fiber简单理解 Fiber结构 Fiber工作原理 mount update 前言 本次React源码参考版本为17.0.3. React架构前世今生 查阅文档了解到, React@16.x是个分水岭. React@15及之前 在16之前,React架构大致可以分为两层: Reconciler: 主要职责是对比查找更新前后的变化的组件: Renderer: 主要职责是基于变化渲染页面: 但是Rea

  • Spring AOP源码深入分析

    目录 1. 前言 2. 术语 3. 示例 4. @EnableAspectJAutoProxy 5. AbstractAutoProxyCreator 6. 构建Advisor 7. 创建代理对象 8. DynamicAdvisedInterceptor 9. CglibMethodInvocation 10. Advice子类 1. 前言 Spring除了IOC和DI,还有另一个杀手锏功能——Spring AOP.AOP是一种面向切面的编程思想,它的关注点是横向的,不同于OOP的纵向.面向对象

  • 基于ZooKeeper实现队列源码

    实现原理 先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费.此场景下Zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可.由于创建的节点是持久化的,所以不必担心队列消息的丢失问题. 队列(Queue) 分布式队列是通用的数据结构,为了在 Zookee

  • JavaScript常用数组去重实战源码

    数组去重,一般都是在面试的时候才会碰到,一般是要求手写数组去重方法的代码.如果是被提问到,数组去重的方法有哪些?你能答出其中的10种,面试官很有可能对你刮目相看. 在真实的项目中碰到的数组去重,一般都是后台去处理,很少让前端处理数组去重.虽然日常项目用到的概率比较低,但还是需要了解一下,以防面试的时候可能回被问到. 1.利用对象的属性 使用对象属性不重名的特性. var arr = ['qiang','ming','tao','li','liang','you','qiang','tao'];

  • 浅谈Vuejs中nextTick()异步更新队列源码解析

    vue官网关于此解释说明如下: vue2.0里面的深入响应式原理的异步更新队列 官网说明如下: 只要观察到数据变化,Vue 将开启一个队列,并缓冲在同一事件循环中发生的所有数据改变.如果同一个 watcher 被多次触发,只会一次推入到队列中.这种在缓冲时去除重复数据对于避免不必要的计算和 DOM 操作上非常重要.然后,在下一个的事件循环"tick"中,Vue 刷新队列并执行实际(已去重的)工作.Vue 在内部尝试对异步队列使用原生的 Promise.then 和 MutationOb

  • Netty分布式flush方法刷新buffer队列源码剖析

    flush方法 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法 public final void flush() { a

  • Vue源码解析之数组变异的实现

    力有不逮的对象 众所周知,在 Vue 中,直接修改对象属性的值无法触发响应式.当你直接修改了对象属性的值,你会发现,只有数据改了,但是页面内容并没有改变. 这是什么原因? 原因在于: Vue 的响应式系统是基于Object.defineProperty这个方法的,该方法可以监听对象中某个元素的获取或修改,经过了该方法处理的数据,我们称其为响应式数据.但是,该方法有一个很大的缺点,新增属性或者删除属性不会触发监听,举个栗子: var vm = new Vue({ data () { return

随机推荐