python实用代码片段收集贴

获取一个类的所有子类

代码如下:

def itersubclasses(cls, _seen=None):
    """Generator over all subclasses of a given class in depth first order."""
    if not isinstance(cls, type):
        raise TypeError(_('itersubclasses must be called with '
                          'new-style classes, not %.100r') % cls)
    _seen = _seen or set()
    try:
        subs = cls.__subclasses__()
    except TypeError:   # fails only when cls is type
        subs = cls.__subclasses__(cls)
    for sub in subs:
        if sub not in _seen:
            _seen.add(sub)
            yield sub
            for sub in itersubclasses(sub, _seen):
                yield sub

简单的线程配合

代码如下:

import threading
is_done = threading.Event()
consumer = threading.Thread(
    target=self.consume_results,
    args=(key, self.task, runner.result_queue, is_done))
consumer.start()
self.duration = runner.run(
        name, kw.get("context", {}), kw.get("args", {}))
is_done.set()
consumer.join() #主线程堵塞,直到consumer运行结束

多说一点,threading.Event()也可以被替换为threading.Condition(),condition有notify(), wait(), notifyAll()。解释如下:

代码如下:

The wait() method releases the lock, and then blocks until it is awakened by a notify() or notifyAll() call for the same condition variable in another thread. Once awakened, it re-acquires the lock and returns. It is also possible to specify a timeout.
The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notifyAll() method wakes up all threads waiting for the condition variable.
Note: the notify() and notifyAll() methods don't release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notifyAll() finally relinquishes ownership of the lock.

代码如下:

# Consume one item
cv.acquire()
while not an_item_is_available():
    cv.wait()
get_an_available_item()
cv.release()
# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()

计算运行时间

代码如下:

class Timer(object):
    def __enter__(self):
        self.error = None
        self.start = time.time()
        return self
    def __exit__(self, type, value, tb):
        self.finish = time.time()
        if type:
            self.error = (type, value, tb)
    def duration(self):
        return self.finish - self.start
with Timer() as timer:
    func()
return timer.duration()

元类

__new__()方法接收到的参数依次是:
当前准备创建的类的对象;
类的名字;
类继承的父类集合;
类的方法集合;

代码如下:

class ModelMetaclass(type):
    def __new__(cls, name, bases, attrs):
        if name=='Model':
            return type.__new__(cls, name, bases, attrs)
        mappings = dict()
        for k, v in attrs.iteritems():
            if isinstance(v, Field):
                print('Found mapping: %s==>%s' % (k, v))
                mappings[k] = v
        for k in mappings.iterkeys():
            attrs.pop(k)
        attrs['__table__'] = name # 假设表名和类名一致
        attrs['__mappings__'] = mappings # 保存属性和列的映射关系
        return type.__new__(cls, name, bases, attrs)
class Model(dict):
    __metaclass__ = ModelMetaclass
    def __init__(self, **kw):
        super(Model, self).__init__(**kw)
    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Model' object has no attribute '%s'" % key)
    def __setattr__(self, key, value):
        self[key] = value
    def save(self):
        fields = []
        params = []
        args = []
        for k, v in self.__mappings__.iteritems():
            fields.append(v.name)
            params.append('?')
            args.append(getattr(self, k, None))
        sql = 'insert into %s (%s) values (%s)' % (self.__table__, ','.join(fields), ','.join(params))
        print('SQL: %s' % sql)
        print('ARGS: %s' % str(args))
class Field(object):
    def __init__(self, name, column_type):
        self.name = name
        self.column_type = column_type
    def __str__(self):
        return '<%s:%s>' % (self.__class__.__name__, self.name)
class StringField(Field):
    def __init__(self, name):
        super(StringField, self).__init__(name, 'varchar(100)')
class IntegerField(Field):
    def __init__(self, name):
        super(IntegerField, self).__init__(name, 'bigint')
class User(Model):
    # 定义类的属性到列的映射:
    id = IntegerField('id')
    name = StringField('username')
    email = StringField('email')
    password = StringField('password')
# 创建一个实例:
u = User(id=12345, name='Michael', email='test@orm.org', password='my-pwd')
# 保存到数据库:
u.save()

输出如下:

代码如下:

Found model: User
Found mapping: email ==> <StringField:email>
Found mapping: password ==> <StringField:password>
Found mapping: id ==> <IntegerField:uid>
Found mapping: name ==> <StringField:username>
SQL: insert into User (password,email,username,uid) values (?,?,?,?)
ARGS: ['my-pwd', 'test@orm.org', 'Michael', 12345]

SQLAlchemy简单使用

代码如下:

# 导入:
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# 创建对象的基类:
Base = declarative_base()
# 定义User对象:
class User(Base):
    # 表的名字:
    __tablename__ = 'user'
    # 表的结构:
    id = Column(String(20), primary_key=True)
    name = Column(String(20))
# 初始化数据库连接:
engine = create_engine('mysql+mysqlconnector://root:password@localhost:3306/test') # '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
# 创建DBSession类型:
DBSession = sessionmaker(bind=engine)
# 创建新User对象:
new_user = User(id='5', name='Bob')
# 添加到session:
session.add(new_user)
# 提交即保存到数据库:
session.commit()
# 创建Query查询,filter是where条件,最后调用one()返回唯一行,如果调用all()则返回所有行:
user = session.query(User).filter(User.id=='5').one()
# 关闭session:
session.close()

WSGI简单使用和Web框架Flask的简单使用

代码如下:

from wsgiref.simple_server import make_server
def application(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/html')])
    return '<h1>Hello, web!</h1>'
# 创建一个服务器,IP地址为空,端口是8000,处理函数是application:
httpd = make_server('', 8000, application)
print "Serving HTTP on port 8000..."
# 开始监听HTTP请求:
httpd.serve_forever()

了解了WSGI框架,我们发现:其实一个Web App,就是写一个WSGI的处理函数,针对每个HTTP请求进行响应。
但是如何处理HTTP请求不是问题,问题是如何处理100个不同的URL。
一个最简单和最土的想法是从environ变量里取出HTTP请求的信息,然后逐个判断。

代码如下:

from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def home():
    return '<h1>Home</h1>'
@app.route('/signin', methods=['GET'])
def signin_form():
    return '''<form action="/signin" method="post">
              <p><input name="username"></p>
              <p><input name="password" type="password"></p>
              <p><button type="submit">Sign In</button></p>
              </form>'''
@app.route('/signin', methods=['POST'])
def signin():
    # 需要从request对象读取表单内容:
    if request.form['username']=='admin' and request.form['password']=='password':
        return '<h3>Hello, admin!</h3>'
    return '<h3>Bad username or password.</h3>'
if __name__ == '__main__':
    app.run()

格式化显示json

代码如下:

print(json.dumps(data, indent=4))
# 或者
import pprint
pprint.pprint(data)

实现类似Java或C中的枚举

代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import itertools
import sys
class ImmutableMixin(object):
    _inited = False
    def __init__(self):
        self._inited = True
    def __setattr__(self, key, value):
        if self._inited:
            raise Exception("unsupported action")
        super(ImmutableMixin, self).__setattr__(key, value)
class EnumMixin(object):
    def __iter__(self):
        for k, v in itertools.imap(lambda x: (x, getattr(self, x)), dir(self)):
            if not k.startswith('_'):
                yield v
class _RunnerType(ImmutableMixin, EnumMixin):
    SERIAL = "serial"
    CONSTANT = "constant"
    CONSTANT_FOR_DURATION = "constant_for_duration"
    RPS = "rps"
if __name__=="__main__":
    print _RunnerType.CONSTANT

创建文件时指定权限

代码如下:

import os
def write_to_file(path, contents, umask=None):
    """Write the given contents to a file
    :param path: Destination file
    :param contents: Desired contents of the file
    :param umask: Umask to set when creating this file (will be reset)
    """
    if umask:
        saved_umask = os.umask(umask)
    try:
        with open(path, 'w') as f:
            f.write(contents)
    finally:
        if umask:
            os.umask(saved_umask)
if __name__ == '__main__':
    write_to_file('/home/kong/tmp', 'test', 31)
    # Then you will see a file is created with permission 640.
    # Warning: If the file already exists, its permission will not be changed.
    # Note:For file, default all permission is 666, and 777 for directory.

多进程并发执行

代码如下:

import multiprocessing
import time
import os
def run(flag):
    print "flag: %s, sleep 2s in run" % flag
    time.sleep(2)
    print "%s exist" % flag
    return flag
if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    iter_result = pool.imap(run, xrange(6))
    print "sleep 5s\n\n"
    time.sleep(5)
    for i in range(6):
        try:
            result = iter_result.next(600)
        except multiprocessing.TimeoutError as e:
            raise
        print result
    pool.close()
    pool.join()

运行时自动填充函数参数

代码如下:

import decorator
def default_from_global(arg_name, env_name):
    def default_from_global(f, *args, **kwargs):
        id_arg_index = f.func_code.co_varnames.index(arg_name)
        args = list(args)
        if args[id_arg_index] is None:
            args[id_arg_index] = get_global(env_name)
            if not args[id_arg_index]:
                print("Missing argument: --%(arg_name)s" % {"arg_name": arg_name})
                return(1)
        return f(*args, **kwargs)
    return decorator.decorator(default_from_global)
# 如下是一个装饰器,可以用在需要自动填充参数的函数上。功能是:
# 如果没有传递函数的deploy_id参数,那么就从环境变量中获取(调用自定义的get_global函数)
with_default_deploy_id = default_from_global('deploy_id', ENV_DEPLOYMENT)

嵌套装饰器

validator函数装饰func1,func1使用时接收参数(*arg, **kwargs),而func1又装饰func2(其实就是Rally中的scenario函数),给func2增加validators属性,是一个函数的列表,函数的接收参数config, clients, task。这些函数最终调用func1,传入参数(config, clients, task, *args, **kwargs),所以func1定义时参数是(config, clients, task, *arg, **kwargs) 
最终实现的效果是,func2有很多装饰器,每个都会接收自己的参数,做一些校验工作。

代码如下:

def validator(fn):
    """Decorator that constructs a scenario validator from given function.
    Decorated function should return ValidationResult on error.
    :param fn: function that performs validation
    :returns: rally scenario validator
    """
    def wrap_given(*args, **kwargs):
        """Dynamic validation decorator for scenario.
        :param args: the arguments of the decorator of the benchmark scenario
        ex. @my_decorator("arg1"), then args = ('arg1',)
        :param kwargs: the keyword arguments of the decorator of the scenario
        ex. @my_decorator(kwarg1="kwarg1"), then kwargs = {"kwarg1": "kwarg1"}
        """
        def wrap_validator(config, clients, task):
            return (fn(config, clients, task, *args, **kwargs) or
                    ValidationResult())
        def wrap_scenario(scenario):
            wrap_validator.permission = getattr(fn, "permission",
                                                consts.EndpointPermission.USER)
            if not hasattr(scenario, "validators"):
                scenario.validators = []
            scenario.validators.append(wrap_validator)
            return scenario
        return wrap_scenario
    return wrap_given

inspect库的一些常见用法

inspect.getargspec(func) 获取函数参数的名称和默认值,返回一个四元组(args, varargs, keywords, defaults),其中:
args是参数名称的列表;
varargs和keywords是*号和**号的变量名称;
defaults是参数默认值的列表;

inspect.getcallargs(func[, *args][, **kwds]) 绑定函数参数。返回绑定后函数的入参字典。

python中的私有属性和函数

Python把以两个或以上下划线字符开头且没有以两个或以上下划线结尾的变量当作私有变量。私有变量会在代码生成之前被转换为长格式(变为公有),这个过程叫"Private name mangling",如类A里的__private标识符将被转换为_A__private,但当类名全部以下划线命名的时候,Python就不再执行轧压。而且,虽然叫私有变量,仍然有可能被访问或修改(使用_classname__membername),所以, 总结如下:

无论是单下划线还是双下划线开头的成员,都是希望外部程序开发者不要直接使用这些成员变量和这些成员函数,只是双下划线从语法上能够更直接的避免错误的使用,但是如果按照_类名__成员名则依然可以访问到。单下划线的在动态调试时可能会方便一些,只要项目组的人都遵守下划线开头的成员不直接使用,那使用单下划线或许会更好。

(0)

相关推荐

  • 自己使用总结Python程序代码片段

    用于记录自己写的,或学习期间看到的不错的,小程序,持续更新...... **************************************************************** [例001]计算:1-2+3-4..+199-200值 复制代码 代码如下: #encoding=utf-8  #计算 1-2+3-4..+199-200值  #1+3+5+7+...199  #-2-4-6...-200  sum1  = 0  sum2  = 0  for i in range

  • python实用代码片段收集贴

    获取一个类的所有子类 复制代码 代码如下: def itersubclasses(cls, _seen=None):     """Generator over all subclasses of a given class in depth first order."""     if not isinstance(cls, type):         raise TypeError(_('itersubclasses must be cal

  • 必须收藏的php实用代码片段

    在编写代码的时候有个神奇的工具总是好的!下面这里收集了 40+ PHP 代码片段,可以帮助你开发PHP 项目. 之前已经为大家分享了<必须收藏的23个php实用代码片段>. 这些PHP 片段对于PHP 初学者也非常有帮助,非常容易学习,让我们开始学习吧- 24. 从 PHP 数据创建 CSV 文件 function generateCsv($data, $delimiter = ',', $enclosure = '"') { $handle = fopen('php://temp'

  • 10个C#程序员经常用到的实用代码片段

    1 读取操作系统和CLR的版本 OperatingSystem os = System.Environment.OSVersion; Console.WriteLine("Platform: {0}", os.Platform); Console.WriteLine("Service Pack: {0}", os.ServicePack); Console.WriteLine("Version: {0}", os.Version); Consol

  • 【经典源码收藏】jQuery实用代码片段(筛选,搜索,样式,清除默认值,多选等)

    本文实例总结了jQuery实用代码片段.分享给大家供大家参考,具体如下: //each遍历文本框 清空默认值 $(".maincenterul1").find("input,textarea").each(function () { //保存当前文本框的值 var vdefault = this.value; $(this).focus(function () { if (this.value == vdefault) { this.value = "&q

  • php实用代码片段整理

    本文整理归纳了php实用代码片段.分享给大家供大家参考,具体如下: 一 从网页中提取关键词 $meta = get_meta_tags('http://www.jb51.net/'); $keywords = $meta['keywords']; // Split keywords $keywords = explode(',', $keywords ); // Trim them $keywords = array_map( 'trim', $keywords ); // Remove emp

  • 一些实用的jQuery代码片段收集

    下边这些jQuery片段只是很少的一部分,如果您在学习过程中也遇到过一些常用的jQuery代码,欢迎分享.下边就让我们看看这些有代码片段. 1.jQuery得到用户IP: 复制代码 代码如下: $.getJSON("http://jsonip.appspot.com?callback=?", function (data) { alert("Your ip: " + data.ip); }); 2.jQuery查看图片的宽度和高度: 复制代码 代码如下: var t

  • 必须收藏的23个php实用代码片段

    在编写代码的时候有个神奇的工具总是好的!下面这里收集了 40+ PHP 代码片段,可以帮助你开发 PHP 项目. 这些 PHP 片段对于 PHP 初学者也非常有帮助,非常容易学习,让我们开始学习吧- 1. 发送 SMS 在开发 Web 或者移动应用的时候,经常会遇到需要发送 SMS 给用户,或者因为登录原因,或者是为了发送信息.下面的 PHP 代码就实现了发送 SMS 的功能. 为了使用任何的语言发送 SMS,需要一个 SMS gateway.大部分的 SMS 会提供一个 API,这里是使用 M

  • BootStrap实用代码片段之一

    如题,持续总结自己在使用BootStrap中遇到的问题,并记录解决方法,希望能帮到需要的小伙伴. 应用场景:经典上下布局中,顶部导航条固定,下部填充不显示滚动条 解决方案:导航条固定在顶部,同时为body设置内边距(padding-top),内边距为导航条高度(默认50px,可自己调整高度),html代码如下: <!--html页面布局--> <div class="container-fluid page-wrapper"> <!--导航栏-->

  • JavaScript和JQuery实用代码片段(一)

    (一)怎样用JQuery刷新一幅图片? 说明:我们都知道,当我们在请求一个资源(比如网页,图片等)的时候,如果该资源被缓存到浏览器了,那么请求返回的就是缓存的副本,不是我们希望获取的资源(该资源内容已经被更新了),此时最普遍的一个办法就是在请求的页面后面或者图片的src后面加上一个查询字符串"ran=" + Math.random(),这样就会请求到最新版本的资源啦! 代码: 复制代码 代码如下: $(imageObj).attr('src',$(imageObj).attr('src

随机推荐