Java并发LinkedBlockingQueue源码分析

目录
  • 简介
  • 常量
  • 构造方法
  • put
  • await
  • isOnSyncQueue
  • signal

简介

LinkedBlockingQueue是一个阻塞的有界队列,底层是通过一个个的Node节点形成的链表实现的,链表队列中的头节点是一个空的Node节点,在多线程下操作时会使用ReentrantLock锁来保证数据的安全性,并使用ReentrantLock下的Condition对象来阻塞以及唤醒线程。

常量

/**
 * 链表中的节点类
 */
static class Node<E> {
    //节点中的元素
    E item;
    //下一个节点
    Node<E> next;
    Node(E x) { item = x; }
}
/** 链表队列的容量大小,如果没有指定则使用Integer最大值 */
private final int capacity;
/** 记录链表中的节点的数量的原子类 */
private final AtomicInteger count = new AtomicInteger();
/**链表的头节点
 */
transient Node<E> head;
/**
 * 链表的尾节点
 */
private transient Node<E> last;
/** 从链表队列中获取节点时防止多个线程同时操作所产生数据安全问题时所加的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** 添加节点到链表队列中防止多个线程同时操作所产生数据安全问题时所加的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
  • Node:链表队列中的节点,用于存放元素。
  • capacity:链表队列中最多能存放的节点数量,如果在创建LinkedBlockingQueue的时候没有指定.则默认最多存放的节点的数量为Integer的最大值。
  • head:链表队列中的头节点,一般来说头节点都是一个没有元素的空节点。
  • last:链表队列中的尾节点。
  • takeLock:在获取链表队列中的节点的时候所加的锁。
  • putLock:在添加链表队列中的节点的时候所加的锁。
  • Condition:当线程需要进行等待或者唤醒的时候则会调用该对象下的方法。

构造方法

/**
 * 创建默认容量大小的链表队列
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * 创建指定容量大小的链表队列
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //创建一个空节点,并将该节点设置为头尾节点
    last = head = new Node<E>(null);
}
/**
 * 根据指定集合中的元素创建一个默认容量大小的链表队列
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    //创建默认容量大小的链表队列
    this(Integer.MAX_VALUE);
    //获取添加元素节点的锁
    final ReentrantLock putLock = this.putLock;
    //加锁
    putLock.lock();
    try {
        //链表中节点的数量
        int n = 0;
        //遍历集合中的元素
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //为元素创建一个节点,并将节点添加到链表的尾部,并设置节点为尾节点
            enqueue(new Node<E>(e));
            //链表中节点的数量自增
            ++n;
        }
        //记录链表中节点的数量
        count.set(n);
    } finally {
        //释放锁
        putLock.unlock();
    }
}

第一个和第三个构造方法中都会调用第二个构造方法,而在第二个构造方法中会设置链表队列中容纳节点的数量以及创建一个空的头节点来填充,再看第三个构造方法中的代码,首先会获取putLock锁,代表当前是一个需要添加节点的线程,再将指定集合中的元素封装成一个Node节点,并依次将封装的节点追加到链表队列中的尾部,并使用AtomicInteger来记录链表队列中节点的数量。

put

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    //为指定元素创建节点
    Node<E> node = new Node<E>(e);
    //获取添加元素节点的锁
    final ReentrantLock putLock = this.putLock;
    //获取记录链表节点数量的原子类
    final AtomicInteger count = this.count;
    //加锁,如果加锁的线程被中断了则抛出异常
    putLock.lockInterruptibly();
    try {
        //校验链表中的节点数量是否到达了指定的容量
        //如果到达了指定的容量就进行阻塞等待
        //如果线程被唤醒了,但是链表中的节点数量还是未改变,则继续阻塞等待
        //只有当头节点出队,新的节点才能继续添加
        while (count.get() == capacity) {
            notFull.await();
        }
        //将新节点添加到链表的尾部并设置为尾节点
        enqueue(node);
        //获取没有添加当前节点时链表中的节点数量
        //并更新链表中的节点数量
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            //唤醒等待添加节点的线程
            //可能当前线程在等待队列中等待的时候
            //有新的线程要执行添加节点的操作
            //但是链表的容量已经到达最大,所以新的线程也会进行等待
            //当前线程被唤醒了并且链表的容量没有到达最大则尝试去唤醒等待的线程
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    if (c == 0)
        //c等于0说明添加当前节点的时候链表中没有节点
        //可能有线程在获取节点,但是链表中没有节点
        //从而一直进行等待,当添加了节点的时候就需要唤醒获取节点的线程
        signalNotEmpty();
}

LinkedBlockingQueue中的代码都比较简单,主要是ReentrantLock下的Condition中的方法比较复杂,我们先整体的了解一下put方法,首先通过new Node为将指定元素封装成一个节点,再获取putLock锁,当链表队列中的节点数量已经到达了capacity大小,那当前线程就需要调用Condition下的await方法进行等待将线程阻塞,直到有节点出队或者说有节点被删除或者当前线程被中断了,当前线程被中断了则会直接退出当前put方法并抛出异常,如果节点出队了或者节点被删除了,那当前线程被唤醒了则会继续执行添加节点的操作。

enqueue方法则会将封装的节点追加到链表队列中的尾部,通过getAndIncrement方法先获取没有添加当前节点时链表队列中节点的数量,然后更新添加了当前节点之后链表队列中节点的数量,c则是没有添加当前节点时链表队列中节点的数量,c+1则是添加当前节点后链表队列中节点的数量,如果说c+1小于capacity则说明线程在添加节点的时候,链表队列中的节点数量已经到达了最大值,后续添加节点的线程都需要进行阻塞,当有节点被删除或出队的时候,最开始阻塞的线程被唤醒,被唤醒的线程则会去执行添加节点的操作,当添加完节点之后链表队列中的节点数量没有到达最大值则会去唤醒后续被阻塞的线程执行添加节点的操作。

c等于0说明在添加当前节点之前,可能有线程在获取链表队列中的节点,但是链表队列中没有节点,导致获取节点的线程处于阻塞状态,当添加完节点之后,链表队列中有了节点,此时就需要唤醒阻塞的线程去获取节点。

添加元素的方法分为put和offer,区别在于阻塞与非阻塞,当链表队列中的节点数量已经到达最大值,put方法则会阻塞,而offer方法不会阻塞则是直接返回。

获取元素的方法分为take、poll、peek,take方法与put方法相似,只不过一个是入队,一个是出队,pollpeek都是非阻塞的,但是区别在于poll获取了节点之后,该节点会从链表队列中移除,而peek不会移除节点。

await

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        //线程被中断抛出异常
        throw new InterruptedException();
    //为当前线程创建一个等待模式的节点并入队,并将等待队列中已经取消等待的节点移除掉
    Node node = addConditionWaiter();
    //释放当前线程的锁,防止当前线程加了锁,导致其它在等待的线程被唤醒之后不能获取到锁从而导致一直阻塞
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果指定节点还在等待队列中等待则挂起
    //如果指定节点被中断了则会将指定节点添加到同步等待队列中
    //如果指定节点被唤醒了则会将指定节点添加到同步等待队列中
    while (!isOnSyncQueue(node)) {
        //节点在等待队列中则挂起
        LockSupport.park(this);
        //线程在等待队列中被中断则会添加到同步等待队列中
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //acquireQueued 指定节点中的线程被中断了或者被唤醒了则会尝试去获取锁
    //如果还未到指定节点中的线程获取锁的时候则会继续挂起
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        //指定节点的线程已经获取到了锁并且节点关联的下一个节点不为空
        //此时就需要将已经获取到锁的节点从等待队列中移除
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

首先通过addConditionWaiter方法将当前线程封装成一个等待模式的节点,并将节点添加到等待队列中以及会将等待队列中已经取消等待的线程节点从队列中移除,再通过fullyRelease方法释放掉当前线程加的所有的锁,之所以释放锁是防止其它线程获取不到锁从而一直阻塞,再看isOnSyncQueue方法,该方法是校验当前线程节点是否在等待队列中,如果在等待队列中那就将节点中的线程挂起等待。

isOnSyncQueue

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        //指定节点还在等待队列中此时就需要继续等待
        return false;
    if (node.next != null)
        //指定节点已经不在等待队列中了
        return true;
    //从等待队列中的尾节点开始向头节点遍历,校验指定的节点是否在其中
    return findNodeFromTail(node);
}

当节点的状态为CONDITION时,则说明该节点还在等待队列中,node.prev等于null为什么说也是在等待队列中呢?因为等待队列中的节点是没有prev指针和next指针的,如果prev指针和next指针指向的节点不为空,那就说明该节点是在同步等待队列中的,如果在同步等待队列中的话,那节点中的线程就可以尝试去获取锁并执行后续的操作。

当等待队列中的线程节点被唤醒和中断则会添加到同步等待队列中,如果是被中断的话则会通过checkInterruptWhileWaiting方法添加一个中断标识,再通过acquireQueued方法来获取锁,如果获取锁失败则继续等待,当获取锁成功之后则会该节点从等待队列中移除,如果说你是一个被中断的线程,最后会通过reportInterruptAfterWait方法抛出中断异常。

signal

public final void signal() {
    if (!isHeldExclusively())
        //加锁的线程不是当前线程则抛出异常
        throw new IllegalMonitorStateException();
    //头节点
    Node first = firstWaiter;
    if (first != null)
        //唤醒头节点
        doSignal(first);
}
/**
 * 唤醒等待队列中的头节点
 * 如果等待队列中的头节点被取消等待或已经被唤醒了
 * 此时就需要唤醒头节点的后续的一个节点
 * 直到成功的唤醒一个节点中的线程
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
* 将指定的节点添加到同步等待队列中
* 并根据前一个节点的等待状态来决定是否需要立刻唤醒指定节点
*/
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        //更改节点状态失败说明该节点已经被唤醒了
        return false;
    //将要唤醒的节点添加到同步等待队列中
    //并返回前一个节点
    Node p = enq(node);
    //前一个节点的等待状态
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //如果前一个节点的等待状态大于0则说明已经被取消加锁,此时就需要唤醒后续的节点,就是当前节点
        //前一个节点的等待状态不大于0但是更改前一个节点的等待状态时失败则说明前一个节点已经被唤醒了并更改了状态
        //此时就需要尝试将当前节点中的线程唤醒
        LockSupport.unpark(node.thread);
    return true;
}

唤醒线程节点的方法主要还是看transferForSignal方法,首先会通过cas操作将需要唤醒的节点的状态设置为0,如果更改节点状态失败则说明该节点已经被唤醒了,更新节点状态成功则会通过enq方法将节点添加到同步等待队列中,此时就需要根据前一个节点来决定是否需要立即唤醒当前节点中的线程。

从下面的图片中能看出来其实同步等待队列和等待队列中使用的节点是共用的节点,并不会创建新的节点,同步等待队列中的节点使用next指针和prev指针来关联节点,而等待队列中则是使用nextWaiter指针来关联节点的。

以上就是Java并发LinkedBlockingQueue源码分析的详细内容,更多关于Java并发LinkedBlockingQueue的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解Java并发编程之原子类

    目录 原子数组 AtomicIntegerArray 原子更新器 AtomicIntegerFieldUpdater 原子累加器 LongAdder 原子数组 原子数组有AtomicIntegerArray.AtomicLongArray.AtomicReferenceArray,主要是用来对数组中的某个元素进行原子操作.三个类的方法基本类似,这里只介绍一下AtomicIntegerArray的方法. AtomicIntegerArray 两个构造方法,第一个构造方法传入数组长度初始化一个所有值

  • 详细总结Java中常用的原子类

    一.什么是原子类 Java中提供了一些原子类,原子类包装了一个变量,并且提供了一系列对变量进行原子性操作的方法.我们在多线程环境下对这些原子类进行操作时,不需要加锁,大大简化了并发编程的开发. 二.原子类的底层实现 目前Java中提供的原子类大部分底层使用了CAS锁(CompareAndSet自旋锁),如AtomicInteger.AtomicLong等:也有使用了分段锁+CAS锁的原子类,如LongAdder等. 三.常用的原子类 3.1 AtomicInteger与AtomicLong At

  • java高并发下CopyOnWriteArrayList替代ArrayList

    目录 一.ArrayList线程不安全 二.解决ArrayList线程不安全的方案 1.使用Vector类 2.使用Collections类 3.使用CopyOnWriteArrayList类 三.CopyOnWriteArrayList 1.简介 2.主要方法源码分析 1.初始化 2.添加元素 3.获取指定位置元素 4.修改指定元素 5.删除元素 6.弱一致性的迭代器 四.总结 一.ArrayList线程不安全 在Java的集合框架中,想必大家对ArrayList肯定不陌生,单线程的情况下使用

  • java高并发下解决AtomicLong性能瓶颈方案LongAdder

    目录 一. LongAdder简介 二.LongAdder代码分析 (1)LongAdder的结构 (2)add方法实现 (3)add方法中longAccumulate方法的实现 三.总结 一. LongAdder简介 LongAdder类是JDK1.8新增的一个原子性操作类.上一节说到,AtomicLong通过CAS提供了非阻塞的原子性操作,相比用阻塞算法的synchronized来说性能已经得到了很大提升.在高并发下大量线程会同时竞争更新同一个原子变量,但由于只有一个线程的CAS操作会成功,

  • java并发编程专题(十一)----(JUC原子类)数组类型详解

    上一节我们介绍过三个基本类型的原子类,这次我们来看一下数组类型: AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray.其中前两个的使用方式差不多,AtomicReferenceArray因为他的参数为引用数组,所以跟前两个的使用方式有所不同. 1.AtomicLongArray介绍 对于AtomicLongArray, AtomicIntegerArray我们还是只介绍一个,另一个使用方式大同小异. 我们先来看看AtomicLong

  • java并发编程专题(十)----(JUC原子类)基本类型详解

    这一节我们先来看一下基本类型: AtomicInteger, AtomicLong, AtomicBoolean.AtomicInteger和AtomicLong的使用方法差不多,AtomicBoolean因为比较简单所以方法比前两个都少,那我们这节主要挑AtomicLong来说,会使用一个,其余的大同小异. 1.原子操作与一般操作异同 我们在说原子操作之前为了有个对比为什么需要这些原子类而不是普通的基本数据类型就能满足我们的使用要求,那就不得不提原子操作不同的地方. 当你在操作一个普通变量时,

  • Java并发LinkedBlockingQueue源码分析

    目录 简介 常量 构造方法 put await isOnSyncQueue signal 简介 LinkedBlockingQueue是一个阻塞的有界队列,底层是通过一个个的Node节点形成的链表实现的,链表队列中的头节点是一个空的Node节点,在多线程下操作时会使用ReentrantLock锁来保证数据的安全性,并使用ReentrantLock下的Condition对象来阻塞以及唤醒线程. 常量 /** * 链表中的节点类 */ static class Node<E> { //节点中的元素

  • Java并发 结合源码分析AQS原理

    前言: 如果说J.U.C包下的核心是什么?那我想答案只有一个就是AQS.那么AQS是什么呢?接下来让我们一起揭开AQS的神秘面纱 AQS是什么? AQS是AbstractQueuedSynchronizer的简称.为什么说它是核心呢?是因为它提供了一个基于FIFO的队列和state变量来构建锁和其他同步装置的基础框架.下面是其底层的数据结构. AQS的特点 1.其内使用Node实现FIFO(FirstInFirstOut)队列.可用于构建锁或者其他同步装置的基础框架 2.且利用了一个int类表示

  • Java并发Timer源码分析

    timer在JDK里面,是很早的一个API了.具有延时的,并具有周期性的任务,在newScheduledThreadPool出来之前我们一般会用Timer和TimerTask来做,但是Timer存在一些缺陷,为什么这么说呢? Timer只创建唯一的线程来执行所有Timer任务.如果一个timer任务的执行很耗时,会导致其他TimerTask的时效准确性出问题.例如一个TimerTask每10秒执行一次,而另外一个TimerTask每40ms执行一次,重复出现的任务会在后来的任务完成后快速连续的被

  • Java 读写锁源码分析

    前言 在实际项目中,比如我们有一个共享资源文件,我们程序会会同时并发的去读.写这个共享资源文件,那怎么能保证在高并发场景下安全.高效读写呢?OK,看了下文便知 提示:以下是本篇文章正文内容,案例仅供参考 一.技术介绍 1.ReentranReadWriteLock是什么? ReadWriteLock提供了readLock和writeLock两种锁的操作机制,一个是读锁,一个是写锁,而它的实现类就是ReentranReadWriteLock 读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的

  • Java集合框架源码分析之LinkedHashMap详解

    LinkedHashMap简介 LinkedHashMap是HashMap的子类,与HashMap有着同样的存储结构,但它加入了一个双向链表的头结点,将所有put到LinkedHashmap的节点一一串成了一个双向循环链表,因此它保留了节点插入的顺序,可以使节点的输出顺序与输入顺序相同. LinkedHashMap可以用来实现LRU算法(这会在下面的源码中进行分析). LinkedHashMap同样是非线程安全的,只在单线程环境下使用. LinkedHashMap源码剖析 LinkedHashM

  • Java struts2请求源码分析案例详解

    Struts2是Struts社区和WebWork社区的共同成果,我们甚至可以说,Struts2是WebWork的升级版,他采用的正是WebWork的核心,所以,Struts2并不是一个不成熟的产品,相反,构建在WebWork基础之上的Struts2是一个运行稳定.性能优异.设计成熟的WEB框架. 我这里的struts2源码是从官网下载的一个最新的struts-2.3.15.1-src.zip,将其解压即可.里面的目录页文件非常的多,我们只需要定位到struts-2.3.15.1\src\core

  • Java Array.sort()源码分析讲解

    阅读起点: Arrays.sort(nums1); 使用ctrl+左键进入sort()方法 1.Arrays.sort() 关于sort()的方法一共有14个,就目前调用的来看是以下这种最基础的. public static void sort(int[] a) { DualPivotQuicksort.sort(a, 0, a.length - 1, null, 0, 0); } 2.DualPivotQuicksort DualPivotQuicksort即双轴快排,定义了七种原始类型的排序

  • Java BufferedWriter BufferedReader 源码分析

    一:BufferedWriter 1.类功能简介: BufferedWriter.缓存字符输出流.他的功能是为传入的底层字符输出流提供缓存功能.同样当使用底层字符输出流向目的地中写入字符或者字符数组时.每写入一次就要打开一次到目的地的连接.这样频繁的访问不断效率底下.也有可能会对存储介质造成一定的破坏.比如当我们向磁盘中不断的写入字节时.夸张一点.将一个非常大单位是G的字节数据写入到磁盘的指定文件中的.没写入一个字节就要打开一次到这个磁盘的通道.这个结果无疑是恐怖的.而当我们使用Buffered

  • java.util.Collection源码分析与深度理解

    写在开头 java.util.Collection 作为Java开发最常用的接口之一,我们经常使用,今天我带大家一起研究一下Collection接口,希望对大家以后的编程以及系统设计能有所帮助,本文所研究的jdk版本为jdk1.8.0_131 明确一下几点: Collection是接口,其继承了Iterable接口 Collection属于单值类型集合,重点子接口List接口和Set接口 Java.util.List接口(有序.不唯一) ArraryList ArrayList 是一个数组队列,

  • Java集合系列之LinkedHashMap源码分析

    这篇文章我们开始分析LinkedHashMap的源码,LinkedHashMap继承了HashMap,也就是说LinkedHashMap是在HashMap的基础上扩展而来的,因此在看LinkedHashMap源码之前,读者有必要先去了解HashMap的源码,可以查看我上一篇文章的介绍<Java集合系列[3]----HashMap源码分析>.只要深入理解了HashMap的实现原理,回过头来再去看LinkedHashMap,HashSet和LinkedHashSet的源码那都是非常简单的.因此,读

随机推荐