Python Celery动态添加定时任务生产实践指南

目录
  • 一、背景
  • 二、Celery动态添加定时任务的官方文档
  • 三、celery简单实用
    • 3.1 基础环境配置
    • 3.2 测试使用Celery应用
  • 四、配置backend存储任务执行结果
  • 四、优化Celery目录结构
  • 五、开始使用django-celery-beat调度器
  • 六、具体操作演练
    • 6.1 创建基于间隔时间的周期性任务
    • 6.2 创建一个不带参数的周期性间隔任务
    • 6.3 周期性任务的查询、删除操作
  • 总结

一、背景

实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时脚本

通过celery 实现调度主要思想是 通过引入中间人redis,启动 worker 进行任务执行 ,celery-beat进行定时任务数据存储

二、Celery动态添加定时任务的官方文档

celery文档:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

celery 自定义调度类说明:

自定义调度器类可以在命令行中指定(--scheduler参数)

django-celery-beat文档 : https://pypi.org/project/django-celery-beat/

关于django-celery-beat 插件的说明:

此扩展使您能够将定期任务计划存储在数据库中,可以从 Django 管理界面管理周期性任务,您可以在其中创建、编辑和删除周期性任务以及它们应该运行的频率

三、celery简单实用

3.1 基础环境配置

1. 安装最新版本的Django

pip3 install django #当前我安装的版本是 3.0.6

2. 创建项目

django-admin startproject typeidea
django-admin startapp blog

3.安装 celery

pip3 install django-celery
pip3 install -U Celery
pip3 install "celery[librabbitmq,redis,auth,msgpack]"
pip3 install django-celery-beat # 用于动态添加定时任务
pip3 install django-celery-results
pip3 install redis

3.2 测试使用Celery应用

1. 创建blog目录、新建task.py

首先在Django项目中创建一个blog文件夹,并且在blog文件夹下创建tasks.py模块, 如下:

tasks.py代码如下:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
#File: tasks.py
#Time: 2022/3/30 2:26 下午
#Author: julius
"""
from celery import Celery

# 使用redis做为broker
app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0')

# 创建任务函数
@app.task
def my_task():
    print('任务正在执行...')

Celery第一个参数是给其设定一个名字, 第二参数我们设定一个中间人broker, 在这里我们使用Redis作为中间人。my_task函数是我们编写的一个任务函数, 通过加上装饰器app.task, 将其注册到broker的队列中。

2. 启动redis、创建worker

现在我们在创建一个worker, 等待处理队列中的任务。

进入项目的根目录,执行命令: celery -A celery_tasks.tasks worker -l info

3. 调用任务

下面来测试一下功能,创建一个任务,加入任务队列中,提供worker执行。

进入python终端, 执行如下代码:

$ python manage.py shell
>>> from blog.tasks import my_task
>>> my_task.delay()
<AsyncResult: 83484dfe-f729-417b-8e51-6c7ae32a1377>

调用一个任务函数,将会返回一个AsyncResult对象,这个对象可以用来检查任务的状态或者获得任务的返回值。

4. 查看结果

在worker的终端查看任务执行情况,可以看到已经收到83484dfe-f729-417b-8e51-6c7ae32a1377 任务,并打印了任务执行信息

5. 存储并查看任务执行状态

把任务执行结果赋值给ret,然后调用result() 会产生 DisabledBackend 报错,可见没有配置后端存储的时候并不能保存任务执行的状态信息,下一节我们会讲到如何配置backend保存任务执行结果

$ python manage.py shell
>>> from blog.tasks import my_task
>>> ret=my_task.delay()
>>> ret.result()

四、配置backend存储任务执行结果

如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

1. 添加backend参数

在本例中我们使用Redis作为存储结果的方案,通过Celery的backend参数来设定任务结果存储地址。我们将tasks模块修改如下:

from celery import Celery

# 使用redis作为broker以及backend
app = Celery('celery_tasks.tasks',
             broker='redis://127.0.0.1:6379/8',
             backend='redis://127.0.0.1:6379/9')

# 创建任务函数
@app.task
def my_task(a, b):
    print("任务函数正在执行....")
    return a + b

给Celery增加了backend参数,指定redis作为结果存储,并将任务函数修改为两个参数,并且有返回值。

2. 调用任务/查看任务执行结果

下面再来执行调用一下这个任务看看。

$ python manage.py shell
>>> from blog.tasks import my_task
>>> res=my_task.delay(10,40)
>>> res.result
50
>>> res.failed()
False

再来看看worker的执行情况,如下:

可以看到celery任务已经执行成功了。

但是这只是一个开始,下一步要看看如何添加定时的任务。

四、优化Celery目录结构

上面直接将Celery的应用创建、配置、tasks任务全部写在了一个文件,这样在后面项目越来越大,也是不方便的。下面来拆分一下,并且添加一些常用的参数。

基本结构如下

$ vim typeidea/celery.py (Celery应用文件)

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
#File: celery.py
#Time: 2022/3/30 12:25 下午
#Author: julius
"""
import os
from celery import Celery
from blog import celeryconfig
project_name='typeidea'
# set the default django setting module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE','typeidea.settings')
app = Celery(project_name)

app.config_from_object('django.conf:settings')

app.autodiscover_tasks()

vim blog/celeryconfig.py (配置Celery的参数文件)

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
#File: celeryconfig.py
#Time: 2022/3/30 2:54 下午
#Author: julius
"""

# 设置结果存储
from typeidea import settings
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 设置代理人broker
BROKER_URL = 'redis://127.0.0.1:6379/1'
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 20
# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
CELERYD_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True

CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

vim blog/tasks.py (tasks 任务文件)

import time
from blog.celery import app

# 创建任务函数
@app.task
def my_task(a, b, c):
    print('任务正在执行...')
    print('任务1函数休眠10s')
    time.sleep(10)
    return a + b + c

五、开始使用django-celery-beat调度器

使用 django-celery-beat 动态添加定时任务  celery 4.x 版本在 django 框架中是使用 django-celery-beat 进行动态添加定时任务的。前面虽然已经安装了这个库,但是还要再说明一下。

官网的配置说明
https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

1. 安装 django-celery-beat

pip3 install django-celery-beat

2.在项目的 settings 文件配置 django-celery-beat 

INSTALLED_APPS = [
    'blog',
    'django_celery_beat',
    ...
]

# Django设置时区
LANGUAGE_CODE = 'zh-hans'  # 使用中国语言
TIME_ZONE = 'Asia/Shanghai'  # 设置Django使用中国上海时间
# 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
# 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。
USE_TZ = False

3. 创建 django-celery-beat 相关表

执行Django数据库迁移: python manage.py migrate

4. 配置Celery使用 django-celery-beat

配置 celery.py

import os

from celery import Celery

from blog import celeryconfig

# 为celery 设置环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE","typeidea.settings")
# 创建celery app
app = Celery('blog')
# 从单独的配置模块中加载配置
app.config_from_object(celeryconfig)

# 设置app自动加载任务
app.autodiscover_tasks([
    'blog',
])

配置 celeryconfig.py

# 设置结果存储
from typeidea import settings
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "typeidea.settings")
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# 设置代理人broker
BROKER_URL = 'redis://127.0.0.1:6379/1'
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 20
# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
CELERYD_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True

CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
 

编写任务 tasks.py

import time
from celery import Celery
from blog.celery import app

# 使用redis做为broker
# app = Celery('blog.tasks2',broker='redis://127.0.0.1:6379/0',backend='redis://127.0.0.1:6379/1')

# 创建任务函数
@app.task
def my_task(a, b, c):
    print('任务正在执行...')
    print('任务1函数休眠10s')
    time.sleep(10)
    return a + b + c

@app.task
def my_task2():
    print("任务2函数正在执行....")
    print('任务2函数休眠10s')
    time.sleep(10)

5. 启动定时任务work

启动定时任务首先需要有一个work执行异步任务,然后再启动一个定时器触发任务。

启动任务 work

$ celery -A blog worker -l info 

启动定时器触发 beat

celery -A blog beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

六、具体操作演练

6.1 创建基于间隔时间的周期性任务

1. 初始化周期间隔对象interval 对象

>>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
>>> schedule, created = IntervalSchedule.objects.get_or_create(
...       every=10,
...       period=IntervalSchedule.SECONDS,
...  )
>>> IntervalSchedule.objects.all()
<QuerySet [<IntervalSchedule: every 10 seconds>]>

2.创建一个无参数的周期性间隔任务

>>>PeriodicTask.objects.create(interval=schedule,name='my_task2',task='blog.tasks.my_task2',)
<PeriodicTask: my_task2: every 10 seconds>

beat 调度服务日志显示如下:

worker 服务日志显示如下:

3.创建一个带参数的周期性间隔任务

>>> PeriodicTask.objects.create(interval=schedule,name='my_task',task='blog.tasks.my_task',args=json.dumps([10,20,30]))
<PeriodicTask: my_task: every 10 seconds>

beat 调度服务日志结果:

worker 服务日志结果:

4.如何高并发执行任务

需要并行执行任务的时候,就需要设置多个worker来执行任务。

6.2 创建一个不带参数的周期性间隔任务

1.初始化 crontab 的调度对象

>>> import pytz
>>> schedule, _ = CrontabSchedule.objects.get_or_create(
... minute='*',
... hour='*',
... day_of_week='*',
... day_of_month='*',
... timezone=pytz.timezone('Asia/Shanghai')
... )

2. 创建不带参数的定时任务

PeriodicTask.objects.create(crontab=schedule,name='my_task2_crontab',task='blog.tasks.my_task2',)

beat 调度服务执行结果

worker 执行服务结果

6.3 周期性任务的查询、删除操作

1. 周期性任务的查询

>>> PeriodicTask.objects.all()
<ExtendedQuerySet [<PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/dM/MY/d) Asia/Shanghai>, <PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>]>
>>> PeriodicTask.objects.get(name='my_task2_crontab')
<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
>>> for task in PeriodicTask.objects.all():
...     print(task.id)
...
1
13
>>> PeriodicTask.objects.get(id=13)
<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>
>>> PeriodicTask.objects.get(name='my_task2_crontab')
<PeriodicTask: my_task2_crontab: * * * * * (m/h/dM/MY/d) Asia/Shanghai>

控制台实际操作记录

2.周期性任务的暂停/启动

2.1 设置my_taks2_crontab 暂停任务

>>> my_task2_crontab = PeriodicTask.objects.get(id=13)
>>> my_task2_crontab.enabled
True
>>> my_task2_crontab.enabled=False
>>> my_task2_crontab.save()

查看worker输出:

可以看到worker从19:31以后已经没有输出了,说明已经成功吧my_task2_crontab 任务暂停

2.2 设置my_task2_crontab 开启任务

把任务的 enabled 为 True 即可:

>>> my_task2_crontab.enabled
False
>>> my_task2_crontab.enabled=True
>>> my_task2_crontab.save()

查看worker输出:

可以看到worker从19:36开始有输出,说明已把my_task2_crontab 任务重新启动

3. 周期性任务的删除

获取到指定的任务后调用delete(),再次查询指定任务会发现已经不存在了

PeriodicTask.objects.get(name='my_task2_crontab').delete()
>>> PeriodicTask.objects.get(name='my_task2_crontab')
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/manager.py", line 85, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "/Users/julius/PycharmProjects/typeidea/.venv/lib/python3.9/site-packages/django/db/models/query.py", line 435, in get
    raise self.model.DoesNotExist(
django_celery_beat.models.PeriodicTask.DoesNotExist: PeriodicTask matching query does not exist.

总结

到此这篇关于Python Celery动态添加定时任务生产实践的文章就介绍到这了,更多相关Celery动态添加定时任务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Django实现celery定时任务过程解析

    1.首先在项目同名目录下建一个celery.py from __future__ import absolute_import import os from celery import Celery from datetime import timedelta from kombu import Queue # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANG

  • Django+Celery实现动态配置定时任务的方法示例

    哈喽,今天给大家分享一篇Django+Celery实现动态配置定时任务,因为最近也是无意间看到一位大佬关于这块的文章,然后自己觉得不错,也想学习写一下,然后最终实现功能是在前端页面统一管理计划任务,大家可以在admin管理页面设置,也可以在自己写的前端页面删除添加编辑,实时生效,还可以监控这些监控任务是否运行成功失败. 补充:如果大家对celery不熟悉的话,建议先学习celery 一.安装 1.在Linux系统上安装模块 celery (3.1.26.post2) celery-with-re

  • Python Celery定时任务详细讲解

    目录 前言 一.Celery定时任务是什么? 二.使用步骤 1.代码结构 2.启动定时任务 3.执行结果 总结 前言 Celery在python中的应用除了实现异步任务(async task)外也可以执行定时任务(beat) 一.Celery定时任务是什么? Celery默认任务单元由任务生产者触发,但有时可能需要其自动触发,而Beat进程正是负责此类任务,能够自动触发定时/周期性任务. Celery 进行周期任务也很简单,只需要在配置中配置好周期任务,然后在运行一个周期任务触发器(beat)即

  • Django+Celery实现定时任务的示例

    目录 一.前言 二.配置使用 定义与触发任务 扩展 三.Django中使用定时任务 二次开发 一.前言 Celery是一个基于python开发的分布式任务队列,而做python WEB开发最为流行的框架莫属Django,但是Django的请求处理过程都是同步的无法实现异步任务,若要实现异步任务处理需要通过其他方式(前端的一般解决方案是ajax操作),而后台Celery就是不错的选择.倘若一个用户在执行某些操作需要等待很久才返回,这大大降低了网站的吞吐量. ​另一方面,当我们需要处理一些定时任务时

  • python基于celery实现异步任务周期任务定时任务

    这篇文章主要介绍了python基于celery实现异步任务周期任务定时任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 hello, 小伙伴们, 好久不更新了,这一次带来的是celery在python中的应用以及设置异步任务周期任务和定时任务的步骤,希望能给入坑的你带来些许帮助. 首先是对celery的介绍,Celery其实是一个专注于实时处理和调度任务的分布式任务队列,同时提供操作和维护分布式系统所需要的全部数据, 因此可以用它提供的接口快

  • python Celery定时任务的示例

    本文介绍了python Celery定时任务的示例,分享给大家,具体如下: 配置 启用Celery的定时任务需要设置CELERYBEAT_SCHEDULE . Celery的定时任务都由celery beat来进行调度.celery beat默认按照settings.py之中的时区时间来调度定时任务. 创建定时任务 一种创建定时任务的方式是配置CELERYBEAT_SCHEDULE: #每30秒调用task.add from datetime import timedelta CELERYBEA

  • celery实现动态设置定时任务

    本文实例为大家分享了celery动态设置定时任务的具体代码,供大家参考,具体内容如下 首先celery是一种异步任务队列,如果还不熟悉这个开源软件的请先看看官方文档,快速入门. 这里讲的动态设置定时任务的方法不使用数据库保存定时任务的信息,所以是项目重启后定时任务配置就会丢失,如果想保存成永久配置,可以考虑保存到数据库.redis或者使用pickle.json保存成文件,在项目启动时自动载入. 方法原理介绍 先来看一下celery的beat运行过程. 上图是beat的主要组成结构,beat中包含

  • Python Celery动态添加定时任务生产实践指南

    目录 一.背景 二.Celery动态添加定时任务的官方文档 三.celery简单实用 3.1 基础环境配置 3.2 测试使用Celery应用 四.配置backend存储任务执行结果 四.优化Celery目录结构 五.开始使用django-celery-beat调度器 六.具体操作演练 6.1 创建基于间隔时间的周期性任务 6.2 创建一个不带参数的周期性间隔任务 6.3 周期性任务的查询.删除操作 总结 一.背景 实际工作中会有一些耗时的异步任务需要使用定时调度,比如发送邮件,拉取数据,执行定时

  • 关于Django使用 django-celery-beat动态添加定时任务的方法

    版本信息 # 插件安装 Django==2.2.2 django-celery-beat==2.1.0 django-redis==4.8.0 mysqlclient==2.0.0 django-mysql==3.2.0 redis==3.2.1 uWSGI==2.0.17.1 django-redis-cache==2.1.0 安装与配置 安装上面的对应的celery版本 配置settings.py # django时区配置 TIME_ZONE = 'Asia/Shanghai' # 如果US

  • Python实现动态添加类的属性或成员函数的解决方法

    某些时候我们需要让类动态的添加属性或方法,比如我们在做插件时就可以采用这种方法.用一个配置文件指定需要加载的模块,可以根据业务扩展任意加入需要的模块. 本文就此简述了Python实现动态添加类的属性或成员函数的解决方法,具体方法如下: 首先我们可以参考ulipad的实现:mixin. 这里做的比较简单,只是声明一个类,类初始化的时候读取配置文件,根据配置列表加载特定目录下的模块下的函数,函数和模块同名,将此函数动态加载为类的成员函数. 代码如下所示: class WinBAS(Bas): def

  • Python SqlAlchemy动态添加数据表字段实例解析

    本文研究的主要是Python SqlAlchemy动态添加数据表字段,具体如下. 我们知道使用SqlAlchemy创建类来映射数据表,类属性等于数据库字段,但有时候要在我们创建表的时候,不确定数据表字段数量,遇到这种情况,应如何解决? 先看常规用法 from sqlalchemy import create_engine,Column,String,Integer class Mybase(Base): #表名 __tablename__ ='mycars' #字段,属性 myid=Column

  • Python实现动态添加属性和方法操作示例

    本文实例讲述了Python实现动态添加属性和方法操作.分享给大家供大家参考,具体如下: # -*- coding:utf-8 -*- #!python3 class Person(): def __init__(self, name, age): self.name = name self.age = age p1 = Person('ff', '28') print(p1.name, p1.age) # 给实例对象动态添加sex属性 p1.sex = 'female' print(p1.sex

  • Spring动态添加定时任务的实现思路

    一.背景 在工作中,有些时候我们有些定时任务的执行可能是需要动态修改的,比如: 生成报表,有些项目配置每天的8点生成,有些项目配置每天的10点生成,像这种动态的任务执行时间,在不考虑分布式执行的情况下,我们可以 使用 Spring Task来简单的实现. 二.需求和实现思路 1.能够动态的添加一个定时任务. 在Spring中存在一个类ThreadPoolTaskScheduler,它可以实现根据一个cron表达式来调度一个任务,并返回一个ScheduledFuture对象. 2.能够取消定时任务

  • SpringBoot 实现动态添加定时任务功能

    目录 代码结构 1.配置类 2.定时任务类型枚举 3.实际执行任务实现类 4.定时任务包装器 5.任务注册器(核心) 6.使用 最后 最近的需求有一个自动发布的功能, 需要做到每次提交都要动态的添加一个定时任务 代码结构 1. 配置类 package com.orion.ops.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conf

  • 在python的类中动态添加属性与生成对象

    本文将通过一下几个方面来一一进行解决 1.程序的主要功能 2.实现过程 3.类的定义 4.用生成器generator动态更新每个对象并返回对象 5.使用strip 去除不必要的字符 6.rematch匹配字符串 7.使用timestrptime提取字符串转化为时间对象 8.完整代码 程序的主要功能 现在有个存储用户信息的像表格一样的文档:第一行是属性,各个属性用逗号(,)分隔,从第二行开始每行是各个属性对应的值,每行代表一个用户.如何实现读入这个文档,每行输出一个用户对象呢? 另外还有4个小要求

随机推荐