python实现的文件同步服务器实例

本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:

服务端使用asyncore, 收到文件后保存到本地。

客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。

重点:

1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。

2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。

上代码:

服务端:

# receive file from client and store them into file use asyncore.#
#/usr/bin/python
#coding: utf-8
import asyncore
import socket
from socket import errno
import logging
import time
import sys
import struct
import os
import fcntl
import threading
from rrd_graph import MakeGraph
try:
  import rrdtool
except (ImportError, ImportWarnning):
  print "Hope this information can help you:"
  print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu."
  sys.exit(1)
class RequestHandler(asyncore.dispatcher):
  def __init__(self, sock, map=None, chunk_size=1024):
    self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname())))
    self.chunk_size = chunk_size
    asyncore.dispatcher.__init__(self,sock,map)
    self.data_to_write = list()
  def readable(self):
    #self.logger.debug("readable() called.")
    return True
  def writable(self):
    response = (not self.connected) or len(self.data_to_write)
    #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write)))
    return response
  def handle_write(self):
    data = self.data_to_write.pop()
    #self.logger.debug("handle_write()->%s size: %s",data.rstrip('\r\n'),len(data))
    sent = self.send(data[:self.chunk_size])
    if sent < len(data):
      remaining = data[sent:]
      self.data_to_write.append(remaining)
  def handle_read(self):
    self.writen_size = 0
    nagios_perfdata = '../perfdata'
    head_packet_format = "!LL128s128sL"
    head_packet_size = struct.calcsize(head_packet_format)
    data = self.recv(head_packet_size)
    if not data:
      return
    filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data)
    filepath = os.path.join(nagios_perfdata, filepath[:filepath_len])
    filename = filename[:filename_len]
    self.logger.debug("update file: %s" % filepath + '/' + filename)
    try:
      if not os.path.exists(filepath):
        os.makedirs(filepath)
    except OSError:
      pass
    self.fd = open(os.path.join(filepath,filename), 'w')
    #self.fd = open(filename,'w')
    if filesize > self.chunk_size:
      times = filesize / self.chunk_size
      first_part_size = times * self.chunk_size
      second_part_size = filesize % self.chunk_size
      while 1:
        try:
          data = self.recv(self.chunk_size)
          #self.logger.debug("handle_read()->%s size.",len(data))
        except socket.error,e:
          if e.args[0] == errno.EWOULDBLOCK:
            print "EWOULDBLOCK"
            time.sleep(1)
          else:
            #self.logger.debug("Error happend while receive data: %s" % e)
            break
        else:
          self.fd.write(data)
          self.fd.flush()
          self.writen_size += len(data)
          if self.writen_size == first_part_size:
            break
      #receive the packet at last
      while 1:
        try:
          data = self.recv(second_part_size)
          #self.logger.debug("handle_read()->%s size.",len(data))
        except socket.error,e:
          if e.args[0] == errno.EWOULDBLOCK:
            print "EWOULDBLOCK"
            time.sleep(1)
          else:
            #self.logger.debug("Error happend while receive data: %s" % e)
            break
        else:
          self.fd.write(data)
          self.fd.flush()
          self.writen_size += len(data)
          if len(data) == second_part_size:
            break
    elif filesize <= self.chunk_size:
      while 1:
        try:
          data = self.recv(filesize)
          #self.logger.debug("handle_read()->%s size.",len(data))
        except socket.error,e:
          if e.args[0] == errno.EWOULDBLOCK:
            print "EWOULDBLOCK"
            time.sleep(1)
          else:
            #self.logger.debug("Error happend while receive data: %s" % e)
            break
        else:
          self.fd.write(data)
          self.fd.flush()
          self.writen_size += len(data)
          if len(data) == filesize:
            break
    self.logger.debug("File size: %s" % self.writen_size)
class SyncServer(asyncore.dispatcher):
  def __init__(self,host,port):
    asyncore.dispatcher.__init__(self)
    self.debug = True
    self.logger = logging.getLogger(self.__class__.__name__)
    self.create_socket(socket.AF_INET,socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind((host,port))
    self.listen(2000)
  def handle_accept(self):
    client_socket = self.accept()
    if client_socket is None:
      pass
    else:
      sock, addr = client_socket
      #self.logger.debug("Incoming connection from %s" % repr(addr))
      handler = RequestHandler(sock=sock)
class RunServer(threading.Thread):
  def __init__(self):
    super(RunServer,self).__init__()
    self.daemon = False
  def run(self):
    server = SyncServer('',9999)
    asyncore.loop(use_poll=True)
def StartServer():
  logging.basicConfig(level=logging.DEBUG,
            format='%(name)s: %(message)s',
            )
  RunServer().start()
  #MakeGraph().start()
if __name__ == '__main__':
  StartServer()

客户端:

# monitor path with inotify(python module), and send them to remote server.#
# use sendfile(2) instead of send function in socket, if we have python-sendfile installed.#
import socket
import time
import os
import sys
import struct
import threading
import Queue
try:
   import pyinotify
except (ImportError, ImportWarnning):
   print "Hope this information can help you:"
   print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu."
   sys.exit(1)
try:
   from sendfile import sendfile
except (ImportError,ImportWarnning):
   pass
filetype_filter = [".rrd",".xml"]
def check_filetype(pathname):
   for suffix_name in filetype_filter:
     if pathname[-4:] == suffix_name:
       return True
   try:
     end_string = pathname.rsplit('.')[-1:][0]
     end_int = int(end_string)
   except:
     pass
   else:
     # means pathname endwith digit
     return False
class sync_file(threading.Thread):
   def __init__(self, addr, events_queue):
     super(sync_file,self).__init__()
     self.daemon = False
     self.queue = events_queue
     self.addr = addr
     self.chunk_size = 1024
   def run(self):
     while 1:
       event = self.queue.get()
       if check_filetype(event.pathname):
         print time.asctime(),event.maskname, event.pathname
         filepath = event.path.split('/')[-1:][0]
         filename = event.name
         filesize = os.stat(os.path.join(event.path, filename)).st_size
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         filepath_len = len(filepath)
         filename_len = len(filename)
         sock.connect(self.addr)
         offset = 0
         data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize)
         fd = open(event.pathname,'rb')
         sock.sendall(data)
         if "sendfile" in sys.modules:
           # print "use sendfile(2)"
           while 1:
             sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size)
             if sent == 0:
               break
             offset += sent
         else:
           # print "use original send function"
           while 1:
             data = fd.read(self.chunk_size)
             if not data: break
             sock.send(data)
         sock.close()
         fd.close()
class EventHandler(pyinotify.ProcessEvent):
   def __init__(self, events_queue):
     super(EventHandler,self).__init__()
     self.events_queue = events_queue
   def my_init(self):
     pass
   def process_IN_CLOSE_WRITE(self,event):
     self.events_queue.put(event)
   def process_IN_MOVED_TO(self,event):
     self.events_queue.put(event)
def start_notify(path, mask, sync_server):
   events_queue = Queue.Queue()
   sync_thread_pool = list()
   for i in range(500):
     sync_thread_pool.append(sync_file(sync_server, events_queue))
   for i in sync_thread_pool:
     i.start()
   wm = pyinotify.WatchManager()
   notifier = pyinotify.Notifier(wm,EventHandler(events_queue))
   wdd = wm.add_watch(path,mask,rec=True)
   notifier.loop()
def do_notify():
   perfdata_path = '/var/lib/pnp4nagios/perfdata'
   mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO
   sync_server = ('127.0.0.1',9999)
   start_notify(perfdata_path,mask,sync_server)
if __name__ == '__main__':
   do_notify()

python监视线程池

#!/usr/bin/python
import threading
import time
class Monitor(threading.Thread):
  def __init__(self, *args,**kwargs):
    super(Monitor,self).__init__()
    self.daemon = False
    self.args = args
    self.kwargs = kwargs
    self.pool_list = []
  def run(self):
    print self.args
    print self.kwargs
    for name,value in self.kwargs.items():
      obj = value[0]
      temp = {}
      temp[name] = obj
      self.pool_list.append(temp)
    while 1:
      print self.pool_list
      for name,value in self.kwargs.items():
        obj = value[0]
        parameters = value[1:]
        died_threads = self.cal_died_thread(self.pool_list,name)
        print "died_threads", died_threads
        if died_threads >0:
          for i in range(died_threads):
            print "start %s thread..." % name
            t = obj[0].__class__(*parameters)
            t.start()
            self.add_to_pool_list(t,name)
        else:
          break
      time.sleep(0.5)
  def cal_died_thread(self,pool_list,name):
    i = 0
    for item in self.pool_list:
      for k,v in item.items():
        if name == k:
          lists = v
    for t in lists:
      if not t.isAlive():
        self.remove_from_pool_list(t)
        i +=1
    return i
  def add_to_pool_list(self,obj,name):
    for item in self.pool_list:
      for k,v in item.items():
        if name == k:
          v.append(obj)
  def remove_from_pool_list(self, obj):
    for item in self.pool_list:
      for k,v in item.items():
        try:
          v.remove(obj)
        except:
          pass
        else:
          return

使用方法:

rrds_queue = Queue.Queue()
  make_rrds_pool = []
  for i in range(5):
    make_rrds_pool.append(MakeRrds(rrds_queue))
  for i in make_rrds_pool:
    i.start()
  make_graph_pool = []
  for i in range(5):
    make_graph_pool.append(MakeGraph(rrds_queue))
  for i in make_graph_pool:
    i.start()
  monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue), \
           make_graph_pool=(make_graph_pool, rrds_queue))
  monitor.start()

解析:

1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3. 如果没有线程死去,则什么也不做。

从外部调用Django模块

import os
import sys
sys.path.insert(0,'/data/cloud_manage')
from django.core.management import setup_environ
import settings
setup_environ(settings)
from common.monitor import Monitor
from django.db import connection, transaction

前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。

希望本文所述对大家的Python程序设计有所帮助。

(0)

相关推荐

  • python连接远程ftp服务器并列出目录下文件的方法

    本文实例讲述了python连接远程ftp服务器并列出目录下文件的方法.分享给大家供大家参考.具体如下: 这段python代码用到了pysftp模块,使用sftp协议,对数据进行加密传输 import pysftp srv = pysftp.Connection(host="your_FTP_server", username="your_username",password="your_password") # Get the directory

  • Python压缩解压缩zip文件及破解zip文件密码的方法

    python 的 zipfile 提供了非常便捷的方法来压缩和解压 zip 文件. 例如,在py脚本所在目录中,有如下文件: readability/readability.js readability/readability.txt readability/readability-print.css readability/sprite-readability.png readability/readability.css 将 readability 目录中的文件压缩到脚本所在目录的 read

  • Python实现向服务器请求压缩数据及解压缩数据的方法示例

    本文实例讲述了Python实现向服务器请求压缩数据及解压缩数据的方法.分享给大家供大家参考,具体如下: 向服务器请求压缩数据格式,并解压缩数据 #!/usr/bin/env python # encoding=utf-8 import urllib2, httplib def writeFile(fname, data): f = open(fname, "w") f.write(data) f.close() if __name__ == '__main__': httplib.HT

  • Python实现的简单文件传输服务器和客户端

    还是那个题目(题目和流程见java版本),感觉光用java写一点新意也没有,恰巧刚学习了python,何不拿来一用,呵呵: 服务器端: import SocketServer, time class MyServer(SocketServer.BaseRequestHandler): userInfo = { 'yangsq' : 'yangsq', 'hudeyong' : 'hudeyong', 'mudan' : 'mudan' } def handle(self): print 'Con

  • python实现获取客户机上指定文件并传输到服务器的方法

    本文实例讲述了python实现获取客户机上指定文件并传输到服务器的方法.分享给大家供大家参考.具体分析如下: 该程序实现了,把目标机器的某个目录(可控)的所有的某种类型文件(可控)全部获取并传到己方的机器上. 1.用了base64的encode(infile,outfile)加密,以及decode(infile,outfile)解密,这是2进制加密解密 2.用zip压缩 3.socket中server.py放到自己这方python server.py,然后client.py放到目标机器,然后py

  • Python压缩和解压缩zip文件

    zip文件是我们经常使用的打包格式之一,python解压和压缩zip效率非凡. python解压zip文档: 复制代码 代码如下: #/usr/bin/python #coding=utf-8 import os,sys,time import zipfile filename = 'callofdutyblackopszombies_1349649132343_my.zip'  #要解压的文件 filedir = 'data/'  #解压后放入的目录 r = zipfile.is_zipfil

  • python服务器端收发请求的实现代码

    最近学习了python的一些服务器端编程,记录在此. 发送get/post请求 # coding:utf-8 import httplib,urllib #加载模块 #urllib可以打开网站去拿 #res = urllib.urlopen('http://baidu.com'); #print res.headers #定义需要进行发送的数据 params = urllib.urlencode({'param':'6'}); #定义一些文件头 headers = {"Content-Type&

  • python 从远程服务器下载日志文件的程序

    复制代码 代码如下: import osimport sysimport ftplibimport socket ################################################################### sign in the ftp server and download the log file. # 登陆生产服务器下载日志##############################################################

  • 使用nodejs、Python写的一个简易HTTP静态文件服务器

    日常开发过程中,我们经常需要修改一些放在 CDN 上的静态文件(如 JavaScript.CSS.HTML 文件等),这个过程中,我们希望能有一种方式将线上 CDN 的目录映射为本地硬盘上的某个目录,这样,当我们在本地修改了某个文件时,不需要发布,刷新后马上能看到效果. 比如,我们的 CDN 域名是:http://a.mycdn.com,本地对应的目录是:D:\workassets,我们希望所有对 http://a.mycdn.com/* 的访问被映射到本地的 D:\workassets\* 下

  • python实现从ftp服务器下载文件的方法

    本文实例讲述了python实现从ftp服务器下载文件的方法.分享给大家供大家参考.具体实现方法如下: import ftplib ftp = ftblib.FTP("ftp.yourServer.com") ftp.login("username","password") filename = "index.html" ftp.storlines("STOR "+filename,open(filename

随机推荐