Python flask框架定时任务apscheduler应用介绍

目录
  • 基本使用
  • trigger启动方式
  • cron启动方式
  • 使用装饰器定时启动任务

flask-apschedulerapscheduler移植到了flask应用中,使得在flask中可以非常方便的使用定时任务了,除此之外,它还有如下几个特性

  • 根据Flask配置加载调度器配置
  • 根据Flask配置加载任务调度器
  • 允许指定服务器运行任务
  • 提供RESTful API管理任务,也就是远程管理任务
  • RESTful API提供认证

下载安装

pip install flask-apscheduler

基本使用

flask-apscheduler的相关配置,我们会将它和其它扩展一起,放在应用的配置里

class Config(object):
    // 配置项
    JOBS = [
        {
            'id': 'job1',
            'func': 'run:add',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 3
        }
    ]
    SCHEDULER_API_ENABLED = True
def add(a, b):
    print(a+b)

JOBS列表的每一个元素表示一个定时任务,列子中只有一个interval任务,表示每隔3秒运行一次函数add。func指定调用的函数,args表示传入函数的参数,trigger表示启动方式,常用的有两种,分别是trigger和cron。

上边我们设置了SCHEDULER_API_ENABLED = True,可以通过访问http://127.0.0.1:5000/scheduler,其中scheduler是默认的RESTful API前缀

通过查看源码,可以发现flask-apscheduler提供了以下的接口

def _load_api(self):
    """
    Add the routes for the scheduler API.
    """
    self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
    self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
    self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
    self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
    self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
    self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
    self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
    self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
    self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')

如果需要查看当前运行的所有定时任务,则请求http://127.0.0.1:5000/scheduler/jobs即可。

trigger启动方式

trigger表示间隔启动,在trigger方式中,使用seconds配置间隔多久启动一次,单位是秒。

cron启动方式

cron表示定时启动

class Config(object):
    JOBS = [
        {
            'id': 'job1',
            'func': 'scheduler:task',
            'args': (1, 2),
            'trigger': 'cron',
            'day': '*',
            'hour': '13',
            'minute': '16',
            'second': '20'
        }
    ]
    SCHEDULER_API_ENABLED = True
def task(a, b):
    print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))

该配置项则表示每天的13点16分20秒启动一次。*表示全部。

有关常用的cron配置有:

day

  • 表示天

hour

  • 表示小时

minute

  • 表示分钟

second

  • 表示秒

week

day_of_week

  • 星期几,如星期天使用sun,星期五使用fri,其他的类似。

使用装饰器定时启动任务

from flask import Flask
from flask_apscheduler import APScheduler
import datetime
class Config(object):
    SCHEDULER_API_ENABLED = True
scheduler = APScheduler()
# interval examples
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
    print(str(datetime.datetime.now()) + ' Job 1 executed')

表示每隔30秒调用一次job1函数。

到此这篇关于Python flask框架定时任务apscheduler应用介绍的文章就介绍到这了,更多相关Python flask apscheduler内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python自动化测试中APScheduler Flask的应用示例

    目录 使用背景 什么是 APScheduler 框架? APScheduler 框架包含四个组成部分 APScheduler 在 flask 中使用 编写任务函数,开始 APScheduler 的调度 部分项目代码 总结 使用背景 实际项目中,需要验证打点数据在各个系统中收集是否一致,而部分节点打点数据收集是通过异步任务实现的,等待时间比较久.为应对业务异步操作处理,实现异步数据的收集,经过调研后,选择了 APScheduler 框架. 什么是 APScheduler 框架? APSchedul

  • Python Celery定时任务详细讲解

    目录 前言 一.Celery定时任务是什么? 二.使用步骤 1.代码结构 2.启动定时任务 3.执行结果 总结 前言 Celery在python中的应用除了实现异步任务(async task)外也可以执行定时任务(beat) 一.Celery定时任务是什么? Celery默认任务单元由任务生产者触发,但有时可能需要其自动触发,而Beat进程正是负责此类任务,能够自动触发定时/周期性任务. Celery 进行周期任务也很简单,只需要在配置中配置好周期任务,然后在运行一个周期任务触发器(beat)即

  • 详解Python下Flask-ApScheduler快速指南

    引言:Flask是Python社区非常流行的一个Web开发框架,本文将尝试将介绍APScheduler应用于Flask之中. 1. Flask介绍 Flask是Python社区大名鼎鼎的"microframework",基于简单的核心,使用extension来增加其他功能,其提供非常丰富易用的扩展包, 比如: 2.  Flask-APScheduler 社区提供了一个Flask-APScheduler的模块,方便大家直接在Flask模块中使用APScheduler. 关于安装的命令,仍

  • Python定时任务框架APScheduler安装使用详解

    目录 前言 一.APscheduler简介 二.APscheduler安装 三.APscheduler组成部分 1.Job 作业 2.Trigger 触发器 3.Jobstore 作业存储 4.Executor 执行器 5.scheduler 调度器 四.Scheduler工作流程图 1.Scheduler添加job流程 2.Scheduler调度流程 五.APscheduler使用 1.简单应用 2.操作作业 2.1 date触发器 2.2 interval触发器 2.3 cron触发器 参考

  • python 包之 APScheduler 定时任务

    目录 一.安装 二.定时执行一次 三.间隔执行 四.每日定时执行一次 五.每几分钟执行一次 六.每小时执行一次 七.调度器分类 一.安装 pip install apscheduler 二.定时执行一次 新建一个scheduler调度器 添加一个job store调度任务 运行调度任务 import datetime from apscheduler.schedulers.blocking import BlockingScheduler def task(name): print('%s告诉你

  • Python脚本实现定时任务的最佳方法

    目录 前言 问题描述 解决方案 总结 前言 在日常工作中,常常需要周期性地执行某些任务,常用的方式是采用 Linux 系统自带的 crond 结合命令行实现,但最近却遇到了一个让人头大的问题. 问题描述 一个包含cx_Oracle的python文件,直接在linux下使用python命令可以运行,但是设置crontab定时任务会报错如下: cx_Oracle.DatabaseError: DPI-1047: 64-bit Oracle Client library cannot be loade

  • Python flask框架定时任务apscheduler应用介绍

    目录 基本使用 trigger启动方式 cron启动方式 使用装饰器定时启动任务 flask-apscheduler将apscheduler移植到了flask应用中,使得在flask中可以非常方便的使用定时任务了,除此之外,它还有如下几个特性 根据Flask配置加载调度器配置 根据Flask配置加载任务调度器 允许指定服务器运行任务 提供RESTful API管理任务,也就是远程管理任务 为RESTful API提供认证 下载安装 pip install flask-apscheduler 基本

  • 浅谈Python flask框架

    目录 1. flask 框架概述 1.1flask 框架优势 1.2flask 框架获取 1.3flask 框架使用 2. flask demo步骤 3. flask 基础功能 3.1路由功能 3.2模版提供 4.总结  前言: Python 面向对象的高级编程语言,以其语法简单.免费开源.免编译扩展性高,同时也可以嵌入到C/C++程序和丰富的第三方库,Python运用到大数据分析.人工智能.web后端等应用场景上. Python 目前主要流行的web框架:flask.Django.Tornad

  • Python Flask框架使用介绍

    目录 1. 数据库连接池 2. wtfroms 3. 信号 3.1 内置信号 3.2 使用信号 3.3 自定义信号 4. 多app应用 5. flask-script 5.1 快速使用 5.2 自定制命令 1. 数据库连接池 使用 pymsql 链接数据库 导入:pip3 install dbutils pool.py 创建数据库连接池 from dbutils.pooled_db import PooledDB import pymysql POOL = PooledDB( creator=p

  • python flask框架详解

    Flask是一个Python编写的Web 微框架,让我们可以使用Python语言快速实现一个网站或Web服务.本文参考自Flask官方文档, 英文不好的同学也可以参考中文文档 1.安装flask pip install flask 2.简单上手 一个最小的 Flask 应用如下: from flask import Flask app = Flask(__name__) @app.route('/') def hello_world(): return 'Hello World' if __na

  • 使用python flask框架开发图片上传接口的案例详解

    python版本:3.6+ 需要模块:flask,pillow 需求:开发一个支持多格式图片上传的接口,并且将图片压缩,支持在线预览图片. 目录结构: app.py编辑内容: from flask import Flask, request, Response, render_template from werkzeug.utils import secure_filename import os import uuid from PIL import Image, ExifTags app =

  • 详解Python Flask框架的安装及应用

    目录 1.安装 1.1 创建虚拟环境 1.2 进入虚拟环境 1.3 安装 flask 2.上手 2.1 最小 Demo 2.2 基本知识 3.解构官网指导 Demo 3.1 克隆与代码架构分析 3.2 入口文件 init.py 3.3 数据库设置 3.4 蓝图和视图 4.其他 5.跑起 DEMO 1.安装 1.1 创建虚拟环境 mkdir myproject cd myproject python3 -m venv venv 1.2 进入虚拟环境 . venv/bin/activate 1.3

  • python Flask框架之HTTP请求详解

    我们的浏览器访问网站时,默认为发送了一个HTTP的GET请求. 在浏览网站时,会经常填写表单,比如填写用户名密码.点击登录后,会跳转到我们的主页. 接下来,我们实现这个案例. 首先我们先写一个登录页面 <!doctype html> <html lang="en"> <head> <title>Hello from Flask</title> </head> <body> <form action

  • Python Flask框架开发之运用SocketIO实现WebSSH方法详解

    Flask 框架中如果想要实现WebSocket功能有许多种方式,运用SocketIO库来实现无疑是最简单的一种方式,Flask中封装了一个flask_socketio库该库可以直接通过pip仓库安装,如下内容将重点简述SocketIO库在Flask框架中是如何被应用的,最终实现WebSSH命令行终端功能,其可用于在Web浏览器内实现SSH命令行执行. 首先我们先来看一下SocketIO库是如何进行通信的,对于前端部分需要引入socket.io这个框架,然后就是利用该框架内提供的各类函数实现创建

  • Python flask与fastapi性能测试方法介绍

    目录 背景 apache ab介绍 测试计划 测试代码 测试结果 结论 背景 sy项目通过MQ接受业务系统的业务数据,通过运行开发者开发的python脚本执行业务系统与财务系统数据的一致性校验. sy系统需要每天运行大量的python脚本.目前使用falsk日运行6W+次python脚本,由于性能存在瓶颈,需要引入 新的fastapi框架,来解决cpu.内存性能压榨不够及目前的性能瓶颈.本文目标给出两者的性能测试报告. 给出选择哪个框架的性能数据支撑. apache ab介绍 apache ab

  • Python Flask框架实现Proteus仿真Arduino与网页数据交互

    目录 实验原理 开发环境 Flask虚拟环境 Python Flask源码 用Proteus仿真Arduino 用com0com建立虚拟串口对 运行程序 实验原理 模拟电脑通过串口与Arduino开发板通信,并通过网页实现简单交互 开发环境 1.Windows10 2.Python3.10 3.Proteus8.6 4.com0com虚拟串口工具 Flask虚拟环境 先安装virtualenv: pip install virtualenv 建立项目文件夹(比如demo_4) 在demo_04文

随机推荐