详解Java中的延时队列 DelayQueue

当用户超时未支付时,给用户发提醒消息。另一种场景是,超时未付款,订单自动取消。通常,订单创建的时候可以向延迟队列种插入一条消息,到时间自动执行。其实,也可以用临时表,把这些未支付的订单放到一个临时表中,或者Redis,然后定时任务去扫描。这里我们用延时队列来做。RocketMQ有延时队列,RibbitMQ也可以实现,Java自带的也有延时队列,接下来就回顾一下各种队列。

Queue

队列是一种集合。除了基本的集合操作以外,队列还提供了额外的插入、提取和检查操作。队列的每个方法都以两种形式存在:一种是当操作失败时抛异常,另一种是返回一个特定的值(null或者false,取决于具体操作)。后一种形式的插入操作是专门设计用于有界队列实现的,在大多情况下,插入操作不会失败。

队列通常(但不一定)以FIFO(先进先出)的方式对元素进行排序。例外情况包括优先级队列(根据提供的比较器对元素进行排序或元素的自然排序)和LIFO队列(或堆栈),对LIFO进行排序(后进先出)。无论使用哪种顺序,队列的开头都是该元素,可以通过调用remove()或poll()将其删除。在FIFO队列中,所有新元素都插入队列的尾部。其他种类的队列可能使用不同的放置规则。 每个Queue实现必须指定其排序属性。无论使用哪种顺序,都可以通过调用remove()或poll()来删除队列开头的元素。在FIFO队列中,所有新元素都插入到队列的尾部。其他类型的队列可能使用不同的放置规则。每个队列实现都必须指定其排序属性。

offer方法在可以的情况下会向队列种插入一个元素,否则返回false。这不同于Collection.add方法,后者只能通过抛异常来添加元素。offer方法设计用于在正常情况下(而不是在例外情况下)发生故障时,例如在固定容量(或者“有界”)队列种使用。

remove()和poll()方法删除并返回队头元素。当队列为空时,remove()抛出异常,而poll()返回null。

element()和peek()方法返回队头元素。

PriorityQueue

PriorityQueue是一个无界优先级队列是基于优先级堆的。优先级队列种的元素根据自然顺序进行排序,或者通过在队列构建时提供的Comparator进行排序,当然这取决于使用哪种构造函数。优先级队列不允许空(null)元素。一个依赖自然顺序的优先级队列也不允许插入不可比较的对象。

优先级队列的队头元素是最小的元素,如果有多个元素并列最小,那么队头是它们其中之一。

优先级队列是无界的,但是有一个内部容量来控制用于在队列上存储元素的数组的大小。它总是至少与队列大小一样大。将元素添加到优先级队列时,其容量会自动增长

BlockingQueue

这种队列还支持以下操作:在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用。

BlockingQueue方法有四种形式,它们以不同的方式处理操作,这些操作无法立即满足,但将来可能会满足:一种抛出异常,第二种返回特殊值(null或false,取决于具体操作),第三种阻塞当前线程,直到操作成功为止;第四种阻塞当前线程,超时则放弃。 下表总结了这些方法:

阻塞队列不接受空元素,如果你试图add , put 或者 offer 一个null,将会抛NullPointerException。

阻塞队列是线程安全的。所有排队方法都使用内部锁或者其他形式的并发控制来保证以原子方式实现它们的效果。

阻塞队列被设计主要用于生产者-消费者队列。

下面是一个典型的生产者-消费者方案:

package com.example;

import java.text.MessageFormat;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author ChengJianSheng
 * @date 2020/12/15
 */
public class Setup {
  public static void main(String[] args) {
    BlockingQueue<Bread> queue = new ArrayBlockingQueue<>(5);

    Producer p1 = new Producer(queue);
    Producer p2 = new Producer(queue);
    Consumer c1 = new Consumer(queue);
    Consumer c2 = new Consumer(queue);

    new Thread(p1, "p1").start();
    new Thread(p2, "p2").start();
    new Thread(c1, "c1").start();
    new Thread(c2, "c2").start();
  }
}

class Bread {

}

/**
 * 生产者
 */
class Producer implements Runnable {

  private final BlockingQueue<Bread> queue;

  public Producer(BlockingQueue<Bread> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      while (true) {
        queue.put(produce());
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public Bread produce() {
    try {
      Thread.sleep(Math.round(2000));
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return new Bread();
  }
}

/**
 * 消费者
 */
class Consumer implements Runnable {

  private final BlockingQueue<Bread> queue;

  public Consumer(BlockingQueue<Bread> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      while (true) {
        consume(queue.take());
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void consume(Bread bread) {
    try {
      Thread.sleep(Math.round(2000));
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

ArrayBlockingQueue

ArrayBlockingQueue是用数组实现的有界阻塞队列。这种队列中的元素按FIFO(先进先出)排序。队头是在队列中停留最长时间的元素。队尾是在队列中停留时间最短的元素。新元素插入到队列的尾部,并且队列检索操作在队列的头部获取元素。

这是一个经典的“有界缓冲区”,其中固定大小的数组包含由生产者插入并由消费者提取的元素。 创建后,容量将无法更改。 试图将一个元素放入一个已满的队列将导致操作阻塞; 试图从空队列中取出一个元素也会阻塞。

这个类支持一个可选的公平性策略,用于对等待的生产者和消费者线程进行排序。默认情况下,不保证这个顺序。然而,将公平性设置为true的队列将按FIFO顺序授予线程访问权。公平性通常会降低吞吐量,但会降低可变性并避免饥饿。

LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的可选边界的阻塞队列。

PriorityBlockingQueue

PriorityBlockingQueue是一个无界阻塞队列,它使用与PriorityQueue相同的排序规则,并提供阻塞检索操作。

DelayQueue

DelayQueue是一种由延迟元素组成的无界阻塞队列,在该队列中,仅当元素的延迟到期时才可以使用该元素。队头是已经过期的延迟元素,它已过期时间最长。如果没有过期的延迟,则队列没有头部,此时调用poll将返回null。当调用元素的getDelay(TimeUnit.NANOSECONDS)方法返回值小于或等于0时,就会发生过期。即使元素没有过期,也不能用take或者poll将其删除。

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer提供了一个框架来实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(信号灯,事件等)。该类旨在为大多数依赖单个原子int值表示状态的同步器提供有用的基础。子类必须定义更改此状态的受保护方法,并定义该状态对于获取或释放此对象而言意味着什么。 鉴于这些,此类中的其他方法将执行所有排队和阻塞机制。 子类可以维护其他状态字段,但是仅跟踪关于同步的使用方法getState(),setState(int)和compareAndSetState(int,int)操作的原子更新的int值。

小结

1、Queue是一个集合,队列的每个方法都有两种形式,一种是抛异常,另一种是返回一个特定的值。

2、PriorityQueue是一个无界优先级队列,默认情况下,队列种的元素按自然顺序排序,或者根据提供的Comparator进行排序。也就是说,优先级队列种的元素都是经过排序的,排序规则可以自己指定,同时队列种的元素都必须是可排序的。

3、BlockingQueue是一个阻塞队列,向已满的队列种插入元素时会阻塞,向空队列中取元素时也会阻塞;阻塞队列被设计主要用于生产者-消费者队列。

4、ArrayBlockingQueue是用数组实现的有界阻塞队列,队列种的元素按FIFO(先进先出)排序。

5、LinkedBlockingQueue是用链表实现的可选边界的阻塞队列。

6、PriorityBlockingQueue相当于是阻塞队列和优先级队列的合体,排序规则与优先级队列相同。

7、DelayQueue延时队列中的元素都有一个有效期,只有当过了有效期才可以使用该元素。

以上就是详解Java中的延时队列 DelayQueue的详细内容,更多关于Java 延时队列 DelayQueue的资料请关注我们其它相关文章!

(0)

相关推荐

  • 一口气说出Java 6种延时队列的实现方法(面试官也得服)

    五一期间原计划是写两篇文章,看一本技术类书籍,结果这五天由于自律性过于差,禁不住各种诱惑,我连电脑都没打开过,计划完美宣告失败.所以在这能看出和大佬之间的差距,人家没白没夜的更文,比你优秀的人比你更努力,难以望其项背,真是让我自愧不如. 知耻而后勇,这不逼着自己又学起来了,个人比较喜欢一些实践类的东西,既学习到知识又能让技术落地,能搞出个demo最好,本来不知道该分享什么主题,好在最近项目紧急招人中,而我有幸做了回面试官,就给大家整理分享一道面试题:"如何实现延时队列?". 下边会介绍

  • springboot执行延时任务之DelayQueue的使用详解

    DelayQueue简介 DelayQueue是一个无界阻塞队列,只有在延迟期满时,才能从中提取元素. 队列的头部,是延迟期满后保存时间最长的delay元素. 在很多场景我们需要用到延时任务,比如给客户异步转账操作超时后发通知告知用户,还有客户下单后多长时间内没支付则取消订单等等,这些都可以使用延时任务来实现. jdk中DelayQueue可以实现上述需求,顾名思义DelayQueue就是延时队列. DelayQueue提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素. 没有

  • Java多线程并发开发之DelayQueue使用示例

    在学习Java 多线程并发开发过程中,了解到DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对象的延迟到期时间最长.注意:不能将null元素放置到这种队列中. Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象.此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序. 在网上看到了一些

  • 详解java中DelayQueue的使用

    简介 今天给大家介绍一下DelayQueue,DelayQueue是BlockingQueue的一种,所以它是线程安全的,DelayQueue的特点就是插入Queue中的数据可以按照自定义的delay时间进行排序.只有delay时间小于0的元素才能够被取出. DelayQueue 先看一下DelayQueue的定义: public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements Bloc

  • SpringBoot使用RabbitMQ延时队列(小白必备)

    1.什么是MQ MQ,是一种跨进程的通信机制,用于上下游传递消息. 在互联网架构中,MQ是一种非常常见的上下游"逻辑解耦+物理解耦"的消息通信服务. 使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务. 为什么会产生消息列队? 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个: 不

  • 基于golang的简单分布式延时队列服务的实现

    一.引言 背景 我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈.但有时也会遇到非实时的任务,比如确定的时间点发布重要公告.或者需要在用户做了一件事情的X分钟/Y小时后,EG: "PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励" 对其特定动作,比如通知.发券等等.一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延

  • 详解Java中的延时队列 DelayQueue

    当用户超时未支付时,给用户发提醒消息.另一种场景是,超时未付款,订单自动取消.通常,订单创建的时候可以向延迟队列种插入一条消息,到时间自动执行.其实,也可以用临时表,把这些未支付的订单放到一个临时表中,或者Redis,然后定时任务去扫描.这里我们用延时队列来做.RocketMQ有延时队列,RibbitMQ也可以实现,Java自带的也有延时队列,接下来就回顾一下各种队列. Queue 队列是一种集合.除了基本的集合操作以外,队列还提供了额外的插入.提取和检查操作.队列的每个方法都以两种形式存在:一

  • 详解java中的阻塞队列

    阻塞队列简介 阻塞队列(BlockingQueue)首先是一个支持先进先出的队列,与普通的队列完全相同: 其次是一个支持阻塞操作的队列,即: 当队列满时,会阻塞执行插入操作的线程,直到队列不满. 当队列为空时,会阻塞执行获取操作的线程,直到队列不为空. 阻塞队列用在多线程的场景下,因此阻塞队列使用了锁机制来保证同步,这里使用的可重入锁: 而对于阻塞与唤醒机制则有与锁绑定的Condition实现 应用场景:生产者消费者模式 java中的阻塞队列 java中的阻塞队列根据容量可以分为有界队列和无界队

  • 详解Java中CountDownLatch异步转同步工具类

    使用场景 由于公司业务需求,需要对接socket.MQTT等消息队列. 众所周知 socket 是双向通信,socket的回复是人为定义的,客户端推送消息给服务端,服务端的回复是两条线.无法像http请求有回复. 下发指令给硬件时,需要校验此次数据下发是否成功. 用户体验而言,点击按钮就要知道此次的下发成功或失败. 如上图模型, 第一种方案使用Tread.sleep 优点:占用资源小,放弃当前cpu资源 缺点: 回复速度快,休眠时间过长,仍然需要等待休眠结束才能返回,响应速度是固定的,无法及时响

  • 详解Java中的ReentrantLock锁

    ReentrantLock锁 ReentrantLock是Java中常用的锁,属于乐观锁类型,多线程并发情况下.能保证共享数据安全性,线程间有序性 ReentrantLock通过原子操作和阻塞实现锁原理,一般使用lock获取锁,unlock释放锁, 下面说一下锁的基本使用和底层基本实现原理,lock和unlock底层 lock的时候可能被其他线程获得所,那么此线程会阻塞自己,关键原理底层用到Unsafe类的API: CAS和park 使用 java.util.concurrent.locks.R

  • 详解JAVA中priorityqueue的具体使用

    Java中PriorityQueue通过二叉小顶堆实现,可以用一棵完全二叉树表示.本文从Queue接口函数出发,结合生动的图解,深入浅出地分析PriorityQueue每个操作的具体过程和时间复杂度,将让读者建立对PriorityQueue建立清晰而深入的认识. 总体介绍 前面以JavaArrayDeque为例讲解了Stack和Queue,其实还有一种特殊的队列叫做PriorityQueue,即优先队列.优先队列的作用是能保证每次取出的元素都是队列中权值最小的(Java的优先队列每次取最小元素,

  • 详解Java中的悲观锁与乐观锁

    一.悲观锁 悲观锁顾名思义是从悲观的角度去思考问题,解决问题.它总是会假设当前情况是最坏的情况,在每次去拿数据的时候,都会认为数据会被别人改变,因此在每次进行拿数据操作的时候都会加锁,如此一来,如果此时有别人也来拿这个数据的时候就会阻塞知道它拿到锁.在Java中,Synchronized和ReentrantLock等独占锁的实现机制就是基于悲观锁思想.在数据库中也经常用到这种锁机制,如行锁,表锁,读写锁等,都是在操作之前先上锁,保证共享资源只能给一个操作(一个线程)使用. 由于悲观锁的频繁加锁,

  • 详解Java中的reactive stream协议

    背景 每个数据流都有一个生产者一个消费者.生产者负责产生数据,而消费者负责消费数据.如果是同步系统,生产一个消费一个没什么问题.但是如果在异步系统中,就会产生问题. 因为生产者无法感知消费者的状态,不知道消费者到底是繁忙状态还是空闲状态,是否有能力去消费更多的数据. 一般来说数据队列的长度都是有限的,即使没有做限制,但是系统的内存也是有限的.当太多的数据没有被消费的话,会导致内存溢出或者数据得不到即使处理的问题. 这时候就需要back-pressure了. 如果消息接收方消息处理不过来,则可以通

  • 详解Java中的ThreadLocal

    一.ThreadLocal简介 多线程访问同一个共享变量的时候容易出现并发问题,特别是多个线程对一个变量进行写入的时候,为了保证线程安全,一般使用者在访问共享变量的时候需要进行额外的同步措施才能保证线程安全性.ThreadLocal是除了加锁这种同步方式之外的一种保证一种规避多线程访问出现线程不安全的方法,当我们在创建一个变量后,如果每个线程对其进行访问的时候访问的都是线程自己的变量这样就不会存在线程不安全问题. 二.ThreadLocal简单使用 下面的例子中,开启两个线程,在每个线程内部设置

  • 详解Java中的锁Lock和synchronized

    一.Lock接口 1.Lock接口和synchronized内置锁 a)synchronized:Java提供的内置锁机制,Java中的每个对象都可以用作一个实现同步的锁(内置锁或者监视器Monitor),线程在进入同步代码块之前需要或者这把锁,在退出同步代码块会释放锁.而synchronized这种内置锁实际上是互斥的,即没把锁最多只能由一个线程持有. b)Lock接口:Lock接口提供了与synchronized相似的同步功能,和synchronized(隐式的获取和释放锁,主要体现在线程进

随机推荐