java并发中DelayQueue延迟队列原理剖析

介绍

DelayQueue队列是一个延迟队列,DelayQueue中存放的元素必须实现Delayed接口的元素,实现接口后相当于是每个元素都有个过期时间,当队列进行take获取元素时,先要判断元素有没有过期,只有过期的元素才能出队操作,没有过期的队列需要等待剩余过期时间才能进行出队操作。

源码分析

DelayQueue队列内部使用了PriorityQueue优先队列来进行存放数据,它采用的是二叉堆进行的优先队列,使用ReentrantLock锁来控制线程同步,由于内部元素是采用的PriorityQueue来进行存放数据,所以Delayed接口实现了Comparable接口,用于比较来控制优先级,如下代码所示:

 public interface Delayed extends Comparable<Delayed> {

     /**
      * Returns the remaining delay associated with this object, in the
      * given time unit.
      *
      * @param unit the time unit
      * @return the remaining delay; zero or negative values indicate
      * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

DelayQueue的成员变量如下所示:

 // 锁。
 private final transient ReentrantLock lock = new ReentrantLock();
 // 优先队列。
 private final PriorityQueue<E> q = new PriorityQueue<E>();

 /**
  * Leader-Follower的变种。
  * Thread designated to wait for the element at the head of
  * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader = null;

/**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
// 条件,代表如果有数据则通知Follower线程,唤醒线程处理队列内容。
private final Condition available = lock.newCondition();

Leader-Follower模式的变种,用于最小化不必要的定时等待,当一个线程被选择为Leader时,它会等待延迟过去执行代码逻辑,而其他线程则需要无限期等待,在从take或poll返回之前,每当队列的头部被替换为具有更早到期时间的元素时,leader字段将通过重置为空而无效,Leader线程必须向其中一个Follower线程发出信号,被唤醒的 follwer 线程被设置为新的Leader 线程。

offer操作

 public boolean offer(E e) {
     // 获取到锁
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         // 将元素存储到PriorityQueue优先队列中
         q.offer(e);
         // 如果第一个元素是当前元素,说明之前队列中为空,则先将Leader设置为空,通知等待线程可以争抢Leader了。
         if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        // 返回成功
        return true;
    } finally {
        lock.unlock();
    }
}

offer操作前先进行获取锁的操作,也就是同一时间内只能有一个线程可以入队操作。

  • 获取到ReentrantLock锁对象。
  • 将元素添加到PriorityQueue优先队列中
  • 如果队列中最早过期的元素是自己,则说明队列原先是空的,所以将Leader进行重置,通知Follower线程可以成为Leader线程。
  • 最后进行解锁操作。

put操作

put操作其实就是调用的offer操作来进行添加数据的,以下是源码信息:

public void put(E e) {
    offer(e);
}

take操作

 public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     // 获取可中断的锁。
     lock.lockInterruptibly();
     try {
         // 循环获取数据。
         for (;;) {
             // 获取最早过期的元素,但是不弹出对象。
             E first = q.peek();
            // 如果最早过期的元素为空,说明队列为空,则线程直接进入无限期等待,并且让出锁。
            if (first == null)
                // 当前线程无限期等待,直到被唤醒,并且让出锁对象。
                available.await();
            else {
                // 获取最早过期的元素剩余过期时间。
                long delay = first.getDelay(NANOSECONDS);
                // 如果剩余过期时间小于0,则说明已经过期,反之还没有过期。
                if (delay <= )
                    // 如果已经过期直接获取最早过期的元素,并返回。
                    return q.poll();
                // 如果剩余过期日期大于0,则会进入到这里。
                // 将刚才获取的最早过期的元素设置为空。
                first = null; // don't retain ref while waiting
                // 如果有线程争抢的Leader线程,则进行无限期等待。
                if (leader != null)
                    // 无限期等待并让出锁。
                    available.await();
                else {
                    // 获取当前线程。
                    Thread thisThread = Thread.currentThread();
                    // 设置当前线程变为Leader线程。
                    leader = thisThread;
                    try {
                        // 等待剩余等待时间。
                        available.awaitNanos(delay);
                    } finally {
                        // 将Leader设置为null。
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果队列不为空,并且没有Leader则通知等待线程可以成为Leader。
        if (leader == null && q.peek() != null)
            // 通知等待线程。
            available.signal();
        lock.unlock();
    }
}
  1. 当获取元素时,先获取到锁对象。
  2. 获取最早过期的元素,但是并不从队列中弹出元素。
  3. 最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。
  4. 如果最早过期的元素不为空
    1. 获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素
    2. 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进行无限期等待,如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间。
    3. 最后将Leader线程设置为空
  5. 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。

poll操作

获取最早过期的元素,如果队列头没有过期的元素则直接返回null,反之返回过期的元素。

 public E poll() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         E first = q.peek();
         // 如果队列为空或者队列最早过期的元素没有过期,则返回null。
         if (first == null || first.getDelay(NANOSECONDS) > 0)
             return null;
         else
            // 出队列操作。
            return q.poll();
    } finally {
        lock.unlock();
    }
}

小结

  • DelayQueue是一个无界的并发延迟阻塞队列,队列中的元素必须实现Delayed接口,相应了需要实现Comparable接口实现比较的方法
  • Leader-Follower模式的变种,用于最小化不必要的定时等待,当一个线程被选择为Leader时,它会等待延迟过去执行代码逻辑,而其他线程则需要无限期等待,在从take或poll返回之前,每当队列的头部被替换为具有更早到期时间的元素时,leader字段将通过重置为空而无效,Leader线程必须向其中一个Follower线程发出信号,被唤醒的 follwer 线程被设置为新的Leader 线程。

到此这篇关于java并发中DelayQueue延迟队列原理剖析的文章就介绍到这了,更多相关java DelayQueue延迟队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java中DelayQueue实例用法详解

    在阻塞队里中,除了对元素进行增加和删除外,我们可以把元素的删除做一个延迟的处理,即使用DelayQueue的方法.这里的删除需要一定的时间才能生效,有点类似于过期处理的理念.下面我们就DelayQueue的概念.特点进行讲解,然后在代码示例中体会DelayQueue的使用. 1.概念 是一个带有延迟时间的无界阻塞队列.队列中的元素,只有等延时时间到了,才能取出来.此队列一般用于过期数据的删除,或任务调度.以下,模拟一下定长时间的数据删除. 2.特点 (1)无边界设计 (2)添加(put)不阻塞,

  • Java多线程并发开发之DelayQueue使用示例

    在学习Java 多线程并发开发过程中,了解到DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对象的延迟到期时间最长.注意:不能将null元素放置到这种队列中. Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象.此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序. 在网上看到了一些

  • java DelayQueue的原理浅析

    在对DelayQueue延迟功能的使用上,很多人不能后完全理解延迟的一些功能使用,这里我们深入来挖掘一下DelayQueue的原理. 下面将从构造方法.接口.继承体系三个方面进行分析,需要注意的是,相较于其它的阻塞队列,DelayQueue因为延迟的功能多了接口的使用,一起来看具体内容. 1.构造方法 public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); } 构造方法比较简单,

  • 详解Java中的延时队列 DelayQueue

    当用户超时未支付时,给用户发提醒消息.另一种场景是,超时未付款,订单自动取消.通常,订单创建的时候可以向延迟队列种插入一条消息,到时间自动执行.其实,也可以用临时表,把这些未支付的订单放到一个临时表中,或者Redis,然后定时任务去扫描.这里我们用延时队列来做.RocketMQ有延时队列,RibbitMQ也可以实现,Java自带的也有延时队列,接下来就回顾一下各种队列. Queue 队列是一种集合.除了基本的集合操作以外,队列还提供了额外的插入.提取和检查操作.队列的每个方法都以两种形式存在:一

  • 详解java中DelayQueue的使用

    简介 今天给大家介绍一下DelayQueue,DelayQueue是BlockingQueue的一种,所以它是线程安全的,DelayQueue的特点就是插入Queue中的数据可以按照自定义的delay时间进行排序.只有delay时间小于0的元素才能够被取出. DelayQueue 先看一下DelayQueue的定义: public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements Bloc

  • java并发中DelayQueue延迟队列原理剖析

    介绍 DelayQueue队列是一个延迟队列,DelayQueue中存放的元素必须实现Delayed接口的元素,实现接口后相当于是每个元素都有个过期时间,当队列进行take获取元素时,先要判断元素有没有过期,只有过期的元素才能出队操作,没有过期的队列需要等待剩余过期时间才能进行出队操作. 源码分析 DelayQueue队列内部使用了PriorityQueue优先队列来进行存放数据,它采用的是二叉堆进行的优先队列,使用ReentrantLock锁来控制线程同步,由于内部元素是采用的Priority

  • Java并发编程之JUC并发核心AQS同步队列原理剖析

    目录 一.AQS介绍 二.AQS中的队列 1.同步等待队列 2.条件等待队列 3.AQS队列节点Node 三.同步队列源码分析 1.同步队列分析 2.同步队列--独占模式源码分析 3.同步队列--共享模式源码分析 一.AQS介绍 队列同步器AbstractQueuedSynchronizer(简称AQS),AQS定义了一套多线程访问共享资源的同步器框架,是用来构建锁或者其他同步组件的基础框架,是一个依赖状态(state)的同步器.Java并发编程的核心在java.util.concurrent(

  • Java并发编程之ReentrantLock实现原理及源码剖析

    目录 一.ReentrantLock简介 二.ReentrantLock使用 三.ReentrantLock源码分析 1.非公平锁源码分析 2.公平锁源码分析 前面<Java并发编程之JUC并发核心AQS同步队列原理剖析>介绍了AQS的同步等待队列的实现原理及源码分析,这节我们将介绍一下基于AQS实现的ReentranLock的应用.特性.实现原理及源码分析. 一.ReentrantLock简介 ReentrantLock位于Java的juc包里面,从JDK1.5开始出现,是基于AQS同步队列

  • 详解Java线程池队列中的延迟队列DelayQueue

    目录 DelayQueue延迟队列 DelayQueue使用场景 DelayQueue属性 DelayQueue构造方法 实现Delayed接口使用示例 DelayQueue总结 在阻塞队里中,除了对元素进行增加和删除外,我们可以把元素的删除做一个延迟的处理,即使用DelayQueue的方法.本文就来和大家聊聊Java线程池队列中的DelayQueue—延迟队列 public enum QueueTypeEnum { ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQ

  • Java延迟队列原理与用法实例详解

    本文实例讲述了Java延迟队列原理与用法.分享给大家供大家参考,具体如下: 延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到-- 应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用: 简单的延时队列要有三部分:第一实现了Delayed接口的消息体.第二消费消息的消费者.第三存放消息的延时队列,那下面就来看看延时队列dem

  • java 高并发中volatile的实现原理

    java 高并发中volatile的实现原理 摘要: 在多线程并发编程中synchronized和Volatile都扮演着重要的角色,Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的"可见性".可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值.它在某些情况下比synchronized的开销更小 1. 定义: java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量.

  • Java必会的Synchronized底层原理剖析

    目录 1. synchronized作用 2. synchronized用法 3. synchronized加锁原理 synchronized作为Java程序员最常用同步工具,很多人却对它的用法和实现原理一知半解,以至于还有不少人认为synchronized是重量级锁,性能较差,尽量少用. 但不可否认的是synchronized依然是并发首选工具,连volatile.CAS.ReentrantLock都无法动摇synchronized的地位.synchronized是工作面试中的必备技能,今天就

  • 聊聊Java并发中的Synchronized

    1 引言 在多线程并发编程中Synchronized一直是元老级角色,很多人都会称呼它为重量级锁,但是随着Java SE1.6对Synchronized进行了各种优化之后,有些情况下它并不那么重了,本文详细介绍了Java SE1.6中为了减少获得锁和释放锁带来的性能消耗而引入的偏向锁和轻量级锁,以及锁的存储结构和升级过程. 2 术语定义 术语 英文 说明 CAS Compare and Swap 比较并设置.用于在硬件层面上提供原子性操作.在 Intel 处理器中,比较并交换通过指令cmpxch

  • java并发等待条件的实现原理详解

    前言 前面介绍了排它锁,共享锁的实现机制,本篇继续学习AQS中的另外一个内容-Condition.想必学过java的都知道Object.wait和Object.notify,同时也应该知晓这两个方法的使用离不开synchronized关键字.synchronized是jvm级别提供的同步原语,它的实现机制隐藏在jvm实现中.作为Lock系列功能中的Condition,就是用来实现类似 Object.wait和Object.notify 对应功能的. 使用场景 为了更好的理解Lock和Condit

  • Java并发中的Fork/Join 框架机制详解

    什么是 Fork/Join 框架 Fork/Join 框架是一种在 JDk 7 引入的线程池,用于并行执行把一个大任务拆成多个小任务并行执行,最终汇总每个小任务结果得到大任务结果的特殊任务.通过其命名也很容易看出框架主要分为 Fork 和 Join 两个阶段,第一阶段 Fork 是把一个大任务拆分为多个子任务并行的执行,第二阶段 Join 是合并这些子任务的所有执行结果,最后得到大任务的结果. 这里不难发现其执行主要流程:首先判断一个任务是否足够小,如果任务足够小,则直接计算,否则,就拆分成几个

随机推荐