一文带你深入了解Java中延时任务的实现

目录
  • 概述
  • JAVA DelayQueue
    • DelayQueue的实现原理
    • DelayQueue实现延时队列的优缺点
  • 时间轮算法
  • 时间轮的具体实现
    • 进阶优化版时间轮算法
    • 时间轮算法的应用
    • 小结
  • redis延时队列
  • mq延时队列
    • rocketmq延时消息
    • rocketmq的精准延时消息
  • 总结

概述

延时任务相信大家都不陌生,在现实的业务中应用场景可以说是比比皆是。例如订单下单15分钟未支付直接取消,外卖超时自动赔付等等。这些情况下,我们该怎么设计我们的服务的实现呢?

笨一点的方法自然是定时任务去数据库进行轮询。但是当业务量较大,事件处理比较费时的时候,我们的系统和数据库往往会面临巨大的压力,如果采用这种方式或许会导致数据库和系统的崩溃。那么有什么好办法吗?今天我来为大家介绍几种实现延时任务的办法。

JAVA DelayQueue

你没看错,java内部有内置延时队列,位于java concurrent包内。

DelayQueue是一个jdk中自带的延时队列实现,他的实现依赖于可重入锁ReentrantLock以及条件锁Condition和优先队列PriorityQueue。而且本质上他也是一个阻塞队列。那么他是如何实现延时效果的呢。

DelayQueue的实现原理

首先DelayQueue队列中的元素必须继承一个接口叫做Delayed,我们找到这个类

public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    }

发现这个类内部定义了一个返回值为long的方法getDelay,这个方法用来定义队列中的元素的过期时间,所有需要放在队列中的元素,必须实现这个方法。

然后我们来看看延迟队列的队列是如何操作的,我们就拿最典型的offertake来看:

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

offer操作平平无奇,甚至直接调用到了优先队列的offer来将队列根据延时进行排序,只不过加了个锁,做了些数据的调整,没有什么深入的地方,但是take的实现看上去就很复杂了。(注意,Dalayed继承了Comparable方法,所以是可以直接用优先队列来排序的,只要你自己实现了compareTo方法)我尝试加了些注释让各位看得更明白些:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 自选操作
            for (;;) {
                // 获取队列第一个元素,如果队列为空
                // 阻塞住直到有新元素加入队列,offer等方法调用signal唤醒线程
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    // 如果队列中有元素
                    long delay = first.getDelay(NANOSECONDS);
                    // 判断延时时间,如果到时间了,直接取出数据并return
                    if (delay <= 0)
                        return q.poll();
                    first = null;
                    // 如果leader为空则阻塞
                    if (leader != null)
                        available.await();
                    else {
                        // 获取当前线程
                        Thread thisThread = Thread.currentThread();
                        // 设置leader为当前线程
                        leader = thisThread;
                        try {
                            // 阻塞延时时间
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

我们可以看到take的实现依靠了无限自旋,直到第一个队列元素过了超时时间后才会返回,否则等待他的只有被阻塞。

DelayQueue实现延时队列的优缺点

看了源码后,我们应该对DelayQueue的实现有了一个大致的了解,也对他的优缺点有了一定的理解。他的优点很明显:

  • java原生支持,不需要引入第三方工具
  • 线程安全,即插即用使用方便

但是他的缺点也是很明显的:

  • 不支持分布式,并且数据放在内存中,没有持久化的支持,服务宕机会丢失数据
  • 插入时使用的是优先队列的排序,时间复杂度较高,并且对于队列中的任务不能很好的管理

所以有没有更好的延时队列的实现呢,我们继续看下去~

时间轮算法

时间轮算法是一个被设计出来处理延时任务的算法,现实中的应用可以在kafka以及netty等项目中找到类似的实现。

时间轮的具体实现

所谓时间轮,顾名思义,他是一个类似于时钟的结构,即他的主结构是一个环形数组,如图:

环形数组中存放的是一个一个的链表,链表中存放着需要执行的任务,我们设定好数组中执行的间隔,假设我们的环形数组的长度是60,每个数组的执行间隔为1s,那么我们会在每过1s就会执行数组下一个元素中的链表中的元素。如果只是这样,那么我们将无法处理60秒之外的延时任务,这显然不合适,所以我们会在每个任务中加上一个参数圈数,来表明任务会在几圈后执行。假如我们有一个任务是在150s后执行,那么他应该在30s的位置,同时圈数应该为2。我们每次执行一个链表中的任务的时候会把当圈需要执行的任务取出执行,然后把他从链表中删除,如果任务不是当圈执行,则修改他的圈数,将圈数减1,于是一个简单的时间轮出炉了。

那么这样的时间轮有什么优缺点呢?

先来说优点吧:

  • 相比DelayQueue来说,时间轮的插入更加的高效,时间复杂度为O(1)
  • 实现简单清晰,任务调度更加方便合理

当然他的缺点也不少:

  • 他和DelayQueue一样不支持分布式,并且数据放在内存中,没有持久化的支持,服务宕机会丢失数据
  • 数组间的间隔设置会影响任务的精度
  • 由于不同圈数的任务会在同一个链表中,执行到每个数组元素时需要遍历所有的链表数据,效率会很低

进阶优化版时间轮算法

刚才提到了一些时间轮算法的缺点,那么是不是有一些方法来进行下优化?这里我来介绍一下时间轮的优化版本。

之前我们提到不同圈数的任务会在同一个链表中被重复遍历影响效率,这种情况下我们可以进行如下优化:将时间轮进行分层

我们可以看到图中,我们采用了多层级的设计,上图中分了三层,每层都是60格,第一个轮盘中的间隔为1小时,我们的数据每一次都是插入到这个轮盘中,每当这个轮盘经过一个小时后来到下一个刻度,就会取出其中的所有元素,按照延迟时间放入到第二个象征着分钟的轮盘中,以此类推。

这样的实现好处可以说是显而易见的:

  • 首先避免了当时间跨度较大时空间的浪费
  • 每一次到达刻度的时候我们不用再像以前那样遍历链表取出需要的数据,而是可以一次性全部拿出来,大大节约了操作的时间

时间轮算法的应用

时间轮算法可能在之前大家没有听说过,但是他在各个地方都有着不小的作用。linux的定时器的实现中就有时间轮的身影,同样如果你是一个喜好看源码的读者,你也可能会在kafka以及netty中找到他的实现。

kafka

kafka中应用了时间轮算法,他的实现和之前提到的进阶版时间轮没有太大的区别,只有在一点上:kafka内部实现的时间轮应用到了DelayQueue

@nonthreadsafe
    private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

    private[this] val interval = tickMs * wheelSize
    private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

    private[this] var currentTime = startMs - (startMs % tickMs)

    @volatile private[this] var overflowWheel: TimingWheel = null

    private[this] def addOverflowWheel(): Unit = {
        synchronized {
        if (overflowWheel == null) {
            overflowWheel = new TimingWheel(
            tickMs = interval,
            wheelSize = wheelSize,
            startMs = currentTime,
            taskCounter = taskCounter,
            queue
            )
        }
        }
    }

    def add(timerTaskEntry: TimerTaskEntry): Boolean = {
        val expiration = timerTaskEntry.expirationMs

        if (timerTaskEntry.cancelled) {
        false
        } else if (expiration < currentTime + tickMs) {
        false
        } else if (expiration < currentTime + interval) {
        val virtualId = expiration / tickMs
        val bucket = buckets((virtualId % wheelSize.toLong).toInt)
        bucket.add(timerTaskEntry)

        if (bucket.setExpiration(virtualId * tickMs)) {
            queue.offer(bucket)
        }
        true
        } else {
        if (overflowWheel == null) addOverflowWheel()
        overflowWheel.add(timerTaskEntry)
        }
    }

    def advanceClock(timeMs: Long): Unit = {
        if (timeMs >= currentTime + tickMs) {
        currentTime = timeMs - (timeMs % tickMs)

        if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
        }
    }
    }

上面是kafka内部的实现(使用的语言是scala),我们可以看到实现非常的简洁,并且使用到了DelayQueue。我们刚才已经讨论过了DelayQueue的优缺点,查看源码后我们已经可以有一个大致的结论了:DelayQueue在kafka的时间轮中的作用是负责推进任务的,为的就是防止在时间轮中由于任务比较稀疏而造成的"空推进"。DelayQueue的触发机制可以很好的避免这一点,同时由于DelayQueue的插入效率较低,所以仅用于底层的推进,任务的插入由时间轮来操作,两者配置,可以实现效率和资源的平衡。

netty

netty的内部也有时间轮的实现HashedWheelTimer

HashedWheelTimer的实现要比kafka内部的实现复杂许多,和kafka不同的是,它的内部推进不是依靠的DelayQueue而是自己实现了一套,源码太长,有兴趣的读者可以自己去看一下。

小结

时间轮说了这么多,我们可以看到他的效率是很出众的,但是还是有这么一个问题:他不支持分布式。当我们的业务很复杂,需要分布式的时候,时间轮显得力不从心,那么这个时候有什么好一点的延时队列的选择呢?我们或许可以尝试使用第三方的工具

redis延时队列

其实啊说起延时,我们如果常用redis的话,就会想起redis是存在过期机制的,那么我们是否可以利用这个机制来实现一个延时队列呢?

redis自带key的过期机制,而且可以设置过期后的回调方法。基于此特性,我们可以非常容易就完成一个延时队列,任务进来时,设定定时时间,并且配置好过期回调方法即可。

除了使用redis的过期机制之外,我们也可以利用它自带的zset来实现延时队列。zset支持高性能的排序,因此我们任务进来时可以将时间戳作为排序的依据,以此将任务的执行先后进行有序的排列,这样也能实现延时队列。

zset实现延时队列的好处:

  • 支持高性能排序
  • redis本身的高可用和高性能以及持久性

mq延时队列

rocketmq延时消息

rocketmq天然支持延时消息,他的延时消息分为18个等级,每个等级对应不同的延时时间。

那么他的原理是怎样的呢?

rocketmqbroker收到消息后会将消息写入commitlog,并且判断这个消息是否是延时消息(即delay属性是否大于0),之后如果判断确实是延时消息,那么他不会马上写入,而是通过转发的方式将消息放入对应的延时topic(18个延时级别对应18个topic

rocketmq会有一个定时任务进行轮询,如果任务的延迟时间已经到了就发往指定的topic

这个设计比较的简单粗暴,但是缺点也十分明显:

  • 延时是固定的,如果想要的延迟超出18个级别就没办法实现
  • 无法实现精准延时,队列的堆积等等情况也会导致执行产生误差

rocketmq的精准延时消息

rocketmq本身是不支持的精确延迟的,他的商业版本ons倒是支持。不过rocketmq的社区中有相应的解决方案。方案是借助于时间轮算法来实现的,感兴趣的朋友可以自行去社区查看。(社区中的一些未被合并的pr是不错的实现参考)

总结

延时队列的实现千千万,但是如果要在生产中大规模使用,那么大部分情况下其实都避不开时间轮算法。改进过的时间轮算法可以做到精准延时,持久化,高性能,高可用性,可谓是完美。但是话又说回来,其他的延时方式就无用了吗?其实不是的,所有的方式都是需要匹配自己的使用场景。如果你是极少量数据的轮询,那么定时轮询数据库或许才是最佳的解决方案,而不是无脑的引入复杂的延时队列。如果是单机的任务,那么jdk的延时队列也是不错的选择。

本文介绍的这些延时队列只是为了向大家展示他们的原理和优缺点,具体的使用还需要结合自己业务的场景。

以上就是一文带你深入了解Java中延时任务的实现的详细内容,更多关于Java延时任务的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java处理延时任务的常用几种解决方案

    目录 前言 数据库轮询 原理 优缺点 Java延迟队列 Reids监听失效key 创建监听类,实现MessageListener接口 RocketMq延迟消息 总结 前言 项目中经常会遇到如下的需求: 创建订单30分钟未支付,订单自动取消. 订单支付成功后,1分钟后给用户发送短信,提醒用户评价. … 针对延时任务需求,我们可以采用如下的解决方案: 数据库轮询 原理 通过一个线程定时的扫描数据库当天创建的订单,根据订单的创建时间来判断订单是否超时,针对超时订单进行相关的更新操作.实现技术采用Spr

  • Java利用redis zset实现延时任务详解

    目录 一.实现原理 二.准备工作 三.代码实现 四.优缺点 所谓的延时任务给大家举个例子:你买了一张火车票,必须在30分钟之内付款,否则该订单被自动取消.「订单30分钟不付款自动取消,这个任务就是一个延时任务.」   我之前已经写过2篇关于延时任务的文章: <通过DelayQueue实现延时任务> <基于netty时间轮算法实战> 这两种方法都有一个缺点:都是基于单体应用的内存的方式运行延时任务的,一旦出现单点故障,可能出现延时任务数据的丢失.所以此篇文章给大家介绍实现延时任务的第

  • Java DelayQueue实现延时任务的示例详解

    目录 一.DelayQueue的应用原理 二.订单延时任务的实现 三.订单处理 四.优缺点 一.DelayQueue的应用原理 DelayQueue是一个无界的BlockingQueue的实现类,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走. BlockingQueue即阻塞队列,java提供的面向多线程安全的队列数据结构,当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常. 这里的“无界”队列,是指队列的元素数量不存在上限,队列的容量

  • Java利用Netty时间轮实现延时任务

    目录 一.时间轮算法简介 二.时间轮hello-world 三.异步任务线程池 四.时间轮优缺点 知识点补充 一.时间轮算法简介 为了大家能够理解下文中的代码,我们先来简单了解一下netty时间轮算法的核心原理 时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8).假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3+1

  • Java自带定时任务ScheduledThreadPoolExecutor实现定时器和延时加载功能

    java.util.concurrent.ScheduledThreadPoolExecutor 是JDK1 .6之后自带的包,功能强大,能实现定时器和延时加载的功能 各类功能和处理方面优于Timer 1.定时器: ScheduledThreadPoolExecutor  有个scheduleAtFixedRate(command, initialDelay, period, unit) ;方法 command: 执行的线程(可自己New一个) initialDelay:初始化执行的延时时间 p

  • Java DelayQueue实现任务延时示例讲解

    在项目中有使用到延时队列的场景,做个简单的记录说明:首先DelayQueue实现了BlockingQueue,加入其中的元素必须实现Delayed接口: 当生产者元素调用put往其中加入元素时,出发Delayed接口的compareTo方法进行排序,这个排序是按照时间的,按照计划执行的时间排序,先执行的在前面,后执行的排后面:消费者获取元素时,调用getDelay方法返回的值大于0,则消费者线程wait返回的这个时间后,再从队列头部取出元素:下面是个简单的例子 import org.jetbra

  • 一文带你深入了解Java中延时任务的实现

    目录 概述 JAVA DelayQueue DelayQueue的实现原理 DelayQueue实现延时队列的优缺点 时间轮算法 时间轮的具体实现 进阶优化版时间轮算法 时间轮算法的应用 小结 redis延时队列 mq延时队列 rocketmq延时消息 rocketmq的精准延时消息 总结 概述 延时任务相信大家都不陌生,在现实的业务中应用场景可以说是比比皆是.例如订单下单15分钟未支付直接取消,外卖超时自动赔付等等.这些情况下,我们该怎么设计我们的服务的实现呢? 笨一点的方法自然是定时任务去数

  • 一文带你真正理解Java中的内部类

    目录 概述 内部类介绍和分类 常规内部类 局部内部类 匿名内部类 静态内部类 静态内部类和普通内部类的区别 内部类的作用 概述 不知道大家在平时的开发过程中或者源码里是否留意过内部类,那有思考过为什么要有内部类,内部类都有哪几种形式,静态内部类和普通内部类有什么区别呢?本篇文章主要带领大家理解下这块内容. 内部类介绍和分类 顾名思义,内部类是指一个类在另外一个类的内部,是定义在另一个类中的类.根据类的位置和属性不同,可以分为下面几种. 常规内部类 @Data public class Tree

  • 一文带你搞懂Java中的泛型和通配符

    目录 概述 泛型介绍和使用 泛型类 泛型方法 类型变量的限定 通配符使用 无边界通配符 通配符上界 通配符下界 概述 泛型机制在项目中一直都在使用,比如在集合中ArrayList<String, String>, Map<String,String>等,不仅如此,很多源码中都用到了泛型机制,所以深入学习了解泛型相关机制对于源码阅读以及自己代码编写有很大的帮助.但是里面很多的机制和特性一直没有明白,特别是通配符这块,对于通配符上界.下界每次用每次百度,经常忘记,这次我就做一个总结,加

  • 一文带你搞懂Java中Get和Post的使用

    目录 1 Get请求数据 1.1 Controller 1.2 Service 1.3 Application 1.4 Postman 2 Post接收数据 2.1 Controller 2.2 Service 2.3 Application 2.4 Postman 3 Post发送数据 3.1 Controller 3.2 Service 3.3 ResponseResult 3.4 Config 3.5 Application 3.6 Postman 1 Get请求数据 项目地址:https

  • 一文带你搞懂Java中的递归

    目录 概述 递归累加求和 计算1 ~ n的和 代码执行图解 递归求阶乘 递归打印多级目录 综合案例 文件搜索 文件过滤器优化 Lambda优化 概述 递归:指在当前方法内调用自己的这种现象. 递归的分类: 递归分为两种,直接递归和间接递归. 直接递归称为方法自身调用自己. 间接递归可以A方法调用B方法,B方法调用C方法,C方法调用A方法. 注意事项: 递归一定要有条件限定,保证递归能够停止下来,否则会发生栈内存溢出. 在递归中虽然有限定条件,但是递归次数不能太多.否则也会发生栈内存溢出. 构造方

  • 一文带你弄懂Java中线程池的原理

    目录 为什么要用线程池 线程池的原理 ThreadPoolExecutor提供的构造方法 ThreadPoolExecutor的策略 线程池主要的任务处理流程 ThreadPoolExecutor如何做到线程复用的 四种常见的线程池 newCachedThreadPool newFixedThreadPool newSingleThreadExecutor newScheduledThreadPool 小结 在工作中,我们经常使用线程池,但是你真的了解线程池的原理吗?同时,线程池工作原理和底层实

  • 一文带你搞懂Java中Object类和抽象类

    目录 一.抽象类是什么 二.初始抽象类 2.1 基本语法 2.2 继承抽象类 三.抽象类总结 四.Object类 4.1 初始Object 4.2 toString 4.3 equals 4.4 hashcode 一.抽象类是什么 在面向对象的概念中,所有的对象都是通过类来描绘的,但是反过来,并不是所有的类都是用来描绘对象的,如果一个类中没有包含足够的信息来描绘一个具体的对象,这样的类就是抽象类. 由于抽象类不能实例化对象,所以抽象类必须被继承,才能被使用.也是因为这个原因,通常在设计阶段决定要

  • 一文带你搞懂Java中Synchronized和Lock的原理与使用

    目录 1.Synchronized与Lock对比 2.Synchronized与Lock原理 2.1 Synchronized原理 2.2 Lock原理 3.Synchronized与Lock使用 Synchronized Lock 4.相关问题 1.Synchronized与Lock对比 实现方式:Synchronized是Java语言内置的关键字,而Lock是一个Java接口. 锁的获取和释放:Synchronized是隐式获取和释放锁,由Java虚拟机自动完成:而Lock需要显式地调用lo

  • 一文带你快速了解java中的static关键词

    Static基本规则: (1)一个类的静态方法只能访问静态属性. (2)一个类的静态方法不能直接调用非静态方法. (3)如果访问权限允许,static属性和方法可以使用对象加'.'方式调用,当然也可以使用实例加'.'方式调用. (4)静态方法中不存在当前对象,所以不能使用this,当然也不能使用super. (5)静态方法不能被非静态方法覆盖. (6)构造方法不允许被声明为static的. static关键词,可以加在方法上,成员变量上,代码块. 类就不要想了. 1. static方法 stat

  • 一文带你彻底理解Java序列化和反序列化

    Java序列化是什么? Java序列化是指把Java对象转换为字节序列的过程,Java反序列化是指把字节序列恢复为Java对象的过程. 反序列化: 客户端重文件,或者网络中获取到文件以后,在内存中重构对象. 序列化: 对象序列化的最重要的作用是传递和保存对象的时候,保证对象的完整性和可传递性.方便字节可以在网络上传输以及保存在本地文件. 为什么需要序列化和反序列化 实现分布式 核心在于RMI,可以利用对象序列化运行远程主机上的服务,实现运行的时候,就像在本地上运行Java对象一样. 实现递归保存

随机推荐