Python multiprocessing多进程原理与应用示例

本文实例讲述了Python multiprocessing多进程原理与应用。分享给大家供大家参考,具体如下:

multiprocessing包是Python中的多进程管理包,可以利用multiprocessing.Process对象来创建进程,Process对象拥有is_alive()join([timeout])run()start()terminate()等方法。

multprocessing模块的核心就是使管理进程像管理线程一样方便,每个进程有自己独立的GIL,所以不存在进程间争抢GIL的问题,在多核CPU环境中,可以大大提高运行效率。

multiprocessing使用示例:

import multiprocessing
import time
import cv2
def daemon1(image):
  name = multiprocessing.current_process().name
  for i in range(50):
    image = cv2.GaussianBlur(image, (3, 3), 1)
    time.sleep(0.1)
  print 'daemon1 done!'
  cv2.imshow('daemon1', image)
def daemon2(image):
  name = multiprocessing.current_process().name
  for i in range(50):
    image = cv2.GaussianBlur(image, (3, 3), 1)
    time.sleep(0.5)
  print 'daemon2 done!'
  cv2.imshow('daemon2', image)
if __name__ == '__main__':
  t1 = time.time()
  number_kernel = multiprocessing.cpu_count()
  print 'We have {0} kernels'.format(number_kernel)
  p1 = multiprocessing.Process(name='daemon1',
                target=daemon1,args= (cv2.imread('./p1.jpg'),))
  p1.daemon = False
  p2 = multiprocessing.Process(name='daemon2',
                target=daemon2, args=(cv2.imread('./p2.jpg'),))
  p2.daemon = False
  p1.start()
  p2.start()
  print 'p1 is {0}'.format(p1.is_alive())
  p1.terminate()
  p1.join()
  print 'p1 is {0}'.format(p1.is_alive())
  print 'p2 is {0}'.format(p2.is_alive())
  p2.join()
  t2 = time.time()
  print '!!!!!!!!!!!!!!!!!!!!OK!!!!!!!!!!!!!!!!!!!!!'
  print 'total time is {0}'.format(t2-t1)
  print 'p1.exitcode = {0}'.format(p1.exitcode)
  print 'p2.exitcode = {0}'.format(p2.exitcode)

multiprocessing中Process是一个类,用于创建进程,以及定义进程的方法,Process类的构造函数是:

def __init__(self, group=None, target=None, name=None, args=(), kwargs={})

参数含义:

  • group:进程所属组,基本不用
  • target: 创建进程关联的对象,需要传入要多进程处理的函数名
  • name: 定义进程的名称
  • args: 表示与target相关联的函数的传入参数,可以传入多个,注意args是一个元组,如果传入的参数只有一个,需要表示为 args = (element1,)
  • kwargs: 表示调用对象的字典

程序解读:

  • multiprocessing.cpu_count(): 返回机器上cpu核的总数量
  • p1.daemon = False : 定义子进程的运行属性,如果 .daemon设置为False表示子进程可以在主进程完成之后继续执行; 如果 .daemon设置为True,表示子进程随着主进程的结束而结束;必须在start之前设置;
  • p1.start(): 开始执行子进程p1
  • p1.join(): 定义子进程p1的运行阻塞主进程,只有p1子进程执行完成之后才会继续执行join之后的主进程,但是子进程间互相不受join影响。
  • 可以定义子进程阻塞主进程的时间--p1.join(100),超时之后,主进程不再等待,开始执行。join()需要放在start()方法之后;
  • p1.terminate():终止子进程的执行,其后要跟上jion()方法更新子进程的状态;
  • p1.exitcode: 进程的退出状态:  == 0 未生成任何错误,正常退出;  > 0 进程有一个错误,并以该错误码退出;    <0 进程由一个-1 * exitcode信号结束

在multiprocessing中使用pool

如果需要多个子进程时,使用进程池(pool)来(自动)管理各个子进程更加方便:

from multiprocessing import Pool
import os, time
def long_time_task(name):
  print 'Run task {0} ({1})'.format(name,os.getpid())
  start = time.time()
  time.sleep(3)
  end = time.time()
  print 'Task {0} runs {1:.2f} seconds.'.format(name,end - start)
if __name__=='__main__':
  print 'Parent process ({0})'.format(os.getpid)
  p = Pool()
  for i in range(12):
    p.apply_async(long_time_task, args=(i,))
  print 'Waiting for all subprocesses done...'
  p.close()
  p.join()
  print 'All subprocesses done.'

与Process类创建进程的方法不同,Pool是通过apply_async(func,args=(args))方法创建进程,一个进程池中能同时运行的任务数是机器上CPU核的总数量n_kernel,如果创建的子进程数大于n_kernel,则同时执行n_kernel个进程,这n_kernel中某个进程完成之后才会启动下一个进程。

  • os.getpid()是获取当前执行的进程的ID
  • p.close()方法是关掉进程池,表示不能再继续向进程池添加进程了。
  • p.join()方法是子进程阻塞主进程,必须在调用p.close()关闭进程池之后才能调用join()方法

多个子进程间的通信

多个子进程间的通信要用到multiprocessing.Queue,Queue的特性是它是一个消息队列。比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据的例子:

from multiprocessing import Process, Queue
import os, time, random
def write(q):
  for value in ['A', 'B', 'C']:
    print 'Put {0} to queue...'.format(value)
    q.put(value)
    time.sleep(random.random())
def read(q):
  while True:
    if not q.empty():
      value = q.get(True)
      print 'Get {0} from queue.'.format(value)
      time.sleep(random.random())
    else:
      break
if __name__=='__main__':
  q = multiprocessing.Queue()
  pw = Process(target=write, args=(q,))
  pr = Process(target=read, args=(q,))
  pw.start()
  pw.join()
  pr.start()
  pr.join()

Queue使用方法:

  • Queue.qsize():返回当前队列包含的消息数量;
  • Queue.empty():如果队列为空,返回True,反之False ;
  • Queue.full():如果队列满了,返回True,反之False;
  • Queue.get():获取队列中的一条消息,然后将其从列队中移除,可传参超时时长;
  • Queue.get_nowait():相当Queue.get(False),取不到值时触发异常:Empty;
  • Queue.put():将一个值添加进数列,可传参超时时长;
  • Queue.put_nowait():相当于Queue.get(False),当队列满了时报错:Full;

在进程池Pool中,使用Queue会出错,需要使用Manager.Queue

from multiprocessing import Process, Queue
import os, time, random
def write(q):
  for value in ['A', 'B', 'C']:
    print 'Put {0} to queue...'.format(value)
    q.put(value)
    time.sleep(random.random())
def read(q):
  while True:
    if not q.empty():
      value = q.get(True)
      print 'Get {0} from queue.'.format(value)
      time.sleep(random.random())
    else:
      break
if __name__=='__main__':
  manager = multiprocessing.Manager()
  q = manager.Queue()
  p = Pool()
  pw = p.apply_async(write, args=(q,))
  time.sleep(2)
  pr = p.apply_async(read, args=(q,))
  p.close()
  p.join()
  if not q.empty():
    print 'q is not empty...'
  else:
    print 'q is empty...'
  print 'OK'
  if not q.empty():
    print 'q is not empty...'
  else:
    print 'q is empty...'
  print 'done...'

父进程与子进程共享内存

定义普通的变量,不能实现在父进程和子进程之间共享:

import multiprocessing
from multiprocessing import Pool
def changevalue(n, a):
  n = 3.14
  a[0] = 5
if __name__ == '__main__':
  num = 0
  arr = range(10)
  p = Pool()
  p1 = p.apply_async(changevalue, args=(num, arr))
  p.close()
  p.join()
  print num
  print arr[:]

结果输出num的值还是在父进程中定义的0,arr的第一个元素值还是0。

使用multiprocessing创建共享对象:

import multiprocessing
def changevalue(n, a):
  n.value = 3.14
  a[0] = 5
if __name__ == '__main__':
  num = multiprocessing.Value('d', 0.0)
  arr = multiprocessing.Array('i', range(10))
  p = multiprocessing.Process(target=changevalue, args=(num, arr))
  p.start()
  p.join()
  print num.value
  print arr[:]

结果输出num的值是在子进程中修改的3.14,arr的第一个元素值更改为5。

共享内存在Pool中的使用:

import multiprocessing
from multiprocessing import Pool
def changevalue(n, a):
  n.value = 3.14
  a[0] = 5
if __name__ == '__main__':
  num = multiprocessing.Value('d', 0.0)
  arr = multiprocessing.Array('i', range(10))
  p = Pool()
  p1 = p.apply_async(changevalue, args=(num, arr))
  p.close()
  p.join()
  print num.value
  print arr[:]

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》、《Python+MySQL数据库程序设计入门教程》及《Python常见数据库操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。

(0)

相关推荐

  • Python多进程multiprocessing.Pool类详解

    multiprocessing模块 multiprocessing包是Python中的多进程管理包.它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程.该进程可以允许放在Python程序内部编写的函数中.该Process对象与Thread对象的用法相同,拥有is_alive().join([timeout]).run().start().terminate()等方法.属性有:authkey.daemon(要通过start()设置)

  • Python3多进程 multiprocessing 模块实例详解

    本文实例讲述了Python3多进程 multiprocessing 模块.分享给大家供大家参考,具体如下: 多进程 Multiprocessing 模块 multiprocessing 模块官方说明文档 Process 类 Process 类用来描述一个进程对象.创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建. star() 方法启动进程, join() 方法实现进程间的同步,等待所有进程退出. close() 用来阻止多余的进程涌入进程池 Pool 造

  • python基于multiprocessing的多进程创建方法

    本文实例讲述了python基于multiprocessing的多进程创建方法.分享给大家供大家参考.具体如下: import multiprocessing import time def clock(interval): while True: print ("the time is %s"% time.time()) time.sleep(interval) if __name__=="__main__": p = multiprocessing.Process

  • Python多进程multiprocessing用法实例分析

    本文实例讲述了Python多进程multiprocessing用法.分享给大家供大家参考,具体如下: mutilprocess简介 像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多. 简单的创建进程: import multiprocessing def worker(num): """thread worker function""" print 'Wor

  • Python多进程池 multiprocessing Pool用法示例

    本文实例讲述了Python多进程池 multiprocessing Pool用法.分享给大家供大家参考,具体如下: 1. 背景 由于需要写python程序, 定时.大量发送htttp请求,并对结果进行处理. 参考其他代码有进程池,记录一下. 2. 多进程 vs 多线程 c++程序中,单个模块通常是单进程,会启动几十.上百个线程,充分发挥机器性能.(目前c++11有了std::thread编程多线程很方便,可以参考我之前的博客) shell脚本中,都是多进程后台执行.({ ...} &, 可以参考

  • Python多进程库multiprocessing中进程池Pool类的使用详解

    问题起因 最近要将一个文本分割成好几个topic,每个topic设计一个regressor,各regressor是相互独立的,最后汇总所有topic的regressor得到总得预测结果.没错!类似bagging ensemble!只是我没有抽样.文本不大,大概3000行,topic个数为8,于是我写了一个串行的程序,一个topic算完之后再算另一个topic.可是我在每个topic中用了GridSearchCV来调参,又要选特征又要调整regressor的参数,导致参数组合一共有1782种.我真

  • Python标准库之多进程(multiprocessing包)介绍

    在初步了解Python多进程之后,我们可以继续探索multiprocessing包中更加高级的工具.这些工具可以让我们更加便利地实现多进程. 进程池 进程池 (Process Pool)可以创建多个进程.这些进程就像是随时待命的士兵,准备执行任务(程序).一个进程池中可以容纳多个待命的士兵. "三个进程的进程池" 比如下面的程序: 复制代码 代码如下: import multiprocessing as mul def f(x):     return x**2 pool = mul.

  • Python多进程并发(multiprocessing)用法实例详解

    本文实例讲述了Python多进程并发(multiprocessing)用法.分享给大家供大家参考.具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心. Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,可以如下: import multiprocessing import t

  • 简单学习Python多进程Multiprocessing

    1.1 什么是 Multiprocessing 多线程在同一时间只能处理一个任务. 可把任务平均分配给每个核,而每个核具有自己的运算空间. 1.2 添加进程 Process 与线程类似,如下所示,但是该程序直接运行无结果,因为IDLE不支持多进程,在命令行终端运行才有结果显示 import multiprocessing as mp def job(a,b): print('abc') if __name__=='__main__': p1=mp.Process(target=job,args=

  • Python multiprocessing多进程原理与应用示例

    本文实例讲述了Python multiprocessing多进程原理与应用.分享给大家供大家参考,具体如下: multiprocessing包是Python中的多进程管理包,可以利用multiprocessing.Process对象来创建进程,Process对象拥有is_alive().join([timeout]).run().start().terminate()等方法. multprocessing模块的核心就是使管理进程像管理线程一样方便,每个进程有自己独立的GIL,所以不存在进程间争抢

  • Python多继承原理与用法示例

    本文实例讲述了Python多继承原理与用法.分享给大家供大家参考,具体如下: python中使用多继承,会涉及到查找顺序(MRO).重复调用(钻石继承,也叫菱形继承问题)等 MRO MRO即method resolution order,用于判断子类调用的属性来自于哪个父类.在Python2.3之前,MRO是基于深度优先算法的,自2.3开始使用C3算法,定义类时需要继承object,这样的类称为新式类,否则为旧式类 从图中可以看出,旧式类查找属性时是深度优先搜索,新式类则是广度优先搜索 C3算法

  • python multiprocessing 多进程并行计算的操作

    python的multiprocessing包是标准库提供的多进程并行计算包,提供了和threading(多线程)相似的API函数,但是相比于threading,将任务分配到不同的CPU,避免了GIL(Global Interpreter Lock)的限制. 下面我们对multiprocessing中的Pool和Process类做介绍. Pool 采用Pool进程池对任务并行处理更加方便,我们可以指定并行的CPU个数,然后 Pool 会自动把任务放到进程池中运行. Pool 包含了多个并行函数.

  • Python Multiprocessing多进程 使用tqdm显示进度条的实现

    1.背景 在python运行一些,计算复杂度比较高的函数时,服务器端单核CPU的情况比较耗时,因此需要多CPU使用多进程加快速度 2.函数要求 笔者使用的是:pathos.multiprocessing 库,进度条显示用tqdm库,安装方法: pip install pathos 安装完成后 from pathos.multiprocessing import ProcessingPool as Pool from tqdm import tqdm 这边使用pathos的原因是因为,multip

  • python multiprocessing多进程变量共享与加锁的实现

    python多进程和多线程是大家会重点了解的部分,因为很多工作如果并没有前后相互依赖关系的话其实顺序并不是非常的重要,采用顺序执行的话就必定会造成无谓的等待,任凭cpu和内存白白浪费,这是我们不想看到的. 为了解决这个问题,我们就可以采用多线程或者多进程的方式,(多线程我们之后再讲),而这两者之间是有本质区别的.就内存而言,已知进程是在执行过程中有独立的内存单元的,而多个线程是共享内存的,这是多进程和多线程的一大区别. 利用Value在不同进程中同步变量 在多进程中,由于进程之间内存相互是隔离的

  • Python greenlet实现原理和使用示例

    最近开始研究Python的并行开发技术,包括多线程,多进程,协程等.逐步整理了网上的一些资料,今天整理了一下greenlet相关的资料. 并发处理的技术背景 并行化处理目前很受重视, 因为在很多时候,并行计算能大大的提高系统吞吐量,尤其在现在多核多处理器的时代, 所以像lisp这种古老的语言又被人们重新拿了起来, 函数式编程也越来越流行. 介绍一个python的并行处理的一个库: greenlet. python 有一个非常有名的库叫做 stackless ,用来做并发处理, 主要是弄了个叫做t

  • python 包之 multiprocessing 多进程

    目录 一.创建一个进程 二.创建多个进程 三.管道pipe进行进程间通信 四.队列Queue进行进程间通信 五.进程间同步 六.进程间共享数据 七.进程池 一.创建一个进程 实例化 Process 类创建一个进程对象 然后调用它的 start 方法即可生成一个子进程 from multiprocessing import Process def func(s): print(s) if __name__ == '__main__': p = Process(target=func, args=(

  • Python多进程原理与用法分析

    本文实例讲述了Python多进程原理与用法.分享给大家供大家参考,具体如下: 进程是程序在计算机上的一次执行活动.当你运行一个程序,你就启动了一个进程.显然,程序是死的(静态的),进程是活的(动态的).进程可以分为系统进程和用户进程.凡是用于完成操作系统的各种功能的进程就是系统进程,它们就是处于运行状态下的操作系统本身:所有由你启动的进程都是用户进程.进程是操作系统进行资源分配的单位. 开启一个进程 import multiprocessing,time,os def runtask(): ti

  • Python 多进程原理及实现

    1 进程的基本概念 什么是进程? ​ 进程就是一个程序在一个数据集上的一次动态执行过程.进程一般由程序.数据集.进程控制块三部分组成.我们编写的程序用来描述进程要完成哪些功能以及如何完成:数据集则是程序在执行过程中所需要使用的资源:进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志. 进程的生命周期:创建(New).就绪(Runnable).运行(Running).阻塞(Block).销毁(Destroy) 进程的状态(分类)

  • python实现多进程按序号批量修改文件名的方法示例

    本文实例讲述了python实现多进程按序号批量修改文件名的方法.分享给大家供大家参考,具体如下: 说明 文件名命名方式如图,是数字序号开头,但是中间有些文件删掉了,序号不连续,这里将序号连续起来,总的文件量有40w+,故使用多进程 代码 import os import re from multiprocessing import Pool def getAllFilePath(pathFolder,filter=[".jpg",".txt"]): #遍历文件夹下所

随机推荐