Python并发concurrent.futures和asyncio实例

说明

Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码。

从Python3.2开始,标准库为我们提供了concurrent.futures模块,concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和

ProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程或进程中执行可调

用的对象。这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。

Python 3.4 以后标准库中asyncio 包,这个包使用事件循环驱动的协程实现并发。这是 Python 中最大也

是最具雄心壮志的库之一。asyncio 大量使用 yield from 表达式,因此与

Python 旧版不兼容。

submit和map方法

submit方法作用是向线程池提交可回调的task,并返回一个回调实例。

example:

import time
from concurrent.futures import ThreadPoolExecutor

# 可回调的task
def pub_task(msg):
  time.sleep(3)
  return msg

# 创建一个线程池
pool = ThreadPoolExecutor(max_workers=3)

# 往线程池加入2个task
task1 = pool.submit(pub_task, 'a')
task2 = pool.submit(pub_task, 'b')

print(task1.done())    # False
time.sleep(4)
print(task2.done())    # True

print(task1.result())
print(task2.result())

map方法是创建一个迭代器,回调的结果有序放在迭代器中。

问题:

Executor.map 函数易于使用,不过有个特性可能有用,也可能没用,具体情况取决于需求:这个函数返回结果的顺序与调用开始的顺序一致。

如果第一个调用生成结果用时 10秒,而其他调用只用 1 秒,代码会阻塞 10 秒,获取 map 方法返回的生成器产出的第一个结果。

在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。

如果必须等到获取所有结果后再处理,这种行为没问题;不过,通常更可取的方式是,不管提交的顺序,只要有结果就获取。

为此,要把 Executor.submit 方法和 futures.as_completed 函数结合起来使用。

from concurrent.futures import ThreadPoolExecutor
import requests

URLS = ['http://www.csdn.com', 'http://qq.com', 'http://www.leasonlove.cn']

def task(url, timeout=10):
  return requests.get(url, timeout=timeout)

pool = ThreadPoolExecutor(max_workers=3)
results = pool.map(task, URLS)

for ret in results:
  print('%s, %s' % (ret.url, ret))

future异步编程

Future可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import requests

URLS = ['http://www.csdn.cn', 'http://qq.com', 'http://www.leasonlove.cn']

def task(url, timeout=1):
  return requests.get(url, timeout=timeout)

with ThreadPoolExecutor(max_workers=3) as executor:
  future_tasks = [executor.submit(task, url) for url in URLS]

  for f in future_tasks:
    if f.running():
      print('%s is running' % str(f))

  for f in as_completed(future_tasks):
    try:
      ret = f.done()
      if ret:
        f_ret = f.result()
        print('%s, done, result: %s, %s' % (str(f), f_ret.url, f_ret.content))
    except Exception as e:
      # 第一个url无响应
      f.cancel()
      print(str(e))

asyncio库协程实现并发

对于gevent 和 asyncio 建议大家放弃Gevent,拥抱asyncio,asyncio是Python3.4以后标准库。

而且由于Gevent直接修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。

但是我们无法保证你在复杂的生产环境中有哪些地方使用这些标准库会由于打了补丁而出现奇怪的问题。

import asyncio
import time
start = time.time()

async def do(x):
  print('Waiting: ', x)
  await asyncio.sleep(x)
  return 'Finish after {}s'.format(x)

task1 = do(1)
task2 = do(2)
task3 = do(4)

tasks = [
  asyncio.ensure_future(task1),
  asyncio.ensure_future(task2),
  asyncio.ensure_future(task3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
  print('Task result: ', task.result())

end = time.time()
print('TIME: ', end - start)

协程与线程

如果使用线程做过重要的编程,你就知道写出程序有多么困难,因为调度程序任何时候都能中断线程。

必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,防止数据处于无效状态。

而协程默认会做好全方位保护,以防止中断。我们必须显式产出才能让程序的余下部分运行。

对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只有一个协程运行。

想交出控制权时,可以使用 yield 或 yield from 把控制权交还调度程序。

这就是能够安全地取消协程的原因:按照定义,协程只能在暂停的 yield处取消,因此可以处理 CancelledError 异常,执行清理操作。

补充知识:Python-什么时候使用yield?

简介

很多时候在python代码中见到了yield,没有系统学习过,自己也没有用过。

yield语句延迟了语句的执行,然后发送了一个值给调用者,但保留了一定的状态去保证函数离开之后可以继续。当继续的时候,函数继续执行上一个的运行状态。这使得它的代码可以随着时间产生一系列的值,而不是立即执行,然后像一个list一样发送他们回来。

例子

例子1:

# A Simple Python program to demonstrate working
# of yield 

# A generator function that yields 1 for first time,
# 2 second time and 3 third time
def simpleGeneratorFun():
  yield 1
  yield 2
  yield 3

# Driver code to check above generator function
for value in simpleGeneratorFun():
  print(value) 

返回语句发送一个特殊的值给它的调用者,而yield产生了一系列的值,当我们想要遍历一个序列的时候,我们应该使用yield,但不想要把整个序列存储在内存中。

yield用于python的生成器(generator)。一个genertator 被定义得看起来像一个普通函数一样,但它需要产生一个数字得时候,它使用yield,而不是使用return。如果一个函数里面定义了yield,那么它自动称为了一个generator函数。、

例子2:

# A Python program to generate squares from 1
# to 100 using yield and therefore generator 

# An infinite generator function that prints
# next square number. It starts with 1
def nextSquare():
  i = 1; 

  # An Infinite loop to generate squares
  while True:
    yield i*i
    i += 1 # Next execution resumes
        # from this point   

# Driver code to test above generator
# function
for num in nextSquare():
  if num > 100:
     break
  print(num) 

输出1,4,9…100

以上这篇Python并发concurrent.futures和asyncio实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 详解python异步编程之asyncio(百万并发)

    前言:python由于GIL(全局锁)的存在,不能发挥多核的优势,其性能一直饱受诟病.然而在IO密集型的网络编程里,异步处理比同步处理能提升成百上千倍的效率,弥补了python性能方面的短板,如最新的微服务框架japronto,resquests per second可达百万级. python还有一个优势是库(第三方库)极为丰富,运用十分方便.asyncio是python3.4版本引入到标准库,python2x没有加这个库,毕竟python3x才是未来啊,哈哈!python3.5又加入了asyn

  • Python concurrent.futures模块使用实例

    这篇文章主要介绍了Python concurrent.futures模块使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 concurrent.futures的作用: 管理并发任务池.concurrent.futures模块提供了使用工作线程或进程池运行任务的接口.线程和进程池API都是一样,所以应用只做最小的修改就可以在线程和进程之间地切换 1.基于线程池使用map() futures_thread_pool_map.py #!/usr

  • Python使用asyncio包处理并发详解

    阻塞型I/O和GIL CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码.因此,一个 Python 进程通常不能同时使用多个 CPU 核心. 然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL.这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程

  • Python并发concurrent.futures和asyncio实例

    说明 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码. 从Python3.2开始,标准库为我们提供了concurrent.futures模块,concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和 ProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程或进程中执行可调 用的对象.这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列. Python 3.4

  • python并发2之使用asyncio处理并发

    asyncio 在Python 2的时代,高性能的网络编程主要是使用Twisted.Tornado和Gevent这三个库,但是它们的异步代码相互之间既不兼容也不能移植.如上一节说的,Gvanrossum希望在Python 3 实现一个原生的基于生成器的协程库,其中直接内置了对异步IO的支持,这就是asyncio,它在Python 3.4被引入到标准库. asyncio 这个包使用事件循环驱动的协程实现并发. asyncio 包在引入标准库之前代号 "Tulip"(郁金香),所以在网上搜

  • Python并发编程之未来模块Futures

    目录 区分并发和并行 并发编程之Futures 到底什么是Futures? 为什么多线程每次只有一个线程执行? 总结 不论是哪一种语言,并发编程都是一项非常重要的技巧.比如我们上一章用的爬虫,就被广泛用在工业的各个领域.我们每天在各个网站.App上获取的新闻信息,很大一部分都是通过并发编程版本的爬虫获得的. 正确并合理的使用并发编程,无疑会给我们的程序带来极大性能上的提升.今天我们就一起学习Python中的并发编程——Futures. 区分并发和并行 我们在学习并发编程时,常常会听到两个词:并发

  • Python并发编程实例教程之线程的玩法

    目录 一.线程基础以及守护进程 二.线程锁(互斥锁) 三.线程锁(递归锁) 四.死锁 五.队列 六.相关面试题 七.判断数据是否安全 八.进程池 & 线程池 总结 一.线程基础以及守护进程 线程是CPU调度的最小单位 全局解释器锁 全局解释器锁GIL(global interpreter lock) 全局解释器锁的出现主要是为了完成垃圾回收机制的回收机制,对不同线程的引用计数的变化记录的更加精准. 全局解释器锁导致了同一个进程中的多个线程只能有一个线程真正被CPU执行. GIL锁每执行700条指

  • python并发编程之线程实例解析

    常用用法 t.is_alive() Python中线程会在一个单独的系统级别线程中执行(比如一个POSIX线程或者一个Windows线程) 这些线程将由操作系统来全权管理.线程一旦启动,将独立执行直到目标函数返回.可以通过查询 一个线程对象的状态,看它是否还在执行t.is_alive() t.join() 可以把一个线程加入到当前线程,并等待它终止 Python解释器在所有线程都终止后才继续执行代码剩余的部分 daemon 对于需要长时间运行的线程或者需要一直运行的后台任务,可以用后台线程(也称

  • python concurrent.futures模块的使用测试

    概述 concurrent.futures 是 3.2 中引入的新模块,它为异步执行可调用对象提供了高层接口. 可以使用 ThreadPoolExecutor 来进行多线程编程,ProcessPoolExecutor 进行多进程编程,两者实现了同样的接口,这些接口由抽象类 Executor 定义. 这个模块提供了两大类型,一个是执行器类 Executor,另一个是 Future 类. 执行器用来管理工作池,future 用来管理工作计算出来的结果,通常不用直接操作 future 对象,因为有丰富

  • Python并发编程队列与多线程最快发送http请求方式

    目录 队列+多线程 线程池 协程 + aiohttp grequests 最后的话 Python 并发编程有很多方法,多线程的标准库 threading,concurrency,协程 asyncio,当然还有 grequests 这种异步库,每一个都可以实现上述需求,下面一一用代码实现一下,本文的代码可以直接运行,给你以后的并发编程作为参考: 队列+多线程 定义一个大小为 400 的队列,然后开启 200 个线程,每个线程都是不断的从队列中获取 url 并访问. 主线程读取文件中的 url 放入

  • Python并发编程协程(Coroutine)之Gevent详解

    Gevent官网文档地址:http://www.gevent.org/contents.html 基本概念 我们通常所说的协程Coroutine其实是corporateroutine的缩写,直接翻译为协同的例程,一般我们都简称为协程. 在linux系统中,线程就是轻量级的进程,而我们通常也把协程称为轻量级的线程即微线程. 进程和协程 下面对比一下进程和协程的相同点和不同点: 相同点: 我们都可以把他们看做是一种执行流,执行流可以挂起,并且后面可以在你挂起的地方恢复执行,这实际上都可以看做是con

  • Python爬虫回测股票的实例讲解

    股票和基金一直是热门的话题,很多周围的人都选择不同种类的理财方式.就股票而言,肯定是短时间内收益最大化,这里我们需要用python爬虫的方法,来帮助我们获取一些股票的数据,这样才能更好的买到相应的股票.下面我们就python爬虫获取股票数据的方法带来详细的讲解. 1.生成上证与深证所有股票的代码: #上证代码 shanghaicode = [] for i in range(600000, 604000, 1): shanghaicode.append(str(i)) #深证代码 shenzhe

随机推荐