Python使用apscheduler模块设置定时任务的实现

目录
  • 一、安装
  • 二、ApScheduler 简介
    • 1 APScheduler的组件
    • 2 调度器的种类
    • 3 内置的触发器类型
  • 三、使用举例
    • 1 使用date类型的触发器
    • 2 使用interval类型的触发器
    • 3 使用cron类型的触发器
  • 四、定时器使用装饰器的方法

一、安装

pip install apscheduler

二、ApScheduler 简介

1 APScheduler的组件

triggers:触发器
triggers包含任务执行的调度逻辑,决定任务按照什么逻辑进行定时执行

job stores;任务存储器
存储了调度任务

executors:执行器
用例执行任务的,包含线程池以及进程池等的创建和调用等等

schedulers:调度器
属于控制面,将其他几个方面组织起来的作用、

2 调度器的种类

调度器有以下几种常见类型,其中最常用的BackgroundScheduler,即非阻塞式,因为在一般情况下,定时任务都会在放到web服务中,如果使用阻塞式,则无法启动web服务,而使用非阻塞式,则将定时任务设定后,就不管了,继续执行后面的web服务,只要web服务在运行,定时任务就是一直有效的

  • BlockingScheduler: 阻塞式
  • BackgroundScheduler: 非阻塞式(后台运行)
  • AsyncIOScheduler: 当使用asyncio模块时使用
  • GeventScheduler: 当使用gevent模块时使用
  • TornadoScheduler: 构建Tornado应用时使用
  • TwistedScheduler: 构建Twisted应用时使用
  • QtScheduler: 构建Qt应用时使用

3 内置的触发器类型

  • date: 在某个时间点执行一次时使用
  • interval: 固定的时间间隔循环执行时使用
  • cron: 在一天中特定的时间点执行时使用
  • calendarinterval: 当想在在一天中特定时间点或以日历为基础的时间间隔内执行时使用

三、使用举例

这里jiu就以非阻塞式BackgroundScheduler调度器为例展开

1 使用date类型的触发器

如下,使用了三种设置日期和时间的方法

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime

def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 通过date 设置指定日期执行
    sched.add_job(do_func,trigger="date",run_date=date(2022,5,25),args=("张三丰",100))

    # 通过datetime,设置指定日期额指定时刻执行
    sched.add_job(do_func, trigger="date", run_date=datetime(2022, 5, 25,14,0,10), args=("张三丰", 100))

    # 直接使用文本的方式指定日期和时刻表
    sched.add_job(do_func, trigger="date", run_date="2022-05-25 14:0:20", args=("张三丰", 100))

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

执行结果如下,可以发现,第一个通过date指定日期的默认是0点执行,显然时间已经过了,不会执行,第二和第三个则在规定的时间点执行了,这里还需要注意的是,通过打印可以看出,main函数执行完成后,已经开始执行main函数下面的while循环打印语句了,而在执行循环的过程中,定时任务仍然生效,这就是非阻塞式调度器的原理,如果是阻塞式,则在此代码中,会一直卡在main函数中,main下面额while循环语句是不会执行的,因此在实际使用中,非阻塞式应用的是非常多的

2022-05-25 14:00:02
2022-05-25 14:00:02
C:\Users\hitre\.virtualenvs\zentaolinkgitlab-QCS5yxD9\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.html
  return tzinfo.localize(dt)
Run time of job "do_func (trigger: date[2022-05-25 00:00:00 CST], next run at: 2022-05-25 00:00:00 CST)" was missed by 14:00:02.088260
2022-05-25 14:00:03
2022-05-25 14:00:04
2022-05-25 14:00:05
2022-05-25 14:00:06
2022-05-25 14:00:07
2022-05-25 14:00:08
2022-05-25 14:00:09
2022-05-25 14:00:10 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:00:10
2022-05-25 14:00:11
2022-05-25 14:00:12
2022-05-25 14:00:13
2022-05-25 14:00:14
2022-05-25 14:00:15
2022-05-25 14:00:16
2022-05-25 14:00:17
2022-05-25 14:00:18
2022-05-25 14:00:19
2022-05-25 14:00:20 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:00:20
2022-05-25 14:00:21
2022-05-25 14:00:22

2 使用interval类型的触发器

如下代码演示了时间间隔循环执行的使用例子

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime

def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 每3秒执行一次
    sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)
    # 每3分钟执行一次
    sched.add_job(do_func, trigger="interval", args=("张三丰", 100), minutes=3)
    # 每3小时执行一次
    sched.add_job(do_func, trigger="interval", args=("张三丰", 100), hours=3)
    # 每3天执行一次
    sched.add_job(do_func, trigger="interval", args=("张三丰", 100), days=3)
    # 每3周执行一次
    sched.add_job(do_func, trigger="interval", args=("张三丰", 100), weeks=3)

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

上面的代码中因为时间跨度比较大,这里只演示妹3秒执行一次的代码

代码如下:

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime

def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 每3秒执行一次
    sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

执行结果如下:

2022-05-25 14:14:04
2022-05-25 14:14:04
2022-05-25 14:14:05
2022-05-25 14:14:06
2022-05-25 14:14:07 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:07
2022-05-25 14:14:08
2022-05-25 14:14:09
2022-05-25 14:14:10 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:10
2022-05-25 14:14:11
2022-05-25 14:14:12
2022-05-25 14:14:13 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:14:13

3 使用cron类型的触发器

cron的触发器有点类似linux上crontab定时器的使用,代码如下:

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime

def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
    sched.add_job(do_func,trigger="cron",month='6-8,11-12', day='3rd fri', hour='0-3',args=("张三丰",100))

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

这里需要注意的是,可以省略不需要的字段。当省略时间参数时,在显式指定参数之前的参数会被设定为*,之后的参数会被设定为最小值,week 和day_of_week的最小值为*

day=1, minute=20
等同于
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0

此外,也可以直接使用crontab表达式,如下:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time
from datetime import date
from datetime import datetime

def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
    sched.add_job(do_func,trigger=CronTrigger.from_crontab('48 10 1-15 sep-nov *'),args=("张三丰",100))

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

四、定时器使用装饰器的方法

以间隔时间循环执行的代码为例,如下为未使用装饰器的方式

from apscheduler.schedulers.background import BackgroundScheduler
import time
from datetime import date
from datetime import datetime

def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

def main():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    sched=BackgroundScheduler()

    # 每3秒执行一次
    sched.add_job(do_func,trigger="interval",args=("张三丰",100),seconds=3)

    sched.start()

if __name__=="__main__":
    main()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

修改为使用装饰器的方式如下:

from apscheduler.schedulers.background import BackgroundScheduler
import time

sched=BackgroundScheduler()

@sched.scheduled_job(trigger="interval",args=("张三丰",100),seconds=3)
def do_func(name,age):
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))+" in do func : 姓名:"+name+" 年龄:"+str(age))

if __name__=="__main__":
    sched.start()
    while True:
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(1)

执行结果如下:

2022-05-25 14:34:10
2022-05-25 14:34:11
2022-05-25 14:34:12
2022-05-25 14:34:13 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:13
2022-05-25 14:34:14
2022-05-25 14:34:15
2022-05-25 14:34:16 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:16
2022-05-25 14:34:17
2022-05-25 14:34:18
2022-05-25 14:34:19 in do func : 姓名:张三丰 年龄:100
2022-05-25 14:34:19
2022-05-25 14:34:20

到此这篇关于Python使用apscheduler模块设置定时任务的实现的文章就介绍到这了,更多相关Python apscheduler定时任务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python定时任务apscheduler的详细使用教程

    目录 前言 安装 主要组成部分 简单应用 完整代码 总结 前言 我们项目中总是避免不了要使用一些定时任务,比如说最近的项目,用户点击报名考试以后需要在考试日期临近的时候推送小程序消息提醒到客户微信上,翻了翻 fastapi 中的实现,虽然方法和包也不少,但是要不就是太重了(比如需要再开服务,还要依赖 redis,都不好用),虽然也可以使用 time 模块的 time.sleep()机上 fastapi 的后台任务变相实现,但是相对简单的功能还行,复杂点的代码起来就麻烦了,所以还是专人专事找个负责

  • Python中定时任务框架APScheduler的快速入门指南

    前言 大家应该都知道在编程语言中,定时任务是常用的一种调度形式,在Python中也涌现了非常多的调度模块,本文将简要介绍APScheduler的基本使用方法. 一.APScheduler介绍 APScheduler是基于Quartz的一个python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务. APScheduler提供了多种不同的调度器,方便开发者根据自己的实际需要进行使用:同时也提供了不同的存储机

  • Python定时任务APScheduler原理及实例解析

    定时任务: 1. 线程睡眠函数 sleep() --粗暴!一直占有 CPU 资源,导致后续操作无法执行 2. threading.Timer(10, task, ()).start() # (间隔s,任务task, 函参) 3. import sched # 初始化 sched 模块的 scheduler 类 scheduler = sched.scheduler(time.time, time.sleep) # 增加调度任务 enter(delay, priority, action, arg

  • Python定时任务APScheduler的实例实例详解

    APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令.同时,它还支持异步执行.后台执行调度任务. 一.基本架构 触发器 triggers:设定触发任务的条件 描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发 任务存储器 job stores:存放任务,可以放内存(默认)或数据库 注:调度器之间不能共享任务存储器 执行器 executors:用于执行任务,可设定执行模式 将指定的作业提交到线程池或者进程

  • Python定时任务工具之APScheduler使用方式

    APScheduler (advanceded python scheduler)是一款Python开发的定时任务工具. 文档地址 apscheduler.readthedocs.io/en/latest/u- 特点: 不依赖于Linux系统的crontab系统定时,独立运行 可以 动态添加 新的定时任务,如下单后30分钟内必须支付,否则取消订单,就可以借助此工具(每下一单就要添加此订单的定时任务) 对添加的定时任务可以做持久保存 1 安装 pip install apscheduler 2 组

  • Python使用APScheduler实现定时任务过程解析

    前言 APScheduler是基于Quartz的一个Python定时任务框架.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务. 在线文档:https://apscheduler.readthedocs.io/en/latest/userguide.html 一.安装APScheduler pip install apscheduler 二.基本概念 APScheduler有四大组件: 1.触发器 triggers : 触发器包含调度逻辑.每个作业都有自己的触发器,用

  • Python定时任务框架APScheduler原理及常用代码

    APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,虽然这样也可以,但是总觉得不是那么的专业,^_^所以就找到了python的定时任务模块APScheduler: APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功

  • Python定时任务APScheduler安装及使用解析

    1.简介 APScheduler是一个 Python 定时任务框架,使用起来十分方便.提供了基于日期.固定时间间隔以及 crontab 类型的任务,并且可以持久化任务.并以 daemon 方式运行应用. 2.APScheduler四个组件 APScheduler 四个组件分别为:触发器(trigger),作业存储(job store),执行器(executor),调度器(scheduler). 触发器(trigger) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行.除

  • Python使用apscheduler模块设置定时任务的实现

    目录 一.安装 二.ApScheduler 简介 1 APScheduler的组件 2 调度器的种类 3 内置的触发器类型 三.使用举例 1 使用date类型的触发器 2 使用interval类型的触发器 3 使用cron类型的触发器 四.定时器使用装饰器的方法 一.安装 pip install apscheduler 二.ApScheduler 简介 1 APScheduler的组件 triggers:触发器triggers包含任务执行的调度逻辑,决定任务按照什么逻辑进行定时执行 job st

  • Python使用crontab模块设置和清除定时任务操作详解

    本文实例讲述了Python使用crontab模块设置和清除定时任务操作.分享给大家供大家参考,具体如下: centos7下安装Python的pip root用户使用yum install -y python-pip 时会报如下错误: No package python-pip available Error:Nothing to do 解决方法如下: 首先安装epel扩展源: yum -y install epel-release 更新完成之后,就可安装pip: yum -y install p

  • 详解Python利用APScheduler框架实现定时任务

    目录 背景 样例代码 代码详解 执行结果 知识点补充 背景 最近在做一些python工具的时候,常常会碰到定时器问题,总觉着使用threading.timer或者schedule模块非常不优雅.所以这里给自己做个记录,也分享一个定时任务框架APScheduler.具体的架构原理就不细说了,用个例子说明一下怎么简易的使用. 样例代码 先上样例代码,如下: #!/user/bin/env python # coding=utf-8 """ @project : csdn @aut

  • Python中schedule模块关于定时任务使用方法

    目录 1 取消定时任务 2 定时任务只执行一次 3 获取所有的定时任务 4 取消所有任务 5 给定时任务打标签,同样通过标签获取或取消定时任务 1 取消定时任务 比如当满足一定条件时,就取消定时任务,在这种场景下,不可能说把进程干掉,所以可以利用取消定时任务的功能 如下代码,通过count控制当执行了5此以后,就取消定时任务 import schedule import time count=0 def do_func(name,age): global count count+=1 print

  • Linux部署python爬虫脚本,并设置定时任务的方法

    去年因项目需要,用python写了个爬虫.因爬到的数据需要存到生产环境的PG数据库.所以需要将脚本部署到CentOS服务器,并设置定时任务,自动启动脚本. 实施步骤如下: 1.安装pip(操作系统自带了python2.6可以直接用,但是没有pip) # 下载pip安装包 wget "https://pypi.python.org/packages/source/p/pip/pip-1.5.4.tar.gz#md5=834b2904f92d46aaa333267fb1c922bb" --

  • python 基于Apscheduler实现定时任务

    导语 在工作场景遇到了这么一个场景,就是需要定期去执行一个缓存接口,用于同步设备配置.首先想到的就是Linux上的crontab,可以定期,或者间隔一段时间去执行任务.但是如果你想要把这个定时任务作为一个模块集成到Python项目中,或者想持久化任务,显然crontab不太适用.Python的APScheduler模块能够很好的解决此类问题,所以专门写这篇文章,从简单入门开始记录关于APScheduler最基础的使用场景,以及解决持久化任务的问题,最后结合其他框架深层次定制定时任务模块这几个点入

  • Python中schedule模块定时任务的使用方法(2)

    目录 1 设置时间间隔随机数 2 设置定时任务执行到指定时间 3 计算当前到下一次执行的时间差单位为秒 上一篇文章Python中schedule模块关于定时任务使用方法 1 设置时间间隔随机数 在有一些场景下,为了模拟比较自然的情景,需要采用随机的时间间隔,这就派上用场了 如下代码,设置随机间隔从2秒到10秒之间取随机数 import schedule import time def do_func(name,age): print(time.strftime('%Y-%m-%d %H:%M:%

  • Python使用matplotlib模块绘制图像并设置标题与坐标轴等信息示例

    本文实例讲述了Python使用matplotlib模块绘制图像并设置标题与坐标轴等信息.分享给大家供大家参考,具体如下: 进行图像绘制有时候需要设定坐标轴以及图像标题等信息,示例代码如下: #-*- coding: utf-8 -*- #!/usr/bin/python import matplotlib.pyplot as plt from numpy.random import randn x = range(100) y = randn(100) fig = plt.figure() ax

  • Node.js设置定时任务之node-schedule模块的使用详解

    node-schedule是 Node.js 的一个定时任务(crontab)模块.我们可以使用定时任务来对服务器系统进行维护,让其在固定的时间段执行某些必要的操作,还可以使用定时任务发送邮件.爬取数据等: 一.安装 npm install node-schedule # 或 yarn add node-schedule 二.基础用法 使用schedule.scheduleJob()即可创建一个定时任务,一个简单的上手示例: const schedule = require('node-sche

  • 如何给windows设置定时任务并运行python脚本

    目录 1. 创建定时任务 1.1 计划任务 2.2 基本任务 2.3 命名 2.4 执行时间 2.5执行时间 2.6 启动程序 2.7 选择启动程序 2.8完成 3. 结尾 4. 完工 自己用python写了一个签到脚本,经过测试已经可以成功打卡,于是研究了一下windows定时运行程序 1. 创建定时任务 1.1 计划任务 打开"控制面板 "–>"系统和安全"–>"管理工具"–>"计划任务",如图 2.2

随机推荐