Java利用Netty时间轮实现延时任务

目录
  • 一、时间轮算法简介
  • 二、时间轮hello-world
  • 三、异步任务线程池
  • 四、时间轮优缺点
  • 知识点补充

一、时间轮算法简介

为了大家能够理解下文中的代码,我们先来简单了解一下netty时间轮算法的核心原理

时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8)。假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3+18)%8=5取余运算)的5号bucket上。假如有多个需要在该时间段内执行的任务,就会组成一个双向链表。另外针对时间轮我们要有下面的几个认知:

时间轮指针是一个Worker线程,在时间轮整点的时候执行双向链表中的任务。

时间轮算法的并不是精准的延时,它的执行精度取决于每个时间轮轮片的分隔时间段tickDuration

Worker线程是单线程,一个bucket、一个bucket的顺序处理任务。「所以我们的延时任务一定要做成异步任务,否则会影响时间轮后续任务的执行时间。」

二、时间轮hello-world

实现一个延时任务的例子,需求仍然十分的简单:你买了一张火车票,必须在30分钟之内付款,否则该订单被自动取消。「订单30分钟不付款自动取消,这个任务就是一个延时任务。」 我们的火车票订单取消任务,从需求上看并不需要非常精准的延时,所以是可以使用时间轮算法来完成这个任务的。

首先通过maven坐标引入netty

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.45.Final</version>
</dependency>

然后我们创建一个时间轮,如果是Spring的开发环境,我们可以这么做。下文中我们new了一个包含512个bucket的时间轮,每个时间轮的轮片时间间隔是100毫秒。

@Bean("hashedWheelTimer")
public HashedWheelTimer hashedWheelTimer(){
    return new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
}

举例:当用户买火车票下单的时候,向时间轮中添加一个30分钟的延时任务。延时任务将在30分钟之后被执行,下文的lambda表达式部分实现了一个TimerTask(task)延时任务。这个延时任务的函数体内,请一定使用异步任务,即:单独起一个线程或者使用SpringBoot异步任务线程池。因为Worker线程是单线程的,你的任务处理时间长于tickDuration会妨碍后续时间轮轮片上的任务的执行。

//订单下单操作
void order(String orderInfo) {
  //下单的时候,向时间轮中添加一个30分钟的延时任务
  hashedWheelTimer.newTimeout(task -> {
    //注意这里使用异步任务线程池或者开启线程进行订单取消任务的处理
    cancelOrder(orderInfo);
  }, 30, TimeUnit.MINUTES);
}

三、异步任务线程池

我们在上文中已经多次强调,时间轮的任务TimerTask的执行内容要做成异步的。最简单的做法就是接到一个任务之后启动一个线程处理该任务。在Spring环境下其实我们有更好的选择,就是使用Spring的线程池,这个线程池是可以自定义的。比如:下文中的用法是我事先定义了一个名字为test的线程池,然后通过@Async使用即可。

@Async("test")
public void cancelOrder(String orderInfo){
  //查询订单支付信息,如果用户未支付,关闭订单
}

可能有的朋友,还不知道该如何自定义一个Spring线程池,可以参考:我之前写过一个SpringBoot的**「可观测、易配置」**的线程池开源项目,源代码地址:https://gitee.com/hanxt/zimug-monitor-threadpool。我的这个zimug-monitor-threadpool开源项目,可以做到对线程池使用情况的监控,我自己平时用的效果还不错,向大家推荐一下!

四、时间轮优缺点

时间轮算法实现延时任务的优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失。这一缺点和DelayQueue是一样的。为了解决这个问题,我们可以使用redis、RocketMQ等分布式中间件来管理延时任务消息的方式来实现延时任务,这个我会在后续的文章中为大家介绍。

知识点补充

下面主要和大家一起来分析下Netty时间轮调度算法的原理

时间轮状态

时间轮有以下三种状态:

  • WORKER_STATE_INIT:初始化状态,此时时间轮内的工作线程还没有开启
  • WORKER_STATE_STARTED:运行状态,时间轮内的工作线程已经开启
  • WORKER_STATE_SHUTDOWN:终止状态,时间轮停止工作

状态转换如下:

构造函数

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // 初始化时间轮数组,时间轮大小为大于等于 ticksPerWheel 的第一个 2 的幂,和 HashMap 类似
        wheel = createWheel(ticksPerWheel);
        // 取模用,用来定位数组中的槽
        mask = wheel.length - 1;

        // 为了保证精度,时间轮内的时间单位为纳秒
        long duration = unit.toNanos(tickDuration);

        // 时间轮内的时钟拨动频率不宜太大也不宜太小
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }

        if (duration < MILLISECOND_NANOS) {
            logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
                        tickDuration, MILLISECOND_NANOS);
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;
        }

        // 创建工作线程
        workerThread = threadFactory.newThread(worker);

        // 非守护线程且 leakDetection 为 true 时检测内存是否泄漏
        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

        // 初始化最大等待任务数
        this.maxPendingTimeouts = maxPendingTimeouts;

        // 如果创建的时间轮实例大于 64,打印日志,并且这个日志只会打印一次
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

构造函数中的参数相当重要,当自定义时间轮时,我们应该根据业务的范围设置合理的参数:

  • threadFactory:创建时间轮任务线程的工厂,通过这个工厂可以给我们的线程自定义一些属性(线程名、异常处理等)
  • tickDuration:时钟多长时间拨动一次,值越小,时间轮精度越高
  • unit:tickDuration 的单位
  • ticksPerWheel:时间轮数组大小
  • leakDetection:是否检测内存泄漏
  • maxPendingTimeouts:时间轮内最大等待的任务数

时间轮的时钟拨动时长应该根据业务设置恰当的值,如果设置的过大,可能导致任务触发时间不准确。如果设置的过小,时间轮转动频繁,任务少的情况下加载不到任务,属于一直空转的状态,会占用 CPU 线程资源。

为了防止时间轮占用过多的 CPU 资源,当创建的时间轮对象大于 64 时会以日志的方式提示。

构造函数中只是初始化了轮线程,并没有开启,当第一次往时间轮内添加任务时,线程才会开启。

到此这篇关于Java利用Netty时间轮实现延时任务的文章就介绍到这了,更多相关Java Netty时间轮内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java处理延时任务的常用几种解决方案

    目录 前言 数据库轮询 原理 优缺点 Java延迟队列 Reids监听失效key 创建监听类,实现MessageListener接口 RocketMq延迟消息 总结 前言 项目中经常会遇到如下的需求: 创建订单30分钟未支付,订单自动取消. 订单支付成功后,1分钟后给用户发送短信,提醒用户评价. … 针对延时任务需求,我们可以采用如下的解决方案: 数据库轮询 原理 通过一个线程定时的扫描数据库当天创建的订单,根据订单的创建时间来判断订单是否超时,针对超时订单进行相关的更新操作.实现技术采用Spr

  • Java时间轮算法的实现代码示例

    考虑这样一个场景,现在有5000个任务,要让这5000个任务每隔5分中触发某个操作,怎么去实现这个需求.大部分人首先想到的是使用定时器,但是5000个任务,你就要用5000个定时器,一个定时器就是一个线程,你懂了吧,这种方法肯定是不行的. 针对这个场景,催生了时间轮算法,时间轮到底是什么?我一贯的风格,自行谷歌去.大发慈悲,发个时间轮介绍你们看看,看文字和图就好了,代码不要看了,那个文章里的代码运行不起来,时间轮介绍. 看好了介绍,我们就开始动手吧. 开发环境:idea + jdk1.8 + m

  • java简单手写版本实现时间轮算法

    时间轮 关于时间轮的介绍,网上有很多,这里就不重复了 核心思想 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念) 每个槽对应一个环形链表存储该时间应该被执行的任务 需要一个线程去驱动指针运转,获取到期任务 以下给出java 简单手写版本实现 代码实现 时间轮主数据结构 /** * @author apdoer * @version 1.0 * @date

  • Java DelayQueue实现延时任务的示例详解

    目录 一.DelayQueue的应用原理 二.订单延时任务的实现 三.订单处理 四.优缺点 一.DelayQueue的应用原理 DelayQueue是一个无界的BlockingQueue的实现类,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走. BlockingQueue即阻塞队列,java提供的面向多线程安全的队列数据结构,当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常. 这里的“无界”队列,是指队列的元素数量不存在上限,队列的容量

  • Java利用Netty时间轮实现延时任务

    目录 一.时间轮算法简介 二.时间轮hello-world 三.异步任务线程池 四.时间轮优缺点 知识点补充 一.时间轮算法简介 为了大家能够理解下文中的代码,我们先来简单了解一下netty时间轮算法的核心原理 时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8).假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3+1

  • Java利用redis zset实现延时任务详解

    目录 一.实现原理 二.准备工作 三.代码实现 四.优缺点 所谓的延时任务给大家举个例子:你买了一张火车票,必须在30分钟之内付款,否则该订单被自动取消.「订单30分钟不付款自动取消,这个任务就是一个延时任务.」   我之前已经写过2篇关于延时任务的文章: <通过DelayQueue实现延时任务> <基于netty时间轮算法实战> 这两种方法都有一个缺点:都是基于单体应用的内存的方式运行延时任务的,一旦出现单点故障,可能出现延时任务数据的丢失.所以此篇文章给大家介绍实现延时任务的第

  • Java 利用DeferredResult实现http轮询实时返回数据接口

    今天这篇文章呢,不难,其实是解答我一直以来心里的一个疑问.是这样的,之前看五八技术委员会主席沈剑老师的公众号架构师之路的一篇文章:http 如何像 tcp 一样实时的收消息,里面其中的一个方案是用 http 短连接轮询的方式实现"伪长连接".但是对于轮询,我们的第一反应肯定是有延时,但是标题不是说的是实时吗?当然我们可以把轮询的时长缩短一些,先不说这样大部分时间的轮询调用,可能都没消息返回,造成服务器资源浪费,轮询时间再短也是有延时啊,所以难道是伪实时?反正一般消息延时个三五秒,甚至十

  • 利用C语言实现经典多级时间轮定时器

    目录 1. 序言 2. 多级时间轮实现框架 2.1 多级时间轮对象 2.2 时间轮对象 2.3 定时任务对象 2.4 双向链表 2.5 联结方式 3. 多级时间轮C语言实现 3.1 双向链表头文件: list.h 3.2 调试信息头文件: log.h 3.3 时间轮代码: timewheel.c 3.4 编译运行 总结 1. 序言 最近一直在找时间轮的C语言实现代码,发现很多都是Java或者c++实现的.而我对其他语言不熟悉,看不太懂.关于C实现的,让我如沐春风的实现没找到,github上也只找

  • java利用时间格式生成唯一文件名的方法

    前言 有时候我们需要截图,在要截图时,有人用到了时间格式,但是时间格式中的:在文件名称中是不被允许的字符,所以就会报错,如何生成唯一的时间文件名: 示例代码 package com.demo; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.GregorianCalendar;

  • Java 如何实现时间控制

    目录 一.间控制的几种方案 1.1 从线程方面解决 1.2使用Timer 1.3redis延时 二.redis 2.1maven中引入redis 2.2 在springboot中配置redis 2.3redisTemplate模板工具类 2.4在redis中实现时间控制 2.4.1无限循环比较值 三.总结 前言: 需求是这样的,在与第三方对接过程中,对方提供了token进行时效性验证,过一段时间token就会失效.后台有定时任务在获取,但是偶尔会出现token失效,这是因为在获取的时候,定时任务

  • Java利用Dijkstra和Floyd分别求取图的最短路径

    目录 1 最短路径的概述 2 杰斯特拉(Dijkstra)算法 2.1 原理 2.2 案例分析 3 弗洛伊德(Floyd)算法 3.1 原理 3.2 案例分析 4 邻接矩阵加权图实现 5 邻接表加权图实现 本文详细介绍了图的最短路径的概念,然后介绍了求最短路径的两种算法:Dijkstra算法和Floyd算法的原理,最后提供了基于邻接矩阵和邻接表的图对两种算法的Java实现. 阅读本文需要一定的图的基础,如果对于图不是太明白的可以看看这篇文章:Java数据结构之图的原理与实现. 1 最短路径的概述

  • Java利用Selenium操作浏览器的示例详解

    目录 简介 设置元素等待 显式等待 隐式等待 强制等待 总结 简介 本文主要介绍如何使用java代码利用Selenium操作浏览器,某些网页元素加载慢,如何操作元素就会把找不到元素的异常,此时需要设置元素等待,等待元素加载完,再操作. 设置元素等待 很多页面都使用 ajax 技术,页面的元素不是同时被加载出来的,为了防止定位这些尚在加载的元素报错,可以设置元素等来增加脚本的稳定性.webdriver 中的等待分为 显式等待 和 隐式等待. 显式等待 显式等待:设置一个超时时间,每个一段时间就去检

随机推荐