通过源码分析Golang cron的实现原理

目录
  • 前言
  • Demo示例
  • 源码实现
    • 结构体 Cron 和 Entry
    • New()实现
    • AddFunc()实现
    • Start()实现
    • Run()实现
    • Stop()实现
    • Remove()实现
  • 小结

前言

golang实现定时任务很简单,只须要简单几步代码即可以完成,最近在做了几个定时任务,想研究一下它内部是怎么实现的,所以将源码过了一遍,记录和分享在此。需要的朋友可以参考以下内容,希望对大家有帮助。

关于go cron是如何使用的可以参考之前的文章:一文带你入门Go语言中定时任务库Cron的使用

Demo示例

package main

import (
    "fmt"
    "github.com/robfig/cron/v3"
)

func main() {
    // 创建一个默认的cron对象
    c := cron.New()

    //添加执行任务
    c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") })
    c.AddFunc("@hourly", func() { fmt.Println("Every hour, starting an hour from now") })
    c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") })

    //开始执行任务
    c.Start()
    select {} //阻塞
}

通过上面的示例,可以发现, cron 最常用的几个函数:

  • New(): 实例化一个 cron 对象。
  • Cron.AddFunc(): 向 Cron 对象中添加一个作业,接受两个参数,第一个是 cron 表达式,第二个是一个无参无返回值的函数(作业)。
  • Cron.Stop(): 停止调度,Stop 之后不会再有未执行的作业被唤醒,但已经开始执行的作业不会受影响。

源码实现

在了解其整体逻辑的实现过程前,先了解两个重要的结构体EntryCron

位置在/robfig/cron/cron.go

结构体 Cron 和 Entry

Cron主要负责维护所有的任务数据,调用相关的func时间指定,可以启动、停止任务等;Entry是对添加到 Cron 中的任务的封装,每个 Entry 有一个 ID,除此之外,Entry 里保存了这个任务上次运行的时间和下次运行的时间。具体代码实现如下:

// Entry 数据结构,每一个被调度实体一个
type Entry struct {
  // 唯一id,用于查询和删除
  ID EntryID
  // 本Entry的调度时间,不是绝对时间,在生成entry时会计算出来
  Schedule Schedule
  // 本entry下次需要执行的绝对时间,会一直被更新
  // 被封装的含义是Job可以多层嵌套,可以实现基于需要执行Job的额外处理
  // 比如抓取Job异常、如果Job没有返回下一个时间点的Job是还是继续执行还是delay
  Next time.Time
  // 上一次被执行时间,主要用来查询
  Prev time.Time
  // WrappedJob 是真实执行的Job实体
  WrappedJob Job
  // Job 主要给用户查询
  Job Job
}
// Cron保持任意数量的任务的轨道,调用相关的func时间表指定。它可以被启动,停止,可运行的同时进行检查。
type Cron struct {
  entries   []*Entry          // 保存了所有加入到 Cron 的任务
   // chain 用来定义entry里的warppedJob使用什么逻辑(e.g. skipIfLastRunning)
   // 即一个cron里所有entry只有一个封装逻辑
  chain     Chain
  stop      chan struct{}     // 停止整个cron的channel
  add       chan *Entry       // 增加一个entry的channel
  remove    chan EntryID      // 移除一个entry的channel
  snapshot  chan chan []Entry // 获取entry整体快照的channel
  running   bool              // 代表是否已经在执行,是cron为使用者提供的动态修改entry的接口准备的
  logger    Logger            // 封装golang的log包
  runningMu sync.Mutex        // 用来修改运行中的cron数据,比如增加entry,移除entry
  location  *time.Location    // 地理位置
  parser    ScheduleParser    // 对时间格式的解析,为interface, 可以定制自己的时间规则。
  nextID    EntryID           // entry的全局ID,新增一个entry就加1
  jobWaiter sync.WaitGroup    // run job时会进行add(1), job 结束会done(),stop整个cron,以此保证所有job都能退出
}

New()实现

cron.go中的New()方法用来创建并返回一个Cron对象指针,其实现如下:

func New(opts ...Option) *Cron {
	c := &Cron{
		entries:   nil,
		chain:     NewChain(),
		add:       make(chan *Entry),
		stop:      make(chan struct{}),
		snapshot:  make(chan chan []Entry),
		remove:    make(chan EntryID),
		running:   false,
		runningMu: sync.Mutex{},
		logger:    DefaultLogger,
		location:  time.Local,
		parser:    standardParser,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

AddFunc()实现

AddFunc() 用于向Corn中添加一个任务,AddFunc()中将func包装成 Job 类型然后调用AddJob()AddFunc() 相较于 AddJob() 帮用户省去了包装成 Job 类型的一步,在 AddJob() 中,调用了 standardParser.Parse() cron 表达式解释成了 schedule 类型,最终,他们调用了 Schedule() 方法;其代码实现如下:

func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
	return c.AddJob(spec, FuncJob(cmd)) //包装成job类型然后调用AddJob()方法
}

func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
	schedule, err := c.parser.Parse(spec) //将cron表达式解析成schedule类型
	if err != nil {
		return 0, err
	}
	return c.Schedule(schedule, cmd), nil //调用Schedule()
}

func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
	c.runningMu.Lock() //为了保证线程安全,加锁
	defer c.runningMu.Unlock()
	c.nextID++ //下一EntryID
	entry := &Entry{
		ID:         c.nextID,
		Schedule:   schedule,
		WrappedJob: c.chain.Then(cmd),
		Job:        cmd,
	}
    // Cron是否处于运行状态
	if !c.running {
		c.entries = append(c.entries, entry) // 追加到entries列表中
	} else {
		c.add <- entry // 发送到Cron的add chan
	}
	return entry.ID
}

Schedule()这个方法负责创建 Entry 结构体,并把它追加到 Cronentries 列表中,如果 Cron 已经处于运行状态,会将这个创建好的 entry 发送到 Cronadd chan 中,在run()中会处理这种情况。

Start()实现

Start() 用于开始执行 Cron,其代码实现如下:

func (c *Cron) Start() {
	c.runningMu.Lock() // 获取锁
	defer c.runningMu.Unlock()
	if c.running {
		return
	}
	c.running = true // 将 c.running 置为 true 表示 cron 已经在运行中了
	go c.run() //开启一个 goroutine 执行 c.run()
}

通过上面的代码,可以看到主要干了这么几件事:

  • 获取锁,保证线程安全。
  • 判断cron是否已经在运行中,如果是则直接返回,否则将 c.running 置为 true 表示 cron 已经在运行中了。
  • 开启一个 goroutine 执行 c.run()

Run()实现

Run()是整个cron的一个核心,它负责处理cron开始执行后的大部分事情, run中会一直轮循c.entries中的entry, 如果一个entry 允许执行了,就会开启单独的goroutine去执行这个任务。

// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
	c.logger.Info("start")

	// Figure out the next activation times for each entry.
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
	}

	for {
		// Determine the next entry to run.
		// 将定时任务执行时间进行排序,最近最早执行的放在前面
		sort.Sort(byTime(c.entries))

		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			// 生成一个定时器,距离最近的任务时间到时 触发定时器的channel,发送通知
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}

		for {
			select {
			// 定时时间到了,执行定时任务,并设置下次执行的时刻
			case now = <-timer.C:
				now = now.In(c.location)
				c.logger.Info("wake", "now", now)

				// Run every entry whose next time was less than now
				//对每个定时任务尝试执行
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					c.startJob(e.WrappedJob)
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
				}
			//新增的定时任务添加到 任务列表中
			case newEntry := <-c.add:
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)
				c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
			//获取 当前所有定时任务(快照)
			case replyChan := <-c.snapshot:
				replyChan <- c.entrySnapshot()
				continue
			//停止定时任务,timer停止即可完成此功能
			case <-c.stop:
				timer.Stop()
				c.logger.Info("stop")
				return
			//删除某个定时任务
			case id := <-c.remove:
				timer.Stop()
				now = c.now()
				c.removeEntry(id)
				c.logger.Info("removed", "entry", id)
			}

			break
		}
	}
}

Stop()实现

Stop() 用来停止Cron的运行,但已经在执行中的作业是不会被打断的,也就是从执行 Stop() 之后,不会再有新的任务被调度:

func (c *Cron) Stop() context.Context {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		c.stop <- struct{}{} // 会发出一个 stop 信号
		c.running = false
	}
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
         // 等待所有已经在执行的任务执行完毕
		c.jobWaiter.Wait()
         // 会发出一个 cancelCtx.Done() 信号
		cancel()
	}()
	return ctx
}

Remove()实现

Remove() 用于移除一个任务:

func (c *Cron) Remove(id EntryID) {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		c.remove <- id // 会发出一个 remove 信号
	} else {
		c.removeEntry(id)
	}
}

func (c *Cron) removeEntry(id EntryID) {
	var entries []*Entry
	for _, e := range c.entries {
		if e.ID != id {
			entries = append(entries, e)
		}
	}
	c.entries = entries
}

小结

到此这篇关于Golang Cron 定时任务的内部实现的文章就介绍到这了, 其中重点如下:

Go Cron内部维护了两个结构体CronEntry,用于维护任务数据,cron.Start()执行后,cron的后台程序c.Run()就开始执行了,Run()是整个cron的一个核心,它负责处理cron开始执行后的大部分事情, run中会一直轮循c.entries中的entry, 每个entry都包含自己下一次执行的绝对时间,如果一个entry 允许执行了,就会开启单独的goroutine去执行这个任务。

以上就是通过源码分析Golang cron的实现原理的详细内容,更多关于Golang cron的资料请关注我们其它相关文章!

(0)

相关推荐

  • Go语言中定时任务库Cron使用方法介绍

    目录 快速入门 Cron表达式格式 预定义时间表 设置时区 常用的方法介绍 快速入门 安装cron,注意这里安装的是v3版本.新版本和旧版时间使用有所区别 go get github.com/robfig/cron/v3@v3.0.0 在项目中导入 import "github.com/robfig/cron/v3" v3版本的github.com/robfig/cron/v3默认解析器符合Cron 维基百科页面所描述的标准用法大致如下 package main import ( &q

  • Golang cron 定时器和定时任务的使用场景

    目录 Golang cron 定时器和定时任务 timer和ticker的区别 Timer Ticker cron 定时任务 参考链接: Golang cron 定时器和定时任务 Golang中time包有两个定时器,分别为 ticker 和 timer.两者都可以实现定时功能,但各自都有自己的使用场景. timer和ticker的区别 ticker定时器表示每隔一段时间就执行一次,一般可执行多次. timer定时器表示在一段时间后执行,默认情况下只执行一次,如果想再次执行的话,每次都需要调用

  • Go定时器cron的使用详解

    cron是什么 cron的意思就是:计划任务,说白了就是定时任务.我和系统约个时间,你在几点几分几秒或者每隔几分钟跑一个任务(job),就那么简单. cron表达式 cron表达式是一个好东西,这个东西不仅Java的quartZ能用到,Go语言中也可以用到.我没有用过Linux的cron,但网上说Linux也是可以用crontab -e 命令来配置定时任务.Go语言和Java中都是可以精确到秒的,但是Linux中不行. cron表达式代表一个时间的集合,使用6个空格分隔的字段表示: 字段名 是否

  • Golang Cron 定时任务的实现示例

    开门见山写一个 package main import ( "fmt" "github.com/robfig/cron" "log" "strings" "time" ) func CronTask() { log.Println("******** ******* *******") } func CronTest() { log.Println("Starting Cron

  • 一文带你入门Go语言中定时任务库Cron的使用

    目录 前言 快速开始 安装 导入 Demo Cron表达式格式 标准格式 预定义时间表 常用的方法介绍 new() AddJob() AddFunc() Start() 相关推荐 Go第三方库之cronexpr——解析 crontab 表达式 总结 前言 在平时的开发需求中,我们经常会有一些重复执行的操作需要触发执行,和系统约个时间,在几点几分几秒或者每隔几分钟跑一个任务,说白了就是定时任务,,想必大家第一反应都是linux的Crontab.其实定时任务不止使用系统自带的Crontab,在Go语

  • Go语言中定时器cron的基本使用教程

    cron是什么 cron的意思就是:计划任务,说白了就是定时任务.我和系统约个时间,你在几点几分几秒或者每隔几分钟跑一个任务(job),就那么简单. 前言 cron 是 robfig 开发的一个定时作业库,robfig 总是想的比别人早,给了我们这些 Gopher 不少急需的东西,想当年 revel 的出现也是这样的.看看 cron 的使用,还是一如既往的简洁明了,发现 Go 的世界里,有些产品还是有鲜明的个人特质的,那就是所谓的个人魅力吧?! 总之 robfig 开发的产品都是有一定超前性,比

  • 通过源码分析Golang cron的实现原理

    目录 前言 Demo示例 源码实现 结构体 Cron 和 Entry New()实现 AddFunc()实现 Start()实现 Run()实现 Stop()实现 Remove()实现 小结 前言 golang实现定时任务很简单,只须要简单几步代码即可以完成,最近在做了几个定时任务,想研究一下它内部是怎么实现的,所以将源码过了一遍,记录和分享在此.需要的朋友可以参考以下内容,希望对大家有帮助. 关于go cron是如何使用的可以参考之前的文章:一文带你入门Go语言中定时任务库Cron的使用 De

  • 通过源码分析iOS中的深拷贝与浅拷贝

    前言 关于iOS中对象的深拷贝和浅拷贝的文章有很多,但是大部分都是基于打印内存地址来推导结果,这篇文章是从源码的角度来分析深拷贝和浅拷贝. 深拷贝和浅拷贝的概念 拷贝的方式有两种:深拷贝和浅拷贝. 浅拷贝又叫指针拷贝,比如说有一个指针,这个指针指向一个字符串,也就是说这个指针变量的值是这个字符串的地址,那么此时对这个字符串进行指针拷贝的意思就是又创建了一个指针变量,这个指针变量的值是这个字符串的地址,也就是这个字符串的引用计数+1. 深拷贝又叫内容拷贝,比如有一个指针,这个指针指向一个字符串,也

  • 通过源码分析Python中的切片赋值

    本文主要介绍的关于Python切片赋值的相关内容,分享出来供大家参考学习,下面来一起看看详细的介绍: 昨天有同学问了我这么个问题: t = [1, 2, 3] t[1:1] = [7] # 感谢@一往直前 的疑问,之前写为 t[1:1] = 7了 print t # 输出 [1, 7, 2, 3] 这个问题之前还真没遇到过,有谁会对列表这么进行赋值吗?不过对于这个输出结果的原因确实值得去再了解下,毕竟之前也看过<Python源码分析>.(题外话:据说最近有大牛在写新的版本) 想着今天有空看看P

  • 通过源码分析Vue的双向数据绑定详解

    前言 虽然工作中一直使用Vue作为基础库,但是对于其实现机理仅限于道听途说,这样对长期的技术发展很不利.所以最近攻读了其源码的一部分,先把双向数据绑定这一块的内容给整理一下,也算是一种学习的反刍. 本篇文章的Vue源码版本为v2.2.0开发版. Vue源码的整体架构无非是初始化Vue对象,挂载数据data/props等,在不同的时期触发不同的事件钩子,如created() / mounted() / update()等,后面专门整理各个模块的文章.这里先讲双向数据绑定的部分,也是最主要的部分.

  • SpringBoot通过源码探究静态资源的映射规则实现

    我们开发一个Spring Boot项目,肯定要导入许多的静态资源,比如css,js等文件 如果我们是一个web应用,我们的main下会有一个webapp,我们以前都是将所有的页面导在这里面的,对吧!但是我们现在的pom呢,打包方式是为jar的方式,那么这种方式SpringBoot能不能来给我们写页面呢?当然是可以的,但是SpringBoot对于静态资源放置的位置,是有规定的! 1.静态资源映射规则 1.1.第一种映射规则 SpringBoot中,SpringMVC的web配置都在 WebMvcA

  • 如何通过源码了解Java的自动装箱拆箱详解

    目录 什么叫装箱 & 拆箱? 首先看一段代码 装箱(valueOf()) 为什么要有[-128,127]的缓存? 为什么是[-128,127]? 自动装箱带来的性能问题 小总结 拆箱(intValue) 补充:自动装箱.拆箱总是会发生吗? 总结 什么叫装箱 & 拆箱? 将int基本类型转换为Integer包装类型的过程叫做装箱,反之叫拆箱. 首先看一段代码 public static void main(String[] args) { Integer a = 127, b = 127;

  • Ubuntu18.04通过源码安装Odoo14的教程

    本系列背景介绍 Odoo 是一个基于Python语言构建的开源软件,面向企业应用的CRM,ERP等领域,其目标是对标SAP,Oracle等大型软件提供商,但却通过仅仅一个平台满足企业所有管理的业务需求. 本系列文章针对Odoo 14版,从系统安装,开发环境配置,代码结构,主要功能升级,源码赏析,Anodoo对Odoo的关键扩展等角度,预先给大家介绍即将在2020年发布的这一最新版本. 本篇概述 Odoo14的安装和历史版本差不多,同样也包括安装文件,源码,Docker等多种形式,本文则通过源码方

  • Jwt通过源码揭秘隐藏大坑

    目录 前言 集成JWT 坑在哪里 查看源码探索问题原因 总结 前言 JWT是目前最为流行的接口认证方案之一,有关JWT协议的详细内容,请参考:https://jwt.io/introduction 今天分享一下在使用JWT在项目中遇到的一个问题,主要是一个协议的细节,非常容易被忽略,如果不是自己遇到,或者去看源码的实现,我估计至少80%的人都会栽在这里,下面来还原一下这个问题的过程,由于这个问题出现有一定的概率,不是每次都会出现,所以才容易掉坑里. 集成JWT 在Asp.Net Core中集成J

  • 通过源码角度看看AccessibilityService

    简介 AccessibilityService的设计初衷是为了辅助有身体缺陷的群体使用Android应用,它的设计贯穿着Android的控件树View, ViewGroup, ViewRootImpl体系.借助于system_server进程的中转,能够注册Accessibility事件的客户端可以具备通过system_server提供的Accessibility服务来实现监听.操作其它应用视图的功能.这个功能十分强大,可以模拟用户的行为去操作其它APP,常常被用在自动化测试.微信抢红包.自动回

  • Hadoop源码分析六启动文件namenode原理详解

    1. namenode启动 在本系列文章三中分析了hadoop的启动文件,其中提到了namenode启动的时候调用的类为 org.apache.hadoop.hdfs.server.namenode.NameNode 其main方法的内容如下: public static void main(String argv[]) throws Exception { if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)

随机推荐