python常用小脚本实例总结

目录
  • 前言
  • 打印16进制字符串
  • 文件合并
  • 多线程下载图集
  • 多线程下载图片
  • 爬虫抓取信息
  • 爬虫多线程下载电影名称
  • 串口转tcp工具
  • 远程读卡器server端
  • 黑客rtcp反向链接
  • 调用c的动态库示例
  • tcp的socket连接报文测试工具
  • 报文拼接与加解密测试
  • 二进制文件解析工具
  • 抓取动漫图片
  • 抓取网站模板
  • 总结

前言

日常生活中常会遇到一些小任务,如果人工处理会很麻烦。

用python做些小脚本处理,能够提高不少效率。或者可以把python当工具使用,辅助提高一下办公效率。(比如我常拿python当计算器,计算和字符转换用)

以下总结下个人用到的一些python小脚本留作备忘。

打印16进制字符串

用途:通信报文中的hex数据不好看,可以打印为16进制的字符串显示出来。

#coding=utf-8
#name: myutil.py
def print_hex1(s,prev='0x'):
    for c in s:
        print '%s%02x' %(prev,ord(c)),
    print
def print_hex(s):
    for c in s:
        print '%02x' %(ord(c)),
    print
print 'myutil'

def print_hex3(s,prev='0x'):
    i = 0
    for c in s:
        print '%s%s,' %(prev,s[i:i+2]),
        i += 2
    print

文件合并

之前搞单片机时生成的hex应用程序文件不能直接刷到单片机里,还需要把iap程序合并成一个文件才能烧写到单片机。每次打包麻烦,做个脚本处理:

#path='C:\\Users\\test\\IAP_CZ_v204w.hex'
#file=open(path,'r')
#for ll in file.readlines()
#    print ll
#coding=gb18030
import time
import os
def prr():
    print 'file combination begin..'

path0=os.getcwd()
print path0
path=path0
#path1=path0
path2=path0
path+='\\IAP_CZ_v204w.hex'
#path1+='\\NC_armStaSystem.hex'
path2+='\\'
print path
s=raw_input('enter file path:')
path1=s
#path1+='\\NC_armStaSystem.hex'
print path1
s=raw_input('enter file name:')
path2+=s
path2+=time.strftime('_%y%m%d%H%M%S')
path2+='.hex'
print path2
prr()
try:
    f1=open(path,'r')
    count=0
    for l in f1.readlines():
    #    print l
        count+=1
    #print count
    f1.close()
    f1=open(path,'r')
    f2=open(path1,'r')
    f3=open(path2,'w')
    while(count>1):
        l=f1.readline()
    #   print l
        f3.write(l)
        count-=1
    #   print count
    f3.flush()
    for l in f2.readlines():
        f3.write(l)
    f3.flush()
    f3.close()
    print 'combination success!'
except Exception,ex:
    print 'excettion occured!'
    print ex
    s=raw_input('press any key to continue...')
finally:
    f1.close()
    f2.close()
    s=raw_input('press any key to continue...')

多线程下载图集

网上好看的动漫图集,如果手工下载太费时了。简单分析下网页地址规律,写个多线程脚本搞定。

#!/usr/bin/python
# -*- coding: utf-8 -*-
# filename: paxel.py

'''It is a multi-thread downloading tool

    It was developed follow axel.
        Author: volans
        E-mail: volansw [at] gmail.com
'''

import sys
import os
import time
import urllib
from threading import Thread

local_proxies = {'http': 'http://131.139.58.200:8080'}

class AxelPython(Thread, urllib.FancyURLopener):
    '''Multi-thread downloading class.

        run() is a vitural method of Thread.
    '''
    def __init__(self, threadname, url, filename, ranges=0, proxies={}):
        Thread.__init__(self, name=threadname)
        urllib.FancyURLopener.__init__(self, proxies)
        self.name = threadname
        self.url = url
        self.filename = filename
        self.ranges = ranges
        self.downloaded = 0

    def run(self):
        '''vertual function in Thread'''
        try:
            self.downloaded = os.path.getsize( self.filename )
        except OSError:
            #print 'never downloaded'
            self.downloaded = 0

        # rebuild start poind
        self.startpoint = self.ranges[0] + self.downloaded

        # This part is completed
        if self.startpoint >= self.ranges[1]:
            print 'Part %s has been downloaded over.' % self.filename
            return

        self.oneTimeSize = 16384 #16kByte/time
        print 'task %s will download from %d to %d' % (self.name, self.startpoint, self.ranges[1])

        self.addheader("Range", "bytes=%d-%d" % (self.startpoint, self.ranges[1]))

        self.urlhandle = self.open( self.url )

        data = self.urlhandle.read( self.oneTimeSize )
        while data:
            filehandle = open( self.filename, 'ab+' )
            filehandle.write( data )
            filehandle.close()

            self.downloaded += len( data )
            #print "%s" % (self.name)
            #progress = u'\r...'

            data = self.urlhandle.read( self.oneTimeSize )

def GetUrlFileSize(url, proxies={}):
    urlHandler = urllib.urlopen( url, proxies=proxies )
    headers = urlHandler.info().headers
    length = 0
    for header in headers:
        if header.find('Length') != -1:
            length = header.split(':')[-1].strip()
            length = int(length)
    return length

def SpliteBlocks(totalsize, blocknumber):
    blocksize = totalsize/blocknumber
    ranges = []
    for i in range(0, blocknumber-1):
        ranges.append((i*blocksize, i*blocksize +blocksize - 1))
    ranges.append(( blocksize*(blocknumber-1), totalsize -1 ))

    return ranges
def islive(tasks):
    for task in tasks:
        if task.isAlive():
            return True
    return False

def paxel(url, output, blocks=6, proxies=local_proxies):
    ''' paxel
    '''
    size = GetUrlFileSize( url, proxies )
    ranges = SpliteBlocks( size, blocks )

    threadname = [ "thread_%d" % i for i in range(0, blocks) ]
    filename = [ "tmpfile_%d" % i for i in range(0, blocks) ]

    tasks = []
    for i in range(0,blocks):
        task = AxelPython( threadname[i], url, filename[i], ranges[i] )
        task.setDaemon( True )
        task.start()
        tasks.append( task )

    time.sleep( 2 )
    while islive(tasks):
        downloaded = sum( [task.downloaded for task in tasks] )
        process = downloaded/float(size)*100
        show = u'\rFilesize:%d Downloaded:%d Completed:%.2f%%' % (size, downloaded, process)
        sys.stdout.write(show)
        sys.stdout.flush()
        time.sleep( 0.5 )

    filehandle = open( output, 'wb+' )
    for i in filename:
        f = open( i, 'rb' )
        filehandle.write( f.read() )
        f.close()
        try:
            os.remove(i)
            pass
        except:
            pass

    filehandle.close()

if __name__ == '__main__':
    url = "http://xz1.mm667.com/xz84/images/001.jpg"
    output = '001.jpg'
    paxel( url, output, blocks=4, proxies={} )

多线程下载图片

多线程下载图片并存储到指定目录中,若目录不存在则自动创建。

# -*- coding: UTF-8 -*-
'''
import re
import urllib
urls='http://xz5.mm667.com/xz82/images/01.jpg'
def getHtml(url):
    page = urllib.urlopen(url)
    html = page.read()
    return html
def getImg(html):
    reg = r'src="(.+?\.jpg)" pic_ext'
    imgre = re.compile(reg)
    imglist = imgre.findall(html)
    x = 0
    for imgurl in imglist:
        urllib.urlretrieve(imgurl,'%s.jpg' % x)
        x = x + 1
html = getHtml("http://tieba.baidu.com/p/2460150866")
getImg(html)
'''
import re
import urllib
import threading
import time
import socket
socket.setdefaulttimeout(30)
urls=[]
j=0
for i in xrange(1,81):
    if (i-1)%4 == 0:
        j += 1
    if ((j-1)%5) == 0 :
        j=1
    site='http://xz%d.mm667.com/xz%02d/images/' %(j,i)
    urls.append(site)
    print urls[i-1]
#print urls
'''
urls.append('http://xz1.mm667.com/xz01/images/')
urls.append('http://xz1.mm667.com/xz02/images/')
urls.append('http://xz1.mm667.com/xz03/images/')
urls.append('http://xz1.mm667.com/xz04/images/')
urls.append('http://xz1.mm667.com/xz84/images/')
urls.append('http://xz2.mm667.com/xz85/images/')
urls.append('http://xz3.mm667.com/xz86/images/')
urls.append('http://xz1.mm667.com/s/')
urls.append('http://xz1.mm667.com/p/')
'''
def mkdir(path):
    # 引入模块
    import os
    # 去除首位空格
    path=path.strip()
    # 去除尾部 \ 符号
    path=path.rstrip("\\")
    # 判断路径是否存在
    # 存在     True
    # 不存在   False
    isExists=os.path.exists(path)
    # 判断结果
    if not isExists:
        # 如果不存在则创建目录
        print path+u' 创建成功'
        # 创建目录操作函数
        os.makedirs(path)
        return True
    else:
        # 如果目录存在则不创建,并提示目录已存在
        print path+u' 目录已存在'
        return False

def cbk(a,b,c):
    '''''回调函数
    @a: 已经下载的数据块
    @b: 数据块的大小
    @c: 远程文件的大小
    '''
    per = 100.0 * a * b / c
    if per > 100:
        per = 100
    print '%.2f%%' % per

#url = 'http://www.sina.com.cn'
local = 'd:\\mysite\\pic1\\'
d=0
mutex = threading.Lock()
# mutex1 = threading.Lock()
class MyThread(threading.Thread):
    def __init__(self, url, name):
        threading.Thread.__init__(self)
        self.url=url
        self.name=name
    def run(self):
        mutex.acquire()
        print
        print 'down from %s' % self.url
        time.sleep(1)
        mutex.release()
        try:
            urllib.urlretrieve(self.url, self.name)
        except Exception,e:
            print e
            time.sleep(1)
            urllib.urlretrieve(self.url, self.name)
threads=[]    

for u in urls[84:]:
    d += 1
    local = 'd:\\mysite\\pic1\\%d\\' %d
    mkdir(local)
    print 'download begin...'
    for i in xrange(40):
        lcal = local
        url=u
        url += '%03d.jpg' %i
        lcal += '%03d.jpg' %i
        th = MyThread(url,lcal)
        threads.append(th)
        th.start()
# for t in threads:
#     t.join()
print 'over! download finished'

爬虫抓取信息

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Python爬虫,抓取一卡通相关企业信息
Anthor: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python2.7.5
Editor: Sublime Text2
"""

import urllib2, re, string
import threading, Queue, time
import sys
import os
from bs4 import BeautifulSoup
#from pprint import pprint

reload(sys)
sys.setdefaultencoding('utf8')
_DATA = []
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()  #构造一个不限制大小的的队列
_WORKER_THREAD_NUM = 3  #设置线程的个数

_Num = 0 #总条数
class MyThread(threading.Thread) :

    def __init__(self, func,num) :
        super(MyThread, self).__init__()  #调用父类的构造函数
        self.func = func  #传入线程函数逻辑
        self.thread_num = num
    def run(self) :
        self.func()
        #print u'线程ID:',self.thread_num

def worker() :
    global SHARE_Q
    while not SHARE_Q.empty():
        url = SHARE_Q.get() #获得任务
        my_page = get_page(url)
        find_data(my_page)  #获得当前页面的数据
        #write_into_file(temp_data)
        time.sleep(1)
        SHARE_Q.task_done()

def get_page(url) :
    """
    根据所给的url爬取网页HTML
    Args:
        url: 表示当前要爬取页面的url
    Returns:
        返回抓取到整个页面的HTML(unicode编码)
    Raises:
        URLError:url引发的异常
    """
    try :
        html = urllib2.urlopen(url).read()
        my_page = html.decode("gbk",'ignore')
        #my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore')
        #my_page = urllib2.urlopen(url).read().decode("utf8")
    except urllib2.URLError, e :
        if hasattr(e, "code"):
            print "The server couldn't fulfill the request."
            print "Error code: %s" % e.code
        elif hasattr(e, "reason"):
            print "We failed to reach a server. Please check your url and read the Reason"
            print "Reason: %s" % e.reason
    return my_page

def find_data(my_page) :
    """
    通过返回的整个网页HTML, 正则匹配名称

    Args:
        my_page: 传入页面的HTML文本用于正则匹配
    """
    global _Num
    temp_data = []
    items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px #CCC dashed;padding-bottom:10px;")
    for index, item in enumerate(items) :
        #print item
        #print item.h1
        #print h.group()
        #temp_data.append(item)
        #print item.find(re.compile("^a"))
        href = item.find(re.compile("^a"))
        #soup = BeautifulSoup(item)
        #公司名称
        if item.a:
            data = item.a.string.encode("gbk","ignore")
            print data
            temp_data.append(data)

        goods = item.find_all("div", style="font-size:12px;")

        #经营产品与联系方式
        for i in goods:
            data = i.get_text().encode("gbk","ignore")
            temp_data.append(data)
            print data
        #b = item.find_all("b")
        #print b
        #链接地址
        pat = re.compile(r'href="([^"]*)"')
        h = pat.search(str(item))
        if h:
            #print h.group(0)
            href = h.group(1)
            print href
            temp_data.append(h.group(1))

        _Num += 1
        #b = item.find_all(text=re.compile("Dormouse"))
        #pprint(goods)
        #print href
        #pat = re.compile(r'title="([^"]*)"')
        #h = pat.search(str(href))
        #if h:
            #print h.group(1)
            #temp_data.append(h.group(1))
    _DATA.append(temp_data)

#headers = {'User-Agent':"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}##浏览器请求头(大部分网站没有这个请求头会报错、请务必加上哦)
#all_url = 'http://www.mzitu.com/all'  ##开始的URL地址
#start_html = requests.get(all_url,  headers=headers)  ##使用requests中的get方法来获取all_url(就是:http://www.mzitu.com/all这个地址)的内容 headers为上面设置的请求头、请务必参考requests官方文档解释
#print(start_html.text) ##打印出start_html (请注意,concent是二进制的数据,一般用于下载图片、视频、音频、等多媒体内容是才使用concent, 对于打印网页内容请使用text)

def main() :
    global SHARE_Q
    threads = []
    start = time.clock()
    douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}"
    #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务
    for index in xrange(20) :
        SHARE_Q.put(douban_url.format(page = index * 1))
    for i in xrange(_WORKER_THREAD_NUM) :
        thread = MyThread(worker,i)
        thread.start()  #线程开始处理任务

        threads.append(thread)
    for thread in threads :
        thread.join()
    SHARE_Q.join()
    i = 0
    with open("down.txt", "w+") as my_file :
        for page in _DATA :
            i += 1
            for name in page:
                my_file.write(name + "\n")

    print "Spider Successful!!!"
    end = time.clock()
    print u'抓取完成!'
    print u'总页数:',i
    print u'总条数:',_Num
    print u'一共用时:',end-start,u'秒'

if __name__ == '__main__':
    main()

爬虫多线程下载电影名称

#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Python爬虫
Anthor: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python2.7.8
Editor: Sublime Text2
"""

import urllib2, re, string
import threading, Queue, time
import sys
import os
from bs4 import BeautifulSoup

reload(sys)
sys.setdefaultencoding('utf8')
_DATA = []
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()  #构造一个不限制大小的的队列
_WORKER_THREAD_NUM = 3  #设置线程的个数

rootpath = os.getcwd()+u'/抓取的内容/'

def makedir(path):
    if not os.path.isdir(path):
        os.makedirs(path)

#创建抓取的根目录
#makedir(rootpath)
#显示下载进度
def Schedule(a,b,c):
    '''''
    a:已经下载的数据块
    b:数据块的大小
    c:远程文件的大小
   '''
    per = 100.0 * a * b / c
    if per > 100 :
        per = 100
    print '%.2f%%' % per
class MyThread(threading.Thread) :

    def __init__(self, func) :
        super(MyThread, self).__init__()  #调用父类的构造函数
        self.func = func  #传入线程函数逻辑

    def run(self) :
        self.func()

def worker() :
    print 'work thread start...\n'
    global SHARE_Q
    while not SHARE_Q.empty():
        url = SHARE_Q.get() #获得任务
        my_page = get_page(url)
        find_title(my_page)  #获得当前页面的电影名
        #write_into_file(temp_data)
        time.sleep(1)
        SHARE_Q.task_done()

def get_page(url) :
    """
    根据所给的url爬取网页HTML
    Args:
        url: 表示当前要爬取页面的url
    Returns:
        返回抓取到整个页面的HTML(unicode编码)
    Raises:
        URLError:url引发的异常
    """
    try :
        html = urllib2.urlopen(url).read()
        my_page = html.decode("utf8")
        #my_page = unicode(html,'utf-8','ignore').encode('utf-8','ignore')
        #my_page = urllib2.urlopen(url).read().decode("utf8")
    except urllib2.URLError, e :
        if hasattr(e, "code"):
            print "The server couldn't fulfill the request."
            print "Error code: %s" % e.code
        elif hasattr(e, "reason"):
            print "We failed to reach a server. Please check your url and read the Reason"
            print "Reason: %s" % e.reason
    return my_page

def find_title(my_page) :
    """
    通过返回的整个网页HTML, 正则匹配前100的电影名称

    Args:
        my_page: 传入页面的HTML文本用于正则匹配
    """
    temp_data = []
    movie_items = BeautifulSoup(my_page).findAll('h1')
    for index, item in enumerate(movie_items) :
        #print item
        #print item.h1
        pat = re.compile(r'href="([^"]*)"')
        h = pat.search(str(item))
        if h:
            #print h.group(0)
            href = h.group(1)
            print href
            temp_data.append(h.group(1))
        #print h.group()
        #temp_data.append(item)
        #print item.find(re.compile("^a"))
        href = item.find(re.compile("^a"))
        #soup = BeautifulSoup(item)
        if item.a:
            #print item.a.string
            temp_data.append(item.a.string)
        #print href
        #pat = re.compile(r'title="([^"]*)"')
        #h = pat.search(str(href))
        #if h:
            #print h.group(1)
            #temp_data.append(h.group(1))
    _DATA.append(temp_data)

def main() :
    global SHARE_Q
    threads = []
    start = time.clock()
    douban_url = "http://movie.misszm.com/page/{page}"
    #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务
    for index in xrange(5) :
        SHARE_Q.put(douban_url.format(page = index * 1))
    for i in xrange(_WORKER_THREAD_NUM) :
        thread = MyThread(worker)
        thread.start()  #线程开始处理任务
        threads.append(thread)
    for thread in threads :
        thread.join()
    SHARE_Q.join()
    with open("movie.txt", "w+") as my_file :
        for page in _DATA :
            for movie_name in page:
                my_file.write(movie_name + "\n")
    print "Spider Successful!!!"
    end = time.clock()
    print u'抓取完成!'
    print u'一共用时:',end-start,u'秒'

if __name__ == '__main__':
    main()

串口转tcp工具

#coding=utf-8
#author:yangyongzhen
#QQ:534117529
#'CardTest TcpServer  - Simple Test Card Tool 1.00' 

import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import os;
from socket import *
from struct import *;
#from myutil import *;
#name: myutil.py

mylock = threading.RLock() 

Server_IP = ''
Srever_Port = ''

def print_hex1(s,prev='0x'):
    for c in s:
        print '%s%02x' %(prev,ord(c)),
    print
def print_hex(s):
    for c in s:
        print '%02x' %(ord(c)),
    print

def hexto_str(s):
    r =''
    for c in s:
        r += '%02x' %(ord(c))
    return r
def strto_hex(s):
    r = s.decode('hex')
    return r
#''代表服务器为localhost

#在一个非保留端口号上进行监听

class ComThread:
    def __init__(self, Port=0):
        self.l_serial = None;
        self.alive = False;
        self.waitEnd = None;
        self.port = Port;

        #TCP部分
        #self.sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connection = None
        #数据
        self.snddata = ''
        self.rcvdata = ''

    def waiting(self):
        if not self.waitEnd is None:
            self.waitEnd.wait();

    def SetStopEvent(self):
        if not self.waitEnd is None:
            self.waitEnd.set();
        self.alive = False;
        self.stop();

    def start(self):
        self.l_serial = serial.Serial();
        self.l_serial.port = self.port;
        self.l_serial.baudrate = 115200;
        self.l_serial.timeout = 2;  #秒
        self.l_serial.open();

        if self.l_serial.isOpen():
            self.waitEnd = threading.Event();
            self.alive = True;
            print 'open serial port %d ok!\n' %(self.port+1)
            print 'baudrate:115200 \n'
            self.thread_read = None;
            self.thread_read = threading.Thread(target=self.FirstReader);
            self.thread_read.setDaemon(1);
            self.thread_read.start();

            self.thread_write = None;
            self.thread_write = threading.Thread(target=self.FirstWriter);
            self.thread_write.setDaemon(1);
            self.thread_write.start();

            #TCP部分
            self.thread_TcpClient = None;
            self.thread_TcpClient = threading.Thread(target=self.TcpClient);
            self.thread_TcpClient.setDaemon(1);
            self.thread_TcpClient.start();

            self.thread_TcpSend = None;
            self.thread_TcpSend = threading.Thread(target=self.TcpSend);
            self.thread_TcpSend.setDaemon(1);
            self.thread_TcpSend.start();

            return True;
        else:
            return False;

    def FirstReader(self):
        while self.alive:
            # 接收间隔
            time.sleep(0.1);
            try:
                data = '';
                n = self.l_serial.inWaiting();
                if n:
                    data = data+self.l_serial.read(n);
                    #for l in xrange(len(data)):
                        #print '%02X' % ord(data[l]),
                    # 发送数据
                    print u'->请求:'
                    print data;
                    mylock.acquire()
                    self.snddata = data
                    mylock.release()
                    #print_hex(data);

                # 判断结束

            except Exception, ex:
                print str(ex);

        self.waitEnd.set();
        self.alive = False;

    def FirstWriter(self):
        while self.alive:
            # 接收间隔
            time.sleep(0.1);
            try:
                #snddata = raw_input('\nenter data send:\n')
                if self.rcvdata!='':
                    self.l_serial.write(self.rcvdata);
                    print u'-<应答:'
                    print self.rcvdata;
                    mylock.acquire()
                    self.rcvdata = '';
                    mylock.release()
                #print_hex(snddata);

            except Exception, ex:
                print str(ex);
        self.waitEnd.set();
        self.alive = False;

    def TcpClient(self):
        while True:
            # 接收间隔
            time.sleep(0.1);
            self.connection = socket(AF_INET, SOCK_STREAM);
            self.connection.connect((Server_IP, int(Server_Port)));
            print 'Connect to Server OK!';
            self.snddata = ''
            self.rcvdata = ''
            while True:
                #读取客户端套接字的下一行
                data = self.connection.recv(1024)
                #如果没有数量的话,那么跳出循环
                if not data: break
                #发送一个回复至客户端
                mylock.acquire()
                self.snddata = ''
                self.rcvdata = data
                mylock.release()
                #connection.send('Echo=>' + data)
            self.connection.close()

        self.waitEnd.set();
        self.alive = False;

    def TcpSend(self):
        while True:
            # 接收间隔
            time.sleep(0.1);
            while True:
                time.sleep(0.1);
                try:
                    if not self.connection is None:
                        if self.snddata != '':
                            self.connection.send(self.snddata)
                            mylock.acquire()
                            self.rcvdata = ''
                            self.snddata = ''
                            mylock.release()
                except Exception, ex:
                    pass
    def stop(self):
        self.alive = False;
        self.thread_read.join();
        if self.l_serial.isOpen():
            self.l_serial.close();

#测试用部分
if __name__ == '__main__':
    print 'Serial to Tcp Tool 1.00\n'
    print 'Author:yangyongzhen\n'
    print 'QQ:534117529\n'
    print 'Copyright (c) **cap 2015-2016.\n'

    Server_IP = raw_input('please enter ServerIP:')
    print 'Server_IP: %s' %(Server_IP)
    Server_Port = raw_input('please enter ServerPort:')
    print 'Server_Port: %s' %(Server_Port)
    com =raw_input('please enter com port(1-9):')
    rt = ComThread(int(com)-1);
    try:
        if rt.start():
            rt.waiting();
            rt.stop();
        else:
            pass;
    except Exception,se:
        print str(se);

    if rt.alive:
        rt.stop();
    os.system("pause")

    print '';
    print 'End OK .';
    del rt;

远程读卡器server端

很早之前做过一个远程读卡器工具,原理就是在现场客服电脑上装个python做的tcpserver服务端,操控现场的读卡器。在公司内部做个客户端连接过去,这样实现在公司调试现场的卡片业务。

这个就是服务端工具的实现:

#coding=utf-8
#author:yangyongzhen
#QQ:534117529
#'CardTest TcpServer  - Simple Test Card Tool 1.00' 

import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import os;
from socket import *
from struct import *;
#from myutil import *;
#name: myutil.py

mylock = threading.RLock() 

def print_hex1(s,prev='0x'):
    for c in s:
        print '%s%02x' %(prev,ord(c)),
    print
def print_hex(s):
    for c in s:
        print '%02x' %(ord(c)),
    print

def hexto_str(s):
    r =''
    for c in s:
        r += '%02x' %(ord(c))
    return r
def strto_hex(s):
    r = s.decode('hex')
    return r
#''代表服务器为localhost

#在一个非保留端口号上进行监听

class ComThread:
    def __init__(self, Port=0):
        self.l_serial = None;
        self.alive = False;
        self.waitEnd = None;
        self.port = Port;

        #TCP部分
        self.myHost = ''
        self.myPort = 5050
        self.sockobj = socket(AF_INET, SOCK_STREAM)
        self.connection = None
        #数据
        self.snddata = ''
        self.rcvdata = ''

    def waiting(self):
        if not self.waitEnd is None:
            self.waitEnd.wait();

    def SetStopEvent(self):
        if not self.waitEnd is None:
            self.waitEnd.set();
        self.alive = False;
        self.stop();

    def start(self):
        self.l_serial = serial.Serial();
        self.l_serial.port = self.port;
        self.l_serial.baudrate = 115200;
        self.l_serial.timeout = 2;  #秒
        self.l_serial.open();

        if self.l_serial.isOpen():
            self.waitEnd = threading.Event();
            self.alive = True;
            print 'open serial port %d ok!\n' %(self.port+1)
            print 'baudrate:115200 \n'
            self.thread_read = None;
            self.thread_read = threading.Thread(target=self.FirstReader);
            self.thread_read.setDaemon(1);
            self.thread_read.start();

            self.thread_write = None;
            self.thread_write = threading.Thread(target=self.FirstWriter);
            self.thread_write.setDaemon(1);
            self.thread_write.start();

            #TCP部分
            self.thread_TcpServer = None;
            self.thread_TcpServer = threading.Thread(target=self.TcpServer);
            self.thread_TcpServer.setDaemon(1);
            self.thread_TcpServer.start();

            self.thread_TcpSend = None;
            self.thread_TcpSend = threading.Thread(target=self.TcpSend);
            self.thread_TcpSend.setDaemon(1);
            self.thread_TcpSend.start();

            return True;
        else:
            return False;

    def FirstReader(self):
        while self.alive:
            # 接收间隔
            time.sleep(0.1);
            try:
                data = '';
                n = self.l_serial.inWaiting();
                if n:
                    data = data+self.l_serial.read(n);
                    #for l in xrange(len(data)):
                        #print '%02X' % ord(data[l]),
                    # 发送数据
                    print 'serial recv:'
                    print data;
                    mylock.acquire()
                    self.snddata = data
                    mylock.release()
                    #print_hex(data);

                # 判断结束

            except Exception, ex:
                print str(ex);

        self.waitEnd.set();
        self.alive = False;

    def FirstWriter(self):
        while self.alive:
            # 接收间隔
            time.sleep(0.1);
            try:
                #snddata = raw_input('\nenter data send:\n')
                if self.rcvdata!='':
                    self.l_serial.write(self.rcvdata);
                    print 'serial send:'
                    print self.rcvdata;
                    mylock.acquire()
                    self.rcvdata = '';
                    mylock.release()
                #print_hex(snddata);

            except Exception, ex:
                print str(ex);
        self.waitEnd.set();
        self.alive = False;

    def TcpServer(self):
        self.sockobj.bind((self.myHost, self.myPort))
        self.sockobj.listen(10)
        print 'TcpServer listen at 5050 oK!\n'
        print 'Waiting for connect...\n'
        while True:
            # 接收间隔
            time.sleep(0.1);
            self.connection, address = self.sockobj.accept()
            print 'Server connected by', address
            self.snddata = ''
            self.rcvdata = ''
            try:
                while True:
                    #读取客户端套接字的下一行
                    data = self.connection.recv(1024)
                    #如果没有数量的话,那么跳出循环
                    if not data: break
                    #发送一个回复至客户端
                    mylock.acquire()
                    self.snddata = ''
                    self.rcvdata = data
                    mylock.release()
                    #connection.send('Echo=>' + data)
                self.connection.close()
            except Exception, ex:
                self.connection.close()

        self.waitEnd.set();
        self.alive = False;

    def TcpSend(self):
        while True:
            # 接收间隔
            time.sleep(0.1);
            while True:
                time.sleep(0.1);
                try:
                    if not self.connection is None:
                        if self.snddata != '':
                            self.connection.send(self.snddata)
                            mylock.acquire()
                            self.rcvdata = ''
                            self.snddata = ''
                            mylock.release()
                except Exception, ex:
                    pass
    def stop(self):
        self.alive = False;
        self.thread_read.join();
        if self.l_serial.isOpen():
            self.l_serial.close();

#测试用部分
if __name__ == '__main__':
    print 'CardTest TcpServer  - Simple Test Card Tool 1.00\n'
    print 'Author:yangyongzhen\n'
    print 'QQ:534117529\n'
    print 'Copyright (c) **** 2015-2016.\n'

    com =raw_input('please enter com port(1-9):')
    rt = ComThread(int(com)-1);
    try:
        if rt.start():
            rt.waiting();
            rt.stop();
        else:
            pass;
    except Exception,se:
        print str(se);

    if rt.alive:
        rt.stop();
    os.system("pause")

    print '';
    print 'End OK .';
    del rt;

黑客rtcp反向链接

# -*- coding: utf-8 -*-

'''
filename:rtcp.py
@desc:
利用python的socket端口转发,用于远程维护
如果连接不到远程,会sleep 36s,最多尝试200(即两小时)
@usage:
./rtcp.py stream1 stream2
stream为:l:port或c:host:port
l:port表示监听指定的本地端口
c:host:port表示监听远程指定的端口
@author: watercloud, zd, knownsec team
@web: www.knownsec.com, blog.knownsec.com
@date: 2009-7
'''

import socket
import sys
import threading
import time

streams = [None, None]  # 存放需要进行数据转发的两个数据流(都是SocketObj对象)
debug = 1  # 调试状态 0 or 1

def print_hex(s):
    for c in s:
        print '%02x' %(ord(c)),
    print
def _usage():
    print 'Usage: ./rtcp.py stream1 stream2\nstream : L:port  or C:host:port'

def _get_another_stream(num):
    '''
    从streams获取另外一个流对象,如果当前为空,则等待
    '''
    if num == 0:
        num = 1
    elif num == 1:
        num = 0
    else:
        raise "ERROR"

    while True:
        if streams[num] == 'quit':
            print("can't connect to the target, quit now!")
            sys.exit(1)

        if streams[num] != None:
            return streams[num]
        else:
            time.sleep(1)

def _xstream(num, s1, s2):
    '''
    交换两个流的数据
    num为当前流编号,主要用于调试目的,区分两个回路状态用。
    '''
    try:
        while True:
            #注意,recv函数会阻塞,直到对端完全关闭(close后还需要一定时间才能关闭,最快关闭方法是shutdow)
            buff = s1.recv(1024)
            if debug > 0:
                print num,"recv"
            if len(buff) == 0: #对端关闭连接,读不到数据
                print num,"one closed"
                break
            s2.sendall(buff)
            if debug > 0:
                print num,"sendall"
                print_hex(buff)
    except :
        print num,"one connect closed."

    try:
        s1.shutdown(socket.SHUT_RDWR)
        s1.close()
    except:
        pass

    try:
        s2.shutdown(socket.SHUT_RDWR)
        s2.close()
    except:
        pass

    streams[0] = None
    streams[1] = None
    print num, "CLOSED"

def _server(port, num):
    '''
    处理服务情况,num为流编号(第0号还是第1号)
    '''
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(('0.0.0.0', port))
    srv.listen(1)
    #print 'local listening at port %d' (%(port))
    while True:
        conn, addr = srv.accept()
        print "connected from:", addr
        streams[num] = conn  # 放入本端流对象
        s2 = _get_another_stream(num)  # 获取另一端流对象
        _xstream(num, conn, s2)

def _connect(host, port, num):
    '''    处理连接,num为流编号(第0号还是第1号)
    @note: 如果连接不到远程,会sleep 36s,最多尝试200(即两小时)
    '''
    not_connet_time = 0
    wait_time = 36
    try_cnt = 199
    while True:
        if not_connet_time > try_cnt:
            streams[num] = 'quit'
            print('not connected')
            return None

        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect((host, port))
        except Exception, e:
            print ('can not connect %s:%s!' % (host, port))
            not_connet_time += 1
            time.sleep(wait_time)
            continue

        print "connected to %s:%i" % (host, port)
        streams[num] = conn  #放入本端流对象
        s2 = _get_another_stream(num) #获取另一端流对象
        _xstream(num, conn, s2)

if __name__ == '__main__':
    print 'Tcp to Tcp Tool 1.00\n'
    print 'Author:yangyongzhen\n'
    print 'QQ:534117529\n'
    print 'Copyright (c) Newcapec 2015-2016.\n'
    Server_IP = raw_input('please enter Server IP:')
    print 'Server_IP: %s' %(Server_IP)
    Server_Port = raw_input('please enter Server Port:')
    print 'Server_Port: %s' %(Server_Port)
    com =raw_input('please enter Local Port:')
    tlist = []  # 线程列表,最终存放两个线程对象
    #targv = [sys.argv[1], sys.argv[2] ]
    t = threading.Thread(target=_server, args=(int(com), 0))
    tlist.append(t)

    t = threading.Thread(target=_connect, args=(Server_IP, int(Server_Port), 1))
    tlist.append(t)

    for t in tlist:
        t.start()
    for t in tlist:
        t.join()
    sys.exit(0)

调用c的动态库示例

# -*- coding:utf8 -*-
from ctypes import *
from binascii import unhexlify as unhex
import os
dll = cdll.LoadLibrary('mydll.dll');

print 'begin load mydll..'
#key
#str1='\x9B\xED\x98\x89\x15\x80\xC3\xB2'
str1=unhex('0000556677222238')
#data
str2=unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427')
#output
str3='\x12\x34\x56\x78\x12\x34\x56\x78'
pstr1=c_char_p()
pstr2=c_char_p()
pstr3=c_char_p()
pstr1.value=str1
pstr2.value=str2
pstr3.value=str3
dll.CurCalc_DES_MAC64(805306481,pstr1,0,pstr2,13,pstr3)
print pstr1
print pstr2
print pstr3
stro= pstr3.value
print stro
strtemp=''
for c in stro:
    print "%02x" % (ord(c))
    strtemp+="{0:02x}".format(ord(c))
print strtemp
os.execlp("E:\\RSA.exe",'')
s=raw_input('press any key to continue...')

tcp的socket连接报文测试工具

# -*- coding: utf-8 -*-
import socket
from myutil import *
from binascii import unhexlify as unhex
from ctypes import *
dll = cdll.LoadLibrary('mydll.dll')
print 'begin load mydll..'
HOST, PORT = "192.168.51.28", 5800
sd ="1234567812345678"
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send data
    sock.connect((HOST, int(PORT))
    print "Sent1 OK:"
    print sd
    # Receive data from the server and shut down
    received = sock.recv(1024)
    print "Received:"
    print_hex(received)
    print  'received len is 0x%02x' %(len(received))
    print  'received data analysis...'
    re1=received[0:4]
    print_hex(re1)
    re1=received[4:6]
    print_hex(re1)
    re1=received[6:10]
    print_hex(re1)
    re1=received[10:16]
    print_hex(re1)

    #pack2 send
    sock.send(sd2.decode('hex'))
    print "Sent2 OK:"
    print sd2
    # Receive data from the server and shut down
    received1 = sock.recv(1024)
    print "Received1:"
    print_hex(received1)
    print  'received1 len is 0x%02x' %(len(received1))

finally:
    sock.close()

s=raw_input('press any key to continue...')

报文拼接与加解密测试

# -*- coding: gb2312 -*-
import socket
from myutil import *
from binascii import unhexlify as unhex
from ctypes import *
dll = cdll.LoadLibrary('mydll.dll')
print 'begin load mydll..'
#key
key='\xF1\xE2\xD3\xC4\xF1\xE2\xD3\xC4'
#output MAC
mac='\x00'*8
data='\x00'*8
pkey=c_char_p()
pdata=c_char_p()
pmac=c_char_p()
pkey.value=key
pdata.value=data
pmac.value=mac
#pack1
class pack:
   pass
pk=pack()
pk.len='00000032'
pk.ID='0001'
pk.slnum='00000004'
pk.poscode='123456781234'
pk.rand='1122334455667788'
pk.psam='313233343536'
pk.kind='0000'
pk.ver='000001'
pk.time='20140805135601'
pk.mac='06cc571e6d96e12d'

data=unhex(pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time)
#print_hex(data)
pdata.value=data
#cacl MAC
dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,42,pmac)
stro= pmac.value
strtemp=''
for c in stro:
    strtemp+="{0:02x}".format(ord(c))
#print strtemp
pk.mac=strtemp
#data to send
sd=pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time+pk.mac
print  'send1 len is 0x%02x' %(len(sd)/2)
print sd
#pack2
class pack2:
   pass
pk2=pack2()
pk2.len='0000006E'
pk2.ID='0012'
pk2.slnum='00000005'
pk2.fatCode='00'
pk2.cardASN='0000000000000000'
pk2.cardType='00'
pk2.userNO= '0000000000000000'

pk2.fileName1='00000000000000000000000000000015'
pk2.dataLen1='00'
pk2.dataArea1='00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF'
pk2.fileName2='00000000000000000000000000000016'
pk2.dataLen2='00'
pk2.dataArea2='000003E800FFFF16'
pk2.mac='06cc571e6d96e12d'

data2=unhex(pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2)

pdata.value=data2
#cacl MAC
dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,102,pmac)
stro= pmac.value
strtemp=''
for c in stro:
    strtemp+="{0:02x}".format(ord(c))
#print strtemp
pk2.mac=strtemp
#data to send
sd2=pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2+pk2.mac

print  'send2 len is 0x%02x' %(len(sd2)/2)
print sd2

#PORT="192.168.60.37"
#PORT="localhost"
HOST, PORT = "192.168.51.28", 5800
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send data
    sock.connect((HOST, int(PORT))
    #data= "123456789"
    #s = struct.pack('bbb',1,2,3)
    sock.send(sd.decode('hex'))
    print "Sent1 OK:"
    print sd
    # Receive data from the server and shut down
    received = sock.recv(1024)
    print "Received:"
    print_hex(received)
    print  'received len is 0x%02x' %(len(received))
    print  'received data analysis...'
    re1=received[0:4]
    print_hex(re1)
    re1=received[4:6]
    print_hex(re1)
    re1=received[6:10]
    print_hex(re1)
    re1=received[10:16]
    print_hex(re1)

    #pack2 send
    sock.send(sd2.decode('hex'))
    print "Sent2 OK:"
    print sd2
    # Receive data from the server and shut down
    received1 = sock.recv(1024)
    print "Received1:"
    print_hex(received1)
    print  'received1 len is 0x%02x' %(len(received1))

finally:
    sock.close()

s=raw_input('press any key to continue...')

二进制文件解析工具

# -*- coding: utf-8 -*-
from myutil import *
from binascii import unhexlify as unhex
import os
path=os.getcwd()

path+='\\rec04.bin'
#print  path
print "begin ans......"

f1=open(path,'rb')
for i in range(1,35):
    s=f1.read(280)
    print "data:",i
    print_hex(s)

print 'read data is:'
print_hex(s)

recstatadd = 187
print "终端编号:"
print_hex(s[recstatadd:recstatadd+10])
print "卡号长度:"
print_hex(s[10])
print "卡号:    "
print_hex(s[11:11+10])
print "持卡序号1+所属地城市代码2+交易地城市代码2"
print_hex(s[recstatadd+22:recstatadd+22+5])
print "应用交易计数器"
print_hex(s[92:92+2])
print "交易前余额4,交易金额3"
print_hex(s[recstatadd+29:recstatadd+29+7])
print "交易日期:"
print_hex(s[99:99+3])
print "交易时间:"
print_hex(s[44:44+3])
print "终端编号"
print_hex(s[21:21+8])
print "商户编号"
print_hex(s[21+8:21+8+15])
print "批次号"
print_hex(s[5:5+3])
print "应用密文"
print_hex(s[47:47+8])
print "授权金额"
print_hex(s[103:103+6])
print "其他金额"
print_hex(s[115:115+6])
print "终端验证结果"
print_hex(s[94:5+94])
print "应用交易计数器"
print_hex(s[92:92+4])
print "卡片验证结果"
print_hex(s[56:56+32])
print "卡片序列号:"
print_hex(s[131])
f1.close()

抓取动漫图片

# -*- coding:utf8 -*-
# 2013.12.36 19:41
# 抓取dbmei.com的图片。

from bs4 import BeautifulSoup
import os, sys, urllib2,time,random

# 创建文件夹
path = os.getcwd()                        # 获取此脚本所在目录
new_path = os.path.join(path,u'暴走漫画')
if not os.path.isdir(new_path):
  os.mkdir(new_path)

def page_loop(page=1):
  url = 'http://baozoumanhua.com/all/hot/page/%s?sv=1389537379' % page
  content = urllib2.urlopen(url)
  soup = BeautifulSoup(content)

  my_girl = soup.find_all('div',class_='img-wrap')
  for girl in my_girl:
    jokes = girl.find('img')
    link = jokes.get('src')
    flink = link
    print flink
    content2 = urllib2.urlopen(flink).read()

    #with open(u'暴走漫画'+'/'+time.strftime('%H-%M-%S')+random.choice('qwertyuiopasdfghjklzxcvbnm')+flink[-5:],'wb') as code:          #在OSC上现学的
    with open(u'暴走漫画'+'/'+flink[-11:],'wb') as code:
      code.write(content2)

  page = int(page) + 1
  print u'开始抓取下一页'
  print 'the %s page' % page
  page_loop(page)

page_loop()

抓取网站模板

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# by yangyongzhen
# 2016-12-06

from bs4 import BeautifulSoup
import urllib,urllib2,os,time
import re

rootpath = os.getcwd()+u'/抓取的模板/'

def makedir(path):
    if not os.path.isdir(path):
        os.makedirs(path)

#创建抓取的根目录
makedir(rootpath)
#显示下载进度
def Schedule(a,b,c):
    '''''
    a:已经下载的数据块
    b:数据块的大小
    c:远程文件的大小
   '''
    per = 100.0 * a * b / c
    if per > 100 :
        per = 100
    print '%.2f%%' % per

def grabHref(url,listhref,localfile):
    html = urllib2.urlopen(url).read()
    html = unicode(html,'gb2312','ignore').encode('utf-8','ignore')

    content = BeautifulSoup(html).findAll('link')
    myfile = open(localfile,'w')
    pat = re.compile(r'href="([^"]*)"')
    pat2 = re.compile(r'http')
    for item in content:
        h = pat.search(str(item))
        href = h.group(1)
        if pat2.search(href):
            ans = href
        else:
            ans = url+href
        listhref.append(ans)
        myfile.write(ans)
        myfile.write('\r\n')
        print ans

    content = BeautifulSoup(html).findAll('script')
    pat = re.compile(r'src="([^"]*)"')
    pat2 = re.compile(r'http')
    for item in content:
        h = pat.search(str(item))
        if h:
            href = h.group(1)
        if pat2.search(href):
            ans = href
        else:
            ans = url+href
        listhref.append(ans)
        myfile.write(ans)
        myfile.write('\r\n')
        print ans

    content = BeautifulSoup(html).findAll('a')
    pat = re.compile(r'href="([^"]*)"')
    pat2 = re.compile(r'http')
    for item in content:
        h = pat.search(str(item))
        if h:
            href = h.group(1)
        if pat2.search(href):
            ans = href
        else:
            ans = url+href
        listhref.append(ans)
        myfile.write(ans)
        myfile.write('\r\n')
        print ans

    myfile.close()

def main():
    url = "http://192.168.72.140/qdkj/"   #采集网页的地址
    listhref =[]    #链接地址
    localfile = 'ahref.txt'  #保存链接地址为本地文件,文件名
    grabHref(url,listhref,localfile)
    listhref = list(set(listhref)) #去除链接中的重复地址

    curpath = rootpath
    start = time.clock()
    for item in listhref:
        curpath = rootpath
        name = item.split('/')[-1]
        fdir = item.split('/')[3:-1]
        for i in fdir:
            curpath += i
            curpath += '/'
        print curpath
        makedir(curpath)
        local = curpath+name
        urllib.urlretrieve(item, local,Schedule) # 远程保存函数

    end = time.clock()
    print u'模板抓取完成!'
    print u'一共用时:',end-start,u'秒'

if __name__=="__main__":

    main()

总结

到此这篇关于python常用小脚本的文章就介绍到这了,更多相关python常用小脚本内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 分享一个简单的python读写文件脚本

    先来看一段创建文件并写入文本的代码,然后作介绍. #!/usr/bin/env python 'makeFile.py -- create a file' import os ls = os.linesep # get filename while True: fname = raw_input('Input an unused file name >') if os.path.exists(fname): print "ERROR: '%s' already exists" %

  • 3种python调用其他脚本的方法

    1.用python调用python脚本 #!/usr/local/bin/python3.7 import time import os count = 0 str = ('python b.py') result1 = os.system(str) print(result1) while True: count = count + 1 if count == 8: print('this count is:',count) break else: time.sleep(1) print('t

  • python实现调用其他python脚本的方法

    本文实例讲述了python实现调用其他python脚本的方法,分享给大家供大家参考.具体方法如下: 该实例调用当前目录下的md5get.py脚本.代码如下: import os import logging import subprocess log = logging.getLogger("Core.Analysis.Processing") INTERPRETER = "/usr/bin/python" if not os.path.exists(INTERPR

  • Python 脚本的三种执行方式小结

    1.交互模式下执行 Python,这种模式下,无需创建脚本文件,直接在 Python解释器的交互模式下编写对应的 Python 语句即可. 1)打开交互模式的方式: Windows下: 在开始菜单找到"命令提示符",打开,就进入到命令行模式: 在命令行模式输入: python 即可进入 Python 的交互模式 Linux 下: 直接在终端输入 python,如果是按装了 python3 ,则根据自己建的软连接的名字进入对应版本的 Python 交互环境,例如我建立软连接使用的 pyt

  • python常用小脚本实例总结

    目录 前言 打印16进制字符串 文件合并 多线程下载图集 多线程下载图片 爬虫抓取信息 爬虫多线程下载电影名称 串口转tcp工具 远程读卡器server端 黑客rtcp反向链接 调用c的动态库示例 tcp的socket连接报文测试工具 报文拼接与加解密测试 二进制文件解析工具 抓取动漫图片 抓取网站模板 总结 前言 日常生活中常会遇到一些小任务,如果人工处理会很麻烦. 用python做些小脚本处理,能够提高不少效率.或者可以把python当工具使用,辅助提高一下办公效率.(比如我常拿python

  • Python常用小技巧总结

    本文实例总结了Python常用的小技巧.分享给大家供大家参考.具体分析如下: 1. 获取本地mac地址: import uuid mac = uuid.uuid1().hex[-12:] print(mac) 运行结果:e0cb4e077585 2. del 的使用 a = ['b','c','d'] del a[0] print(a)# 输出 ['c', 'd'] a = ['b','c','d'] del a[0:2] # 删除从第1个元素开始,到第2个元素 print(a)# 输出 ['d

  • Python常用特殊方法实例总结

    本文实例讲述了Python常用特殊方法.分享给大家供大家参考,具体如下: 1 __init__和__new__ __init__方法用来初始化类实例:__new__方法用来创建类实例. 主要的区别: 1).__init__通常用于初始化一个新实例,控制初始化的过程,发生在类实例被创建完以后.它是实例级别的方法. 2).__new__通常用于控制生成一个新实例的过程.它是类级别的方法. __new__具体的作用: 1) 继承一些不可变的class时(比如int, str, tuple),提供一个自

  • Python五子棋小游戏实例分享

    本文实例为大家分享了Python实现五子棋小游戏的具体代码,供大家参考,具体内容如下 使用的库 pygame.pyautogui 流程简述 1.画棋盘 设置网格间隔40px ,留白 80 px ,与网格横竖线数量 ,初定19 × 19 . 2.鼠标点击 鼠标点击取得坐坐标(x0 , y0),再获得最近的网格上的点(x1 , y1),再将每次动作获得的(x1 , y1 )放入列表 chess_location 中. 再通过: chess_location_b = chess_location[0:

  • Python常用断言函数实例汇总

    Python assert 语句,又称断言语句,可以看做是功能缩小版的 if 语句,它用于判断某个表达式的值,如果值为真,则程序可以继续往下执行:反之,Python 解释器会报 AssertionError 错误. assert 语句的语法结构为: assert 表达式 assert 语句的执行流程可以用 if 判断语句表示(等效),如下所示: if 表达式==True: 程序继续执行 else: 程序报 AssertionError 错误 有读者可能会问,明明 assert 会令程序崩溃,为什

  • Python常用工具之音频调整音量

    目录 前言 环境依赖 代码 验证一下 前言 本文提供将音频提升音量的python代码,一如既往的实用主义代码. 环境依赖 ffmpeg环境安装 ffmpy安装: pip install ffmpy -i https://pypi.douban.com/simple 代码 不废话上代码. #!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2021/11/24 22:45 # @Author : 剑客阿良_ALiang # @Site :

  • python 简单备份文件脚本v1.0的实例

    整体思路 将要备份的目录列为一个列表,通过执行系统命令,进行压缩.备份. 这样关键在于构造命令并使用 os.system( )来执行,一开始使用zip 命令始终没有成功,后来发现Windows下并没有这个命令,还要安装GnuWin32项目,后来安装了7z,实现了使用系统命令进行压缩. 压缩命令 通过下载7z压缩,将7z.exe 7z,dll 加入系统环境变量目录,通过以下命令进行压缩.解压7z a test.zip a.txt b.txt # 指定若干文件 7z a test.zip f:/te

  • python 捕获shell脚本的输出结果实例

    import subprocess output =Popen(["mycmd","myarg"], stdout=PIPE).communicate()[0] import subprocess p = subprocess.Popen(['ls','-a'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() print out # work on Unix/Li

  • Python实现Pig Latin小游戏实例代码

    前言: 本文研究的主要是Python实现pig Latin小游戏的简单代码,具体介绍如下. Pig Latin是一个语言游戏. 步骤: 1.让用户输入一个英文单词 2.确保用户输入一个有效单词 3.将单词转换成Pig Latin 4.显示转换结果 一.Input 函数:raw_input()用于输出一个字符串并等待键盘输入某字符串,最后以Enter(或Return)结束输入 original = raw_input("Enter a word:") print original 上述中

  • 对Python 网络设备巡检脚本的实例讲解

    1.基本信息 我公司之前采用的是人工巡检,但奈何有大量网络设备,往往巡检需要花掉一上午(还是手速快的话),浪费时间浪费生命. 这段时间正好在学 Python ,于是乎想(其)要(实)解(就)放(是)双(懒)手. 好了,脚本很长又比较挫,有耐心就看看吧. 需要巡检的设备如下: 设备清单 设备型号 防火墙 华为 E8000E H3C M9006 飞塔 FG3950B 交换机 华为 S9306 H3C S12508 Cisco N7K 路由器 华为 NE40E 负载 Radware RD5412 Ra

随机推荐