深入了解Python并发编程

目录
  • 并发方式
    • 线程([Thread])
    • 进程 (Process)
    • 远程分布式主机 (Distributed Node)
    • 伪线程 (Pseudo-Thread)
  • 实战运用
    • 计算密集型
    • IO密集型
  • 总结

并发方式

线程([Thread])

多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),使用多线程可以有效的利用CPU资源(Python例外)。然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题。

然而在python中由于使用了全局解释锁(GIL)的原因,代码并不能同时在多核上并发的运行,也就是说,Python的多线程不能并发,很多人会发现使用多线程来改进自己的Python代码后,程序的运行效率却下降了,这是多么蛋疼的一件事呀!实际上使用多线程的编程模型是很困难的,程序员很容易犯错,这并不是程序员的错误,因为并行思维是反人类的,我们大多数人的思维是串行(精神分裂不讨论),而且冯诺依曼设计的计算机架构也是以顺序执行为基础的。所以如果你总是不能把你的多线程程序搞定,恭喜你,你是个思维正常的程序猿:)

Python提供两组线程的接口,一组是thread模块,提供基础的,低等级(Low Level)接口,使用Function作为线程的运行体。还有一组是threading模块,提供更容易使用的基于对象的接口(类似于Java),可以继承Thread对象来实现线程,还提供了其它一些线程相关的对象,例如Timer,Lock

使用thread模块的例子

import thread
def worker():
    """thread worker function"""
    print 'Worker'
thread.start_new_thread(worker)

使用threading模块的例子

import threading
def worker():
    """thread worker function"""
    print 'Worker'
t = threading.Thread(target=worker)
t.start()

或者Java Style

import threading
class worker(threading.Thread):
    def __init__(self):
        pass
    def run():
        """thread worker function"""
        print 'Worker'

t = worker()
t.start()

进程 (Process)

由于前文提到的全局解释锁的问题,Python下比较好的并行方式是使用多进程,这样可以非常有效的使用CPU资源,并实现真正意义上的并发。当然,进程的开销比线程要大,也就是说如果你要创建数量惊人的并发进程的话,需要考虑一下你的机器是不是有一颗强大的心。

Python的mutliprocess模块和threading具有类似的接口。

from multiprocessing import Process

def worker():
    """thread worker function"""
    print 'Worker'
p = Process(target=worker)
p.start()
p.join()

由于线程共享相同的地址空间和内存,所以线程之间的通信是非常容易的,然而进程之间的通信就要复杂一些了。常见的进程间通信有,管道,消息队列,Socket接口(TCP/IP)等等。

Python的mutliprocess模块提供了封装好的管道和队列,可以方便的在进程间传递消息。

Python进程间的同步使用锁,这一点喝线程是一样的。

另外,Python还提供了进程池Pool对象,可以方便的管理和控制线程。

远程分布式主机 (Distributed Node)

随着大数据时代的到临,摩尔定理在单机上似乎已经失去了效果,数据的计算和处理需要分布式的计算机网络来运行,程序并行的运行在多个主机节点上,已经是现在的软件架构所必需考虑的问题。

远程主机间的进程间通信有几种常见的方式

  • TCP/IP

TCP/IP是所有远程通信的基础,然而API比较低级别,使用起来比较繁琐,所以一般不会考虑

  • 远程方法调用 Remote Function Call

[RPC]

  • 远程对象 Remote Object

远程对象是更高级别的封装,程序可以想操作本地对象一样去操作一个远程对象在本地的代理。远程对象最广为使用的规范CORBA,CORBA最大的好处是可以在不同语言和平台中进行通信。当让不用的语言和平台还有一些各自的远程对象实现,例如Java的RMI,MS的DCOM

Python的开源实现,有许多对远程对象的支持

  • Dopy]
  • Fnorb (CORBA)
  • ICE
  • omniORB (CORBA)
  • Pyro
  • YAMI
  • 消息队列 Message Queue

比起RPC或者远程对象,消息是一种更为灵活的通信手段,常见的支持Python接口的消息机制有

  • RabbitMQ
  • ZeroMQ
  • Kafka
  • AWS SQS + BOTO

在远程主机上执行并发和本地的多进程并没有非常大的差异,都需要解决进程间通信的问题。当然对远程进程的管理和协调比起本地要复杂。

Python下有许多开源的框架来支持分布式的并发,提供有效的管理手段包括:

  • Celery

Celery是一个非常成熟的Python分布式框架,可以在分布式的系统中,异步的执行任务,并提供有效的管理和调度功能。

  • SCOOP

SCOOP (Scalable COncurrent Operations in Python)提供简单易用的分布式调用接口,使用Future接口来进行并发。

  • Dispy

相比起Celery和SCOOP,Dispy提供更为轻量级的分布式并行服务

  • PP

PP (Parallel Python)是另外一个轻量级的Python并行服务

  • Asyncoro

Asyncoro是另一个利用Generator实现分布式并发的Python框架,

当然还有许多其它的系统,我没有一一列出

另外,许多的分布式系统多提供了对Python接口的支持,例如Spark

伪线程 (Pseudo-Thread)

还有一种并发手段并不常见,我们可以称之为伪线程,就是看上去像是线程,使用的接口类似线程接口,但是实际使用非线程的方式,对应的线程开销也不存的。

  • greenlet

greenlet提供轻量级的coroutines来支持进程内的并发。

greenlet是Stackless的一个副产品,使用tasklet来支持一中被称之为微线程(mirco-thread)的技术,这里是一个使用greenlet的伪线程的例子

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

运行以上程序得到如下结果:

12
56
34

伪线程gr1 switch会打印12,然后调用gr2 switch得到56,然后switch回到gr1,打印34,然后伪线程gr1结束,程序退出,所以78永远不会被打印。通过这个例子我们可以看出,使用伪线程,我们可以有效的控制程序的执行流程,但是伪线程并不存在真正意义上的并发。

eventlet,gevent和concurence都是基于greenlet提供并发的。

eventlet是一个提供网络调用并发的Python库,使用者可以以非阻塞的方式来调用阻塞的IO操作。

import eventlet
from eventlet.green import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print("got body", len(body))

执行结果如下

('got body', 17629)
('got body', 1270)
('got body', 46949)

eventlet为了支持generator的操作对urllib2做了修改,接口和urllib2是一致的。这里的GreenPool和Python的Pool接口一致。

  • gevent

gevent和eventlet类似,

import gevent
from gevent import socket
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)

print [job.value for job in jobs]

执行结果如下:

['206.169.145.226', '93.184.216.34', '23.235.39.223']

concurence是另外一个利用greenlet提供网络并发的开源库,我没有用过,大家可以自己尝试一下。

实战运用

通常需要用到并发的场合有两种,一种是计算密集型,也就是说你的程序需要大量的CPU资源;另一种是IO密集型,程序可能有大量的读写操作,包括读写文件,收发网络请求等等。

计算密集型

对应计算密集型的应用,我们选用著名的蒙特卡洛算法来计算PI值。基本原理如下

蒙特卡洛算法利用统计学原理来模拟计算圆周率,在一个正方形中,一个随机的点落在1/4圆的区域(红色点)的概率与其面积成正比。也就该概率 p = Pi * R*R /4 : R* R , 其中R是正方形的边长,圆的半径。也就是说该概率是圆周率的1/4, 利用这个结论,只要我们模拟出点落在四分之一圆上的概率就可以知道圆周率了,为了得到这个概率,我们可以通过大量的实验,也就是生成大量的点,看看这个点在哪个区域,然后统计出结果。

基本算法如下:

from math import hypot
from random import random

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

这里test方法做了n(tries)次试验,返回落在四分之一圆中的点的个数。判断方法是检查该点到圆心的距离,如果小于R则是在圆上。

通过大量的并发,我们可以快速的运行多次试验,试验的次数越多,结果越接近真实的圆周率。

这里给出不同并发方法的程序代码

  • 非并发

我们先在单线程,但进程运行,看看性能如何

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    result = map(test, [tries] * nbFutures)

    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • 多线程 thread

为了使用线程池,我们用multiprocessing的dummy包,它是对多线程的一个封装。注意这里代码虽然一个字的没有提到线程,但它千真万确是多线程。

通过测试我们开(jing)心(ya)的发现,果然不出所料,当线程池为1是,它的运行结果和没有并发时一样,当我们把线程池数字设置为5时,耗时几乎是没有并发的2倍,我的测试数据从5秒到9秒。所以对于计算密集型的任务,还是放弃多线程吧。

from multiprocessing.dummy import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(1)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    p = Pool()
    print("pi = {}".format(calcPi(3000, 4000)))
  • 多进程 multiprocess

理论上对于计算密集型的任务,使用多进程并发比较合适,在以下的例子中,进程池的规模设置为5,修改进程池的大小可以看到对结果的影响,当进程池设置为1时,和多线程的结果所需的时间类似,因为这时候并不存在并发;当设置为2时,响应时间有了明显的改进,是之前没有并发的一半;然而继续扩大进程池对性能影响并不大,甚至有所下降,也许我的Apple Air的CPU只有两个核?

当心,如果你设置一个非常大的进程池,你会遇到 Resource temporarily unavailable的错误,系统并不能支持创建太多的进程,毕竟资源是有限的。

from multiprocessing import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(5)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    print("pi = {}".format(calcPi(3000, 4000)))
  • gevent (伪线程)

不论是gevent还是eventlet,因为不存在实际的并发,响应时间和没有并发区别不大,这个和测试结果一致。

import gevent
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
    gevent.joinall(jobs, timeout=2)
    ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • eventlet (伪线程)
from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    pool = eventlet.GreenPool()
    result = pool.imap(test, [tries] * nbFutures)

    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • SCOOP

SCOOP中的Future接口符合PEP-3148的定义,也就是在Python3中提供的Future接口。

在缺省的SCOOP配置环境下(单机,4个Worker),并发的性能有提高,但是不如两个进程池配置的多进程。

from math import hypot
from random import random
from scoop import futures

import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    expr = futures.map(test, [tries] * nbFutures)
    ret = 4. * sum(expr) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == "__main__":
    print("pi = {}".format(calcPi(3000, 4000)))
  • Celery

任务代码

from celery import Celery

from math import hypot
from random import random

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

客户端代码

from celery import group
from tasks import test

import time

def calcPi(nbFutures, tries):
    ts = time.time()
    result = group(test.s(tries) for i in xrange(nbFutures))().get()

    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)

使用Celery做并发的测试结果出乎意料(环境是单机,4frefork的并发,消息broker是rabbitMQ),是所有测试用例里最糟糕的,响应时间是没有并发的5~6倍。这也许是因为控制协调的开销太大。对于这样的计算任务,Celery也许不是一个好的选择。

  • asyncoro

Asyncoro的测试结果和非并发保持一致。

import asyncoro

from math import hypot
from random import random
import time

def test(tries):
    yield sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]
    ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)

IO密集型

IO密集型的任务是另一种常见的用例,例如网络WEB服务器就是一个例子,每秒钟能处理多少个请求时WEB服务器的重要指标。

我们就以网页读取作为最简单的例子

from math import hypot
import time
import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def test(url):
    return urllib2.urlopen(url).read()

def testIO(nbFutures):
    ts = time.time()
    map(test, urls * nbFutures)

    span = time.time() - ts
    print "time spend ", span

testIO(10)

在不同并发库下的代码,由于比较类似,我就不一一列出。大家可以参考计算密集型中代码做参考。

通过测试我们可以发现,对于IO密集型的任务,使用多线程,或者是多进程都可以有效的提高程序的效率,而使用伪线程性能提升非常显著,eventlet比没有并发的情况下,响应时间从9秒提高到0.03秒。同时eventlet/gevent提供了非阻塞的异步调用模式,非常方便。这里推荐使用线程或者伪线程,因为在响应时间类似的情况下,线程和伪线程消耗的资源更少。

总结

Python提供了不同的并发方式,对应于不同的场景,我们需要选择不同的方式进行并发。选择合适的方式,不但要对该方法的原理有所了解,还应该做一些测试和试验,数据才是你做选择的最好参考。

以上就是深入了解Python并发编程的详细内容,更多关于Python并发编程的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解Python并发编程之创建多线程的几种方法

    大家好,并发编程 今天开始进入第二篇. 今天的内容会比较基础,主要是为了让新手也能无障碍地阅读,所以还是要再巩固下基础.学完了基础,你们也就能很顺畅地跟着我的思路理解以后的文章. 本文目录 学会使用函数创建多线程 学会使用类创建多线程 多线程:必学函数讲解 经过总结,Python创建多线程主要有如下两种方法: 函数 类 接下来,我们就来揭开多线程的神秘面纱. . 学会使用函数创建多线程 在Python3中,Python提供了一个内置模块 threading.Thread,可以很方便地让我们创建多

  • python基础之并发编程(三)

    目录 一.协程定义和作用 1.使用协程的优点 2.使用协程的缺点 二.Greenlet 的使用 三.Gevent的使用 四.async io 异步 IO 1.asyncio中的task的使用 五.总结 进程与线程的区别: 进程.线程和协程的特点 总结 一.协程定义和作用 协程(coroutine),又称为微线程,纤程.(协程是一种用户态的轻量级线程) 作用:在执行 A 函数的时候,可以随时中断,去执行 B 函数,然后中断继续执行 A 函数 (可以自动切换),单着一过程并不是函数调用(没有调用语句

  • python基础之并发编程(一)

    目录 一.进程(Process) 二.线程(Thread) 三.并发编程解决方案: 四.多线程实现 (两种) 1.第一种 函数方法 2.第二种 类方法包装 五.守护线程与子线程 1.线程在分法有: 2.守护线程 六.锁 七.死锁 八.信号量(Semaphore) 九.事件(Event) 十.线程通信-队列 1使用的队列的好处: 2Queue模块中的常用方法: 十一.生产者和消费者模式 总结 一.进程(Process) 是一个具有一定独立功能的程序关于某个数据集合的一次运行活动 二.线程(Thre

  • python基础之并发编程(二)

    目录 一.多进程的实现 方法一 方法二: 二.使用进程的优缺点 1.优点 2.缺点 三.进程的通信 1.Queue 实现进程间通信 2.Pipe 实现进程间通信(一边发送send(obj),一边接收(obj)) 四.Manager管理器 五.进程池 总结 一.多进程的实现 方法一 # 方法包装 多进程 from multiprocessing import Process from time import sleep def func1(arg): print(f'{arg}开始...') sl

  • Python并发编程实例教程之线程的玩法

    目录 一.线程基础以及守护进程 二.线程锁(互斥锁) 三.线程锁(递归锁) 四.死锁 五.队列 六.相关面试题 七.判断数据是否安全 八.进程池 & 线程池 总结 一.线程基础以及守护进程 线程是CPU调度的最小单位 全局解释器锁 全局解释器锁GIL(global interpreter lock) 全局解释器锁的出现主要是为了完成垃圾回收机制的回收机制,对不同线程的引用计数的变化记录的更加精准. 全局解释器锁导致了同一个进程中的多个线程只能有一个线程真正被CPU执行. GIL锁每执行700条指

  • 详解Python并发编程之从性能角度来初探并发编程

    . 前言 作为进阶系列的一个分支「并发编程」,我觉得这是每个程序员都应该会的. 并发编程 这个系列,我准备了将近一个星期,从知识点梳理,到思考要举哪些例子才能更加让人容易吃透这些知识点.希望呈现出来的效果真能如想象中的那样,对小白也一样的友好. 昨天大致整理了下,这个系列我大概会讲如下内容(后期可能调整): 对于并发编程,Python的实现,总结了一下,大致有如下三种方法: 多线程 多进程 协程(生成器) 在之后的章节里,将陆陆续续地给大家介绍到这三个知识点. . 并发编程的基本概念 在开始讲解

  • python并发编程多进程 模拟抢票实现过程

    抢票是并发执行 多个进程可以访问同一个文件 多个进程共享同一文件,我们可以把文件当数据库,用多个进程模拟多个人执行抢票任务 db.txt {"count": 1} 并发运行,效率高,但竞争写同一文件,数据写入错乱,只有一张票,都卖成功给了10个人 #文件db.txt的内容为:{"count":1} #注意一定要用双引号,不然json无法识别 from multiprocessing import Process import time import json cla

  • Python并发编程协程(Coroutine)之Gevent详解

    Gevent官网文档地址:http://www.gevent.org/contents.html 基本概念 我们通常所说的协程Coroutine其实是corporateroutine的缩写,直接翻译为协同的例程,一般我们都简称为协程. 在linux系统中,线程就是轻量级的进程,而我们通常也把协程称为轻量级的线程即微线程. 进程和协程 下面对比一下进程和协程的相同点和不同点: 相同点: 我们都可以把他们看做是一种执行流,执行流可以挂起,并且后面可以在你挂起的地方恢复执行,这实际上都可以看做是con

  • python并发编程之线程实例解析

    常用用法 t.is_alive() Python中线程会在一个单独的系统级别线程中执行(比如一个POSIX线程或者一个Windows线程) 这些线程将由操作系统来全权管理.线程一旦启动,将独立执行直到目标函数返回.可以通过查询 一个线程对象的状态,看它是否还在执行t.is_alive() t.join() 可以把一个线程加入到当前线程,并等待它终止 Python解释器在所有线程都终止后才继续执行代码剩余的部分 daemon 对于需要长时间运行的线程或者需要一直运行的后台任务,可以用后台线程(也称

  • Python并发编程队列与多线程最快发送http请求方式

    目录 队列+多线程 线程池 协程 + aiohttp grequests 最后的话 Python 并发编程有很多方法,多线程的标准库 threading,concurrency,协程 asyncio,当然还有 grequests 这种异步库,每一个都可以实现上述需求,下面一一用代码实现一下,本文的代码可以直接运行,给你以后的并发编程作为参考: 队列+多线程 定义一个大小为 400 的队列,然后开启 200 个线程,每个线程都是不断的从队列中获取 url 并访问. 主线程读取文件中的 url 放入

  • Python并发编程线程消息通信机制详解

    目录 1 Event事件 2 Condition 3 Queue队列 4 总结一下 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()下,实在是太简单了. 可是要知道,在真实的项目中,实际场景可要我们举的例子要复杂的多得多,不同线程的执行可能是有顺序的,或者说他们的执行是有条件的,是要受控制的.如果仅仅依靠前面学的那点浅薄的知识,是远远不够的. 那今天,我们就来探讨一下如何控制线程的触发执行. 要实现对多个线程进行控制,其实

  • 深入了解Python并发编程

    目录 并发方式 线程([Thread]) 进程 (Process) 远程分布式主机 (Distributed Node) 伪线程 (Pseudo-Thread) 实战运用 计算密集型 IO密集型 总结 并发方式 线程([Thread]) 多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),使用多线程可以有效的利用CPU资源(Python例外).然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题. 然而在python中由于使用了全局解释锁

  • Python并发编程之未来模块Futures

    目录 区分并发和并行 并发编程之Futures 到底什么是Futures? 为什么多线程每次只有一个线程执行? 总结 不论是哪一种语言,并发编程都是一项非常重要的技巧.比如我们上一章用的爬虫,就被广泛用在工业的各个领域.我们每天在各个网站.App上获取的新闻信息,很大一部分都是通过并发编程版本的爬虫获得的. 正确并合理的使用并发编程,无疑会给我们的程序带来极大性能上的提升.今天我们就一起学习Python中的并发编程——Futures. 区分并发和并行 我们在学习并发编程时,常常会听到两个词:并发

  • Python并发编程多进程,多线程及GIL全局解释器锁

    目录 1. 并发与并行 2. 线程与进程的应用场景 2.1. 并行/并发编程相关的技术栈 3. Python中的GIL是什么,它影响什么 1. 并发与并行 所谓的并行(Parallelism),就是多个彼此独立的任务可以同时一起执行,彼此并不相互干扰,并行强调的是同时且独立的运行,彼此不需要协作. 而所谓并发(Concurrency),则是多个任务彼此交替执行,但是同一时间只能有一个处于运行状态,并发执行强调任务之间的彼此协作. 并发通常被误解为并行,并发实际是隐式的调度独立的代码,以协作的方式

随机推荐