Python 4种实现定时任务的方案

目录
  • 1.利用 while True: + sleep() 实现定时任务
  • 2.使用 Timeloop 库运行定时任务
  • 3.利用 threading.Timer 实现定时任务
  • 4.利用内置模块 sched 实现定时任务

1.利用 while True: + sleep() 实现定时任务

位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。

基于这样的特性我们可以通过 while 死循环+sleep() 的方式实现简单的定时任务。

代码示例:

import datetime

import time

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

def loop_monitor():

    while True:

        time_printer()

        time.sleep(5)  # 暂停 5 秒

if __name__ == "__main__":

    loop_monitor()

主要缺点:

  • 只能设定间隔,不能指定具体的时间,比如每天早上 8:00
  • sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。

2.使用 Timeloop 库运行定时任务

Timeloop[2] 是一个库,可用于运行多周期任务。这是一个简单的库,它使用 decorator 模式在线程中运行标记函数。

示例代码:

import time

from timeloop import Timeloop

from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=2))

def sample_job_every_2s():

    print "2s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=5))

def sample_job_every_5s():

    print "5s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=10))

def sample_job_every_10s():

    print "10s job current time : {}".format(time.ctime())

3.利用 threading.Timer 实现定时任务

threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer 最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

Timer(interval, function, args=[ ], kwargs={ })

  • interval: 指定的时间
  • function: 要执行的方法
  • args/kwargs: 方法的参数

代码示例:

import datetime

from threading import Timer

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    t = Timer(5, time_printer)

    t.start()

if __name__ == "__main__":

    loop_monitor()

备注:Timer 只能执行一次,这里需要循环调用,否则只能执行一次

4.利用内置模块 sched 实现定时任务

sched 模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。

class sched.scheduler(timefunc, delayfunc) 这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc 是一个没有参数的返回时间类型数字的函数(常用使用的如 time 模块里面的 time),delayfunc 应该是一个需要一个参数来调用、与 timefunc 的输出兼容、并且作用为延迟多个时间单位的函数(常用的如 time 模块的 sleep)。

代码示例:

import datetime

import time

import sched

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    s = sched.scheduler(time.time, time.sleep)  # 生成调度器

    s.enter(5, 1, time_printer, ())

    s.run()

if __name__ == "__main__":

    loop_monitor()

scheduler 对象主要方法:

  • enter(delay, priority, action, argument) ,安排一个事件来延迟 delay 个时间单位。
  • cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个 ValueError
  • run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的 delayfunc() 函数),然后执行事件,直到不再有预定的事件。

个人点评:比 threading.Timer 更好,不需要循环调用。

到此这篇关于Python 4种实现定时任务的方案的文章就介绍到这了,更多相关Python 实现定时任务方案内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python 4种实现定时任务的方案

    目录 1.利用 while True: + sleep() 实现定时任务 2.使用 Timeloop 库运行定时任务 3.利用 threading.Timer 实现定时任务 4.利用内置模块 sched 实现定时任务 1.利用 while True: + sleep() 实现定时任务 位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行.所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态

  • Python定时任务实现方案

    目录 1.定时任务 2.Python的定时任务 2.1 几种常见的方案 2.1.1 schedule 2.1.2 Jenkins 2.1.3 Celery 2.2 题外话之持久化 2.2.1 ApScheduler 我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整教程,希望大家多多支持. 1.定时任务 定时任务,顾名思义: 定时执行的任务,可以是一段bash命令,也可以是一个脚本文件.通常用于我们需要在特定时刻做事情. 举个例子: 每晚8点执行全业务场景接口自动化回归测试,

  • Python Celery动态添加定时任务生产实践指南

    目录 一.背景 二.Celery动态添加定时任务的官方文档 三.celery简单实用 3.1 基础环境配置 3.2 测试使用Celery应用 四.配置backend存储任务执行结果 四.优化Celery目录结构 五.开始使用django-celery-beat调度器 六.具体操作演练 6.1 创建基于间隔时间的周期性任务 6.2 创建一个不带参数的周期性间隔任务 6.3 周期性任务的查询.删除操作 总结 一.背景 实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时

  • python 两种方法修改文件的创建时间、修改时间、访问时间

    突如其来想知道一下 python 如何修改文件的属性(创建.修改.访问时间),于是就去网上搜集了可行方案,也就有了这篇博客 方案一 from win32file import CreateFile, SetFileTime, GetFileTime, CloseHandle from win32file import GENERIC_READ, GENERIC_WRITE, OPEN_EXISTING from pywintypes import Time # 可以忽视这个 Time 报错(运行

  • 比较几种Redis集群方案

    目录 一.概述 二.Redis高可用集群搭建 三.Redis集群节点间的通信机制 3.1.集中式 3.2.gossip 四.网络抖动 五.Redis集群选举原理分析 5.1.集群是否完整才能对外提供服务 5.2.Redis集群为什么至少需要三个master节点,并且推荐节点数为奇数? 5.3.哨兵leader选举流程 六.新增/删除节点 一.概述 在Redis3.0以前的集群一般是借助哨兵sentinel工具来监控主节点的状态,如果主节点异常,则会做主从切换,将某一台slave作为master.

  • 浅谈python 四种数值类型(int,long,float,complex)

    Python支持四种不同的数值类型,包括int(整数)long(长整数)float(浮点实际值)complex (复数),本文章向码农介绍python 四种数值类型,需要的朋友可以参考一下. 数字数据类型存储数值.他们是不可改变的数据类型,这意味着改变数字数据类型的结果,在一个新分配的对象的值. Number对象被创建,当你给他们指派一个值.例如: var1 = 1 var2 = 10 您也可以删除数字对象的参考,使用del语句. del语句的语法是: del var1[,var2[,var3[

  • Linux下Python脚本自启动与定时任务详解

    前言 最近同事问了一个关于Python脚本自启动与定时任务的问题,发现很多的朋友对这块都不是特别的熟悉,所以本文主要给大家介绍的是关于Linux下Python脚本自启动与定时任务的相关内容,分享出来供大家参考学习,话不多说了,来一起看看详细的介绍: 一.让Python随Linux开机自动运行 准备好要自启的脚本auto.py 用root权限编辑以下文件 sudo vim /ect/rc.local 在exit 0上面编辑启动脚本的命令 /usr/bin/python3.5 /home/edgar

  • Python三种遍历文件目录的方法实例代码

    本文实例代码主要实现的是python遍历文件目录的操作,有三种方法,具体代码如下. #coding:utf-8 # 方法1:递归遍历目录 import os def visitDir(path): li = os.listdir(path) for p in li: pathname = os.path.join(path,p) if not os.path.isfile(pathname): #判断路径是否为文件,如果不是继续遍历 visitDir(pathname) else: print

  • python几种常用功能实现代码实例

    这篇文章主要介绍了python几种常用功能实现代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.python 程序退出的几种方式 import sys sys.exit() sys.exit(0) sys.exit(1) 或者 os._exit() 该方法中包含一个参数status,默认为0,表示正常退出,也可以为1,表示异常退出 2. python实现获取电脑IP.主机名.Mac地址 import socket import uui

  • 人人都能看懂的 6 种限流实现方案(纯干货)

    为了上班方便,去年我把自己在北郊的房子租出去了,搬到了南郊,这样离我上班的地方就近了,它为我节约了很多的时间成本,我可以用它来做很多有意义的事,最起码不会因为堵车而闹心了,幸福感直线上升. 但即使这样,生活也有其他的烦恼.南郊的居住密度比较大,因此停车就成了头痛的事,我租的是路两边的非固定车位,每次只要下班回来,一定是没有车位停了,因此我只能和别人的车并排停着,但这样带来的问题是,我每天早上都要被挪车的电话给叫醒,心情自然就不用说了. 但后来几天,我就慢慢变聪明了,我头天晚上停车的时候,会找第二

随机推荐