peewee 操作详解

python 2021-05-20 2.46K

peewee 操作详解–增删改查

[TOC]

本文中代码样例所使用的 Person 模型如下:

class Person(Model):
    Name = CharField()
    Age = IntegerField()
    Birthday = DateTimeField()
    Remarks = CharField(null=True)

一、新增

1、create

Model.create 向数据库中插入一条记录,并返回一个新的实例。

p = Person.create(Name='张三', Age=30, Birthday=date(1990, 1, 1))

2、save

save(force_insert=False, only=None)
# force_insert:是否强制插入
# only(list):需要持久化的字段,当提供此参数时,只有提供的字段被持久化。

p1 = Person(Name='王五', Age=50, Birthday=date(1970, 1, 1))
p1.save()

3、insert

insert 只插入数据而不创建模型实例,返回新行的主键。

Person.insert(Name='李四', Age=40, Birthday=date(1980, 1, 1)).execute()

4、insert_many

insert_many(rows, fields=None)
# rows:元组或字典列表,要插入的数据
# fields(list):需要插入的字段名列表。

说明:
1、当 rows 传递的是字典列表时,fields 是不需要传的,如果传了,那么,rows 中的字段在字典中必须存在,否则报错。如果没有传递 fields 参数,那么默认取所有字典的交集作为插入字段。这个也好理解,比如一个字典的键是a、b、c,一个是 b、c、d,那么就取 b、c 作为需要插入的字段。peewee 不会为缺失的字段做默认处理。
2、当 rows 传递的是元组列表时,必须指定 fields,并且 fields 中字段名的顺序跟元组一致。元组中值的数量必须大于等于 fields 中字段的数量,一般建议是保持一致。

Person.insert_many(
    [('张三', 30, date(1990, 1, 1)),
    ('李四', 40, date(1980, 1, 1)),
    ('王五', 50, date(1970, 1, 1))],
    ['Name', 'Age', 'Birthday']
).execute()

Person.insert_many([
    {'Name': '张三', 'Age': 30, 'Birthday': date(1990, 1, 1)},
    {'Name': '李四', 'Age': 40, 'Birthday': date(1980, 1, 1)},
    {'Name': '王五', 'Age': 50, 'Birthday': date(1970, 1, 1)}
]
).execute()

对于批量操作,应该放在事务中执行:

with db.atomic():
    Person.insert_many(data, fields=fields).execute()

在使用批量插入时,如果是 SQLite,SQLite3 版本必须为 3.7.11.0 或更高版本才能利用批量插入API。此外,默认情况下,SQLite 将 SQL 查询中的绑定变量数限制为 999。

SQLite 中,当批量插入的行数超过 999 时,就需要使用循环来将数据批量分组:

with db.atomic():
    for idx in range(0, len(data), 100):
        Person.insert_many(data[idx: idx+100], fields=fields).execute()

Peewee 中带有一个分块辅助函数 chunked(),使用它可以有效地将通用迭代块分块为一系列批量迭代的迭代:

from peewee import chunked

# 一次插入 100 行.
with db.atomic():
    for batch in chunked(data, 100):
        Person.insert_many(batch).execute()

5、insert_from

使用 SELECT 查询作为源 INSERT 数据。此 API 应用于 INSERT INTO … SELECT FROM … 形式的查询。

insert_from(query, fields)
# query:SELECT查询用作数据源
# fields:要将数据插入的字段,此参数必须要的 **示例:**我们将 Person 表按原结构复制一个 Person2 表出来,以做演示。

data = Person.select(Person.Name, Person.Age, Person.Birthday)
Person2.insert_from(data, ['Name', 'Age', 'Birthday']).execute()

注意: 因为是 INSERT INTO … SELECT FROM … 形式的,所以数据源的列跟要插入的列必须保持一致。

二、删除

1、delete

delete 后加 where 删除指定记录,如果不加 where,则删除全部记录。

Person.delete().where(Person.Name=='王五').execute()

2、delete_instance

删除给定的实例。 语法:

delete_instance(recursive=False, delete_nullable=False)

# 示例:
p = Person.get(Person.Name=='张三')
p.delete_instance()

delete_instance 直接执行删除了,不用调用 execute() 方法。

这两个参数都跟外键有关。我们修改一下测试用的模型。假设有这样两个模型,一个人员,一个部门,人员属于部门。

class Department(Model):
    Name = CharField()

    class Meta:
        database = db

class Person(Model):
    Name = CharField()
    Age = IntegerField()
    Birthday = DateTimeField()
    Remarks = CharField(null=True)
    Department = ForeignKeyField(Department, null=True) # 这里外键可为空和不可为空是不一样的,下面说明

    class Meta:
        database = db
  1. recursive=False 时,只删除了【部门】,【人员】没有影响,从 SQL 语句中可以看出。
d = Department.get(1)
d.delete_instance(recursive=False)

# 执行的 SQL 语句
('SELECT "t1"."id", "t1"."Name" FROM "department" AS "t1" WHERE ? LIMIT ? OFFSET ?', [1, 1, 0])
('DELETE FROM "department" WHERE ("department"."id" = ?)', [1])
  1. recursive=True ,并且外键不可为空时,会先删除【部门】下的【人员】,再删除【部门】。
d = Department.get(1)
d.delete_instance(recursive=True)

# 执行的 SQL 语句
('SELECT "t1"."id", "t1"."Name" FROM "department" AS "t1" WHERE ? LIMIT ? OFFSET ?', [1, 1, 0])
('DELETE FROM "person" WHERE ("person"."Department_id" = ?)', [1])
('DELETE FROM "department" WHERE ("department"."id" = ?)', [1])
  1. recursive=True ,并且外键可为空时,先将【人员】的【部门ID(外键字段)】置为了 NULL,再删除【部门】。
d = Department.get(1)
d.delete_instance(recursive=True)

# 执行的 SQL 语句
('SELECT "t1"."id", "t1"."Name" FROM "department" AS "t1" WHERE ? LIMIT ? OFFSET ?', [1, 1, 0])
('UPDATE "person" SET "Department_id" = ? WHERE ("person"."Department_id" = ?)', [None, 1])
('DELETE FROM "department" WHERE ("department"."id" = ?)', [1])
  1. delete_nullable 仅在 recursive=True 且外键可为空时有效,和 ③ 一样,当 delete_nullable=True 时,会删除【人员】,而不是将【人员的部门ID】置为 NULL
d = Department.get(1)
d.delete_instance(recursive=True, delete_nullable=True)

# 执行的 SQL 语句
('SELECT "t1"."id", "t1"."Name" FROM "department" AS "t1" WHERE ? LIMIT ? OFFSET ?', [1, 1, 0])
('DELETE FROM "person" WHERE ("person"."Department_id" = ?)', [1])
('DELETE FROM "department" WHERE ("department"."id" = ?)', [1])

三、修改

1、save

save() 方法可以插入一条记录,一旦模型实例具有主键,任何后续调用 save() 都将导致 UPDATE 而不是另一个 INSERT。模型的主键不会改变。

p = Person(Name='王五', Age=50, Birthday=date(1970, 1, 1))
p.save()
print(p1.id)
p.Remarks = 'abc'
p.save()
# 这个例子,第一次执行的 save 是 INSERT,第二次是 UPDATE。

这里解释一下,Person 这个模型,我并没有指定主键,peewee 会自动增加一个名为 id 的自增列作为主键。在执行第一个 save() 方法的时候,主键没值,所以执行 INSERTsave() 方法执行之后,自增列的值就返回并赋给了模型实例,所以第二次调用 save() 执行的是 UPDATE
如果模型中一开始就用 PrimaryKeyFieldprimary_key 指定了主键,那么 save 执行的永远都是 update,所以什么主键不存在则 INSERT,存在则 UPDATE 这种操作根本不存在,只能自己来写判断。

2、update

update 用于批量更新,方法相对简单,以下三种写法都可以

# 方法一
Person.update({Person.Name: '赵六', Person.Remarks: 'abc'}).where(Person.Name=='王五').execute()

# 方法二,字典形式
Person.update({'Name': '赵六', 'Remarks': 'abc'}).where(Person.Name=='张三').execute()

# 方法三
Person.update(Name='赵六', Remarks='abc').where(Person.Name=='李四').execute()

3、原子更新

看这样的一个需求,有一张表,记录博客的访问量,每次有人访问博客的时候,访问量+1。

因为懒得新建模型,我们就以 Person 模型的 Age + 1 来演示。

我们可以这样来写:

for p in Person.select():
    p.Age += 1
    p.save()

这样当然是可以实现的,但是这不仅速度慢,而且如果多个进程同时更新计数器,它也容易受到竞争条件的影响。

我们可以用 update 方法来实现。

Person.update(Age=Person.Age+1).execute()

四、查询

1、get

Model.get() 方法检索与给定查询匹配的单个实例。 语法:

get(*query, **filters)
# query:查询条件
# filters:Mapping of field-name to value for Django-style filter. 我翻遍网上文章和官方文档都没找到这玩意怎么用!

p1 = Person.get(Name='张三')
p2 = Person.get(Person.Name == '李四')
# 当获取的结果不存在时,报 Model.DoesNotExist 异常。如果有多条记录满足条件,则返回第一条。

2、get_or_none

如果当获取的结果不存在时,不想报错,可以使用 Model.get_or_none() 方法,会返回 None,参数和 get 方法一致。

3、get_by_id

对于主键查找,还可以使用快捷方法 Model.get_by_id()

Person.get_by_id(1)

4、get_or_create

Peewee 有一个辅助方法来执行“获取/创建”类型的操作: Model.get_or_create() 首先尝试检索匹配的行。如果失败,将创建一个新行。

p, created = Person.get_or_create(Name='赵六', defaults={'Age': 80, 'Birthday': date(1940, 1, 1)})
print(p, created)

# SQL 语句
('SELECT "t1"."id", "t1"."Name", "t1"."Age", "t1"."Birthday", "t1"."Remarks" FROM "person" AS "t1" WHERE ("t1"."Name" = ?) LIMIT ? OFFSET ?', ['赵六', 1, 0])
('BEGIN', None)
('INSERT INTO "person" ("Name", "Age", "Birthday") VALUES (?, ?, ?)', ['赵六', 80, datetime.date(1940, 1, 1)])

参数:
get_or_create 的参数是 **kwargs,其中 defaults 为非查询条件的参数,剩余的为尝试检索匹配的条件,这个看执行时的 SQL 语句就一目了然了。对于“创建或获取”类型逻辑,通常会依赖唯一 约束或主键来防止创建重复对象。但这并不是强制的,比如例子中,我以 Name 为条件,而 Name 并非主键。只是最好不要这样做。

返回值:
get_or_create 方法有两个返回值,第一个是“获取/创建”的模型实例,第二个是是否新创建。

5、select

使用 Model.select() 查询获取多条数据。select 后可以添加 where 条件,如果不加则查询整个表。

ps = Person.select(Person.Name, Person.Age).where(Person.Name == '张三')

select() 返回结果是一个 ModelSelect 对象,该对象可迭代、索引、切片。当查询不到结果时,不报错,返回 None。并且 select() 结果是延时返回的。如果想立即执行,可以调用 execute() 方法。

注意:where 中的条件不支持 Name='张三' 这种写法,只能是 Person.Name == '张三'

6、获取记录条数 count 方法

Person.select().count()

7、排序 order_by 方法

Person.select().order_by(Person.Age)

# 排序默认是升序排列,也可以用 + 或 asc() 来明确表示是升序排列:
Person.select().order_by(+Person.Age)
Person.select().order_by(Person.Age.asc())

# 用 - 或 desc() 来表示降序:
Person.select().order_by(-Person.Age)
Person.select().order_by(Person.Age.desc())

如要对多个字段进行排序,逗号分隔写就可以了。

五、查询运算符

当查询条件不止一个,需要使用逻辑运算符连接,而 Python 中的 and、or 在 Peewee 中是不支持的,此时我们需要使用 Peewee 封装好的运算符,如下:

逻辑符 含义 样例
& and Person.select().where((Person.Name == '张三') & (Person.Age == 30))
or Person.select().where((Person.Name == '张三') \| (Person.Age == 30))
not Person.select().where(~Person.Name == '张三')

特别注意:有多个条件时,每个条件必须用 () 括起来。

当条件全为 and 时,也可以用逗号分隔,get 和 select 中都可以:

Person.get(Person.Name == '张三', Person.Age == 30)
比较 意义
<< x IN y,其中 y 是列表或查询
>> x IS y,其中 y 为None / NULL
% x LIKE y,其中 y 可能包含通配符
** x LIKE y,其中 y 可能包含通配符
^ 异或
~ 一元否定(例如,NOT x)

Image11

注意:由于 SQLite 的 LIKE 操作默认情况下不区分大小写,因此 peewee 将使用 SQLite GLOB 操作进行区分大小写的搜索。glob 操作使用星号表示通配符,而不是通常的百分号。如果您正在使用 SQLite 并希望区分大小写的部分字符串匹配,请记住使用星号作为通配符。

解释一下,在 SQLite 中,如果希望 like 的时候区分大小写,可以这么写:

Person.select().where(Person.Remarks % 'a*')

如果不希望区分大小写,这么写:

Person.select().where(Person.Remarks ** 'a%')

六、fn 函数的应用

# 需要导入模块: from peewee import fn [as 别名]
# 或者: from peewee.fn import COUNT [as 别名]
def get_best_users(chan=None, since=datetime.timedelta(days=31), limit=None):
        """ Return the number of message by user """

        query = [schema.Message.created_at > datetime.datetime.now() - since]

        if chan:
            query.append(schema.Message.chan == chan)

        result = (schema.User
                  .select(
                      schema.User.name,
                      fn.COUNT(schema.Message.id).alias("nb_messages"),
                      fn.AVG(fn.Length(schema.Message.message)).alias("avg_messages"),
                      fn.SUM(fn.Length(schema.Message.message)).alias("len_messages"),
                  )
                  .join(schema.Message, JOIN.LEFT_OUTER)
                  .where(*query)
                  .group_by(schema.User.name)
                  .order_by(fn.SUM(fn.Length(schema.Message.message)).desc()))

        if limit:
            result.limit(limit)

        return result 
# 需要导入模块: from peewee import fn [as 别名]
# 或者: from peewee.fn import COUNT [as 别名]
def user_summary():  # noqa: D103

    form = DateRangeForm(request.args)

    if not (form.from_date.data and form.to_date.data):
        date_range = User.select(
            fn.MIN(User.created_at).alias('from_date'),
            fn.MAX(User.created_at).alias('to_date')).first()
        if date_range:
            return redirect(
                url_for(
                    "user_summary",
                    from_date=date_range.from_date.date().isoformat(),
                    to_date=date_range.to_date.date().isoformat()))

    query = (Organisation.select(Organisation.name,
                                 fn.COUNT(User.id).alias("user_count"),
                                 fn.COUNT(User.orcid).alias("linked_user_count"))
             .where(User.created_at.between(form.from_date.data, form.to_date.data)).join(
                 UserOrg, JOIN.LEFT_OUTER, on=(UserOrg.org_id == Organisation.id)).join(
                     User, JOIN.LEFT_OUTER, on=(User.id == UserOrg.user_id)).group_by(
                         Organisation.name))

    total_user_count = sum(r.user_count for r in query)
    total_linked_user_count = sum(r.linked_user_count for r in query)

    return render_template(
        "user_summary.html",
        form=form,
        query=query,
        total_user_count=total_user_count,
        total_linked_user_count=total_linked_user_count) 


关注微信公众号『极客技术之路

第一时间了解最新动态
关注博主不迷路~

极客技术之路:站内收集的部分资源来源于网络,若侵犯了您的合法权益,请联系我们删除!
分享到:
赞(0)

文章评论

0点赞 0评论 收藏 QQ分享 微博分享

极客技术之路

极客技术之路