400多行Python代码实现了一个FTP服务器

Python版本
实现了比之前的xxftp更多更完善的功能
1、继续支持多用户
2、继续支持虚拟目录
3、增加支持用户根目录以及映射虚拟目录的权限设置
4、增加支持限制用户根目录或者虚拟目录的空间大小

xxftp的特点
1、开源、跨平台
2、简单、易用
3、不需要数据库
4、可扩展性超强
5、你可以免费使用xxftp假设自己的私人FTP服务器

测试地址
ftp://xiaoxia.org
匿名帐号可以使用!
匿名根目录只读,映射了一个虚拟目录,可以上传文件但不允许更改!

使用方法
跟之前用C语言写的xxftp使用方法一样:

1. Create a root directory to hold the user directories.
Configure it in config.xml.
2. Create user directories under the root directory.
If you want to specify a password, create a directory named ".xxftp",
under which create a text file named "password" containing the MD5
code of the password.
3. If you want to specify the welcome and goodbye message, write it in
xxftp.welcome and xxftp.goodbye under the root directory.
4. Configure config.xml.

The structure of your FTP server root may be like:

-/root
-xxftp.welcome
-xxftp.goodbye

-user1
-.xxftp
-password
-...
-user2
-.xxftp
-password
-...
-anonymous源代码

代码如下:

import socket, threading, os, sys, time
import hashlib, platform, stat

listen_ip = "localhost"
listen_port = 21
conn_list = []
root_dir = "./home"
max_connections = 500
conn_timeout = 120

class FtpConnection(threading.Thread):
def __init__(self, fd):
threading.Thread.__init__(self)
self.fd = fd
self.running = True
self.setDaemon(True)
self.alive_time = time.time()
self.option_utf8 = False
self.identified = False
self.option_pasv = True
self.username = ""
def process(self, cmd, arg):
cmd = cmd.upper();
if self.option_utf8:
arg = unicode(arg, "utf8").encode(sys.getfilesystemencoding())
print "<<", cmd, arg, self.fd
# Ftp Command
if cmd == "BYE" or cmd == "QUIT":
if os.path.exists(root_dir + "/xxftp.goodbye"):
self.message(221, open(root_dir + "/xxftp.goodbye").read())
else:
self.message(221, "Bye!")
self.running = False
return
elif cmd == "USER":
# Set Anonymous User
if arg == "": arg = "anonymous"
for c in arg:
if not c.isalpha() and not c.isdigit() and c!="_":
self.message(530, "Incorrect username.")
return
self.username = arg
self.home_dir = root_dir + "/" + self.username
self.curr_dir = "/"
self.curr_dir, self.full_path, permission, self.vdir_list, \
limit_size, is_virtual = self.parse_path("/")
if not os.path.isdir(self.home_dir):
self.message(530, "User " + self.username + " not exists.")
return
self.pass_path = self.home_dir + "/.xxftp/password"
if os.path.isfile(self.pass_path):
self.message(331, "Password required for " + self.username)
else:
self.message(230, "Identified!")
self.identified = True
return
elif cmd == "PASS":
if open(self.pass_path).read() == hashlib.md5(arg).hexdigest():
self.message(230, "Identified!")
self.identified = True
else:
self.message(530, "Not identified!")
self.identified = False
return
elif not self.identified:
self.message(530, "Please login with USER and PASS.")
return

self.alive_time = time.time()
finish = True
if cmd == "NOOP":
self.message(200, "ok")
elif cmd == "TYPE":
self.message(200, "ok")
elif cmd == "SYST":
self.message(200, "UNIX")
elif cmd == "EPSV" or cmd == "PASV":
self.option_pasv = True
try:
self.data_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.data_fd.bind((listen_ip, 0))
self.data_fd.listen(1)
ip, port = self.data_fd.getsockname()
if cmd == "EPSV":
self.message(229, "Entering Extended Passive Mode (|||" + str(port) + "|)")
else:
ipnum = socket.inet_aton(ip)
self.message(227, "Entering Passive Mode (%s,%u,%u)." %
(",".join(ip.split(".")), (port>>8&0xff), (port&0xff)))
except:
self.message(500, "failed to create data socket.")
elif cmd == "EPRT":
self.message(500, "implement EPRT later...")
elif cmd == "PORT":
self.option_pasv = False
self.data_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = arg.split(",")
self.data_ip = ".".join(s[:4])
self.data_port = int(s[4])*256 + int(s[5])
self.message(200, "ok")
elif cmd == "PWD" or cmd == "XPWD":
if self.curr_dir == "": self.curr_dir = "/"
self.message(257, '"' + self.curr_dir + '"')
elif cmd == "LIST" or cmd == "NLST":
if arg != "" and arg[0] == "-": arg = "" # omit parameters
remote, local, perm, vdir_list, limit_size, is_virtual = self.parse_path(arg)
if not os.path.exists(local):
self.message(550, "failed.")
return
if not self.establish(): return
self.message(150, "ok")
for v in vdir_list:
f = v[0]
if self.option_utf8:
f = unicode(f, sys.getfilesystemencoding()).encode("utf8")
if cmd == "NLST":
info = f + "\r\n"
else:
info = "d%s%s------- %04u %8s %8s %8lu %s %s\r\n" % (
"r" if "read" in perm else "-",
"w" if "write" in perm else "-",
1, "0", "0", 0,
time.strftime("%b %d %Y", time.localtime(time.time())),
f)
self.data_fd.send(info)
for f in os.listdir(local):
if f[0] == ".": continue
path = local + "/" + f
if self.option_utf8:
f = unicode(f, sys.getfilesystemencoding()).encode("utf8")
if cmd == "NLST":
info = f + "\r\n"
else:
st = os.stat(path)
info = "%s%s%s------- %04u %8s %8s %8lu %s %s\r\n" % (
"-" if os.path.isfile(path) else "d",
"r" if "read" in perm else "-",
"w" if "write" in perm else "-",
1, "0", "0", st[stat.ST_SIZE],
time.strftime("%b %d %Y", time.localtime(st[stat.ST_MTIME])),
f)
self.data_fd.send(info)
self.message(226, "Limit size: " + str(limit_size))
self.data_fd.close()
self.data_fd = 0
elif cmd == "REST":
self.file_pos = int(arg)
self.message(250, "ok")
elif cmd == "FEAT":
features = "211-Features:\r\nSITES\r\nEPRT\r\nEPSV\r\nMDTM\r\nPASV\r\n"\
"REST STREAM\r\nSIZE\r\nUTF8\r\n211 End\r\n"
self.fd.send(features)
elif cmd == "OPTS":
arg = arg.upper()
if arg == "UTF8 ON":
self.option_utf8 = True
self.message(200, "ok")
elif arg == "UTF8 OFF":
self.option_utf8 = False
self.message(200, "ok")
else:
self.message(500, "unrecognized option")
elif cmd == "CDUP":
finish = False
arg = ".."
else:
finish = False
if finish: return
# Parse argument ( It's a path )
if arg == "":
self.message(500, "where's my argument?")
return
remote, local, permission, vdir_list, limit_size, is_virtual = \
self.parse_path(arg)
# can not do anything to virtual directory
if is_virtual: permission = "none"
can_read, can_write, can_modify = "read" in permission, "write" in permission, "modify" in permission
newpath = local
try:
if cmd == "CWD":
if(os.path.isdir(newpath)):
self.curr_dir = remote
self.full_path = newpath
self.message(250, '"' + remote + '"')
else:
self.message(550, "failed")
elif cmd == "MDTM":
if os.path.exists(newpath):
self.message(213, time.strftime("%Y%m%d%I%M%S", time.localtime(
os.path.getmtime(newpath))))
else:
self.message(550, "failed")
elif cmd == "SIZE":
self.message(231, os.path.getsize(newpath))
elif cmd == "XMKD" or cmd == "MKD":
if not can_modify:
self.message(550, "permission denied.")
return
os.mkdir(newpath)
self.message(250, "ok")
elif cmd == "RNFR":
if not can_modify:
self.message(550, "permission denied.")
return
self.temp_path = newpath
self.message(350, "rename from " + remote)
elif cmd == "RNTO":
os.rename(self.temp_path, newpath)
self.message(250, "RNTO to " + remote)
elif cmd == "XRMD" or cmd == "RMD":
if not can_modify:
self.message(550, "permission denied.")
return
os.rmdir(newpath)
self.message(250, "ok")
elif cmd == "DELE":
if not can_modify:
self.message(550, "permission denied.")
return
os.remove(newpath)
self.message(250, "ok")
elif cmd == "RETR":
if not os.path.isfile(newpath):
self.message(550, "failed")
return
if not can_read:
self.message(550, "permission denied.")
return
if not self.establish(): return
self.message(150, "ok")
f = open(newpath, "rb")
while self.running:
self.alive_time = time.time()
data = f.read(8192)
if len(data) == 0: break
self.data_fd.send(data)
f.close()
self.data_fd.close()
self.data_fd = 0
self.message(226, "ok")
elif cmd == "STOR" or cmd == "APPE":
if not can_write:
self.message(550, "permission denied.")
return
if os.path.exists(newpath) and not can_modify:
self.message(550, "permission denied.")
return
# Check space size remained!
used_size = 0
if limit_size > 0:
used_size = self.get_dir_size(os.path.dirname(newpath))
if not self.establish(): return
self.message(150, "ok")
f = open(newpath, ("ab" if cmd == "APPE" else "wb") )
while self.running:
self.alive_time = time.time()
data = self.data_fd.recv(8192)
if len(data) == 0: break
if limit_size > 0:
used_size = used_size + len(data)
if used_size > limit_size: break
f.write(data)
f.close()
self.data_fd.close()
self.data_fd = 0
if limit_size > 0 and used_size > limit_size:
self.message(550, "Exceeding user space limit: " + str(limit_size) + " bytes")
else:
self.message(226, "ok")
else:
self.message(500, cmd + " not implemented")
except:
self.message(550, "failed.")

def establish(self):
if self.data_fd == 0:
self.message(500, "no data connection")
return False
if self.option_pasv:
fd = self.data_fd.accept()[0]
self.data_fd.close()
self.data_fd = fd
else:
try:
self.data_fd.connect((self.data_ip, self.data_port))
except:
self.message(500, "failed to establish data connection")
return False
return True

def read_virtual(self, path):
vdir_list = []
path = path + "/.xxftp/virtual"
if os.path.isfile(path):
for v in open(path, "r").readlines():
items = v.split()
items[1] = items[1].replace("$root", root_dir)
vdir_list.append(items)
return vdir_list

def get_dir_size(self, folder):
size = 0
for path, dirs, files in os.walk(folder):
for f in files:
size += os.path.getsize(os.path.join(path, f))
return size

def read_size(self, path):
size = 0
path = path + "/.xxftp/size"
if os.path.isfile(path):
size = int(open(path, "r").readline())
return size

def read_permission(self, path):
permission = "read,write,modify"
path = path + "/.xxftp/permission"
if os.path.isfile(path):
permission = open(path, "r").readline()
return permission

def parse_path(self, path):
if path == "": path = "."
if path[0] != "/":
path = self.curr_dir + "/" + path
s = os.path.normpath(path).replace("\\", "/").split("/")
local = self.home_dir
# reset directory permission
vdir_list = self.read_virtual(local)
limit_size = self.read_size(local)
permission = self.read_permission(local)
remote = ""
is_virtual = False
for name in s:
name = name.lstrip(".")
if name == "": continue
remote = remote + "/" + name
is_virtual = False
for v in vdir_list:
if v[0] == name:
permission = v[2]
local = v[1]
limit_size = self.read_size(local)
is_virtual = True
if not is_virtual: local = local + "/" + name
vdir_list = self.read_virtual(local)
return (remote, local, permission, vdir_list, limit_size, is_virtual)

def run(self):
''' Connection Process '''
try:
if len(conn_list) > max_connections:
self.message(500, "too many connections!")
self.fd.close()
self.running = False
return
# Welcome Message
if os.path.exists(root_dir + "/xxftp.welcome"):
self.message(220, open(root_dir + "/xxftp.welcome").read())
else:
self.message(220, "xxftp(Python) www.xiaoxia.org")
# Command Loop
line = ""
while self.running:
data = self.fd.recv(4096)
if len(data) == 0: break
line += data
if line[-2:] != "\r\n": continue
line = line[:-2]
space = line.find(" ")
if space == -1:
self.process(line, "")
else:
self.process(line[:space], line[space+1:])
line = ""
except:
print "error", sys.exc_info()
self.running = False
self.fd.close()
print "connection end", self.fd, "user", self.username

def message(self, code, s):
''' Send Ftp Message '''
s = str(s).replace("\r", "")
ss = s.split("\n")
if len(ss) > 1:
r = (str(code) + "-") + ("\r\n" + str(code) + "-").join(ss[:-1])
r += "\r\n" + str(code) + " " + ss[-1] + "\r\n"
else:
r = str(code) + " " + ss[0] + "\r\n"
if self.option_utf8:
r = unicode(r, sys.getfilesystemencoding()).encode("utf8")
self.fd.send(r)

def server_listen():
global conn_list
listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_fd.bind((listen_ip, listen_port))
listen_fd.listen(1024)
conn_lock = threading.Lock()
print "ftpd is listening on ", listen_ip + ":" + str(listen_port)

while True:
conn_fd, remote_addr = listen_fd.accept()
print "connection from ", remote_addr, "conn_list", len(conn_list)
conn = FtpConnection(conn_fd)
conn.start()

conn_lock.acquire()
conn_list.append(conn)
# check timeout
try:
curr_time = time.time()
for conn in conn_list:
if int(curr_time - conn.alive_time) > conn_timeout:
if conn.running == True:
conn.fd.shutdown(socket.SHUT_RDWR)
conn.running = False
conn_list = [conn for conn in conn_list if conn.running]
except:
print sys.exc_info()
conn_lock.release()

def main():
server_listen()

if __name__ == "__main__":
main()

(0)

相关推荐

  • python实现从ftp服务器下载文件的方法

    本文实例讲述了python实现从ftp服务器下载文件的方法.分享给大家供大家参考.具体实现方法如下: import ftplib ftp = ftblib.FTP("ftp.yourServer.com") ftp.login("username","password") filename = "index.html" ftp.storlines("STOR "+filename,open(filename

  • python连接远程ftp服务器并列出目录下文件的方法

    本文实例讲述了python连接远程ftp服务器并列出目录下文件的方法.分享给大家供大家参考.具体如下: 这段python代码用到了pysftp模块,使用sftp协议,对数据进行加密传输 import pysftp srv = pysftp.Connection(host="your_FTP_server", username="your_username",password="your_password") # Get the directory

  • python实现FTP服务器服务的方法

    FTP服务的主动模式和被动模式 在开始之前,先聊一下FTP的主动模式和被动模式,两者的区别 , 用两张图来表示可能会更加清晰一些: 主动模式: 主动模式工作过程: 1. 客户端以随机非特权端口N,就是大于1024的端口,对server端21端口发起连接 2. 客户端开始监听 N+1端口: 3. 服务端会主动以20端口连接到客户端的N+1端口. 主动模式的优点: 服务端配置简单,利于服务器安全管理,服务器只需要开放21端口 主动模式的缺点: 如果客户端开启了防火墙,或客户端处于内网(NAT网关之后

  • python定时采集摄像头图像上传ftp服务器功能实现

    首先是截图,从摄像头截取一幅图像: 复制代码 代码如下: while 1:   #测试摄像头的存在    try:        cam = Device()    except:        print "no webcam found!"        continue    break 然后是把图像上传到ftp服务器: 复制代码 代码如下: remote = ftplib.FTP('127.0.0.1') #登陆服务器remote.login()file = open('%s.

  • python通过ftplib登录到ftp服务器的方法

    本文实例讲述了python通过ftplib登录到ftp服务器的方法.分享给大家供大家参考.具体实现方法如下: import ftplib connect = ftplib.FTP("www.mysite.com") connect.login("domain\user", "password") data = [] connect.dir(data.append) connect.quit() for line in data: print(li

  • Python搭建HTTP服务器和FTP服务器

    本教程为大家分享了Python搭建HTTP服务器和FTP服务器的具体步骤,供大家参考,具体内容如下 默认安装版本为pytho2.7 http服务器搭建教程: 进入要开放访问的目录下,执行命令:python -m SimpleHTTPServer 9000 显示上述表示安装成功,且http服务的端口为:9000,不跟端口默认80端口 通过浏览器访问该http服务: http://localhost:9000 页面显示入下: 服务器端显示如下: 表示接收请求成功. 下面是ftp服务器搭建教程: py

  • 400多行Python代码实现了一个FTP服务器

    Python版本 实现了比之前的xxftp更多更完善的功能 1.继续支持多用户 2.继续支持虚拟目录 3.增加支持用户根目录以及映射虚拟目录的权限设置 4.增加支持限制用户根目录或者虚拟目录的空间大小 xxftp的特点 1.开源.跨平台 2.简单.易用 3.不需要数据库 4.可扩展性超强 5.你可以免费使用xxftp假设自己的私人FTP服务器 测试地址 ftp://xiaoxia.org 匿名帐号可以使用! 匿名根目录只读,映射了一个虚拟目录,可以上传文件但不允许更改! 使用方法 跟之前用C语言

  • 使用50行Python代码从零开始实现一个AI平衡小游戏

    集智导读: 本文会为大家展示机器学习专家 Mike Shi 如何用 50 行 Python 代码创建一个 AI,使用增强学习技术,玩耍一个保持杆子平衡的小游戏.所用环境为标准的 OpenAI Gym,只使用 Numpy 来创建 agent. 各位看官好,我(作者 Mike Shi--译者注)将在本文教大家如何用 50 行 Python 代码,教会 AI 玩一个简单的平衡游戏.我们会用到标准的 OpenAI Gym 作为测试环境,仅用 Numpy 创建我们的 AI,别的不用. 这个小游戏就是经典的

  • Python代码使用 Pyftpdlib实现FTP服务器功能

    当你想快速共享一个目录的时候,这是特别有用的,只需要1行代码即可实现. FTP 服务器,在此之前我都是使用Linux的vsftpd软件包来搭建FTP服务器的,现在发现了利用pyftpdlib可以更加简单的方法即可实现FTP服务器的功能. 环境要求 Python 2.7 Windows / Linux 环境搭建 pip install pyftpdlib 一行代码实现FTP服务器 通过Python的-m选项作为一个简单的独立服务器来运行,当你想快速共享一个目录的时候,这是特别有用的. 在需要共享的

  • 1 行 Python 代码快速实现 FTP 服务器

    摘要: 当你想快速共享一个目录的时候,这是特别有用的,只需要1行代码即可实现. 当你想快速共享一个目录的时候,这是特别有用的,只需要1行代码即可实现. FTP 服务器,在此之前我都是使用Linux的vsftpd软件包来搭建FTP服务器的,现在发现了利用pyftpdlib可以更加简单的方法即可实现FTP服务器的功能. 环境要求 Python 2.7 Windows / Linux 环境搭建 一行代码实现FTP服务器 通过Python的-m选项作为一个简单的独立服务器来运行,当你想快速共享一个目录的

  • 21行Python代码实现拼写检查器

    引入 大家在使用谷歌或者百度搜索时,输入搜索内容时,谷歌总是能提供非常好的拼写检查,比如你输入 speling,谷歌会马上返回 spelling. 下面是用21行python代码实现的一个简易但是具备完整功能的拼写检查器. 代码 import re, collections def words(text): return re.findall('[a-z]+', text.lower()) def train(features): model = collections.defaultdict(

  • 小 200 行 Python 代码制作一个换脸程序

    简介 在这篇文章中我将介绍如何写一个简短(200行)的 Python 脚本,来自动地将一幅图片的脸替换为另一幅图片的脸. 这个过程分四步: 检测脸部标记. 旋转.缩放.平移和第二张图片,以配合第一步. 调整第二张图片的色彩平衡,以适配第一张图片. 把第二张图像的特性混合在第一张图像中. 1.使用 dlib 提取面部标记 该脚本使用 dlib 的 Python 绑定来提取面部标记: Dlib 实现了 Vahid Kazemi 和 Josephine Sullivan 的<使用回归树一毫秒脸部对准>

  • 50行Python代码实现人脸检测功能

    现在的人脸识别技术已经得到了非常广泛的应用,支付领域.身份验证.美颜相机里都有它的应用.用iPhone的同学们应该对下面的功能比较熟悉 iPhone的照片中有一个"人物"的功能,能够将照片里的人脸识别出来并分类,背后的原理也是人脸识别技术. 这篇文章主要介绍怎样用Python实现人脸检测.人脸检测是人脸识别的基础.人脸检测的目的是识别出照片里的人脸并定位面部特征点,人脸识别是在人脸检测的基础上进一步告诉你这个人是谁. 好了,介绍就到这里.接下来,开始准备我们的环境. 准备工作 本文的人

  • 15行Python代码带你轻松理解令牌桶算法

    在网络中传输数据时,为了防止网络拥塞,需限制流出网络的流量,使流量以比较均匀的速度向外发送,令牌桶算法就实现了这个功能, 可控制发送到网络上数据的数目,并允许突发数据的发送. 什么是令牌 从名字上看令牌桶,大概就是一个装有令牌的桶吧,那么什么是令牌呢? 紫薇格格拿的令箭,可以发号施令,令行禁止.在计算机的世界中,令牌也有令行禁止的意思,有令牌,则相当于得到了进行操作的授权,没有令牌,就什么都不能做. 用令牌实现限速器 我们用1块令牌来代表发送1字节数据的资格,假设我们源源不断的发放令牌给程序,程

  • 10 行 Python 代码教你自动发送短信(不想回复工作邮件妙招)

    最近工作上有个需求,当爬虫程序遇到异常的时候,需要通知相应的人员进行修复.如果是国外可能是通过邮件的方式来通知,但国内除了万年不变的 qq 邮箱,大部分人都不会去再申请其他的账号,qq 邮箱也是闲的蛋疼的时候才会瞄一眼.你还记得上次看邮箱的内容是什么时候吗? 所以在国内最好的通知方式是通过手机短信,今天就教大家利用 python 10 行代码实现短信发送. Twilio 短信代理服务已经有非常多成熟的方案,比如国内的阿里云.这次我介绍的是国外的一个代理商「Twilio」,使用邮箱注册即送 15

  • 使用11行Python代码盗取了室友的U盘内容

    序言 那个猥琐的家伙整天把个U盘藏着当宝,到睡觉了就拿出来插到电脑上. 我决定想个办法看他U盘里都藏了什么,直接去抢U盘是不可能的,骗也是不可能的.那不是丢我Python程序员的脸? 我必须在电脑上智取,而且不能被他发现. 这个是我的思路: 当一个usb插入时,在后台自动把usb里的东西拷贝到本地或上传到某个服务器. 那么我就可以先借他电脑玩一会,然后把我写好的Python程序在电脑后台运行.每当有usb插入的时候,就自动拷贝文件. 如何判断U盘是否插入? 首先打开电脑终端,进入/Volumes

随机推荐