通过celery异步处理一个查询任务的完整代码

今天介绍通过celery实现一个异步任务。有这样一个需求,前端发起一个查询的请求,但是发起查询后,查询可能不会立即返回结果。这时候,发起查询后,后端可以把这次查询当作一个task,并立即返回一个能唯一表明该task的值,如taskID(用户后面可以通过这个taskID 随时查看结果),用户收到这个taskID后,可以转去处理其他任务,而不必一直等待查询结果。后端API调用celery来处理这个task,并将结果值保存在一个csv文件中,后面用户通过taskID 查询时返回结果。

def application(environ,start_response):
  """部分代码省略"""
  query_string = environ['QUERY_STRING']
  serviceGroupName = ""
  for getParam in query_string.split("&"):
    params = getParam.split("=")
    resultInfo = ""
    if params[0] == "type":
      alertType = params[1]
    elif params[0] == "projectName":
      projectName = params[1]
    elif params[0] == "serviceGroupName":
      serviceGroupName = params[1]
    else:
      resultInfo = error_info(-1, "GET参数只能为type=<?>&projectName=<?>&serviceGroupName=<?>;必须指定三个参数", {})
    return [resultInfo]
  taskId = 1
  result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv'
  contentInfo = json.dumps({"taskId":1,"opType":"continue","serviceGroupName":serviceGroupName,"dbHost":dbHost,"dbPasswd":dbPasswd,"dbUser":dbUser,"dbPort":dbPort})
  result = getServiceInfo.apply_async((contentInfo,),queue="getServiceInfo")
  taskInfo = "任务已经创建,详情请查看:http://10.4.34.254/api/task?taskId=%s"% (taskId)
  return [resultInfo]

getServiceInfo.apply_async((contentInfo,),queue=”getServiceInfo”),重点是这一行,apply_async()方法会返回一个AsyncResult实例,通过这个实例可以跟踪任务状态轨迹。

要使用此功能,需要提供结果后台(result backend),这样才有地方存储任务状态等信息。其中,getServiceInfo是自定义的一个task,后续会介绍到,contentInfo是传递的一个参数,queue是指定队列名称。

上面这个函数的原型如下:

task.apply_async(args[, kwargs[, …]])

其中 args 和 kwargs 分别是 task 接收的参数,当然它也接受额外的参数对任务进行控制。

在 Celery 中执行任务的方法一共有三种:

1. delay, 用来进行最简单便捷的任务执行(delay在第3小节的测试中使用过,它可以看作是apply_async的一个快捷方式);

2. apply_async, 对于任务的执行附加额外的参数,对任务进行控制;

3. app.send_task, 可以执行未在 Celery 中进行注册的任务。

celery文件配置

在python的库存放路径中(一般是/usr/lib/python2.6/site-packages),创建一个文件夹proj,进入proj目录,创建三个文件,init,将proj声明一个python包,celepy,其内容如下:

#_*_ coding:utf-8 _*_
from __future__ import absolute_import
from celery import Celery

app = Celery("proj",
broker="amqp://user:password@localhost//",
backend="amqp",
include=["proj.tasks"]
)
app.conf.update(
CELERY_ROUTES={
"proj.tasks.getServerInfo":{"queue":"getServerInfo"},
}
)
if __name__=="__main__":
  app.start()

这里我们定义了模块名称proj以及celery 路由。

还有一个文件,task.py

#_*_ coding:utf-8 _*_i
from __future__ import absolute_import
from proj.celery import app
import random
import simplejson as json
import types
import time
import MySQLdb
import urllib2
import ConfigParser as cparser
import hmac
import hashlib
import base64
@app.task
def getServiceInfo(contentInfo):
  contentInfo = json.loads(contentInfo)
  serviceGroupName = contentInfo['serviceGroupName']

  dbHost = contentInfo['dbHost']
  dbPort = int(contentInfo['dbPort'])
  dbUser = contentInfo['dbUser']
  dbPasswd = contentInfo['dbPasswd']
  msgLib = MessageLib.MessageLib()
  Sql = "Your SQL"
  #第三步:连接数据库,执行代码逻辑
  try:
    db_connection = MySQLdb.connect(host=dbHost, port=dbPort, passwd=dbPasswd, db="cmdb", user=dbUser, connect_timeout=2, charset="utf8")
    cursor = db_connection.cursor()
    cursor.execute(getServiceGroupHostSql)
    row = cursor.fetchall()
    result = []
    for line in row:
      ...
      result.append(tempMysqlHighInfo)

  resultInfo = msgLib.success_info(result)
  return resultInfo
  except Exception, e:
    raise
    errorInfo = "dbhost:%s, port:%s, error:%s" % (dbHost, dbPort, str(e))
    #return getServiceGroupHostSql,errorInfo
    return msgLib.error_info(-1, errorInfo, {})

启动celery

celery -A proj worker -Q getServiceInfo -l debug -c 6

最后,写一个结果,专门获取查询结果的结果,传入的参数为taskID,部分代码如下:

def application(environ,start_response):
  status = '400 ERROR'
  response_headers = [('Content-type', 'application/json;charset=utf-8')]
  start_response(status, response_headers)

  status = '200 OK'
  response_headers = [('Content-type', 'application/json;charset=utf-8')]
  start_response(status, response_headers)

  if environ['REQUEST_METHOD'] != "GET":
    resultInfo = msgLib.error_info(-1, "http请求类型不是GET", {})
  return [resultInfo]

  query_string = environ['QUERY_STRING']
  serviceGroupName = ""
  for getParam in query_string.split("&"):
    params = getParam.split("=")
    resultInfo = ""
    if params[0] == "taskId":
      taskId = params[1]
    else:
      resultInfo = msgLib.error_info(-1, "GET参数无比指定taskId这个参数", {})
    return [resultInfo]
  logging.info(query_string)
  result_file_name = '/var/www/dba_api/api/test/'+ str(taskId) + '.csv'
  result = []
  try:
    with open (result_file_name,'rb') as fp:
    lines = csv.reader(fp)
    for line in lines :
    result.append(line)
    resultInfo = msgLib.success_info(result)
  return resultInfo
  except Exception, e:
  errorInfo = "some thing wrong"
  return msgLib.error_info(-1, errorInfo, {})

以上这篇通过celery异步处理一个查询任务的完整代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Django异步任务之Celery的基本使用

    Celery 许多Django应用需要执行异步任务, 以便不耽误http request的执行. 我们也可以选择许多方法来完成异步任务, 使用Celery是一个比较好的选择, 因为Celery有着大量的社区支持, 能够完美的扩展, 和Django结合的也很好. Celery不仅能在Django中使用, 还能在其他地方被大量的使用. 因此一旦学会使用Celery, 我们可以很方便的在其他项目中使用它. celery 是一个用于实现异步任务的库, 在很多项目中都使用它, 它和 django 融合使用

  • Django中使用celery完成异步任务的示例代码

    本文主要介绍如何在django中用celery完成异步任务,web项目中为了提高用户体验可以对一些耗时操作放到异步队列中去执行,例如激活邮件,后台计算操作等等 当前项目环境为: django==1.11.8 celery==3.1.25 redis==2.10.6 pip==9.0.1 python3==3.5.2 django-celery==3.1.17 一,创建Django项目及celery配置 1,创建Django项目 1>打开终端输入:django-admin startproject

  • Django Celery异步任务队列的实现

    背景 在开发中,我们常常会遇到一些耗时任务,举个例子: 上传并解析一个 1w 条数据的 Excel 文件,最后持久化至数据库. 在我的程序中,这个任务耗时大约 6s,对于用户来说,6s 的等待已经是个灾难了. 比较好的处理方式是: 接收这个任务的请求 将这个任务添加到队列中 立即返回「操作成功,正在后台处理」的字样 后台消费这个队列,执行这个任务 我们按照这个思路,借助 Celery 进行实现. 实现 本文所使用的环境如下: Python 3.6.7 RabbitMQ 3.8 Celery 4.

  • python使用celery实现异步任务执行的例子

    使用celery在django项目中实现异步发送短信 在项目的目录下创建celery_tasks用于保存celery异步任务. 在celery_tasks目录下创建config.py文件,用于保存celery的配置信息 ```broker_url = "redis://127.0.0.1/14"``` 在celery_tasks目录下创建main.py文件,用于作为celery的启动文件 from celery import Celery # 为celery使用django配置文件进行

  • 通过celery异步处理一个查询任务的完整代码

    今天介绍通过celery实现一个异步任务.有这样一个需求,前端发起一个查询的请求,但是发起查询后,查询可能不会立即返回结果.这时候,发起查询后,后端可以把这次查询当作一个task,并立即返回一个能唯一表明该task的值,如taskID(用户后面可以通过这个taskID 随时查看结果),用户收到这个taskID后,可以转去处理其他任务,而不必一直等待查询结果.后端API调用celery来处理这个task,并将结果值保存在一个csv文件中,后面用户通过taskID 查询时返回结果. def appl

  • vue+elementui 实现新增和修改共用一个弹框的完整代码

    element-ui是由饿了么前端团队推出的一套为开发者.设计师和产品经理准备的基于Vue.js 2.0的桌面组件库,而手机端有对应框架是 Mint UI .整个ui风格简约,很实用,同时也极大的提高了开发者的效率,是一个非常受欢迎的组件库. 一.新增 1.新增按钮 2.新增事件 在methods中,用来打开弹窗, dialogVisible在data中定义使用有true或false来控制显示弹框 **3.新增确定,弹框确定事件 ,新增和修改共用一个确定事件,使用id区别 **3.新增事件 调新

  • 用c语言实现一个电话薄(附完整代码)

    先看一下这个小程序的效果 这里我为了演示方便,把人数固定为3个: 人数都是可以自定义的: 下面是这个简单的代码: #include<stdio.h> typedef struct con { int num; char name[11]; char tel[10]; }contact; int main() { contact con[3]; int i; for(i=0;i<3;i++) { printf("请输入编号:"); scanf("%d"

  • 用python写一个福字(附完整代码)

    目录 前言: 一,扫五福活动如此火爆,为何不自己利用编程来生成福字! 二,完整代码 三,总结 前言: 支付宝 2022 集五福活动正式开启 数据显示,过去六年累计参与支付宝集五福的人数已经超过了 7 亿,每 2 个中国人里就有 1 个曾扫福.集福.送福. 一,扫五福活动如此火爆,为何不自己利用编程来生成福字! 首先作品奉上: ①,导入python库 import io from PIL import Image import requests ②,利用爬虫,获取单个汉字 def get_word

  • Java实现一个顺序表的完整代码

    实现一个顺序表 接口实现 定义一个MyArrayList类,在类中实现以下函数 public class MyArrayList { } 数组的定义 public int[] elem;//定义一个整形数组 public int usize;//usize表示数组的长度 public MyArrayList(){ this.elem = new int[5]; } 打印顺序表 for循环打印顺序表的每一位 public void display(){ for (int i = 0; i < th

  • Django使用Celery异步任务队列的使用

    1 Celery简介 Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行. 任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ.Redis). 1.1 Celery原理 Celery的 架构 由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成. 消息中间件:Celery本身不提供消息服务,但

  • Django中如何使用celery异步发送短信验证码详解

    目录 1.celery介绍 1.1 celery应用举例 1.2 Celery有以下优点 1.3 Celery 特性 2.工作原理 2.1 Celery 扮演生产者和消费者的角色 3.异步发短信 总结 1.celery介绍 1.1 celery应用举例 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着

  • Django使用celery异步发送短信验证码代码示例

    目录 celery 1.celery介绍 1.1 celery应用举例 1.2 Celery有以下优点 1.3 Celery 特性 2.工作原理 2.1 Celery 扮演生产者和消费者的角色 3.异步发短信 1.settings同级目录下创建 celery 文件 2.配置settings文件 3 配置 settings同级目录下 init 文件 4.在utils下新建一个task.py文件 5.接口中调用 6 .先启动django项目 然后另开终端 cd到项目 celery 1.celery介

  • python3中celery异步框架简单使用+守护进程方式启动

    安装celery sudo pip install celery 实例化 celery from celery import Celery app = Celery("testapp") # 导入配置 app.config_from_object('celery_tasks.config') # 自动添加任务 app.autodiscover_tasks(["celery_tasks.test","celery_tasks.test2"]) 简单

  • celery异步定时任务订单定时回滚

    目录 订单回滚 控制执行(多少时间后执行) celery异步定时任务 订单回滚 用celery异步,定时任务.可以设置:如果下单15分钟后没有支付,则取消订单.做反向操作 控制执行(多少时间后执行) from datetime import datetime from pro_celery.celery import del_order def pay_status(order_id,check_time=5): # 5秒后 ctime = datetime.now() utc_ctime =

随机推荐