C#基于时间轮调度实现延迟任务详解

在很多.net开发体系中开发者在面对调度作业需求的时候一般会选择三方开源成熟的作业调度框架来满足业务需求,比如Hangfire、Quartz.NET这样的框架。但是有些时候可能我们只是需要一个简易的延迟任务,这个时候引入这些框架就费力不讨好了。

最简单的粗暴的办法当然是:

Task.Run(async () =>
{
    //延迟xx毫秒
    await Task.Delay(time);
    //业务执行
});

当时作为一个开发者,有时候还是希望使用更优雅的、可复用的一体化方案,比如可以实现一个简易的时间轮来完成基于内存的非核心重要业务的延迟调度。什么是时间轮呢,其实就是一个环形数组,每一个数组有一个插槽代表对应时刻的任务,数组的值是一个任务队列,假设我们有一个基于60秒的延迟时间轮,也就是说我们的任务会在不超过60秒(超过的情况增加分钟插槽,下面会讲)的情况下执行,那么如何实现?下面我们将定义一段代码来实现这个简单的需求

话不多说,撸代码,首先我们需要定义一个时间轮的Model类用于承载我们的延迟任务和任务处理器。简单定义如下:

public class WheelTask<T>
{
    public T Data { get; set; }
    public Func<T, Task> Handle { get; set; }
}

定义很简单,就是一个入参T代表要执行的任务所需要的入参,然后就是任务的具体处理器Handle。接着我们来定义时间轮本轮的核心代码:

可以看到时间轮其实核心就两个东西,一个是毫秒计时器,一个是数组插槽,这里数组插槽我们使用了字典来实现,key值分别对应0到59秒。每一个插槽的value对应一个任务队列。当添加一个新任务的时候,输入需要延迟的秒数,就会将任务插入到延迟多少秒对应的插槽内,当计时器启动的时候,每一跳刚好1秒,那么就会对插槽计数+1,然后去寻找当前插槽是否有任务,有的话就会调用ExecuteTask执行该插槽下的所有任务。

public class TimeWheel<T>
{
    int secondSlot = 0;
    DateTime wheelTime { get { return new DateTime(1, 1, 1, 0, 0, secondSlot); } }
    Dictionary<int, ConcurrentQueue<WheelTask<T>>> secondTaskQueue;
    public void Start()
    {
        new Timer(Callback, null, 0, 1000);
        secondTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
        Enumerable.Range(0, 60).ToList().ForEach(x =>
        {
            secondTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
        });
    }
    public async Task AddTaskAsync(int second, T data, Func<T, Task> handler)
    {
        var handTime = wheelTime.AddSeconds(second);
        if (handTime.Second != wheelTime.Second)
            secondTaskQueue[handTime.Second].Enqueue(new WheelTask<T>(data, handler));
        else
            await handler(data);
    }
    async void Callback(object o)
    {
        if (secondSlot != 59)
            secondSlot++;
        else
        {
            secondSlot = 0;
        }
        if (secondTaskQueue[secondSlot].Any())
            await ExecuteTask();
    }
    async Task ExecuteTask()
    {
        if (secondTaskQueue[secondSlot].Any())
            while (secondTaskQueue[secondSlot].Any())
                if (secondTaskQueue[secondSlot].TryDequeue(out WheelTask<T> task))
                    await task.Handle(task.Data);
    }
}

接下来就是如果我需要大于60秒的情况如何处理呢。其实就是增加分钟插槽数组,举个例子我有一个任务需要2分40秒后执行,那么当我 插入到时间轮的时候我先插入到分钟插槽,当计时器每过去60秒,分钟插槽值+1,当分钟插槽对应有任务的时候就将这些任务从分钟插槽里弹出再入队到秒插槽中,这样一个任务会先进入插槽值=2(假设从0开始计算)的分钟插槽,计时器运行120秒后分钟值从0累加到2,2插槽的任务弹出到插槽值=40的秒插槽里,当计时器再运行40秒,刚好就可以执行这个延迟2分40秒的任务。话不多说,上代码:

首先我们将任务WheelTask增加一个Second属性,用于当任务从分钟插槽弹出来时需要知道自己入队哪个秒插槽

public class WheelTask<T>
{
    ...
    public int Second { get; set; }
    ...
}

接着我们再重新定义时间轮的逻辑增加分钟插槽值以及插槽队列的部分

public class TimeWheel<T>
{
    int minuteSlot, secondSlot = 0;
    DateTime wheelTime { get { return new DateTime(1, 1, 1, 0, minuteSlot, secondSlot); } }
    Dictionary<int, ConcurrentQueue<WheelTask<T>>>  minuteTaskQueue, secondTaskQueue;
    public void Start()
    {
        new Timer(Callback, null, 0, 1000);、
        minuteTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
        secondTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
        Enumerable.Range(0, 60).ToList().ForEach(x =>
        {
            minuteTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
            secondTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
        });
    }
    ...
}

同样的在添加任务的AddTaskAsync函数中我们需要增加分钟,代码改为这样,当大于1分钟的任务会入队到分钟插槽中,小于1分钟的会按原逻辑直接入队到秒插槽中:

public async Task AddTaskAsync(int minute, int second, T data, Func<T, Task> handler)
{
    var handTime = wheelTime.AddMinutes(minute).AddSeconds(second);
        if (handTime.Minute != wheelTime.Minute)
            minuteTaskQueue[handTime.Minute].Enqueue(new WheelTask<T>(handTime.Second, data, handler));
        else
        {
            if (handTime.Second != wheelTime.Second)
                secondTaskQueue[handTime.Second].Enqueue(new WheelTask<T>(data, handler));
            else
                await handler(data);
        }
}

最后的部分就是计时器的callback以及任务执行的部分:

async void Callback(object o)
{
    bool minuteExecuteTask = false;
    if (secondSlot != 59)
        secondSlot++;
    else
    {
        secondSlot = 0;
        minuteExecuteTask = true;
        if (minuteSlot != 59)
            minuteSlot++;
        else
        {
            minuteSlot = 0;
        }
    }
    if (minuteExecuteTask || secondTaskQueue[secondSlot].Any())
        await ExecuteTask(minuteExecuteTask);
}
async Task ExecuteTask(bool minuteExecuteTask)
{
    if (minuteExecuteTask)
        while (minuteTaskQueue[minuteSlot].Any())
            if (minuteTaskQueue[minuteSlot].TryDequeue(out WheelTask<T> task))
                secondTaskQueue[task.Second].Enqueue(task);
    if (secondTaskQueue[secondSlot].Any())
        while (secondTaskQueue[secondSlot].Any())
            if (secondTaskQueue[secondSlot].TryDequeue(out WheelTask<T> task))
                await task.Handle(task.Data);
}

基本上基于分钟+秒的时间轮延迟任务核心功能就这些了,聪明的你一定知道如何扩展增加小时,天,月份甚至年份的时间轮了。虽然从代码逻辑上可以实现,但是大部分情况下我们使用时间轮仅仅是完成一些内存易失性的非核心的任务延迟调度,实现天,周,月年意义不是很大。所以基本上到小时就差不多了。再多就上作业系统来调度吧。

到此这篇关于C#基于时间轮调度实现延迟任务详解的文章就介绍到这了,更多相关C#延迟任务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • C#通过rabbitmq实现定时任务(延时队列)

    本文主要讲解如何通过RabbitMQ实现定时任务(延时队列) 环境准备 需要在MQ中进行安装插件 地址链接 插件介绍地址:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/ 使用场景 作为一个新的预支付订单被初始化放置,如果该订单在指定时间内未进行支付,则将被认为超时订单进行关闭处理:电商系统中应用较多,用户购买商品产生订单,但未进行支付,订单产生30分钟内未支付将关闭订单(且满足该场景数量庞大)

  • C#使用反射机制实现延迟绑定

    反射允许我们在编译期或运行时获取程序集的元数据,通过反射可以做到: ● 创建类型的实例● 触发方法● 获取属性.字段信息● 延迟绑定...... 如果在编译期使用反射,可通过如下2种方式获取程序集Type类型: 1.Type类的静态方法 Type type = Type.GetType("somenamespace.someclass"); 2.通过typeof Type type = typeof(someclass); 如果在运行时使用反射,通过运行时的Assembly实例方法获取

  • C#实现程序等待延迟执行的方法

    本文实例讲述了C#实现程序等待延迟执行的方法.分享给大家供大家参考.具体如下: [System.Runtime.InteropServices.DllImport("kernel32.dll")] static extern uint GetTickCount(); /// <summary> /// 程序等待延迟执行 /// </summary> /// <param name="ms"></param> stati

  • C#延迟执行方法函数实例讲解

    需求分析: 我们在做winform开发的时候,有时候需要让程序休眠几秒钟,但是如果我们直接使用 Thread.Sleep()函数的话,页面UI就会停止响应.怎么样解决呢,你可以把页面涉及到表现UI的代码放到一个单线程处理,也可以采用我面的方法,加一个小函数就ok了. if (MessageBox.Show("确定要清理吗?", "确认", MessageBoxButtons.YesNo) == DialogResult.Yes) { this.labMsg.Text

  • C#基于时间轮调度实现延迟任务详解

    在很多.net开发体系中开发者在面对调度作业需求的时候一般会选择三方开源成熟的作业调度框架来满足业务需求,比如Hangfire.Quartz.NET这样的框架.但是有些时候可能我们只是需要一个简易的延迟任务,这个时候引入这些框架就费力不讨好了. 最简单的粗暴的办法当然是: Task.Run(async () => { //延迟xx毫秒 await Task.Delay(time); //业务执行 }); 当时作为一个开发者,有时候还是希望使用更优雅的.可复用的一体化方案,比如可以实现一个简易的时

  • 基于ScheduledExecutorService的两种方法(详解)

    开发中,往往遇到另起线程执行其他代码的情况,用java定时任务接口ScheduledExecutorService来实现. ScheduledExecutorService是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响. 注意,只有当调度任务来的时候,ScheduledExecutorService才会真正启动一个线程,其余时间ScheduledExecutorService都是处于轮询任务的状态. 1.scheduleAtFix

  • 得物基于StarRocks的OLAP需求实践详解

    目录 1. 什么是 StarRocks 2. 系统架构 3. 存储架构 3.1 表的存储 4. 需求背景 案例一: 案例二: 5. 经验分享 6. 未来规划 1. 什么是 StarRocks 新一代极速全场景MPP数据库,可以用 StarRocks 来支持多种数据分析场景的极速分析: 架构简洁,采用了全面向量化引擎,并配备全新设计的 CBO 优化器,查询速度(尤其是多表关联查询): 很好地支持实时数据分析,并能实现对实时更新数据的高效查询, 还支持现代化物化视图,以进一步加速查询: 用户可以灵活

  • React为什么需要Scheduler调度器原理详解

    目录 正文 我们为什么需要Scheduler(调度器) Scheduler如何进行工作 总结 正文 最近在重学React,由于近两年没使用React突然重学发现一些很有意思的概念,首先便是React的Scheduler(调度器) 由于我对React的概念还停留在React 15之前(就是那个没有hooks的年代),所以接触Scheduler(调度器) 让我感觉很有意思: 在我印象中React的架构分为两层(React 16 之前) Reconciler(协调器)—— 负责找出变化的组件 Rend

  • 基于java servlet过滤器和监听器(详解)

    1 过滤器 1.过滤器是什么? servlet规范当中定义的一种特殊的组件,用于拦截容器的调用. 注:容器收到请求之后,如果有过滤器,会先调用过滤器,然后在调用servlet. 2.如何写一个过滤器? 1.写一个java类,实现Filter接口; 2.在接口方法中实现拦截方法; 3.配置过滤器(web.xml); 3.配置初始化参数 1.配置初始化参数.(init-param) 2.通过filterconfig提供的getinitparamenter方法读取初始化的值. 4.优先级: 当有多个过

  • BootStrap的JS插件之轮播效果案例详解

    Bootstrap 是一个用于快速开发 Web 应用程序和网站的前端框架.Bootstrap 是基于 HTML.CSS.JAVASCRIPT 的. 案例 下面展示的就是此插件和相关组件制作的轮播案例. <div id="carousel-example-generic" class="carousel slide" data-ride="carousel"> <!-- Indicators --> <ol class

  • 基于NIO的Netty网络框架(详解)

    Netty是一个高性能.异步事件驱动的NIO框架,它提供了对TCP.UDP和文件传输的支持,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果. Netty的优点有: a.功能丰富,内置了多种数据编解码功能.支持多种网络协议. b.高性能,通过与其它主流NIO网络框架对比,它的综合性能最佳. c.可扩展性好,可通过它提供的ChannelHandler组件对网络通信方面进行灵活扩展. d.易用性,API使用简单.

  • 基于HashMap遍历和使用方法(详解)

    map的几种遍历方式: Map< String, String> map = new HashMap<>(); map.put("aa", "@sohu.com"); map.put("bb","@163.com"); map.put("cc", "@sina.com"); System.out.println("普通的遍历方法,通过Map.keySet

  • 基于Django contrib Comments 评论模块(详解)

    老版本的Django中自带一个评论框架.但是从1.6版本后,该框架独立出去了,也就是本文的评论插件. 这个插件可给models附加评论,因此常被用于为博客文章.图片.书籍章节或其它任何东西添加评论. 一.快速入门 快速使用步骤: 安装包:pip install django-contrib-comments 在django的settings中的INSTALLED_APPS处添加'django.contrib.sites'进行app注册,并设置SITE_ID值. 在django的settings中

  • 基于Vue的ajax公共方法(详解)

    为了减少代码的冗余,决定抽离出请求ajax的公共方法,供同事们使用. 我使用了ES6语法,编写了这个方法. /** * @param type 请求类型,分为POST/GET * @param url 请求url * @param contentType * @param headers * @param data * @returns {Promise<any>} */ ajaxData: function (type, url, contentType, headers, data) {

随机推荐