python中如何使用分步式进程计算详解

前言

在python中使用多进程和多线程都能达到同时运行多个任务,和多进程和多线程的选择上,应该优先选择多进程的方式,因为多进程更加稳定,且对于进程的操作管理也更加方便,但有一点是多进程独有的杀手锏,多进程可以将进程分步到多台机器上跑,假如有很多个任务,一台机器即使开了多进程或者多进程跑起来还是要耗很多时间,那么这时就要想一下可否将任务分配到多台机器上跑,这样可以更快的完成任务。

在分步式进程运算中,进程之前的通信还是依赖于Queue,但此时的队列不能直接使用,需要使用multiprocessing.managers.BaseManager 进行包装,通过回调以后才能使用,既然是分步式的调用,那么应该有一个服务端和一个客户端,服务端通过网络协议将队列中的信息给各个客户端进行调用,客户端也可以通过队列将结果返回,然后服务端进行结果的收集展示,流程如下


分步式流程

服务端将任务放到 task_queue 中,然后四个客户端通过网络端口从task_queue中获取到任务,然后进行计算,再将结果放到result_queue中,最后服务端统一处理结果。整体的流程比较清晰,只是需要强调,这里的队列不能是原始的队列,需要使用BaseManager 进行包装。

先看一下服务端的代码

#coding:gbk
import time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# 任务个数
task_number = 10
# 定义收发队列
task_queue = queue.Queue(task_number)
result_queue = queue.Queue(task_number)

def gettask():
 return task_queue

def getresult():
 return result_queue

def test():
 # windows下绑定调用接口不能使用lambda,所以只能先定义函数再绑定
 BaseManager.register('get_task', callable=gettask)
 BaseManager.register('get_result', callable=getresult)
 # 绑定端口并设置验证码,windows下需要填写ip地址,linux下不填默认为本地
 manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
 # 启动
 manager.start()
 try:
  # 通过网络获取任务队列和结果队列
  task = manager.get_task()
  result = manager.get_result()
  # 添加任务
  for i in range(task_number):
   print('Put task %d...' % i)
   task.put(i)
  # 每秒检测一次是否所有任务都被执行完
  while not result.full():
   print(task.qsize())
   time.sleep(1)
  for i in range(result.qsize()):
   ans = result.get()
   print('task %d is finish , runtime:%d s' % ans)
 except:
  print('Manager error')
 finally:
  manager.shutdown()

if __name__ == '__main__':
 # windows下多进程可能会炸,添加这句可以缓解
 freeze_support()
 test()

这里重点说一下 BaseManager.register('get_task', callable=gettask) 这行代码,它的意思是注册一个get_task的操作,执行的操作是gettask()函数,上面定义了gettask()函数,返回的是task_queue,这也是之前说的不能直接使用queue.Queue,必须要使用通过BaseManager的register接口封装过的的队列,下面使用task = manager.get_task()来获取到这个队列。

manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')

这行代码初始了一个manager,它绑定了本机的5002端口,并且在客户端连接的时候需要一个密码:123。

接下来看一下客户端代码。

#coding:gbk

import time, sys, queue, random
from multiprocessing.managers import BaseManager
BaseManager.register('get_task')
BaseManager.register('get_result')
conn = BaseManager(address = ('127.0.0.1',5002), authkey = b'123')
try:
 conn.connect()
except:
 print('连接失败')
 sys.exit()
task = conn.get_task()
result = conn.get_result()
while not task.empty():
 print(task.qsize())
 n = task.get(timeout = 1)
 print('run task %d' % n)
 sleeptime = random.randint(0,3)
 time.sleep(sleeptime)
 rt = (n, sleeptime)
 result.put(rt)
if __name__ == '__main__':
 pass;

这里主要看以下的代码

BaseManager.register('get_task')
BaseManager.register('get_result')

这两个是注册函数,和之前的服务端所对应,之前服务端注册了这两个函数,这里才能注册使用,注意这里不能注册服务端没有注册的函数

运行一下,先运行服务端,然后再启两个cmd运行客户端,也可以在局域网中的另外的机器上运行,但是要修改服务端的ip地址

服务端的结果如下

Put task 0...
Put task 1...
Put task 2...
Put task 3...
Put task 4...
Put task 5...
Put task 6...
Put task 7...
Put task 8...
Put task 9...
task 0 is finish , runtime:3 s
task 1 is finish , runtime:0 s
task 2 is finish , runtime:2 s
task 4 is finish , runtime:1 s
task 3 is finish , runtime:3 s
task 6 is finish , runtime:1 s
task 7 is finish , runtime:0 s
task 5 is finish , runtime:3 s
task 8 is finish , runtime:2 s
task 9 is finish , runtime:3 s

两个客户端的结果分别如下

客户端1

10
run task 0
9
run task 1
8
run task 2
6
run task 4
5
run task 5
1
run task 9

客户端2

7
run task 3
4
run task 6
3
run task 7
2
run task 8

一起运行的截图如下

结果

由于队列是线程安全的,所以这里不用加锁,在客户端中打印print(task.qsize()) 当前的队列大小,可以看到队列的信息中同步到各个客户端的。

最后还是要多说一句,分步式多进程虽然可以把任务分散到不同的机器上运行,可以处理多任务,但是如果此时服务端挂掉的话,任务就全丢掉了,所以在生产环境下还是考虑使用消息中间件如kafka等。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。

(0)

相关推荐

  • 详解python分布式进程

    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上. Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. 举个例子:如果我们已经有一个通

  • 在Python程序中实现分布式进程的教程

    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上. Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. 举个例子:如果我们已经有一个通

  • python3学习笔记之多进程分布式小例子

    最近一直跟着廖大在学Python,关于分布式进程的小例子挺有趣的,这里做个记录. 分布式进程 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. master服务端原理:通过managers模块把Queue通过网络暴露出去,其他机器的进程就可以访问Queue了 服

  • python中如何使用分步式进程计算详解

    前言 在python中使用多进程和多线程都能达到同时运行多个任务,和多进程和多线程的选择上,应该优先选择多进程的方式,因为多进程更加稳定,且对于进程的操作管理也更加方便,但有一点是多进程独有的杀手锏,多进程可以将进程分步到多台机器上跑,假如有很多个任务,一台机器即使开了多进程或者多进程跑起来还是要耗很多时间,那么这时就要想一下可否将任务分配到多台机器上跑,这样可以更快的完成任务. 在分步式进程运算中,进程之前的通信还是依赖于Queue,但此时的队列不能直接使用,需要使用multiprocessi

  • Python中常用的高阶函数实例详解

    前言 高阶函数指的是能接收函数作为参数的函数或类:python中有一些内置的高阶函数,在某些场合使用可以提高代码的效率. lambda 当在使用一些函数的时候,我们不需要显式定义函数名称,直接传入lambda匿名函数即可.lambda匿名函数通常和其他函数搭配使用. 比如可以直接使用如下的lambda表达式计算当x=3时,y = x * 3 + 5的函数值. In [1]: (lambda x: x * 3 + 5)(3) Out[1]: 14 map map函数将一个函数和序列/迭代器(可以传

  • python中函数总结之装饰器闭包详解

    1.前言 函数也是一个对象,从而可以增加属性,使用句点来表示属性. 如果内部函数的定义包含了在外部函数中定义的对象的引用(外部对象可以是在外部函数之外),那么内部函数被称之为闭包. 2.装饰器 装饰器就是包装原来的函数,从而在不需要修改原来代码的基础之上,可以做更多的事情. 装饰器语法如下: @deco2 @deco1 def func(arg1,arg2...): pass 这个表示了有两个装饰器的函数,那么表示的含义为:func = deco2(deco1(func)) 无参装饰器语法如下:

  • 对python中矩阵相加函数sum()的使用详解

    假如矩阵A是n*n的矩阵 A.sum()是计算矩阵A的每一个元素之和. A.sum(axis=0)是计算矩阵每一列元素相加之和. A.Sum(axis=1)是计算矩阵的每一行元素相加之和. 以上这篇对python中矩阵相加函数sum()的使用详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们.

  • Python 中的函数装饰器和闭包详解

    函数装饰器可以被用于增强方法的某些行为,如果想自己实现装饰器,则必须了解闭包的概念. 装饰器的基本概念 装饰器是一个可调用对象,它的参数是另一个函数,称为被装饰函数.装饰器可以修改这个函数再将其返回,也可以将其替换为另一个函数或者可调用对象. 例如:有个名为 decorate 的装饰器: @decorate def target(): print('running target()') 上述代码的写法和以下写法的效果是一样的: def target(): print('running targe

  • Python中八大图像特效算法的示例详解

    目录 0写在前面 1毛玻璃特效 2浮雕特效 3油画特效 4马赛克特效 5素描特效 6怀旧特效 7流年特效 8卡通特效 0 写在前面 图像特效处理是基于图像像素数据特征,将原图像进行一定步骤的计算——例如像素作差.灰度变换.颜色通道融合等,从而达到期望的效果.图像特效处理是日常生活中应用非常广泛的一种计算机视觉应用,出现在各种美图软件中,这些精美滤镜背后的数学原理都是相通的,本文主要介绍八大基本图像特效算法,在这些算法基础上可以进行二次开发,生成更高级的滤镜. 本文采用面向对象设计,定义了一个图像

  • python中函数的返回值及类型详解

    目录 1.返回值介绍 2.带有返回值的函数 3.保存函数的返回值 4.四种函数的类型 1.无参数,无返回值的函数 2.无参数,有返回值的函数 3.有参数,无返回值的函数 4.有参数,有返回值的函数 5.小结 5.在python中我们可不可以返回多个值? 1.返回值介绍 现实生活中的场景: 我给儿子10块钱,让他给我买包烟.这个例子中,10块钱是我给儿子的,就相当于调用函数时传递到参数,让儿子买烟这个事情最终的目标是,让他把烟给你带回来然后给你对么,,,此时烟就是返回值 开发中的场景: 定义了一个

  • Python 中 Virtualenv 和 pip 的简单用法详解

    本文介绍了Python 中 Virtualenv 和 pip 的简单用法详解,分享给大家,具体如下: 0X00 安装环境 我们在 Python 开发和学习过程中需要用到各种库,然后在各个不同的项目和作品里可能用的版本还不一样,正因为有这种问题的存在才催生了virtualenv的诞生.virtualenv 可以在电脑上创建一个虚拟环境,可以针对每一个项目创建一个虚拟环境,这样就不用担心各个不同的项目用不同版本的库的时候出现的冲突了. 下面的内容只适用于 Linux/OSX,未经 Windows 环

  • python中 chr unichr ord函数的实例详解

    python中 chr unichr ord函数的实例详解 chr()函数用一个范围在range(256)内的(就是0-255)整数作参数,返回一个对应的字符.unichr()跟它一样,只不过返回的是Unicode字符,这个从Python 2.0才加入的unichr()的参数范围依赖于你的python是如何被编译的.如果是配置为USC2的Unicode,那么它的允许范围就是range(65536)或0x0000-0xFFFF:如果配置为UCS4,那么这个值应该是range(1114112)或0x

  • python中函数默认值使用注意点详解

    当在函数中定义默认值时,值初始化只会进行一次,就是执行到def methodname时执行.看下面代码: from datetime import datetime def test(t=datetime.today()): print t if __name__ == "__main__": test() test() 两次方法调用输出的时间都为同一个值,而不是我们预想当前执行时间.对于上面这种情况,建议用下面的方式实现: from datetime import datetime

随机推荐