Java并发编程之ConcurrentLinkedQueue队列详情

ConcurrentLinkedQueue

JDK中提供了一系列场景的并发安全队列。总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,前者使用锁实现,而后则使用CAS非阻塞算法实现。

ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile 类型的 Node 节点分别用来存放队列的首、尾节点。从下面的无参构造函数可知,默认头、尾节点都是指向 item 为null 的哨兵节点。新元素会被插入队列末尾,出队时从队列头部获取一个元素。

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

在 Node 节点内部则维护一个使用volatile 修饰的变量 item,用来存放节点的值;next用来存放链表的下一个节点,从而链接为一个单向无界链表。其内部则使用 UNSafe 工具类提供的CAS 算法来保证出入队时操作链表的原子性。

下面通过介绍ConcurrentLinkedQueue的几个方法来介绍其实现原理。

offer操作: offer操作是在队列末尾添加一个元素,如果传递的参数是null则抛出NPE异常,否则由于ConcurrentLinkedQueue是无界队列,该方法一直会返回true。另外,由于使用CAS无阻塞算法,因此该方法不会阻塞挂起调用线程。下面具体看下实现原理。

public boolean offer(E e) {
//(1)e为null这抛出空指针异常
    checkNotNull(e);
    //(2)构造Node节点,在构造函数内部调用unsafe.putObject
    final Node<E> newNode = new Node<E>(e);
    //(3) 从尾节点插入
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
 //(4) 如果q==null说明p是尾节点,则执行插入
        if (q == null) {
            // p is last node
            //(5)使用CAS设置p节点的next节点
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                   //(6)CAS成功,则说明新增节点已经放入链表,然后设置当前尾巴节点

                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
  • 首先看当一个线程调用offer(item)时的情况。首先代码(1)对传参进行空检查, 由于使用 如果为null 则抛出NPE 异常,否则执行代码(2)并使用item作为构造函数参数创建一 个新的节点,然后代码(3)从队列尾部节点开始循环,打算从队列尾部添加元素。这时候节点p、t、head、tail同时指向了item为null的哨兵节点,由于哨兵节点的next 节点为null,所以这里q也指向null。代码(4)发现q->null则执行代码(5),通过CAS 原子操作判断p节点的next节点是否为null,如果为null 则使用节点newNode替换p的next节点,然后执行代码(6),这里由于p=t所以没有设置尾部节点,然后退出 offer方法。
  • 上面是一个线程调用offer方法的情况,如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。假设线程A调用offer(item1),线程B调用 ofer(item2),同时执行到代码(5)p.casNext(null, newNode)。由于CAS的比较设置操作是原子性的,所以这里假设线程A先执行了比较设置操作,发现当前p的 next 节点确实是null,则会原子性地更新next节点为iteml,这时候线程B也会判断p的next节点是否为null,结果发现不是null(因为线程A已经设置了p的next节点为iteml),则会跳到代码(3),然后执行到代码(4)。

可见,offer 操作中的关键步骤是代码(5),通过原子CAS 操作来控制某时只有一个线程可以追加元素到队列末尾。进行CAS 竞争失败的线程会通过循环一次次尝试进行 CAS操作,直到CAS 成功才会返回,也就是通过使用无限循环不断进行 CAS 尝试方式来替代阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。

add操作:

add操作是在链表尾部添加一个元素,其实在内部调用的还是offer操作。

public boolean add(E e) {
    return offer(e);
}

poll操作:

poll操作是在队列头部获取并移除一个元素,如果队列为空则返回null。

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

poll方法在移除一个元素时,只是简单地使用 CAS操作把当前节点的item值设置为null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。

peak操作:

peak操作是获取队列头部获一个元素,如果队列为空则返回null。

public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            //注释
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

Peek操作的代码结构与poll操作类似,不同之处在于我们在代码中标记注释的地方中少了castItem操作。其实这很正常,因为peek只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行offer后head指向的是哨兵节点(也就是item为null的节点),那么第一次执行peek时在注释处会发现item==null,然后执行q=p.next,这时候q节点指向的才是队列里面第一个真正的元素,或者如果队列为 null 则 q 指向 null。

到此这篇关于Java并发编程之ConcurrentLinkedQueue队列详情的文章就介绍到这了,更多相关Java并发编程 ConcurrentLinkedQueue 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理

    ConcurrentLinkedQueue介绍 ConcurrentLinkedQueue是线程安全的队列,它适用于"高并发"的场景. 它是一个基于链接节点的无界线程安全队列,按照 FIFO(先进先出)原则对元素进行排序.队列元素中不可以放置null元素(内部实现的特殊节点除外). ConcurrentLinkedQueue原理和数据结构 ConcurrentLinkedQueue的数据结构,如下图所示: 说明: 1. ConcurrentLinkedQueue继承于AbstractQ

  • Java并发编程之ConcurrentLinkedQueue源码详解

    一.ConcurrentLinkedQueue介绍 并编程中,一般需要用到安全的队列,如果要自己实现安全队列,可以使用2种方式: 方式1:加锁,这种实现方式就是我们常说的阻塞队列. 方式2:使用循环CAS算法实现,这种方式实现队列称之为非阻塞队列. 从点到面, 下面我们来看下非阻塞队列经典实现类:ConcurrentLinkedQueue (JDK1.8版) ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全的队列.当我们添加一个元素的时候,它会添加到队列的尾部,当我们

  • Java并发编程之ConcurrentLinkedQueue队列详情

    ConcurrentLinkedQueue JDK中提供了一系列场景的并发安全队列.总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,前者使用锁实现,而后则使用CAS非阻塞算法实现. ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile 类型的 Node 节点分别用来存放队列的首.尾节点.从下面的无参构造函数可知,默认头.尾节点都是指向 item 为null 的哨兵节点.新元素会被插入队列末尾,出队时从队列头部获取一个元素. public

  • 深入理解Java并发编程之LinkedBlockingQueue队列

    前面一篇文章我们介绍了使用CAS算法实现的非阻塞队列ConcurrentLinedQueue, 下面我们来介绍使用独占锁实现的阻塞队列LinkedBlockingQueue. LinkedBlockingQueue也是使用单向链表实现的,其也有两个Node,分别用来存放首.尾节点,并且还有一个初始值为0的原子变量count,用来记录队列元素个数.另外还有两个ReentrantLock的实例,分别用来控制元素入队和出队的原子性,其中takeLock用来控制同时只有一个线程可以从队列头获取元素,其他

  • Java并发编程之JUC并发核心AQS同步队列原理剖析

    目录 一.AQS介绍 二.AQS中的队列 1.同步等待队列 2.条件等待队列 3.AQS队列节点Node 三.同步队列源码分析 1.同步队列分析 2.同步队列--独占模式源码分析 3.同步队列--共享模式源码分析 一.AQS介绍 队列同步器AbstractQueuedSynchronizer(简称AQS),AQS定义了一套多线程访问共享资源的同步器框架,是用来构建锁或者其他同步组件的基础框架,是一个依赖状态(state)的同步器.Java并发编程的核心在java.util.concurrent(

  • Java并发编程之Condition源码分析(推荐)

    Condition介绍 上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充.ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活".Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现. 常用方法 Condition中主要的

  • 浅谈Java并发编程之Lock锁和条件变量

    简单使用Lock锁 Java 5中引入了新的锁机制--java.util.concurrent.locks中的显式的互斥锁:Lock接口,它提供了比synchronized更加广泛的锁定操作.Lock接口有3个实现它的类:ReentrantLock.ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入锁.读锁和写锁.lock必须被显式地创建.锁定和释放,为了可以使用更多的功能,一般用ReentrantLock为其实例

  • 深入分析Java并发编程之CAS

    在Java并发编程的世界里,synchronized 和 Lock 是控制多线程并发环境下对共享资源同步访问的两大手段.其中 Lock 是 JDK 层面的锁机制,是轻量级锁,底层使用大量的自旋+CAS操作实现的. 学习并发推荐<Java并发编程的艺术> 那什么是CAS呢?CAS,compare and swap,即比较并交换,什么是比较并交换呢?在Lock锁的理念中,采用的是一种乐观锁的形式,即多线程去修改共享资源时,不是在修改之前就加锁,而是乐观的认为没有别的线程和自己争锁,就是通过CAS的

  • Java并发编程之ReentrantLock可重入锁的实例代码

    目录 1.ReentrantLock可重入锁概述2.可重入3.可打断4.锁超时5.公平锁6.条件变量 Condition 1.ReentrantLock可重入锁概述 相对于 synchronized 它具备如下特点 可中断 synchronized锁加上去不能中断,a线程应用锁,b线程不能取消掉它 可以设置超时时间 synchronized它去获取锁时,如果对方持有锁,那么它就会进入entryList一直等待下去.而可重入锁可以设置超时时间,规定时间内如果获取不到锁,就放弃锁 可以设置为公平锁

  • Java并发编程之CountDownLatch源码解析

    一.前言 CountDownLatch维护了一个计数器(还是是state字段),调用countDown方法会将计数器减1,调用await方法会阻塞线程直到计数器变为0.可以用于实现一个线程等待所有子线程任务完成之后再继续执行的逻辑,也可以实现类似简易CyclicBarrier的功能,达到让多个线程等待同时开始执行某一段逻辑目的. 二.使用 一个线程等待其它线程执行完再继续执行 ...... CountDownLatch cdl = new CountDownLatch(10); Executor

  • Java并发编程之Executors类详解

    一.Executors的理解 Executors类属于java.util.concurrent包: 线程池的创建分为两种方式:ThreadPoolExecutor 和 Executors: Executors(静态Executor工厂)用于创建线程池: 工厂和工具方法Executor , ExecutorService , ScheduledExecutorService , ThreadFactory和Callable在此包中定义的类: jdk1.8API中的解释如下: 二.Executors

随机推荐