Python任务调度利器之APScheduler详解

任务调度应用场景

所谓的任务调度是指安排任务的执行计划,即何时执行,怎么执行等。在现实项目中经常出现它们的身影;特别是数据类项目,比如实时统计每5分钟网站的访问量,就需要每5分钟定时从日志数据分析访问量。

总结下任务调度应用场景:

  • 离线作业调度:按时间粒度执行某项任务
  • 共享缓存更新:定时刷新缓存,如redis缓存;不同进程间的共享数据

任务调度工具

  • linux的crontab, 支持按照分钟/小时/天/月/周粒度,执行任务
  • java的Quartz
  • windows的任务计划

本文介绍的是python中的任务调度库,APScheduler(advance python scheduler)。如果你了解Quartz的话,可以看出APScheduler是Quartz的python实现;APScheduler提供了基于时间,固定时间点和crontab方式的任务调用方案, 可以当作一个跨平台的调度工具来使用。

APScheduler

组件介绍

APScheduler由5个部分组成:触发器、调度器、任务存储器、执行器和任务事件。

  • 任务job:任务id和任务执行func
  • 触发器triggers:确定任务何时开始执行
  • 任务存储器job stores: 保存任务的状态
  • 执行器executors:确定任务怎么执行
  • 任务事件event:监控任务执行异常情况
  • 调度器schedulers:串联任务的整个生命周期,添加编辑任务到任务存储器,在任务的执行时间到来时,把任务交给执行器执行返回结果;同时发出事件监听,监控任务事件 。

安装

pip install apscheduler

简单例子

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import datetime
# 任务执行函数
def job_func(job_id):
 print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 事件监听
def job_exception_listener(event):
 if event.exception:
 # todo:异常处理, 告警等
 print('The job crashed :(')
 else:
 print('The job worked :)')
# 日志
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
# 定义一个后台任务非阻塞调度器
scheduler = BackgroundScheduler()
# 添加一个任务到内存中
# 触发器:trigger='interval' seconds=10 每10s触发执行一次
# 执行器:executor='default' 线程执行
# 任务存储器:jobstore='default' 默认内存存储
# 最大并发数:max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
# 设置任务监听
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 启动调度器
scheduler.start() 

运行情况:

job 1 is runed at 2020-03-21 20:00:38 
The job worked :) 
job 1 is runed at 2020-03-21 20:00:48 
The job worked :) 
job 1 is runed at 2020-03-21 20:00:58 
The job worked :)

触发器

触发器决定何时执行任务,APScheduler支持的触发器有3种

trigger='interval':按固定时间周期执行,支持weeks,days,hours,minutes, seconds, 还可指定时间范围

sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')

trigger='date': 固定时间,执行一次

sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

trigger='cron': 支持crontab方式,执行任务

参数:分钟/小时/天/月/周粒度,也可指定时间范围

year (int|str) – 4-digit year
 month (int|str) – month (1-12)
 day (int|str) – day of the (1-31)
 week (int|str) – ISO week (1-53)
 day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
 hour (int|str) – hour (0-23)
 minute (int|str) – minute (0-59)
 second (int|str) – second (0-59)
 start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
 end_date (datetime|str) – latest possible date/time to trigger on (inclusive) 

例子

# 星期一到星期五,5点30执行任务job_function,直到2014-05-30 00:00:00
  sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
  # 按照crontab格式执行, 格式为:分钟 小时 天 月 周,*表示所有
  # 5月到8月的1号到15号,0点0分执行任务job_function
  sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *')) 

执行器

执行器决定如何执行任务;APScheduler支持4种不同执行器,常用的有pool(线程/进程)和gevent(io多路复用,支持高并发),默认为pool中线程池, 不同的执行器可以在调度器的配置中进行配置(见调度器)

  • apscheduler.executors.asyncio:同步io,阻塞
  • apscheduler.executors.gevent:io多路复用,非阻塞
  • apscheduler.executors.pool: 线程ThreadPoolExecutor和进程ProcessPoolExecutor
  • apscheduler.executors.twisted:基于事件驱动

任务存储器

任务存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler支持的任务存储器有:

  • apscheduler.jobstores.memory:内存
  • apscheduler.jobstores.mongodb:存储在mongodb
  • apscheduler.jobstores.redis:存储在redis
  • apscheduler.jobstores.rethinkdb:存储在rethinkdb
  • apscheduler.jobstores.sqlalchemy:支持sqlalchemy的数据库如mysql,sqlite等
  • apscheduler.jobstores.zookeeper:zookeeper

不同的任务存储器可以在调度器的配置中进行配置(见调度器)

调度器

APScheduler支持的调度器方式如下,比较常用的为BlockingScheduler和BackgroundScheduler

  • BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
  • BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
  • AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
  • GeventScheduler:适用于使用gevent模块的应用程序。
  • TwistedScheduler:适用于构建Twisted的应用程序。
  • QtScheduler:适用于构建Qt的应用程序。

从前面的例子,我们可以看到,调度器可以操作任务(并为任务指定触发器、任务存储器和执行器)和监控任务。

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

我们来详细看下各个部分

调度器配置:在add_job我们看到jobstore和executor都是default,APScheduler在定义调度器时可以指定不同的任务存储和执行器,以及初始的参数

from pytz import utc
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.jobstores.mongodb import MongoDBJobStore
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
 # 通过dict方式执行不同的jobstores、executors和默认的参数
 jobstores = {
 'mongo': MongoDBJobStore(),
 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
 }
 executors = {
 'default': ThreadPoolExecutor(20),
 'processpool': ProcessPoolExecutor(5)
 }
 job_defaults = {
 'coalesce': False,
 'max_instances': 3
 }
 # 定义调度器
 scheduler = BackgroundScheduler(jobstoresjobstores=jobstores, executorsexecutors=executors, job_defaultsjob_defaults=job_defaults, timezone=utc)
 def job_func(job_id):
 print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
 # 添加任务
 scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)
 # 启动调度器
 scheduler.start() 

操作任务:调度器可以增加,删除,暂停,恢复和修改任务。需要注意的是这里的操作只是对未执行的任务起作用,已经执行和正在执行的任务不受这些操作的影响。

add_job

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

remove_job: 通过任务唯一的id,删除的时候对应的任务存储器里记录也会删除

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
 scheduler.remove_job('my_job_id') 

Pausing and resuming jobs:暂停和重启任务

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
 scheduler.pause_job('my_job_id')
 scheduler.resume_job('my_job_id') 

Modifying jobs:修改任务的配置

job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)
 # 修改任务的属性
 job.modify(max_instances=6, name='Alternate name')
 # 修改任务的触发器
 scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5') 

监控任务事件类型,比较常用的类型有:

  • EVENT_JOB_ERROR: 表示任务在执行过程的出现异常触发
  • EVENT_JOB_EXECUTED:任务执行成功时
  • EVENT_JOB_MAX_INSTANCES:调度器上执行的任务超过配置的参数时

scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

总结

到此这篇关于Python任务调度利器之APScheduler详解的文章就介绍到这了,更多相关python任务调度 APScheduler内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python编写网页爬虫脚本并实现APScheduler调度

    前段时间自学了python,作为新手就想着自己写个东西能练习一下,了解到python编写爬虫脚本非常方便,且最近又学习了MongoDB相关的知识,万事具备只欠东风. 程序的需求是这样的,爬虫爬的页面是京东的电子书网站页面,每天会更新一些免费的电子书,爬虫会把每天更新的免费的书名以第一时间通过邮件发给我,通知我去下载. 一.编写思路: 1.爬虫脚本获取当日免费书籍信息 2.把获取到的书籍信息与数据库中的已有信息作比较,如果书籍存在不做任何操作,书籍不存在,执行插入数据库的操作,把数据的信息存入Mo

  • 详解Python下Flask-ApScheduler快速指南

    引言:Flask是Python社区非常流行的一个Web开发框架,本文将尝试将介绍APScheduler应用于Flask之中. 1. Flask介绍 Flask是Python社区大名鼎鼎的"microframework",基于简单的核心,使用extension来增加其他功能,其提供非常丰富易用的扩展包, 比如: 2.  Flask-APScheduler 社区提供了一个Flask-APScheduler的模块,方便大家直接在Flask模块中使用APScheduler. 关于安装的命令,仍

  • 详解Python 定时框架 Apscheduler原理及安装过程

    在我们的日常工作自动化测试当中,几乎超过一半的功能都需要利用定时的任务来推动触发,例如在我们项目中有一个定时监控模块,根据自己设置的频率定时跑测试用例,定时检测是否存在线上紧急任务等等,这些都涉及到了有关定时任务的问题,很多情况下,大多数人会选择window的任务计划程序,但如果程序不在window平台下运行,就不能定时启动了:当然也可利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,但定时任务多了,代码可能看起来不太那么友好且有很大的局限性,因此,此时的 Apsch

  • Python中定时任务框架APScheduler的快速入门指南

    前言 大家应该都知道在编程语言中,定时任务是常用的一种调度形式,在Python中也涌现了非常多的调度模块,本文将简要介绍APScheduler的基本使用方法. 一.APScheduler介绍 APScheduler是基于Quartz的一个python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务. APScheduler提供了多种不同的调度器,方便开发者根据自己的实际需要进行使用:同时也提供了不同的存储机

  • python任务调度实例分析

    本文实例讲述了python任务调度实现方法.分享给大家供大家参考.具体如下: 方法1: import sched, time import os s = sched.scheduler(time.time, time.sleep) #scheduler的两个参数用法复杂,可以不做任何更改 def playmusic(x): os.system(x) def jobtodo(): tmlist = [2011,8,11,22,15,0,0,0,0] x1=time.mktime(tmlist) x

  • 详解python调度框架APScheduler使用

    最近在研究python调度框架APScheduler使用的路上,那么今天也算个学习笔记吧! # coding=utf-8 """ Demonstrates how to use the background scheduler to schedule a job that executes on 3 second intervals. """ from datetime import datetime import time import os

  • Python任务调度利器之APScheduler详解

    任务调度应用场景 所谓的任务调度是指安排任务的执行计划,即何时执行,怎么执行等.在现实项目中经常出现它们的身影:特别是数据类项目,比如实时统计每5分钟网站的访问量,就需要每5分钟定时从日志数据分析访问量. 总结下任务调度应用场景: 离线作业调度:按时间粒度执行某项任务 共享缓存更新:定时刷新缓存,如redis缓存:不同进程间的共享数据 任务调度工具 linux的crontab, 支持按照分钟/小时/天/月/周粒度,执行任务 java的Quartz windows的任务计划 本文介绍的是pytho

  • python周期任务调度工具Schedule使用详解

    目录 1.准备 2.基本使用 参数传递 获取目前所有的作业 取消所有作业 标签功能 设定作业截止时间 3.高级使用 装饰器安排作业 并行执行 日志记录 异常处理 如果你想周期性地执行某个 Python 脚本,最出名的选择应该是 Crontab 脚本,但是 Crontab 具有以下缺点: 1.不方便执行秒级任务. 2.当需要执行的定时任务有上百个的时候,Crontab 的管理就会特别不方便. 还有一个选择是 Celery,但是 Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Ce

  • python实现AI聊天机器人详解流程

    前言 开始几天,我是使用很原始的方法,自己去获取天气预报截图,再手动发送给小姐姐.连续几天之后我一想:不对呀,我怎么说也是一个程序猿,怎么能用这么 low 的方式呢. 联想起之前看到的一个开源 python 库-- wxpy,一个非常强大的微信 api 调用类库,正好满足我当前的需要,那话不多说,开干. 任务分解 调用微信 api 发送简单消息 获取当日天气预报截图信息 设置定时任务 调用微信 api 发送简单消息 本程序主要是通过 wxpy 库使用的,参考其官网文档,我们需要做如下准备工作:

  • Python制作微信机器人教程详解

    目录 一.环境配置 二.登录 三. 第一个简单的消息发送监控 四.指定某个微信好友发送消息 五.所有微信群监控 六.公众号监听 七.定时发送消息 八.微信智能聊天机器人 一.环境配置 大多数人无法登录网页版,所以饶过它模拟电脑登录,这个模块一定记得安装: pip install itchat-uos pip install itchat 二.登录 #码登录个人微信账号 import itchat itchat.auto_login(hotReload=True)#hotReload= True可

  • Python探索之ModelForm代码详解

    这是一个神奇的组件,通过名字我们可以看出来,这个组件的功能就是把model和form组合起来,对,你没猜错,相信自己的英语水平. 先来一个简单的例子来看一下这个东西怎么用: 比如我们的数据库中有这样一张学生表,字段有姓名,年龄,爱好,邮箱,电话,住址,注册时间等等一大堆信息,现在让你写一个创建学生的页面,你的后台应该怎么写呢? 首先我们会在前端一个一个罗列出这些字段,让用户去填写,然后我们从后天一个一个接收用户的输入,创建一个新的学生对象,保存 其实,重点不是这些,而是合法性验证,我们需要在前端

  • python装饰器实例大详解

    一.作用域 在python中,作用域分为两种:全局作用域和局部作用域. 全局作用域是定义在文件级别的变量,函数名.而局部作用域,则是定义函数内部. 关于作用域,我们要理解两点: a.在全局不能访问到局部定义的变量 b.在局部能够访问到全局定义的变量,但是不能修改全局定义的变量(当然有方法可以修改) 下面我们来看看下面实例: x = 1 def funx(): x = 10 print(x) # 打印出10 funx() print(x) # 打印出1 如果局部没有定义变量x,那么函数内部会从内往

  • python中 logging的使用详解

    日志是用来记录程序在运行过程中发生的状况,在程序开发过程中添加日志模块能够帮助我们了解程序运行过程中发生了哪些事件,这些事件也有轻重之分. 根据事件的轻重可分为以下几个级别: DEBUG: 详细信息,通常仅在诊断问题时才受到关注.整数level=10 INFO: 确认程序按预期工作.整数level=20 WARNING:出现了异常,但是不影响正常工作.整数level=30 ERROR:由于某些原因,程序 不能执行某些功能.整数level=40 CRITICAL:严重的错误,导致程序不能运行.整数

  • python的mysqldb安装步骤详解

    python的mysqldb安装步骤详解 安装MySQLdb: 一. 什么是MySQLdb? 解释:MySQLdb是Python操作MySQL的一个接口包.这里要理解一个概念,python操作数据库,都是需要一个类似MySQLdb这样的中间层,这些中间层抽象了具体的实现,提供了统一的API供开发者使用. 二. 如何安装MySQLdb? python2环境下: sudo pip install MySQL-python. MySQL-python目前暂时还不支持python3,有些小问题,可以安装

  • python模块之re正则表达式详解

    一.简单介绍 正则表达式是一种小型的.高度专业化的编程语言,并不是python中特有的,是许多编程语言中基础而又重要的一部分.在python中,主要通过re模块来实现. 正则表达式模式被编译成一系列的字节码,然后由用c编写的匹配引擎执行.那么正则表达式通常有哪些使用场景呢? 比如为想要匹配的相应字符串集指定规则: 该字符串集可以是包含e-mail地址.Internet地址.电话号码,或是根据需求自定义的一些字符串集: 当然也可以去判断一个字符串集是否符合我们定义的匹配规则: 找到字符串中匹配该规

  • python魔法方法-自定义序列详解

    自定义序列的相关魔法方法允许我们自己创建的类拥有序列的特性,让其使用起来就像 python 的内置序列(dict,tuple,list,string等). 如果要实现这个功能,就要遵循 python 的相关的协议.所谓的协议就是一些约定内容.例如,如果要将一个类要实现迭代,就必须实现两个魔法方法:__iter__.next(python3.x中为__new__).__iter__应该返回一个对象,这个对象必须实现 next 方法,通常返回的是 self 本身.而 next 方法必须在每次调用的时

随机推荐