Python使用multiprocessing实现一个最简单的分布式作业调度系统

mutilprocess像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

介绍

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。

想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。

实现

Job

首先创建一个Job类,为了测试简单,只包含一个job id属性

job.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
class Job:
def __init__(self, job_id):
self.job_id = job_id

Master

Master用来派发作业和显示运行完成的作业信息

master.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job

class Master:

def __init__(self):
# 派发出去的作业队列
self.dispatched_job_queue = Queue()
# 完成的作业队列
self.finished_job_queue = Queue()
def get_dispatched_job_queue(self):
return self.dispatched_job_queue
def get_finished_job_queue(self):
return self.finished_job_queue
def start(self):
# 把派发作业队列和完成作业队列注册到网络上
BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)
# 监听端口和启动服务
manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs')
manager.start()
# 使用上面注册的方法获取队列
dispatched_jobs = manager.get_dispatched_job_queue()
finished_jobs = manager.get_finished_job_queue()
# 这里一次派发10个作业,等到10个作业都运行完后,继续再派发10个作业
job_id = 0
while True:
for i in range(0, 10):
job_id = job_id + 1
job = Job(job_id)
print('Dispatch job: %s' % job.job_id)
dispatched_jobs.put(job)
while not dispatched_jobs.empty():
job = finished_jobs.get(60)
print('Finished Job: %s' % job.job_id)
manager.shutdown()
if __name__ == "__main__":
master = Master()
master.start()

Slave

Slave用来运行master派发的作业并将结果返回

slave.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
from Queue import Queue
from multiprocessing.managers import BaseManager
from job import Job

class Slave:

def __init__(self):
# 派发出去的作业队列
self.dispatched_job_queue = Queue()
# 完成的作业队列
self.finished_job_queue = Queue()

def start(self):

# 把派发作业队列和完成作业队列注册到网络上
BaseManager.register('get_dispatched_job_queue')
BaseManager.register('get_finished_job_queue')
# 连接master
server = '127.0.0.1'
print('Connect to server %s...' % server)
manager = BaseManager(address=(server, 8888), authkey='jobs')
manager.connect()
# 使用上面注册的方法获取队列
dispatched_jobs = manager.get_dispatched_job_queue()
finished_jobs = manager.get_finished_job_queue()
# 运行作业并返回结果,这里只是模拟作业运行,所以返回的是接收到的作业
while True:
job = dispatched_jobs.get(timeout=1)
print('Run job: %s ' % job.job_id)
time.sleep(1)
finished_jobs.put(job)
if __name__ == "__main__":
slave = Slave()
slave.start()

测试

分别打开三个linux终端,第一个终端运行master,第二个和第三个终端用了运行slave,运行结果如下

master

$ python master.py
Dispatch job: 1
Dispatch job: 2
Dispatch job: 3
Dispatch job: 4
Dispatch job: 5
Dispatch job: 6
Dispatch job: 7
Dispatch job: 8
Dispatch job: 9
Dispatch job: 10
Finished Job: 1
Finished Job: 2
Finished Job: 3
Finished Job: 4
Finished Job: 5
Finished Job: 6
Finished Job: 7
Finished Job: 8
Finished Job: 9
Dispatch job: 11
Dispatch job: 12
Dispatch job: 13
Dispatch job: 14
Dispatch job: 15
Dispatch job: 16
Dispatch job: 17
Dispatch job: 18
Dispatch job: 19
Dispatch job: 20
Finished Job: 10
Finished Job: 11
Finished Job: 12
Finished Job: 13
Finished Job: 14
Finished Job: 15
Finished Job: 16
Finished Job: 17
Finished Job: 18
Dispatch job: 21
Dispatch job: 22
Dispatch job: 23
Dispatch job: 24
Dispatch job: 25
Dispatch job: 26
Dispatch job: 27
Dispatch job: 28
Dispatch job: 29
Dispatch job: 30

slave1

$ python slave.py
Connect to server 127.0.0.1...
Run job: 1
Run job: 2
Run job: 3
Run job: 5
Run job: 7
Run job: 9
Run job: 11
Run job: 13
Run job: 15
Run job: 17
Run job: 19
Run job: 21
Run job: 23 

slave2

$ python slave.py
Connect to server 127.0.0.1...
Run job: 4
Run job: 6
Run job: 8
Run job: 10
Run job: 12
Run job: 14
Run job: 16
Run job: 18
Run job: 20
Run job: 22
Run job: 24 

以上内容是小编给大家介绍的Python使用multiprocessing实现一个最简单的分布式作业调度系统,希望对大家有所帮助!

(0)

相关推荐

  • Python multiprocessing.Manager介绍和实例(进程间共享数据)

    Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装.使用multiprocessing.Manager可以简单地使用这些高级接口. Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问.从而达到多进程间数据通信且安全. Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaph

  • Python多进程并发(multiprocessing)用法实例详解

    本文实例讲述了Python多进程并发(multiprocessing)用法.分享给大家供大家参考.具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython).最多只能用满1个CPU核心. Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换. 1.新建单一进程 如果我们新建少量进程,可以如下: import multiprocessing import t

  • Python使用multiprocessing创建进程的方法

    本文实例讲述了Python使用multiprocessing创建进程的方法.分享给大家供大家参考.具体分析如下: 进程可以通过调用multiprocessing的Process进行创建,下面代码创建两个进程. [root@localhost ~]# cat twoproces.py #!/usr/bin/env python from multiprocessing import Process import os def output(): print "My pid is :%d\n&quo

  • Python multiprocessing模块中的Pipe管道使用实例

    multiprocessing.Pipe([duplex]) 返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.不同于os.open之处在于os.pipe()返回2个文件描述符(r, w),表示可读的和可写的 实例如下: 复制代码 代码如下: #!/usr/bin/python #coding=utf-8 import os from multiprocessing import P

  • Python多进程multiprocessing用法实例分析

    本文实例讲述了Python多进程multiprocessing用法.分享给大家供大家参考,具体如下: mutilprocess简介 像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多. 简单的创建进程: import multiprocessing def worker(num): """thread worker function""" print 'Wor

  • Python标准库之多进程(multiprocessing包)介绍

    在初步了解Python多进程之后,我们可以继续探索multiprocessing包中更加高级的工具.这些工具可以让我们更加便利地实现多进程. 进程池 进程池 (Process Pool)可以创建多个进程.这些进程就像是随时待命的士兵,准备执行任务(程序).一个进程池中可以容纳多个待命的士兵. "三个进程的进程池" 比如下面的程序: 复制代码 代码如下: import multiprocessing as mul def f(x):     return x**2 pool = mul.

  • python基于multiprocessing的多进程创建方法

    本文实例讲述了python基于multiprocessing的多进程创建方法.分享给大家供大家参考.具体如下: import multiprocessing import time def clock(interval): while True: print ("the time is %s"% time.time()) time.sleep(interval) if __name__=="__main__": p = multiprocessing.Process

  • 简单学习Python多进程Multiprocessing

    1.1 什么是 Multiprocessing 多线程在同一时间只能处理一个任务. 可把任务平均分配给每个核,而每个核具有自己的运算空间. 1.2 添加进程 Process 与线程类似,如下所示,但是该程序直接运行无结果,因为IDLE不支持多进程,在命令行终端运行才有结果显示 import multiprocessing as mp def job(a,b): print('abc') if __name__=='__main__': p1=mp.Process(target=job,args=

  • python使用multiprocessing模块实现带回调函数的异步调用方法

    本文实例讲述了python使用multiprocessing模块实现带回调函数的异步调用方法.分享给大家供大家参考.具体分析如下: multipressing模块是python 2.6版本加入的,通过这个模块可以轻松实现异步调用 from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': pool = Pool(processes=1) # Start a worker processes. r

  • Python使用multiprocessing实现一个最简单的分布式作业调度系统

    mutilprocess像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多. 介绍 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信. 想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统. 实现 Job 首先创建一个Job类,为了测试简单,只包含一

  • Python利用multiprocessing实现最简单的分布式作业调度系统实例

    介绍 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信.想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统.在这之前,我们先来详细了解下python中的多进程管理包multiprocessing. multiprocessing.Process multiprocessing包是Python中的多进程管理包.它与 threading.

  • Python多进程multiprocessing.Pool类详解

    multiprocessing模块 multiprocessing包是Python中的多进程管理包.它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程.该进程可以允许放在Python程序内部编写的函数中.该Process对象与Thread对象的用法相同,拥有is_alive().join([timeout]).run().start().terminate()等方法.属性有:authkey.daemon(要通过start()设置)

  • python创建一个最简单http webserver服务器的方法

    本文实例讲述了python创建一个最简单http webserver服务器的方法.分享给大家供大家参考.具体实现方法如下: import sys import BaseHTTPServer from SimpleHTTPServer import SimpleHTTPRequestHandler Handler = SimpleHTTPRequestHandler Server = BaseHTTPServer.HTTPServer Protocol = "HTTP/1.0" if s

  • Python探索之实现一个简单的HTTP服务器

    Python标准库中的BaseHTTPServer模块实现了一个基础的HTTP服务器基类和HTTP请求处理类.这在文章python探索之BaseHTTPServer-实现Web服务器介绍中进行了相关的介绍.然而,BaseHTTPServer模块中并没有定义相关的请求方法,诸如GET.HEAD.POST等.在BaseHTTPServer模块的基础上,Python标准库中的SimpleHTTPServer模块实现了简单的GET.HEAD请求. 在该模块中,它沿用了BaseHTTPServer模块中实

  • 一个非常简单好用的Python图形界面库(PysimpleGUI)

    前一阵,我在为朋友编写一个源代码监控程序的时候,发现了一个 Python 领域非常简单好用的图形界面库. 说起图形界面库,你可能会想到 TkInter.PyQt.PyGUI 等流行的图形界面库,我也曾经尝试使用,一个很直观的感受就是,这太难用了.就去网上搜搜,看看有没有一些 demo,拿来改改,结果很少有,当时我就放弃了这些图形库的学习,转而使用了 vue+flask 的形式以浏览器网页作为程序界面,因为我会这个,即使实现起来稍微麻烦,但是也快. 那有朋友可能问了:一定要学习图形界面吗? 其实不

  • 如何利用python的tkinter实现一个简单的计算器

    做一个计算器,这是我想要达成的效果: 在按下按钮或者按下键盘的时候,第一行输入框会显示输入的内容,第二行显示框则会预览运算结果,如果发生异常,输入内容格式错误,无法计算,则显示框显示"错误". 按"="按钮或按键回车计算结果,结果显示在第一行. 1.准备工作 导入库 tkinter import tkinter as tk 2. 开始 定义两个变量: equal_is=False #定义一些变量 textchange='' equal_is 用于判断是否已经计算出结

  • 利用Python制作一个简单的天气播报系统

    目录 前言 工具 天气数据来源 代码实现 总结 前言 大家好,我是辣条 相信大家都能感觉到最近天气的多变,好几次出门半路天气转变.辣条也深受其扰,直接给我整感冒,就差被隔离起来了,既然天气我没法做主,那不如用python整个天气爬虫来获取天气情况.这样也好可以进行一个提前预防 工具 python3.7 pycharm pyttsx3:语音播报库 天气数据来源 找寻一个天气网站 比如说我们要查询某地的天气,在输入地名后就能看到结果. 我们可以看到网站的url会有变化: 每个城市的天气信息url就是

  • 利用Python为女神制作一个专属网站

    目录 数据准备 网站搭建 服务部署 先来看一下效果吧,只要有足够的照片素材,捕获女神的心就指日可待 怎么样,看起来还可以吧 下面就一起来完成吧 数据准备 首先是测试图片的获取,毕竟萝卜哥当前还没有那么多女神的照片 这里我使用如下网站的高清图片,嗯,各个都是大美女 抓取的代码比较简单 import requests import json def get_pic():     headers = {"Accept": "application/json, text/javasc

随机推荐