python threading和multiprocessing模块基本用法实例分析

本文实例讲述了python threading和multiprocessing模块基本用法。分享给大家供大家参考,具体如下:

前言

这两天为了做一个小项目,研究了一下python的并发编程,所谓并发无非多线程和多进程,最初找到的是threading模块,因为印象中线程“轻量...”,“切换快...”,“可共享进程资源...”等等,但是没想到这里水很深,进而找到了更好的替代品multiprocessing模块。下面会讲一些使用中的经验。

后面出现的代码都在ubuntu10.04 + python2.6.5的环境下测试通过。

一、使用threading模块创建线程

1、三种线程创建方式

(1)传入一个函数

这种方式是最基本的,即调用threading中的Thread类的构造函数,然后指定参数target=func,再使用返回的Thread的实例调用start()方法,即开始运行该线程,该线程将执行函数func,当然,如果func需要参数,可以在Thread的构造函数中传入参数args=(...)。示例代码如下:

#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
#用于线程执行的函数
def counter(n):
  cnt = 0;
  for i in xrange(n):
    for j in xrange(i):
      cnt += j;
  print cnt;
if __name__ == '__main__':
 #初始化一个线程对象,传入函数counter,及其参数1000
  th = threading.Thread(target=counter, args=(1000,));
 #启动线程
  th.start();
 #主线程阻塞等待子线程结束
  th.join();

这段代码很直观,counter函数是一个很无聊的双重循环,需要注意的是th.join()这句,这句的意思是主线程将自我阻塞,然后等待th表示的线程执行完毕再结束,如果没有这句,运行代码会立即结束。join的意思比较晦涩,其实将这句理解成这样会好理解些“while th.is_alive(): time.sleep(1)”。虽然意思相同,但是后面将看到,使用join也有陷阱。

(2)传入一个可调用的对象

许多的python 对象都是我们所说的可调用的,即是任何能通过函数操作符“()”来调用的对象(见《python核心编程》第14章)。类的对象也是可以调用的,当被调用时会自动调用对象的内建方法__call__(),因此这种新建线程的方法就是给线程指定一个__call__方法被重载了的对象。示例代码如下:

#!/usr/bin/python
#-*-coding:utf-8-*-
import threading
#可调用的类
class Callable(object):
  def __init__(self, func, args):
    self.func = func;
    self.args = args;
  def __call__(self):
    apply(self.func, self.args);
#用于线程执行的函数
def counter(n):
  cnt = 0;
  for i in xrange(n):
    for j in xrange(i):
      cnt += j;
  print cnt;
if __name__ == '__main__':
 #初始化一个线程对象,传入可调用的Callable对象,并用函数counter及其参数1000初始化这个对象
  th = threading.Thread(target=Callable(counter, (1000,)));
 #启动线程
  th.start();
 #主线程阻塞等待子线程结束
  th.join();

这个例子关键的一句是apply(self.func, self.args); 这里使用初始化时传入的函数对象及其参数来进行一次调用。

(3)继承Thread类

这种方式通过继承Thread类,并重载其run方法,来实现自定义的线程行为,示例代码如下:

#!/usr/bin/python
#-*-coding:utf-8-*-
import threading, time, random
def counter():
  cnt = 0;
  for i in xrange(10000):
    for j in xrange(i):
      cnt += j;
class SubThread(threading.Thread):
  def __init__(self, name):
    threading.Thread.__init__(self, name=name);
  def run(self):
    i = 0;
    while i < 4:
      print self.name,'counting...\n';
      counter();
      print self.name,'finish\n';
      i += 1;
if __name__ == '__main__':
  th = SubThread('thread-1');
  th.start();
  th.join();
  print 'all done';

这个例子定义了一个SubThread类,它继承了Thread类,并重载了run方法,在方法中调用counter4次并打印一些信息,可以看到这种方式比较直观。在构造函数中要记得先调用父类的构造函数进行初始化。

2、python多线程的限制

python多线程有个讨厌的限制,全局解释器锁(global interpreter lock),这个锁的意思是任一时间只能有一个线程使用解释器,跟单cpu跑多个程序一个意思,大家都是轮着用的,这叫“并发”,不是“并行”。手册上的解释是为了保证对象模型的正确性!这个锁造成的困扰是如果有一个计算密集型的线程占着cpu,其他的线程都得等着....,试想你的多个线程中有这么一个线程,得多悲剧,多线程生生被搞成串行;当然这个模块也不是毫无用处,手册上又说了:当用于IO密集型任务时,IO期间线程会释放解释器,这样别的线程就有机会使用解释器了!所以是否使用这个模块需要考虑面对的任务类型。

二、使用multiprocessing创建进程

1、三种创建方式

进程的创建方式跟线程完全一致,只不过要将threading.Thread换成multiprocessing.Process。multiprocessing模块尽力保持了与threading模块在方法名上的一致性,示例代码可参考上面线程部分的。这里只给出第一种使用函数的方式:

#!/usr/bin/python
#-*-coding:utf-8-*-
import multiprocessing, time
def run():
  i = 0;
  while i<10000:
    print 'running';
    time.sleep(2);
    i += 1;
if __name__ == '__main__':
  p = multiprocessing.Process(target=run);
  p.start();
  #p.join();
  print p.pid;
  print 'master gone';

2、创建进程池

该模块还允许一次创建一组进程,然后再给他们分配任务。详细内容可参考手册,这部分研究不多,不敢乱写。

pool = multiprocessing.Pool(processes=4)
pool.apply_async(func, args...)

3、使用进程的好处

完全并行,无GIL的限制,可充分利用多cpu多核的环境;可以接受linux信号,后面将看到,这个功能非常好用。

三、实例研究

该实例假想的任务是:一个主进程会启动多个子进程分别处理不同的任务,各个子进程可能又有自己的线程用于不同的IO处理(前面说过,线程在IO方面还是不错的),要实现的功能是,对这些子进程发送信号,能被正确的处理,例如发生SIGTERM,子进程能通知其线程收工,然后“优雅”的退出。现在要解决的问题有:(1)在子类化的Process对象中如何捕捉信号;(2)如何“优雅的退出”。下面分别说明。

1、子类化Process并捕捉信号

如果是使用第一种进程创建方式(传入函数),那么捕捉信号很容易,假设给进程运行的函数叫func,代码示例如下:

#!/usr/bin/python
#-*-coding:utf-8-*-
import multiprocessing, signal,time
def handler(signum, frame):
  print 'signal', signum;
def run():
  signal.signal(signal.SIGTERM, handler);
  signal.signal(signal.SIGINT, handler);
  i = 0;
  while i<10000:
    print 'running';
    time.sleep(2);
    i += 1;
if __name__ == '__main__':
  p = multiprocessing.Process(target=run);
  p.start();
  #p.join();
  print p.pid;
  print 'master gone';

这段代码是在第一种创建方式的基础上修改而来的,增加了两行signal.signal(...)调用,这是说这个函数要捕捉SIGTERM和SIGINT两个信号,另外增加了一个handler函数,该函数用于捕捉到信号时进行相应的处理,我们这里只是简单的打印出信号值。

注意p.join()被注释掉了,这里跟线程的情况有点区别,新的进程启动后就开始运行了,主进程也不用等待它运行完,可以该干嘛干嘛去。这段代码运行后会打印出子进程的进程id,根据这个id,在另一个终端输入kill -TERM id,会发现刚才的终端打印出了"signal 15"。

但是使用传入函数的方式有一点不好的是封装性太差,如果功能稍微复杂点,将会有很多的全局变量暴露在外,最好还是将功能封装成类,那么使用类又怎么注册信号相应函数呢?上面的例子貌似只能使用一个全局的函数,手册也没有给出在类中处理信号的例子,其实解决方法大同小异,也很容易,这个帖子http://stackoverflow.com/questions/6204443/python-signal-reading-return-from-signal-handler-function给了我灵感:

class Master(multiprocessing.Process):
  def __init__(self):
    super(Master,self).__init__();
    signal.signal(signal.SIGTERM, self.handler);   #注册信号处理函数
    self.live = 1;
  #信号处理函数
  def handler(self, signum, frame):
    print 'signal:',signum;
    self.live = 0;
  def run(self):
    print 'PID:',self.pid;
    while self.live:
      print 'living...'
      time.sleep(2);

方法很直观,首先在构造函数中注册信号处理函数,然后定义了一个方法handler作为处理函数。这个进程类会每隔2秒打印一个“living...”,当接收到SIGTERM后,改变self.live的值,run方法的循环检测到这个值为0后就结束了,进程也结束了。

2、让进程优雅的退出

下面放出这次的假想任务的全部代码,我在主进程中启动了一个子进程(通过子类化Process类),然后子进程启动后又产生两个子线程,用来模拟“生产者-消费者”模型,两个线程通过一个队列进行交流,为了互斥访问这个队列,自然要加一把锁(condition对象跟Lock对象差不多,不过多了等待和通知的功能);生产者每次产生一个随机数并扔进队列,然后休息一个随机时间,消费者每次从队列取一个数;而子进程中的主线程要负责接收信号,以便让整个过程优雅的结束。代码如下:

#!/usr/bin/python
#-*-coding:utf-8-*-
import time, multiprocessing, signal, threading, random, time, Queue
class Master(multiprocessing.Process):
  def __init__(self):
    super(Master,self).__init__();
    signal.signal(signal.SIGTERM, self.handler);
 #这个变量要传入线程用于控制线程运行,为什么用dict?充分利用线程间共享资源的特点
 #因为可变对象按引用传递,标量是传值的,不信写成self.live = true试试
    self.live = {'stat':True};
  def handler(self, signum, frame):
    print 'signal:',signum;
    self.live['stat'] = 0;                  #置这个变量为0,通知子线程可以“收工”了
  def run(self):
    print 'PID:',self.pid;
    cond = threading.Condition(threading.Lock());      #创建一个condition对象,用于子线程交互
    q = Queue.Queue();                    #一个队列
    sender = Sender(cond, self.live, q);           #传入共享资源
    geter = Geter(cond, self.live, q);
    sender.start();                     #启动线程
    geter.start();
    signal.pause();                     #主线程睡眠并等待信号
    while threading.activeCount()-1:             #主线程收到信号并被唤醒后,检查还有多少线程活着(除掉自己)
      time.sleep(2);                    #再睡眠等待,确保子线程都安全的结束
      print 'checking live', threading.activeCount();
    print 'mater gone';
class Sender(threading.Thread):
  def __init__(self, cond, live, queue):
    super(Sender, self).__init__(name='sender');
    self.cond = cond;
    self.queue = queue;
    self.live = live
  def run(self):
    cond = self.cond;
    while self.live['stat']:                 #检查这个进程内的“全局”变量,为真就继续运行
      cond.acquire();                   #获得锁,以便控制队列
      i = random.randint(0,100);
      self.queue.put(i,False);
      if not self.queue.full():
        print 'sender add:',i;
      cond.notify();                    #唤醒等待锁的其他线程
      cond.release();                   #释放锁
      time.sleep(random.randint(1,3));
    print 'sender done'
class Geter(threading.Thread):
  def __init__(self, cond, live, queue):
    super(Geter, self).__init__(name='geter');
    self.cond = cond;
    self.queue = queue;
    self.live = live
  def run(self):
    cond = self.cond;
    while self.live['stat']:
      cond.acquire();
      if not self.queue.empty():
        i = self.queue.get();
        print 'geter get:',i;
      cond.wait(3);
      cond.release();
      time.sleep(random.randint(1,3));
    print 'geter done'
if __name__ == '__main__':
  master = Master();
  master.start();                       #启动子进程

需要注意的地方是,在Master的run方法中sender.start()geter.start()之后,按常理应该接着调用sender.join()geter.join(),让主线程等待子线程结束,前面说的join的陷阱就在这里,join将主线程阻塞(blocking)住了,主线程无法再捕捉信号,刚开始研究这块时还以为信号处理函数写错了。网上讨论比较少,这里说的比较清楚http://stackoverflow.com/questions/631441/interruptible-thread-join-in-python,http://www.gossamer-threads.com/lists/python/python/541403

参考:

《python核心编程》
《python manual》

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》、《Python+MySQL数据库程序设计入门教程》及《Python常见数据库操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。

(0)

相关推荐

  • Python 多线程Threading初学教程

    1.1 什么是多线程 Threading 多线程可简单理解为同时执行多个任务. 多进程和多线程都可以执行多个任务,线程是进程的一部分.线程的特点是线程之间可以共享内存和变量,资源消耗少(不过在Unix环境中,多进程和多线程资源调度消耗差距不明显,Unix调度较快),缺点是线程之间的同步和加锁比较麻烦. 1.2 添加线程 Thread 导入模块 import threading 获取已激活的线程数 threading.active_count() 查看所有线程信息 threading.enumer

  • Python中多线程thread与threading的实现方法

    学过Python的人应该都知道,Python是支持多线程的,并且是native的线程.本文主要是通过thread和threading这两个模块来实现多线程的. python的thread模块是比较底层的模块,python的threading模块是对thread做了一些包装的,可以更加方便的被使用. 这里需要提一下的是python对线程的支持还不够完善,不能利用多CPU,但是下个版本的python中已经考虑改进这点,让我们拭目以待吧. threading模块里面主要是对一些线程的操作对象化了,创建

  • Python multiprocessing.Manager介绍和实例(进程间共享数据)

    Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装.使用multiprocessing.Manager可以简单地使用这些高级接口. Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问.从而达到多进程间数据通信且安全. Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaph

  • Python多线程threading和multiprocessing模块实例解析

    本文研究的主要是Python多线程threading和multiprocessing模块的相关内容,具体介绍如下. 线程是一个进程的实体,是由表示程序运行状态的寄存器(如程序计数器.栈指针)以及堆栈组成,它是比进程更小的单位. 线程是程序中的一个执行流.一个执行流是由CPU运行程序代码并操作程序的数据所形成的.因此,线程被认为是以CPU为主体的行为. 线程不包含进程地址空间中的代码和数据,线程是计算过程在某一时刻的状态.所以,系统在产生一个线程或各个线程之间切换时,负担要比进程小得多. 线程是一

  • Python threading多线程编程实例

    Python 的多线程有两种实现方法: 函数,线程类 1.函数 调用 thread 模块中的 start_new_thread() 函数来创建线程,以线程函数的形式告诉线程该做什么 复制代码 代码如下: # -*- coding: utf-8 -*- import thread def f(name):   #定义线程函数   print "this is " + name   if __name__ == '__main__':   thread.start_new_thread(f

  • Python用threading实现多线程详解

    多线程 多线程是个提高程序运行效率的好办法,本来要顺序执行的程序现在可以并行执行,可想而知效率要提高很多.但是多线程也不是能提高所有程序的效率.程序的两个极端是'CPU 密集型'和'I/O 密集型'两种,多线程技术比较适用于后者,因为在串行结构中当你去读写磁盘或者网络通信的时候 CPU 是闲着的,毕竟网络比磁盘要慢几个数量级,磁盘比内存慢几个数量级,内存又比 CPU 慢几个数量级.多线程技术就可以同时执行,比如你的程序需要发送 N 个 http 数据包(10 秒),还需要将文件从一个位置复制到另

  • 举例详解Python中threading模块的几个常用方法

    threading.Thread Thread 是threading模块中最重要的类之一,可以使用它来创建线程.有两种方式来创建线程:一种是通过继承Thread类,重写它的run方法:另一种是创建一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象作为参数传入.下面分别举例说明.先来看看通过继承threading.Thread类来创建线程的例子: #coding=gbk import threading, time, random count = 0 cl

  • Python多进程并发(multiprocessing)用法实例详解

    本文实例讲述了Python多进程并发(multiprocessing)用法.分享给大家供大家参考.具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心. Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,可以如下: import multiprocessing import t

  • Python中Threading用法详解

    Python的threading模块松散地基于Java的threading模块.但现在线程没有优先级,没有线程组,不能被销毁.停止.暂停.开始和打断. Java Thread类的静态方法,被移植成了模块方法. main thread: 运行python程序的线程 daemon thread 守护线程,如果守护线程之外的线程都结束了.守护线程也会结束,并强行终止整个程序.不要在守护进程中进行资源相关操作.会导致资源不能正确的释放.在非守护进程中使用Event. Thread 类 (group=No

  • python使用threading获取线程函数返回值的实现方法

    threading用于提供线程相关的操作,线程是应用程序中工作的最小单元.python当前版本的多线程库没有实现优先级.线程组,线程也不能被停止.暂停.恢复.中断. threading模块提供的类:  Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local. threading 模块提供的常用方法: threading.currentThread(): 返回当前的线程变量. threading.enumera

  • Python多进程库multiprocessing中进程池Pool类的使用详解

    问题起因 最近要将一个文本分割成好几个topic,每个topic设计一个regressor,各regressor是相互独立的,最后汇总所有topic的regressor得到总得预测结果.没错!类似bagging ensemble!只是我没有抽样.文本不大,大概3000行,topic个数为8,于是我写了一个串行的程序,一个topic算完之后再算另一个topic.可是我在每个topic中用了GridSearchCV来调参,又要选特征又要调整regressor的参数,导致参数组合一共有1782种.我真

  • Python使用multiprocessing创建进程的方法

    本文实例讲述了Python使用multiprocessing创建进程的方法.分享给大家供大家参考.具体分析如下: 进程可以通过调用multiprocessing的Process进行创建,下面代码创建两个进程. [root@localhost ~]# cat twoproces.py #!/usr/bin/env python from multiprocessing import Process import os def output(): print "My pid is :%d\n&quo

随机推荐