Python的Tornado框架的异步任务与AsyncHTTPClient

高性能服务器Tornado
Python的web框架名目繁多,各有千秋。正如光荣属于希腊,伟大属于罗马。Python的优雅结合WSGI的设计,让web框架接口实现千秋一统。WSGI 把应用(Application)和服务器(Server)结合起来。Django 和 Flask 都可以结合 gunicon 搭建部署应用。

与 django 和 flask 不一样,tornado 既可以是 wsgi 应用,也可以是 wsgi 服务。当然,选择tornado更多的考量源于其单进程单线程异步IO的网络模式。高性能往往吸引人,可是有不少朋友使用之后会提出疑问,tornado号称高性能,实际使用的时候却怎么感受不到呢?

实际上,高性能源于Tornado基于Epoll(unix为kqueue)的异步网络IO。因为tornado的单线程机制,一不小心就容易写出阻塞服务(block)的代码。不但没有性能提高,反而会让性能急剧下降。因此,探索tornado的异步使用方式很有必要。

Tornado 异步使用方式
简而言之,Tornado的异步包括两个方面,异步服务端和异步客户端。无论服务端和客户端,具体的异步模型又可以分为回调(callback)和协程(coroutine)。具体应用场景,也没有很明确的界限。往往一个请求服务里还包含对别的服务的客户端异步请求。

服务端异步方式
服务端异步,可以理解为一个tornado请求之内,需要做一个耗时的任务。直接写在业务逻辑里可能会block整个服务。因此可以把这个任务放到异步处理,实现异步的方式就有两种,一种是yield挂起函数,另外一种就是使用类线程池的方式。请看一个同步例子:

class SyncHandler(tornado.web.RequestHandler):

  def get(self, *args, **kwargs):
    # 耗时的代码
    os.system("ping -c 2 www.google.com")
    self.finish('It works')

使用ab测试一下:

ab -c 5 -n 5 http://127.0.0.1:5000/sync
Server Software:    TornadoServer/4.3
Server Hostname:    127.0.0.1
Server Port:      5000

Document Path:     /sync
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  5.076 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  0.99 [#/sec] (mean)
Time per request:    5076.015 [ms] (mean)
Time per request:    1015.203 [ms] (mean, across all concurrent requests)
Transfer rate:     0.19 [Kbytes/sec] received

qps 仅有可怜的 0.99,姑且当成每秒处理一个请求吧。

下面祭出异步大法:

class AsyncHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    tornado.ioloop.IOLoop.instance().add_timeout(1, callback=functools.partial(self.ping, 'www.google.com'))

    # do something others

    self.finish('It works')

  @tornado.gen.coroutine
  def ping(self, url):
    os.system("ping -c 2 {}".format(url))
    return 'after'

尽管在执行异步任务的时候选择了timeout 1秒,主线程的返回还是很快的。ab压测如下:

Document Path:     /async
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.009 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  556.92 [#/sec] (mean)
Time per request:    8.978 [ms] (mean)
Time per request:    1.796 [ms] (mean, across all concurrent requests)
Transfer rate:     107.14 [Kbytes/sec] received

上述的使用方式,通过tornado的IO循环,把可以把耗时的任务放到后台异步计算,请求可以接着做别的计算。可是,经常有一些耗时的任务完成之后,我们需要其计算的结果。此时这种方式就不行了。车道山前必有路,只需要切换一异步方式即可。下面使用协程来改写:

class AsyncTaskHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):
    # yield 结果
    response = yield tornado.gen.Task(self.ping, ' www.google.com')
    print 'response', response
    self.finish('hello')

  @tornado.gen.coroutine
  def ping(self, url):
    os.system("ping -c 2 {}".format(url))
    return 'after'

可以看到异步在处理,而结果值也被返回了。

Server Software:    TornadoServer/4.3
Server Hostname:    127.0.0.1
Server Port:      5000

Document Path:     /async/task
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.049 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  101.39 [#/sec] (mean)
Time per request:    49.314 [ms] (mean)
Time per request:    9.863 [ms] (mean, across all concurrent requests)
Transfer rate:     19.51 [Kbytes/sec] received

qps提升还是很明显的。有时候这种协程处理,未必就比同步快。在并发量很小的情况下,IO本身拉开的差距并不大。甚至协程和同步性能差不多。例如你跟博尔特跑100米肯定输给他,可是如果跟他跑2米,鹿死谁手还未定呢。

yield挂起函数协程,尽管没有block主线程,因为需要处理返回值,挂起到响应执行还是有时间等待,相对于单个请求而言。另外一种使用异步和协程的方式就是在主线程之外,使用线程池,线程池依赖于futures。Python2需要额外安装。

下面使用线程池的方式修改为异步处理:

from concurrent.futures import ThreadPoolExecutor

class FutureHandler(tornado.web.RequestHandler):
  executor = ThreadPoolExecutor(10)

  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    url = 'www.google.com'
    tornado.ioloop.IOLoop.instance().add_callback(functools.partial(self.ping, url))
    self.finish('It works')

  @tornado.concurrent.run_on_executor
  def ping(self, url):
    os.system("ping -c 2 {}".format(url))

再运行ab测试:

Document Path:     /future
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.003 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   995 bytes
HTML transferred:    25 bytes
Requests per second:  1912.78 [#/sec] (mean)
Time per request:    2.614 [ms] (mean)
Time per request:    0.523 [ms] (mean, across all concurrent requests)
Transfer rate:     371.72 [Kbytes/sec] received

qps瞬间达到了1912.78。同时,可以看到服务器的log还在不停的输出ping的结果。
想要返回值也很容易。再切换一下使用方式接口。使用tornado的gen模块下的with_timeout功能(这个功能必须在tornado>3.2的版本)。

class Executor(ThreadPoolExecutor):
  _instance = None

  def __new__(cls, *args, **kwargs):
    if not getattr(cls, '_instance', None):
      cls._instance = ThreadPoolExecutor(max_workers=10)
    return cls._instance

class FutureResponseHandler(tornado.web.RequestHandler):
  executor = Executor()

  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    future = Executor().submit(self.ping, 'www.google.com')

    response = yield tornado.gen.with_timeout(datetime.timedelta(10), future,
                         quiet_exceptions=tornado.gen.TimeoutError)

    if response:
      print 'response', response.result()

  @tornado.concurrent.run_on_executor
  def ping(self, url):
    os.system("ping -c 1 {}".format(url))
    return 'after'

线程池的方式也可以通过使用tornado的yield把函数挂起,实现了协程处理。可以得出耗时任务的result,同时不会block住主线程。

Concurrency Level:   5
Time taken for tests:  0.043 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   960 bytes
HTML transferred:    0 bytes
Requests per second:  116.38 [#/sec] (mean)
Time per request:    42.961 [ms] (mean)
Time per request:    8.592 [ms] (mean, across all concurrent requests)
Transfer rate:     21.82 [Kbytes/sec] received

qps为116,使用yield协程的方式,仅为非reponse的十分之一左右。看起来性能损失了很多,主要原因这个协程返回结果需要等执行完毕任务。

好比打鱼,前一种方式是撒网,然后就完事,不闻不问,时间当然快,后一种方式则撒网之后,还得收网,等待收网也是一段时间。当然,相比同步的方式还是快了千百倍,毕竟撒网还是比一只只钓比较快。

具体使用何种方式,更多的依赖业务,不需要返回值的往往需要处理callback,回调太多容易晕菜,当然如果需要很多回调嵌套,首先优化的应该是业务或产品逻辑。yield的方式很优雅,写法可以异步逻辑同步写,爽是爽了,当然也会损失一定的性能。

异步多样化
Tornado异步服务的处理大抵如此。现在异步处理的框架和库也很多,借助redis或者celery等,也可以把tonrado中一些业务异步化,放到后台执行。

此外,Tornado还有客户端异步功能。该特性主要是在于 AsyncHTTPClient的使用。此时的应用场景往往是tornado服务内,需要针对另外的IO进行请求和处理。顺便提及,上述的例子中,调用ping其实也算是一种服务内的IO处理。接下来,将会探索一下AsyncHTTPClient的使用,尤其是使用AsyncHTTPClient上传文件与转发请求。

异步客户端
前面了解Tornado的异步任务的常用做法,姑且归结为异步服务。通常在我们的服务内,还需要异步的请求第三方服务。针对HTTP请求,Python的库Requests是最好用的库,没有之一。官网宣称:HTTP for Human。然而,在tornado中直接使用requests将会是一场恶梦。requests的请求会block整个服务进程。

上帝关上门的时候,往往回打开一扇窗。Tornado提供了一个基于框架本身的异步HTTP客户端(当然也有同步的客户端)--- AsyncHTTPClient。

AsyncHTTPClient 基本用法
AsyncHTTPClient是 tornado.httpclinet 提供的一个异步http客户端。使用也比较简单。与服务进程一样,AsyncHTTPClient也可以callback和yield两种使用方式。前者不会返回结果,后者则会返回response。

如果请求第三方服务是同步方式,同样会杀死性能。

class SyncHandler(tornado.web.RequestHandler):
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    resp = requests.get(url)
    print resp.status_code

    self.finish('It works')

使用ab测试大概如下:

Document Path:     /sync
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  10.255 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  0.49 [#/sec] (mean)
Time per request:    10255.051 [ms] (mean)
Time per request:    2051.010 [ms] (mean, across all concurrent requests)
Transfer rate:     0.09 [Kbytes/sec] received

性能相当慢了,换成AsyncHTTPClient再测:

class AsyncHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    http_client = tornado.httpclient.AsyncHTTPClient()
    http_client.fetch(url, self.on_response)
    self.finish('It works')

  @tornado.gen.coroutine
  def on_response(self, response):
    print response.code

qps 提高了很多

Document Path:     /async
Document Length:    5 bytes

Concurrency Level:   5
Time taken for tests:  0.162 seconds
Complete requests:   5
Failed requests:    0
Total transferred:   985 bytes
HTML transferred:    25 bytes
Requests per second:  30.92 [#/sec] (mean)
Time per request:    161.714 [ms] (mean)
Time per request:    32.343 [ms] (mean, across all concurrent requests)
Transfer rate:     5.95 [Kbytes/sec] received

同样,为了获取response的结果,只需要yield函数。

class AsyncResponseHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
  def get(self, *args, **kwargs):

    url = 'https://api.github.com/'
    http_client = tornado.httpclient.AsyncHTTPClient()
    response = yield tornado.gen.Task(http_client.fetch, url)
    print response.code
    print response.body

AsyncHTTPClient 转发
使用Tornado经常需要做一些转发服务,需要借助AsyncHTTPClient。既然是转发,就不可能只有get方法,post,put,delete等方法也会有。此时涉及到一些 headers和body,甚至还有https的waring。

下面请看一个post的例子, yield结果,通常,使用yield的时候,handler是需要 tornado.gen.coroutine。

headers = self.request.headers
body = json.dumps({'name': 'rsj217'})
http_client = tornado.httpclient.AsyncHTTPClient()

resp = yield tornado.gen.Task(
  self.http_client.fetch,
  url,
  method="POST",
  headers=headers,
  body=body,
  validate_cert=False)

AsyncHTTPClient 构造请求
如果业务处理并不是在handlers写的,而是在别的地方,当无法直接使用tornado.gen.coroutine的时候,可以构造请求,使用callback的方式。

body = urllib.urlencode(params)
req = tornado.httpclient.HTTPRequest(
 url=url,
 method='POST',
 body=body,
 validate_cert=False) 

http_client.fetch(req, self.handler_response)

def handler_response(self, response):

  print response.code

用法也比较简单,AsyncHTTPClient中的fetch方法,第一个参数其实是一个HTTPRequest实例对象,因此对于一些和http请求有关的参数,例如method和body,可以使用HTTPRequest先构造一个请求,再扔给fetch方法。通常在转发服务的时候,如果开起了validate_cert,有可能会返回599timeout之类,这是一个warning,官方却认为是合理的。

AsyncHTTPClient 上传图片
AsyncHTTPClient 更高级的用法就是上传图片。例如服务有一个功能就是请求第三方服务的图片OCR服务。需要把用户上传的图片,再转发给第三方服务。

@router.Route('/api/v2/account/upload')
class ApiAccountUploadHandler(helper.BaseHandler):
  @tornado.gen.coroutine
  @helper.token_require
  def post(self, *args, **kwargs):
    upload_type = self.get_argument('type', None)

    files_body = self.request.files['file']

    new_file = 'upload/new_pic.jpg'
    new_file_name = 'new_pic.jpg'

    # 写入文件
    with open(new_file, 'w') as w:
      w.write(file_['body'])

    logging.info('user {} upload {}'.format(user_id, new_file_name))

    # 异步请求 上传图片
    with open(new_file, 'rb') as f:
      files = [('image', new_file_name, f.read())]

    fields = (('api_key', KEY), ('api_secret', SECRET))

    content_type, body = encode_multipart_formdata(fields, files)

    headers = {"Content-Type": content_type, 'content-length': str(len(body))}
    request = tornado.httpclient.HTTPRequest(config.OCR_HOST,
                         method="POST", headers=headers, body=body, validate_cert=False)

    response = yield tornado.httpclient.AsyncHTTPClient().fetch(request)

def encode_multipart_formdata(fields, files):
  """
  fields is a sequence of (name, value) elements for regular form fields.
  files is a sequence of (name, filename, value) elements for data to be
  uploaded as files.
  Return (content_type, body) ready for httplib.HTTP instance
  """
  boundary = '----------ThIs_Is_tHe_bouNdaRY_$'
  crlf = '\r\n'
  l = []
  for (key, value) in fields:
    l.append('--' + boundary)
    l.append('Content-Disposition: form-data; name="%s"' % key)
    l.append('')
    l.append(value)
  for (key, filename, value) in files:
    filename = filename.encode("utf8")
    l.append('--' + boundary)
    l.append(
        'Content-Disposition: form-data; name="%s"; filename="%s"' % (
          key, filename
        )
    )
    l.append('Content-Type: %s' % get_content_type(filename))
    l.append('')
    l.append(value)
  l.append('--' + boundary + '--')
  l.append('')
  body = crlf.join(l)
  content_type = 'multipart/form-data; boundary=%s' % boundary
  return content_type, body

def get_content_type(filename):
  import mimetypes

  return mimetypes.guess_type(filename)[0] or 'application/octet-stream'

对比上述的用法,上传图片仅仅是多了一个图片的编码。将图片的二进制数据按照multipart 方式编码。编码的同时,还需要把传递的相关的字段处理好。相比之下,使用requests 的方式则非常简单:

files = {}
f = open('/Users/ghost/Desktop/id.jpg')
files['image'] = f
data = dict(api_key='KEY', api_secret='SECRET')
resp = requests.post(url, data=data, files=files)
f.close()
print resp.status_Code

总结
通过AsyncHTTPClient的使用方式,可以轻松的实现handler对第三方服务的请求。结合前面关于tornado异步的使用方式。无非还是两个key。是否需要返回结果,来确定使用callback的方式还是yield的方式。当然,如果不同的函数都yield,yield也可以一直传递。这个特性,tornado的中的tornado.auth 里面对oauth的认证。

大致就是这样的用法。

(0)

相关推荐

  • Android中发送Http请求(包括文件上传、servlet接收)的实例代码

    复制代码 代码如下: /*** 通过http协议提交数据到服务端,实现表单提交功能,包括上传文件* @param actionUrl 上传路径 * @param params 请求参数 key为参数名,value为参数值 * @param file 上传文件 */public static void postMultiParams(String actionUrl, Map<String, String> params, FormBean[] files) {try {PostMethod p

  • android文件上传示例分享(android图片上传)

    主要思路是调用系统文件管理器或者其他媒体采集资源来获取要上传的文件,然后将文件的上传进度实时展示到进度条中. 主Activity 复制代码 代码如下: package com.guotop.elearn.activity.app.yunpan.activity; import java.io.File;import java.io.FileNotFoundException;import java.io.IOException; import android.app.Activity;impor

  • Android实现本地上传图片并设置为圆形头像

    先从本地把图片上传到服务器,然后根据URL把头像处理成圆形头像. 因为上传图片用到bmob的平台,所以要到bmob(http://www.bmob.cn)申请密钥. 效果图: 核心代码: 复制代码 代码如下: public class MainActivity extends Activity {         private ImageView iv;         private String appKey="";                //填写你的Applicatio

  • Android使用xUtils3.0实现文件上传

    几个月前写过一篇博客<xUtils3.0框架学习笔记> ,上面也有记录通过xUtils实现文件上传的使用方法,代码如下: private void upLoadOnClick(View v) { String upUrl = "/mnt/sdcard/pic/test.jpg";//指定要上传的文件 final ProgressDialog dia = new ProgressDialog(this); dia.setMessage("加载中....")

  • Android引用开源框架通过AsyncHttpClient实现文件上传

    引用开源框架通过AsyncHttpClient进行文件上传,具体内容如下 一.步骤: 1.添加权限(访问网络权限和读写权限) 2.获取上传文件路径并判断是否为空 3.若不为空,创建异步请求对象 4.创建上传文件路径 5.执行post请求(指定url路径,封装上传参数,新建AsyncHttpResponseHandler方法) 二.查看参考文档  三.实例项目解析 运行效果如下: 在本地文件夹中查看是否获取到图片,如下图显示 重点代码:均有详细解析,请认真查看注释 1.在AndroidManife

  • Android带进度条的文件上传示例(使用AsyncTask异步任务)

    最近项目中要做一个带进度条的上传文件的功能,学习了AsyncTask,使用起来比较方便,将几个方法实现就行,另外做了一个很简单的demo,希望能对大家有帮助,在程序中设好文件路径和服务器IP即可. demo运行截图: AsyncTask是抽象类,子类必须实现抽象方法doInBackground(Params... p),在此方法中实现任务的执行工作,比如联网下载或上传.AsyncTask定义了三种泛型类型Params,Progress和Result. 1.Params 启动任务执行的输入参数,比

  • Android Retrofit 2.0框架上传图片解决方案

    本文为大家分享了 Android Retrofit 2.0框架上传图片解决方案,具体内容如下 1.单张图片的上传 /** * 上传一张图片 * @param description * @param imgs * @return */ @Multipart @POST("/upload") Call<String> uploadImage(@Part("fileName") String description, @Part("file\&qu

  • 使用Android的OkHttp包实现基于HTTP协议的文件上传下载

    OkHttp的HTTP连接基础 虽然在使用 OkHttp 发送 HTTP 请求时只需要提供 URL 即可,OkHttp 在实现中需要综合考虑 3 种不同的要素来确定与 HTTP 服务器之间实际建立的 HTTP 连接.这样做的目的是为了达到最佳的性能. 首先第一个考虑的要素是 URL 本身.URL 给出了要访问的资源的路径.比如 URL http://www.baidu.com 所对应的是百度首页的 HTTP 文档.在 URL 中比较重要的部分是访问时使用的模式,即 HTTP 还是 HTTPS.这

  • Android基于Http协议实现文件上传功能的方法

    本文实例讲述了Android基于Http协议实现文件上传功能的方法.分享给大家供大家参考,具体如下: 注意一般使用Http协议上传的文件都比较小,一般是小于2M 这里示例是上传一个小的MP3文件 1.主Activity:MainActivity.java public class MainActivity extends Activity { private static final String TAG = "MainActivity"; private EditText timel

  • 简单实现Android文件上传

    文件上传在B/S应用中是一种十分常见的功能,那么在Android平台下是否可以实现像B/S那样的文件上传功能呢?答案是肯定的.下面是一个模拟网站程序上传文件的例子. 首先新建一个Android工程,新建主启动Activity: MainActivity.java: package com.xzq.upload; import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.InputStream;

随机推荐