celery实现动态设置定时任务

本文实例为大家分享了celery动态设置定时任务的具体代码,供大家参考,具体内容如下

首先celery是一种异步任务队列,如果还不熟悉这个开源软件的请先看看官方文档,快速入门。

这里讲的动态设置定时任务的方法不使用数据库保存定时任务的信息,所以是项目重启后定时任务配置就会丢失,如果想保存成永久配置,可以考虑保存到数据库、redis或者使用pickle、json保存成文件,在项目启动时自动载入。

方法原理介绍

先来看一下celery的beat运行过程。

上图是beat的主要组成结构,beat中包含了一个service对象,service中包含了一个scheduler对象,scheduler中包含了一个schedule字典,schedule中key对应的的value才是真正的定时任务,是整个beat中最小的单元。

首先分别介绍一下各个对象和它们运行的过程,beat是celery.apps.beat.Beat类创建的对象,调用beat.run()方法就可以启动beat,下面是beat.run()方法的源码。

def run(self):
 print(str(self.colored.cyan(
 'celery beat v{0} is starting.'.format(VERSION_BANNER))))
 self.init_loader()
 self.set_process_title()
 self.start_scheduler()

重点是在run()方法里调用了start_scheduler()方法,而start_scheduler()方法本质上是创建了一个service对象(celery.beat.Service类),并调用service.start()方法,下面是beat.start_scheduler()方法的源码。

def start_scheduler(self):
 if self.pidfile:
 platforms.create_pidlock(self.pidfile)
 service = self.Service(
 app=self.app,
 max_interval=self.max_interval,
 scheduler_cls=self.scheduler_cls,
 schedule_filename=self.schedule,
 )

 print(self.banner(service))

 self.setup_logging()
 if self.socket_timeout:
 logger.debug('Setting default socket timeout to %r',
 self.socket_timeout)
 socket.setdefaulttimeout(self.socket_timeout)
 try:
 self.install_sync_handler(service)
 service.start()
 except Exception as exc:
 logger.critical('beat raised exception %s: %r',
 exc.__class__, exc,
 exc_info=True)
 raise

调用了service.start()之后,会进入一个死循环,先使用self.scheduler.tick()获取下一个任务a的定时点到现在时间的间隔,然后进入睡眠,睡眠结束之后判断如果self.scheduler里的下一个任务a可以执行,就立即执行,并获取self.scheduler里的下下一个任务b的定时点到现在时间的间隔,进入下一次循环。下面是service.start()的源码。

def start(self, embedded_process=False):
 info('beat: Starting...')
 debug('beat: Ticking with max interval->%s',
 humanize_seconds(self.scheduler.max_interval))

 signals.beat_init.send(sender=self)
 if embedded_process:
 signals.beat_embedded_init.send(sender=self)
 platforms.set_process_title('celery beat')

 try:
 while not self._is_shutdown.is_set():
 interval = self.scheduler.tick()
 if interval and interval > 0.0:
 debug('beat: Waking up %s.',
 humanize_seconds(interval, prefix='in '))
 time.sleep(interval)
 if self.scheduler.should_sync():
 self.scheduler._do_sync()
 except (KeyboardInterrupt, SystemExit):
 self._is_shutdown.set()
 finally:
 self.sync()

service.scheduler默认是celery.beat.PersistentScheduler类的实例对象,而celery.beat.PersistentScheduler其实是celery.beat.Scheduler的子类,所以scheduler.schedule是celery.beat.Scheduler类中的字典,保存的是celery.beat.ScheduleEntry类型的对象。ScheduleEntry的实例对象保存了定时任务的名称、参数、定时信息、过期时间等信息。celery.beat.Scheduler类实现了对schedule的更新方法即update_from_dict(self, dict_)方法。下面是update_from_dict(self, dict_)方法的源码。

def _maybe_entry(self, name, entry):
 if isinstance(entry, self.Entry):
 entry.app = self.app
 return entry
 return self.Entry(**dict(entry, name=name, app=self.app))

def update_from_dict(self, dict_):
 self.schedule.update({
 name: self._maybe_entry(name, entry)
 for name, entry in items(dict_)
 })

可以看到update_from_dict(self, dict_)方法实际上是向schedule中更新了self.Entry的实例对象,而self.Entry从celery.beat.Scheduler的源码知道是celery.beat.ScheduleEntry。

到这里整个流程就粗略的介绍完了,基本过程是这个样子。

但是从前面start_scheduler()的源码可以看到,beat在内部创建一个service之后,就直接进入死循环了,所以从外面无法拿到service对象,就不能对service里的scheduler对象操作,就不能对scheduler的schedule字典操作,所以就无法在beat运行的过程中动态添加定时任务。

方法介绍

前面介绍完原理,现在来讲一下解决思路。主要思路就是让start_scheduler方法中创建的service暴露出来。所以就想到手写一个类去继承Beat,重写start_scheduler()方法。

import socket
from celery import platforms
from celery.apps.beat import Beat

class MyBeat(Beat):
 '''
 继承Beat 添加一个获取service的方法
 '''
 def start_scheduler(self):
 if self.pidfile:
  platforms.create_pidlock(self.pidfile)
 # 修改了获取service的方式
 service = self.get_service()

 print(self.banner(service))

 self.setup_logging()
 if self.socket_timeout:
  logger.debug('Setting default socket timeout to %r',
    self.socket_timeout)
  socket.setdefaulttimeout(self.socket_timeout)
 try:
  self.install_sync_handler(service)
  service.start()
 except Exception as exc:
  logger.critical('beat raised exception %s: %r',
    exc.__class__, exc,
    exc_info=True)
  raise

 def get_service(self):
 '''
 这个是自定义的 目的是为了把service暴露出来,方便对service的scheduler操作,因为定时任务信息都存放在service.scheduler里
 :return:
 '''
 service = getattr(self, "service", None)
 if service is None:
  service = self.Service(
  app=self.app,
  max_interval=self.max_interval,
  scheduler_cls=self.scheduler_cls,
  schedule_filename=self.schedule,
  )
  setattr(self, "service", service)
 return self.service

在MyBeat类中添加一个get_service()方法,如果beat没有servic对象就创建一个,如果有就直接返回,方便对service的scheduler操作。

然后在此基础上实现对定时任务的增删改查操作。

def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*',
   month_of_year='*', **kwargs):
 '''
 创建或更新定时任务
 :param task_name: 定时任务名称
 :param cron_task: task名称
 :param minute: 以下是时间
 :param hour:
 :param day_of_week:
 :param day_of_month:
 :param month_of_year:
 :param kwargs:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 entries = dict()
 entries[task_name] = {
 'task': cron_task,
 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,
    month_of_year=month_of_year, **kwargs),
 'options': {'expires': 3600}}
 scheduler.update_from_dict(entries)

def del_cron_task(task_name: str):
 '''
 删除定时任务
 :param task_name:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 if scheduler.schedule.get(task_name, None) is not None:
 del scheduler.schedule[task_name]

def get_cron_task():
 '''
 获取当前所有定时任务的配置
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()]
 return ret

但是仅仅是这样还不能解决问题,从前面的serive.start()的源码看到,beat启动后会进入一个死循环,如果直接在主线程启动beat,必然会阻塞在死循环中,所以需要为beat创建一个子线程,这样才影响主线程的其他操作。

flag = False

beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None,
  loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler',
  scheduler_cls=None, # XXX use scheduler
  redirect_stdouts=None,
  redirect_stdouts_level=None)

# 设置主动启动beat是为了避免使用celery -A celery_demo worker 命令重复启动worker
def run():
 '''
 启动Beat
 :return:
 '''
 beat.run()

def new_thread():
 '''
 创建一个线程启动Beat 最多只能创建一个
 :return:
 '''
 global flag
 if not flag:
 t = threading.Thread(target=run, daemon=True)
 t.start()
 # 启动成功2s后才能操作定时任务 否则可能会报错
 time.sleep(2)
 flag = True

可能看到上面的代码有人会想,为什么不在主程序加载完成就启动为beat创建一个子线程,还非要写个函数等待主动调用?这是因为例如在使用django+celery组合时,一般启动django和启动celery woker是两个独立的进程,如果让django在加载代码的时候自动启动beat的子线程,那么在使用celery -A demo_name worker 启动celery时,会重新加载一边django的代码,因为celery需要扫描每个app下的tasks.py文件,加载异步任务函数,这时启动celery woker就会也启动一个beat子线程,可能会造成定时任务重复执行的情况。所以在这里设置成主动开启beat子线程,目的就是为了celery worker启动不重复创建beat线程。

完整的代码如下:

import socket
import time
import threading
from celery import platforms
from celery.schedules import crontab
from celery.apps.beat import Beat
from celery.utils.log import get_logger
from celery_demo import celery_app

logger = get_logger('celery.beat')
flag = False

class MyBeat(Beat):
 '''
 继承Beat 添加一个获取service的方法
 '''
 def start_scheduler(self):
 if self.pidfile:
  platforms.create_pidlock(self.pidfile)
 # 修改了获取service的方式
 service = self.get_service()

 print(self.banner(service))

 self.setup_logging()
 if self.socket_timeout:
  logger.debug('Setting default socket timeout to %r',
    self.socket_timeout)
  socket.setdefaulttimeout(self.socket_timeout)
 try:
  self.install_sync_handler(service)
  service.start()
 except Exception as exc:
  logger.critical('beat raised exception %s: %r',
    exc.__class__, exc,
    exc_info=True)
  raise

 def get_service(self):
 '''
 这个是自定义的 目的是为了把service暴露出来,方便对service的scheduler操作,因为定时任务信息都存放在service.scheduler里
 :return:
 '''
 service = getattr(self, "service", None)
 if service is None:
  service = self.Service(
  app=self.app,
  max_interval=self.max_interval,
  scheduler_cls=self.scheduler_cls,
  schedule_filename=self.schedule,
  )
  setattr(self, "service", service)
 return self.service

beat = MyBeat(max_interval=10, app=celery_app, socket_timeout=30, pidfile=None, no_color=None,
  loglevel='INFO', logfile=None, schedule=None, scheduler='celery.beat.PersistentScheduler',
  scheduler_cls=None, # XXX use scheduler
  redirect_stdouts=None,
  redirect_stdouts_level=None)

# 设置主动启动beat是为了避免使用celery -A celery_demo worker 命令重复启动worker
def run():
 '''
 启动Beat
 :return:
 '''
 beat.run()

def new_thread():
 '''
 创建一个线程启动Beat 最多只能创建一个
 :return:
 '''
 global flag
 if not flag:
 t = threading.Thread(target=run, daemon=True)
 t.start()
 # 启动成功2s后才能操作定时任务 否则可能会报错
 time.sleep(2)
 flag = True

def add_cron_task(task_name: str, cron_task: str, minute='*', hour='*', day_of_week='*', day_of_month='*',
   month_of_year='*', **kwargs):
 '''
 创建或更新定时任务
 :param task_name: 定时任务名称
 :param cron_task: task名称
 :param minute: 以下是时间
 :param hour:
 :param day_of_week:
 :param day_of_month:
 :param month_of_year:
 :param kwargs:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 entries = dict()
 entries[task_name] = {
 'task': cron_task,
 'schedule': crontab(minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,
    month_of_year=month_of_year, **kwargs),
 'options': {'expires': 3600}}
 scheduler.update_from_dict(entries)

def del_cron_task(task_name: str):
 '''
 删除定时任务
 :param task_name:
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 if scheduler.schedule.get(task_name, None) is not None:
 del scheduler.schedule[task_name]

def get_cron_task():
 '''
 获取当前所有定时任务的配置
 :return:
 '''
 service = beat.get_service()
 scheduler = service.scheduler
 ret = [{k: {"task": v.task, "crontab": v.schedule}} for k, v in scheduler.schedule.items()]
 return ret

另外还可以参考我的github,相关的注释在代码里写的较为清晰。

注意:使用这种方式添加/删除定时任务只是保存在内存中的,项目重启后就会丢失。如果想要持久化,可以参照上面的方法,把相关信息保存到数据库或其他可持久保存文件中,在beat线程启动时加载相关任务信息,在对定时任务修改做增删改时及时修改数据库或文件中内容。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • python Celery定时任务的示例

    本文介绍了python Celery定时任务的示例,分享给大家,具体如下: 配置 启用Celery的定时任务需要设置CELERYBEAT_SCHEDULE . Celery的定时任务都由celery beat来进行调度.celery beat默认按照settings.py之中的时区时间来调度定时任务. 创建定时任务 一种创建定时任务的方式是配置CELERYBEAT_SCHEDULE: #每30秒调用task.add from datetime import timedelta CELERYBEA

  • Django+Celery实现动态配置定时任务的方法示例

    哈喽,今天给大家分享一篇Django+Celery实现动态配置定时任务,因为最近也是无意间看到一位大佬关于这块的文章,然后自己觉得不错,也想学习写一下,然后最终实现功能是在前端页面统一管理计划任务,大家可以在admin管理页面设置,也可以在自己写的前端页面删除添加编辑,实时生效,还可以监控这些监控任务是否运行成功失败. 补充:如果大家对celery不熟悉的话,建议先学习celery 一.安装 1.在Linux系统上安装模块 celery (3.1.26.post2) celery-with-re

  • django+xadmin+djcelery实现后台管理定时任务

    继上一篇中间表的数据是动态的,图表展示的数据才比较准确.这里用到一个新的模块Djcelery,安装配置步骤如下: 1.安装 redis==2.10.6 celery==3.1.23 django-celery==3.1.17 flower==0.9.2 supervisor==3.3.4 flower用于监控定时任务,supervisor管理进程,可选 2.配置 settings.py中添加以下几行: #最顶头加上 from __future__ import absolute_import #

  • Django实现celery定时任务过程解析

    1.首先在项目同名目录下建一个celery.py from __future__ import absolute_import import os from celery import Celery from datetime import timedelta from kombu import Queue # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANG

  • celery实现动态设置定时任务

    本文实例为大家分享了celery动态设置定时任务的具体代码,供大家参考,具体内容如下 首先celery是一种异步任务队列,如果还不熟悉这个开源软件的请先看看官方文档,快速入门. 这里讲的动态设置定时任务的方法不使用数据库保存定时任务的信息,所以是项目重启后定时任务配置就会丢失,如果想保存成永久配置,可以考虑保存到数据库.redis或者使用pickle.json保存成文件,在项目启动时自动载入. 方法原理介绍 先来看一下celery的beat运行过程. 上图是beat的主要组成结构,beat中包含

  • Django配置celery(非djcelery)执行异步任务和定时任务

    所有演示均基于Django2.0 celery是一个基于python开发的简单.灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度.采用典型的生产者-消费者模型,主要由三部分组成: 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis.rabbitmq等作为broker 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程 存储结果的bac

  • python基于celery实现异步任务周期任务定时任务

    这篇文章主要介绍了python基于celery实现异步任务周期任务定时任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 hello, 小伙伴们, 好久不更新了,这一次带来的是celery在python中的应用以及设置异步任务周期任务和定时任务的步骤,希望能给入坑的你带来些许帮助. 首先是对celery的介绍,Celery其实是一个专注于实时处理和调度任务的分布式任务队列,同时提供操作和维护分布式系统所需要的全部数据, 因此可以用它提供的接口快

  • celery4+django2定时任务的实现代码

    网上有很多celery + django实现定时任务的教程,不过它们大多数是基于djcelery + celery3的: 或者是使用django_celery_beat配置较为繁琐的. 显然简洁而高效才是我们最终的追求,而celery4已经不需要额外插件即可与django结合实现定时任务了,原生的celery beat就可以很好的实现定时任务功能. 当然使用原生方案的同时有几点插件所带来的好处被我们放弃了: 插件提供的定时任务管理将不在可用,当我们只需要任务定期执行而不需要人为调度的时候这点忽略

  • 关于Django使用 django-celery-beat动态添加定时任务的方法

    版本信息 # 插件安装 Django==2.2.2 django-celery-beat==2.1.0 django-redis==4.8.0 mysqlclient==2.0.0 django-mysql==3.2.0 redis==3.2.1 uWSGI==2.0.17.1 django-redis-cache==2.1.0 安装与配置 安装上面的对应的celery版本 配置settings.py # django时区配置 TIME_ZONE = 'Asia/Shanghai' # 如果US

  • django 实现celery动态设置周期任务执行时间

    蓝鲸paas平台app开发经验分享 腾讯蓝鲸智云是一套基于PaaS的技术解决方案,提供了完善的前后台开发框架.调度引擎.公共组件等模块,帮助业务的产品和技术人员快速构建低成本.免运维的支撑工具和运营系统. PaaS平台不仅将应用服务的运行和开发环境作为一种服务提供给开发者用户,更为开发者用户提供了高效便捷的开发服务,如:组件系统,统一登录,权限管理,后台框架,MagicBox,桌面/工作台等. PaaS平台提供支持多语言的开发框架,助力运维人员能基于平台之上以自己擅长的技术语言(Python.j

  • Python Celery动态添加定时任务生产实践指南

    目录 一.背景 二.Celery动态添加定时任务的官方文档 三.celery简单实用 3.1 基础环境配置 3.2 测试使用Celery应用 四.配置backend存储任务执行结果 四.优化Celery目录结构 五.开始使用django-celery-beat调度器 六.具体操作演练 6.1 创建基于间隔时间的周期性任务 6.2 创建一个不带参数的周期性间隔任务 6.3 周期性任务的查询.删除操作 总结 一.背景 实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时

  • springboot整合Quartz实现动态配置定时任务的方法

    前言 在我们日常的开发中,很多时候,定时任务都不是写死的,而是写到数据库中,从而实现定时任务的动态配置,下面就通过一个简单的示例,来实现这个功能. 一.新建一个springboot工程,并添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency

  • java 动态增加定时任务示例

    整理文档,java 动态增加定时任务示例,直接上代码. import org.apache.tools.ant.util.DateUtils; import org.quartz.CronTrigger; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerFactory; import org.quartz.impl.StdSchedulerFactory; import ja

  • Quartz+Spring Boot实现动态管理定时任务

    项目实践过程中碰到一个动态管理定时任务的需求:针对每个人员进行信息的定时更新,具体更新时间可随时调整.启动.暂定等. 思路 将每个人员信息的定时配置保存到数据库中,这样实现了任务的动态展示和管理.任务的每一次新增或变更,都会去数据库变更信息. 设置一个统一的任务管理器,专门负责动态任务的增删改查. POM依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://mav

  • Spring动态添加定时任务的实现思路

    一.背景 在工作中,有些时候我们有些定时任务的执行可能是需要动态修改的,比如: 生成报表,有些项目配置每天的8点生成,有些项目配置每天的10点生成,像这种动态的任务执行时间,在不考虑分布式执行的情况下,我们可以 使用 Spring Task来简单的实现. 二.需求和实现思路 1.能够动态的添加一个定时任务. 在Spring中存在一个类ThreadPoolTaskScheduler,它可以实现根据一个cron表达式来调度一个任务,并返回一个ScheduledFuture对象. 2.能够取消定时任务

  • SpringBoot 任务调度动态设置方式(不用重启服务)

    目录 SpringBoot 任务调度动态设置 1.初始化入口(注解 @PostConstruct) 2.任务调度类 SpringBoot 调度任务 1.首先需要用@EnableScheduling 2.@Scheduled 注解用于标注 一些常用的任务表达式 SpringBoot 任务调度动态设置 1.初始化入口(注解 @PostConstruct) @PostConstruct private void initTimedTask() { //初始化任务调度器cron,可以从数据库中查询到cr

随机推荐