Django-celery-beat动态添加周期性任务实现过程解析

前期准备

1.beat插件安装

pip3 install django-celery-beat

2.注册APP

INSTALLED_APPS = [
....
'django_celery_beat',
]

3.数据库变更

python3 manage.py migrate django_celery_beat

配置工作

目录结构请参考://www.jb51.net/article/200659.htm

1.配置celerypro.py

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
from django.utils import timezone

# set the default Django settings module for the 'celery' program.
# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'voice_quality_assurance_configure.settings')
# 创建celery app
app = Celery('voice_quality_assurance_configure')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
# 从单独的配置模块中加载配置
app.config_from_object('voice_quality_assurance_configure.celeryconfig')
# 设置app自动加载任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 解决时区问题,定时任务启动就循环输出
app.now = timezone.now

2.配置celeryconfig.py

from __future__ import absolute_import
from kombu import Queue
from django.conf import settings

# 设置代理人broker
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# 指定 Backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 指定时区,默认是 UTC
CELERY_TIMEZONE='Asia/Shanghai'
# celery 序列化与反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
CELERY_IGNORE_RESULT = True
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 10
# 任务预取功能,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
CELERYD_PREFETCH_MULTIPLIER = 20
# 有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True

# celery beat配置(周期性任务设置)
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

3.分别启动woker和beta

项目根目录终端执行(voice_quality_assurance_configure为项目名称,简单来说,和manage.py文件同级)

celery -A voice_quality_assurance_configure beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler #

启动beta 调度器使用数据库

celery worker -A voice_quality_assurance_configure --loglevel=info -n worker1 #启动celery worker

4.创建周期性任务

from datetime import datetime, timedelta
import json
import os,django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "voice_quality_assurance_configure.settings")# project_name 项目名称
django.setup()
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule, created = IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,)

# 带参数的创建方法,如下:
PeriodicTask.objects.create(
   interval=schedule,     # 上面创建10秒的间隔 interval 对象
   name='test_task',     # 设置任务的name值
   task='mission.tasks.my_task', # 指定需要周期性执行的任务
   args=json.dumps([10, 2, 76]),
  expires=datetime.utcnow() + timedelta(seconds=30)
)

详解创建周期性任务的方法

创建基于interval的周期性任务

第一步创建间隔对象

schedule, created = IntervalSchedule.objects.get_or_create(
  every=10,
  period=IntervalSchedule.SECONDS,
)

IntervalSchedule.DAYS 固定间隔天数
IntervalSchedule.HOURS 固定间隔小时数
IntervalSchedule.MINUTES 固定间隔分钟数
IntervalSchedule.SECONDS 固定间隔秒数
IntervalSchedule.MICROSECONDS 固定间隔微秒

第二步创建任务

无参数的创建方法:

PeriodicTask.objects.create(
   interval=schedule,         # we created this above.
   name='test_task',     # simply describes this periodic task.
   task='app名.tasks.任务函数名', # name of task.)

有参数的创建方法:

PeriodicTask.objects.create(
   interval=schedule,         # we created this above.
   name='test'_task',     # simply describes this periodic task.
   task='app名.tasks.任务函数名', # name of task.
   args=json.dumps(['arg1', 'arg2']),
   kwargs=json.dumps({ 'be_careful': True, }),
   expires=datetime.utcnow() + timedelta(seconds=30) )
class MonitorDeviceTask(object):
  """
  设备创建,增加周期性任务
  """

  def __init__(self, device_obj):
    self.device_obj = device_obj
    self.periodic_task = PeriodicTask.objects.create(
      interval=schedule,
      name='test_task',
      task='mission.tasks.my_task',
      args=json.dumps([self.device_obj.ip])
    )

  def starttask(self):
    """
    启动任务
    """
    self.periodic_task.enabled = True
    self.periodic_task.save()

  def stoptask(self):
    """
    停止任务
    """
    self.periodic_task.enabled = False
    self.periodic_task.save()

  def deltask(self):
    """
    删除任务
    """
    self.periodic_task.delete()
    self.periodic_task.save()

创建基于 crontab 的周期性任务

from django_celery_beat.models import CrontabSchedule, PeriodicTask
schedule, _ = CrontabSchedule.objects.get_or_create(
   minute='30',
   hour='*',
   day_of_week='*',
   day_of_month='*',
   month_of_year='*',
   timezone=pytz.timezone('Canada/Pacific')
)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • django之使用celery-把耗时程序放到celery里面执行的方法

    1 在虚拟环境创建项目test和应用booktest(过程省略),然后安装所需的包 pip install celery==3.1.25 pip install celery-with-redis==3.0 pip install django-redis==3.1.17 2 配置settings, # 数据库使用mysql DATABASES = { 'default': { 'ENGINE':'django.db.backends.mysql', 'NAME':'test', 'USER':

  • 使用celery执行Django串行异步任务的方法步骤

    前言 Django项目有一个耗时较长的update过程,希望在接到请求运行update过程的时候,Django应用仍能正常处理其他的请求,并且update过程要求不能并行,也不能漏掉任何一个请求 使用celery的solo模式解决 安装redis https://github.com/microsoftarchive/redis/releases 下载.msi文件安装,会直接将redis注册为windows服务 安装celery与redis依赖 pip install celery pip in

  • 使用celery和Django处理异步任务的流程分析

    介绍 我们可能需要一些可以安排一些任务并定期运行一些任务或异步处理长任务的东西,而这一切都可以通过在Django Project中使用Celery来实现. 什么是Celery? Celery是 一个专注于实时处理的任务队列,它还支持任务调度. Celery快速,简单,高度可用且灵活. Celery需要消息传输来发送和接收消息,这可以由Redis或RabbitMQ完成. 入门 让我们开始在您的virtualenv中安装Celery软件包. 安装Celery <span class="nv&q

  • Django中celery执行任务结果的保存方法

    如下所示: pip3 install django-celery-results INSTALLED_APPS = ( ..., 'django_celery_results',) # 注意这个是下划线'_' python3 manage.py migrate django_celery_results CELERY_RESULT_BACKEND = 'django-db' #在settings.py文件中配置 注意异步任务views.py中调用时,想要记录结果必须是"任务函数.delay(*a

  • 如何使用Celery和Docker处理Django中的定期任务

    在构建和扩展Django应用程序时,不可避免地需要定期在后台自动运行某些任务. 一些例子: 生成定期报告 清除缓存 发送批量电子邮件通知 执行每晚维护工作 这是构建和扩展不属于Django核心的Web应用程序所需的少数功能之一.幸运的是,Celery提供了一个强大的解决方案,该解决方案非常容易实现,称为Celery Beat. 在下面的文章中,我们将向您展示如何使用Docker设置Django,Celery和Redis,以便通过Celery Beat定期运行自定义Django Admin命令.

  • Django实现celery定时任务过程解析

    1.首先在项目同名目录下建一个celery.py from __future__ import absolute_import import os from celery import Celery from datetime import timedelta from kombu import Queue # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANG

  • Django+Celery实现动态配置定时任务的方法示例

    哈喽,今天给大家分享一篇Django+Celery实现动态配置定时任务,因为最近也是无意间看到一位大佬关于这块的文章,然后自己觉得不错,也想学习写一下,然后最终实现功能是在前端页面统一管理计划任务,大家可以在admin管理页面设置,也可以在自己写的前端页面删除添加编辑,实时生效,还可以监控这些监控任务是否运行成功失败. 补充:如果大家对celery不熟悉的话,建议先学习celery 一.安装 1.在Linux系统上安装模块 celery (3.1.26.post2) celery-with-re

  • django 实现celery动态设置周期任务执行时间

    蓝鲸paas平台app开发经验分享 腾讯蓝鲸智云是一套基于PaaS的技术解决方案,提供了完善的前后台开发框架.调度引擎.公共组件等模块,帮助业务的产品和技术人员快速构建低成本.免运维的支撑工具和运营系统. PaaS平台不仅将应用服务的运行和开发环境作为一种服务提供给开发者用户,更为开发者用户提供了高效便捷的开发服务,如:组件系统,统一登录,权限管理,后台框架,MagicBox,桌面/工作台等. PaaS平台提供支持多语言的开发框架,助力运维人员能基于平台之上以自己擅长的技术语言(Python.j

  • Django集成celery发送异步邮件实例

    安装依赖 pip install django-celery-beat pip install django-celery-email pip install celery pip install msgpack-python pip install msgpack 在settings文件中配置 添加app应用到installed_apps中 "djcelery_email", "django_celery_beat" 修改.env文件配置: #邮箱后端,使用cel

  • Django-celery-beat动态添加周期性任务实现过程解析

    前期准备 1.beat插件安装 pip3 install django-celery-beat 2.注册APP INSTALLED_APPS = [ .... 'django_celery_beat', ] 3.数据库变更 python3 manage.py migrate django_celery_beat 配置工作 目录结构请参考://www.jb51.net/article/200659.htm 1.配置celerypro.py from __future__ import absolu

  • 关于Django使用 django-celery-beat动态添加定时任务的方法

    版本信息 # 插件安装 Django==2.2.2 django-celery-beat==2.1.0 django-redis==4.8.0 mysqlclient==2.0.0 django-mysql==3.2.0 redis==3.2.1 uWSGI==2.0.17.1 django-redis-cache==2.1.0 安装与配置 安装上面的对应的celery版本 配置settings.py # django时区配置 TIME_ZONE = 'Asia/Shanghai' # 如果US

  • Python SqlAlchemy动态添加数据表字段实例解析

    本文研究的主要是Python SqlAlchemy动态添加数据表字段,具体如下. 我们知道使用SqlAlchemy创建类来映射数据表,类属性等于数据库字段,但有时候要在我们创建表的时候,不确定数据表字段数量,遇到这种情况,应如何解决? 先看常规用法 from sqlalchemy import create_engine,Column,String,Integer class Mybase(Base): #表名 __tablename__ ='mycars' #字段,属性 myid=Column

  • Django中使用CORS实现跨域请求过程解析

    跨域请求: 请求url包含协议.网址.端口,任何一种不同都是跨域请求. 1.安装cors模块 pip install django-cors-headers 2.添加应用 INSTALLED_APPS = ( ... 'corsheaders', ... ) 3.设置中间件 MIDDLEWARE = [ 'corsheaders.middleware.CorsMiddleware', ... ] 4.添加允许访问的白名单,凡是出现在白名单的域名都可以访问后端接口 # CORS CORS_ORIG

  • Django中使用极验Geetest滑动验证码过程解析

    一,环境部署 1.创建一个django测试项目 二,文档部署 1.下载安装python对应的SDK 使用命令从Github导入完整项目:git clone https://github.com/GeeTeam/gt3-python-sdk.git 手动下载压缩包文件:https://github.com/GeeTeam/gt3-python-sdk/archive/master.zip 2.参数配置 修改请求参数(可选) 名称 说明 user_id 用户标识,若担心用户信息风险,可作预处理(如哈

  • Django 导出项目依赖库到 requirements.txt过程解析

    虚拟环境: 使用 pip freeze pip freeze > requirements.txt # 这种方式推荐配合 virtualenv ,否则会把整个环境中的包都列出来. 只扫描项目目录 使用 pipreqs 这个工具的好处是可以通过对项目目录的扫描,自动发现使用了那些类库,自动生成依赖清单.缺点是可能会有些偏差,需要检查并自己调整下. # 首先安装 pipreqs# pip install pipreqs # 安装位置在pip所在的目录下 # 使用方式也比较简单 pipreqs /va

  • SpringCloud添加客户端Eureka Client过程解析

    1.添加依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> 2.添加客户端配置 /src/main/java/resource/application.yml server: port: 8770 eur

  • Python动态类型实现原理及过程解析

    在python中,我们使用变量时,并没有声明变量的存在和类型.类型是在运行过程中自动决定的. a = 3 python将会执行三步去完成上面这个请求. 1.创建一个对象代表3 2.创建一个变量a,如果a未创建. 3.将变量a与对象3相连接. 可以将变量a看作对象3的一个引用. a = 3 b = a 多个变量可以指向同一个对象,在Python中叫共享引用. Python在每个对象中保持了一个计数器,用于记录当前指向该对象的引用的数目,一旦计数器被设置为0,该对象的内存空间就会自动回收. 原处修改

  • Nginx服务器添加Systemd自定义服务过程解析

    一.以nginx为例 使用yum命令安装的nginx Systemd服务文件以.service结尾,比如现在要建立nginx为开机启动,如果用yum install命令安装的,yum命令会自动创建nginx.service文件,直接用命令: systemcel enable nginx.service //开机自启 使用源码编译安装的 1.手动创建nginx.service服务文件.并将其放入 /lib/systemd/system 文件夹中. nginx.service内容如下: [Unit]

随机推荐