SpringBoot定时任务设计之时间轮案例原理详解

目录
  • 知识准备
  • 什么是时间轮(Timing Wheel)
  • Netty的HashedWheelTimer要解决什么问题
  • HashedWheelTimer的使用方式
  • 实现案例
    • Pom依赖
    • 2个简单例子
  • HashedWheelTimer是如何实现的?
  • 什么是多级Timing Wheel?

知识准备

Timer和ScheduledExecutorService是JDK内置的定时任务方案,而业内还有一个经典的定时任务的设计叫时间轮(Timing Wheel), Netty内部基于时间轮实现了一个HashedWheelTimer来优化百万量级I/O超时的检测,它是一个高性能,低消耗的数据结构,它适合用非准实时,延迟的短平快任务,例如心跳检测。本文主要介绍时间轮(Timing Wheel)及其使用。@pdai

需要对时间轮(Timing Wheel),以及Netty的HashedWheelTimer要解决什么问题有初步的认识。

什么是时间轮(Timing Wheel)

时间轮(Timing Wheel)是George Varghese和Tony Lauck在1996年的论文' Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility '实现的,它在Linux内核中使用广泛,是Linux内核定时器的实现方法和基础之一。

时间轮(Timing Wheel)是一种环形的数据结构,就像一个时钟可以分成很多格子(Tick),每个格子代表时间的间隔,它指向存储的具体任务(timerTask)的一个链表。

以上述在论文中的图片例子,这里一个轮子包含8个格子(Tick), 每个tick是一秒钟;

任务的添加:如果一个任务要在17秒后执行,那么它需要转2轮,最终加到Tick=1位置的链表中。

任务的执行:在时钟转2Round到Tick=1的位置,开始执行这个位置指向的链表中的这个任务。(# 这里表示剩余需要转几轮再执行这个任务)

Netty的HashedWheelTimer要解决什么问题

HashedWheelTimer是Netty根据时间轮(Timing Wheel)开发的工具类,它要解决什么问题呢?这里面有两个要点: 延迟任务 + 低时效性 。@pdai

在Netty中的一个典型应用场景是判断某个连接是否idle,如果idle(如客户端由于网络原因导致到服务器的心跳无法送达),则服务器会主动断开连接,释放资源。判断连接是否idle是通过定时任务完成的,但是Netty可能维持数百万级别的长连接,对每个连接去定义一个定时任务是不可行的,所以如何提升I/O超时调度的效率呢?

Netty根据时间轮(Timing Wheel)开发了HashedWheelTimer工具类,用来优化I/O超时调度(本质上是延迟任务);之所以采用时间轮(Timing Wheel)的结构还有一个很重要的原因是I/O超时这种类型的任务对时效性不需要非常精准。

HashedWheelTimer的使用方式

在了解时间轮(Timing Wheel)和Netty的HashedWheelTimer要解决的问题后,我们看下HashedWheelTimer的使用方式

通过构造函数看主要参数

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts, Executor taskExecutor) {
}

具体参数说明如下:

threadFactory
tickDuration
unit
ticksPerWheel
leakDetection
maxPendingTimeouts

实现案例

这里展示下HashedWheelTimer的基本使用案例。@pdai

Pom依赖

引入pom的依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.77.Final</version>
</dependency>

2个简单例子

例子1:5秒后执行TimerTask

@SneakyThrows
public static void simpleHashedWheelTimer() {
    log.info("init task 1...");
    HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8);
    // add a new timeout
    timer.newTimeout(timeout -&gt; {
        log.info("running task 1...");
    }, 5, TimeUnit.SECONDS);
}

执行结果如下:

23:32:21.364 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 1...
...
23:32:27.454 [pool-1-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 1...

例子2:任务失效后cancel并让它重新在3秒后执行。

@SneakyThrows
public static void reScheduleHashedWheelTimer() {
    log.info("init task 2...");
    HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8);
    Thread.sleep(5000);
    // add a new timeout
    Timeout tm = timer.newTimeout(timeout -> {
        log.info("running task 2...");
    }, 5, TimeUnit.SECONDS);
    // cancel
    if (!tm.isExpired()) {
        log.info("cancel task 2...");
        tm.cancel();
    }
    // reschedule
    timer.newTimeout(tm.task(), 3, TimeUnit.SECONDS);
}

23:28:36.408 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 2...
23:28:41.412 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - cancel task 2...
23:28:45.414 [pool-2-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 2...

我们通过如下问题进一步理解HashedWheelTimer。@pdai

HashedWheelTimer是如何实现的?

简单看下HashedWheelTimer是如何实现的

Worker
HashedWheelBucket
HashedWheelTimeout

构造函数

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts, Executor taskExecutor) {
    checkNotNull(threadFactory, "threadFactory");
    checkNotNull(unit, "unit");
    checkPositive(tickDuration, "tickDuration");
    checkPositive(ticksPerWheel, "ticksPerWheel");
    this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;
    // Convert tickDuration to nanos.
    long duration = unit.toNanos(tickDuration);
    // Prevent overflow.
    if (duration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }
    if (duration < MILLISECOND_NANOS) {
        logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
                    tickDuration, MILLISECOND_NANOS);
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        this.tickDuration = duration;
    }
    workerThread = threadFactory.newThread(worker);
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
    this.maxPendingTimeouts = maxPendingTimeouts;
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

创建wheel

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    //ticksPerWheel may not be greater than 2^30
    checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i ++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = 1;
    while (normalizedTicksPerWheel < ticksPerWheel) {
        normalizedTicksPerWheel <<= 1;
    }
    return normalizedTicksPerWheel;
}

任务的添加

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    checkNotNull(task, "task");
    checkNotNull(unit, "unit");
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }
    start();
    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

执行方法

/**
    * Starts the background thread explicitly.  The background thread will
    * start automatically on demand even if you did not call this method.
    *
    * @throws IllegalStateException if this timer has been
    *                               {@linkplain #stop() stopped} already
    */
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

停止方法

@Override
public Set<Timeout> stop() {
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
                HashedWheelTimer.class.getSimpleName() +
                        ".stop() cannot be called from " +
                        TimerTask.class.getSimpleName());
    }
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        // workerState can be 0 or 2 at this moment - let it always be 2.
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        return Collections.emptySet();
    }
    try {
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt();
            try {
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    } finally {
        INSTANCE_COUNTER.decrementAndGet();
        if (leak != null) {
            boolean closed = leak.close(this);
            assert closed;
        }
    }
    return worker.unprocessedTimeouts();
}

什么是多级Timing Wheel?

多级的时间轮是比较好理解的,时钟是有小时,分钟,秒的,秒转一圈(Round)分钟就转一个格(Tick), 分钟转一圈(Round)小时就转一格(Tick)。

PS:显然HashedWheelTimer是一层时间轮。

以上就是SpringBoot定时任务设计之时间轮案例原理详解的详细内容,更多关于SpringBoot定时任务时间轮的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot开发存储服务器实现过程详解

    目录 正文 基础环境 创建项目 添加Rest API接口功能(提供上传服务) 启动服务,测试API接口可用性 增加下载文件支持 文件大小设置 打包文件部署 正文 今天我们尝试Spring Boot整合Angular,并决定建立一个非常简单的Spring Boot微服务,使用Angular作为前端渲编程语言进行前端页面渲染. 基础环境 技术 版本 Java 1.8+ SpringBoot 1.5.x 创建项目 初始化项目 mvn archetype:generate -DgroupId=com.e

  • 大厂禁止SpringBoot在项目使用Tomcat容器原理解析

    目录 前言 SpringBoot中的Tomcat容器 SpringBoot设置Undertow Tomcat与Undertow的优劣对比 最后 前言 在SpringBoot框架中,我们使用最多的是Tomcat,这是SpringBoot默认的容器技术,而且是内嵌式的Tomcat.同时,SpringBoot也支持Undertow容器,我们可以很方便的用Undertow替换Tomcat,而Undertow的性能和内存使用方面都优于Tomcat,那我们如何使用Undertow技术呢?本文将为大家细细讲解

  • SpringBoot整合Graylog做日志收集实现过程

    目录 日志收集折腾过程 ELK EFK ELFK 自己撸一个 Graylog 环境搭建 创建输入 Spring Boot整合Graylog 总结 日志收集折腾过程 ELK 之前整合过ELK做日志采集,就是Elasticsearch + Logstash + Kibana: Elasticsearch:存储引擎,存放日志内容,利于全文检索 Logstash:数据传输管道,将日志内容传输到Elasticsearch,并且支持过滤内容,将内容格式化后再传输,可以满足绝大部分的应用场景 Kibana:开

  • Springboot FatJa原理机制源码解析

    目录 一.概述 二.标准的 jar 包结构 三.探索JarLauncher 3.1 只能拷贝出来一份儿 3.2 携带程序所依赖的jar而非仅class 四. 自定义类加载器的运行机制 4.1 指定资源 4.2 创建自定义 ClassLoader 4.3 设置线程上下文类加载器,调用程序中的 main class 一.概述 SpringBoot FatJar 的设计,打破了标准 jar 的结构,在 jar 包内携带了其所依赖的 jar 包,通过 jar 中的 main 方法创建自己的类加载器,来识

  • SpringBoot数据库初始化datasource配置方式

    目录 I. 项目搭建 1. 依赖 2. 配置 3. 初始化sql II. 示例 1. 验证demo 2. 问题记录 2.1 只有初始化数据data.sql,没有schema.sql时,不生效 2.2 版本问题导致配置不生效 2.3 mode配置不对导致不生效 2.4 重复启动之后,报错 小结 I. 项目搭建 在我们的日常业务开发过程中,如果有db的相关操作,通常我们是直接建立好对应的库表结构,并初始化对应的数据,即更常见的情况下是我们在已有表结构基础之下,进行开发: 但是当我们是以项目形式工作时

  • SpringBoot 容器刷新前回调ApplicationContextInitializer

    目录 引言 I. 项目准备 II. 容器刷新前扩展点实例 1. 自定义ApplicationContextInitializer 2. 扩展点注册 3. 执行顺序指定 4. 使用场景示例 5. 小结 III. 不能错过的源码和相关知识点 项目 引言 本文将作为Spring系列教程中源码版块的第一篇,整个源码系列将分为两部分进行介绍:单纯的源码解析,大概率是个吃力没人看的事情,因此我们将结合源码解析,一个是学习下别人的优秀设计,一个是站在源码的角度看一下我们除了日常的CURD之外,还可以干些啥 在

  • SpringBoot定时任务设计之时间轮案例原理详解

    目录 知识准备 什么是时间轮(Timing Wheel) Netty的HashedWheelTimer要解决什么问题 HashedWheelTimer的使用方式 实现案例 Pom依赖 2个简单例子 HashedWheelTimer是如何实现的? 什么是多级Timing Wheel? 知识准备 Timer和ScheduledExecutorService是JDK内置的定时任务方案,而业内还有一个经典的定时任务的设计叫时间轮(Timing Wheel), Netty内部基于时间轮实现了一个Hashe

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • Springboot整合实现邮件发送的原理详解

    目录 开发前准备 基础知识 进阶知识 加入依赖 配置邮件 测试邮件发送 通常在实际项目中,也有其他很多地方会用到邮件发送,比如通过邮件注册账户/找回密码,通过邮件发送订阅信息等等.SpringBoot集成邮件服务非常简单,通过简单的学习即可快速掌握邮件业务类的核心逻辑和企业邮件的日常服务 开发前准备 首先注册发件邮箱并设置客户端授权码,这里以QQ 免费邮箱为例,其他的邮箱的配置也大同小异. 登录 QQ 邮箱,点击设置->账户,开启IMAP/SMTP服务,并生成授权码. 基础知识 电子邮件需要在邮

  • SpringBoot 自动装配的原理详解分析

    目录 前言 自动装配案例 自动装配分析 自动装配总结 前言 关于 ​​SpringBoot​​​ 的自动装配功能,相信是每一个 ​​Java​​ 程序员天天都会用到的一个功能,但是它究竟是如何实现的呢?今天阿粉来带大家看一下. 自动装配案例 首先我们通过一个案例来看一下自动装配的效果,创建一个 ​​SpringBoot​​ 的项目,在 ​​pom​​ 文件中加入下面的依赖. <dependency> <groupId>org.springframework.boot</gro

  • SpringBoot自动装配原理详解

    首先对于一个SpringBoot工程来说,最明显的标志的就是 @SpringBootApplication它标记了这是一个SpringBoot工程,所以今天的 SpringBoot自动装配原理也就是从它开始说起. 自动装配流程 首先我们来看下@SpringBootApplication 这个注解的背后又有什么玄机呢,我们按下 ctrl + 鼠标左键,轻轻的点一下,此时见证奇迹的时刻.. 我们看到如下优雅的代码: 这其中有两个比较容易引起我们注意的地方,一个是@SpringBootConfigur

  • SpringBoot内置tomcat启动原理详解

    前言 不得不说SpringBoot的开发者是在为大众程序猿谋福利,把大家都惯成了懒汉,xml不配置了,连tomcat也懒的配置了,典型的一键启动系统,那么tomcat在springboot是怎么启动的呢? 内置tomcat 开发阶段对我们来说使用内置的tomcat是非常够用了,当然也可以使用jetty. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-bo

  • Java SpringBoot自动装配原理详解及源码注释

    目录 一.pom.xml文件 1.父依赖 2.启动器: 二.主程序: 剖析源码注解: 三.结论: 一.pom.xml文件 1.父依赖 主要是依赖一个父项目,管理项目的资源过滤以及插件! 资源过滤已经配置好了,无需再自己配置 在pom.xml中有个父依赖:spring-boot-dependencies是SpringBoot的版本控制中心! 因为有这些版本仓库,我们在写或者引入一些springboot依赖的时候,不需要指定版本! 2.启动器: 启动器也就是Springboot的启动场景; 比如sp

  • SpringBoot自动配置原理详解

    目录 阅读收获 一.SpringBoot是什么 二.SpringBoot的特点 三.启动类 3.1 @SpringBootApplication 四.@EnableAutoConfiguration 4.1 @AutoConfigurationPackage 4.2  @Import({AutoConfigurationImportSelector.class}) 五.流程总结图 六.常用的Conditional注解 七.@Import支持导入的三种方式 阅读收获 理解SpringBoot自动配

  • SpringBoot应用jar包启动原理详解

    目录 1.maven打包 2.Jar包目录结构 3.可执行Jar(JarLauncher) 4.WarLauncher 5.总结 1.maven打包 Spring Boot项目的pom.xml文件中默认使用spring-boot-maven-plugin插件进行打包: <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>s

  • Java定时任务原理详解

    目录 序章 一.Scheduled 1.1 使用方法 1.2 源码分析 二.QUARTZ 2.1 使用方法 2.2 源码分析 序章 定时任务实现方式 当下,java编码过程中,实现定时任务的方式主要以以下两种为主 spring框架的@Scheduled quzrtz框架 网络上关于这两种框架的实践和配置相关的教程很多,这里不再赘述. 本文主要就二者的框架原理实现做一个入门引导,为了解深层实现细节做一定的铺垫. 本文源码版本 spring-context-3.2.18.RELEASE.jar qu

随机推荐