5分钟快速掌握Python定时任务框架的实现

APScheduler 简介

在实际开发中我们经常会碰上一些重复性或周期性的任务,比如像每天定时爬取某个网站的数据、一定周期定时运行代码训练模型等,类似这类的任务通常需要我们手动来进行设定或调度,以便其能够在我们设定好的时间内运行。

在 Windows 上我们可以通过计划任务来手动实现,而在 Linux 系统上往往我们会用到更多关于 crontab 的相关操作。但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人工智能」了。

所以将这些定时任务的调度代码化才是能够让我们很好地从这种手动管理的纯人力操作中解脱出来。

在 Python 生态中对于定时任务的一些操作主要有那么几个:

  • schedule:第三方模块,该模块适合比较轻量级的一些调度任务,但却不适用于复杂时间的调度
  • APScheduler:第三方定时任务框架,是对 Java 第三方定时任务框架 Quartz 的模仿与移植,能提供比 schedule 更复杂的应用场景,并且各种组件都是模块化,易于使用与二次开发。
  • Celery Beat:属于 celery 这分布式任务队列第三方库下的一个定时任务组件,如果使用需要配合 RabbitMQ 或 Redis 这类的消息队列套件,需要花费一定的时间在环境搭建上,但在高版本中已经不支持 Windows。

所以为了满足能够相对复杂的时间条件,又不需要在前期的环境搭建上花费很多时间的前提下,选择 APScheduler 来对我们的调度任务或定时任务进行管理是个性价比极高的选择。而本文主要会带你快速上手有关 APScheduler 的使用。

APScheduler 概念与组件

虽然说官方文档上的内容不是很多,而且所列举的 API 不是很多,但这侧面也反映了这一框架的简单易用。所以在使用 APScheduler 之前,我们需要对这个框架的一些概念简单了解,主要有那么以下几个:

  • 触发器(trigger)
  • 任务持久化(job stores)
  • 执行器(executor)
  • 调度器(scheduler)

触发器(trigger)

所谓的触发器就是用以触发定时任务的组件,在 APScheduler 中主要是指时间触发器,并且主要有三类时间触发器可供使用:

  • date:日期触发器。日期触发器主要是在某一日期时间点上运行任务时调用,是 APScheduler 里面最简单的一种触发器。所以通常也适用于一次性的任务或作业调度。
  • interval:间隔触发器。间隔触发器是在日期触发器基础上扩展了对时间部分,比如时、分、秒、天、周这几个部分的设定。是我们用以对重复性任务进行设定或调度的一个常用调度器。设定了时间部分之后,从起始日期开始(默认是当前)会按照设定的时间去执行任务。
  • cron:cron 表达式触发器。cron 表达式触发器就等价于我们 Linux 上的 crontab,它主要用于更复杂的日期时间进行设定。但需要注意的是,APScheduler 不支持 6 位及以上的 cron 表达式,最多只支持到 5 位。

任务持久化(job stores)

任务持久化主要是用于将设定好的调度任务进行存储,即便是程序因为意外情况,如断电、电脑或服务器重启时,只要重新运行程序时,APScheduler 就会根据对存储好的调度任务结果进行判断,如果出现已经过期但未执行的情况会进行相应的操作。

APScheduler 为我们提供了多种持久化任务的途径,默认是使用 memory 也就是内存的形式,但内存并不是持久化最好的方式。最好的方式则是通过像数据库这样的载体来将我们的定时任务写入到磁盘当中,只要磁盘没有损坏就能将数据给恢复。

APScheduler 支持的且常用的数据库主要有:

  • sqlalchemy 形式的数据库,这里就主要是指各种传统的关系型数据库,如 MySQL、PostgreSQL、SQLite 等。
  • mongodb 非结构化的 Mongodb 数据库,该类型数据库经常用于对非结构化或版结构化数据的存储或操作,如 JSON。
  • redis 内存数据库,通常用作数据缓存来使用,当然通过一些主从复制等方式也能实现当中数据的持久化或保存。

通常我们可以在创建 Scheduler 实例时创建,或是单独为任务指定。配置的方式相对简单,我们只需要指定对应的数据库链接即可。

执行器(executor)

执行器顾名思义就是执行我们任务的对象,在计算机内通常要么是 CPU 调度任务,要么是单独维护一个线程来运行任务。所以 APScheduler 里的执行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 这样的线程池和进程池两种。

当然如果是和协程或异步相关的任务调度,还可以使用对应的 AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三种执行器。

调度器(scheduler)

调度器的选择主要取决于你当前的程序环境以及 APScheduler 的用途。根据用途的不同,APScheduler 又提供了以下几种调度器:

  • BlockingScheduler:阻塞调度器,当程序中没有任何存在主进程之中运行东西时,就则使用该调度器。
  • BackgroundScheduler:后台调度器,在不使用后面任何的调度器且希望在应用程序内部运行时的后台启动时才进行使用,如当前你已经开启了一个 Django 或 Flask 服务。
  • AsyncIOScheduler:AsyncIO 调度器,如果代码是通过 asyncio 模块进行异步操作,使用该调度器。
  • GeventScheduler:Gevent 调度器,如果代码是通过 gevent 模块进行协程操作,使用该调度器
  • TornadoScheduler:Tornado 调度器,在 Tornado 框架中使用
  • TwistedScheduler:Twisted 调度器,在基于 Twisted 的框架或应用程序中使用
  • QtScheduler:Qt 调度器,在构建 Qt 应用中进行使用。

通常情况下如果不是和 Web 项目或应用集成共存,那么往往都首选 BlockingScheduler 调度器来进行操作,它会在当前进程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或应用共存,那么需要选择 BackgroundScheduler 调度器,因为它不会干扰当前应用的线程或进程状况。

基于对以上的概念和组件认识,我们就能基本上摸清 APScheduler 的运行流程:

  • 设定调度器(scheduler)用以对任务的调度与安排进行全局统筹
  • 对相应的函数或方法上设定相应的触发器(trigger),并添加到调度器中
  • 如有任务持久化(job stores)需要则需要设定对应的持久化层,否则默认使用内存存储任务
  • 当触发器被触发时,就将任务交由执行器(executor)进行执行

APScheduler 快速上手

虽然 APScheduler 里面的概念和组件看起来有点多,但在使用上并不算很复杂,我们可以通过本节的示例就能够很快使用。

选择对应的 scheduler

在使用之前我们需要先实例化一个 scheduler 对象,所有的 scheduler 对象都被放在了 apscheduler.schedulers 模块下,我们可以直接通过查看 API 文档或者借助 IDE 补全的提示来获取相应的 scheduler 对象。

这里我直接选取了最基础的 BlockingScheduler:

# main.py

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

配置 scheduler

对于 scheduler 的一些配置我们可以直接在实例化对象时就进行配置,当然也可以在创建实例化对象之后再进行配置。

实例化时进行参数配置:

# main.py
from datetime import datetime

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler

# 任务持久化 使用 SQLite
jobstores = {
  'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db')
}
# 执行器配置
executors = {
  'default': ThreadPoolExecutor(20),
}
# 关于 Job 的相关配置,见官方文档 API
job_defaults = {
  'coalesce': False,
  'next_run_time': datetime.now()
}
scheduler = BlockingScheduler(
 jobstores = jobstores,
 executors = executors,
 job_defaults = job_defaults,
 timezone = 'Asia/Shanghai'
)

或是通过 scheduler.configure 方法进行同样的操作:

scheduler = BlockingScheduler()
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')

添加并执行你的任务

创建 scheduler 对象之后,我们需要调用其下的 add_job() 或是 scheduled_job() 方法来将我们需要执行的函数进行注册。前者是以传参的形式指定对应的函数名,而后者则是以装饰器的形式直接对我们要执行的函数进行修饰。

比如我现在有一个输出此时此刻时间的函数 now():

from datetime import datetime

def now(trigger):
  print(f"trigger:{trigger} -> {datetime.now()}")

然后我打算每 5 秒的时候运行一次,那我们使用 add_job() 可以这样写:

if __name__ == '__main__':
  scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5)
  scheduler.start()

在调用 start() 方法之后调度器就会开始执行,并在控制台上看到对应的结果了:

trigger:interval -> 2021-01-16 21:19:43.356674
trigger:interval -> 2021-01-16 21:19:46.679849
trigger:interval -> 2021-01-16 21:19:48.356595

当然使用 @scheduled_job 的方式来装饰我们的任务或许会更加自由一些,于是上面的例子就可以写成这样:

@scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5)
def now(trigger):
  print(f"trigger:{trigger} -> {datetime.now()}")

if __name__ == '__main__':
  scheduler.start()

运行之后就会在控制台看到同样的结果了。

不过需要注意的是,添加任务一定要在 start() 方法执行前调用,否则会找不到任务或是抛出异常。

将 APScheduler 集成到 Web 项目中

如果你是正在做有关的 Web 项目且存在一些定时任务,那么得益于 APScheduler 由于多样的调度器,我们能够将其和我们的项目结合到一起。

如果你正在使用 Flask,那么 Flask-APScheduler 这一别人写好的第三方包装库就很适合你,虽然它没有相关的文档,但只要你了解了前面我所介绍的有关于 APScheduler 的概念和组件,你就能很轻易地看懂这个第三方库仓库里的示例代码。

如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些对任务或作业的增删改查操作,我们可以自己编写一套合适的 API。

这里我使用的是 FastAPI 这一目前流行的 Web 框架。demo 项目结构如下:

temp-scheduler
├── config.py    # 配置项
├── main.py     # API 文件
└── scheduler.py  # APScheduler 相关设置

安装依赖

这里我们需要的依赖不多,只需要简单几个即可:

pip install fastapi apscheduler sqlalchemy uvicorn

配置项

如果项目中模块过多,那么使用一个文件或模块来进行统一管理是最好的选择。这里的 config.py 我们主要像 Flask 的配置那样简单设定:

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler

class SchedulerConfig:

  JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")}
  EXECUTORS = {"default": ThreadPoolExecutor(20)}
  JOB_DEFAULTS = {"coalesce": False}

  @classmethod
  def to_dict(cls):
    return {
      "jobstores": cls.JOBSTORES,
      "executors": cls.EXECUTORS,
      "job_defaults": cls.JOB_DEFAULTS,
    }

在 SchedulerConfig 配置项中我们可以自己实现一个 to_dict() 类方法,以便我们后续传参时通过解包的方式直接传入配置参数即可。

Scheduler 相关设置

scheduler.py 模块的设定也比较简单,即设定对应的 scheduler 调度器即可。由于是演示 demo 我还将要定期执行的任务也放在了这个模块当中:

import logging
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler

from config import SchedulerConfig

scheduler = BackgroundScheduler()
logger = logging.getLogger(__name__)

def init_scheduler() -> None:
  # config scheduler
  scheduler.configure(**SchedulerConfig.to_dict())

  logger.info("scheduler is running...")

  # schedule test
  scheduler.add_job(
    func=mytask,
    trigger="date",
    args=("APScheduler Initialize.",),
    next_run_time=datetime.now(),
  )
  scheduler.start()

def mytask(message: str) -> None:
  print(f"[{datetime.now()}] message: {message}")

在这一部分中:

  • init_scheduler() 方法主要用于在 API 服务启动时被调用,然后对 scheduler 对象的配置以及测试
  • mytask() 则是我们要定期执行的任务,后续我们可以通过 APScheduler 提供的方法来自行添加任务

API 设置

在 main.py 模块就主要存放着我们由 FastAPI 所构建的相关 API。如果在后续开发时存在多个接口,此时就需要将不同接口放在不同模块文件中,以达到路由的分发与管理,类似于 Flask 的蓝图模式。

import logging
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Sequence, Union

from fastapi import FastAPI
from pydantic import BaseModel

from scheduler import init_scheduler, mytask, scheduler

logger = logging.getLogger(__name__)

app = FastAPI(title="APScheduler API")
app.add_event_handler("startup", init_scheduler)

class Job(BaseModel):
  id: Union[int, str, uuid.UUID]
  name: Optional[str] = None
  func: Optional[str] = None
  args: Optional[Sequence[Optional[str]]] = None
  kwargs: Optional[Dict[str, Any]] = None
  executor: Optional[str] = None
  misfire_grace_time: Optional[str] = None
  coalesce: Optional[bool] = None
  max_instances: Optional[int] = None
  next_run_time: Optional[Union[str, datetime]] = None

@app.post("/add")
def add_job(
  message: str,
  trigger: str,
  trigger_args: Optional[dict],
  id: Union[str, int, uuid.UUID],
):
  try:
    scheduler.add_job(
      func=mytask,
      trigger=trigger,
      kwargs={"message": message},
      id=id,
      **trigger_args,
    )
  except Exception as e:
    logger.exception(e.args)
    return {"status_code": 0, "message": "添加失败"}
  return {"status_code": 1, "message": "添加成功"}

@app.delete("/delete/{id}")
def delete_job(id: Union[str, int, uuid.UUID]):
  """delete exist job by id"""
  try:
    scheduler.remove_job(job_id=id)
  except Exception:
    return dict(
      message="删除失败",
      status_code=0,
    )
  return dict(
    message="删除成功",
    status_code=1,
  )

@app.put("/reschedule/{id}")
def reschedule_job(
  id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]
):
  try:
    scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args)
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="修改失败",
      status_code=0,
    )
  return dict(
    message="修改成功",
    status_code=1,
  )

@app.get("/job")
def get_all_jobs():
  jobs = None
  try:
    job_list = scheduler.get_jobs()
    if job_list:
      jobs = [Job(**task.__getstate__()) for task in job_list]
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="查询失败",
      status_code=0,
      jobs=jobs,
    )
  return dict(
    message="查询成功",
    status_code=1,
    jobs=jobs,
  )

@app.get("/job/{id}")
def get_job_by_id(id: Union[int, str, uuid.UUID]):
  jobs = []
  try:
    job = scheduler.get_job(job_id=id)
    if job:
      jobs = [Job(**job.__getstate__())]
  except Exception as e:
    logger.exception(e.args)
    return dict(
      message="查询失败",
      status_code=0,
      jobs=jobs,
    )
  return dict(
    message="查询成功",
    status_code=1,
    jobs=jobs,
  )

以上代码看起来很多,其实核心的就那么几点:

FastAPI 对象 app 的初始化。这里用到的 add_event_handler() 方法就有点像 Flask 中的 before_first_request,会在 Web 服务请求伊始进行操作,理解为初始化相关的操作即可。

API 接口路由。路由通过 app 对象下的对应 HTTP 方法来实现,如 GET、POST、PUT 等。这里的装饰器用法其实也和 Flask 很类似,就不多赘述。

scheduler 对象的增删改查。从 scheduler.py 模块中引入我们创建好的 scheduler 对象之后就可以直接用来做增删改查的操作:

  • 增:使用 add_job() 方法,其主要的参数是要运行的函数(或方法)、触发器以及触发器参数等
  • 删:使用 delete_job() 方法,我们需要传入一个对应任务的 id 参数,用以能够查找到对应的任务
  • 改:使用 reschedule_job() 方法,这里也需要一个对应任务的 id 参数,以及需要重新修改的触发器及其参数
  • 查:使用 get_jobs() 和 get_job() 两个方法,前者是直接获取到当前调度的所有任务,返回的是一个包含了 APScheduler.job.Job 对象的列表,而后者是通过 id 参数来查找对应的任务对象;这里我通过底层源码使用 __getstate__() 来获取到任务的相关信息,这些信息我们通过事先设定好的 Job 对象来对其进行序列化,最后将信息从接口中返回。

运行

完成以上的所有操作之后,我们就可以打开控制台,进入到该目录下并激活我们的虚拟环境,之后运行:

uvicorn main:app 

之后我们就能在 FastAPI 默认的地址 http://127.0.0.1:8000/docs  中看到关于全部接口的 Swagger 文档页面了:

fastapi 集成的 swagger 页面

之后我们可以直接在文档里面或使用 Postman 来自己进行接口测试即可。

结尾

本文介绍了有关于 APScheduler 框架的概念及其用法,并进行了简单的实践。

得益于 APScheduler 的模块化设计才可以让我们更方便地去理解、使用它,并将其运用到我们实际的开发过程中。

从 APScheduler 目前的 Github 仓库代码以及 issue 来看,作者已经在开始重构 4.0 版本,当中的一些源代码和 API 也有较大的变动,相信在 4.0 版本中将会引入更多的新特性。

但如果现阶段你正打算使用或已经使用 APScheduler 用于实际生产中,那么希望本文能对会你有所帮助。

到此这篇关于5分钟快速掌握Python定时任务框架的实现的文章就介绍到这了,更多相关Python 定时任务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python实现定时任务

    Python下实现定时任务的方式有很多种方式.下面介绍几种 循环sleep: 这是一种最简单的方式,在循环里放入要执行的任务,然后sleep一段时间再执行.缺点是,不容易控制,而且sleep是个阻塞函数. def timer(n): ''''' 每n秒执行一次 ''' while True: print time.strftime('%Y-%m-%d %X',time.localtime()) yourTask() # 此处为要执行的任务 time.sleep(n) threading的Time

  • 解决Python中定时任务线程无法自动退出的问题

    python的线程有一个类叫Timer可以,用来创建定时任务,但是它的问题是只能运行一次,如果要重复执行,则只能在任务中再调用一次timer,但这样就存在新的问题了,就是在主进程退出后,不能正常退出子线程. from threading import Timer def scheduletaskwrap(): pritn "in task" Timer(10, scheduletaskwrap).start() Timer(10, scheduletaskwrap).start() 象

  • python Celery定时任务的示例

    本文介绍了python Celery定时任务的示例,分享给大家,具体如下: 配置 启用Celery的定时任务需要设置CELERYBEAT_SCHEDULE . Celery的定时任务都由celery beat来进行调度.celery beat默认按照settings.py之中的时区时间来调度定时任务. 创建定时任务 一种创建定时任务的方式是配置CELERYBEAT_SCHEDULE: #每30秒调用task.add from datetime import timedelta CELERYBEA

  • Python3自动签到 定时任务 判断节假日的实例

    不废话,直接上代码Python3.6 签到代码,只需修改url,账号,密码即可,此处是登录时无验证登录,有验证码的自行补充 # -*- coding:utf-8 -*- import json import urllib.request import datetime # 模拟浏览器打开网站 browser = webdriver.Chrome() browser.get('http://**.**.121.54/') # 将窗口最大化 browser.maximize_window() # 根

  • Python定时任务随机时间执行的实现方法

    背景: 有一个爬虫服务,需要定时从公开网站上拉取一些数据,为了避免被识别为爬虫(防爬虫的识别需要根据很多特征,时间仅仅是其中一个维度),需要在指定的时间内,随机生成一个时间爬取 脚本是python写的,直接上代码... import logging import traceback from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler scheduler =

  • 对Python定时任务的启动和停止方法详解

    在python中我们可以使用APScheduler进行定时任务. APScheduler的具体编码这里就不介绍了.主要说下在终端中启动和停止任务. 一.运行计划任务的python脚本 如果我们在终端中直接执行的话,关闭终端窗口,Python任务就会中断,Python进程会被杀死,程序将停止运行.可以使用如下命令运行python脚本, python apschedulerscript.py & 这样执行后及时关闭终端窗口,程序依旧运行. 二.停止计划任务的Python脚本 如何停止呢,可使用如下方

  • Python3实现定时任务的四种方式

    最近做一个小程序开发任务,主要负责后台部分开发:根据项目需求,需要实现三个定时任务: 1>定时更新微信token,需要2小时更新一次: 2>商品定时上线: 3>定时检测后台服务是否存活: 使用Python去实现这三个任务,这里需要使用定时相关知识点: Python实现定点与定时任务方式比较多,找到下面四中实现方式,每个方式都有自己应用场景:下面来快速介绍Python中常用的定时任务实现方式: 1>循环+sleep: 2>线程模块中Timer类: 3>schedule模块

  • 详解使用python crontab设置linux定时任务

    熟悉linux的朋友应该知道在linux中可以使用crontab设置定时任务.可以通过命令crontab -e编写任务.当然也可以直接写配置文件设置任务. 但是有时候希望通过脚本自动设置,比如我们应用程序部署时等.有需求当然就得想办法解决,不然在程序猿界混(一群自得其乐的猿). 下面进入正题,开始想通过以写文件的形式设置,通过在配置文件中直接追加一行即可.但是读写文件难免有点繁琐,再比如:设置任务时要检查任务是否已经存在:根据输入参数设置相应的任务等.以读写文件难免不太合适.所以想到了"万能&q

  • 5分钟快速掌握Python定时任务框架的实现

    APScheduler 简介 在实际开发中我们经常会碰上一些重复性或周期性的任务,比如像每天定时爬取某个网站的数据.一定周期定时运行代码训练模型等,类似这类的任务通常需要我们手动来进行设定或调度,以便其能够在我们设定好的时间内运行. 在 Windows 上我们可以通过计划任务来手动实现,而在 Linux 系统上往往我们会用到更多关于 crontab 的相关操作.但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人

  • Python定时任务框架APScheduler安装使用详解

    目录 前言 一.APscheduler简介 二.APscheduler安装 三.APscheduler组成部分 1.Job 作业 2.Trigger 触发器 3.Jobstore 作业存储 4.Executor 执行器 5.scheduler 调度器 四.Scheduler工作流程图 1.Scheduler添加job流程 2.Scheduler调度流程 五.APscheduler使用 1.简单应用 2.操作作业 2.1 date触发器 2.2 interval触发器 2.3 cron触发器 参考

  • Python定时任务框架APScheduler原理及常用代码

    APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,虽然这样也可以,但是总觉得不是那么的专业,^_^所以就找到了python的定时任务模块APScheduler: APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功

  • Python中定时任务框架APScheduler的快速入门指南

    前言 大家应该都知道在编程语言中,定时任务是常用的一种调度形式,在Python中也涌现了非常多的调度模块,本文将简要介绍APScheduler的基本使用方法. 一.APScheduler介绍 APScheduler是基于Quartz的一个python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务. APScheduler提供了多种不同的调度器,方便开发者根据自己的实际需要进行使用:同时也提供了不同的存储机

  • 详解Python 定时框架 Apscheduler原理及安装过程

    在我们的日常工作自动化测试当中,几乎超过一半的功能都需要利用定时的任务来推动触发,例如在我们项目中有一个定时监控模块,根据自己设置的频率定时跑测试用例,定时检测是否存在线上紧急任务等等,这些都涉及到了有关定时任务的问题,很多情况下,大多数人会选择window的任务计划程序,但如果程序不在window平台下运行,就不能定时启动了:当然也可利用time模块的time.sleep()方法使程序休眠来达到定时任务的目的,但定时任务多了,代码可能看起来不太那么友好且有很大的局限性,因此,此时的 Apsch

  • Python定时任务APScheduler安装及使用解析

    1.简介 APScheduler是一个 Python 定时任务框架,使用起来十分方便.提供了基于日期.固定时间间隔以及 crontab 类型的任务,并且可以持久化任务.并以 daemon 方式运行应用. 2.APScheduler四个组件 APScheduler 四个组件分别为:触发器(trigger),作业存储(job store),执行器(executor),调度器(scheduler). 触发器(trigger) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行.除

  • Python flask框架定时任务apscheduler应用介绍

    目录 基本使用 trigger启动方式 cron启动方式 使用装饰器定时启动任务 flask-apscheduler将apscheduler移植到了flask应用中,使得在flask中可以非常方便的使用定时任务了,除此之外,它还有如下几个特性 根据Flask配置加载调度器配置 根据Flask配置加载任务调度器 允许指定服务器运行任务 提供RESTful API管理任务,也就是远程管理任务 为RESTful API提供认证 下载安装 pip install flask-apscheduler 基本

  • python flask框架快速入门

    Flask 本身相当于一个内核,比如可以用 Flask 扩展加入ORM.窗体验证工具,文件上传.身份验证等.Flask 没有默认使用的数据库,你可以选择 MySQL,也可以用 NoSQL. 其 WSGI 工具箱采用 Werkzeug(路由模块),模板引擎则使用 Jinja2.这两个也是 Flask 框架的核心. Flask常用扩展包: Flask-SQLalchemy:操作数据库: Flask-script:插入脚本: Flask-migrate:管理迁移数据库: Flask-Session:S

  • python Django框架快速入门教程(后台管理)

    Python下有许多款不同的 Web 框架.Django是重量级选手中最有代表性的一位.许多成功的网站和APP都基于Django. Django 是一个开放源代码的 Web 应用框架,由 Python 写成. Django 遵守 BSD 版权,初次发布于 2005 年 7 月, 并于 2008 年 9 月发布了第一个正式版本 1.0 . Django 采用了 MVT 的软件设计模式,即模型(Model),视图(View)和模板(Template). 参考官方文档:Django官方文档https:

  • Python+ChatGPT实现5分钟快速上手编程

    目录 1.chatGPT是个啥 2.chatGPT怎么注册 3.chatGPT怎么用 4.小结 最近一段时间chatGPT火爆出圈!无论是在互联网行业,还是其他各行业都赚足了话题. 俗话说:“外行看笑话,内行看门道”,今天从chatGPT个人体验感受以及如何用的角度来分享一下. 1.chatGPT是个啥 chatGPT是最近新出来的玩意?并不是!在国内,chatGPT最早是在2022年11月就由OpenAI于推出的.只是去年底火了一把,后力不足又遇春节,热度草草就结束了. 先讲一下,OpenAI

随机推荐