Python aiohttp百万并发极限测试实例分析

本文实例讲述了Python aiohttp百万并发极限测试。分享给大家供大家参考,具体如下:

本文将测试python aiohttp的极限,同时测试其性能表现,以分钟发起请求数作为指标。大家都知道,当应用到网络操作时,异步的代码表现更优秀,但是验证这个事情,同时搞明白异步到底有多大的优势以及为什么会有这样的优势仍然是一件有趣的事情。为了验证,我将发起1000000请求,用aiohttp客户端。aiohttp每分钟能够发起多少请求?你能预料到哪些异常情况以及崩溃会发生,当你用比较粗糙的脚本去发起如此大量的请求?面对如此大量的请求,哪些主要的陷阱是你需要去思考的?

初识 asyncio/aiohttp

异步编程并不简单。相比平常的同步编程,你需要付出更多的努力在使用回调函数,以事件以及事件处理器的模式进行思考。同时也是因为asyncio相对较新,相关的教程以及博客还很少的缘故。官方文档非常简陋,只有最基本的范例。在我写本文的时候,Stack Overflow上面,只有410个与asyncio相关的话题(相比之下,twisted相关的有2585)。有个别关于asyncio的不错的博客以及文章,比如这个这个这个,或者还有这个以及这个

简单起见,我们先从基础开始 —— 简单HTTP hello world —— 发起GET请求,同时获取一个单独的HTTP响应。

同步模式,你这么做:

import requests
def hello()
   return requests.get("http://httpbin.org/get")
print(hello())

接着我们使用aiohttp:

#!/usr/local/bin/python3.5
import asyncio
from aiohttp import ClientSession
async def hello():
  async with ClientSession() as session:
    async with session.get("http://httpbin.org/headers") as response:
      response = await response.read()
      print(response)
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())

好吧,看上去仅仅一个简单的任务,我写了很多的代码……那里有“async def”、“async with”、“await”—— 看上去让人迷惑,让我们尝试弄懂它们。

你使用async以及await关键字将函数异步化。在hello()中实际上有两个异步操作:首先异步获取相应,然后异步读取响应的内容。

Aiohttp推荐使用ClientSession作为主要的接口发起请求。ClientSession允许在多个请求之间保存cookie以及相关对象信息。Session(会话)在使用完毕之后需要关闭,关闭Session是另一个异步操作,所以每次你都需要使用async with关键字。

一旦你建立了客户端session,你可以用它发起请求。这里是又一个异步操作的开始。上下文管理器的with语句可以保证在处理session的时候,总是能正确的关闭它。

要让你的程序正常的跑起来,你需要将他们加入事件循环中。所以你需要创建一个asyncio loop的实例, 然后将任务加入其中。

看起来有些困难,但是只要你花点时间进行思考与理解,就会有所体会,其实并没有那么复杂。

访问多个链接

现在我们来做些更有意思的事情,顺序访问多个链接。

同步方式如下:

for url in urls:
  print(requests.get(url).text)

很简单。不过异步方式却没有这么容易。所以任何时候你都需要思考,你的处境是否有必要用到异步。如果你的app在同步模式工作的很好,也许你并不需要将之迁移到异步方式。如果你确实需要异步方式,这里会给你一些启示。我们的异步函数hello()还是保持原样,不过我们需要将之包装在asyncio的Future对象中,然后将Future对象列表作为任务传递给事件循环。

loop = asyncio.get_event_loop()
tasks = [] # I'm using test server localhost, but you can use any url
url = "http://localhost:8080/{}"
for i in range(5):
  task = asyncio.ensure_future(hello(url.format(i)))
  tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))

现在假设我们想获取所有的响应,并将他们保存在同一个列表中。目前,我们没有保存响应内容,仅仅只是打印了他们。让我们返回他们,将之存储在一个列表当中,最后再打印出来。

为了达到这个目的,我们需要修改一下代码:

#!/usr/local/bin/python3.5
import asyncio
from aiohttp import ClientSession
async def fetch(url):
  async with ClientSession() as session:
   async with session.get(url) as response:
     return await response.read()
async def run(loop, r):
  url = "http://localhost:8080/{}"
  tasks = []
  for i in range(r):
    task = asyncio.ensure_future(fetch(url.format(i)))
    tasks.append(task)
    responses = await asyncio.gather(*tasks)
    # you now have all response bodies in this variable
    print(responses)
def print_responses(result):
  print(result)
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(loop, 4))
loop.run_until_complete(future)

注意asyncio.gather()的用法,它搜集所有的Future对象,然后等待他们返回。

常见错误

现在我们来模拟真实场景,去调试一些错误,作为演示范例。

看看这个:

# WARNING! BROKEN CODE DO NOT COPY PASTE
async def fetch(url):
  async with ClientSession() as session:
   async with session.get(url) as response:
     return response.read()

如果你对aiohttp或者asyncio不够了解,即使你很熟悉Python,这段代码也不好debug。

上面的代码产生如下输出:

pawel@pawel-VPCEH390X ~/p/l/benchmarker> ./bench.py
[<generator object ClientResponse.read at 0x7fa68d465728>,
 <generator object ClientResponse.read at 0x7fa68cdd9468>,
 <generator object ClientResponse.read at 0x7fa68d4656d0>,
 <generator object ClientResponse.read at 0x7fa68cdd9af0>]

发生了什么?你期待获得响应对象,但是你得到的是一组生成器。怎么会这样?

我之前提到过,response.read()是一个异步操作,这意味着它不会立即返回结果,仅仅返回生成器。这些生成器需要被调用跟运行,但是这并不是默认行为。在Python34中加入的yield from以及Python35中加入的await便是为此而生。它们将迭代这些生成器。以上代码只需要在response.read()前加上await关键字即可修复。如下:

  # async operation must be preceded by await
  return await response.read()
  # NOT: return response.read()

我们看看另一个例子。

# WARNING! BROKEN CODE DO NOT COPY PASTE
async def run(loop, r):
  url = "http://localhost:8080/{}"
  tasks = []
  for i in range(r):
    task = asyncio.ensure_future(fetch(url.format(i)))
    tasks.append(task)
    responses = asyncio.gather(*tasks)
    print(responses)

输出结果如下:

pawel@pawel-VPCEH390X ~/p/l/benchmarker> ./bench.py
<_GatheringFuture pending>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch() running at ./bench.py:7>
        wait_for=<Future pending cb=[Task._wakeup()]>
        cb=[gather.<locals>._done_callback(0)()
        at /usr/local/lib/python3.5/asyncio/tasks.py:602]>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch() running at ./bench.py:7>
  wait_for=<Future pending cb=[Task._wakeup()]>
  cb=[gather.<locals>._done_callback(1)()
  at /usr/local/lib/python3.5/asyncio/tasks.py:602]>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch() running at ./bench.py:7>
  wait_for=<Future pending cb=[Task._wakeup()]>
  cb=[gather.<locals>._done_callback(2)()
  at /usr/local/lib/python3.5/asyncio/tasks.py:602]>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch() running at ./bench.py:7>
  wait_for=<Future pending cb=[Task._wakeup()]>
  cb=[gather.<locals>._done_callback(3)()
  at /usr/local/lib/python3.5/asyncio/tasks.py:602]>

发生了什么?查看本地日志,你会发现没有任何请求到达服务器,实际上没有任何请求发生。打印信息首先打印<_Gathering pending>对象,然后警告等待的任务被销毁。又一次的,你忘记了await。

修改

responses = asyncio.gather(*tasks)

responses = await asyncio.gather(*tasks)

即可解决问题。

经验:任何时候,你在等待什么的时候,记得使用await。

同步 vs 异步

重头戏来了。我们来验证异步是否值得(编码麻烦)。看看同步与异步(client)效率上的区别。异步每分钟能够发起多少请求。

为此,我们首先配置一个异步的aiohttp服务器端。这个服务端将获取全部的html文本, 来自Marry Shelley的Frankenstein。在每个响应中,它将添加随机的延时。有的为0,最大值为3s。类似真正的app。有些app的响应延时为固定值,一般而言,每个响应的延时是不同的。

服务器代码如下:

#!/usr/local/bin/python3.5
import asyncio
from datetime import datetime
from aiohttp import web
import random
# set seed to ensure async and sync client get same distribution of delay values
# and tests are fair random.seed(1)
async def hello(request):
  name = request.match_info.get("name", "foo")
  n = datetime.now().isoformat()
  delay = random.randint(0, 3)
  await asyncio.sleep(delay)
  headers = {"content_type": "text/html", "delay": str(delay)}
  # opening file is not async here, so it may block, to improve
  # efficiency of this you can consider using asyncio Executors
  # that will delegate file operation to separate thread or process
  # and improve performance
  # https://docs.python.org/3/library/asyncio-eventloop.html#executor
  # https://pymotw.com/3/asyncio/executors.html
  with open("frank.html", "rb") as html_body:
     print("{}: {} delay: {}".format(n, request.path, delay))
     response = web.Response(body=html_body.read(), headers=headers)
     return response
app = web.Application()
app.router.add_route("GET", "/{name}", hello)
web.run_app(app)

同步客户端代码如下:

import requests
r = 100
url = "http://localhost:8080/{}"
for i in range(r):
  res = requests.get(url.format(i))
 delay = res.headers.get("DELAY")
 d = res.headers.get("DATE")
 print("{}:{} delay {}".format(d, res.url, delay))

在我的机器上,上面的代码耗时2分45s。而异步代码只需要3.48s。

有趣的是,异步代码耗时无限接近最长的延时(server的配置)。如果你观察打印信息,你会发现异步客户端的优势有多么巨大。有的响应为0延迟,有的为3s。同步模式下,客户端会阻塞、等待,你的机器什么都不做。异步客户端不会浪费时间,当有延迟发生时,它将去做其他的事情。在日志中,你也会发现这个现象。首先是0延迟的响应,然后当它们到达后,你将看到1s的延迟,最后是最大延迟的响应。

极限测试

现在我们知道异步表现更好,让我们尝试去找到它的极限,同时尝试让它崩溃。我将发送1000异步请求。我很好奇我的客户端能够处理多少数量的请求。

> time python3 bench.py
2.68user 0.24system 0:07.14elapsed 40%CPU
(0avgtext+0avgdata 53704maxresident)
k 0inputs+0outputs (0major+14156minor)pagefaults 0swaps

1000个请求,花费了7s。相当不错的成绩。然后10K呢?很不幸,失败了:

responses are <_GatheringFuture finished exception=
  ClientOSError(24, 'Cannot connect to host localhost:8080 ssl:
  False [Can not connect to localhost:8080 [Too many open files]]')>
Traceback (most recent call last):
   File "/home/pawel/.local/lib/python3.5/site-packages/aiohttp/connector.py", line 581, in _create_connection
   File "/usr/local/lib/python3.5/asyncio/base_events.py", line 651, in create_connection
   File "/usr/local/lib/python3.5/asyncio/base_events.py", line 618, in create_connection
   File "/usr/local/lib/python3.5/socket.py", line 134, in __init__ OS
   Error: [Errno 24] Too many open files

这样不大好,貌似我倒在了10K connections problem面前。

traceback显示,open files太多了,可能代表着open sockets太多。为什么叫文件?Sockets(套接字)仅仅是文件描述符,操作系统有数量的限制。多少才叫太多呢?我查看Python源码,然后发现这个值为1024.怎么样绕过这个问题?一个粗暴的办法是增加这个数值,但是听起来并不高明。更好的办法是,加入一些同步机制,限制并发数量。于是我在asyncio.Semaphore()中加入最大任务限制为1000.

修改客户端代码如下:

# modified fetch function with semaphore
import random
import asyncio
from aiohttp import ClientSession
async def fetch(url):
  async with ClientSession() as session:
    async with session.get(url) as response:
     delay = response.headers.get("DELAY")
     date = response.headers.get("DATE")
     print("{}:{} with delay {}".format(date, response.url, delay))
     return await response.read()
async def bound_fetch(sem, url):
  # getter function with semaphore
  async with sem:
   await fetch(url)
  async def run(loop, r):
   url = "http://localhost:8080/{}"
   tasks = []
   # create instance of Semaphore
   sem = asyncio.Semaphore(1000)
   for i in range(r):
     # pass Semaphore to every GET request
     task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))
     tasks.append(task)
     responses = asyncio.gather(*tasks)
await responses number = 10000
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(loop, number))
loop.run_until_complete(future)

现在,我们可以处理10k链接了。这花去我们23s,同时返回了一些异常。不过不管怎样,相当不错的表现。

那100K呢?这个任务让我的机器很吃力,不过惊奇的是,它工作的很好。服务器的表现相当稳定,虽然内存占用很高,然后cpu占用一直维持在100%左右。让我觉得有趣的是,服务器占用的cpu明显小于client。这是ps的回显:

pawel@pawel-VPCEH390X ~/p/l/benchmarker> ps ua | grep python
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
pawel     2447 56.3  1.0 216124 64976 pts/9    Sl+  21:26   1:27 /usr/local/bin/python3.5 ./test_server.py
pawel     2527  101  3.5 674732 212076 pts/0   Rl+  21:26   2:30 /usr/local/bin/python3.5 ./bench.py

最终因为某些原因,运行5分钟过后,它崩溃了。它生成了接近100K行的输出,所以很难定位traceback,好像某些响应没有正常关闭。具体原因不太确定。(client or server error)

一段时间的滚动以后,我找到了这个异常,在client日志中。

File "/usr/local/lib/python3.5/asyncio/futures.py", line 387, in __iter__
      return self.result()  # May raise too.
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
       raise self._exception
  File "/usr/local/lib/python3.5/asyncio/selector_events.py", line 411, in _sock_connect
       sock.connect(address) OS
  Error: [Errno 99] Cannot assign requested address

我不太确定这里发生了什么。我初始的猜测是测试服务器挂掉了。一个读者提出:这个异常的发生原因是操作系统的可用端口耗尽。之前我限制了并发连接数最大为1k,可能有些sockets仍然处在closing状态,系统内核无法使用才导致这个问题。

已经很不错了,不是吗?100k耗时5分钟。相当于一分钟20k请求数。

最后我尝试1M连接数。我真怕我的笔记本因为这个爆炸^_^.我特意将延迟降低为0到1s之间。最终耗时52分钟。

1913.06user 1196.09system 52:06.87elapsed 99%CPU
(0avgtext+0avgdata 5194260maxresident)k 265144
inputs+0outputs (18692major+2528207minor)
pagefaults 0swaps

这意味着,我们的客户端每分钟发送了19230次请求。还不错吧?注意客户端的性能被服务器限制了,好像服务器端崩溃了好几次。

最后

如你所见,异步HTTP客户端相当强大。发起1M请求不是那么困难,同时相比同步模式,优势巨大。

我好奇对比其他的语言或者异步框架,其表现如何?可能在以后某个时候,我将对比Twisted Treq跟aiohttp。然后,其他的异步库(其他语言)能够支持到多少并发?比如:某些Java 异步框架?或者C++框架?或者某些Rust HTTP客户端?

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》、《Python+MySQL数据库程序设计入门教程》及《Python常见数据库操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。

(0)

相关推荐

  • python实现多线程的方式及多条命令并发执行

    一.概念介绍 Thread 是threading模块中最重要的类之一,可以使用它来创建线程.有两种方式来创建线程:一种是通过继承Thread类,重写它的run方法:另一种是创建一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象作为参数传入. Thread模块是比较底层的模块,Threading模块是对Thread做了一些包装的,可以更加方便的被使用. 另外在工作时,有时需要让多条命令并发的执行, 而不是顺序执行. 二.代码样例 #!/usr/bin/py

  • Python控制多进程与多线程并发数总结

    一.前言 本来写了脚本用于暴力破解密码,可是1秒钟尝试一个密码2220000个密码我的天,想用多线程可是只会一个for全开,难道开2220000个线程吗?只好学习控制线程数了,官方文档不好看,觉得结构不够清晰,网上找很多文章也都不很清晰,只有for全开线程,没有控制线程数的具体说明,最终终于根据多篇文章和官方文档算是搞明白基础的多线程怎么实现法了,怕长时间不用又忘记,找着麻烦就贴这了,跟我一样新手也可以参照参照. 先说进程和线程的区别: 地址空间:进程内的一个执行单元;进程至少有一个线程;它们共

  • python实现可以断点续传和并发的ftp程序

    前言 下载文件时,最怕中途断线,无法成功下载完整的文件.断点续传就是从文件中断的地方接下去下载,而不必重新下载.这项功能对于下载较大文件时非常有用.那么这篇文章就来给大家分享如何利用python实现可以断点续传和并发的ftp程序. 一.要求 1.用户md5认证 2.支持多用户同时登陆(并发) 3.进入用户的命令行模式,支持cd切换目录,ls查看目录子文件 4.执行命令(ipconfig) 5.传输文件: a.支持断点续传 b.传输中显示进度条 二.思路 1.客户端用户登录和注册: a.客户端仅提

  • 详解python异步编程之asyncio(百万并发)

    前言:python由于GIL(全局锁)的存在,不能发挥多核的优势,其性能一直饱受诟病.然而在IO密集型的网络编程里,异步处理比同步处理能提升成百上千倍的效率,弥补了python性能方面的短板,如最新的微服务框架japronto,resquests per second可达百万级. python还有一个优势是库(第三方库)极为丰富,运用十分方便.asyncio是python3.4版本引入到标准库,python2x没有加这个库,毕竟python3x才是未来啊,哈哈!python3.5又加入了asyn

  • Python多进程并发(multiprocessing)用法实例详解

    本文实例讲述了Python多进程并发(multiprocessing)用法.分享给大家供大家参考.具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心. Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,可以如下: import multiprocessing import t

  • python aiohttp的使用详解

    1.aiohttp的简单使用(配合asyncio模块) import asyncio,aiohttp async def fetch_async(url): print(url) async with aiohttp.request("GET",url) as r: reponse = await r.text(encoding="utf-8") #或者直接await r.read()不编码,直接读取,适合于图像等无法编码文件 print(reponse) task

  • 详解python并发获取snmp信息及性能测试

    python & snmp 用python获取snmp信息有多个现成的库可以使用,其中比较常用的是netsnmp和pysnmp两个库.网上有较多的关于两个库的例子. 本文重点在于如何并发的获取snmp的数据,即同时获取多台机器的snmp信息. netsnmp 先说netsnmp.python的netsnmp,其实是来自于net-snmp包. python通过一个c文件调用net-snmp的接口获取数据. 因此,在并发获取多台机器的时候,不能够使用协程获取.因为使用协程,在get数据的时候,协程会

  • python并发编程之多进程、多线程、异步和协程详解

    最近学习python并发,于是对多进程.多线程.异步和协程做了个总结. 一.多线程 多线程就是允许一个进程内存在多个控制权,以便让多个函数同时处于激活状态,从而让多个函数的操作同时运行.即使是单CPU的计算机,也可以通过不停地在不同线程的指令间切换,从而造成多线程同时运行的效果. 多线程相当于一个并发(concunrrency)系统.并发系统一般同时执行多个任务.如果多个任务可以共享资源,特别是同时写入某个变量的时候,就需要解决同步的问题,比如多线程火车售票系统:两个指令,一个指令检查票是否卖完

  • Python中asyncio与aiohttp入门教程

    很多朋友对异步编程都处于"听说很强大"的认知状态.鲜有在生产项目中使用它.而使用它的同学,则大多数都停留在知道如何使用 Tornado.Twisted.Gevent 这类异步框架上,出现各种古怪的问题难以解决.而且使用了异步框架的部分同学,由于用法不对,感觉它并没牛逼到哪里去,所以很多同学做 Web 后端服务时还是采用 Flask.Django等传统的非异步框架. 从上两届 PyCon 技术大会看来,异步编程已经成了 Python 生态下一阶段的主旋律.如新兴的 Go.Rust.Eli

  • Python中利用aiohttp制作异步爬虫及简单应用

    摘要: 简介 asyncio可以实现单线程并发IO操作,是Python中常用的异步处理模块.关于asyncio模块的介绍,笔者会在后续的文章中加以介绍,本文将会讲述一个基于asyncio实现的HTTP框架--aiohttp,它可以帮助我们异步地实现HTTP请求,从而使得我们的程序效率大大提高. 简介 asyncio可以实现单线程并发IO操作,是Python中常用的异步处理模块.关于asyncio模块的介绍,笔者会在后续的文章中加以介绍,本文将会讲述一个基于asyncio实现的HTTP框架--ai

  • Python实现多并发访问网站功能示例

    本文实例讲述了Python实现多并发访问网站功能.分享给大家供大家参考,具体如下: # Filename:visitweb_threads.py # Description:python visit web, get startTime, endTime, everytimes spentTime,threading import threading import urllib import time import datetime print 'num web SpentTime' def P

  • python高并发异步服务器核心库forkcore使用方法

    1 拷贝下面的代码到一个文件,并命名为forkcore.py 复制代码 代码如下: import osimport threadingimport selectimport socket class ds_forkcore(object): #async IO(epoll)    def ds_epoll(self):        epoll=select.epoll()        epoll.register(self.s.fileno(),select.EPOLLIN|select.E

随机推荐