Flask实现异步执行任务

Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。

怎么实现呢?

使用线程的方式

当要执行耗时任务时,直接开启一个新的线程来执行任务,这种方式最为简单快速。

通过 ThreadPoolExecutor 来实现

from flask import Flask
from time import sleep
from concurrent.futures import ThreadPoolExecutor
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
# 创建线程池执行器
executor = ThreadPoolExecutor(2)
app = Flask(__name__)
@app.route('/jobs')
def run_jobs():
 # 交由线程去执行耗时任务
 executor.submit(long_task, 'hello', 123)
 return 'long task running.'
# 耗时任务
def long_task(arg1, arg2):
 print("args: %s %s!" % (arg1, arg2))
 sleep(5)
 print("Task is done!")
if __name__ == '__main__':
 app.run()

当要执行一些比较简单的耗时任务时就可以使用这种方式,如发邮件、发短信验证码等。

但这种方式有个问题,就是前端无法得知任务执行状态。

如果想要前端知道,就需要设计一些逻辑,比如将任务执行状态存储到 redis 中,通过唯一的任务 id 进行标识,然后再写一个接口,通过任务 id 去获取任务的状态,然后让前端定时去请求该接口,从而获得任务状态信息。

全部自己实现就显得有些麻烦了,而 Celery 刚好实现了这样的逻辑,来使用一下。

使用 Celery

为了满足前端可以获得任务状态的需求,可以使用 Celery。

Celery 是实时任务处理与调度的分布式任务队列,它常用于 web 异步任务、定时任务等,后面单独写一篇文章描述 Celery 的架构,这里不深入讨论。

现在我想让前端可以通过一个进度条来判断后端任务的执行情况。使用 Celery 就很容易实现,首先通过 pip 安装 Celery 与 redis,之所以要安装 redis,是因为让 Celery 选择 redis 作为「消息代理 / 消息中间件」。

pip install celery
pip install redis

在 Flask 中使用 Celery 其实很简单,这里先简单的过一下 Flask 中使用 Celery 的整体流程,然后再去实现具体的项目

1.在 Flask 中初始化 Celery

from flask import Flask
from celery import Celery
app = Flask(__name__)
# 配置
# 配置消息代理的路径,如果是在远程服务器上,则配置远程服务器中redis的URL
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
# 要存储 Celery 任务的状态或运行结果时就必须要配置
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# 初始化Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# 将Flask中的配置直接传递给Celery
celery.conf.update(app.config)

上述代码中,通过 Celery 类初始化 celery 对象,传入的应用名称与消息代理的连接 URL。

2.通过 celery.task 装饰器装饰耗时任务对应的函数

@celery.task
def long_task(arg1, arg2):
 # 耗时任务的逻辑
 return result

3.Flask 中定义接口通过异步的方式执行耗时任务

@app.route('/', methods=['GET', 'POST'])
def index():
 task = long_task.delay(1, 2)
delay () 方法是 applyasync () 方法的快捷方式,applyasync () 参数更多,可以更加细致的控制耗时任务,比如想要 long_task () 在一分钟后再执行
@app.route('/', methods=['GET', 'POST'])
def index():
 task = long_task.apply_async(args=[1, 2], countdown=60)

delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。
通过这 3 步就可以使用 Celery 了。

接着就具体来实现「让前端可以通过一个进度条来判断后端任务的执行情况」的需求。

# bind为True,会传入self给被装饰的方法
@celery.task(bind=True)
def long_task(self):
 verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
 adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
 noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
 message = ''
 total = random.randint(10, 50)
 for i in range(total):
 if not message or random.random() < 0.25:
 # 随机的获取一些信息
 message = '{0} {1} {2}...'.format(random.choice(verb),
 random.choice(adjective),
 random.choice(noun))
 # 更新Celery任务状态
 self.update_state(state='PROGRESS',
 meta={'current': i, 'total': total,
 'status': message})
 time.sleep(1)
 # 返回字典
 return {'current': 100, 'total': 100, 'status': 'Task completed!',
 'result': 42}

上述代码中,celery.task () 装饰器使用了 bind=True 参数,这个参数会让 Celery 将 Celery 本身传入,可以用于记录与更新任务状态。

然后就是一个 for 迭代,迭代的逻辑没什么意义,就是随机从 list 中抽取一些词汇来模拟一些逻辑的运行,为了表示这是耗时逻辑,通过 time.sleep (1) 休眠一秒。

每次获取一次词汇,就通过 self.update_state () 更新 Celery 任务的状态,Celery 包含一些内置状态,如 SUCCESS、STARTED 等等,这里使用了自定义状态「PROGRESS」,除了状态外,还将本次循环的一些信息通过 meta 参数 (元数据) 以字典的形式存储起来。有了这些数据,前端就可以显示进度条了。

定义好耗时方法后,再定义一个 Flask 接口方法来调用该耗时方法

@app.route('/longtask', methods=['POST'])
def longtask():
 # 异步调用
 task = long_task.apply_async()
 # 返回 202,与Location头
 return jsonify({}), 202, {'Location': url_for('taskstatus',
 task_id=task.id)}

简单而言,前端通过 POST 请求到 /longtask,让后端开始去执行耗时任务。

返回的状态码为 202,202 通常表示一个请求正在进行中,然后还在返回数据包的包头 (Header) 中添加了 Location 头信息,前端可以通过读取数据包中 Header 中的 Location 的信息来获取任务 id 对应的完整 url。

前端有了任务 id 对应的 url 后,还需要提供一个接口给前端,让前端可以通过任务 id 去获取当前时刻任务的具体状态。

@app.route('/status/<task_id>')
def taskstatus(task_id):
 task = long_task.AsyncResult(task_id)
 if task.state == 'PENDING': # 在等待
 response = {
 'state': task.state,
 'current': 0,
 'total': 1,
 'status': 'Pending...'
 }
 elif task.state != 'FAILURE': # 没有失败
 response = {
 'state': task.state, # 状态
 # meta中的数据,通过task.info.get()可以获得
 'current': task.info.get('current', 0), # 当前循环进度
 'total': task.info.get('total', 1), # 总循环进度
 'status': task.info.get('status', '')
 }
 if 'result' in task.info:
 response['result'] = task.info['result']
 else:
 # 后端执行任务出现了一些问题
 response = {
 'state': task.state,
 'current': 1,
 'total': 1,
 'status': str(task.info), # 报错的具体异常
 }
 return jsonify(response)

为了可以获得任务对象中的信息,使用任务 id 初始化 AsyncResult 类,获得任务对象,然后就可以从任务对象中获得当前任务的信息。

该方法会返回一个 JSON,其中包含了任务状态以及 meta 中指定的信息,前端可以利用这些信息构建一个进度条。

如果任务在 PENDING 状态,表示该任务还没有开始,在这种状态下,任务中是没有什么信息的,这里人为的返回一些数据。如果任务执行失败,就返回 task.info 中包含的异常信息,此外就是正常执行了,正常执行可以通 task.info 获得任务中具体的信息。

这样,后端的逻辑就处理完成了,接着就来实现前端的逻辑,要实现图形进度条,可以直接使用 nanobar.js,简单两句话就可以实现一个进度条,其官网例子如下:

var options = {
 classname: 'my-class',
 id: 'my-id',
 // 进度条要出现的位置
 target: document.getElementById('myDivId')
};
// 初始化进度条对象
var nanobar = new Nanobar( options );
nanobar.go( 30 ); // 30% 进度条
nanobar.go( 76 ); // 76% 进度条
// 100% 进度条,进度条结束
nanobar.go(100);

有了 nanobar.js 就非常简单了。

先定义一个简单的 HTML 界面

<h2>Long running task with progress updates</h2>
<button id="start-bg-job">Start Long Calculation</button><br><br>
<div id="progress"></div>

通过 JavaScript 实现对后台的请求

// 按钮点击事件
$(function() {
 $('#start-bg-job').click(start_long_task);
 });
// 请求 longtask 接口
function start_long_task() {
 // 添加元素在html中
 div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>');
 $('#progress').append(div);
 // 创建进度条对象
 var nanobar = new Nanobar({
 bg: '#44f',
 target: div[0].childNodes[0]
 });
 // ajax请求longtask
 $.ajax({
 type: 'POST',
 url: '/longtask',
 // 获得数据,从响应头中获取Location
 success: function(data, status, request) {
 status_url = request.getResponseHeader('Location');
 // 调用 update_progress() 方法更新进度条
 update_progress(status_url, nanobar, div[0]);
 },
 error: function() {
 alert('Unexpected error');
 }
 });
 }
// 更新进度条
function update_progress(status_url, nanobar, status_div) {
 // getJSON()方法是JQuery内置方法,这里向Location中对应的url发起请求,即请求「/status/<task_id>」
 $.getJSON(status_url, function(data) {
 // 计算进度
 percent = parseInt(data['current'] * 100 / data['total']);
 // 更新进度条
 nanobar.go(percent);
 // 更新文字
 $(status_div.childNodes[1]).text(percent + '%');
 $(status_div.childNodes[2]).text(data['status']);
 if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
 if ('result' in data) {
 // 展示结果
 $(status_div.childNodes[3]).text('Result: ' + data['result']);
 }
 else {
 // 意料之外的事情发生
 $(status_div.childNodes[3]).text('Result: ' + data['state']);
 }
 }
 else {
 // 2秒后再次运行
 setTimeout(function() {
 update_progress(status_url, nanobar, status_div);
 }, 2000);
 }
 });
 } 

可以通过注释阅读代码整体逻辑。

至此,需求实现完了,运行一下。

首先运行 Redis

redis-server

然后运行 celery

celery worker -A app.celery --loglevel=info

最后运行 Flask 项目

python app.py

效果如下:

到此这篇关于Flask实现异步执行任务的文章就介绍到这了,更多相关Flask 异步内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 深入flask之异步非堵塞实现代码示例

    官方其实已经给出了方案,只不过藏的有点深,在加上网上有很多不太靠谱的帖子误导了我(当然不排除我没理解的原因哈).所以为了让有些朋友的少走点弯路,也为给自己做个备忘. 完整代码:https://github.com/wskssau/my_notespace的 python/todo_app 解决方案: flask+gevent 安装gevent pip install gevent 修改代码 # 文件头部 from gevent import monkey from gevent.pywsgi i

  • Flask实现异步非阻塞请求功能实例解析

    本文研究的主要是Flask实现异步非阻塞请求功能,具体实现如下. 最近做物联网项目的时候需要搭建一个异步非阻塞的HTTP服务器,经过查找资料,发现可以使用gevent包. 关于gevent Gevent 是一个 Python 并发网络库,它使用了基于 libevent 事件循环的 greenlet 来提供一个高级同步 API.下面是代码示例: from gevent.wsgi import WSGIServer from yourapplication import app http_serve

  • Python Flask异步发送邮件实现方法解析

    第一步,修改工厂函数,配置邮件参数 from flask import Flask from config import Config from flask_sqlalchemy import SQLAlchemy from flask_mail import Mail db = SQLAlchemy() mail = Mail() def create_app(): app = Flask(__name__) app.config.from_object(Config) db.init_app

  • Flask实现异步执行任务

    Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回. 怎么实现呢? 使用线程的方式 当要执行耗时任务时,直接开启一个新的线程来执行任务,这种方式最为简单快速. 通过 ThreadPoolExecutor 来实现 from flask import Flask from time

  • PHP编程实现脚本异步执行的方法

    本文实例讲述了PHP编程实现脚本异步执行的方法.分享给大家供大家参考,具体如下: php语言得用fsockopen()函数,实现脚本异步运行,代码如下 异步请求函数(用debug参数若为true则为用为调试,开启调试可以看到异步的执行情况,但是失去异步的效果) main.php <?php function request_by_fsockopen($url,$post_data=array(),$debug=false){ $url_array = parse_url($url); $host

  • 更快的异步执行(setTimeout多浏览器)

    如果要异步执行一个函数,我们最先想到的方法肯定会是setTimeout 例如:setTimeout(function( /* 1s后做点什么 */){},1000} 那如果说要最快速地异步执行一个函数呢? 是否会是: setTimeout(function( /* 尽快做点什么 */){},0} 可惜的是,浏览器为了避免setTimeout嵌套可能出现卡死ui线程的情况,为setTimeout设置了最小的执行时间间隔,不同浏览器的最小执行时间间隔都不一样.chrome下测试 setTimeout

  • 浅谈js文件引用方式及其同步执行与异步执行

    任何以appendChild(scriptNode) 的方式引入的js文件都是异步执行的 (scriptNode 需要插入document中,只创建节点和设置 src 是不会加载 js 文件的,这跟 img 的与加载不同 ) html文件中的<script>标签中的代码或src引用的js文件中的代码是同步加载和执行的 html文件中的<script>标签中的代码使用document.write()方式引入的js文件是异步执行的 html文件中的<script>标签src

  • 4种PHP异步执行的常用方式

    本文为大家讲述了php异步调用方法,分享给大家供大家参考,具体内容如下 客户端与服务器端是通过HTTP协议进行连接通讯,客户端发起请求,服务器端接收到请求后执行处理,并返回处理结果. 有时服务器需要执行很耗时的操作,这个操作的结果并不需要返回给客户端.但因为php是同步执行的,所以客户端需要等待服务处理完才可以进行下一步. 因此对于耗时的操作适合异步执行,服务器接收到请求后,处理完客户端需要的数据就返回,再异步在服务器执行耗时的操作. 1.使用Ajax 与 img 标记 原理,服务器返回的htm

  • 基于PHP异步执行的常用方式详解

    1.客户端页面采用AJAX技术请求服务器优点:最简单,也最快,就是在返回给客户端的HTML代码中,嵌入AJAX调用,或者,嵌入一个img标签,src指向要执行的耗时脚本.缺点:一般来说Ajax都应该在onLoad以后触发,也就是说,用户点开页面后,就关闭,那就不会触发我们的后台脚本了.而使用img标签的话,这种方式不能称为严格意义上的异步执行.用户浏览器会长时间等待php脚本的执行完成,也就是用户浏览器的状态栏一直显示还在load.当然,还可以使用其他的类似原理的方法,比如script标签等等.

  • 浅析javascript异步执行函数导致的变量变化问题解决思路

    浅析javascript异步执行函数导致的变量变化问题解决思路 for(var i=0;i<3;i++) { setTimeout(function(){ console.log(i) },0); } 控制台输出: 3 3 3 这是因为执行方法的时候for循环已经执行完成每次执行的时候取得都是3 而不是1-2-3这时我们可以使用立即执行函数为每一次循环创建一个变量副本来供定时器调用解决这个问题 for (var i = 0; i < 3; i++) { setTimeout( (functio

  • 浅谈js的异步执行

    1.Javascript语言的执行环境是"单线程"(single thread): 优点:实现起来比较简单,执行环境相对单纯: 缺点:只要有一个任务耗时很长,后面的任务都必须排队等着,会拖延整个程序的执行.常见的浏览器无响应(假死),往往就是因为某一段Javascript代码长时间运行(比如死循环),导致整个页面卡在这个地方,其他任务无法执行. 为了解决这个问题,Javascript语言将任务的执行模式分成两种:同步(Synchronous)和异步(Asynchronous). 2.&

  • 如何在Linux中使用flock控制程序的异步执行

    最近我常常需要同时ssh给若干台电脑做许多需要等待,而且可以同时进行的工作.例如: 1.让远端电脑同时更新套件 2.同时传送小档案给远端的电脑(时间大部分在ssh认证) 然而之后的动作又需要在确认上述工作完毕之后,才能继续进行. 过去我都是这样做: # 前面的工作 update_pkg_on_machine_1 update_pkg_on_machine_2 update_pkg_on_machine_3 # ... 后面的工作 这样虽然可以确保工作同时进行完毕,但是就是很慢- 另一种可能的方法

  • 深入PHP异步执行的详解

    Web服务器执行一个PHP脚本,有时耗时很长才能返回执行结果,后面的脚本需要等待很长一段时间才能继续执行.如果想实现只简单触发耗时脚本的执行而不等待执行结果就直接执行下一步操作,可以通过fscokopen函数来实现.PHP支持socket编程,fscokopen函数返回一个到远程主机连接的句柄,可以像使用fopen返回的句柄一样,对它进行fwrite.fgets.fread等操作.使用fsockopen连接到本地服务器,触发脚本执行,然后立即返回,不等待脚本执行完成,即可实现异步执行PHP的效果

随机推荐