如何使用Celery和Docker处理Django中的定期任务

在构建和扩展Django应用程序时,不可避免地需要定期在后台自动运行某些任务。

一些例子:

生成定期报告

清除缓存

发送批量电子邮件通知

执行每晚维护工作

这是构建和扩展不属于Django核心的Web应用程序所需的少数功能之一。幸运的是,Celery提供了一个强大的解决方案,该解决方案非常容易实现,称为Celery Beat。

在下面的文章中,我们将向您展示如何使用Docker设置Django,Celery和Redis,以便通过Celery Beat定期运行自定义Django Admin命令。

依存关系:

Django v3.0.5

Docker v19.03.8

Python v3.8.2

芹菜v4.4.1

Redis v5.0.8

Django + Celery系列:

Django和Celery的异步任务

使用Celery和Docker处理Django中的定期任务(本文!)

目标

在本教程结束时,您应该能够:

使用Docker容器化Django,Celery和Redis

将Celery集成到Django应用中并创建任务

编写自定义Django Admin命令

安排自定义Django Admin命令以通过Celery Beat定期运行

项目设置

django-celery-beat存储库中克隆基础项目,然后签出基础分支:

$ git clone
https://github.com/testdrivenio/django-celery-beat
--branch base --single-branch
$ cd django-celery-beat

由于我们总共需要管理四个流程(Django,Redis,worker和Scheduler),因此我们将使用Docker通过连接起来简化它们的工作流程,从而使它们都可以通过一个命令从一个终端窗口运行 。

从项目根目录创建映像,并启动Docker容器:

$ docker-compose up -d --build
$ docker-compose exec web python manage.py migrate

构建完成后,导航至http:// localhost:1337以确保该应用程序能够按预期运行。 您应该看到以下文本:

Orders
 No orders found!

项目结构:

├── .gitignore
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── orders
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── migrations
    │   │   ├── 0001_initial.py
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   ├── urls.py
    │   └── views.py
    ├── requirements.txt
    └── templates
        └── orders
            └── order_list.html

Celery和Redis

现在,我们需要为Celery,Celery Beat和Redis添加容器。

首先,将依赖项添加到requirements.txt文件中:

Django==3.0.5
celery==4.4.1
redis==3.4.1

docker-compose.yml文件内容:

redis:
 image: redis:alpine
celery:
 build: ./project
 command: celery -A core worker -l info
 volumes:
 - ./project/:/usr/src/app/
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis
celery-beat:
 build: ./project
 command: celery -A core beat -l info
 volumes:
 - ./project/:/usr/src/app/
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis

我们还需要更新Web服务的depends_on部分:

web:
 build: ./project
 command: python manage.py runserver 0.0.0.0:8000
 volumes:
 - ./project/:/usr/src/app/
 ports:
 - 1337:8000
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis # NEW

完整的docker-compose文件如下:

version: '3.7'

services:
 web:
 build: ./project
 command: python manage.py runserver 0.0.0.0:8000
 volumes:
 - ./project/:/usr/src/app/
 ports:
 - 1337:8000
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis
 redis:
 image: redis:alpine
 celery:
 build: ./project
 command: celery -A core worker -l info
 volumes:
 - ./project/:/usr/src/app/
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis
 celery-beat:
 build: ./project
 command: celery -A core beat -l info
 volumes:
 - ./project/:/usr/src/app/
 environment:
 - DEBUG=1
 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
 - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
 depends_on:
 - redis

在构建新容器之前,我们需要在Django应用中配置Celery。

芹菜配置

设定

在“核心”目录中,创建一个celery.py文件并添加以下代码:

import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")

app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

这里发生了什么事?

首先,我们为DJANGO_SETTINGS_MODULE环境变量设置一个默认值,以便Celery知道如何找到Django项目。

接下来,我们创建了一个名称为core的新Celery实例,并将该值分配给名为app的变量。

然后,我们从django.conf的settings对象中加载了celery配置值。 我们使用namespace =“ CELERY”来防止与其他Django设置发生冲突。 换句话说,Celery的所有配置设置必须以CELERY_为前缀。

最后,app.autodiscover_tasks()告诉Celery从settings.INSTALLED_APPS中定义的应用程序中查找Celery任务。

将以下代码添加到core / __ init__.py:

from .celery import app as celery_app

__all__ = ("celery_app",)

最后,使用以下Celery设置更新core / settings.py文件,使其可以连接到Redis:

CELERY_BROKER_URL = "redis://redis:6379"
CELERY_RESULT_BACKEND = "redis://redis:6379"

build:

$ docker-compose up -d --build

查看日志:

$ docker-compose logs 'web'
$ docker-compose logs 'celery'
$ docker-compose logs 'celery-beat'
$ docker-compose logs 'redis'

如果一切顺利,我们现在有四个容器,每个容器提供不同的服务。

现在,我们准备创建一个示例任务,以查看其是否可以正常工作。

创建一个任务

创建一个新文件core / tasks.py并为仅打印到控制台的示例任务添加以下代码:

from celery import shared_task

@shared_task
def sample_task():
 print("The sample task just ran.")

安排任务

在settings.py文件的末尾,添加以下代码,以使用Celery Beat将sample_task安排为每分钟运行一次:

CELERY_BEAT_SCHEDULE = {
 "sample_task": {
 "task": "core.tasks.sample_task",
 "schedule": crontab(minute="*/1"),
 },
}

在这里,我们使用CELERY_BEAT_SCHEDULE设置定义了定期任务。 我们给任务命名了sample_task,然后声明了两个设置:

任务声明要运行的任务。

时间表设置任务应运行的时间间隔。 这可以是整数,时间增量或crontab。 我们在任务中使用了crontab模式,告诉它每分钟运行一次。 您可以在此处找到有关Celery日程安排的更多信息。

确保添加导入:

from celery.schedules import crontab

import core.tasks

重启容器,应用变更:

$ docker-compose up -d --build

查看日志:

$ docker-compose logs -f 'celery'
celery_1 | -------------- [queues]
celery_1 | .> celery exchange=celery(direct) key=celery
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . core.tasks.sample_task

我们可以看到Celery获得了示例任务core.tasks.sample_task。

每分钟,您应该在日志中看到一行以“示例任务刚刚运行”结尾的行:

celery_1  | [2020-04-15 22:49:00,003: INFO/MainProcess]
              Received task: core.tasks.sample_task[8ee5a84f-c54b-4e41-945b-645765e7b20a]
celery_1  | [2020-04-15 22:49:00,007: WARNING/ForkPoolWorker-1] The sample task just ran.

自定义Django Admin命令

Django提供了许多内置的django-admin命令,例如:

迁移

启动项目

startapp

转储数据

移民

除了内置命令,Django还为我们提供了创建自己的自定义命令的选项:

自定义管理命令对于运行独立脚本或从UNIX crontab或Windows计划任务控制面板定期执行的脚本特别有用。

因此,我们将首先配置一个新命令,然后使用Celery Beat自动运行它。

首先创建一个名为orders / management / commands / my_custom_command.py的新文件。 然后,添加运行它所需的最少代码:

from django.core.management.base import BaseCommand, CommandError

class Command(BaseCommand):
 help = "A description of the command"

 def handle(self, *args, **options):
 pass

BaseCommand有一些可以被覆盖的方法,但是唯一需要的方法是handle。 handle是自定义命令的入口点。 换句话说,当我们运行命令时,将调用此方法。

为了进行测试,我们通常只添加一个快速打印语句。 但是,建议根据Django文档使用stdout.write代替:

当您使用管理命令并希望提供控制台输出时,应该写入self.stdout和self.stderr,而不是直接打印到stdout和stderr。 通过使用这些代理,测试自定义命令变得更加容易。 另请注意,您无需以换行符结束消息,除非您指定结束参数,否则它将自动添加。

因此,添加一个self.stdout.write命令:

from django.core.management.base import BaseCommand, CommandError

class Command(BaseCommand):
 help = "A description of the command"

 def handle(self, *args, **options):
 self.stdout.write("My sample command just ran.") # NEW

测试:

$ docker-compose exec web python manage.py my_custom_command
My sample command just ran.

这样,让我们将所有内容捆绑在一起!

使用Celery Beat安排自定义命令

现在我们已经启动并运行了容器,已经过测试,可以安排任务定期运行,并编写了自定义的Django Admin示例命令,现在该进行设置以定期运行自定义命令了。

设定

在项目中,我们有一个非常基本的应用程序,称为订单。 它包含两个模型,产品和订单。 让我们创建一个自定义命令,该命令从当天发送确认订单的电子邮件报告。

首先,我们将通过此项目中包含的夹具将一些产品和订单添加到数据库中:

$ docker-compose exec web python manage.py loaddata products.json

创建超级用户:

$ docker-compose exec web python manage.py createsuperuser

出现提示时,请填写用户名,电子邮件和密码。 然后在您的Web浏览器中导航到http://127.0.0.1:1337/admin。 使用您刚创建的超级用户登录,并创建几个订单。 确保至少有一个日期为今天。

让我们为我们的电子邮件报告创建一个新的自定义命令。

创建一个名为orders / management / commands / email_report.py的文件:

from datetime import timedelta, time, datetime

from django.core.mail import mail_admins
from django.core.management import BaseCommand
from django.utils import timezone
from django.utils.timezone import make_aware

from orders.models import Order

today = timezone.now()
tomorrow = today + timedelta(1)
today_start = make_aware(datetime.combine(today, time()))
today_end = make_aware(datetime.combine(tomorrow, time()))

class Command(BaseCommand):
 help = "Send Today's Orders Report to Admins"

 def handle(self, *args, **options):
 orders = Order.objects.filter(confirmed_date__range=(today_start, today_end))

 if orders:
 message = ""

 for order in orders:
 message += f"{order} \n"

 subject = (
 f"Order Report for {today_start.strftime('%Y-%m-%d')} "
 f"to {today_end.strftime('%Y-%m-%d')}"
 )

 mail_admins(subject=subject, message=message, html_message=None)

 self.stdout.write("E-mail Report was sent.")
 else:
 self.stdout.write("No orders confirmed today.")

在代码中,我们向数据库查询了日期为Confirmed_date的订单,将订单合并为电子邮件正文的单个消息,然后使用Django内置的mail_admins命令将电子邮件发送给管理员。

添加一个虚拟管理员电子邮件,并将EMAIL_BACKEND设置为使用控制台后端,以便将该电子邮件发送到设置文件中的stdout:

EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend"
DEFAULT_FROM_EMAIL = "noreply@email.com"
ADMINS = [("testuser", "test.user@email.com"), ]

运行:

$ docker-compose exec web python manage.py email_report
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: [Django] Order Report for 2020-04-15 to 2020-04-16
From: root@localhost
To: test.user@email.com
Date: Wed, 15 Apr 2020 23:10:45 -0000
Message-ID: <158699224565.85.8278261495663971825@5ce6313185d3>
 
Order: 337ef21c-5f53-4761-9f81-07945de385ae - product: Rice
 
-------------------------------------------------------------------------------
E-mail Report was sent.

Celery Beat

现在,我们需要创建一个定期任务来每天运行此命令。

向core / tasks.py添加一个新任务:

from celery import shared_task
from django.core.management import call_command # NEW

@shared_task
def sample_task():
 print("The sample task just ran.")

# NEW
@shared_task
def send_email_report():
 call_command("email_report", )

因此,首先我们添加了一个call_command导入,该导入用于以编程方式调用django-admin命令。 在新任务中,然后将call_command与自定义命令的名称一起用作参数。

要安排此任务,请打开core / settings.py文件,并更新CELERY_BEAT_SCHEDULE设置以包括新任务。

CELERY_BEAT_SCHEDULE = {
 "sample_task": {
 "task": "core.tasks.sample_task",
 "schedule": crontab(minute="*/1"),
 },
 "send_email_report": {
 "task": "core.tasks.send_email_report",
 "schedule": crontab(hour="*/1"),
 },
}

在这里,我们向CELERY_BEAT_SCHEDULE添加了一个名为send_email_report的新条目。 正如我们对上一个任务所做的那样,我们声明了该任务应运行的任务-例如core.tasks.send_email_report-并使用crontab模式设置重复性。

重新启动容器,以确保新设置处于活动状态:

$ docker-compose up -d --build
看日志:
$ docker-compose logs -f 'celery'
celery_1 | -------------- [queues]
celery_1 | .> celery exchange=celery(direct) key=celery
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . core.tasks.sample_task
celery_1 | . core.tasks.send_email_report

一分钟后邮件发出:

celery_1  | [2020-04-15 23:20:00,309: WARNING/ForkPoolWorker-1] Content-Type: text/plain; charset="utf-8"
celery_1  | MIME-Version: 1.0
celery_1  | Content-Transfer-Encoding: 7bit
celery_1  | Subject: [Django] Order Report for 2020-04-15 to 2020-04-16
celery_1  | From: root@localhost
celery_1  | To: test.user@email.com
celery_1  | Date: Wed, 15 Apr 2020 23:20:00 -0000
celery_1  | Message-ID: <158699280030.12.8934112422500683251@42481c198b77>
celery_1  |
celery_1  | Order: 337ef21c-5f53-4761-9f81-07945de385ae - product: Rice
celery_1  | [2020-04-15 23:20:00,310: WARNING/ForkPoolWorker-1] -------------------------------------------------------------------------------
celery_1  | [2020-04-15 23:20:00,312: WARNING/ForkPoolWorker-1] E-mail Report was sent.

结论

在本文中,我们指导您为Celery,Celery Beat和Redis设置Docker容器。 然后,我们展示了如何使用Celery Beat创建自定义Django Admin命令和定期任务以自动运行该命令。

原文:https://testdriven.io/blog/django-celery-periodic-tasks/

到此这篇关于如何使用Celery和Docker处理Django中的定期任务的文章就介绍到这了,更多相关Celery Docker 处理Django定期任务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • django定期执行任务(实例讲解)

    要在django项目中定期执行任务,比如每天一定的时间点抓取数据,刷新数据库等,可以参考stackoverflow的方法,先编写一个manage.py命令,然后使用crontab来定时执行这个命令. 定制manage.py命令 app可以使用manage.py注册自己的命令,比如要在polls这个app中定制一个closepoll命令,要先向polls文件夹中添加一个management/commands的目录: polls/ __init__.py models.py management/

  • 如何使用Celery和Docker处理Django中的定期任务

    在构建和扩展Django应用程序时,不可避免地需要定期在后台自动运行某些任务. 一些例子: 生成定期报告 清除缓存 发送批量电子邮件通知 执行每晚维护工作 这是构建和扩展不属于Django核心的Web应用程序所需的少数功能之一.幸运的是,Celery提供了一个强大的解决方案,该解决方案非常容易实现,称为Celery Beat. 在下面的文章中,我们将向您展示如何使用Docker设置Django,Celery和Redis,以便通过Celery Beat定期运行自定义Django Admin命令.

  • Django中celery执行任务结果的保存方法

    如下所示: pip3 install django-celery-results INSTALLED_APPS = ( ..., 'django_celery_results',) # 注意这个是下划线'_' python3 manage.py migrate django_celery_results CELERY_RESULT_BACKEND = 'django-db' #在settings.py文件中配置 注意异步任务views.py中调用时,想要记录结果必须是"任务函数.delay(*a

  • django中使用Celery 布式任务队列过程详解

    本文记录django中如何使用celery完成异步任务. Celery 是一个简单.灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具. 它是一个专注于实时处理的任务队列,同时也支持任务调度. 官方网站 中文文档 示例一:用户发起request,并等待response返回.在本些views中,可能需要执行一段耗时的程序,那么用户就会等待很长时间,造成不好的用户体验 示例二:网站每小时需要同步一次天气预报信息,但是http是请求触发的,难道要一小时请求一次吗? 使用cele

  • Django中使用Celery的教程详解

    Django教程 Python下有许多款不同的 Web 框架.Django是重量级选手中最有代表性的一位.许多成功的网站和APP都基于Django. Django是一个开放源代码的Web应用框架,由Python写成. Django遵守BSD版权,初次发布于2005年7月, 并于2008年9月发布了第一个正式版本1.0 . Django采用了MVC的软件设计模式,即模型M,视图V和控制器C. 一.前言 Celery是一个基于python开发的分布式任务队列,如果不了解请阅读笔者上一篇博文Celer

  • Django中使用Celery的方法示例

    起步 在 <分布式任务队列Celery使用说明> 中介绍了在 Python 中使用 Celery 来实验异步任务和定时任务功能.本文介绍如何在 Django 中使用 Celery. 安装 pip install django-celery 这个命令使用的依赖是 Celery 3.x 的版本,所以会把我之前安装的 4.x 卸载,不过对功能上并没有什么影响.我们也完全可以仅用Celery在django中使用,但使用 django-celery 模块能更好的管理 celery. 使用 可以把有关 C

  • 异步任务队列Celery在Django中的使用方法

    前段时间在Django Web平台开发中,碰到一些请求执行的任务时间较长(几分钟),为了加快用户的响应时间,因此决定采用异步任务的方式在后台执行这些任务.在同事的指引下接触了Celery这个异步任务队列框架,鉴于网上关于Celery和Django结合的文档较少,大部分也只是粗粗介绍了大概的流程,在实践过程中还是遇到了不少坑,希望记录下来帮助有需要的朋友. 一.Django中的异步请求 Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 --

  • Django中使用Celery的方法步骤

    (一).概述 Celery是一个简单.灵活和可靠的基于多任务的分布式系统,为运营提供用于维护此系统的工具.专注于实时处理的任务队列,同时也支持任务的调度.执行单元为任务(task),利用多线程这些任务可以被并发的在单个或多个职程(worker)上运行. Celery通过消息机制通信,通常通过中间人(broker)来分配和调节客户端与职程服务器(worker)之间的通信.客户端发送一条消息,中间人把消息分配给一个职程,最后由职程来负责执行此任务. Celery可以有多个职程和中间人,这样提高了高可

  • Django中如何使用celery异步发送短信验证码详解

    目录 1.celery介绍 1.1 celery应用举例 1.2 Celery有以下优点 1.3 Celery 特性 2.工作原理 2.1 Celery 扮演生产者和消费者的角色 3.异步发短信 总结 1.celery介绍 1.1 celery应用举例 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着

  • Django中celery的使用项目实例

    目录 1.django应用Celery 2 .项目应用 1.异步任务redis 2.定时任务 3.任务绑定 4.任务钩子 5.任务编排 6.celery管理和监控 总结 1.django应用Celery django框架请求/响应的过程是同步的,框架本身无法实现异步响应. 但是我们在项目过程中会经常会遇到一些耗时的任务, 比如:发送邮件.发送短信.大数据统计等等,这些操作耗时长,同步执行对用户体验非常不友好,那么在这种情况下就需要实现异步执行. 异步执行前端一般使用ajax,后端使用Celery

  • 使用docker部署django技术栈项目的方法步骤

    随着Docker的普及成熟,已经逐渐成为部署项目的首选,今天来和大家分享下如何使用docker部署django技术栈项目. 我们这里说的Django技术栈为:python3.6.Django2.2.redis.mysql.celery.gunicorn和nginx.在实际的生产项目中,这些组件分布在集群的不同机器,如Nginx.redis和Mysql可能会有单独的团队或部门负责.涉及的部署架构和容器的编排会更为复杂,本文暂不去深究.本文主要介绍,如何使用 docker-compose 来编排这些

随机推荐