深入理解Java并发编程之LinkedBlockingQueue队列

前面一篇文章我们介绍了使用CAS算法实现的非阻塞队列ConcurrentLinedQueue, 下面我们来介绍使用独占锁实现的阻塞队列LinkedBlockingQueue。

LinkedBlockingQueue也是使用单向链表实现的,其也有两个Node,分别用来存放首、尾节点,并且还有一个初始值为0的原子变量count,用来记录队列元素个数。另外还有两个ReentrantLock的实例,分别用来控制元素入队和出队的原子性,其中takeLock用来控制同时只有一个线程可以从队列头获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必须等待。另外,notEmpty 和 notFull 是条件变量,它们内部都有一个条件队列用来存放进队和出队时被阻塞的线程,其实这是生产者-消费者模型。如下是独占锁的创建代码。

private final AtomicInteger count = new AtomicInteger();

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
  • 当调用线程在LinkedBlockingQueue 实例上执行take、poll 等操作时需要获取到 takeLock 锁,从而保证同时只有一个线程可以操作链表头节点。另外由于条件变量 notEmpty 内部的条件队列的维护使用的是takeLock的锁状态管理机制,所以在调用notEmpty的await 和signal方法前调用线程必须先获取到 takeLock锁,否则会抛出IllegalMonitorStateException 异常。notEmpty内部则维护着一个条件队列,当线程获取到takeLock 锁后调用 notEmpty的await 方法时,调用线程会被阻塞,然后该线程会被放到notEmpty内部的条件队列进行等待,直到有线程调用了notEmpty的 signal 方法。
  • 在LinkedBlockingQueue实例上执行put、offer等操作时需要获取到putLock锁,从而保证同时只有一个线程可以操作链表尾节点。同样由于条件变量 notFull 内部的条件队列的维护使用的是putLock的锁状态管理机制,所以在调用 notFull 的 await 和 signal 方法前调用线程必须先获取到putLock锁,否则会抛出 IllegalMonitorStateException 异常。notFull 内部则维护着一个条件队列,当线程获取到 putLock 锁后调用notFull的await 方法时,调用线程会被阻塞,然后该线程会被放到notFull 内部的条件队列进行等待,直到有线程调用了 notFull 的 signal 方法。如下是LinkedBlockingQueue 的无参构造函数的代码。

如下是LinkedBlockingQueue的无参构造代码

public static final int MAX_VALUE = 0x7fffffff;
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalAgrumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

由该代码可知,默认队列容量为0x7fffffff,用户也可以自己指定容量,所以从一定程度上可以说LinkedBlockingQueue是有界阻塞队列。

offer操作

public boolean offer(E e) {
//(1)
    if (e == null) throw new NullPointerException();
    //(2)
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
        //(3)
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
    //(4)
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            //(5)
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
    //(6)
        putLock.unlock();
    }
    //(7)
    if (c == 0)
        signalNotEmpty();
        //(8)
    return c >= 0;
}

代码(2)判断如果当前队列已满则丢弃当前元素并返回false

代码(3)获取到 putLock 锁,当前线程获取到该锁后,则其他调用put和 offer操的线程将会被阻塞(阻塞的线程被放到putLock锁的AQS阻塞队列)。

代码(4)这里重新判断当前队列是否满,这是因为在执行代码(2)和获取到 putLock 锁期间可能其他线程通过 put 或者offer 操作向队列里面添加了新元素。重新判斯队列确实不满则新元素入队,并递增计数器。

代码(5)判断如果新元素入队后队列还有空闲空间,则唤醒notFull的条件队列里面因为调用了notFull的await操作(比如执行put方法而队列满了的时候)而被阻塞的一个线程,因为队列现在有空闲所以这里可以提前唤醒一个入队线程。

代码(6)则释放获取的putLock 锁,这里要注意,锁的释放一定要在finally里面做因为即使try块抛出异常了,finally也是会被执行到。另外释放锁后其他因为调用put 操作而被阻塞的线程将会有一个获取到该锁。

代码(7)中的c0说明在执行代码(6)释放锁时队列里面至少有一个元素,队列里面有元素则执行signalNotEmpty操作.

到此这篇关于深入理解Java并发编程之LinkedBlockingQueue队列的文章就介绍到这了,更多相关Java LinkedBlockingQueue队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java并发编程工具类JUC之LinkedBlockingQueue链表队列

    java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的.范围任意的(其实是有界的).FIFO阻塞队列.访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁. 队列是否为空.是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头.队尾并发地进行访问.添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger,这就决定了它

  • 深入理解Java并发编程之LinkedBlockingQueue队列

    前面一篇文章我们介绍了使用CAS算法实现的非阻塞队列ConcurrentLinedQueue, 下面我们来介绍使用独占锁实现的阻塞队列LinkedBlockingQueue. LinkedBlockingQueue也是使用单向链表实现的,其也有两个Node,分别用来存放首.尾节点,并且还有一个初始值为0的原子变量count,用来记录队列元素个数.另外还有两个ReentrantLock的实例,分别用来控制元素入队和出队的原子性,其中takeLock用来控制同时只有一个线程可以从队列头获取元素,其他

  • Java并发编程之ConcurrentLinkedQueue队列详情

    ConcurrentLinkedQueue JDK中提供了一系列场景的并发安全队列.总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列,前者使用锁实现,而后则使用CAS非阻塞算法实现. ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile 类型的 Node 节点分别用来存放队列的首.尾节点.从下面的无参构造函数可知,默认头.尾节点都是指向 item 为null 的哨兵节点.新元素会被插入队列末尾,出队时从队列头部获取一个元素. public

  • 深入理解Java并发编程之ThreadLocal

    目录 ThreadLocal简介 ThreadLocal源码解析 实现原理 ThreadLocalMap源码分析 InheritableThreadLocal 参考资料 ThreadLocal简介 变量值的共享可以使用public static的形式,所有线程都使用同一个变量,如果想实现每一个线程都有自己的共享变量该如何实现呢?JDK中的ThreadLocal类正是为了解决这样的问题. ThreadLocal类并不是用来解决多线程环境下的共享变量问题,而是用来提供线程内部的共享变量,在多线程环境

  • Java并发编程之Fork/Join框架的理解

    一.Fork/Join框架的理解 ForkJoinTask类属于java.util.concurrent 包下: ForkJoinTask类下有2个子类,分别为RecursiveTask和RecursiveAction类:(lz示例中使用RecursiveTask类进行重写compute()方法进行实现数值的累加计算) ForkJoinTask类 将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出. 二.Fork/Join框架使用示例 示例场景:对数值进

  • Java并发编程之JUC并发核心AQS同步队列原理剖析

    目录 一.AQS介绍 二.AQS中的队列 1.同步等待队列 2.条件等待队列 3.AQS队列节点Node 三.同步队列源码分析 1.同步队列分析 2.同步队列--独占模式源码分析 3.同步队列--共享模式源码分析 一.AQS介绍 队列同步器AbstractQueuedSynchronizer(简称AQS),AQS定义了一套多线程访问共享资源的同步器框架,是用来构建锁或者其他同步组件的基础框架,是一个依赖状态(state)的同步器.Java并发编程的核心在java.util.concurrent(

  • Java并发编程之ConcurrentLinkedQueue源码详解

    一.ConcurrentLinkedQueue介绍 并编程中,一般需要用到安全的队列,如果要自己实现安全队列,可以使用2种方式: 方式1:加锁,这种实现方式就是我们常说的阻塞队列. 方式2:使用循环CAS算法实现,这种方式实现队列称之为非阻塞队列. 从点到面, 下面我们来看下非阻塞队列经典实现类:ConcurrentLinkedQueue (JDK1.8版) ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全的队列.当我们添加一个元素的时候,它会添加到队列的尾部,当我们

  • Java并发编程之Executors类详解

    一.Executors的理解 Executors类属于java.util.concurrent包: 线程池的创建分为两种方式:ThreadPoolExecutor 和 Executors: Executors(静态Executor工厂)用于创建线程池: 工厂和工具方法Executor , ExecutorService , ScheduledExecutorService , ThreadFactory和Callable在此包中定义的类: jdk1.8API中的解释如下: 二.Executors

  • Java并发编程之Executor接口的使用

    一.Executor接口的理解 Executor属于java.util.concurrent包下: Executor是任务执行机制的核心接口: 二.Executor接口的类图结构 由类图结构可知: ThreadPoolExecutor 继承了AbstractExecutorService接口: AbstractExecutorService接口实现了ExecutorService接口: ExecutorService继承了Executor接口: 因此以下部分主要讲解ThreadPoolExecu

  • Java并发编程之StampedLock锁介绍

    StampedLock: StampedLock是并发包里面JDK8版本新增的一个锁,该锁提供了三种模式的读写控制,当调用获取锁的系列函数时,会返回一个long 型的变量,我们称之为戳记(stamp),这个戳记代表了锁的状态.其中try 系列获取锁的函数,当获取锁失败后会返回为0的stamp值.当调用释放锁和转换锁的方法时需要传入获取锁时返回的stamp值. StampedLock提供的三种读写模式的锁分别如下: 写锁witeLock: 是一个排它锁或者独占锁,某时只有一个线程可以获取该锁,当一

  • Java并发编程之Semaphore(信号量)详解及实例

    Java并发编程之Semaphore(信号量)详解及实例 概述 通常情况下,可能有多个线程同时访问数目很少的资源,如客户端建立了若干个线程同时访问同一数据库,这势必会造成服务端资源被耗尽的地步,那么怎样能够有效的来控制不可预知的接入量呢?及在同一时刻只能获得指定数目的数据库连接,在JDK1.5 java.util.concurrent 包中引入了Semaphore(信号量),信号量是在简单上锁的基础上实现的,相当于能令线程安全执行,并初始化为可用资源个数的计数器,通常用于限制可以访问某些资源(物

随机推荐