Python自定义主从分布式架构实例分析

本文实例讲述了Python自定义主从分布式架构。分享给大家供大家参考,具体如下:

环境:Win7 x64,Python 2.7,APScheduler 2.1.2。

原理图如下:

代码部分:

(1)、中心节点:

#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心节点(主要功能是分配任务)
import SocketServer, socket, Queue
CenterIP = '127.0.0.1'  #中心节点IP
CenterListenPort = 9999  #中心节点监听端口
CenterClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #中心节点用于发送网络消息的socket
TaskQueue = Queue.Queue() #任务队列
#获取任务队列
def GetTaskQueue():
  for i in range(1, 11):
    TaskQueue.put(str(i))
#CenterServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
  def handle(self):
    data = self.request[0].strip()
    socket = self.request[1]
    print(data)
    if data.startswith('wait'):
      vec = data.split(':')
      if len(vec) != 3:
        print('Error: len(vec) != 3')
      else:
        nodeIP = vec[1]
        nodeListenPort = vec[2]
        nodeID = nodeIP + ':' + nodeListenPort
        if not TaskQueue.empty():
          task = TaskQueue.get()
          print('send task ' + task + ' to ' + nodeID)
          CenterClient.sendto('task:' + task, (nodeIP, int(nodeListenPort)))
        else:
          print('TaskQueue is empty!')
GetTaskQueue() #获取任务队列
CenterServer = SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print('Listen port ' + str(CenterListenPort) + ' ...')
CenterServer.serve_forever()

(2)、任务节点:

#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任务节点(请求/接收/执行任务)
import time, socket, SocketServer
from apscheduler.scheduler import Scheduler
CenterIP = '127.0.0.1'  #中心节点IP
CenterListenPort = 9999  #中心节点监听端口
NodeIP = socket.gethostbyname(socket.gethostname())  #任务节点自身IP
NodeClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  #任务节点用于发送网络消息的socket
#任务:发送网络信息
def jobSendNetMsg():
  msg = ''
  if NodeServer.TaskState == 'wait':
    msg = 'wait:' + NodeIP + ':' + str(NodeListenPort)
  elif NodeServer.TaskState == 'exec':
    msg = 'exec:' + NodeIP + ':' + str(NodeListenPort)
  print(msg)
  NodeClient.sendto(msg, (CenterIP, CenterListenPort))
#添加并启动定时任务
def InitTimer():
  sched = Scheduler()
  sched.add_interval_job(jobSendNetMsg, seconds=1)
  sched.start()
#执行任务
def ExecTask(task):
  print('ExecTask ' + task + ' ...')
  time.sleep(2)
  print('ExecTask ' + task + ' over')
#NodeServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
  def handle(self):
    data = self.request[0].strip()
    socket = self.request[1]
    print('recv data: ' + data)
    if data.startswith('task'):
      vec = data.split(':')
      if len(vec) != 2:
        print('Error: len(vec) != 2')
      else:
        task = vec[1]
        self.server.TaskState = 'exec'
        ExecTask(task)
        self.server.TaskState = 'wait'
InitTimer()
NodeServer = SocketServer.UDPServer(('', 0), MyUDPHandler)
NodeServer.TaskState = 'wait' #(exec/wait)
NodeListenPort = NodeServer.server_address[1]
print('NodeListenPort:' + str(NodeListenPort))
NodeServer.serve_forever()

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python URL操作技巧总结》、《Python图片操作技巧总结》、《Python数据结构与算法教程》、《Python Socket编程技巧总结》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。

(0)

相关推荐

  • 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试

    MapReduce与HDFS简介 什么是Hadoop? Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS . MapReduce). Doug Cutting和Mike Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop. MapReduce的Data flow如下图,原始数据

  • 在Python程序中实现分布式进程的教程

    在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上. Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信.由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序. 举个例子:如果我们已经有一个通

  • python 判断自定义对象类型

    要判断自定义对象的类型,用__class__方法,或者用isinstance(object, class-or-type-or-tuple)-->bool 用__class__不能认出子类实例也是父类的实例,用isinstance可以认出子类实例也是父类的实例

  • Python自定义函数的创建、调用和函数的参数详解

    函数是组织好的,可重复使用的,用来实现单一,或相关联功能的代码段.函数能提高应用的模块性,和代码的重复利用率.你已经知道Python提供了许多内建函数,比如print().但你也可以自己创见函数,这被叫做用户自定义函数.一.定义一个函数你可以定义一个由自己想要功能的函数,以下是简单的规则: 1.函数代码块以def关键词开头,后接函数标识符名称和圆括号().2.任何传入参数和自变量必须放在圆括号中间.圆括号之间可以用于定义参数.3.函数的第一行语句可以选择性地使用文档字符串-用于存放函数说明.4.

  • 详解在Python程序中自定义异常的方法

    通过创建一个新的异常类,程序可以命名它们自己的异常.异常应该是典型的继承自Exception类,通过直接或间接的方式. 以下为与RuntimeError相关的实例,实例中创建了一个类,基类为RuntimeError,用于在异常触发时输出更多的信息. 在try语句块中,用户自定义的异常后执行except块语句,变量 e 是用于创建Networkerror类的实例. class Networkerror(RuntimeError): def __init__(self, arg): self.arg

  • Python中的自定义函数学习笔记

    定义一个什么都不做的函数 复制代码 代码如下: >>> def a(): ... pass ... >>> def printHello(): ... print("hello") ... >>> printHello() hello >>> callable(printHello) True 顾名思义,callable函数用于判断函数是否可以调用: 有书上说,callable在Python3.0中已经不再使用,而

  • Python过滤函数filter()使用自定义函数过滤序列实例

    filter函数: filter()函数可以对序列做过滤处理,就是说可以使用一个自定的函数过滤一个序列,把序列的每一项传到自定义的过滤函数里处理,并返回结果做过滤.最终一次性返回过滤后的结果. filter()函数有两个参数: 第一个,自定函数名,必须的 第二个,需要过滤的列,也是必须的 DEMO 需求,过滤大于5小于10的数: 复制代码 代码如下: # coding=utf8 # 定义大于5小于10的函数 def guolvhanshu(num):     if num>5 and num<

  • Python使用multiprocessing实现一个最简单的分布式作业调度系统

    mutilprocess像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多. 介绍 Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上.一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信. 想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统. 实现 Job 首先创建一个Job类,为了测试简单,只包含一

  • Python实现微信公众平台自定义菜单实例

    首先先获取access_token,并保存与全局之中 def token(requset): url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s' % ( Config.AppID, Config.AppSecret) result = urllib2.urlopen(url).read() Config.access_token = json.load

  • python自定义类并使用的方法

    本文实例讲述了python自定义类并使用的方法.分享给大家供大家参考.具体如下: class Person: def __init__(self, first, middle, last, age): self.first = first; self.middle = middle; self.last = last; self.age = age; def __str__(self): return self.first + ' ' + self.middle + ' ' + self.las

  • Python读取环境变量的方法和自定义类分享

    使用os.environ来读取和修改环境变量: 复制代码 代码如下: import os print (os.environ["TEMP"]) mydir = "c:\\mydir" os.environ["MYDIR"] = mydir print (os.environ["MYDIR"]) pathV = os.environ["PATH"] print (pathV) os.environ["

随机推荐