Python中celery的使用

目录
  • Celery简介
  • celery的异步任务
    • 1.安装celery
    • 2.安装redis
    • 3.使用ceelry
  • Django中使用celery
    • 1.创建celery文件
    • 2.添加celery配置
    • 3.在别的应用下使用celery执行异步任务 [使用celery异步发送钉钉群消息通知]
    • 4.启动celery服务

Celery简介

  Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

  Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。

  任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

  版本支持情况:

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you're running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.

  Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。

celery的异步任务

celery的使用

1.安装celery

$ pip install -U celery

1)安装相关依赖

$ pip install "celery[redis,auth,msgpack]"

序列化程序

celery[auth]

用于使用auth安全序列化程序。

celery[msgpack]

用于使用 msgpack 序列化程序。

celery[redis]

使用 Redis 作为消息传输或结果后端。

2.安装redis

这里我们使用redis作为celery的broker,作为任务队列的存储和结果的存储。

对于 Redis 支持,您必须安装其他依赖项。您可以使用celery[redis] bundle一次性安装 Celery 和这些依赖项:

$ pip install -U "celery[redis]"

1)配置

配置很简单,只需配置你的 Redis 数据库的位置:

app.conf.broker_url = 'redis://localhost:6379/0'

其中 URL 的格式为:

redis://:password@hostname:port/db_number

方案后面的所有字段都是可选的,并且将默认为localhost 端口 6379,使用数据库 0。

3.使用ceelry

1)首先我们可以创建一个celery的文件夹,然后创建一个tasks.py文件

celery/tasks.py

from celery import Celery

# 第一个参数就是当前脚本的名称,backend 任务执行结果的存储地址broker 任务队列的存储地址
app = Celery('tasks', backend='redis://127.0.0.1', broker='redis://127.0.0.1')

@app.task
def add(x, y):
    return x + y

celery/run_tasks.py

from tasks import add

result = add.delay(1, 2)
print('Is task ready: %s' % result.ready())  # False说明任务还没有执行完
run_result = result.get(timeout=1)
print('task result: %s' % run_result)

print('Is task ready: %s' % result.ready())

4.启动celery

$ cd celry
$ celery -A tasks worker --loglevel=info

使用flower监控celery任务的执行情况

pip install flower

启动flower,指定我们的应用,确保你的celery是启动的。

cd celery
celery -A tasks flower --broker=redis://@localhost:6379/0

运行结果:

celery [celery args] flower [flower args].
[I 210825 10:54:00 command:152] Visit me at http://localhost:5555
[I 210825 10:54:00 command:159] Broker: redis://127.0.0.1:6379//
[I 210825 10:54:00 command:160] Registered tasks:

我们就可以通过5555端口看到celery异步任务的运行情况了

![image-20210825113106220](/Users/gelong/Library/Application Support/typora-user-images/image-20210825113106220.png)

Django中使用celery

官方地址:https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

1.创建celery文件

根据官方文档的说明,我们可以直接在Django项目同名的应用下创建celery.py文件

recruitment/recruitment/celery.py

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SEttINGS_MODULE', 'recruitment.base')  # 这里我把配置文件放到了根目录下的settings/base.py 中

app = Celery('recruitment')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

def debug_task(self):
  print(f'Request: {self.request!r}')

然后我们需要在这个celery.py文件所在的目录的__init__文件中添加:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when/保证所有app下的任务都能导入进来
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

2.添加celery配置

settings/base.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYD_MAX_TASKS_PER_CHILD = 10
CELERYD_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_work.log")
CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_beat.log")

3.在别的应用下使用celery执行异步任务 [使用celery异步发送钉钉群消息通知]

1.首先我们需要在应用下创建一个tasks.py文件interview/tasks.py

from __future__ import absolute_import, unicode_literals

from celery import shared_task
from .dingtalk import send

@shared_task
def send_dingtalk_message(message):
    send(message)

interview/dingtalk.py

from dingtalkchatbot.chatbot import DingtalkChatbot

from django.conf import settings

def send(message, at_mobiles=[]):
    # 引用 settings里面配置的钉钉群消息通知的WebHook地址:
    webhook = settings.DINGTALK_WEB_HOOK

    # 初始化机器人小Y,
    xiaoY = DingtalkChatbot(webhook)

    # 方式二:勾选“加签”选项时使用(v1.5以上新功能)
    # xiaoY = DingtalkChatbot(webhook, secret=secret)

    # Text消息@所有人
    xiaoY.send_text(msg=('消息通知: %s' % message), at_mobiles=at_mobiles)

interview.views.py

from interview.tasks import send_dingtalk_message

def notify_interview(modeladmin, request, queryset):
    candidates = ''
    interviewers = ''
    for obj in queryset:
        candidates = obj.userame + '' + candidates
        interviewers = obj.first_interviewer_user + '' + interviewers
    # 这里的消息发送到钉钉, 或者通过 Celery 异步发送到钉钉
    send_dingtalk_message.delay('候选人 %s 进入面试环节, 亲爱的面试官请做好面试准备:%s。' % (candidates, interviewers))

4.启动celery服务

启动celery服务,到我们的项目根目录启动,然后执行

$ celery -A recruitment worker -l info

如果需要制定配置文件,如果在mac下可以执行:

$ DJANGO_SEttINGS_MODULE=settings.base celery --app=recruitment worker --loglevel=info

启动flower监控异步任务

$ celery -A recruitment flower --broker=redis://localhost:6379/0

celery定时任务

到此这篇关于Python中celery的使用的文章就介绍到这了,更多相关celery的使用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python环境下安装使用异步任务队列包Celery的基础教程

    1.简介 celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列.它侧重于实时操作,但对调度支持也很好. celery用于生产系统每天处理数以百万计的任务. celery是用Python编写的,但该协议可以在任何语言实现.它也可以与其他语言通过webhooks实现. 建议的消息代理RabbitMQ的,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, ,和数据库(使用SQLAlchemy的或Django的 ORM) . celery是易于集成Dja

  • python3中celery异步框架简单使用+守护进程方式启动

    安装celery sudo pip install celery 实例化 celery from celery import Celery app = Celery("testapp") # 导入配置 app.config_from_object('celery_tasks.config') # 自动添加任务 app.autodiscover_tasks(["celery_tasks.test","celery_tasks.test2"]) 简单

  • Python Celery异步任务队列使用方法解析

    Celery是一个异步的任务队列(也叫做分布式任务队列),一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需要的工具. celery的优点 1:简单,容易使用,不需要配置文件 2:高可用,任务执行失败或执行过程中发生连续中断,celery会自动尝试重新执行任务 3:快速,一个单进程的celery每分钟可以处理上百万个任务 4:灵活,几乎celery的各个组件都可以被扩展 celery应用场景 1:异步发邮件,一般发邮件等比较耗时的操作,这个时候需要提交任务给cel

  • python使用celery实现异步任务执行的例子

    使用celery在django项目中实现异步发送短信 在项目的目录下创建celery_tasks用于保存celery异步任务. 在celery_tasks目录下创建config.py文件,用于保存celery的配置信息 ```broker_url = "redis://127.0.0.1/14"``` 在celery_tasks目录下创建main.py文件,用于作为celery的启动文件 from celery import Celery # 为celery使用django配置文件进行

  • python使用celery实现订单超时取消

    本文实例为大家分享了celery实现订单超时取消的具体代码,供大家参考,具体内容如下 Celery官方文档中关于定时任务使用的说明 项目目录结构 我们需要新增一个任务目录,例如order: celey_tasks/      ├── config.py      ├── __init__.py      ├── main.py      ├── order/             ├── __init__.py            └── tasks.py 在main.py中,注册任务目录[

  • python celery分布式任务队列的使用详解

    一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们

  • Python中celery的使用

    目录 Celery简介 celery的异步任务 1.安装celery 2.安装redis 3.使用ceelry Django中使用celery 1.创建celery文件 2.添加celery配置 3.在别的应用下使用celery执行异步任务 [使用celery异步发送钉钉群消息通知] 4.启动celery服务 Celery简介 Celery是一个简单.灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度. Celery的架构由三部分组成,消息中间件(messa

  • python中celery的基本使用详情

    目录 1.基本介绍 2.使用场景 3.工作流程和组成部分 4.Celery执行异步任务 4.1 基础使用 1.基本介绍 Celery 是由Python 编写的简单,灵活,可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具.Celery 专注于实时任务处理,支持任务调度. 简单的说,它就是一个分布式队列的管理工具,用celery提供的接口快速实现并管理一个分布式的任务队列. 有一点我们需要搞清楚,Celery 本身并不是任务队列,它是一个分布式队列的管理工具,Celery

  • Python中定时任务框架APScheduler的快速入门指南

    前言 大家应该都知道在编程语言中,定时任务是常用的一种调度形式,在Python中也涌现了非常多的调度模块,本文将简要介绍APScheduler的基本使用方法. 一.APScheduler介绍 APScheduler是基于Quartz的一个python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以持久化任务. APScheduler提供了多种不同的调度器,方便开发者根据自己的实际需要进行使用:同时也提供了不同的存储机

  • python基于celery实现异步任务周期任务定时任务

    这篇文章主要介绍了python基于celery实现异步任务周期任务定时任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 hello, 小伙伴们, 好久不更新了,这一次带来的是celery在python中的应用以及设置异步任务周期任务和定时任务的步骤,希望能给入坑的你带来些许帮助. 首先是对celery的介绍,Celery其实是一个专注于实时处理和调度任务的分布式任务队列,同时提供操作和维护分布式系统所需要的全部数据, 因此可以用它提供的接口快

  • python中的信号通信 blinker的使用小结

    目录 信号: 官方介绍: blinker 使用 命名信号 匿名信号 组播信号 接收方订阅主题 装饰器用法 可订阅主题的装饰器 检查信号是否有接收者 检查订阅者是否订阅了某个信号 基于blinker的Flask信号 简单 Flask demo 总结 信号: 信号是一种通知或者说通信的方式,信号分为发送方和接收方.发送方发送一中信号,接收方收到信号的进程会跳入信号处理函数,执行完后再跳回原来的位置继续执行.常见的linux中的信号,通过键盘输入Ctrl+C,就是发送给系统一个信号,告诉系统退出当前进

  • Django中celery的使用项目实例

    目录 1.django应用Celery 2 .项目应用 1.异步任务redis 2.定时任务 3.任务绑定 4.任务钩子 5.任务编排 6.celery管理和监控 总结 1.django应用Celery django框架请求/响应的过程是同步的,框架本身无法实现异步响应. 但是我们在项目过程中会经常会遇到一些耗时的任务, 比如:发送邮件.发送短信.大数据统计等等,这些操作耗时长,同步执行对用户体验非常不友好,那么在这种情况下就需要实现异步执行. 异步执行前端一般使用ajax,后端使用Celery

  • 简单了解Python中的几种函数

    几个特殊的函数(待补充) python是支持多种范型的语言,可以进行所谓函数式编程,其突出体现在有这么几个函数: filter.map.reduce.lambda.yield lambda >>> g = lambda x,y:x+y #x+y,并返回结果 >>> g(3,4) 7 >>> (lambda x:x**2)(4) #返回4的平方 16 lambda函数的使用方法: 在lambda后面直接跟变量 变量后面是冒号 冒号后面是表达式,表达式计算

  • 基于python中的TCP及UDP(详解)

    python中是通过套接字即socket来实现UDP及TCP通信的.有两种套接字面向连接的及无连接的,也就是TCP套接字及UDP套接字. TCP通信模型 创建TCP服务器 伪代码: ss = socket() # 创建服务器套接字 ss.bind() # 套接字与地址绑定 ss.listen() # 监听连接 inf_loop: # 服务器无限循环 cs = ss.accept() # 接受客户端连接 comm_loop: # 通信循环 cs.recv()/cs.send() # 对话(接收/发

随机推荐