Python分布式进程中你会遇到的问题解析

小惊大怪

你是不是在用Python3或者在windows系统上编程?最重要的是你对进程和线程不是很清楚?那么恭喜你,在python分布式进程中,会有坑等着你去挖。。。(hahahaha,此处允许我吓唬一下你)开玩笑的啦,不过,如果你知道序列中不支持匿名函数,那这个坑就和你say byebye了。好了话不多数,直接进入正题。

分布式进程

正如大家所知道的Process比Thread更稳定,而且Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

代码记录

举个例子

如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上,这应该怎么用分布式进程来实现呢?你已经知道了原有的Queue可以继续使用,而且通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程来访问Queue了。好,那我们就这么干!

写个task_master.py

我们先看服务进程。服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务。

#!/user/bin/pytthon
# -*- coding:utf-8 -*-
# @Time: 2018/3/3 16:46
# @Author: lichexo
# @File: task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
 pass
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
 n = random.randint(0, 10000)
 print('Put task %d...' % n)
 task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
 r = result.get(timeout=10)
 print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。然后,在另一台机器上启动任务进程(本机上启动也可以)

写个task_worker.py

#!/user/bin/pytthon
# -*- coding:utf-8 -*-
# @Time: 2018/3/3 16:46
# @Author: lichexo
# @File: task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
 pass
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
 try:
 n = task.get(timeout=1)
 print('run task %d * %d...' % (n, n))
 r = '%d * %d = %d' % (n, n, n*n)
 time.sleep(1)
 result.put(r)
 except Queue.Empty:
 print('task queue is empty.')
# 处理结束:
print('worker exit.')

任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

运行结果

现在,可以试试分布式进程的工作效果了。先启动task_master.py服务进程:

Traceback (most recent call last):
 File "F:/Python/untitled/xianchengjincheng/master.py", line 25, in <module>
 manager.start()
 File "F:Pythonpystalllibmultiprocessingmanagers.py", line 513, in start
 self._process.start()
 File "F:Pythonpystalllibmultiprocessingprocess.py", line 105, in start
 self._popen = self._Popen(self)
 File "F:Pythonpystalllibmultiprocessingcontext.py", line 322, in _Popen
 return Popen(process_obj)
 File "F:Pythonpystalllibmultiprocessingpopen_spawn_win32.py", line 65, in __init__
 reduction.dump(process_obj, to_child)
 File "F:Pythonpystalllibmultiprocessing
eduction.py", line 60, in dump
 ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x00000202D1921E18>: attribute lookup <lambda> on __main__ failed

task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:

Connect to server 127.0.0.1...
Traceback (most recent call last):
 File "F:/Python/untitled/xianchengjincheng/work.py", line 24, in <module>
 m.connect()
 File "F:Pythonpystalllibmultiprocessingmanagers.py", line 489, in connect
 conn = Client(self._address, authkey=self._authkey)
 File "F:Pythonpystalllibmultiprocessingconnection.py", line 487, in Client
 c = SocketClient(address)
 File "F:Pythonpystalllibmultiprocessingconnection.py", line 614, in SocketClient
 s.connect(address)
ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。

看到没,结果都出错了,我们好好分析一下到底哪出错了。。。

错误分析

在task_master.py的报错提示中,我们知道它说lambda错误,这是因为序列化不支持匿名函数,所以我们得修改代码,重新对queue用QueueManager进行封装放到网络中。

# 把两个Queue都注册到网络上, callable参数关联了Queue对象
QueueManager.register('get_task_queue',callable=return_task_queue)
QueueManager.register('get_result_queue',callable=return_result_queue)

其中task_queue和result_queue是两个队列,分别存放任务和结果。它们用来进行进程间通信,交换对象。

因为是分布式的环境,放入queue中的数据需要等待Workers机器运算处理后再进行读取,这样就需要对queue用QueueManager进行封装放到网络中,这是通过上面的2行代码来实现的。我们给return_task_queue的网络调用接口取了一个名get_task_queue,而return_result_queue的名字是get_result_queue,方便区分对哪个queue进行操作。task.put(n)即是对task_queue进行写入数据,相当于分配任务。而result.get()即是等待workers机器处理后返回的结果。

值得注意 在windows系统中你必须要写IP地址,而其他操作系统比如linux操作系统则就不要了。

# windows需要写ip地址
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

修改后的代码

在task_master.py中修改如下:

#!/user/bin/pytthon
# -*- coding:utf-8 -*-
# @Time: 2018/3/3 16:46
# @Author: lichexo
# @File: task_master.py
# task_master.py
import random,time,queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
task_queue = queue.Queue() # 发送任务的队列:
result_queue = queue.Queue() # 接收结果的队列:
class QueueManager(BaseManager): # 从BaseManager继承的QueueManager:
 pass
# windows下运行
def return_task_queue():
 global task_queue
 return task_queue # 返回发送任务队列
def return_result_queue ():
 global result_queue
 return result_queue # 返回接收结果队列
def test():
 # 把两个Queue都注册到网络上, callable参数关联了Queue对象,它们用来进行进程间通信,交换对象
 #QueueManager.register('get_task_queue', callable=lambda: task_queue)
 #QueueManager.register('get_result_queue', callable=lambda: result_queue)
 QueueManager.register('get_task_queue', callable=return_task_queue)
 QueueManager.register('get_result_queue', callable=return_result_queue)
 # 绑定端口5000, 设置验证码'abc':
 #manager = QueueManager(address=('', 5000), authkey=b'abc')
 # windows需要写ip地址
 manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
 manager.start() # 启动Queue:
 # 获得通过网络访问的Queue对象:
 task = manager.get_task_queue()
 result = manager.get_result_queue()
 for i in range(10): # 放几个任务进去:
 n = random.randint(0, 10000)
 print('Put task %d...' % n)
 task.put(n)
 # 从result队列读取结果:
 print('Try get results...')
 for i in range(10):
 # 这里加了异常捕获
 try:
 r = result.get(timeout=5)
 print('Result: %s' % r)
 except queue.Empty:
 print('result queue is empty.')
 # 关闭:
 manager.shutdown()
 print('master exit.')
if __name__=='__main__':
 freeze_support()
 print('start!')
 test()

在task_worker.py中修改如下:

#!/user/bin/pytthon
# -*- coding:utf-8 -*-
# @Time: 2018/3/3 16:46
# @Author: lichexo
# @File: task_worker.py
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
 pass
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
 try:
 n = task.get(timeout=1)
 print('run task %d * %d...' % (n, n))
 r = '%d * %d = %d' % (n, n, n*n)
 time.sleep(1)
 result.put(r)
 except queue.Empty:
 print('task queue is empty.')
# 处理结束:
print('worker exit.')

先运行task_master.py,然后再运行task_worker.py

(1)task_master.py运行结果如下

start!
Put task 7872...
Put task 6931...
Put task 1395...
Put task 8477...
Put task 8300...
Put task 1597...
Put task 8738...
Put task 8627...
Put task 1884...
Put task 2561...
Try get results...
Result: 7872 * 7872 = 61968384
Result: 6931 * 6931 = 48038761
Result: 1395 * 1395 = 1946025
Result: 8477 * 8477 = 71859529
Result: 8300 * 8300 = 68890000
Result: 1597 * 1597 = 2550409
Result: 8738 * 8738 = 76352644
Result: 8627 * 8627 = 74425129
Result: 1884 * 1884 = 3549456
Result: 2561 * 2561 = 6558721
master exit.

(2)task_worker.py运行结果如下

Connect to server 127.0.0.1...
run task 8640 * 8640...
run task 7418 * 7418...
run task 9303 * 9303...
run task 568 * 568...
run task 1633 * 1633...
run task 3583 * 3583...
run task 3293 * 3293...
run task 8975 * 8975...
run task 8189 * 8189...
run task 731 * 731...
worker exit.

知识补充

这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

Queue对象存储在哪?注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:

而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。task_worker这里的QueueManager注册的名字必须和task_manager中的一样。对比上面的例子,可以看出Queue对象从另一个进程通过网络传递了过来。只不过这里的传递和网络通信由QueueManager完成。

authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。

总结

以上所述是小编给大家介绍的Python分布式进程中你会遇到的问题解析,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

(0)

相关推荐

  • python3学习笔记之多进程分布式小例子

    最近一直跟着廖大在学Python,关于分布式进程的小例子挺有趣的,这里做个记录. 分布式进程 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. master服务端原理:通过managers模块把Queue通过网络暴露出去,其他机器的进程就可以访问Queue了 服

  • 在Python程序中实现分布式进程的教程

    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上. Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. 举个例子:如果我们已经有一个通

  • 详解python分布式进程

    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上. Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. 举个例子:如果我们已经有一个通

  • Python分布式进程中你会遇到的问题解析

    小惊大怪 你是不是在用Python3或者在windows系统上编程?最重要的是你对进程和线程不是很清楚?那么恭喜你,在python分布式进程中,会有坑等着你去挖...(hahahaha,此处允许我吓唬一下你)开玩笑的啦,不过,如果你知道序列中不支持匿名函数,那这个坑就和你say byebye了.好了话不多数,直接进入正题. 分布式进程 正如大家所知道的Process比Thread更稳定,而且Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上.Python的m

  • python分布式爬虫中消息队列知识点详解

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的资源,当然这个是一定的次序的,不然数据获取就会出现重复.就下来我们就python分布式爬虫中的消息队列进行详细解释,小伙伴们可以进一步了解一下. 实现分布式爬取的关键是消息队列,这个问题以消费端为视角更容易理解.你的爬虫程序部署到很多台机器上,那么他们怎么知道自己要爬什么呢?总要有一个地方存储了他们

  • Python max函数中key的用法及原理解析

    一.背景 起源于一个问题:怎样找到字符串中出现次数最多的字符 其实使用max函数就能很轻松的解决这个问题: 代码: str1 = "AAAaaa8888899sssss" print(max(str1, key=str1.count)) 结果:8 二.原理 max()函数用于获得给定的可迭代对象中的最大值. key是max()函数的一个参数,它辅助max函数找到最大元素.当max() 函数中有 key 参数时,求的是 value 的最大值,当没有 key 参数时,求的是 key 的最大

  • python实现在每个独立进程中运行一个函数的方法

    本文实例讲述了python实现在每个独立进程中运行一个函数的方法.分享给大家供大家参考.具体分析如下: 这个简单的函数可以同于在单独的进程中运行另外一个函数,这对于释放内存资源非常有用 #!/usr/bin/env python from __future__ import with_statement import os, cPickle def run_in_separate_process(func, *args, **kwds): pread, pwrite = os.pipe() pi

  • Python多进程入门、分布式进程数据共享实例详解

    本文实例讲述了Python多进程入门.分布式进程数据共享.分享给大家供大家参考,具体如下: python多进程入门 https://docs.python.org/3/library/multiprocessing.html 1.先来个简单的 # coding: utf-8 from multiprocessing import Process # 定义函数 def addUser(): print("addUser") if __name__ == "__main__&qu

  • python django框架中使用FastDFS分布式文件系统的安装方法

    一.安装FastDFS 1-1:执行docker命令安装 # 安装tracker docker run -dti --network=host --name tracker -v /var/fdfs/tracker:/var/fdfs youkou1/fastdfs tracker # 安装storage docker run -dti --network=host --name storage -e TRACKER_SERVER=IP地址:22122 -v /var/fdfs/storage:

  • python 在threading中如何处理主进程和子线程的关系

    之前用python的多线程,总是处理不好进程和线程之间的关系.后来发现了join和setDaemon函数,才终于弄明白.下面总结一下. 1.使用join函数后,主进程会在调用join的地方等待子线程结束,然后才接着往下执行. join使用实例如下: import time import random import threading class worker(threading.Thread): def __init__(self): threading.Thread.__init__(self

  • 如何用tempfile库创建python进程中的临时文件

    技术背景 临时文件在python项目中时常会被使用到,其作用在于随机化的创建不重名的文件,路径一般都是放在Linux系统下的/tmp目录.如果项目中并不需要持久化的存储一个文件,就可以采用临时文件的形式进行存储和读取,在使用之后可以自行决定是删除还是保留. tempfile库的使用 tempfile一般是python内置的一个函数库,不需要单独安装,这里我们直接介绍一下其常规使用方法: # tempfile_test.py import tempfile file = tempfile.Name

随机推荐