详解Python使用apscheduler定时执行任务

apscheduler 的使用

  我们项目中总是避免不了要使用一些定时任务,比如说最近的项目,用户点击报名考试以后需要在考试日期临近的时候推送小程序消息提醒到客户微信上,翻了翻 fastapi 中的实现,虽然方法和包也不少,但是要不就是太重了(比如需要再开服务,还要依赖 redis,都不好用),虽然也可以使用 time 模块的 time.sleep()机上 fastapi 的后台任务变相实现,但是相对简单的功能还行,复杂点的代码起来就麻烦了,所以还是专人专事找个负责这个额的包吧。找来找去发现 APScheduler 就挺适合,代码简单,实现效果也很好,这里做个记录!

安装

pip install apscheduler

主要组成部分

概念性东西,混个脸熟,代码比这些定义好理解。

触发器(trigger)包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。说人话就是你指定那种方式触发当前的任务。


类型


解释


DateTrigger


到期执行(到xxxx年x月x日 x时x分x秒执行) 对应DateTrigger


IntervalTrigger


间隔执行(每5秒执行一次)


CronTrigger


一个crontab类型的条件(这个比较复杂,比如周一到周四的4-5点每5秒执行一次)

作业存储(job store)存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。

Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore
都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并
将job添加到jobstore中。

Jobstore主要是通过pickle库的loads和dumps【实现核心是通过python的__getstate__和__setstate__重写
实现】,每次变更时将Job动态保存到存储中,使用时再动态的加载出来,作为存储的可以是redis,也可以
是数据库【通过sqlarchemy这个库集成多种数据库】,也可以是mongodb等
目前APScheduler支持的Jobstore:

MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore

执行器(executor)处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

- 说人话就是添加任务时候用它来包装的,executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以Executor的选择需要根据实际的scheduler来选择不同的执行器
目前APScheduler支持的Executor:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor

调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业.

Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。
除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

scheduler可根据自身的需求选择不同的组件,如果是使用AsyncIO则选择AsyncIOScheduler,使用tornado则
选择TornadoScheduler。
目前APScheduler支持的Scheduler:

AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler

简单应用

import time
from apscheduler.schedulers.blocking import BlockingScheduler # 引入后台

def my_job():
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5)
sched.start()

完整代码

# trigeers 触发器
# job stores job 存储
# executors 执行器
# schedulers 调度器

from pytz import utc
from sqlalchemy import func

from apscheduler.schedulers.background import BackgroundScheduler,AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {
    # 可以配置多个存储
    #'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore指定存储链接
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},     # 最大工作线程数20
    'processpool': ProcessPoolExecutor(max_workers=5)         # 最大工作进程数为5
}
job_defaults = {
    'coalesce': False,   # 关闭新job的合并,当job延误或者异常原因未执行时
    'max_instances': 3   # 并发运行新job默认最大实例多少
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作为调度程序的时区

import os
import time

def print_time(name):
    print(f'{name} - {time.ctime()}')

def add_job(job_id, func, args, seconds):
    """添加job"""
    print(f"添加间隔执行任务job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)

def add_coun_job(job_id, func, args, start_time):
    """添加job"""
    print(f"添加一次执行任务job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='date',timezone='Asia/Shanghai', run_date=start_time)
    # scheduler.add_job(func=print_time, trigger='date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])

def remove_job(job_id):
    """移除job"""
    scheduler.remove_job(job_id)
    print(f"移除job - {job_id}")

def pause_job(job_id):
    """停止job"""
    scheduler.pause_job(job_id)
    print(f"停止job - {job_id}")

def resume_job(job_id):
    """恢复job"""
    scheduler.resume_job(job_id)
    print(f"恢复job - {job_id}")

def get_jobs():
    """获取所有job信息,包括已停止的"""
    res = scheduler.get_jobs()
    print(f"所有job - {res}")

def print_jobs():
    print(f"详细job信息")
    scheduler.print_jobs()

def start():
    """启动调度器"""
    scheduler.start()

def shutdown():
    """关闭调度器"""
    scheduler.shutdown()

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    # start()
    # print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
    # add_job('job_A', func=print_time, args=("A", ), seconds=1)
    # add_job('job_B', func=print_time, args=("B", ), seconds=2)
    # time.sleep(6)
    # pause_job('job_A') # 停止a
    # get_jobs()   #得到所有job
    # time.sleep(6)
    # print_jobs()
    # resume_job('job_A')
    # time.sleep(6)
    # remove_job('job_A')
    # time.sleep(6)
    from datetime import datetime
    import pytz
    start()

    date_temp = datetime(2022, 2, 19, 17, 30, 5)
    # scheduler.add_job(print_time, 'date', run_date=datetime.now(pytz.timezone('America/Manaus')), args=['text'])
    # scheduler.add_job(print_time, 'date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2'])
    add_coun_job(job_id="job_C",func=print_time,args=('一次性执行任务',),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone())
    time.sleep(130)
    try:
        shutdown()
    except RuntimeError:
        pass

到此这篇关于详解Python使用apscheduler定时执行任务的文章就介绍到这了,更多相关Python apscheduler内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python 基于Apscheduler实现定时任务

    导语 在工作场景遇到了这么一个场景,就是需要定期去执行一个缓存接口,用于同步设备配置.首先想到的就是Linux上的crontab,可以定期,或者间隔一段时间去执行任务.但是如果你想要把这个定时任务作为一个模块集成到Python项目中,或者想持久化任务,显然crontab不太适用.Python的APScheduler模块能够很好的解决此类问题,所以专门写这篇文章,从简单入门开始记录关于APScheduler最基础的使用场景,以及解决持久化任务的问题,最后结合其他框架深层次定制定时任务模块这几个点入

  • Python定时任务APScheduler原理及实例解析

    定时任务: 1. 线程睡眠函数 sleep() --粗暴!一直占有 CPU 资源,导致后续操作无法执行 2. threading.Timer(10, task, ()).start() # (间隔s,任务task, 函参) 3. import sched # 初始化 sched 模块的 scheduler 类 scheduler = sched.scheduler(time.time, time.sleep) # 增加调度任务 enter(delay, priority, action, arg

  • Python定时任务框架APScheduler原理及常用代码

    APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,虽然这样也可以,但是总觉得不是那么的专业,^_^所以就找到了python的定时任务模块APScheduler: APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功

  • 详解Python利用APScheduler框架实现定时任务

    目录 背景 样例代码 代码详解 执行结果 知识点补充 背景 最近在做一些python工具的时候,常常会碰到定时器问题,总觉着使用threading.timer或者schedule模块非常不优雅.所以这里给自己做个记录,也分享一个定时任务框架APScheduler.具体的架构原理就不细说了,用个例子说明一下怎么简易的使用. 样例代码 先上样例代码,如下: #!/user/bin/env python # coding=utf-8 """ @project : csdn @aut

  • python定时任务apscheduler的详细使用教程

    目录 前言 安装 主要组成部分 简单应用 完整代码 总结 前言 我们项目中总是避免不了要使用一些定时任务,比如说最近的项目,用户点击报名考试以后需要在考试日期临近的时候推送小程序消息提醒到客户微信上,翻了翻 fastapi 中的实现,虽然方法和包也不少,但是要不就是太重了(比如需要再开服务,还要依赖 redis,都不好用),虽然也可以使用 time 模块的 time.sleep()机上 fastapi 的后台任务变相实现,但是相对简单的功能还行,复杂点的代码起来就麻烦了,所以还是专人专事找个负责

  • Python定时任务工具之APScheduler使用方式

    APScheduler (advanceded python scheduler)是一款Python开发的定时任务工具. 文档地址 apscheduler.readthedocs.io/en/latest/u- 特点: 不依赖于Linux系统的crontab系统定时,独立运行 可以 动态添加 新的定时任务,如下单后30分钟内必须支付,否则取消订单,就可以借助此工具(每下一单就要添加此订单的定时任务) 对添加的定时任务可以做持久保存 1 安装 pip install apscheduler 2 组

  • Python使用APScheduler实现定时任务过程解析

    前言 APScheduler是基于Quartz的一个Python定时任务框架.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务. 在线文档:https://apscheduler.readthedocs.io/en/latest/userguide.html 一.安装APScheduler pip install apscheduler 二.基本概念 APScheduler有四大组件: 1.触发器 triggers : 触发器包含调度逻辑.每个作业都有自己的触发器,用

  • Python定时任务APScheduler安装及使用解析

    1.简介 APScheduler是一个 Python 定时任务框架,使用起来十分方便.提供了基于日期.固定时间间隔以及 crontab 类型的任务,并且可以持久化任务.并以 daemon 方式运行应用. 2.APScheduler四个组件 APScheduler 四个组件分别为:触发器(trigger),作业存储(job store),执行器(executor),调度器(scheduler). 触发器(trigger) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行.除

  • Python定时任务APScheduler的实例实例详解

    APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令.同时,它还支持异步执行.后台执行调度任务. 一.基本架构 触发器 triggers:设定触发任务的条件 描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发 任务存储器 job stores:存放任务,可以放内存(默认)或数据库 注:调度器之间不能共享任务存储器 执行器 executors:用于执行任务,可设定执行模式 将指定的作业提交到线程池或者进程

  • 详解Python使用apscheduler定时执行任务

    apscheduler 的使用   我们项目中总是避免不了要使用一些定时任务,比如说最近的项目,用户点击报名考试以后需要在考试日期临近的时候推送小程序消息提醒到客户微信上,翻了翻 fastapi 中的实现,虽然方法和包也不少,但是要不就是太重了(比如需要再开服务,还要依赖 redis,都不好用),虽然也可以使用 time 模块的 time.sleep()机上 fastapi 的后台任务变相实现,但是相对简单的功能还行,复杂点的代码起来就麻烦了,所以还是专人专事找个负责这个额的包吧.找来找去发现

  • 详解python定时简单爬取网页新闻存入数据库并发送邮件

    本人小白一枚,简单记录下学校作业项目,代码十分简单,主要是对各个库的理解,希望能给别的初学者一点启发. 一.项目要求 1.程序可以从北京工业大学首页上爬取新闻内容:http://www.bjut.edu.cn 2.程序可以将爬取下来的数据写入本地MySQL数据库中. 3.程序可以将爬取下来的数据发送到邮箱. 4.程序可以定时执行. 二.项目分析 1.爬虫部分利用requests库爬取html文本,再利用bs4中的BeaultifulSoup库来解析html文本,提取需要的内容. 2.使用pymy

  • 详解Python 多线程 Timer定时器/延迟执行、Event事件

    Timer继承子Thread类,是Thread的子类,也是线程类,具有线程的能力和特征.这个类用来定义多久执行一个函数. 它的实例是能够延迟执行目标函数的线程,在真正执行目标函数之前,都可以cancel它. Timer源码: class Timer(Thread): def __init__(self, interval, function, args=None, kwargs=None): Thread.__init__(self) self.interval = interval self.

  • 一文详解Python定时任务触发

    目录 1.新建调度器schedulers 2.添加调度任务trigger 3.运行调度任务 4.特点,其他操作 APScheduler APScheduler 四个组件分别为: 调度器(scheduler).触发器(trigger),作业存储(job store),执行器(executor) 安装命令: pip install setuptools pip install --ignore-installed apscheduler 1.新建调度器schedulers BlockingSched

  • 详解python日志输出使用配置文件格式

    python脚本日志输出使用配置文件的形式,不需要在每个脚本里面配置日志. 需求简述: 如我要写2个脚本(a.py和b.py),a.py日志输出到/var/log/a.log,b.py日志输出到/var/log/b.log,并且日志按日期切割.如果每个脚本都去配置一遍日志的话,浪费时间也不利于后期维护. 现在我要使用配置文件的格式去统一管理python脚本的代码日志输出,后续所有python脚本日志都在这个配置文件里面配置,脚本读取.方便后续维护和增加脚本的可读性. 需求实现: 我配置文件路径及

  • 详解Python如何制作自动发送微信的程序

    目录 前言 模块安装和导入 pyautogui apscheduler 完整代码 结果 前言 事情是这样的:今天晚上,女朋友让我十二点催她睡觉. 不过,可是我实在太困了,熬不下去…… 是吧?女朋友哪有睡觉重要? 但,女朋友的命令,我是不敢违抗的…… 但是睡觉也不能缺! 这时候我们该怎么办呢?是时候让Python登场了! 模块安装和导入 这次我们来做一个自动发送微信的程序,在深夜十二点的时候给女朋友发去消息,也算是尽了一个男朋友的义务了. 我们需要两个模块:apscheduler,pyautogu

  • 详解Python实现多进程异步事件驱动引擎

    本文介绍了详解Python实现多进程异步事件驱动引擎,分享给大家,具体如下: 多进程异步事件驱动逻辑 逻辑 code # -*- coding: utf-8 -*- ''' author: Jimmy contact: 234390130@qq.com file: eventEngine.py time: 2017/8/25 上午10:06 description: 多进程异步事件驱动引擎 ''' __author__ = 'Jimmy' from multiprocessing import

  • 详解Python import方法引入模块的实例

    详解Python import方法引入模块的实例 在Python用import或者from-import或者from-import-as-来导入相应的模块,作用和使用方法与C语言的include头文件类似.其实就是引入某些成熟的函数库和成熟的方法,避免重复造轮子,提高开发速度. python的import方法可以引入系统的模块,也可以引入我们自己写好的共用模块,这点和PHP非常相似,但是它们的具体细节还不是很一样.因为php是在引入的时候指明引入文件的具体路径,而python中不能够写文件路径进

  • 详解Python 模拟实现生产者消费者模式的实例

    详解Python 模拟实现生产者消费者模式的实例 散仙使用python3.4模拟实现的一个生产者与消费者的例子,用到的知识有线程,队列,循环等,源码如下: Python代码 import queue import time import threading import random q=queue.Queue(5) #生产者 def pr(): name=threading.current_thread().getName() print(name+"线程启动......") for

随机推荐