Python使用signal定时结束AsyncIOScheduler任务的问题

在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题

在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了)
原调试代码如下:

from datetime import datetime, timedelta

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()

async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()
async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)

if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10个任务
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()

Google后发现AsyncIOScheduler的使用需要在scheduler启动后,需要自己调用asyncio.get_event_loop().run_forever()来启动协程任务。
但是一旦run_forever()则就会阻塞至死。除非有KeyboardInterrupt, SystemExit等异常或者强杀来停止其运行。
此时想到使用Python的signal来定时发送信号,修改后程序如下,可以正常延迟停止(感觉有点像模拟Go的defer)。

# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio

import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()

async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()

async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)

if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10个任务
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()
    signal.alarm(20)  # 20秒后终止程序
    asyncio.get_event_loop().run_forever()  # 永远运行

到此这篇关于Python使用signal定时结束AsyncIOScheduler任务的文章就介绍到这了,更多相关Python定时结束AsyncIOScheduler任务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python之线程通过信号pyqtSignal刷新ui的方法

    第一部分:UI界面设计 界面效果图如下: ui文件(可拉动控件自行创建一个button和text) <?xml version="1.0" encoding="UTF-8"?> <ui version="4.0"> <class>Dialog</class> <widget class="QDialog" name="Dialog"> <pr

  • 对Python信号处理模块signal详解

    Python中对信号处理的模块主要是使用signal模块,但signal主要是针对Unix系统,所以在Windows平台上Python不能很好的发挥信号处理的功能. 要查看Python中的信号量,可以使用dir(signal)来查看. signal.signal() 在signal模块中,主要是使用signal.signal()函数来预设信号处理函数 singnal.signal(signalnum, handler) 其中第一个参数是信号量,第二个参数信号处理函数. 下面看个简单的例子,其中

  • Python Web框架Flask信号机制(signals)介绍

    信号(signals) Flask信号(signals, or event hooking)允许特定的发送端通知订阅者发生了什么(既然知道发生了什么,那我们可以知道接下来该做什么了). Flask提供了一些信号(核心信号)且其它的扩展提供更多的信号.信号是用于通知订阅者,而不应该鼓励订阅者修改数据.相关信号请查阅文档. 信号依赖于Blinker库. 钩子(hooks) Flask钩子(通常出现在蓝图或应用程序现存的方法中,比如一些内置装饰器,例如before_request)不需要Blinker

  • 详解利用Python scipy.signal.filtfilt() 实现信号滤波

    本文将以实战的形式基于scipy模块使用Python实现简单滤波处理,包括内容有1.低通滤波,2.高通滤波,3.带通滤波,4.带阻滤波器.具体的含义大家可以查阅大学课程,信号与系统.简单的理解就是低通滤波指的是去除高于某一阈值频率的信号:高通滤波去除低于某一频率的信号:带通滤波指的是类似低通高通的结合保留中间频率信号:带阻滤波也是低通高通的结合只是过滤掉的是中间部分.上面所说的内容会在实战部分加以介绍,可以对比理解一下. 如何实现的呢?我的理解,是通过时域转换为频域,在频域信号中去除相应频域信号

  • Python使用signal定时结束AsyncIOScheduler任务的问题

    在使用aiohttp结合apscheduler的AsyncIOScheduler模拟定点并发的时候遇到两个问题 在调度器scheduler.start()后,程序直接退出(在Jupiter中任务可以正常启动)如何在指定时间调用scheduler.shutdown()? (因为程序直接退出了) 原调试代码如下: from datetime import datetime, timedelta import aiohttp from apscheduler.schedulers.asyncio im

  • 浅析Python中signal包的使用

    在liunx系统中要想每隔一分钟执行一个命令,最普遍的方法就是crontab了,如果不想使用crontab,经同事指点在程序中可以用定时器实现这种功能,于是就开始摸索了,发现需要一些信号的知识... 查看你的linux支持哪些信号:kill -l 即可 root@server:~# kill -l 1) SIGHUP   2) SIGINT   3) SIGQUIT  4) SIGILL   5) SIGTRAP  6) SIGABRT  7) SIGBUS   8) SIGFPE   9) S

  • 详解Python使用apscheduler定时执行任务

    apscheduler 的使用   我们项目中总是避免不了要使用一些定时任务,比如说最近的项目,用户点击报名考试以后需要在考试日期临近的时候推送小程序消息提醒到客户微信上,翻了翻 fastapi 中的实现,虽然方法和包也不少,但是要不就是太重了(比如需要再开服务,还要依赖 redis,都不好用),虽然也可以使用 time 模块的 time.sleep()机上 fastapi 的后台任务变相实现,但是相对简单的功能还行,复杂点的代码起来就麻烦了,所以还是专人专事找个负责这个额的包吧.找来找去发现

  • 利用Python实现Windows定时关机功能

    是最初的几个爬虫,让我认识了Python这个新朋友,虽然才刚认识了几天,但感觉有种莫名的默契感.每当在别的地方找不到思路,总能在Python找到解决的办法.自动关机,在平时下载大文件,以及跑程序的时候能用到的,刚才写了个windows自动关机的小程序,程序过于简单,就当是玩玩吧,当然还有很多可改进的地方.下面正文:  #ui制作: 照旧,笔者由Qt制作完成需要的ui,包括label,label_2,label_3,lable_4,lineEdit,lineEdit_2,pushButton组件.

  • 浅析python实现scrapy定时执行爬虫

    项目需要程序能够放在超算中心定时运行,于是针对scrapy写了一个定时爬虫的程序main.py ,直接放在scrapy的存储代码的目录中就能设定时间定时多次执行. 最简单的方法:直接使用Timer类 import time import os while True: os.system("scrapy crawl News") time.sleep(86400) #每隔一天运行一次 24*60*60=86400s或者,使用标准库的sched模块 import sched #初始化sch

  • 用python写一个定时提醒程序的实现代码

    身体是革命的本钱,身体健康了我们才有更多精力做自己想做的事情,追求女神,追求梦想.然而程序员是一个苦比的职业,大部分时间都对着电脑,我现在颈椎就不好了,有时候眼睛还疼,我还没20阿,伤心...于是乎写了一个小程序,指定时间会打开浏览器播放一段音乐,提醒我们休息一会儿,防止我们猝死,说多了都是泪. 较基础,适合python新手及对python感兴趣的同学阅读. 我们来理一遍这个程序,大概功能是:我们设置一个时间,时间到了以后会打开浏览器播放一段音频. 1.等待 2.打开浏览器,播放音频. 3.重复

  • python 每天如何定时启动爬虫任务(实现方法分享)

    python2.7环境下运行 安装相关模块 想要每天定时启动,最好是把程序放在linux服务器上运行,毕竟linux可以不用关机,即定时任务一直存活: #coding:utf8 import datetime import time def doSth(): # 把爬虫程序放在这个类里 print(u'这个程序要开始疯狂的运转啦') # 一般网站都是1:00点更新数据,所以每天凌晨一点启动 def main(h=1,m=0): while True: now = datetime.datetim

  • Python FTP文件定时自动下载实现过程解析

    这篇文章主要介绍了Python FTP文件定时自动下载实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.需求: 某数据公司每日15:00~17:00之间,在其FTP发布当日数据供下载,我方需及时下载当日数据至指定本地目录. 二.分析: 1.需实现FTP登陆.查询.下载功能: 解答:使用内置的ftplib模块中FTP类: 2.需判断文件是否下载: 解答:使用os模块中path.exists方法: 3.需判断在指定时间段内才执行下载任

  • celery在python爬虫中定时操作实例讲解

    使用定时功能对于我们想要快速获取某个数据来说,是一个非常好的方法.这样我们就不用苦苦守在电脑屏幕前,只为蹲到某个想要的东西.在之前我们已经讲过time函数进行定时操作,这算是time函数的比较基础的一个用法了.其实定时功能同样可以用celery实现,具体的方法我们往下看: 爬虫由于其特殊性,可能需要定时做增量抓取,也可能需要定时做模拟登陆,以防止cookie过期,而celery恰恰就实现了定时任务的功能.在上述基础上,我们将`tasks.py`文件改成如下内容 from celery impor

  • python tkinter实现定时关机

    本文实例为大家分享了python tkinter实现定时关机的具体代码,供大家参考,具体内容如下 很早以前写的,懒得修改,代码很简单,适合初学者 运行效果图如下: 使用注意 设定时间后点击开始即可实现定时关机,同时窗口也会关闭 想要取消关机的话,再次打开,点取消即可 代码如下 import os,time # import multiprocessing import threading from tkinter import * from tkinter import ttk import t

随机推荐