Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理

ConcurrentLinkedQueue介绍

ConcurrentLinkedQueue是线程安全的队列,它适用于“高并发”的场景。

它是一个基于链接节点的无界线程安全队列,按照 FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。

ConcurrentLinkedQueue原理和数据结构

ConcurrentLinkedQueue的数据结构,如下图所示:

说明:

1. ConcurrentLinkedQueue继承于AbstractQueue。

2. ConcurrentLinkedQueue内部是通过链表来实现的。它同时包含链表的头节点head和尾节点tail。ConcurrentLinkedQueue按照 FIFO(先进先出)原则对元素进行排序。元素都是从尾部插入到链表,从头部开始返回。

3. ConcurrentLinkedQueue的链表Node中的next的类型是volatile,而且链表数据item的类型也是volatile。关于volatile,我们知道它的语义包含:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。ConcurrentLinkedQueue就是通过volatile来实现多线程对竞争资源的互斥访问的。

ConcurrentLinkedQueue函数列表

// 创建一个最初为空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue()
// 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
ConcurrentLinkedQueue(Collection<? extends E> c)
// 将指定元素插入此队列的尾部。
boolean add(E e)
// 如果此队列包含指定元素,则返回 true。
boolean contains(Object o)
// 如果此队列不包含任何元素,则返回 true。
boolean isEmpty()
// 返回在此队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入此队列的尾部。
boolean offer(E e)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 从队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回此队列中的元素数量。
int size()
// 返回以恰当顺序包含此队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)

下面从ConcurrentLinkedQueue的创建,添加,删除这几个方面对它进行分析。

1 创建

下面以ConcurrentLinkedQueue()来进行说明。

public ConcurrentLinkedQueue() {
 head = tail = new Node<E>(null);
}

说明:在构造函数中,新建了一个“内容为null的节点”,并设置表头head和表尾tail的值为新节点。

head和tail的定义如下:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

head和tail都是volatile类型,他们具有volatile赋予的含义:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。

Node的声明如下:

private static class Node<E> {
 volatile E item;
 volatile Node<E> next;
 Node(E item) {
 UNSAFE.putObject(this, itemOffset, item);
 }
 boolean casItem(E cmp, E val) {
 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
 }
 void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
 }
 boolean casNext(Node<E> cmp, Node<E> val) {
 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
 }
 // Unsafe mechanics
 private static final sun.misc.Unsafe UNSAFE;
 private static final long itemOffset;
 private static final long nextOffset;
 static {
 try {
  UNSAFE = sun.misc.Unsafe.getUnsafe();
  Class k = Node.class;
  itemOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("item"));
  nextOffset = UNSAFE.objectFieldOffset
  (k.getDeclaredField("next"));
 } catch (Exception e) {
  throw new Error(e);
 }
 }
}

说明:

Node是个单向链表节点,next用于指向下一个Node,item用于存储数据。Node中操作节点数据的API,都是通过Unsafe机制的CAS函数实现的;例如casNext()是通过CAS函数“比较并设置节点的下一个节点”。

2. 添加

下面以add(E e)为例对ConcurrentLinkedQueue中的添加进行说明。

public boolean add(E e) {
 return offer(e);
}

说明:add()实际上是调用的offer()来完成添加操作的。

offer()的源码如下:

public boolean offer(E e) {
 // 检查e是不是null,是的话抛出NullPointerException异常。
 checkNotNull(e);
 // 创建新的节点
 final Node<E> newNode = new Node<E>(e);

 // 将“新的节点”添加到链表的末尾。
 for (Node<E> t = tail, p = t;;) {
 Node<E> q = p.next;
 // 情况1:q为空
 if (q == null) {
  // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。
  // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。
  // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。
  if (p.casNext(null, newNode)) {
  if (p != t) // hop two nodes at a time
   casTail(t, newNode); // Failure is OK.
  return true;
  }
 }
 // 情况2:p和q相等
 else if (p == q)
  p = (t != (t = tail)) ? t : head;
 // 情况3:其它
 else
  p = (p != t && t != (t = tail)) ? t : q;
 }
}

说明:offer(E e)的作用就是将元素e添加到链表的末尾。offer()比较的地方是理解for循环,下面区分3种情况对for进行分析。

情况1 -- q为空。这意味着q是尾节点的下一个节点。此时,通过p.casNext(null, newNode)将“p的下一个节点设为newNode”,若设置成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。否则的话(意味着“其它线程对尾节点进行了修改”),什么也不做,继续进行for循环。

p.casNext(null, newNode),是调用CAS对p进行操作。若“p的下一个节点等于null”,则设置“p的下一个节点等于newNode”;设置成功的话,返回true,失败的话返回false。

情况2 -- p和q相等。这种情况什么时候会发生呢?通过“情况3”,我们知道,经过“情况3”的处理后,p的值可能等于q。

此时,若尾节点没有发生变化的话,那么,应该是头节点发生了变化,则设置p为头节点,然后重新遍历链表;否则(尾节点变化的话),则设置p为尾节点。

情况3 -- 其它。

我们将p = (p != t && t != (t = tail)) ? t : q;转换成如下代码。

if (p==t) {
 p = q;
} else {
 Node<E> tmp=t;
 t = tail;
 if (tmp==t) {
 p=q;
 } else {
 p=t;
 }
}

如果p和t相等,则设置p为q。否则的话,判断“尾节点是否发生变化”,没有变化的话,则设置p为q;否则,设置p为尾节点。

checkNotNull()的源码如下:

private static void checkNotNull(Object v) {
 if (v == null)
 throw new NullPointerException();
} 

3. 删除

下面以poll()为例对ConcurrentLinkedQueue中的删除进行说明。

public E poll() {
 // 设置“标记”
 restartFromHead:
 for (;;) {
 for (Node<E> h = head, p = h, q;;) {
  E item = p.item;

  // 情况1
  // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话;
  // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。
  if (item != null && p.casItem(item, null)) {
  if (p != h) // hop two nodes at a time
   updateHead(h, ((q = p.next) != null) ? q : p);
  return item;
  }
  // 情况2
  // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。
  else if ((q = p.next) == null) {
  updateHead(h, p);
  return null;
  }
  // 情况3
  // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。
  else if (p == q)
  continue restartFromHead;
  // 情况4
  // 设置p为q
  else
  p = q;
 }
 }
}

说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。

情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。

p.casItem(item, null) -- 调用CAS函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。

在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updateHead()更新表头;然后返回删除节点的item值。

updateHead()的源码如下:

final void updateHead(Node<E> h, Node<E> p) {
 if (h != p && casHead(h, p))
 h.lazySetNext(h);
}

说明:updateHead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。

casHead(h,p)是通过CAS函数设置表头,若表头等于h的话,则设置表头为p。

lazySetNext()的源码如下:

void lazySetNext(Node<E> val) {
 UNSAFE.putOrderedObject(this, nextOffset, val);
}

putOrderedObject()函数,我们在前面一章“TODO”中介绍过。h.lazySetNext(h)的作用是通过CAS函数设置h的下一个节点为h自身,该设置可能会延迟执行。

情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。

则调用updateHead(h, p),将表头更新p;然后返回null。

情况3:p=q

在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartFromHead标记重新操作。

情况4:其它情况。

设置p=q。

ConcurrentLinkedQueue示例

import java.util.*;
 import java.util.concurrent.*;
 /*
 * ConcurrentLinkedQueue是“线程安全”的队列,而LinkedList是非线程安全的。
 *
 * 下面是“多个线程同时操作并且遍历queue”的示例
 * (01) 当queue是ConcurrentLinkedQueue对象时,程序能正常运行。
 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
 *
 *
 */
 public class ConcurrentLinkedQueueDemo1 {
 // TODO: queue是LinkedList对象时,程序会出错。
 //private static Queue<String> queue = new LinkedList<String>();
 private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
 public static void main(String[] args) {
  // 同时启动两个线程对queue进行操作!
  new MyThread("ta").start();
  new MyThread("tb").start();
 }
 private static void printAll() {
  String value;
  Iterator iter = queue.iterator();
  while(iter.hasNext()) {
  value = (String)iter.next();
  System.out.print(value+", ");
  }
  System.out.println();
 }
 private static class MyThread extends Thread {
  MyThread(String name) {
  super(name);
  }
  @Override
  public void run() {
   int i = 0;
  while (i++ < 6) {
   // “线程名” + "-" + "序号"
   String val = Thread.currentThread().getName()+i;
   queue.add(val);
   // 通过“Iterator”遍历queue。
   printAll();
  }
  }
 }
 }

(某一次)运行结果:

ta1, ta1, tb1, tb1,
ta1, ta1, tb1, tb1, ta2, ta2, tb2,
tb2,
ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3,
ta3, ta1, tb3, tb1, ta4,
ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4,
tb3, ta1, ta4, tb1, tb4, ta2, ta5,
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5,
ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6,
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6,
tb4, ta5, tb5, ta6, tb6, 

结果说明:如果将源码中的queue改成LinkedList对象时,程序会产生ConcurrentModificationException异常。

(0)

相关推荐

  • Java concurrency之集合_动力节点Java学院整理

    Java集合包 Java集合主体内容包括Collection集合和Map类:而Collection集合又可以划分为List(队列)和Set(集合). 1. List的实现类主要有: LinkedList, ArrayList, Vector, Stack. (01) LinkedList是双向链表实现的双端队列:它不是线程安全的,只适用于单线程. (02) ArrayList是数组实现的队列,它是一个动态数组:它也不是线程安全的,只适用于单线程. (03) Vector是数组实现的矢量队列,它也

  • Java concurrency集合之CopyOnWriteArraySet_动力节点Java学院整理

    CopyOnWriteArraySet介绍 它是线程安全的无序的集合,可以将它理解成线程安全的HashSet.有意思的是,CopyOnWriteArraySet和HashSet虽然都继承于共同的父类AbstractSet:但是,HashSet是通过"散列表(HashMap)"实现的,而CopyOnWriteArraySet则是通过"动态数组(CopyOnWriteArrayList)"实现的,并不是散列表. 和CopyOnWriteArrayList类似,CopyO

  • Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理

    ArrayBlockingQueue介绍 ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列. 线程安全是指,ArrayBlockingQueue内部通过"互斥锁"保护竞争资源,实现了多线程对竞争资源的互斥访问.而有界,则是指ArrayBlockingQueue对应的数组是有界限的. 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待:而且,ArrayBlockingQueue是按 FIFO(先进先出)原则对元素进行

  • Java concurrency集合之ConcurrentHashMap_动力节点Java学院整理

    ConcurrentHashMap介绍 ConcurrentHashMap是线程安全的哈希表.HashMap, Hashtable, ConcurrentHashMap之间的关联如下: HashMap是非线程安全的哈希表,常用于单线程程序中. Hashtable是线程安全的哈希表,它是通过synchronized来保证线程安全的:即,多线程通过同一个"对象的同步锁"来实现并发控制.Hashtable在线程竞争激烈时,效率比较低(此时建议使用ConcurrentHashMap)!因为当一

  • Java concurrency集合之 CopyOnWriteArrayList_动力节点Java学院整理

    CopyOnWriteArrayList介绍 它相当于线程安全的ArrayList.和ArrayList一样,它是个可变数组:但是和ArrayList不同的时,它具有以下特性: 1. 它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突. 2. 它是线程安全的. 3. 因为通常需要复制整个基础数组,所以可变操作(add().set() 和 remove() 等等)的开销很大. 4. 迭代器支持hasNext(), next()等不可

  • Java concurrency集合之ConcurrentSkipListMap_动力节点Java学院整理

    ConcurrentSkipListMap介绍 ConcurrentSkipListMap是线程安全的有序的哈希表,适用于高并发的场景. ConcurrentSkipListMap和TreeMap,它们虽然都是有序的哈希表.但是,第一,它们的线程安全机制不同,TreeMap是非线程安全的,而ConcurrentSkipListMap是线程安全的.第二,ConcurrentSkipListMap是通过跳表实现的,而TreeMap是通过红黑树实现的. 关于跳表(Skip List),它是平衡树的一种

  • Java concurrency集合之ConcurrentSkipListSet_动力节点Java学院整理

    ConcurrentSkipListSet介绍 ConcurrentSkipListSet是线程安全的有序的集合,适用于高并发的场景. ConcurrentSkipListSet和TreeSet,它们虽然都是有序的集合.但是,第一,它们的线程安全机制不同,TreeSet是非线程安全的,而ConcurrentSkipListSet是线程安全的.第二,ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的,而TreeSet是通过TreeMap实现的. Con

  • Java concurrency集合之LinkedBlockingDeque_动力节点Java学院整理

    LinkedBlockingDeque介绍 LinkedBlockingDeque是双向链表实现的双向并发阻塞队列.该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除):并且,该阻塞队列是支持线程安全. 此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量.如果不指定,默认容量大小等于Integer.MAX_VALUE. LinkedBlockingDeque原理和数据结构 LinkedBlockingDeque

  • Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理

    ConcurrentLinkedQueue介绍 ConcurrentLinkedQueue是线程安全的队列,它适用于"高并发"的场景. 它是一个基于链接节点的无界线程安全队列,按照 FIFO(先进先出)原则对元素进行排序.队列元素中不可以放置null元素(内部实现的特殊节点除外). ConcurrentLinkedQueue原理和数据结构 ConcurrentLinkedQueue的数据结构,如下图所示: 说明: 1. ConcurrentLinkedQueue继承于AbstractQ

  • Java concurrency之锁_动力节点Java学院整理

    根据锁的添加到Java中的时间,Java中的锁,可以分为"同步锁"和"JUC包中的锁". 同步锁 即通过synchronized关键字来进行同步,实现对竞争资源的互斥访问的锁.Java 1.0版本中就已经支持同步锁了. 同步锁的原理是,对于每一个对象,有且仅有一个同步锁:不同的线程能共同访问该同步锁.但是,在同一个时间点,该同步锁能且只能被一个线程获取到.这样,获取到同步锁的线程就能进行CPU调度,从而在CPU上执行:而没有获取到同步锁的线程,必须进行等待,直到获取

  • Java中StringBuffer和StringBuilder_动力节点Java学院整理

    下面先给大家介绍下String.StringBuffer.StringBuilder区别,具体详情如下所示: StringBuffer.StringBuilder和String一样,也用来代表字符串.String类是不可变类,任何对String的改变都 会引发新的String对象的生成:StringBuffer则是可变类,任何对它所指代的字符串的改变都不会产生新的对象.既然可变和不可变都有了,为何还有一个StringBuilder呢?相信初期的你,在进行append时,一般都会选择StringB

随机推荐