python轻量级orm框架 peewee常用功能速查详情
目录
- 一、常见orm数据库框架
- 1、peewee 简单demo
- 二、Model 和 Field 关系
- 三、Model 模型
- 四、Filed 字段
- 1、字段初始化参数
- 2、字段特有参数
- 3、字段默认参数
- 4、索引
- 五、基本操作 增删改查
- 1、创建
- 2、删除
- 3、更新
- 4、查询
- 5、事务
- 6、过滤
- 7、记录分类
- 8、计数
- 9、分页
- 六、聚合查询
- 七、Scalar
- 八、窗口
- 九、复杂筛选
- 1、查询中支持的筛选运算符
- 2、筛选方法
- 3、联合查询逻辑操作
- 十、SQL 方法
- 1、SQL helper
- 2、安全和SQL注入
一、常见orm数据库框架
Django ORM
peewee
SQLAlchemy
Django ORM
优点 :
易用,学习曲线短
和Django
紧密集合,用Django
时使用约定俗成的方法去操作数据库
缺点 :
QuerySet
速度不给力,会逼我用Mysqldb
来操作原生sql语句。
Peewee
优点 :
Django
式的API
,使其易用
轻量实现,很容易和任意web框架集成
缺点 :
不支持自动化 schema
迁移
不能像Django
那样,使线上的mysql
表结构生成结构化的模型。
SQLAlchemy
优点 :
巨牛逼的API,使得代码有健壮性和适应性
灵活的设计,使得能轻松写复杂查询
缺点 :
工作单元概念不常见
重量级 API,导致长学习曲线
1、peewee 简单demo
import datetime from peewee import * db = MySQLDatabase( "test", host="127.0.0.1", port=3306, user="root", passwd="123456" ) db.connect() class BaseModel(Model): class Meta: database = db class Person(BaseModel): name = CharField() age = IntegerField() height = IntegerField() sex = BooleanField(default='male') if __name__ == "__main__": Person.create_table() # 创建 Person.create(name='tom', age=30, height=177) # 查询 res = Person.select().where(Person.name=='tom') print(res) print(res[0]) print(res[0].name) print(res[0].age) print(res[0].height) print(res[0].sex) >>>> SELECT `t1`.`id`, `t1`.`name`, `t1`.`age`, `t1`.`High`, `t1`.`sex` FROM `person` AS `t1` WHERE (`t1`.`name` = 'ljk') 1 tom 30 177 True
二、Model 和 Field 关系
在ORM对象关系数据库中 Model是一个类,映射到数据库表中就是一个表。Filed是字段,映射到表中就是字段。model
实例就是数据库中的一条记录。在peewee中Model
和Field的关系如下:
Thing | 对应关系 |
---|---|
Model 类 | 表 |
Field 实例 | 表中字段 |
Model 实例 | 表中数据 |
数据库连接和model类定义的 典型使用
import datetime from peewee import * db = SqliteDatabase('my_app.db') class BaseModel(Model): class Meta: database = db class User(BaseModel): username = CharField(unique=True) class Tweet(BaseModel): user = ForeignKeyField(User, backref='tweets') message = TextField() created_date = DateTimeField(default=datetime.datetime.now) is_published = BooleanField(default=True)
创建一个数据库实例
db = SqliteDatabase('my_app.db')
创建一个基础model类
class BaseModel(Model): class Meta: database = db
定义一个用于建立数据库连接的基模类是一种推荐的做法,因为将不必为后续表指定数据库。
定义一个普通 model 类
class User(BaseModel): username = CharField(unique=True)
模型定义使用的是其他流行的orm(如SQLAlchemy
或Django
)中看到的声明式风格。因为User继承了BaseModel
类,所以User类可以继承数据库连接。
User已经明确定义了一个具有唯一约束的用户名列。因为我们没有指定主键,peewee 会自动添加一个自增整数主键字段,名为 id。没有指定主键的表peewee会自动创建一个名字为id的自增主键。
三、Model 模型
为了不污染model的命名空间,model的配置放在特殊的元属性类中。这是从Django的框架中借鉴过来的。
contacts_db = SqliteDatabase('contacts.db') class Person(Model): name = CharField() class Meta: database = contacts_db
在简单model示例中,你会注意到,我们创建了一个定义数据库的BaseModel,然后扩展了它。这是定义数据库和创建模型的首选方法。
你可以通过 ModelClass._meta
来使用:
Traceback (most recent call last): File "<stdin>", line 1, in <module> AttributeError: type object 'Person' has no attribute 'Meta' >>> Person._meta <peewee.modeloptions object="" at="" 0x7f51a2f03790="">
ModelOptions
实现了几个查看model metadata的方法:
{'id': <peewee.autofield object="" at="" 0x7f51a2e92750="">, 'name': <peewee.charfield object="" at="" 0x7f51a2f0a510="">} >>> Person._meta.primary_key <peewee.autofield object="" at="" 0x7f51a2e92750=""> >>> Person._meta.database <peewee.sqlitedatabase object="" at="" 0x7f519bff6dd0="">
Model
在ORM数据中就是一张表,那么表的属性可以有如下选项。它们是被定义在Meta中元数据。
Option | Meaning | 是否可继承? |
---|---|---|
database | 指定表创建依附的数据库 | yes |
table_name | 表名 | no |
table_function | 生成表名的函数 | yes |
indexes | 多行索引 | yes |
primary_key | 主键 | yes |
constraints | 表约束的列表 | yes |
schema | 模型的数据库架构 | yes |
only_save_dirty | 调用model.save()时,仅保存脏字段,指定字段? | yes |
options | 创建表扩展的选项字典 | yes |
table_settings | 在右括号后设置字符串的列表 | yes |
temporary | 指示临时表 | yes |
legacy_table_names | 使用旧表名生成(默认情况下启用) | yes |
depends_on | 指示此表依赖于另一个表进行创建 | no |
without_rowid | 指示表不应具有rowid(仅限SQLite) | no |
strict_tables | 指示严格的数据类型(仅限SQLite,3.37+) | yes |
四、Filed 字段
Field
类是用来将Model属性映射到数据库列。每个字段类型都有一个相应的SQL存储类,将python
数据类型转化为基本的存储类型。
当创建Model类时,fields被定义成类的属性。它看起来和django的数据库框架很类似。
class User(Model): username = CharField() join_date = DateTimeField() about_me = TextField()
在上面的例子中,因为没有field有主键属性primary_key=True,所以会创建一个名字是id的自增主键。
peewee中可用的字段包括:
字段类型 | Sqlite | Postgresql | MySQL |
---|---|---|---|
AutoField | integer | serial | integer |
BigAutoField | integer | bigserial | bigint |
IntegerField | integer | integer | integer |
BigIntegerField | integer | bigint | bigint |
SmallIntegerField | integer | smallint | smallint |
IdentityField | not supported | int identity | not supported |
FloatField | real | real | real |
DoubleField | real | double precision | double precision |
DecimalField | decimal | numeric | numeric |
CharField | varchar | varchar | varchar |
FixedCharField | char | char | char |
TextField | text | text | text |
BlobField | blob | bytea | blob |
BitField | integer | bigint | bigint |
BigBitField | blob | bytea | blob |
UUIDField | text | uuid | varchar(40) |
BinaryUUIDField | blob | bytea | varbinary(16) |
DateTimeField | datetime | timestamp | datetime |
DateField | date | date | date |
TimeField | time | time | time |
TimestampField | integer | integer | integer |
IPField | integer | bigint | bigint |
BooleanField | integer | boolean | bool |
BareField | untyped | not supported | not supported |
ForeignKeyField | integer | integer | integer |
1、字段初始化参数
所有字段类型接受的参数及其默认值
null = False
允许空值index = False
创建索引unique = False
创建唯一索引column_name = None
显式指定数据库中的列名default = None
默认值,可以使任意值或可调用对象primary_key = False
指明主键constraints = None
约束条件sequence = None
序列名字(如果数据库支持)collation = None
排序字段unindexed = False
虚表上的字段不应该被索引choices = None
两种可选项:value display
help_text = None
帮助说明字段。表示此字段的任何有用文本的字符串verbose_name = None
表示此字段的用户友好名称的字符串index_type = None
索引类型
2、字段特有参数
在一些字段中有些自己特有的参数,如下:
字段类型 | 特有参数 |
---|---|
CharField | max_length |
FixedCharField | max_length |
DateTimeField | formats |
DateField | formats |
TimeField | formats |
TimestampField | resolution, utc |
DecimalField | max_digits, decimal_places, auto_round, rounding |
ForeignKeyField | model, field, backref, on_delete, on_update, deferrable lazy_load |
BareField | adapt |
3、字段默认参数
peewee
可以为每一个字段提供默认值,比如给intergerField 默认值0而不是NULL。你可以申明字段时指定默认值:
class Message(Model): context = TextField() read_count = IntegerField(default=0)
在某些情况下,默认值是动态的会更有意义。一个可能的场景就是当前时间。Peewee
允许您在这些情况下指定一个函数,该函数的返回值将在创建对象时使用。注意,使用时只提供了函数,并不需要实际调用它。
class Message(Model): context = TextField() timestamp = DateTimeField(default=datetime.datetime.now)
如果你正在使用一个接受可变类型(list, dict等)的字段,并想提供一个默认值。将默认值包装在一个简单的函数中是个好主意,这样,多个模型实例就不会共享对同一底层对象的引用。
def house_defaults(): return {'beds': 0, 'baths': 0} class House(Model): number = TextField() street = TextField() attributes = JSONField(default=house_defaults)
4、索引
peewee可以通过单列索引和多列索引。可选地包括UNIQUE
约束。Peewee还支持对模型和字段的用户定义约束。
单列索引
单列索引使用字段初始化参数定义。下面的示例在用户名字段上添加一个惟一索引,在电子邮件字段上添加一个普通索引
class User(Model): username = CharField(unique=True) email = CharField(index=True)
在列上添加用户定义的约束。你可以使用constraints参数。例如,您可能希望指定一个默认值,或者添加一个CHECK约束
class Product(Model): name = CharField(unique=True) price = DecimalField(constraints=[Check('price < 10000')]) created = DateTimeField( constraints=[SQL("DEFAULT (datetime('now'))")])
多列索引
可以使用嵌套元组将多列索引定义为元属性。每个表的索引是一个2元组,第一部分是索引字段名称的元组,可以有多个字段,第二部分是一个布尔值,指示索引是否应该唯一。
class Transaction(Model): from_acct = CharField() to_acct = CharField() amount = DecimalField() date = DateTimeField() class Meta: indexes = ( # create a unique on from/to/date (('from_acct', 'to_acct', 'date'), True), # create a non-unique on from/to (('from_acct', 'to_acct'), False), )
记住,如果索引元组只包含一项,则添加末尾逗号
五、基本操作 增删改查
peewee
中关于增删改查的基本操作方法如下:
增 :
- create():最常用创建,返回创建实例
- save():第一次执行的save是插入,第二次是修改
- insert: 插入数据,不创建数据库实例。返回id
- insert_many: 批量插入
- bulk_create:批量插入,类似于insert_many。可指定单次插入的数量
- batch_commit: 自动添加了一个事务,然后一条条的插入
- insert_from: 从另一个表中查询的数据作为插入的数据
删除 :
- delete().where().execute()
- delete_instance() 直接执行删除了,不用调用execute() 方法
修改 :
- save(): 第一次执行的save是插入,第二次是修改
- update() 用于多字段更新
查询 :
- Model.get(): 检索与给定查询匹配的单个实例。报 Model.DoesNotExist 异常。如果有多条记录满足条件,则返回第一条
- get_or_none() :与get使用方法相同。区别是找不到结果时不会报错
- get_by_id() :通过主键查找,是一种快捷方式
- Model['id_num']: 和上面的get_by_id一样是通过主键查找。
- get_or_create(): 首先查询,如果查不到将创建一个新的记录
- select() 查询多条数据
1、创建
单条插入
你可以用 Model.create()
创建一个新的实例。这个方法接收关键字参数,参数要和表定义的字段一致。返回值是新的实例
>>> User.create(username='Charlie') <__main__.User object at 0x2529350>
批量插入
有几种方法可以快速加载大量数据,缺乏经验的做法是在循环中调用Model.create来创建
data_source = [ {'field1': 'val1-1', 'field2': 'val1-2'}, {'field1': 'val2-1', 'field2': 'val2-2'}, # ... ] for data_dict in data_source: MyModel.create(**data_dict)
上面的方法比较慢的原因有几个:
- 如果没有在事务中装饰循环,那么每个对
create
()的调用都发生在它自己的事务中。这将会非常缓慢 - 必须生成每个
InsertQuery
并将其解析为SQL - 需要原生SQL语句传入到数据库中解析
- 检索最后一
个insert id
,这在某些情况下会导致执行额外的查询
可以通过一个简单的装饰: atomic
来大幅度提高速度
# This is much faster. with db.atomic(): for data_dict in data_source: MyModel.create(**data_dict)
上面的代码仍然没有解决2、3、4这三点。我们可以通过 insert_many
带来一个大的速度提升。这个方法接收多列元组或字典,然后在一次SQL语句中插入多行数据。
data_source = [ {'field1': 'val1-1', 'field2': 'val1-2'}, {'field1': 'val2-1', 'field2': 'val2-2'}, # ... ] # Fastest way to INSERT multiple rows. MyModel.insert_many(data_source).execute()
insert_many()
方法还接收多行元组,同时需要提供一个对应的字段。
# We can INSERT tuples as well... data = [('val1-1', 'val1-2'), ('val2-1', 'val2-2'), ('val3-1', 'val3-2')] # But we need to indicate which fields the values correspond to. MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()
在装饰中批量插入是一个好的方法。
# You can, of course, wrap this in a transaction as well: with db.atomic(): MyModel.insert_many(data, fields=fields).execute()
插入大量数据
在大量数据的插入场景下,根据数据源中的行数,您可能需要将其分解为多个块。SQLite
通常有999
或32766
的限制
您可以编写一个循环来将数据批处理成块(在这种情况下,强烈建议您使用事务)
# Insert rows 100 at a time. with db.atomic(): for idx in range(0, len(data_source), 100): MyModel.insert_many(data_source[idx:idx+100]).execute()
peewwee
提供了一个chunked
函数帮助你高效的将普通可迭代对象拆分成为可批处理对象。
from peewee import chunked # Insert rows 100 at a time. with db.atomic(): for batch in chunked(data_source, 100): MyModel.insert_many(batch).execute()
Model.bulk_create()
的行为有点像insert_many()
,但是可以用来插入没有保存的数据库实例,并且可以指定每次插入的数量。如一共插入345,如果指定了一次插入100条记录,那么就是4次插入,3 * 100 + 1 * 45
什么叫没有保存的数据库实例呢?就是类似于 User(username='kk')
,创建的数据库实例。
# Read list of usernames from a file, for example. with open('user_list.txt') as fh: # Create a list of unsaved User instances. users = [User(username=line.strip()) for line in fh.readlines()] # Wrap the operation in a transaction and batch INSERT the users # 100 at a time. with db.atomic(): User.bulk_create(users, batch_size=100)
bulk_update()
和 bulk_create
类似,可以用来插入没有保存的数据库实例,自动添加了一个事务,然后一条条的插入
# List of row data to insert. row_data = [{'username': 'u1'}, {'username': 'u2'}, ...] # Assume there are 789 items in row_data. The following code will result in # 8 total transactions (7x100 rows + 1x89 rows). for row in db.batch_commit(row_data, 100): User.create(**row)
从另一个表批量装载
Model.insert_from()
如果要批量插入的数据存储在另一个表中,还可以创建源为SELECT查询的INSERT查询。
res = (TweetArchive .insert_from( Tweet.select(Tweet.user, Tweet.message), fields=[TweetArchive.user, TweetArchive.message]) .execute())
2、删除
要删除单个模型实例,可以使用model.delete_instance()快捷方式。delete_instance()将删除给定的模型实例,并且可以选择递归地删除任何依赖对象(通过指定recursive=True)。
删除一个记录:Model.delete_instance()
删除任意记录:Model.delete()
3、更新
save()
:单个更新
一旦模型实例有了主键,随后对save()的任何调用都将导致一个UPDATE而不是另一个INSERT。模型的主键不会改变
>>> user.save() # save() returns the number of rows modified. 1 >>> user.id 1 >>> user.save() >>> user.id 1 >>> huey.save() 1 >>> huey.id 2
update
:批量更新
接受关键字参数,其中键对应于模型的字段名称
>>> today = datetime.today() >>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today) >>> query.execute() # Returns the number of rows that were updated. 4
4、查询
单条记录查询
你可以通过Model.get()
方法查询到给条件的数据。如果是通过主键查找,也可以用一个快捷方法 Model.get_by_id()。
此方法是使用给定查询调用Model.select()
的快捷方式,但将结果集限制为一行。需要注意的是使用get()方法,如果没有找到匹配的数据会抛出错误:DoesNotExist
get
>>> User.get(User.id == 1) <__main__.User object at 0x25294d0> >>> User.get_by_id(1) # Same as above. <__main__.User object at 0x252df10> >>> User[1] # Also same as above. <__main__.User object at 0x252dd10> >>> User.get(User.id == 1).username u'Charlie' >>> User.get(User.username == 'Charlie') <__main__.User object at 0x2529410> >>> User.get(User.username == 'nobody') UserDoesNotExist: instance matching query does not exist: SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."username" = ? PARAMS: ['nobody']
单条记录查询方法:
- Model.get()
- Model.get_by_id()
- Model.get_or_none() - if no matching row is found, return None.
- Model.select()
- SelectBase.get()
- SelectBase.first() - return first record of result-set or None.
查询或创建:
Model.get_or_create() 它首先尝试检索匹配的行。如果失败,将创建一个新行。
通常,可以依赖唯一约束或主键来防止创建重复对象。例如,假设我们希望使用示例用户模型实现注册新用户帐户。用户模型对用户名字段有唯一的约束,因此我们将依赖数据库的完整性保证,以确保不会出现重复的用户名:
try: with db.atomic(): return User.create(username=username) except peewee.IntegrityError: # `username` is a unique column, so this username already exists, # making it safe to call .get(). return User.get(User.username == username)
上面的例子首先尝试创建,然后回退到查询,依靠数据库来强制执行唯一约束。
如果您希望首先尝试检索记录,可以使用get_or_create()。
该函数返回一个2元组,其中包含实例和一个布尔值,该值指示对象是否被创建。
user, created = User.get_or_create(username=username) person, created = Person.get_or_create( first_name=first_name, last_name=last_name, defaults={'dob': dob, 'favorite_color': 'green'})
查询多行记录:
可以通过Model.select()
获取多行数据。peewee允许你迭代这些数据,同时也可以索引和切片。
>>> query = User.select() >>> [user.username for user in query] ['Charlie', 'Huey', 'Peewee'] >>> query[1] <__main__.User at 0x7f83e80f5550> >>> query[1].username 'Huey' >>> query[:2] [<__main__.User at 0x7f83e80f53a8>, <__main__.User at 0x7f83e80f5550>]
select()是
很智能的,在查询一次的前提下可以多次迭代,切片,下标取值等。
在缓存结果时,同一查询的后续迭代不会命中数据库。要禁用此行为(以减少内存使用),请在迭代时调用Select.iterator()。
除了返回模型实例外,Select查询还可以返回字典、元组和命名元组。根据您的用例,您可能会发现将行作为字典使用更容易
>>> query = User.select().dicts() >>> for row in query: ... print(row) {'id': 1, 'username': 'Charlie'} {'id': 2, 'username': 'Huey'} {'id': 3, 'username': 'Peewee'}
iterator() :
不缓存查询结果
默认情况下,peewee将缓存迭代Select查询时返回的行。这是一种优化,允许多次迭代以及索引和切片,而不会导致额外的查询。但是,当您计划在大量行上进行迭代时,这种缓存可能会有问题。
为了减少内存的消耗,使用iterator()方法。这个方法允许返回结果不缓存数据。使用更少的内存。
stats = Stat.select() # Our imaginary serializer class serializer = CSVSerializer() # Loop over all the stats and serialize. for stat in stats.iterator(): serializer.serialize_object(stat)
对于简单的查询,您可以通过将行作为字典返回来进一步提高速度。namedtuples或元组。以下方法可用于任何Select查询,以更改结果行类型。
dicts()
namedtuples()
tuples()
objects :
将多个查询表放在一个实例中
当对包含多个表中的列的大量行进行迭代时,peewee将为返回的每一行构建查询模型。对于复杂查询,此操作可能很慢。例如,如果我们选择一个tweet列表以及tweet作者的用户名和头像,Peewee必须为每一行创建两个对象(tweet和用户)。除了上述行类型之外,还有第四个方法objects(),它将作为模型实例返回行,但不会分解模型查询。
query = (Tweet .select(Tweet, User) # Select tweet and user data. .join(User)) # Note that the user columns are stored in a separate User instance # accessible at tweet.user: for tweet in query: print(tweet.user.username, tweet.content) # Using ".objects()" will not create the tweet.user object and assigns all # user attributes to the tweet instance: for tweet in query.objects(): print(tweet.username, tweet.content)
为了获得最佳性能,您可以执行查询,然后使用底层数据库游标对结果进行迭代。
Database.execute()。
接受查询对象,执行查询,并返回DB-API 2.0游标对象。光标将返回原始行元组:
query = Tweet.select(Tweet.content, User.username).join(User) cursor = database.execute(query) for (content, username) in cursor: print(username, '->', content)
5、事务
数据库事务
(Transaction)
是一种机制,包含了一组数据库操作命令
事务把所有的命令作为一个整体一起向系统提交或撤销操作请求,即这一组数据库命令要么都执行,要么都不执行,因此事务是一个不可分割的工作逻辑单元。
事务具有 4 个特性,即原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability
),这 4 个特性通常简称为 ACID。
peewee事务
Peewee实现事务的方法是 Database.atomic()
方法,非常简单
当事务执行成功之后,它会自动commit(),不需要我们手动调。当事务的代码块中抛出异常时,它会自动调用rollback(),将数据库状态恢复到操作之前,保证要么命令全部执行,要么全部不执行。
Peewee中实现事务有两种使用方式,一种是将atomic当做Context manager使用,另外一种将atomic当修饰器使用。
Context manager
with db.atomic(): for data_dict in data_source: MyModel.create(**data_dict)
装饰器
@db.atomic() def insert_data() for data_dict in data_source: MyModel.create(**data_dict)
事务其他特性:
- 除了自动commit()和rollback()之外,也可以手动调用commit()和rollback()方法
- 事务支持嵌套使用
- 在一个事务中对数据库操作能够有效减少事务的耗时,增加操作效率
6、过滤
您可以使用普通的python操作符过滤特定的记录。
>>> user = User.get(User.username == 'Charlie') >>> for tweet in Tweet.select().where(Tweet.user == user, Tweet.is_published == True): ... print(tweet.user.username, '->', tweet.message) ... Charlie -> hello world Charlie -> this is fun >>> for tweet in Tweet.select().where(Tweet.created_date < datetime.datetime(2011, 1, 1)): ... print(tweet.message, tweet.created_date) ... Really old tweet 2010-01-01 00:00:00 ... print(tweet.message) hello world this is fun look at this picture of my food
7、记录分类
给返回的数据排序,可以使用order_by
普通使用:
>>> for t in Tweet.select().order_by(Tweet.created_date): ... print(t.pub_date)
倒序排列:
可以使用desc或者 - 号
Tweet.select().order_by(Tweet.created_date.desc()) Tweet.select().order_by(-Tweet.created_date) # Note the "-" prefix.
正序排列:
User.select().order_by(+User.username)
高级使用:
对计算值进行排序时,可以包括必要的SQL表达式,也可以引用指定给该值的别名。
query = (User .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets')) .join(Tweet, JOIN.LEFT_OUTER) .group_by(User.username))
您可以使用select子句中使用的相同计数表达式进行订购。在下面的示例中,我们按tweet ID的COUNT()降序排序:
query = (User .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets')) .join(Tweet, JOIN.LEFT_OUTER) .group_by(User.username) .order_by(fn.COUNT(Tweet.id).desc()))
或者,可以在select
子句中引用指定给计算值的别名。这种方法的优点是易于阅读。请注意,我们不是直接引用命名别名,而是使用SQL帮助程序对其进行包装:
query = (User .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets')) .join(Tweet, JOIN.LEFT_OUTER) .group_by(User.username) .order_by(SQL('num_tweets').desc()))
同样,也可以使用如上
ntweets = fn.COUNT(Tweet.id) query = (User .select(User.username, ntweets.alias('num_tweets')) .join(Tweet, JOIN.LEFT_OUTER) .group_by(User.username) .order_by(ntweets.desc())
8、计数
可以使用count来计算返回数量
>>> Tweet.select().count() 100 >>> Tweet.select().where(Tweet.id > 50).count() 50
9、分页
paginate()
方法可以很简单的获取一个分页的数据。paginate有两个参数:page_number 和 items_per_page。第一个参数是取回数据的页数;第二个参数是每一页多少元素。这两个参数加起来才能完成分页
>>> for tweet in Tweet.select().order_by(Tweet.id).paginate(2, 10): ... print(tweet.message) ... tweet 10 tweet 11 tweet 12 tweet 13 tweet 14 tweet 15 tweet 16 tweet 17 tweet 18 tweet 19
分页的功能也可以用 limit() 和 offset() 来实现
Tweet.select().order_by(Tweet.id).offset(10).limit(10)
offset(10) 跳过10个记录
limit(10) 取10个记录
六、聚合查询
聚合查询:对查询出来的结果进一步处理,包括统计,分组,求最大值,求平均值等。
聚合常用的函数:
- COUNT:计算表中的记录数(行数)
- SUM:计算表中数值列中数据的合计值
- AVG:计算表中数值列中数据的平均值
- MAX:求出表中任意列中数据的最大值
- MIN:求出表中任意列中数据的最小值
用于汇总的函数称为聚合函数或者聚集函数。所谓聚合,就是将多行汇总为一行。实际上,所有的聚合函数都是这样,输入多行输出一行。
聚合函数的使用:
mysql> select * from person; +----+------+-----+------+-----+ | id | name | age | High | sex | +----+------+-----+------+-----+ | 1 | ljk | 30 | 177 | 1 | | 2 | aghj | 23 | 168 | 1 | +----+------+-----+------+-----+ 2 rows in set (0.00 sec) ************************************ * 聚合函数 * ************************************ mysql> select count(*) from person; +----------+ | count(*) | +----------+ | 2 | +----------+ 1 row in set (0.00 sec) ---------------------------------------- mysql> select sum(age) from person; +----------+ | sum(age) | +----------+ | 53 | +----------+ 1 row in set (0.00 sec) ---------------------------------------- mysql> select avg(high) from person; +-----------+ | avg(high) | +-----------+ | 172.5000 | +-----------+ 1 row in set (0.00 sec) ---------------------------------------- mysql> select max(high) from person; +-----------+ | max(high) | +-----------+ | 177 | +-----------+ 1 row in set (0.00 sec) mysql> select * from person; +----+------+-----+------+-----+ | id | name | age | High | sex | +----+------+-----+------+-----+ | 1 | ljk | 30 | 177 | 1 | | 2 | aghj | 23 | 168 | 1 | | 3 | 0 | 22 | 165 | 0 | +----+------+-----+------+-----+ 3 rows in set (0.00 sec)
mysql> select avg(High) from person group by sex; +-----------+ | avg(High) | +-----------+ | 172.5000 | | 165.0000 | +-----------+ 2 rows in set (0.00 sec) # 使用having对分组的数据筛选 mysql> select avg(High) as high from person group by sex having high > 170; +----------+ | high | +----------+ | 172.5000 | +----------+ 1 row in set (0.00 sec)
where :
分组之前筛选数据
where 子句的作用是在对查询结果进行分组前,将不符合where条件的行去掉,即在分组之前过滤数据,where条件中不能包含聚组函数,使用where条件过滤出特定的行。
having :
对分组之后筛选分组的数据
having 子句的作用是筛选满足条件的组,即在分组之后过滤数据,条件中经常包含聚组函数,使用having 条件过滤出特定的组,也可以使用多个分组标准进行分组。
总结一下过滤的顺序
on->join->where->group by->having
分组
查询用户以及每个人拥有的tweet账号数量。这里使用了group_by,将结果根据User表分类。
query = (User .select(User, fn.Count(Tweet.id).alias('count')) .join(Tweet, JOIN.LEFT_OUTER) .group_by(User))
假设有如下数据库,一个多对多的关系。
class Photo(Model): image = CharField() class Tag(Model): name = CharField() class PhotoTag(Model): photo = ForeignKeyField(Photo) tag = ForeignKeyField(Tag)
查询Tag记录,按照Tag分组,筛选出每组Tag里Photo数量超过5个的记录。
query = (Tag .select() .join(PhotoTag) .join(Photo) .group_by(Tag) .having(fn.Count(Photo.id) > 5))
HAVING 子句可以让我们筛选分组后的各组数据。
HAVING,它与 GROUP BY 配合使用,为聚合操作指定条件。
WHERE 子句只能指定行的条件,而不能指定组的条件。所以当数据分组之后就需要 HAVING 对分组的数据筛选。
具体区别:
where 用在group_by前,having用在group_by之后。
聚合函数(avg、sum、max、min、count),不能作为条件放在where之后,但可以放在having之后
七、Scalar
对查询出来的数据做处理
可以通过调用Query.scalar()来检索标量值。例如
>>> Employee.select( ... fn.Min(Employee.salary), fn.Max(Employee.salary) ... ).scalar(as_tuple=True) (30000, 50000)
您可以通过传递来检索多个标量值
>>> Employee.select( ... fn.Min(Employee.salary), fn.Max(Employee.salary) ... ).scalar(as_tuple=True) (30000, 50000)
八、窗口
窗口函数是指对作为SELECT查询一部分处理的数据滑动窗口进行操作的聚合函数。窗口功能可以执行以下操作:
对结果集的子集执行聚合。
计算一个运行总数。
排名结果。
将行值与前面(或后面!)行中的值进行比较。
peewee支持SQL窗口函数,可以通过调用Function.over()并传入分区或排序参数来创建这些函数。
九、复杂筛选
peewee支持以下类型的比较
1、查询中支持的筛选运算符
Comparison | Meaning |
---|---|
== | x equals y |
< | x is less than y |
<= | x is less than or equal to y |
> | x is greater than y |
>= | x is greater than or equal to y |
!= | x is not equal to y |
<< | x IN y, where y is a list or query |
>> | x IS y, where y is None/NULL |
% | x LIKE y where y may contain wildcards |
** | x ILIKE y where y may contain wildcards |
^ | x XOR y |
~ | Unary negation (e.g., NOT x) |
2、筛选方法
因为用完了要重写的操作符,所以有一些额外的查询操作可以作为方法使用
Method | Meaning |
---|---|
.in_(value) | 查询在范围内 |
.not_in(value) | 查询不在范围内 |
.is_null(is_null) | 为空或不为空。接受布尔参数 |
.contains(substr) | 通配符搜索子字符串 |
.startswith(prefix) | 查询以prefix开头的数据 |
.endswith(suffix) | 查询以prefix结尾的数据 |
.between(low, high) | 查询在low和high中间的值 |
.regexp(exp) | 正则表达式匹配匹配的数据,贪婪模式 |
.iregexp(exp) | 正则表达式匹配匹配的数据,非贪婪模式 |
.bin_and(value) | 二进制加 |
.bin_or(value) | 二进制或 |
.concat(other) | Concatenate two strings or objects using ||. |
.distinct() | 标记重复的数据 |
.collate(collation) | 指定具有给定排序规则的列 |
.cast(type) | 将列的值强制转换为给定类型 |
3、联合查询逻辑操作
使用逻辑操作的联合查询
Operator | Meaning | Example |
---|---|---|
& | AND | (User.is_active == True) & (User.is_admin == True) |
| | OR | (User.is_admin) | (User.is_superuser) |
~ | NOT (unary negation) | ~(User.username.contains('admin')) |
# Find the user whose username is "charlie". User.select().where(User.username == 'charlie') # Find the users whose username is in [charlie, huey, mickey] User.select().where(User.username.in_(['charlie', 'huey', 'mickey'])) Employee.select().where(Employee.salary.between(50000, 60000)) Employee.select().where(Employee.name.startswith('C')) Blog.select().where(Blog.title.contains(search_string))
请注意,实际的比较用括号括起来。 Python 的运算符优先级要求将比较括在括号中。
# Find any users who are active administrations.
User.select().where(
(User.is_admin == True) &
(User.is_active == True))
可能你尝试使用python语法中的in and or 和not操作,但是在查询中是不生效的。所有的操作返回都是一个布尔值。
建议如下:
- 用 .in_() 和 .not_in() 替换 in和 not in
- 用&替换and
- 用|替换or
- 用~替换not
- 用.is_null()替换 is None 或 == None
十、SQL 方法
SQL方法,如 like , sum 等,可以通过 fn 来表达
从peewee中导入fn: from peewee import fn
query = (User .select(User, fn.COUNT(Tweet.id).alias('tweet_count')) .join(Tweet, JOIN.LEFT_OUTER) .group_by(User) .order_by(fn.COUNT(Tweet.id).desc())) for user in query: print('%s -- %s tweets' % (user.username, user.tweet_count))
fn可以表达任何SQL方法,它的参数可以是字段,值,子查询甚至嵌套函数
基础使用
- fn.AVG() 返回指定列的平均值,NULL值不包括在计算中。
- fn.SUM() 返回指定列的数目,NULL值不包括在计算中。
- fn.MIN() 返回指定列的最小值,NULL值不包括在计算中。
- fn.MAX() 返回指定列的最大值,NULL值不包括在计算中。
- fn.DATE() 返回指定日期时间格式列的日期格式
- fn.DECIMAL(10, 2) ===> decimal(10,2)中的“2”表示小数部分的位数
进阶使用
- fn.to_char() 返回指定列格式化后的字符串 e.g.: fn.to_char(18.88, '99.999') ===> 18.888; fn.to_char(model.field, '')。
- fn.char_length(str) 返回字符串字符数
- fn.array_agg() 接受一组值并返回一个数组。
- fn.array_agg(model.name).order_by(model.id.asc()) # array_agg(name order by id asc)
- fn.rank().over(partition_by=[field1, field2, or aggregation_field1], order_by=[fn.SUM(Booking.slots).desc()]) 实现rank() over(partition by filed order by filed)分区功能。
- fn.length() 返回指定列的长度。也可应用于order_by。e.g.: .order_by(fn.length(model.field).asc())。
- fn.CONCAT() 返回合并的字符串(CONCAT一定要大写,小写的concat用法不一样)。fn.CONCAT(model.id, '-', model.name) ===> '188-张三'
1、SQL helper
有时,您可能想在sql中传一些任意的sql语句。您可以使用特殊的SQL类来实现这一点
# We'll query the user table and annotate it with a count of tweets for # the given user query = (User .select(User, fn.Count(Tweet.id).alias('ct')) .join(Tweet) .group_by(User)) # Now we will order by the count, which was aliased to "ct" query = query.order_by(SQL('ct')) # You could, of course, also write this as: query = query.order_by(fn.COUNT(Tweet.id))
使用peewee执行手工SQL语句有两种方法
- Database.execute_sql() 用于执行任何类型的查询
- RawQuery 执行SELECT查询并返回模型实例
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data) query.execute_sql()
2、安全和SQL注入
默认情况下,peewee将参数化查询,因此用户传入的任何参数都将被转义。
请确保将任何用户定义的数据作为查询参数传入,而不是作为实际SQL查询的一部分传入:
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s' % (user_data,)) # Good. `user_data` will be treated as a parameter to the query. query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data) # Bad! DO NOT DO THIS! query = MyModel.select().where(SQL('Some SQL expression %s' % user_data)) # Good. `user_data` will be treated as a parameter. query = MyModel.select().where(SQL('Some SQL expression %s', user_data))
MySQL和Postgresql
使用“%s”表示参数。另一方面,SQLite使用“?”。请确保使用适合数据库的字符。还可以通过检查Database.param
来查找此参数。
到此这篇关于python轻量级orm框架 peewee常用功能速查详情的文章就介绍到这了,更多相关python轻量级orm框架 peewee常用功能速查内容请搜索我们以前的文