使用celery和Django处理异步任务的流程分析

介绍

我们可能需要一些可以安排一些任务并定期运行一些任务或异步处理长任务的东西,而这一切都可以通过在Django Project中使用Celery来实现。

什么是Celery?

Celery是 一个专注于实时处理的任务队列,它还支持任务调度。 Celery快速,简单,高度可用且灵活。

Celery需要消息传输来发送和接收消息,这可以由Redis或RabbitMQ完成。

入门

让我们开始在您的virtualenv中安装Celery软件包。

安装Celery

<span class="nv">$ </span>pip <span class="nb">install </span>celery
pip install celery

安装Redis

我们将Message Broker用作Redis,所以我们安装

Linux / Mac用户

您可以从这里下载最新版本

$ wget http://download.redis.io/releases/redis-4.0.8.tar.gz
$ tar xzf redis-4.0.8.tar.gz
$ cd redis-4.0.8
$make

Windows用户

对于Windows用户,您可以从此处获取redis的可执行文件。

安装后,请尝试是否正确安装。

$ redis-cli ping

它应该显示:

pong

同时安装redis的python包

$ pip install redis

Django的第一步

现在您已经成功安装了软件包,现在就开始学习Django Project

settings.py Add some of the setting configuration in your settings.py CELERY_BROKER_URL = 'redis://localhost:6379' CELERY_RESULT_BACKEND = 'redis://localhost:6379' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = "YOUR_TIMEZONE"

确保您已从YOUR_TIMEZONE更改时区。 您可以从这里获取时区

主Django项目目录中创建celery.py文件

- src/ - manage.py - celery_project/ - __init__.py - settings.py - urls.py - celery.py celery_project/celery.py

在celery.py模块中添加以下代码。 该模块用于定义celery实例。

确保已使用django项目名称更改了项目名称(<your project name>)

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<your project name>.settings')
app = Celery('<your project name>')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
 print('Request: {0!r}'.format(self.request))
celery_project/__init__.py

然后,我们需要将定义celery.py的应用程序导入到主项目目录的__init__.py。 这样,我们可以确保在Django项目启动时已加载应用

from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app'] 

创建任务

现在创建一些任务

在您在INSTALLED_APPS中注册的任何应用程序中创建一个新文件

my_app/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task(name = "print_msg_with_name")
def print_message(name, *args, **kwargs):
 print("Celery is working!! {} have implemented it correctly.".format(name))
@shared_task(name = "add_2_numbers")
def add(x, y):
 print("Add function has been called!! with params {}, {}".format(x, y))
 return x+y

开始程序

打开一个NEW终端并运行以下命令以运行celery的worker实例,并将目录更改为您的主项目目录所在的位置,即,将manage.py文件放置的目录,并确保您已经 激活您的virtualenv(如果已创建)。

用您的项目名称更改项目名称

$ celery -A <your project name> worker -l info

输出:

-------------- celery@root v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-4.13.0-32-generic-x86_64-with-Ubuntu-17.10-artful 2018-02-17 08:09:37 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: celery_project:0x7f9039886400 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost:6379/ - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . add_2_numbers . celery_project.celery.debug_task . print_msg_with_name [2018-02-17 08:09:37,877: INFO/MainProcess] Connected to redis://localhost:6379// [2018-02-17 08:09:37,987: INFO/MainProcess] mingle: searching for neighbors [2018-02-17 08:09:39,084: INFO/MainProcess] mingle: all alone [2018-02-17 08:09:39,121: WARNING/MainProcess] /home/jai/Desktop/demo/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never ' [2018-02-17 08:09:39,121: INFO/MainProcess] celery@root ready.

注意:检查上面的[tasks],它应该包含您在task.py模块中创建的任务的名称。

有关更多信息和日志,您还可以在DEBUG MODE中运行worker实例

celery <span class="nt">-A</span> <your project name> worker <span class="nt">-l</span> info <span class="nt">--loglevel</span><span class="o">=</span>DEBUG celery -A <your project name> worker -l info --loglevel=DEBUG

注意:请勿关闭此终端,应保持打开状态!!

测试任务

现在让我们从django shell运行任务打开Django shell

$ python3 manage.py shell

用delay方法运行函数:

>>> from my_app.tasks import print_message, add
>>> print_message.delay("Jai Singhal")
<AsyncResult: fe4f9787-9ee4-46da-856c-453d36556760>
>>> add.delay(10, 20)
<AsyncResult: ca5d2c50-87bc-4e87-92ad-99d6d9704c30>

当检查您的celery worker实例正在运行的第二个终端时,您将获得此类型的输出,显示您的任务已收到且任务已成功完成

[2018-02-17 08:12:14,375: INFO/MainProcess] Received task: my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760] [2018-02-17 08:12:14,377: WARNING/ForkPoolWorker-4] Celery is working!! Jai Singhal have implemented it correctly. [2018-02-17 08:12:14,382: INFO/ForkPoolWorker-4] Task my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760] succeeded in 0.004476275000342866s: None [2018-02-17 08:12:28,344: INFO/MainProcess] Received task: my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30] [2018-02-17 08:12:28,349: WARNING/ForkPoolWorker-3] Add function has been called!! with params 10, 20 [2018-02-17 08:12:28,358: INFO/ForkPoolWorker-3] Task my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30] succeeded in 0.010077004999857309s: 30

总结

以上所述是小编给大家介绍的使用celery和Django处理异步任务的流程分析,希望对大家有所帮助,也非常感谢大家对我们网站的支持!

(0)

相关推荐

  • 异步任务队列Celery在Django中的使用方法

    前段时间在Django Web平台开发中,碰到一些请求执行的任务时间较长(几分钟),为了加快用户的响应时间,因此决定采用异步任务的方式在后台执行这些任务.在同事的指引下接触了Celery这个异步任务队列框架,鉴于网上关于Celery和Django结合的文档较少,大部分也只是粗粗介绍了大概的流程,在实践过程中还是遇到了不少坑,希望记录下来帮助有需要的朋友. 一.Django中的异步请求 Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 --

  • Django使用Celery异步任务队列的使用

    1 Celery简介 Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行. 任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ.Redis). 1.1 Celery原理 Celery的 架构 由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成. 消息中间件:Celery本身不提供消息服务,但

  • Django Celery异步任务队列的实现

    背景 在开发中,我们常常会遇到一些耗时任务,举个例子: 上传并解析一个 1w 条数据的 Excel 文件,最后持久化至数据库. 在我的程序中,这个任务耗时大约 6s,对于用户来说,6s 的等待已经是个灾难了. 比较好的处理方式是: 接收这个任务的请求 将这个任务添加到队列中 立即返回「操作成功,正在后台处理」的字样 后台消费这个队列,执行这个任务 我们按照这个思路,借助 Celery 进行实现. 实现 本文所使用的环境如下: Python 3.6.7 RabbitMQ 3.8 Celery 4.

  • Django异步任务之Celery的基本使用

    Celery 许多Django应用需要执行异步任务, 以便不耽误http request的执行. 我们也可以选择许多方法来完成异步任务, 使用Celery是一个比较好的选择, 因为Celery有着大量的社区支持, 能够完美的扩展, 和Django结合的也很好. Celery不仅能在Django中使用, 还能在其他地方被大量的使用. 因此一旦学会使用Celery, 我们可以很方便的在其他项目中使用它. celery 是一个用于实现异步任务的库, 在很多项目中都使用它, 它和 django 融合使用

  • Django中使用celery完成异步任务的示例代码

    本文主要介绍如何在django中用celery完成异步任务,web项目中为了提高用户体验可以对一些耗时操作放到异步队列中去执行,例如激活邮件,后台计算操作等等 当前项目环境为: django==1.11.8 celery==3.1.25 redis==2.10.6 pip==9.0.1 python3==3.5.2 django-celery==3.1.17 一,创建Django项目及celery配置 1,创建Django项目 1>打开终端输入:django-admin startproject

  • Django配置celery(非djcelery)执行异步任务和定时任务

    所有演示均基于Django2.0 celery是一个基于python开发的简单.灵活且可靠的分布式任务队列框架,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度.采用典型的生产者-消费者模型,主要由三部分组成: 消息队列broker:broker实际上就是一个MQ队列服务,可以使用redis.rabbitmq等作为broker 处理任务的消费者workers:broker通知worker队列中有任务,worker去队列中取出任务执行,每一个worker就是一个进程 存储结果的bac

  • 使用celery执行Django串行异步任务的方法步骤

    前言 Django项目有一个耗时较长的update过程,希望在接到请求运行update过程的时候,Django应用仍能正常处理其他的请求,并且update过程要求不能并行,也不能漏掉任何一个请求 使用celery的solo模式解决 安装redis https://github.com/microsoftarchive/redis/releases 下载.msi文件安装,会直接将redis注册为windows服务 安装celery与redis依赖 pip install celery pip in

  • 使用celery和Django处理异步任务的流程分析

    介绍 我们可能需要一些可以安排一些任务并定期运行一些任务或异步处理长任务的东西,而这一切都可以通过在Django Project中使用Celery来实现. 什么是Celery? Celery是 一个专注于实时处理的任务队列,它还支持任务调度. Celery快速,简单,高度可用且灵活. Celery需要消息传输来发送和接收消息,这可以由Redis或RabbitMQ完成. 入门 让我们开始在您的virtualenv中安装Celery软件包. 安装Celery <span class="nv&q

  • Python Django中间件使用原理及流程分析

    一.什么是Django中间件 Django 中间件是用来处理Django的请求request和响应response的框架级别的钩子,它是一个轻量,低级别的插件系统,用于全局范围内改变Django的输入,输出.每个中间件组件都负责做一些特定的功能. 说的直白一点是中间件就是帮我们程序员在视图函数执行之前和执行之后都可以一些额外的操作,它是一个自定义的类,类中定义了几个方法,Django框架会在请求的特定时间去执行这些方法. 二.Django中间件的定义规则 1. 自定义中间件的规则 (1)要继承M

  • Java SpringBoot @Async实现异步任务的流程分析

    目录 1.同步任务 2.@Async 异步任务-无返回值 3.@Async 异步任务-有返回值 4.@Async + 自定义线程池 5.CompletableFuture 实现异步任务 依赖pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://

  • 详解Django中异步任务之django-celery

    Celery文档参考:http://docs.jinkan.org/docs/celery/ 参考文章:https://www.jb51.net/article/158046.htm Django中异步任务---django-celery Celery简单介绍: celery使用场景: 耗时任务定时任务 请求结果不怎么重要的 耗时任务比如:发送短信验证码我们可以先发送给客户任务状态(请求成功或失败) 请求结果重要的建议使用django实现 比如:支付 首先简单介绍一下,Celery 是一个强大的

  • python中使用Celery容联云异步发送验证码功能

    目录 1.celery异步消息队列介绍 celery应用举例 Celery有以下优点 Celery 特性 2.工作原理 ***** Celery 扮演生产者和消费者的角色 思维导图 3.异步发短信 3.1.settings同级目录下创建 celery 文件 3.2.redis及容联云配置 3.3.配置settings文件 3.4.配置 settings同级目录下 init 文件 3.5.在utils下新建一个task.py文件 3.6.接口中调用 3.7.先启动django项目 1.celery

  • Django使用Celery加redis执行异步任务的实例内容

    简单使用 安装celery及redis 定义celery任务 项目下新建tasks.py from celery import Celery # 创建一个Celery类的实例对象 app = Celery('celery_task.tasks', broker='redis://127.0.0.1:6379/8') # 定义任务函数 @app.task def send_register_active_email(message): with open("D:\\celery\\text.txt

  • 利用Celery实现Django博客PV统计功能详解

    前言 前几天给网站的文章增加了pv统计,之前只有uv统计.之前没加pv统计是觉得每个用户每访问一次文章,我都需要做一次数据库写操作实在是有损性能,毕竟从用户在the5fire博客的的一次访问来看,只需要从数据库里拿到对应的文章(通常情况下是从缓存中拿),然后返回给浏览器.写操作无意义.之前的uv,也是针对每个用户24小时内只会有一次写操作. 不过话说回来,就对于the5fire博客这么个小站点来说,就算每次访问我写十几次数据库都没啥影响,毕竟量小的可怜.但是咱们码农不是得有颗抗亿级流量的心嘛.

  • Android 异步加载图片分析总结

    研究了android从网络上异步加载图像,现总结如下: (1)由于android UI更新支持单一线程原则,所以从网络上取数据并更新到界面上,为了不阻塞主线程首先可能会想到以下方法. 在主线程中new 一个Handler对象,加载图像方法如下所示 复制代码 代码如下: private void loadImage(final String url, final int id) { handler.post(new Runnable() { public void run() { Drawable

随机推荐