解决windows下python3使用multiprocessing.Pool出现的问题

例如:

from multiprocessing import Pool

def f(x):
return x*x
pool = Pool(processes=4)
r=pool.map(f, range(100))
pool.close()
pool.join()

在spyder里运行直接没反应;在shell窗口里,直接报错,如下:

Process SpawnPoolWorker-15:
Traceback (most recent call last):
File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstr
self.run()
File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 357, in get
return ForkingPickler.loads(res)
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>

解决:

Windows下面的multiprocessing跟Linux下面略有不同,Linux下面基于fork,fork之后所有的本地变量都复制一份,因此可以使用任意的全局变量;在Windows下面,多进程是通过启动新进程完成的,所有的全局变量都是重新初始化的,在运行过程中动态生成、修改过的全局变量是不能使用的。

multiprocessing内部使用pickling传递map的参数到不同的进程,当传递一个函数或类时,pickling将函数或者类用所在模块+函数/类名的方式表示,如果对端的Python进程无法在对应的模块中找到相应的函数或者类,就会出错。

当你在Interactive Console当中创建函数的时候,这个函数是动态添加到__main__模块中的,在重新启动的新进程当中不存在,所以会出错。

当不在Console中,而是在独立Python文件中运行时,你会遇到另一个问题:由于你下面调用multiprocessing的代码没有保护,在新进程加载这个模块的时候会重新执行这段代码,创建出新的multiprocessing池,无限调用下去。

解决这个问题的方法是永远把实际执行功能的代码加入到带保护的区域中:if __name__ == '__mian__':

补充知识:multiprocessing Pool的异常处理问题

multiprocessing.Pool开发多进程程序时,在某个子进程执行函数使用了mysql-python连接数据库,

由于程序设计问题,没有捕获到所有异常,导致某个异常错误直接抛到Pool中,导致整个Pool挂了,其异常错误如下所示:

Exception in thread Thread-3:
Traceback (most recent call last):
 File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner
 self.run()
 File "/usr/lib64/python2.7/threading.py", line 765, in run
 self.__target(*self.__args, **self.__kwargs)
 File "/usr/lib64/python2.7/multiprocessing/pool.py", line 376, in _handle_results
 task = get()
 File "/usr/lib/python2.7/site-packages/mysql/connector/errors.py", line 194, in __init__
 'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: ("'int' object has no attribute 'encode'", <class 'mysql.connector.errors.Error'>,
(2055, "2055: Lost Connection to MySQL '192.169.36.189:3306', system error: timed out", None))

本文档基于以上问题对multiprocessing.Pool以及python-mysql-connector的源码实现进行分析,以定位具体的错误原因。解决方法其实很简单,不要让异常抛到Pool里就行。

问题产生场景

python 版本centos7.3自带的2.7.5版本,或者最新的python-2.7.14

mysql-connector库,版本是2.0及以上,可到官网下载最新版:mysql-connector

问题发生的code其实可以简化为如下所示:

from multiprocessing import Pool, log_to_stderr
import logging
import mysql.connector

# open multiprocessing lib log
log_to_stderr(level=logging.DEBUG)

def func():
 raise mysql.connector.Error("demo test", 100)

if __name__ == "__main__":
 p = Pool(3)
 res = p.apply_async(func)
 res.get()

所以解决问题很简单,在func里加个try-except就可以了。但是如果你好奇为什么为出现AttributeError的异常,那么可以继续往下看。

Multiprocessing.Pool的实现

通过查看源码,大致上multiprocess.Pool的实现如下图所示:

当我们执行以下语句时,主进程会创建三个子线程:_handle_workers、_handle_results、_handle_tasks;同时会创建Pool(n)个数的worker子进程。主进程与各个worker子进程间的通信使用内部定义的Queue,其实就是Pipe管道通信,如上图的_taskqueue、_inqueue和_outqueue。

p = Pool(3)
res = p.apply_async(func)
res.get()

这三个子线程的作用是:

1. handle_workers线程管理worker进程,使进程池维持Pool(n)个worker进程数;

2. handle_tasks线程将用户的任务(包括job_id, 处理函数func等信息)传递到_inqueue中,子进程们竞争获取任务,然后运行相关函数,将结果放在_outqueue中,然后继续监听tasksqueue的任务列表。其实就是典型的生产消费问题。

3. handle_results线程监听_outQqueue的内容,有就拿到,通过字典_cache找到对应的job,将结果存储在*Result对象中,释放该job的信号量,表明job执行完毕。此后,就可以通过*Result.get()函数获取执行结果。

当我们调用p.apply_async 或者p.map时,其实就是创建了AsyncResult或者MapResult对象,然后将task放到_taskqueue中;调用*Result.get()方法等待task被worker子进程执行完成,获取执行结果。

在知道了multprocess.Pool的实现逻辑后,现在我们来探索下,当func将异常抛出时,Pool的worker是怎么处理的。下面的代码是pool.worker工作子进程的核心执行函数的简化版。

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
 ...
 while xxx:
  try:
   task = get()
  except:
   ...

  job, i, func, args, kwds = task
  try:
   result = (True, func(*args, **kwds))
  except Exception, e:
   result = (False, e)
  ...
  try:
   put((job, i, result))
  except Exception, e:
   ...

从代码中可以看到,在执行func时,如果func抛出异常,那么worker会将异常对象直接放入到_outqueue中,然后等待下一个task。也就是说,worker是可以处理异常的。

那么接下来看看_handle_result线程是怎么处理worker发过来的结果的。如下所示:

@staticmethod
def _handle_results(outqueue, get, cache):
 while 1:
  try:
   task = get()
  except (IOError, EOFError):
   return
  ...

上述代码为_handle_result的主要处理逻辑,可以看到,它只对 IOError, EOFError进行了处理,也就是说,如果在get()时发生了其它异常错误,将导致_handle_result这个线程直接退出(而事实上的确如此)。既然_handle_result退出了,那么就没有动作来触发_cache中*Result对象释放信号量,则用户的执行流程就一直处于wait状态。这样,用户主进程就会一直卡在get()中,导致主流程执行不下去。

我们通过打开multiprocessing库的日志(log_to_stderr(level=logging.DEBUG)),然后修改multiprocessing.Pool中_handel_result的代码,加上一个except Exception,然后运行文章一开始的的异常代码,如下所示:

# multiprocessing : pool.py
#
class Pool(object):
 @staticmethod
 def _handle_results(outqueue, get, cache):
  while 1:
   try:
    task = get()
   except (IOError, EOFError):
    return
   except Exception:
    debug("handle_result not catch Exceptions.")
    return
  ...

控制台如果输出"handle_result not catch Exceptions.",表明_handle_results没有catch到所有的异常。而实际上,真的是由于task = get()这句话抛异常了。

那么,_outqueue.get()方法做了什么。深入查看源码,发现get()方法其实就是os.pipe的read/write方法,但是做了一些处理吧。其内部实现大致如下:

def Pipe(duplex=True):
 ...
 fd1, fd2 = os.pipe()
 c1 = _multiprocessing.Connection(fd1, writable=False) # get
 c2 = _multiprocessing.Connection(fd2, readable=False) # put
 return c1, c2

_multiprocessing.Connection内部使用了C的实现,就不再深入了,否则会就越来越复杂了。它内部应该使用了pickle库,在put时将对象实例pickle(也就是序列化吧),然后在get时将实例unpikcle,重新生成实例对象。具体可查看python官方文档关于pickle的介绍(包括object可pickle的条件以及在unpickle时调用的方法等)。不管如何,就是实例在get,即unpickle的过程出错了。

'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: 'int' object has no attribute 'encode'

从上述错误日志中可以看到,表明在重构时msg参数传入了int类型变量。就是说在unpickle阶段,Mysql Error重新实例化时执行了__init__()方法,但是传参错误了。为了验证这一现象,我将MySql Error的__init__()进行简化,最终确认到self.args的赋值上,即Exception及其子类在unpickle时会调用__init__()方法,并将self.args作为参数列表传递给__init__()。

通过以下代码可以简单的验证问题:

import os
from multiprocessing import Pipe

class DemoError(Exception):

 def __init__(msg, errno):
  print "msg: %s, errno: %s" % (msg, errno)
  self.args = ("aa", "bb")

def func():
 raise DemoError("demo test", 100)

r, w = Pipe(duplex=False)
try:
 result = (True, func(1))
except Exception, e:
 result = (False, e)

print "send result"
w.send(result)
print "get result"
res = r.recv()
print "finished."

日志会在recv调用时打印 msg: aa, errno: bb,表明recv异常类Exception时会将self.args作为参数传入init()函数中。而Mysql的Error类重写self.args变量,而且顺序不对,导致msg在执行编码时出错。MySql Error的实现简化如下:

class Error(Exception):
 def __init__(self, msg=None, errno=None, values=None, sqlstate=None):
  super(Error, self).__init__()
  ...
  if self.msg and self.errno != -1:
   fields = {
    'errno': self.errno,
    'msg': self.msg.encode('utf-8') if PY2 else self.msg
   }
  ...
  self.args = (self.errno, self._full_msg, self.sqlstate)

可以看到,mysql Error中的self.args与__init__(msg, errno, values, sqlstate)的顺序不一,因此self.args第一个参数errno传给了msg,导致AttributeError。至于self.args是什么,简单查了下,是Exception类中定义的,一般用__str__或者__repr__方法的输出,python官方文档不建议overwrite。

总结

好吧,说了这么多,通过问题的追踪,我们也基本上了解清楚multiprocessing.Pool库的实现了。事实上,也很难说是谁的bug,是两者共同作用下出现的。不管如何,希望在用到multiprocessing库时,特别与Pipe相关时,谨慎点使用,最好的不要让异常跑到multiprocess中处理,应该在func中将所有的异常处理掉,如果有自己定于的异常类,请最好保证self.args的顺序与__init__()的顺序一致。同时,网上好像也听说使用multprocessing和subprocess库出现问题,或许也是这个异常抛出的问题,毕竟suprocessError定义与Exception好像有些区别。

以上这篇解决windows下python3使用multiprocessing.Pool出现的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Python3标准库之threading进程中管理并发操作方法

    1. threading进程中管理并发操作 threading模块提供了管理多个线程执行的API,允许程序在同一个进程空间并发的运行多个操作. 1.1 Thread对象 要使用Thread,最简单的方法就是用一个目标函数实例化一个Thread对象,并调用start()让它开始工作. import threading def worker(): """thread worker function""" print('Worker') threads

  • Python多进程编程常用方法解析

    python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU资源,在python中大部分情况需要使用多进程.python提供了非常好用的多进程包Multiprocessing,只需要定义一个函数,python会完成其它所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换.multiprocessing支持子进程.通信和共享数据.执行不同形式的同步,提供了Process.Queue.Pipe.LocK等组件 一.Process 语法:Process([group[,target

  • Python进程Multiprocessing模块原理解析

    先看看下面的几个方法: star() 方法启动进程, join() 方法实现进程间的同步,等待所有进程退出. close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞. 参数: target 是函数名字,需要调用的函数 args 函数需要的参数,以 tuple 的形式传入 用法: multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 写一个的例子:

  • 解决windows下python3使用multiprocessing.Pool出现的问题

    例如: from multiprocessing import Pool def f(x): return x*x pool = Pool(processes=4) r=pool.map(f, range(100)) pool.close() pool.join() 在spyder里运行直接没反应:在shell窗口里,直接报错,如下: Process SpawnPoolWorker-15: Traceback (most recent call last): File "C:\Anaconda3

  • Windows下python3安装tkinter的问题及解决方法

    最近尝试写python GUI界面,决定先从tkinter开始. 但是遇到了无法安装.执行pip install tkinter没有用,报了如下错误: C:\Users\zhengjie>pip install tkinter Collecting tkinter   Could not find a version that satisfies the requirement tkinter (from versions: ) No matching distribution found fo

  • Windows下python3.6.4安装教程

    Windows下python3.6.4安装教程的详细过程,供大家参考,具体内容如下 步骤: 下包->安装-->添加环境变量-–>测试 1.下包 Python安装包下载:下载地址 下载最新的安装包 找到3.6.4版本: 2.安装 下载之候直接双机就可以开始安装,然后"下一步"就可以.需要注意的一个地方是,如果需要指定安装路径,下面这里选择: 接着下一步,在下面你的地方定义路径: 至此,Python就已经安装到您的本地了,但是十有八九是无法运行的,在Windows的命令行

  • Windows 下python3.8环境安装教程图文详解

    python3.8新功能相关文章 Python 3.8 新功能大揭秘[新手必学] Python 3.8 新功能来一波(大部分人都不知道) 第一步 下载python3.8,官网下载较慢,我已经下载好了 64位 32位 第二步 双击安装,记得勾选红框框! 不勾选的话,安装完需要自己配置环境变量 等待即可 第三步 完成安装,检验是否安装成功 ctrl+R打开运行,输入cmd打开命令行 命令行中运行输入python测试是否运行(若电脑中同时有python2和python3则输入python3来指定运行的

  • 解决Windows下python和pip命令无法使用的问题

    一. python命令找不到 安装python之后经常会出现下面的问题 , python命令找不到,这是因为Windows的环境变量中没有定义python的安装路径 这个时候我们先找到python的安装路径(或者在Python的IDE图标上点击右键 , 选择打开文件所在的位置) 右键点击地址栏 => 将地址复制为文本 => 右键此电脑(或者右键文件管理资源管理器的空白处) => 点击属性 => 在系统中选择高级系统设置 => 点击环境变量 => 在下面的系统变量框中双击

  • Windows下python3.7安装教程

    记录了Windows安装python3.7的详细过程,供大家参考,具体内容如下 1. 在python的官网下载python对应版本:官网地址 64位下载Windows x86-64 executable installer 版本 32位下载Windows x86 executable installer 版本 打开链接如下图,版本会一直更新,选择任意一个适合自己电脑的版本就好 2.勾选 Add python to PATH 添加路径 安装界面点击Customize installation 自定

  • Windows下Python3.6安装第三方模块的方法

    一. 官网下载安装包:  官网网址:https://www.python.org/ 我下载的是3.6.3版本,如下图: 二. 安装安装包,  1. 直接双击运行  2. 选择Customize installation,一定要勾选Add_Python 3.6 to PATH,防止手工添加环境变量 3.选择安装的属性,Documentation.pip.tcl/tk and IDLE 必须安装,tcl/tk and IDLE是Python环境的开发环境窗口,pip用来安装numpy等package

  • 解决windows下Sublime Text 2 运行 PyQt 不显示的方法分享

    解决方案 搜了一下,找到一个 Linux 下的解决方案,如下所示: 复制代码 代码如下: Sublime Text2 运行pySide/pyQt程序的问题 Ctrl-B后,界面不会弹出来,但是后台进程里面有"python.exe",而且使用print能打印出东西来. 解决方法: 打开$sublimeText_dir/Data/Packages/Python/Python.sublime-build 增加 "shell": "true", 按图索骥

  • win10下python3.8的PIL库安装

    1.找到Python的位置 我的是在 C:\Users\admin\AppData\Local\Programs\Python\Python38 AppData这个文件是个隐藏文件需要查询得先把隐藏文件显示出来 win10里面在Microsoft store 下载的Python我只找到了exe文件,所以就卸载然后重新下载过. 去官网下载的时候下载速度极其慢,可以去这下: https://python123.io/download 2.PIL库的安装 PIL库:具有强大的图像处理能力 在上面找到的

  • win10下python3.8的PIL库安装过程

    1.找到Python的位置 我的是在 C:\Users\admin\AppData\Local\Programs\Python\Python38 AppData这个文件是个隐藏文件需要查询得先把隐藏文件显示出来 win10里面在Microsoft store 下载的Python我只找到了exe文件,所以就卸载然后重新下载过. 去官网下载的时候下载速度极其慢,可以去这下: https://python123.io/download 2.PIL库的安装 PIL库:具有强大的图像处理能力 在上面找到的

随机推荐