Python通过future处理并发问题

future初识

通过下面脚本来对future进行一个初步了解:

例子1:普通通过循环的方式

import os
import time
import sys
import requests
POP20_CC = (
 "CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR"
).split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
def save_flag(img,filename):
 path = os.path.join(DEST_DIR,filename)
 with open(path,'wb') as fp:
 fp.write(img)
def get_flag(cc):
 url = "{}/{cc}/{cc}.gif".format(BASE_URL,cc=cc.lower())
 resp = requests.get(url)
 return resp.content
def show(text):
 print(text,end=" ")
 sys.stdout.flush()
def download_many(cc_list):
 for cc in sorted(cc_list):
 image = get_flag(cc)
 show(cc)
 save_flag(image,cc.lower()+".gif")
 return len(cc_list)
def main(download_many):
 t0 = time.time()
 count = download_many(POP20_CC)
 elapsed = time.time()-t0
 msg = "\n{} flags downloaded in {:.2f}s"
 print(msg.format(count,elapsed))
if __name__ == '__main__':
 main(download_many)

例子2:通过future方式实现,这里对上面的部分代码进行了复用

 from concurrent import futures
from flags import save_flag, get_flag, show, main
MAX_WORKERS = 20
def download_one(cc):
 image = get_flag(cc)
 show(cc)
 save_flag(image, cc.lower()+".gif")
 return cc
def download_many(cc_list):
 workers = min(MAX_WORKERS,len(cc_list))
 with futures.ThreadPoolExecutor(workers) as executor:
 res = executor.map(download_one, sorted(cc_list))
 return len(list(res))
if __name__ == '__main__':
 main(download_many)

分别运行三次,两者的平均速度:13.67和1.59s,可以看到差别还是非常大的。

future

future是concurrent.futures模块和asyncio模块的重要组件

从python3.4开始标准库中有两个名为Future的类:concurrent.futures.Future和asyncio.Future
这两个类的作用相同:两个Future类的实例都表示可能完成或者尚未完成的延迟计算。与Twisted中的Deferred类、Tornado框架中的Future类的功能类似

注意:通常情况下自己不应该创建future,而是由并发框架(concurrent.futures或asyncio)实例化

原因:future表示终将发生的事情,而确定某件事情会发生的唯一方式是执行的时间已经安排好,因此只有把某件事情交给concurrent.futures.Executor子类处理时,才会创建concurrent.futures.Future实例。
如:Executor.submit()方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排定时间,并返回一个

future

客户端代码不能应该改变future的状态,并发框架在future表示的延迟计算结束后会改变期物的状态,我们无法控制计算何时结束。

这两种future都有.done()方法,这个方法不阻塞,返回值是布尔值,指明future链接的可调用对象是否已经执行。客户端代码通常不会询问future是否运行结束,而是会等待通知。因此两个Future类都有.add_done_callback()方法,这个方法只有一个参数,类型是可调用的对象,future运行结束后会调用指定的可调用对象。

.result()方法是在两个Future类中的作用相同:返回可调用对象的结果,或者重新抛出执行可调用的对象时抛出的异常。但是如果future没有运行结束,result方法在两个Futrue类中的行为差别非常大。

对concurrent.futures.Future实例来说,调用.result()方法会阻塞调用方所在的线程,直到有结果可返回,此时,result方法可以接收可选的timeout参数,如果在指定的时间内future没有运行完毕,会抛出TimeoutError异常。

而asyncio.Future.result方法不支持设定超时时间,在获取future结果最好使用yield from结构,但是concurrent.futures.Future不能这样做

不管是asyncio还是concurrent.futures.Future都会有几个函数是返回future,其他函数则是使用future,在最开始的例子中我们使用的Executor.map就是在使用future,返回值是一个迭代器,迭代器的__next__方法调用各个future的result方法,因此我们得到的是各个futrue的结果,而不是future本身

关于future.as_completed函数的使用,这里我们用了两个循环,一个用于创建并排定future,另外一个用于获取future的结果

from concurrent import futures
from flags import save_flag, get_flag, show, main
MAX_WORKERS = 20
def download_one(cc):
 image = get_flag(cc)
 show(cc)
 save_flag(image, cc.lower()+".gif")
 return cc
def download_many(cc_list):
 cc_list = cc_list[:5]
 with futures.ThreadPoolExecutor(max_workers=3) as executor:
 to_do = []
 for cc in sorted(cc_list):
  future = executor.submit(download_one,cc)
  to_do.append(future)
  msg = "Secheduled for {}:{}"
  print(msg.format(cc,future))
 results = []
 for future in futures.as_completed(to_do):
  res = future.result()
  msg = "{}result:{!r}"
  print(msg.format(future,res))
  results.append(res)
 return len(results)
if __name__ == '__main__':
 main(download_many)

结果如下:

注意:Python代码是无法控制GIL,标准库中所有执行阻塞型IO操作的函数,在等待操作系统返回结果时都会释放GIL.运行其他线程执行,也正是因为这样,Python线程可以在IO密集型应用中发挥作用

以上都是concurrent.futures启动线程,下面通过它启动进程

concurrent.futures启动进程

concurrent.futures中的ProcessPoolExecutor类把工作分配给多个Python进程处理,因此,如果需要做CPU密集型处理,使用这个模块能绕开GIL,利用所有的CPU核心。

其原理是一个ProcessPoolExecutor创建了N个独立的Python解释器,N是系统上面可用的CPU核数。

使用方法和ThreadPoolExecutor方法一样

总结

(0)

相关推荐

  • python中利用await关键字如何等待Future对象完成详解

    前言 本文主要给大家介绍了关于python用await关键字等待Future对象完成的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 在下面的例子里,演示了怎么样使用await来等Future对象设置结果完成 示例代码如下: import asyncio def mark_done(future, result): print('setting future result to {!r}'.format(result)) future.set_result(resul

  • python中利用Future对象回调别的函数示例代码

    前言 本文主要给大家介绍了关于python中用Future对象回调别的函数的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. Future对象也可以像协程一样,当它设置完成结果时,就可以立即进行回调别的函数 例子如下: import asyncio import functools def callback(future, n): print('{}: future done: {}'.format(n, future.result())) async def regi

  • 介绍Python中的__future__模块

    Python的每个新版本都会增加一些新的功能,或者对原来的功能作一些改动.有些改动是不兼容旧版本的,也就是在当前版本运行正常的代码,到下一个版本运行就可能不正常了. 从Python 2.7到Python 3.x就有不兼容的一些改动,比如2.x里的字符串用'xxx'表示str,Unicode字符串用u'xxx'表示unicode,而在3.x中,所有字符串都被视为unicode,因此,写u'xxx'和'xxx'是完全一致的,而在2.x中以'xxx'表示的str就必须写成b'xxx',以此表示"二进制

  • python中利用Future对象异步返回结果示例代码

    前言 本文主要给大家介绍了关于python中用Future对象异步返回结果的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 一个Future是用来表示将来要完成的结果,异步循环可以自动完成对这种对象的状态触发. 例子如下: import asyncio def mark_done(future, result): print('setting future result to {!r}'.format(result)) future.set_result(result

  • Python通过future处理并发问题

    future初识 通过下面脚本来对future进行一个初步了解: 例子1:普通通过循环的方式 import os import time import sys import requests POP20_CC = ( "CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR" ).split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/'

  • python使用期物处理并发教程

    目录 1. futures.ThreadPoolExecutor 2. 期物 3. 阻塞型I/O和GIL 4. 使用concurrent.futures模块启动进程 learning from <流畅的python> 1. futures.ThreadPoolExecutor import os import time import sys import requests POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG

  • python 添加用户设置密码并发邮件给root用户

    #!/usr/bin/env python #coding: utf8 import os import sys import mkpasswd //这是之前写的,直接调用 import string content = """username: ${user} password: ${password}""" t = string.Template(content) def adduser(username, pwd): os.system(&

  • 使用Python控制摄像头拍照并发邮件

    o1 前言 为什么会有写这个程序的想法呢? 最初的想法是写一个可以用电脑前置摄像头拍照的程序,在舍友使用你电脑的时候,不经意间获取到一大堆奇葩舍友的表情包. 然后我又突发奇想,要不搞个开机启动吧,这样我就可以看到是谁开启了我的电脑啦. 然后,突(nao)发(dong)奇(da)想(kai)的我又想到万一我电脑不在身边怎么办?要不再加个邮件发送机制吧,开机拍到照片再邮件发送给我?哈哈 02 工具 •deepin 15.9 •好看好用的国产linux系统 •python 2.7或者3.6 •解释器,

  • 对python多线程SSH登录并发脚本详解

    测试系统中有一项记录ssh登录日志,需要对此进行并发压力测试. 于是用多线程进行python并发记录 因为需要安装的一些依赖和模块比较麻烦,脚本完成后再用pyinstaller打成exe包分发给其他测试人员一起使用. 1.脚本编写 # -*- coding: utf-8 -*- import paramiko import threading import time lt = [] def ssh(a,xh,sp): count = 0 for i in range(0,xh): try: ss

  • python基于gevent实现并发下载器代码实例

    这篇文章主要介绍了python基于gevent实现并发下载器代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 并发下载原理 import gevent from gevent import monkey import urllib.request monkey.patch_all() def my_download(url): print('GET: %s' % url) resp = urllib.request.urlopen(url

  • Python多进程与服务器并发原理及用法实例分析

    本文实例分析了Python多进程与服务器并发原理及用法.分享给大家供大家参考,具体如下: 进程 什么是进程 进程:正在进行的一个过程或者说一个任务.而负责执行任务则是cpu. 进程与程序的区别 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程. 并发与并行 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务 一 并发:是伪并行,即看起来是同时运行.单个cpu+多道技术就可以实现

  • Python统计时间内的并发数代码实例

    这篇文章主要介绍了Python统计时间内的并发数代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Python实现并发的手段: 1.操作系统提供:进程.线程: 2.编程语言提供:协程:用户空间的调度(py3): # coding:utf-8 # 1.导入模块 # datatime模块用于定义时间及时间的加减操作 # MySQLdb模块用于Python2.0连接数据库,Python3.0连接数据库使用pymysql # xlwt模块是exc

  • Python模块future用法原理详解

    这篇文章主要介绍了Python模块future用法原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 计算机的知识太多了,很多东西就是一个使用过程中详细积累的过程.最近遇到了一个很久关于future的问题,踩了坑,这里就做个笔记,免得后续再犯类似错误. future的作用:把下一个新版本的特性导入到当前版本,于是我们就可以在当前版本中测试一些新版本的特性.说的通俗一点,就是你不用更新python的版本,直接加这个模块,就可以使用python

  • python基于socketserver实现并发,验证客户端的合法性

    一.socketserver实现并发 tcp协议的socket是只能和一个客户端通信的,使用socketserver可以实现和多个客户端通信,他是在socket的基础上进行的封装,底层还是调用的socket. socket是底层模块 socketserver是基于socket完成的 socketserver代码格式: 服务端: import socketserver # 引入模块 import time ​ ​ # 类名随便定义,但是必须继承socketserver.BaseRequestHan

随机推荐