Python多进程multiprocessing用法实例分析

本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:

mutilprocess简介

像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

简单的创建进程:

import multiprocessing
def worker(num):
  """thread worker function"""
  print 'Worker:', num
  return
if __name__ == '__main__':
  jobs = []
  for i in range(5):
    p = multiprocessing.Process(target=worker, args=(i,))
    jobs.append(p)
    p.start()

确定当前的进程,即是给进程命名,方便标识区分,跟踪

import multiprocessing
import time
def worker():
  name = multiprocessing.current_process().name
  print name, 'Starting'
  time.sleep(2)
  print name, 'Exiting'
def my_service():
  name = multiprocessing.current_process().name
  print name, 'Starting'
  time.sleep(3)
  print name, 'Exiting'
if __name__ == '__main__':
  service = multiprocessing.Process(name='my_service',
                   target=my_service)
  worker_1 = multiprocessing.Process(name='worker 1',
                    target=worker)
  worker_2 = multiprocessing.Process(target=worker) # default name
  worker_1.start()
  worker_2.start()
  service.start()

守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了

守护进程:

import multiprocessing
import time
import sys
def daemon():
  name = multiprocessing.current_process().name
  print 'Starting:', name
  time.sleep(2)
  print 'Exiting :', name
def non_daemon():
  name = multiprocessing.current_process().name
  print 'Starting:', name
  print 'Exiting :', name
if __name__ == '__main__':
  d = multiprocessing.Process(name='daemon',
                target=daemon)
  d.daemon = True
  n = multiprocessing.Process(name='non-daemon',
                target=non_daemon)
  n.daemon = False
  d.start()
  n.start()
  d.join(1)
  print 'd.is_alive()', d.is_alive()
  n.join()

最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态

终止进程:

import multiprocessing
import time
def slow_worker():
  print 'Starting worker'
  time.sleep(0.1)
  print 'Finished worker'
if __name__ == '__main__':
  p = multiprocessing.Process(target=slow_worker)
  print 'BEFORE:', p, p.is_alive()
  p.start()
  print 'DURING:', p, p.is_alive()
  p.terminate()
  print 'TERMINATED:', p, p.is_alive()
  p.join()
  print 'JOINED:', p, p.is_alive()

①. == 0 未生成任何错误 
②. 0 进程有一个错误,并以该错误码退出
③. < 0 进程由一个-1 * exitcode信号结束

进程的退出状态:

import multiprocessing
import sys
import time
def exit_error():
  sys.exit(1)
def exit_ok():
  return
def return_value():
  return 1
def raises():
  raise RuntimeError('There was an error!')
def terminated():
  time.sleep(3)
if __name__ == '__main__':
  jobs = []
  for f in [exit_error, exit_ok, return_value, raises, terminated]:
    print 'Starting process for', f.func_name
    j = multiprocessing.Process(target=f, name=f.func_name)
    jobs.append(j)
    j.start()
  jobs[-1].terminate()
  for j in jobs:
    j.join()
    print '%15s.exitcode = %s' % (j.name, j.exitcode)

方便的调试,可以用logging

日志:

import multiprocessing
import logging
import sys
def worker():
  print 'Doing some work'
  sys.stdout.flush()
if __name__ == '__main__':
  multiprocessing.log_to_stderr()
  logger = multiprocessing.get_logger()
  logger.setLevel(logging.INFO)
  p = multiprocessing.Process(target=worker)
  p.start()
  p.join()

利用class来创建进程,定制子类

派生进程:

import multiprocessing
class Worker(multiprocessing.Process):
  def run(self):
    print 'In %s' % self.name
    return
if __name__ == '__main__':
  jobs = []
  for i in range(5):
    p = Worker()
    jobs.append(p)
    p.start()
  for j in jobs:
    j.join()

python进程间传递消息:

import multiprocessing
class MyFancyClass(object):
  def __init__(self, name):
    self.name = name
  def do_something(self):
    proc_name = multiprocessing.current_process().name
    print 'Doing something fancy in %s for %s!' % \
      (proc_name, self.name)
def worker(q):
  obj = q.get()
  obj.do_something()
if __name__ == '__main__':
  queue = multiprocessing.Queue()
  p = multiprocessing.Process(target=worker, args=(queue,))
  p.start()
  queue.put(MyFancyClass('Fancy Dan'))
  # Wait for the worker to finish
  queue.close()
  queue.join_thread()
  p.join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
  def __init__(self, task_queue, result_queue):
    multiprocessing.Process.__init__(self)
    self.task_queue = task_queue
    self.result_queue = result_queue
  def run(self):
    proc_name = self.name
    while True:
      next_task = self.task_queue.get()
      if next_task is None:
        # Poison pill means shutdown
        print '%s: Exiting' % proc_name
        self.task_queue.task_done()
        break
      print '%s: %s' % (proc_name, next_task)
      answer = next_task()
      self.task_queue.task_done()
      self.result_queue.put(answer)
    return
class Task(object):
  def __init__(self, a, b):
    self.a = a
    self.b = b
  def __call__(self):
    time.sleep(0.1) # pretend to take some time to do the work
    return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
  def __str__(self):
    return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
  # Establish communication queues
  tasks = multiprocessing.JoinableQueue()
  results = multiprocessing.Queue()
  # Start consumers
  num_consumers = multiprocessing.cpu_count() * 2
  print 'Creating %d consumers' % num_consumers
  consumers = [ Consumer(tasks, results)
         for i in xrange(num_consumers) ]
  for w in consumers:
    w.start()
  # Enqueue jobs
  num_jobs = 10
  for i in xrange(num_jobs):
    tasks.put(Task(i, i))
  # Add a poison pill for each consumer
  for i in xrange(num_consumers):
    tasks.put(None)
  # Wait for all of the tasks to finish
  tasks.join()
  # Start printing results
  while num_jobs:
    result = results.get()
    print 'Result:', result
    num_jobs -= 1

Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

进程间信号传递:

import multiprocessing
import time
def wait_for_event(e):
  """Wait for the event to be set before doing anything"""
  print 'wait_for_event: starting'
  e.wait()
  print 'wait_for_event: e.is_set()->', e.is_set()
def wait_for_event_timeout(e, t):
  """Wait t seconds and then timeout"""
  print 'wait_for_event_timeout: starting'
  e.wait(t)
  print 'wait_for_event_timeout: e.is_set()->', e.is_set()
if __name__ == '__main__':
  e = multiprocessing.Event()
  w1 = multiprocessing.Process(name='block',
                 target=wait_for_event,
                 args=(e,))
  w1.start()
  w2 = multiprocessing.Process(name='nonblock',
                 target=wait_for_event_timeout,
                 args=(e, 2))
  w2.start()
  print 'main: waiting before calling Event.set()'
  time.sleep(3)
  e.set()
  print 'main: event is set'

Python多进程,一般的情况是Queue来传递。

Queue:

from multiprocessing import Process, Queue
def f(q):
  q.put([42, None, 'hello'])
if __name__ == '__main__':
  q = Queue()
  p = Process(target=f, args=(q,))
  p.start()
  print q.get()  # prints "[42, None, 'hello']"
  p.join()

多线程优先队列Queue:

import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
  def __init__(self, threadID, name, q):
    threading.Thread.__init__(self)
    self.threadID = threadID
    self.name = name
    self.q = q
  def run(self):
    print "Starting " + self.name
    process_data(self.name, self.q)
    print "Exiting " + self.name
def process_data(threadName, q):
  while not exitFlag:
    queueLock.acquire()
    if not workQueue.empty():
      data = q.get()
      queueLock.release()
      print "%s processing %s" % (threadName, data)
    else:
      queueLock.release()
    time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
# Create new threads
for tName in threadList:
  thread = myThread(threadID, tName, workQueue)
  thread.start()
  threads.append(thread)
  threadID += 1
# Fill the queue
queueLock.acquire()
for word in nameList:
  workQueue.put(word)
queueLock.release()
# Wait for queue to empty
while not workQueue.empty():
  pass
# Notify threads it's time to exit
exitFlag = 1
# Wait for all threads to complete
for t in threads:
  t.join()
print "Exiting Main Thread"

多进程使用Queue通信的例子

import time
from multiprocessing import Process,Queue
MSG_QUEUE = Queue(5)
def startA(msgQueue):
  while True:
    if msgQueue.empty() > 0:
      print ('queue is empty %d' % (msgQueue.qsize()))
    else:
      msg = msgQueue.get()
      print( 'get msg %s' % (msg,))
    time.sleep(1)
def startB(msgQueue):
  while True:
    msgQueue.put('hello world')
    print( 'put hello world queue size is %d' % (msgQueue.qsize(),))
    time.sleep(3)
if __name__ == '__main__':
  processA = Process(target=startA,args=(MSG_QUEUE,))
  processB = Process(target=startB,args=(MSG_QUEUE,))
  processA.start()
  print( 'processA start..')

主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《Python Socket编程技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。

(0)

相关推荐

  • Python标准库之多进程(multiprocessing包)介绍

    在初步了解Python多进程之后,我们可以继续探索multiprocessing包中更加高级的工具.这些工具可以让我们更加便利地实现多进程. 进程池 进程池 (Process Pool)可以创建多个进程.这些进程就像是随时待命的士兵,准备执行任务(程序).一个进程池中可以容纳多个待命的士兵. "三个进程的进程池" 比如下面的程序: 复制代码 代码如下: import multiprocessing as mul def f(x):     return x**2 pool = mul.

  • 简单学习Python多进程Multiprocessing

    1.1 什么是 Multiprocessing 多线程在同一时间只能处理一个任务. 可把任务平均分配给每个核,而每个核具有自己的运算空间. 1.2 添加进程 Process 与线程类似,如下所示,但是该程序直接运行无结果,因为IDLE不支持多进程,在命令行终端运行才有结果显示 import multiprocessing as mp def job(a,b): print('abc') if __name__=='__main__': p1=mp.Process(target=job,args=

  • Python使用multiprocessing创建进程的方法

    本文实例讲述了Python使用multiprocessing创建进程的方法.分享给大家供大家参考.具体分析如下: 进程可以通过调用multiprocessing的Process进行创建,下面代码创建两个进程. [root@localhost ~]# cat twoproces.py #!/usr/bin/env python from multiprocessing import Process import os def output(): print "My pid is :%d\n&quo

  • python使用multiprocessing模块实现带回调函数的异步调用方法

    本文实例讲述了python使用multiprocessing模块实现带回调函数的异步调用方法.分享给大家供大家参考.具体分析如下: multipressing模块是python 2.6版本加入的,通过这个模块可以轻松实现异步调用 from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': pool = Pool(processes=1) # Start a worker processes. r

  • python基于multiprocessing的多进程创建方法

    本文实例讲述了python基于multiprocessing的多进程创建方法.分享给大家供大家参考.具体如下: import multiprocessing import time def clock(interval): while True: print ("the time is %s"% time.time()) time.sleep(interval) if __name__=="__main__": p = multiprocessing.Process

  • Python多进程并发(multiprocessing)用法实例详解

    本文实例讲述了Python多进程并发(multiprocessing)用法.分享给大家供大家参考.具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心. Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,可以如下: import multiprocessing import t

  • Python multiprocessing模块中的Pipe管道使用实例

    multiprocessing.Pipe([duplex]) 返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.不同于os.open之处在于os.pipe()返回2个文件描述符(r, w),表示可读的和可写的 实例如下: 复制代码 代码如下: #!/usr/bin/python #coding=utf-8 import os from multiprocessing import P

  • Python multiprocessing.Manager介绍和实例(进程间共享数据)

    Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装.使用multiprocessing.Manager可以简单地使用这些高级接口. Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问.从而达到多进程间数据通信且安全. Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaph

  • Python使用multiprocessing实现一个最简单的分布式作业调度系统

    mutilprocess像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多. 介绍 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信. 想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统. 实现 Job 首先创建一个Job类,为了测试简单,只包含一

  • Python多进程multiprocessing用法实例分析

    本文实例讲述了Python多进程multiprocessing用法.分享给大家供大家参考,具体如下: mutilprocess简介 像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多. 简单的创建进程: import multiprocessing def worker(num): """thread worker function""" print 'Wor

  • Python iter()函数用法实例分析

    本文实例讲述了Python iter()函数用法.分享给大家供大家参考,具体如下: python中的迭代器用起来非常灵巧,不仅可以迭代序列,也可以迭代表现出序列行为的对象,例如字典的键.一个文件的行,等等. 迭代器就是有一个next()方法的对象,而不是通过索引来计数.当使用一个循环机制需要下一个项时,调用迭代器的next()方法,迭代完后引发一个StopIteration异常. 但是迭代器只能向后移动.不能回到开始.再次迭代只能创建另一个新的迭代对象. 反序迭代工具:reversed()将返回

  • Python callable()函数用法实例分析

    本文实例讲述了Python callable()函数用法.分享给大家供大家参考,具体如下: python中的内建函数callable( ) ,可以检查一个对象是否是可调用的 . 对于函数, 方法, lambda 函数式, 类, 以及实现了 _ _call_ _ 方法的类实例, 它都返回 True. >>> help(callable) Help on built-in function callable in module __builtin__: callable(...) calla

  • Python反射的用法实例分析

    本文实例讲述了Python反射的用法.分享给大家供大家参考,具体如下: 在做程序开发中,我们常常会遇到这样的需求:需要执行对象里的某个方法,或需要调用对象中的某个变量,但是由于种种原因我们无法确定这个方法或变量是否存在,这是我们需要用一个特殊的方法或机制要访问和操作这个未知的方法或变量,这中机制就称之为反射.接下记录下反射几个重要方法: hasattr 判断对象中是否有这个方法或变量 class Person(object): def __init__(self,name): self.name

  • Python守护进程用法实例分析

    本文实例讲述了Python守护进程用法.分享给大家供大家参考.具体分析如下: 守护进程是可以一直运行而不阻塞主程序退出.要标志一个守护进程,可以将Process实例的daemon属性设置为True.代码如下: import os import time import random import sys from multiprocessing import Process,current_process def daemon(): p = current_process() print "sta

  • Python多进程编程技术实例分析

    本文以实例形式分析了Python多进程编程技术,有助于进一步Python程序设计技巧.分享给大家供大家参考.具体分析如下: 一般来说,由于Python的线程有些限制,例如多线程不能充分利用多核CPU等问题,因此在Python中我们更倾向使用多进程.但在做不阻塞的异步UI等场景,我们也会使用多线程.本篇文章主要探讨Python多进程的问题. Python在2.6引入了多进程的机制,并提供了丰富的组件及api以方便编写并发应用.multiprocessing包的组件Process, Queue, P

  • python回调函数用法实例分析

    本文实例讲述了python回调函数用法.分享给大家供大家参考.具体分析如下: 软件模块之间总是存在着一定的接口,从调用方式上,可以把他们分为三类:同步调用.回调和异步调用.同步调用是一种阻塞式调用,调用方要等待对方执行完毕 才返回,它是一种单向调用:回调是一种双向调用模式,也就是说,被调用方在接口被调用时也会调用对方的接口:异步调用是一种类似消息或事件的机制,不过它 的调用方向刚好相反,接口的服务在收到某种讯息或发生某种事件时,会主动通知客户方(即调用客户方的接口).回调和异步调用的关系非常紧密

  • python中assert用法实例分析

    本文实例讲述了python中assert用法.分享给大家供大家参考.具体分析如下: 1.assert语句用来声明某个条件是真的. 2.如果你非常确信某个你使用的列表中至少有一个元素,而你想要检验这一点,并且在它非真的时候引发一个错误,那么assert语句是应用在这种情形下的理想语句. 3.当assert语句失败的时候,会引发一AssertionError. 测试程序: >>> mylist = ['item'] >>> assert len(mylist) >=

  • python动态参数用法实例分析

    本文实例讲述了python动态参数用法.分享给大家供大家参考.具体分析如下: 先来看一段代码: class Person: def __init__(self,*pros,**attrs): self.name = "jeff" self.pros = pros for (key,value) in attrs.items(): stm = "self.%s = /"%s/""% (key,value) exec(stm) if __name__

  • python中global用法实例分析

    本文实例讲述了python中global用法.分享给大家供大家参考.具体分析如下: 1.global---将变量定义为全局变量.可以通过定义为全局变量,实现在函数内部改变变量值. 2.一个global语句可以同时定义多个变量,如 global x, y, z 示例程序: >>> def func(): ... global x ... print 'x is ', x ... x = 2 ... print 'Change local x to ', x ... >>>

随机推荐