Python自定义进程池实例分析【生产者、消费者模型问题】

本文实例分析了Python自定义进程池。分享给大家供大家参考,具体如下:

代码说明一切:

#encoding=utf-8
#author: walker
#date: 2014-05-21
#function: 自定义进程池遍历目录下文件
from multiprocessing import Process, Queue, Lock
import time, os
#消费者
class Consumer(Process):
  def __init__(self, queue, ioLock):
    super(Consumer, self).__init__()
    self.queue = queue
    self.ioLock = ioLock
  def run(self):
    while True:
      task = self.queue.get()  #队列中无任务时,会阻塞进程
      if isinstance(task, str) and task == 'quit':
        break;
      time.sleep(1)  #假定任务处理需要1秒钟
      self.ioLock.acquire()
      print( str(os.getpid()) + ' ' + task)
      self.ioLock.release()
    self.ioLock.acquire()
    print 'Bye-bye'
    self.ioLock.release()
#生产者
def Producer():
  queue = Queue()  #这个队列是进程/线程安全的
  ioLock = Lock()
  subNum = 4  #子进程数量
  workers = build_worker_pool(queue, ioLock, subNum)
  start_time = time.time()
  for parent, dirnames, filenames in os.walk(r'D:\test'):
    for filename in filenames:
      queue.put(filename)
      ioLock.acquire()
      print('qsize:' + str(queue.qsize()))
      ioLock.release()
      while queue.qsize() > subNum * 10: #控制队列中任务数量
        time.sleep(1)
  for worker in workers:
    queue.put('quit')
  for worker in workers:
    worker.join()
  ioLock.acquire()
  print('Done! Time taken: {}'.format(time.time() - start_time))
  ioLock.release()
#创建进程池
def build_worker_pool(queue, ioLock, size):
  workers = []
  for _ in range(size):
    worker = Consumer(queue, ioLock)
    worker.start()
    workers.append(worker)
  return workers
if __name__ == '__main__':
  Producer()

ps:

self.ioLock.acquire()
...
self.ioLock.release()

可用:

with self.ioLock:
  ...

替代。

再来一个好玩的例子:

#encoding=utf-8
#author: walker
#date: 2016-01-06
#function: 一个多进程的好玩例子
import os, sys, time
from multiprocessing import Pool
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
g_List = ['a']
#修改全局变量g_List
def ModifyDict_1():
  global g_List
  g_List.append('b')
#修改全局变量g_List
def ModifyDict_2():
  global g_List
  g_List.append('c')
#处理一个
def ProcOne(num):
  print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))
#处理所有
def ProcAll():
  pool = Pool(processes = 4)
  for i in range(1, 20):
    #ProcOne(i)
    #pool.apply(ProcOne, (i,))
    pool.apply_async(ProcOne, (i,))
  pool.close()
  pool.join()
ModifyDict_1() #修改全局变量g_List
if __name__ == '__main__':
  ModifyDict_2() #修改全局变量g_List
  print('In main g_List :' + repr(g_List))
  ProcAll()

Windows7 下运行的结果:

λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下运行的结果:

In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']

可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python URL操作技巧总结》、《Python图片操作技巧总结》、《Python数据结构与算法教程》、《Python Socket编程技巧总结》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。

(0)

相关推荐

  • 理解生产者消费者模型及在Python编程中的运用实例

    什么是生产者消费者模型 在 工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产 生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商 品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型.结构图如下: 生产者消费者模型的优点: 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,

  • python条件变量之生产者与消费者操作实例分析

    本文实例讲述了python条件变量之生产者与消费者操作.分享给大家供大家参考,具体如下: 互斥锁是最简单的线程同步机制,面对复杂线程同步问题,Python还提供了Condition对象.Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法.线程首先acquire一个条件变量,然后判断一些条件.如果条件不满足则wait:如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到

  • 详解Python 模拟实现生产者消费者模式的实例

    详解Python 模拟实现生产者消费者模式的实例 散仙使用python3.4模拟实现的一个生产者与消费者的例子,用到的知识有线程,队列,循环等,源码如下: Python代码 import queue import time import threading import random q=queue.Queue(5) #生产者 def pr(): name=threading.current_thread().getName() print(name+"线程启动......") for

  • Python自定义进程池实例分析【生产者、消费者模型问题】

    本文实例分析了Python自定义进程池.分享给大家供大家参考,具体如下: 代码说明一切: #encoding=utf-8 #author: walker #date: 2014-05-21 #function: 自定义进程池遍历目录下文件 from multiprocessing import Process, Queue, Lock import time, os #消费者 class Consumer(Process): def __init__(self, queue, ioLock):

  • Python之两种模式的生产者消费者模型详解

    第一种使用queue队列实现: #生产者消费者模型 其实服务器集群就是这个模型 # 这里介绍的是非yield方法实现过程 import threading,time import queue q = queue.Queue(maxsize=10) def Producer(anme): # for i in range(10): # q.put('骨头%s'%i) count = 1 while True: q.put('骨头%s'%count) print('生产了骨头',count) cou

  • Python守护进程用法实例分析

    本文实例讲述了Python守护进程用法.分享给大家供大家参考.具体分析如下: 守护进程是可以一直运行而不阻塞主程序退出.要标志一个守护进程,可以将Process实例的daemon属性设置为True.代码如下: import os import time import random import sys from multiprocessing import Process,current_process def daemon(): p = current_process() print "sta

  • Python自定义线程池实现方法分析

    本文实例讲述了Python自定义线程池实现方法.分享给大家供大家参考,具体如下: 关于python的多线程,由与GIL的存在被广大群主所诟病,说python的多线程不是真正的多线程.但多线程处理IO密集的任务效率还是可以杠杠的. 我实现的这个线程池其实是根据银角的思路来实现的. 主要思路: 任务获取和执行: 1.任务加入队列,等待线程来获取并执行. 2.按需生成线程,每个线程循环取任务. 线程销毁: 1.获取任务是终止符时,线程停止. 2.线程池close()时,向任务队列加入和已生成线程等量的

  • Python生成器实现简单"生产者消费者"模型代码实例

    生成器定义 在Python中,一边循环一边计算的机制,称为生成器:generator. 为什么要有生成器 列表所有数据都在内存中,如果有海量数据的话将会非常耗内存. 如:仅仅需要访问前面几个元素,那后面绝大多数元素占用的空间都白白浪费了. 如果列表元素按照某种算法推算出来,那我们就可以在循环的过程中不断推算出后续的元素,这样就不必创建完整的list,从而节省大量的空间. 简单一句话:我又想要得到庞大的数据,又想让它占用空间少,那就用生成器! 使用生成器实现简单的生产者消费者模型 1.效果截屏 代

  • Queue 实现生产者消费者模型(实例讲解)

    Python中,队列是线程间最常用的交换数据的形式. Python Queue模块有三种队列及构造函数: 1.Python Queue模块的FIFO队列先进先出. class Queue.Queue(maxsize) 2.LIFO类似于堆,即先进后出. class Queue.LifoQueue(maxsize) 3.还有一种是优先级队列级别越低越先出来. class Queue.PriorityQueue(maxsize) 此包中的常用方法(q = Queue.Queue()): q.qsiz

  • Python iter()函数用法实例分析

    本文实例讲述了Python iter()函数用法.分享给大家供大家参考,具体如下: python中的迭代器用起来非常灵巧,不仅可以迭代序列,也可以迭代表现出序列行为的对象,例如字典的键.一个文件的行,等等. 迭代器就是有一个next()方法的对象,而不是通过索引来计数.当使用一个循环机制需要下一个项时,调用迭代器的next()方法,迭代完后引发一个StopIteration异常. 但是迭代器只能向后移动.不能回到开始.再次迭代只能创建另一个新的迭代对象. 反序迭代工具:reversed()将返回

  • Python callable()函数用法实例分析

    本文实例讲述了Python callable()函数用法.分享给大家供大家参考,具体如下: python中的内建函数callable( ) ,可以检查一个对象是否是可调用的 . 对于函数, 方法, lambda 函数式, 类, 以及实现了 _ _call_ _ 方法的类实例, 它都返回 True. >>> help(callable) Help on built-in function callable in module __builtin__: callable(...) calla

  • python自定义线程池控制线程数量的示例

    1.自定义线程池 import threading import Queue import time queue = Queue.Queue() def put_data_in_queue(): for i in xrange(10): queue.put(i) class MyThread(threading.Thread): def run(self): while not queue.empty(): sleep_times = queue.get() time.sleep(sleep_t

随机推荐