Python使用asyncio包处理并发详解

阻塞型I/O和GIL

CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。

然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程。

asyncio

这个包使用事件循环驱动的协程实现并发。 asyncio 大量使用 yield from 表达式,因此与Python 旧版不兼容。

asyncio 包使用的“协程”是较严格的定义。适合asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield。此外,适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用;

示例1

import threading
import asyncio

@asyncio.coroutine
def hello():
  print('Start Hello', threading.currentThread())
  yield from asyncio.sleep(5)
  print('End Hello', threading.currentThread())

@asyncio.coroutine
def world():
  print('Start World', threading.currentThread())
  yield from asyncio.sleep(3)
  print('End World', threading.currentThread())

# 获取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 执行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine把生成器函数标记为协程类型。
asyncio.sleep(3) 创建一个3秒后完成的协程。
loop.run_until_complete(future),运行直到future完成;如果参数是 coroutine object,则需要使用 ensure_future()函数包装。
loop.close() 关闭事件循环

示例2

import asyncio

@asyncio.coroutine
def worker(text):
  """
  协程运行的函数
  :param text:
  :return:
  """
  i = 0
  while True:
    print(text, i)

    try:
      yield from asyncio.sleep(.1)
    except asyncio.CancelledError:
      break

    i += 1

@asyncio.coroutine
def client(text, io_used):
  worker_fu = asyncio.ensure_future(worker(text))

  # 假装等待I/O一段时间
  yield from asyncio.sleep(io_used)

  # 结束运行协程
  worker_fu.cancel()
  return 'done'

loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', 3), client('zzzz', 5)]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)

解释:

1. asyncio.ensure_future(coro_or_future, *, loop=None):计划安排一个 coroutine object的执行,返回一个 asyncio.Task object。
2. worker_fu.cancel(): 取消一个协程的执行,抛出CancelledError异常。
3. asyncio.wait():协程的参数是一个由期物或协程构成的可迭代对象; wait 会分别把各个协程包装进一个 Task 对象。

asyncio.Task 对象与threading.Thread对象的比较

asyncio.Task 对象差不多与 threading.Thread 对象等效。
Task 对象用于驱动协程, Thread 对象用于调用可调用的对象。
Task 对象不由自己动手实例化,而是通过把协程传给 asyncio.ensure_future(…) 函数或loop.create_task(…) 方法获取。
获取的 Task 对象已经排定了运行时间;Thread 实例则必须调用 start 方法,明确告知让它运行。
如果想终止任务,可以使用 Task.cancel() 实例方法,在协程内部抛出CancelledError 异常。

线程与协程的安全比较

如果使用线程做过重要的编程,因为调度程序任何时候都能中断线程。必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态。

协程默认会做好全方位保护,以防止中断。我们必须显式产出才能让程序的余下部分运行。对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只有一个协程运行。想交出控制权时,可以使用 yield 或 yield from 把控制权交还调度程序。这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield处取消,因此可以处理 CancelledError 异常,执行清理操作。

Future(期物)

通常情况下自己不应该创建期物,而只能由并发框架(concurrent.futures 或 asyncio)实例化。原因很简单:期物表示终将发生的事情,而确定某件事会发生的唯一方式是执行的时间已经排定。

asyncio.Future

在 asyncio 包中, BaseEventLoop.create_task(…) 方法接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task 实例——也是 asyncio.Future 类的实例,因为 Task 是Future 的子类,用于包装协程。

asyncio.ensure_future(coro_or_future, *, loop=None)

这个函数统一了协程和期物:第一个参数可以是二者中的任何一个。如果是 Future 或 Task 对象,那就原封不动地返回。如果是协程,那么 async 函数会调用loop.create_task(…) 方法创建 Task 对象。 loop= 关键字参数是可选的,用于传入事件循环;如果没有传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数获取循环对象。

BaseEventLoop.create_task(coro)

这个方法排定协程的执行时间,返回一个 asyncio.Task 对象。

asyncio 包中有多个函数会自动把参数指定的协程包装在 asyncio.Task 对象中,例如 BaseEventLoop.run_until_complete(…) 方法。

asyncio.as_completed

为了集成进度条,我们可以使用的是 as_completed 生成器函数;幸好, asyncio 包提供了这个生成器函数的相应版本。

使用asyncio和aiohttp包

从 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他协议,那么要借助第三方包 aiohttp 。

cc_list = ['China', 'USA']

@asyncio.coroutine
def get_flag(cc):
  url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
  resp = yield from aiohttp.request('GET', url)
  image = yield from resp.read()
  return image

@asyncio.coroutine
def download_one(name):
  image = yield from get_flag(name)
  save_flag(image, name.lower() + '.gif')
  return name

loop = asyncio.get_event_loop()
wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)])
res, _ = loop.run_until_complete(wait_coro)
loop.close()

使用 asyncio 包时,我们编写的异步代码中包含由 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数。

避免阻塞型调用

有两种方法能避免阻塞型调用中止整个应用程序的进程:
1. 在单独的线程中运行各个阻塞型操作
2. 把每个阻塞型操作转换成非阻塞的异步调用使用

多个线程是可以的,但是各个操作系统线程(Python 使用的是这种线程)消耗的内存达兆字节(具体的量取决于操作系统种类)。如果要处理几千个连接,而每个连接都使用一个线程的话,我们负担不起。

把生成器当作协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。各个暂停的协程是要消耗内存,但是比线程消耗的内存数量级小。

上面的脚本为什么会很快

在上面的脚本中,调用 loop.run_until_complete 方法时,事件循环驱动各个download_one 协程,运行到第一个 yield from 表达式处时,那个表达式驱动各个get_flag 协程,然后在get_flag协程里面运行到第一个 yield from 表达式处时,调用 aiohttp.request(…)函数。这些调用都不会阻塞,因此在零点几秒内所有请求全部开始。

asyncio 的基础设施获得第一个响应后,事件循环把响应发给等待结果的 get_flag 协程。得到响应后, get_flag 向前执行到下一个 yield from 表达式处,调用resp.read() 方法,然后把控制权还给主循环。其他响应会陆续返回。所有 get_ flag 协程都获得结果后,委派生成器 download_one 恢复,保存图像文件。

async和await

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await

例如:

@asyncio.coroutine
def hello():
  print("Hello world!")
  r = yield from asyncio.sleep(1)
  print("Hello again!")

等同于

async def hello():
  print("Hello world!")
  r = await asyncio.sleep(1)
  print("Hello again!")

网站请求实例

import asyncio
import aiohttp

urls = [
  'http://www.163.com/',
  'http://www.sina.com.cn/',
  'https://www.hupu.com/',
  'http://www.csdn.net/'
]

async def get_url_data(u):
  """
  读取url的数据
  :param u:
  :return:
  """
  print('running ', u)
  async with aiohttp.ClientSession() as session:
    async with session.get(u) as resp:
      print(u, resp.status, type(resp.text()))
      # print(await resp.text())

  return resp.headers

async def request_url(u):
  """
  主调度函数
  :param u:
  :return:
  """
  res = await get_url_data(u)
  return res

loop = asyncio.get_event_loop()
task_lists = asyncio.wait([request_url(u) for u in urls])
all_res, _ = loop.run_until_complete(task_lists)
loop.close()

print(all_res)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Python中使用asyncio 封装文件读写

    前言 和网络 IO 一样,文件读写同样是一个费事的操作. 默认情况下,Python 使用的是系统的阻塞读写.这意味着在 asyncio 中如果调用了 f = file('xx') f.read() 会阻塞事件循环. 本篇简述如何用 asyncio.Future 对象来封装文件的异步读写. 代码在 GitHub.目前仅支持 Linux. 阻塞和非阻塞 首先需要将文件的读写改为非阻塞的形式.在非阻塞情况下,每次调用 read 都会立即返回,如果返回值为空,则意味着文件操作还未完成,反之则是读取的文件

  • 探索Python3.4中新引入的asyncio模块

    使用 Simple Protocol asyncio.BaseProtocol 类是asyncio模块中协议接口(protocol interface)的一个常见的基类.asyncio.Protocolclass 继承自asyncio.BaseProtocol 并为stream protocols提供了一个接口.下面的代码演示了asyncio.Protocol 接口的一个简单实现,它的行为1就像一个echo server,同时,它还会在Python的控制台中输出一些信息.SimpleEchoPr

  • 在Python3中使用asyncio库进行快速数据抓取的教程

    web数据抓取是一个经常在python的讨论中出现的主题.有很多方法可以用来进行web数据抓取,然而其中好像并没有一个最好的办法.有一些如scrapy这样十分成熟的框架,更多的则是像mechanize这样的轻量级库.DIY自己的解决方案同样十分流行:你可以使用requests.beautifulsoup或者pyquery来实现. 方法如此多样的原因在于,数据"抓取"实际上包括很多问题:你不需要使用相同的工具从成千上万的页面中抓取数据,同时使一些Web工作流自动化(例如填一些表单然后取回

  • Python使用asyncio包处理并发详解

    阻塞型I/O和GIL CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码.因此,一个 Python 进程通常不能同时使用多个 CPU 核心. 然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL.这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程

  • Python安装依赖(包)模块方法详解

    Python模块,简单说就是一个.py文件,其中可以包含我们需要的任意Python代码.迄今为止,我们所编写的所有程序都包含在单独的.py文件中,因此,它们既是程序,同时也是模块.关键的区别在于,程序的设计目标是运行,而模块的设计目标是由其他程序导入并使用. 不是所有程序都有相关联的.py文件-比如说,sys模块就内置于Python中,还有些模块是使用其他语言(最常见的是C语言)实现的.不过,Python的大多数库文件都是使用Python实现的,因此,比如说,我们使用了语句import coll

  • Python学习之包与模块详解

    目录 什么是 Python 的包与模块 包的身份证 如何创建包 创建包的小练习 包的导入 - import 模块的导入 - from…import 导入子包及子包函数的调用 导入主包及主包的函数调用 导入的包与子包模块之间过长如何优化 强大的第三方包 什么是第三方包 如何安装第三方包 总结 大家好,学完面向对象与异常处理机制之后,接下里我们要学习 包与模块 .首先我们要了解什么是包?什么是模块?接下来我们还要学习 如何自定义创建包.自定义创建模块以及如何导入包与模块.最后我们在学习如何使用第三方

  • Python使用asyncio包处理并发的实现代码

    使用 asyncio 包处理并发 asyncio包:使用事件循环驱动的协程实现并发. 线程与协程的对比 '\ thinking' 旋转等待效果 In [1]: import threading In [2]: import itertools In [3]: import time,sys In [4]: class Signal: # 定义一个简单的可变对象:go 属性 从外部控制线程 ...: go = True In [5]: def spin(msg,signal): ...: w,fl

  • Python中的并发处理之asyncio包使用的详解

    导语:本文章记录了本人在学习Python基础之控制流程篇的重点知识及个人心得,打算入门Python的朋友们可以来一起学习并交流. 本文重点: 1.了解asyncio包的功能和使用方法: 2.了解如何避免阻塞型调用: 3.学会使用协程避免回调地狱. 一.使用asyncio包做并发编程 1.并发与并行 并发:一次处理多件事. 并行:一次做多件事. 并发用于制定方案,用来解决可能(但未必)并行的问题.并发更好. 2.asyncio概述 了解asyncio的4个特点: asyncio包使用事件循环驱动的

  • Python Asyncio库之asyncio.task常用函数详解

    目录 前记 0.基础 1.休眠--asyncio.sleep 2.屏蔽取消--asyncio.shield 3.超时--asyncio.wait_for 4.简单的等待--wait 5.迭代可等待对象的完成--asyncio.as_completed 前记 Asyncio在经过一段时间的发展以及获取Curio等第三方库的经验来提供更多的功能,目前高级功能也基本完善,但是相对于其他语言,Python的Asyncio高级功能还是不够的,但好在Asyncio的低级API也比较完善,开发者可以通过参考A

  • Python学习之os包使用教程详解

    目录 os 模块 文件与目录函数介绍 path 模块 ​path 模块常用方法 今天我们来学习一下 python 的内置包 —> OS 包.OS 包拥有着普遍的操作系统功能,拥有着各种各样的函数来操作系统的驱动功能.其中最常用的就是对 路径 与 文件的操作,比如检查某个路径下是否存在某个文件,某个路径是否存在等.也可以创建.删除文件等,接下来我们就详细的看一看 OS 中关于文件的操作功能与用法. os 模块 文件与目录函数介绍 函数名 参数 介绍 举例  返回值 getcwd 没有参数 返回当前

  • Python学习之时间包使用教程详解

    目录 datetime 时间包 datetime 时间包的常用功能 时间对象转字符串 时间字符串转时间类型 时间格式符 time模块 认识时间戳 time 模块的常用方法 datetime 中生成时间戳的函数 datetime 时间戳转时间对象 datetime 时间包 认识 datetime 时间包: date:日期:time:时间:所以 datetime 就是 日期与时间的结合体 使用 datetime 我们就可以获取当前的时间与时间间隔 可以将时间对象转成时间字符串 也可以将字符串转成时间

  • Python asyncio常用函数使用详解

    目录 协程的定义 协程的运行 多个协程运行 关于loop.close() 回调 事件循环 协程的定义 需要使用 async def 语句 协程可以做哪些事: 1.等待一个future结果 2.等待另一个协程(产生一个结果或引发一个异常) 3.产生一个结果给正在等它的协程 4.引发一个异常给正在等它的协程 协程的运行 调用协程函数,协程不会开始运行,只是返回一个协程对象 要让协程对象运行有两种方式: 1.在另一个已经运行的协程中用await等待它 2.通过ensure_future函数计划它的执行

  • python安装及变量名介绍详解

    python基础部分学习 一,python介绍 python的创始人为吉多·范罗苏姆(Guido van Rossum).1989年的圣诞节期间,Guido开始写能够解释Python语言语法的解释器.Python这个名字,来自Guido所挚爱的电视剧Monty Python's Flying Circus.他希望这个新的叫做Python的语言,能符合他的理想:创造一种C和shell之间,功能全面,易学易用,可拓展的语言. Python可以应用于众多领域,如:数据分析.组件集成.网络服务.图像处理

随机推荐