详解python分布式进程

在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?

原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager
# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
  pass
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
  n = random.randint(0, 10000)
  print('Put task %d...' % n)
  task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
  r = result.get(timeout=10)
  print('Result: %s' % r)
# 关闭:
manager.shutdown()

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

然后,在另一台机器上启动任务进程(本机上启动也可以):

# task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
  pass
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
  n = random.randint(0, 10000)
  print('Put task %d...' % n)
  task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
  r = result.get(timeout=10)
  print('Result: %s' % r)
# 关闭:
manager.shutdown()

任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

现在,可以试试分布式进程的工作效果了。先启动task_master.py服务进程:

$ python3 task_master.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:

$ python3 task_worker.pyConnect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

task_worker.py进程结束,在task_master.py进程中会继续打印出结果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。

authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。

Python的分布式进程接口简单,封装良好,适合需要把繁重任务分布到多台机器的环境下。

注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

以上就是本篇文章所讲述的所有内容,这篇文章主要介绍了python分布式进程的相关知识,希望你能借助资料从而理解上述所说的内容。希望我在这片文章所讲述的内容能够对你有所帮助,让你学习python更加轻松。

(0)

相关推荐

  • python中如何使用分步式进程计算详解

    前言 在python中使用多进程和多线程都能达到同时运行多个任务,和多进程和多线程的选择上,应该优先选择多进程的方式,因为多进程更加稳定,且对于进程的操作管理也更加方便,但有一点是多进程独有的杀手锏,多进程可以将进程分步到多台机器上跑,假如有很多个任务,一台机器即使开了多进程或者多进程跑起来还是要耗很多时间,那么这时就要想一下可否将任务分配到多台机器上跑,这样可以更快的完成任务. 在分步式进程运算中,进程之前的通信还是依赖于Queue,但此时的队列不能直接使用,需要使用multiprocessi

  • python3学习笔记之多进程分布式小例子

    最近一直跟着廖大在学Python,关于分布式进程的小例子挺有趣的,这里做个记录. 分布式进程 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. master服务端原理:通过managers模块把Queue通过网络暴露出去,其他机器的进程就可以访问Queue了 服

  • 在Python程序中实现分布式进程的教程

    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上. Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. 举个例子:如果我们已经有一个通

  • 详解python分布式进程

    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上. Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. 举个例子:如果我们已经有一个通

  • 详解python网络进程

    目录 一.多任务编程 二.进程 三.os.fork创建进程 3.1.进程ID和退出函数 四.孤儿和僵尸 4.1.孤儿进程 4.2.僵尸进程 4.3.如何避免僵尸进程的产生 五.Multiprocessing创建进程 5.1.multiprocessing进程属性 六.进程池 七.进程间通信(IPC) 7.1.管道通信(Pipe) 7.2.消息队列 7.3.共享内存 7.4.信号量(信号灯集) 一.多任务编程 意义:充分利用计算机的资源提高程序的运行效率 定义:通过应用程序利用计算机多个核心,达到

  • 实例详解Python的进程,线程和协程

    目录 前言 前提条件 相关介绍 实验环境 进程 多进程 用进程池对多进程进行操作 线程 使用_thread模块实现 使用threading模块实现 协程 使用asyncio模块实现 总结 前言 本文用Python实例阐述了一些关于进程.线程和协程的概念,由于水平有限,难免出现错漏,敬请批评改正. 前提条件 熟悉Python基本语法熟悉Python操作进程.线程.协程的相关库 相关介绍 Python是一种跨平台的计算机程序设计语言.是一个高层次的结合了解释性.编译性.互动性和面向对象的脚本语言.最

  • 详解Python中的进程和线程

    进程是什么? 进程就是一个程序在一个数据集上的一次动态执行过程.进程一般由程序.数据集.进程控制块三部分组成.我们编写的程序用来描述进程要完成哪些功能以及如何完成:数据集则是程序在执行过程中所需要使用的资源:进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志. 线程是什么? 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID.程序计数器.寄存器集合和堆栈共同组成.线程的引入减小了程序并发

  • 详解Python实现多进程异步事件驱动引擎

    本文介绍了详解Python实现多进程异步事件驱动引擎,分享给大家,具体如下: 多进程异步事件驱动逻辑 逻辑 code # -*- coding: utf-8 -*- ''' author: Jimmy contact: 234390130@qq.com file: eventEngine.py time: 2017/8/25 上午10:06 description: 多进程异步事件驱动引擎 ''' __author__ = 'Jimmy' from multiprocessing import

  • 详解Python之unittest单元测试代码

    前言 编写函数或者类时,还可以为其编写测试.通过测试,可确定代码面对各种输入都能够按要求的那样工作. 本次我将介绍如何使用Python模块unittest中的工具来测试代码. 测试函数 首先我们先编写一个简单的函数,它接受姓.名.和中间名三个参数,并返回完整的姓名: names.py def get_fullname(firstname,lastname,middel=''): '''创建全名''' if middel: full_name = firstname + ' ' + middel

  • 详解python中的线程

    Python中创建线程有两种方式:函数或者用类来创建线程对象. 函数式:调用 _thread 模块中的start_new_thread()函数来产生新线程. 类:创建threading.Thread的子类来包装一个线程对象. 1.线程的创建 1.1 通过thread类直接创建 import threading import time def foo(n): time.sleep(n) print("foo func:",n) def bar(n): time.sleep(n) prin

  • 详解python中GPU版本的opencv常用方法介绍

    引言 本篇是以python的视角介绍相关的函数还有自我使用中的一些问题,本想在这篇之前总结一下opencv编译的全过程,但遇到了太多坑,暂时不太想回看做过的笔记,所以这里主要总结python下GPU版本的opencv. 主要函数说明 threshold():二值化,但要指定设定阈值 blendLinear():两幅图片的线形混合 calcHist() createBoxFilter ():创建一个规范化的2D框过滤器 canny边缘检测 createGaussianFilter():创建一个Ga

  • 详解Python IO口多路复用

    什么是IO 多路复用呢? 我一个SocketServer有500个链接连过来了,我想让500个链接都是并发的,每一个链接都需要操作IO,但是单线程下IO都是串行的,我实现多路的,看起来像是并发的效果,这就是多路复用! 概念说明: 在进行解释之前,首先要说明几个概念: - 用户空间和内核空间 现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方).操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权

  • 详解python logging日志传输

    1.生成日志并通过http传输出去(通过HTTPHandler方式): #生成并发送日志 import logging from logging.handlers import HTTPHandler import logging.config def save(): logger = logging.getLogger(__name__) # 生成一个log实例,如果括号为空则返回root logger hh = HTTPHandler(host='127.0.0.1:5000', url='

随机推荐