深入探讨Django最强大的抽象——从字段类型和关系到QuerySet内部结构、F表达式、注解以及生产级优化策略。
原文发布于:https://alansomathewdev.blogspot.com/2026/03/understanding-django-models-and-orm.html
目录
- 引言
- 在生产系统中为何重要
- 核心概念
- 架构设计
- 逐步实现
- 代码示例
- 性能优化
- 安全最佳实践
- 常见开发者错误
- 真实生产用例
- 结论
引言
Django ORM 可以说是 Django 开发者工具包中最重要的工具。它是你的 Python 对象与关系数据库之间的桥梁——一个将方法链转换为 SQL、将关系声明转换为 JOIN 子句、以及将 Python 异常转换为数据库约束违规的翻译层。
从表面上看,ORM 看起来 deceptively simple。你定义一个类,声明一些字段,然后 Django 创建数据库表。你调用 Model.objects.filter() 并返回对象。不需要 SQL。进来,出去。
但这种简单性对于没有准备的人来说是一个陷阱。Django ORM 是一个极其懒惰的系统——它将数据库执行推迟到最后一刻,在接触数据库之前在内存中构建查询计划,并暴露出丰富的表达语言,当理解后,可以将复杂的业务逻辑推送到数据库中,这样它才是合适的。当误解时,它会在每个请求中生成数十或数百个冗余查询,将整个表加载到内存中,并创建随着数据增长而非线性恶化的性能特征。
本指南从基础开始介绍Django ORM。我们将追踪一个QuerySet从Python方法调用到SQL执行的全过程,涵盖每个对生产至关重要的API,展示模型、管理器、自定义QuerySets、F表达式、注解和聚合的实际应用模式,并为您提供优化和安全的词汇,以便编写能够在规模上保持稳定的数据库代码。
无论您是刚接触Django三个月还是已经使用三年,这里都有一些内容会改变您编写数据库查询的方式。
为什么这在生产系统中很重要
您对Django ORM的使用性能特征并不是一个理论问题。它们是Django应用程序中生产性能下降最常见的单一来源。
考虑一个简单的电子商务端点,用于列出订单及其项目。一位经验不足的开发者写道:
orders = Order.objects.filter(user=request.user)
for order in orders:
for item in order.items.all(): # ← 每个订单查询
print(item.product.name) # ← 每个商品查询
对于一个平均每个订单有5个商品的用户来说,这会在单次页面加载中生成1 + 50 + 250 = 301个数据库查询。如果有100个用户同时进行此操作,那么每秒将产生30,100个查询。你的数据库将崩溃。响应时间将急剧增加。你的基础设施费用将飙升。
正确的实现使用prefetch_related,无论订单或商品的数量如何,生成的查询恰好是3个。
除了N+1查询外,ORM的误用还会导致:
- 加载不必要的列 — 当你只需要两个字段时却获取所有字段,浪费I/O带宽和内存
- 过早评估QuerySets — 在数据尚未需要时强制进行数据库访问
- 缺少索引 — Django不会创建主键之外的索引,除非你明确声明它们
- 使用Python进行聚合 — 在循环中求和,而不是将
SUM()推送到数据库 - 原始SQL注入 — 不正确地使用
.extra()或.raw()与未经过滤的用户输入
理解本指南所涵盖的ORM层次意味着这些错误不会进入生产环境。
核心概念
模型:映射到数据库表的 Python 类
Django 模型是一个继承自 django.db.models.Model 的 Python 类。每个作为 Field 实例的类属性都会成为数据库表中的一列。
Python 模型类 数据库表
────────────────── ──────────────
class Product(Model) → CREATE TABLE catalog_product (
name = CharField() id BIGINT PRIMARY KEY,
price = DecimalField() name VARCHAR(255),
is_active = BooleanField() price DECIMAL(10,2),
is_active BOOLEAN
);
Django 从应用标签和类名派生表名({app_label}_{model_name_lowercase}),但您可以在 Meta 中覆盖它。
字段:您架构的词汇
Django 提供了大约 30 种内置字段类型。选择正确的字段对于正确性和性能都很重要:
| 字段 | 数据库类型 | 备注 |
|---|---|---|
CharField |
VARCHAR |
始终设置 max_length。对于无限制文本,请使用 TextField。 |
IntegerField |
INTEGER |
对于主键,使用 BigAutoField(自 Django 3.2 起为默认)。 |
DecimalField |
DECIMAL |
用于货币。绝不要对货币使用 FloatField。 |
BooleanField |
BOOLEAN |
避免使用 NullBooleanField;如有需要,在 BooleanField 上使用 null=True。 |
DateTimeField |
TIMESTAMP |
对于创建时间使用 auto_now_add=True,对于更新时间使用 auto_now=True。 |
ForeignKey |
BIGINT + 外键约束 |
创建 {field_name}_id 列。始终考虑 on_delete。 |
JSONField |
JSONB (PostgreSQL) |
功能强大,但应避免用于应作为列的结构化数据。 |
UUIDField |
UUID |
用于公共标识符,以防止 IDOR。 |
查询集:数据库查询的惰性表示
QuerySet 是 Django ORM 的核心抽象。查询集最重要的特性之一是它们的“惰性”。这意味着创建或修改查询集的行为不会立即导致任何数据库活动。
查询集是惰性的。可以创建查询集、传递查询集,并与其他查询集实例组合,而无需实际访问数据库以获取其描述的项。
这是一个关键的思维模型。查询集不是数据——它是一个查询的描述。只有在查询集被评估时,数据库才会被访问。
评估触发器(Django 访问数据库的时刻):
迭代: for order in Order.objects.all()
切片: Order.objects.all()[0:10]
repr() / 打印: print(Order.objects.all())
长度(): len(Order.objects.all())
list(): list(Order.objects.all())
bool(): if Order.objects.filter(pk=1):
.get(): Order.objects.get(pk=1)
.count(): Order.objects.count() ← SELECT COUNT(*)
.exists(): Order.objects.filter(pk=1).exists() ← SELECT 1 LIMIT 1
请注意,一旦执行查询,检索到的数据将存储在内存中,随后对该 QuerySet 的操作将使用“缓存”的数据,而不是再次查询数据库。这个内部缓存机制仅限于 QuerySet 实例的作用域,并且在该特定 QuerySet 的生命周期结束后不会持久化。
管理器:QuerySets 的入口
每个模型至少有一个 Manager —— 通过 Model.objects 访问。管理器是所有数据库查询的入口点。您可能想要自定义管理器的两个原因是:添加额外的管理器方法,和/或修改管理器返回的初始 QuerySet。添加额外的管理器方法是为您的模型添加“表级”功能的首选方式。
架构设计
以下是一个完整的心理模型,展示了 QuerySet 如何从 Python 传递到 SQL 再返回的过程:
Python 代码
│
│ Order.objects.filter(user=request.user)
│ .select_related("shipping_address")
│ .prefetch_related("items__product")
│ .annotate(item_count=Count("items"))
│ .order_by("-created_at")
▼
┌─────────────────────────────────────────────┐
│ QuerySet 对象 │
│ │
│ _result_cache = None ← 在 eval 之前 为空 │
│ query = Query(…) ← 正在 构建 SQL │
│ model = Order │
└──────────────────┬──────────────────────────┘
│ 评估 已触发
▼
┌─────────────────────────────────────────────┐
│ SQL 编译器 │
│ │
│ 解析 查找 ← 过滤(用户=请求.用户)
│ 构建 连接 ← 选择相关(...)
│ 构建 子查询 ← 预取相关(...)
│ 添加 分组 依据 ← 注释(计数(...))
│ 添加 排序 依据 ← 排序依据("-创建时间")
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 生成的 SQL (简化版) │
│ │
│ 选择 orders_order.*, │
│ orders_address.*, │
│ 计数(orders_item.id) 作为 item_count │
│ 来自 orders_order │
│ 左连接 JOIN orders_address │
│ ON orders_order.shipping_address_id = │
│ orders_address.id │
│ WHERE orders_order.user_id = 7 │
│ GROUP BY orders_order.id, orders_address.id│
│ ORDER BY orders_order.created_at DESC │
└──────────────────┬──────────────────────────┘
│
▼
PostgreSQL
│
这段代码是一个SQL查询语句,主要用于从数据库中获取特定用户的订单信息。具体来说,它通过连接`orders_order`表和`orders_address`表,筛选出用户ID为7的订单,并根据订单ID和地址ID进行分组,最后按创建时间降序排列结果。
▼ 返回的行
┌─────────────────────────────────────────────┐
│ 模型 实例化 │
│ │
│ 每一 行 → Order() 实例 │
│ 结果 缓存 在 QuerySet._result_cache │
└─────────────────────────────────────────────┘
理解这个管道的不同在于,编写的代码看起来干净但运行400个查询,和看起来相同但只运行2个查询的代码之间的区别。
分步实现
第一步:定义生产就绪模型
# apps/catalog/models.py
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.utils.text import slugify
class TimestampedModel(models.Model):
"""
抽象基类为每个继承它的模型提供 created_at 和 updated_at。定义一次,随处使用。
"""
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
abstract = True
class Category(TimestampedModel):
name = models.CharField(max_length=150, unique=True)
slug = models.SlugField(max_length=150, unique=True, blank=True)
parent = models.ForeignKey(
“self“,
on_delete=models.CASCADE,
null=True,
blank=True,
related_name=“children“,
)
is_active = models.BooleanField(default=True, db_index=True)
class Meta:
app_label = “catalog“
verbose_name = _(“Category“)
verbose_name_plural = _(“Categories“)
ordering = ["name"]
def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.name)
super().save(*args, **kwargs)
def __str__(self) -> str:
在这段代码中,`ordering` 被定义为一个包含字符串 `”name”` 的列表。接下来的 `save` 方法用于保存对象,如果 `slug` 属性尚未设置,则会使用 `slugify` 函数将 `name` 属性转换为 `slug`。最后,调用父类的 `save` 方法以确保对象的保存过程正常进行。`__str__` 方法则用于返回对象的字符串表示。
return self.name
步骤 2:定义复杂关系
# apps/catalog/models.py(续)
class Product(TimestampedModel):
class Status(models.TextChoices):
DRAFT = “草稿“, _(“草稿“)
ACTIVE = “活跃“, _(“活跃“)
ARCHIVED = “归档“, _(“归档“)
name = models.CharField(max_length=255)
slug = models.SlugField(max_length=255, unique=True)
category = models.ForeignKey(
Category,
on_delete=models.PROTECT, # ← PROTECT: 不能删除包含产品的类别
related_name="products",
)
price = models.DecimalField(
max_digits=10, decimal_places=2,
help_text=_("以美元计价"),
)
stock = models.PositiveIntegerField(default=0)
status = models.CharField(
max_length=20,
choices=Status.choices,
default=Status.DRAFT,
db_index=True,
)
tags = models.ManyToManyField(
"标签",
blank=True,
related_name="产品",
)
class Meta:
app_label = "目录"
indexes = [
models.Index(fields=["状态", "类别"]),
models.Index(fields=["别名"]),
models.Index(fields=["-created_at"]),
]
constraints = [
models.CheckConstraint(
check=models.Q(price__gte=0),
name="catalog_product_price_non_negative",
),
models.CheckConstraint(
check=models.Q(stock__gte=0),
name="catalog_product_stock_non_negative",
),
]
def __str__(self) -> str:
return self.name
第3步:构建自定义管理器和查询集
使用管理器可以自由共享注释、聚合,并重用常见过滤器。最简洁的模式是定义一个自定义 QuerySet 并将其作为管理器暴露出来:
# apps/catalog/querysets.py
from django.db import models
from django.db.models import F, Q, Count, Avg, Sum
class ProductQuerySet(models.QuerySet):
def active(self):
"""过滤出仅已发布且有库存的产品。"""
return self.filter(status="active", stock__gt=0)
def by_category(self, category_slug: str):
return self.filter(category__slug=category_slug)
def with_stats(self):
"""
为每个产品注释其订单统计信息。
将计算推送到数据库 — 而不是 Python。
"""
return self.annotate(</span
total_sold=Sum(“order_items__quantity“),
average_rating=Avg(“reviews__rating“),
review_count=Count(“reviews“, distinct=True),
)
def in_price_range(self, min_price=None, max_price=None):
qs = self
if min_price is not None:
qs = qs.filter(price__gte=min_price)
if max_price is not None:
qs = qs.filter(price__lte=max_price)
return qs
def low_stock(self, threshold: int = 10):
"""库存低但不为零的产品。"""
return self.filter(stock__gt=0, stock__lte=threshold)
def search(self, query: str):
"""在名称和描述中进行全文搜索。"""
return self.filter(
Q(name__icontains=query) |
Q(description__icontains=query)
)
# apps/catalog/models.py
from apps.catalog.querysets import ProductQuerySet
class Product(TimestampedModel):
# ... 字段 ...
# 将 QuerySet 作为默认管理器暴露
objects = ProductQuerySet.as_manager()
使用变得流畅且可组合:
# 干净、可读,不显示 SQL
products = (
Product.objects
.active()
.by_category()"electronics")
.with_stats()
.in_price_range(min_price=50, max_price=500)
.select_related("category")
.order_by("-total_sold")
)
第4步:配置数据库性能设置
# config/settings/production.py
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"NAME": env("DB_NAME"),
"USER": env("DB_USER"),
{
"PASSWORD": env("DB_PASSWORD"),
"HOST": env("DB_HOST"),
"PORT": env("DB_PORT", default="5432"),
"CONN_MAX_AGE": 60, # 连接重用时间为60秒
"OPTIONS": {
"connect_timeout": 5,
"options": "-c default_transaction_isolation=read committed",
},
}
}
代码示例
F 表达式:数据库端算术运算
F() 表达式在数据库层面引用模型字段的值,允许您在不先将对象加载到 Python 中的情况下执行操作。这既更快又安全,避免了竞争条件。
from django.db.models import F
# 不好 — 将对象加载到 Python 中,然后再保存回去
product = Product.objects.get(pk=42)
product.stock -= order_quantity
product.save() # ← 两次数据库请求。读取和写入之间存在竞争条件。
# 好 — 在数据库层面进行单个原子更新
Product.objects.filter(pk=42).update(stock=F("stock") - order_quantity)
# SQL: UPDATE catalog_product SET stock = stock - 3 WHERE id = 42
# F 表达式在注解中
from django.db.models import ExpressionWrapper, DecimalField
orders = Order.objects.annotate(
discount_amount=ExpressionWrapper(
F("subtotal") * F("discount_percentage") / 100,
output_field=DecimalField(max_digits=10, decimal_places=2),
)
)
Q 对象:复杂查找和 OR 逻辑
Q() 对象允许您构建复杂的、可组合的 WHERE 子句,使用 AND、OR 和 NOT 逻辑:
from django.db.models import Q
# 简单的 OR:处于活动状态或正在促销的产品
products = Product.objects.filter(
Q(status="active") | Q(on_sale=True)
)
# 复杂的嵌套逻辑
# (活动状态或促销)并且(价格 < 100)并且 NOT(已归档)
products = Product.objects.filter(
(Q(status="active") | Q(on_sale=True)) &
Q(price__lt=100) &
~Q(status="archived")
)
# 可重用的 Q 对象
def active_and_affordable_q(max_price: float) -> Q:
return Q(status="active") & Q(price__lte=max_price)
budget_products = Product.objects.filter(active_and_affordable_q(50.00))
注解和聚合:将计算推送到数据库
与 aggregate() 不同,annotate() 不是一个终止子句。annotate() 子句的输出是一个 QuerySet;这个 QuerySet 可以使用任何其他 QuerySet 操作进行修改,包括 filter()、order_by(),甚至是对 annotate() 的额外调用。
from django.db.models import (
Count, Sum, Avg, Min, Max,
Case, When, Value, IntegerField, DecimalField,
ExpressionWrapper, F
)
from django.db.models.functions import Coalesce, TruncMonth
# annotate() → 为 QuerySet 中的每个对象添加一个字段
categories_with_counts = Category.objects.annotate(
product_count=Count("products", distinct=True),
active_count=Count(
"products",
filter=Q(products__status="active"),
distinct=True,
),
avg_price=Coalesce(Avg("products__price"), 0.0),
)
# 每个类别现在都有 .product_count, .active_count, .avg_price
# aggregate() → 返回一个总结整个 QuerySet 的单一字典
summary = Order.objects.filter(
created_at__year=2025
).aggregate(
total_revenue=Sum("total"),
average_order_value=Avg("total"),
order_count=Count(“id“),
max_order=Max(“total“),
min_order=Min(“total“),
)
# 返回: {“total_revenue”: Decimal(“125300.50”), “average_order_value”: …, …}
# 月度收入细分 — 按月分组
monthly_revenue = (
Order.objects
.annotate(month=TruncMonth(“created_at“))
.values("month")
.annotate(revenue=Sum("total"), orders=Count("id"))
.order_by("month")
)
# SQL: SELECT DATE_TRUNC('month', created_at) AS month,
# SUM(total) AS revenue, COUNT(id) AS orders
# FROM orders_order
# GROUP BY month ORDER BY month
# 使用 Case/When 进行条件注释
products_with_tier = Product.objects.annotate(
price_tier=Case(
当(price__lt=25, 则=值(“预算“)),
当(price__lt=100, 则=值(“中档“)),
当(price__lt=500, 则=值(“高端“)),
默认=值(“奢华“),
output_field=models.CharField(),
)
)
# 每个产品现在都有 .price_tier: "经济型"、"中档"、"高档" 或 "奢华"
select_related 和 prefetch_related: N+1 问题的解决方案
# 场景:显示订单及其用户、送货地址和订单项
# 不佳 — N+1 查询
orders = Order.objects.all()
for order in orders: # 1 查询 (SELECT orders)
user = order.user # N 查询 (SELECT user WHERE id=X)
addr = order.shipping_address # N 查询
for item in order.items.all(): # N 查询 (SELECT items WHERE order=X)
name = item.product.name # N*M 查询
# 好 — 总共 3 个查询,无论规模如何
orders = (
Order.objects
# select_related: SQL JOIN 用于外键/一对一关系
.select_related("user", "shipping_address")
# prefetch_related: 针对多对多/反向外键的单独批量查询
.prefetch_related("items__product__category")
.filter(status="processing")
.order_by("-created_at")
)
# 查询 1: SELECT orders + JOIN users + JOIN addresses WHERE status='processing'
# 查询 2: SELECT items WHERE order_id IN (1, 2, 3, ...)
# 查询 3: SELECT products + JOIN categories WHERE id IN (...)
自定义预取对象:带过滤器的预取
from django.db.models import Prefetch
# 仅预取活跃的评论,预先排序并过滤
active_reviews = Review.objects.filter(is_approved=True).order_by("-created_at")
products = Product.objects.prefetch_related(
Prefetch("reviews", queryset=active_reviews, to_attr="approved_reviews")
).active()
# 现在 product.approved_reviews 是一个预加载的列表 — 无需额外查询
for product in products:
top_review = product.approved_reviews[0] if product.approved_reviews else None
批量操作:一次查询处理数十行
from django.db import transaction
# bulk_create: 在一个INSERT语句中插入多行
new_tags = Tag.objects.bulk_create([
Tag(name="python"),
Tag(name="django"),
Tag(name="backend"),
], ignore_conflicts=True) # ← 如果已存在则跳过
# bulk_update: 在一个UPDATE语句中更新多行
products_to_reactivate = Product.objects.filter(status="archived", stock__gt=0)
for product in products_to_reactivate:
product.status = "active"
Product.objects.bulk_update(products_to_reactivate, fields=["status"])
# SQL: UPDATE catalog_product SET status=CASE WHEN id=1 THEN 'active' ... END
# update(): 单个 SQL 更新 — 无需模型实例化
Product.objects.filter(
category__slug="electronics",
stock=0,
).update(status="archived")
# SQL: UPDATE catalog_product SET status='archived'
# WHERE category_id IN (SELECT id FROM category WHERE slug='electronics')
# AND stock = 0
iterator() 方法:内存高效的大型查询集处理
# 不好 — 一次性加载所有 500,000 行到内存中
for order in Order.objects.filter(status="pending"):
process_order(order) # 在规模上存在 OOM 风险
# 好 — 分块获取,内存占用大大降低
for order in Order.objects.filter(status="pending").iterator(chunk_size=2000):
process_order(order)
# 使用预取(手动,因为 prefetch_related 不适用于迭代器)
for order in Order.objects.filter(status="pending").select_related("user").iterator(chunk_size=2000):
process_order(order)
在这段代码中,我们使用了一个循环来处理状态为“待处理”的订单。首先,通过 `Order.objects.filter(status=”pending”)` 过滤出所有状态为“待处理”的订单。接着,使用 `select_related(“user”)` 方法来预加载与订单相关的用户信息,以提高查询效率。最后,使用 `iterator(chunk_size=2000)` 方法以每次处理2000个订单的方式进行迭代处理。每个订单都会通过 `process_order(order)` 函数进行处理。
模型方法与管理器方法
class Order(TimestampedModel):
# ... 字段 ...
# ── 模型方法:行级,实例特定逻辑 ──
def get_subtotal(self) -> Decimal:
"""所有商品价格的总和。需要预先获取商品。"""
return sum(item.subtotal for item in self.items.all())
def can_be_cancelled(self) -> bool:
return self.status in ("pending", "processing")
def cancel(self) -> None:
if not self.can_be_cancelled():
raise ValueError(f"订单 #{self.pk} 无法从状态 '{self.status}'")
self.status = "已取消"
self.save(update_fields=["status", "updated_at"])
# ── 使用管理器/服务进行表级操作 ──
# 不要在模型方法中放置批量查询或跨表逻辑。
事务:原子性保证
from django.db import transaction
def create_order_with_items(user, cart_items: list) -> Order:
"""
原子性地创建一个订单及其订单项。
如果任何操作失败,整个操作将回滚。
不会有部分订单到达数据库。
with transaction.atomic():
order = Order.objects.create(
user=user,
status=Order.Status.PENDING,
total=sum(item["price"] * item["quantity"] for item in cart_items),
)
order_items = [
OrderItem(
order=order,
product_id=item["product_id"],
quantity=item["quantity"],
unit_price=item["price"],
)
for item in cart_items
]
OrderItem.objects.bulk_create(order_items)
# 原子性更新库存 — 无竞争条件
for item in cart_items:
updated = Product.objects.filter(
pk=item["product_id"],
stock__gte=item["quantity"], # 仅在库存充足时更新
).update(stock=F("stock") - item["quantity"])
if updated == 0:
raise ValueError(f"产品 {item['product_id']}")
return order
性能优化
1. 使用 only() 和 defer() 限制列的提取
加载不需要的列会浪费 I/O 带宽和内存。 only() 是白名单方法; defer() 是黑名单方法:
# 仅加载您实际使用的列
product_list = (
Product.objects
.active()
.only("id", "name", "slug", "price", "stock") # ← 5 列而不是 20 列
.order_by("name")
)
# SQL: SELECT id, name, slug, price, stock FROM catalog_product WHERE status='active'
# 延迟加载已知占用资源且很少需要的字段
products = Product.objects.defer("description", "metadata", "image_data")
重要:访问实例上的延迟字段会为每个实例触发额外的查询。当你知道所需的确切字段时,请使用 only()。
2. 对于只读数据使用 values() 和 values_list()
当你不需要模型实例时——例如在API、导出、聚合中——可以完全跳过对象实例化:
# 返回字典而不是模型实例——更快,内存占用更少
products = (
Product.objects
.active()
.values("id", "name", "price", "stock")
)
# [{"id": 1, "name": "Widget", "price": "29.99", "stock": 100}, ...]
# 返回元组 — 更轻量
product_ids = Product.objects.active().values_list("id", flat=True)
# (1, 2, 3, 4, ...)
# 直接获取字典: {id: name}
id_name_map = dict(Product.objects.values_list("id", "name"))
3. 适当使用 exists() 和 count()
# 不好 — 加载整个 QuerySet 来检查存在性
if len(Order.objects.filter(user=user, status="pending")) > 0:
...
# 好 — SELECT 1 LIMIT 1 — 没有加载行
if Order.objects.filter(user=user, status="pending").exists():
...
# 不好 — 计算 QuerySet
count = len(Product.objects.filter(status="active"))
# 好 — SELECT COUNT(*) — 单个聚合查询
count = Product.objects.filter(status="active").count()
4. 在测试中强制执行查询预算
# apps/orders/tests/test_selectors.py
from django.test import TestCase
from apps.orders.selectors import get_orders_for_user
from apps.orders.tests.factories import OrderFactory, OrderItemFactory
class TestOrderSelectorQueryCount(TestCase):
"""
锁定关键选择器的查询计数。
如果有人添加了 N+1 查询,这个测试会立即失败
— 而不是在凌晨 2 点的生产监控中。
"""
def setUp(self):
self.user = UserFactory()
orders = OrderFactory.create_batch(10, user=self.user)
for order in orders:
OrderItemFactory.create_batch(5, order=order)
def test_get_orders_query_count(self):
with self.assertNumQueries(3):
这段代码定义了一个测试类的方法,其中 `setUp` 方法用于初始化测试环境,创建一个用户并生成10个订单,每个订单包含5个订单项。接着,`test_get_orders_query_count` 方法用于测试查询的数量,期望在执行过程中只产生3个数据库查询。
# 1: 订单 + JOIN 用户 + JOIN 地址
# 2: 预取订单项
# 3: 预取产品
list(get_orders_for_user(user_id=self.user.id))
5. 数据库索引:最具影响力的单一变更
class Order(TimestampedModel):
class Meta:
indexes = [
# 复合索引:最常见的查询 — “用户的订单状态”
models.Index(
fields=["user", "status"],
name="idx_order_user_status",
),
# 按创建时间降序排列以便默认排序
models.Index(
fields=["-created_at"],
name="idx_order_created_at_desc",
),
]
# 部分索引(PostgreSQL):仅索引未取消的订单
# 更小、更快的索引,适用于热点查询路径
# 通过自定义迁移添加 PostgreSQL 特定语法:
# CREATE INDEX idx_active_orders ON orders_order(user_id)
# WHERE status != 'cancelled';
要识别缺失的索引,请在慢查询上使用 EXPLAIN ANALYZE:
# 在 Django shell 中 — 检查慢查询选择器的查询计划
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("""
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM orders_order
WHERE user_id = 1 AND status = 'processing'
ORDER BY created_at DESC
LIMIT 20
""")
for row in cursor.fetchall():
print(row[0])
安全最佳实践
1. SQL 注入:ORM 保护了什么,未保护什么
Django ORM 自动对所有查询进行参数化——字段查找和 filter() 调用从未受到 SQL 注入的威胁。然而,仍然存在一些需要手动处理的逃生通道:
# 安全 — ORM 对所有值进行参数化
Product.objects.filter(name=user_input)
# SQL: SELECT ... WHERE name = %s -- [user_input]
# 危险 — 原始 SQL 语句与字符串格式化
Product.objects.raw(f"SELECT * FROM catalog_product WHERE name = '{user_input}'")
# ← 永远不要这样做。 user_input = "'; DROP TABLE catalog_product; --"
# 安全 — 原始 SQL 语句与参数化
Product.objects.raw(
"SELECT * FROM catalog_product WHERE name = %s",
[user_input] # ← 始终使用参数化占位符
)
# 危险 — 使用未转义的用户输入调用 extra()
Product.objects.extra(where=[f"name = '{user_input}'"]) # ← 永远不要
# 安全 — 使用参数调用 extra()
Product.objects.extra(where=["name = %s"], params=[user_input])
2. 防止批量赋值
切勿直接将 request.data 或 request.POST 传递给 Model.objects.create():
# 危险 — 攻击者可以设置任何模型字段
order = Order.objects.create(**request.POST.dict()) # ← 切勿这样做
# 安全 — 通过序列化器白名单字段
from apps.orders.serializers import CreateOrderSerializer
serializer = CreateOrderSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
order = OrderService.create(**serializer.validated_data)
3. 通过对象级作用域防止 IDOR
# 危险 — 任何经过身份验证的用户都可以通过 ID 访问任何订单
order = get_object_or_404(Order, pk=pk) # ← IDOR 漏洞
# 安全 — 始终限制在经过身份验证的用户范围内
order = get_object_or_404(Order, pk=pk, user=request.user)
# 更好 — 在选择器中使用范围查询集
def get_order_for_user(user, order_id: int) -> Order:
return get_object_or_404(
Order.objects.filter(user=user),
pk=order_id,
)
4. 在 .save() 中使用 update_fields 以提高精确度
# 不推荐 — 保存所有字段;有覆盖并发更新的风险
product.status = "archived"
product.save() # ← 重写所有列,包括任何不在内存中的列
# 推荐 — 仅更新你更改的特定列
product.status = "archived"
product.save(update_fields=["status", "updated_at"])
# SQL: UPDATE catalog_product SET status='archived', updated_at=NOW() WHERE id=42
5. 在数据库层面使用约束以确保数据完整性
Django模型约束在数据库层面强制执行业务规则——最后的防线:
class OrderItem(models.Model):
quantity = models.PositiveIntegerField()
unit_price = models.DecimalField(max_digits=10, decimal_places=2)
class Meta:
constraints = [
models.CheckConstraint(
check=models.Q(quantity__gt=0),
name="orderitem_positive_quantity",
),
models.CheckConstraint(
check=models.Q(unit_price__gte=0),
name="orderitem_non_negative_price",
),
models.UniqueConstraint(
fields=["order", "product"],
name="orderitem_unique_order_product",
),
]
常见开发者错误
❌ 错误 1:过早评估 QuerySets
# 不好 — 立即评估 QuerySet,失去链式调用能力
def get_active_products():
return list(Product.objects.filter(status="active")) # ← 已评估!
# 无法执行:get_active_products().filter(category=electronics)
# 好 — 返回 QuerySet,让调用者决定何时评估
def get_active_products():
return Product.objects.filter(status="active") # ← 懒惰,链式调用
# 调用者现在可以进一步筛选:
products = get_active_products().filter(category=electronics).order_by("-price")
❌ 错误 2:使用 len() 而不是 .count()
# 不好 — 为了计数而将所有行加载到内存中
if len(Order.objects.filter(user=user)) > 0:
...
n = len(Product.objects.all()) # 可能加载数百万行
# 好 — 单个 COUNT(*) 查询,加载零行
if Order.objects.filter(user=user).exists():
...
n = Product.objects.count()
❌ 错误 3:错误地组合多个注解
使用 annotate() 组合多个聚合会产生错误的结果,因为使用的是连接而不是子查询。
# 不好 — 由于多个 JOIN 导致重复行;author__count 将是错误的
books = Book.objects.annotate(Count("authors"), Count("stores"))
# books[0].authors__count → 错误(膨胀)
# 好 — 使用 distinct=True 来修复重复计数
books = Book.objects.annotate(
author_count=Count("authors", distinct=True),
store_count=Count("stores", distinct=True),
)
❌ 错误 4:在循环中调用 .save()
# 不好 — 对于 1,000 个产品执行 1,000 次 UPDATE 查询
products = Product.objects.filter(category=electronics)
for product in products:
product.price *= 0.9 # 10% 折扣
product.save() # ← 每个产品一个查询
# 好 — 单个 UPDATE 查询
Product.objects.filter(category=electronics).update(
price=F("price") * Decimal("0.9")
)
❌ 错误 5:在未预取的情况下访问相关对象
# 不好 — 模板循环触发 N+1
{% for order in orders %}
{{ order.user.email }} {# 每个订单的数据库访问 #}
{{ order.items.count() }} {# 每个订单的数据库访问 #}
{% endfor %}
# 好 — 在传递给模板之前进行预取
orders = Order.objects.select_related("user").prefetch_related("items")
❌ 错误 6:在带注解的 .values() 后使用 .filter()
# 不良示例 — 注解时过滤顺序很重要
# 根据 Django 版本,这可能会产生意想不到的结果
result = (
Order.objects
.values("status")
.annotate(total=Sum("amount"))
.filter(status="active") # ← 在这里是安全的,但如果顺序改变则很脆弱
)
# 良好示例 — 尽可能在注解之前进行过滤
result = (
Order.objects
.filter(status="active") # ← 首先进行过滤
.values("status")
.annotate(total=Sum("amount"))
)
实际生产使用案例
使用案例 1:电子商务仪表板 — 复杂聚合
产品经理的仪表板需要实时统计数据:按类别的收入、畅销产品、低库存警报、转化率。所有计算都在数据库中完成 — 不使用 Python 循环:
# apps/analytics/selectors.py
from django.db.models import Sum, Count, Avg, F, Q, ExpressionWrapper, DecimalField
from django.db.models.functions import TruncMonth, Coalesce
def get_dashboard_stats(start_date, end_date) -> dict:
"""
返回完整的仪表板数据,共需4个查询。
如果采用简单的方法,可能需要100次以上的Python迭代。
"""
from apps.orders.models import Order, OrderItem
from apps.catalog.models import Category
# 查询1:本期按类别的收入
revenue_by_category = (
OrderItem.objects
.filter(order__created_at__range=(start_date, end_date))
.values("product__category__name")
.annotate(
revenue=Sum(
ExpressionWrapper(
F("quantity") * F("unit_price"),
output_field=DecimalField(max_digits=12, decimal_places=2),
)
),
units_sold=Sum("quantity"),
)
.order_by("-revenue")[:10]
)
# 查询 2:月度趋势
monthly_trend = (
Order.objects
.filter(created_at__range=(start_date, end_date))
.annotate(month=TruncMonth("created_at"))
.values("month")
.annotate(
revenue=Sum("total"),
order_count=Count("id"),
avg_order_value=Avg("total"),
)
.order_by("month")
)
# 查询 3:按收入排序的热门产品
top_products = (
OrderItem.objects
.filter(order__created_at__range=(start_date, end_date))
.values("product__name", "product__id")
.annotate(
total_revenue=Sum(F("quantity") * F("unit_price"),
output_field=DecimalField()),
total_units=Sum("quantity"),
)
.order_by("-total_revenue")[:20]
)
# 查询 4:低库存警报
low_stock = (
Product.objects
.active()
.filter(stock__lte=10)
.values("id", "name", "stock", "category__name")
.order_by("stock")
)
return {
"revenue_by_category": list(revenue_by_category),
"monthly_trend": list(monthly_trend),
"top_products": list(top_products),
"low_stock_alerts": list(low_stock),
}
用例 2:大数据导出 — 内存高效迭代器模式
财务团队需要将 500,000 个订单导出为 CSV。将所有数据加载到内存中会导致服务器崩溃:
# apps/exports/services.py
import csv
from django.http import StreamingHttpResponse
from apps.orders.models import Order
class EchoWriter:
"""类似文件的对象,可以回显其输入。用于 StreamingHttpResponse。"""
def write(self, value):
return value
def stream_orders_csv(queryset) -> StreamingHttpResponse:
"""
流式导出大量 CSV 数据,而不将所有内容加载到内存中。
使用 Django 的 StreamingHttpResponse + ORM 的 iterator() 实现 O(1) 的内存使用。
"""
writer = csv.writer(EchoWriter())
def generate_rows():
yield writer.writerow(["ID", "用户邮箱", "状态", "总计", "创建时间"])
for order in (
queryset
.select_related("用户")
.only("id", "user__email", "status", "total", "created_at")
.iterator(chunk_size=2000)
):
yield writer.writerow([
order.id,
order.user.email,
order.status,
str(order.total),
order.created_at.isoformat(),
])
response = StreamingHttpResponse(generate_rows(), content_type="text/csv")
response["Content-Disposition"] = 'attachment; filename="orders.csv"'
return response
用例 3:通过自定义管理器实现 SaaS 多租户数据隔离
# apps/core/managers.py
from django.db import models
class TenantQuerySet(models.QuerySet):
def for_tenant(self, tenant_id: int):
return self.filter(tenant_id=tenant_id)
class TenantManager(models.Manager):
def get_queryset(self):
return TenantQuerySet(self.model, using=self._db)
def for_tenant(self, tenant_id: int):
return self.get_queryset().for_tenant(tenant_id)
# apps/projects/models.py
class Project(TimestampedModel):
tenant = models.ForeignKey("tenants.Tenant", on_delete=models.CASCADE)
name = models.CharField(max_length=255)
# ...
objects = TenantManager()
# 使用 — 在结构上不可能跨越租户边界
projects = Project.objects.for_tenant(request.tenant.id)
结论
Django ORM 是一款精密的工具。正确使用时,它是任何语言中最具表现力、安全性和性能的数据库接口之一。若使用不当,则会导致 N+1 查询、内存膨胀以及数据完整性错误,这些问题在规模扩大时表现不佳且不可预测。
本指南中的模式——懒惰的 QuerySet 链接、可组合 QuerySet 的自定义管理器、用于原子更新的 F 表达式、用于数据库端计算的 annotate()、用于关系加载的 select_related 和 prefetch_related、用于内存高效批处理的 iterator() 以及用于数据完整性的模型级约束——并不是高级技巧。这些是生产质量 Django 代码的基础。
最重要的原则是:在最低层次上进行工作。聚合操作应在 SQL 中进行,而不是在 Python 循环中。对象图应在预取的 QuerySet 中,而不是重复的数据库调用中。业务规则应在数据库约束中,而不仅仅是在应用代码中。
故意编写你的模型。懒惰地构建你的 QuerySet。将计算推向更低层次。通过测试锁定你的查询计数。在假设查询足够快以用于生产之前,使用 EXPLAIN ANALYZE。
数据库是基础。要像它重要一样构建它。
进一步阅读
- Django ORM 优化指南
- QuerySet API 参考
- 查询表达式
- 聚合文档
- Django模型字段参考
- 自定义管理器文档
- Sentry:Django性能改进
- AppSignal:使用Django QuerySets提高查询性能
由一位构建生产Django系统的Python后端工程师撰写。主题:Django ORM、QuerySet、N+1查询、F表达式、注解、聚合、select_related、prefetch_related、自定义管理器、数据库优化。