深入理解Django模型与ORM:完整的生产指南

深入探讨Django最强大的抽象——从字段类型和关系到QuerySet内部结构、F表达式、注解以及生产级优化策略。


原文发布于:https://alansomathewdev.blogspot.com/2026/03/understanding-django-models-and-orm.html


目录

  1. 引言
  2. 在生产系统中为何重要
  3. 核心概念
  4. 架构设计
  5. 逐步实现
  6. 代码示例
  7. 性能优化
  8. 安全最佳实践
  9. 常见开发者错误
  10. 真实生产用例
  11. 结论

引言

Django ORM 可以说是 Django 开发者工具包中最重要的工具。它是你的 Python 对象与关系数据库之间的桥梁——一个将方法链转换为 SQL、将关系声明转换为 JOIN 子句、以及将 Python 异常转换为数据库约束违规的翻译层。

从表面上看,ORM 看起来 deceptively simple。你定义一个类,声明一些字段,然后 Django 创建数据库表。你调用 Model.objects.filter() 并返回对象。不需要 SQL。进来,出去。

但这种简单性对于没有准备的人来说是一个陷阱。Django ORM 是一个极其懒惰的系统——它将数据库执行推迟到最后一刻,在接触数据库之前在内存中构建查询计划,并暴露出丰富的表达语言,当理解后,可以将复杂的业务逻辑推送到数据库中,这样它才是合适的。当误解时,它会在每个请求中生成数十或数百个冗余查询,将整个表加载到内存中,并创建随着数据增长而非线性恶化的性能特征。

本指南从基础开始介绍Django ORM。我们将追踪一个QuerySet从Python方法调用到SQL执行的全过程,涵盖每个对生产至关重要的API,展示模型、管理器、自定义QuerySets、F表达式、注解和聚合的实际应用模式,并为您提供优化和安全的词汇,以便编写能够在规模上保持稳定的数据库代码。

无论您是刚接触Django三个月还是已经使用三年,这里都有一些内容会改变您编写数据库查询的方式。


为什么这在生产系统中很重要

您对Django ORM的使用性能特征并不是一个理论问题。它们是Django应用程序中生产性能下降最常见的单一来源。

考虑一个简单的电子商务端点,用于列出订单及其项目。一位经验不足的开发者写道:

orders = Order.objects.filter(user=request.user)

for order in orders:
    for item in order.items.all():  # ← 每个订单查询
        print(item.product.name)    # ← 每个商品查询

对于一个平均每个订单有5个商品的用户来说,这会在单次页面加载中生成1 + 50 + 250 = 301个数据库查询。如果有100个用户同时进行此操作,那么每秒将产生30,100个查询。你的数据库将崩溃。响应时间将急剧增加。你的基础设施费用将飙升。

正确的实现使用prefetch_related,无论订单或商品的数量如何,生成的查询恰好是3个。

除了N+1查询外,ORM的误用还会导致:

  • 加载不必要的列 — 当你只需要两个字段时却获取所有字段,浪费I/O带宽和内存
  • 过早评估QuerySets — 在数据尚未需要时强制进行数据库访问
  • 缺少索引 — Django不会创建主键之外的索引,除非你明确声明它们
  • 使用Python进行聚合 — 在循环中求和,而不是将SUM()推送到数据库
  • 原始SQL注入 — 不正确地使用.extra().raw()与未经过滤的用户输入

理解本指南所涵盖的ORM层次意味着这些错误不会进入生产环境。


核心概念

模型:映射到数据库表的 Python 类

Django 模型是一个继承自 django.db.models.Model 的 Python 类。每个作为 Field 实例的类属性都会成为数据库表中的一列。

Python 模型类          数据库表
──────────────────          ──────────────
class Product(Model)    →   CREATE TABLE catalog_product (
    name = CharField()          id BIGINT PRIMARY KEY,
    price = DecimalField()       name VARCHAR(255),
    is_active = BooleanField()   price DECIMAL(10,2),
                                 is_active BOOLEAN
                             );

Django 从应用标签和类名派生表名({app_label}_{model_name_lowercase}),但您可以在 Meta 中覆盖它。

字段:您架构的词汇

Django 提供了大约 30 种内置字段类型。选择正确的字段对于正确性和性能都很重要:

字段 数据库类型 备注
CharField VARCHAR 始终设置 max_length。对于无限制文本,请使用 TextField
IntegerField INTEGER 对于主键,使用 BigAutoField(自 Django 3.2 起为默认)。
DecimalField DECIMAL 用于货币。绝不要对货币使用 FloatField
BooleanField BOOLEAN 避免使用 NullBooleanField;如有需要,在 BooleanField 上使用 null=True
DateTimeField TIMESTAMP 对于创建时间使用 auto_now_add=True,对于更新时间使用 auto_now=True
ForeignKey BIGINT + 外键约束 创建 {field_name}_id 列。始终考虑 on_delete
JSONField JSONB (PostgreSQL) 功能强大,但应避免用于应作为列的结构化数据。
UUIDField UUID 用于公共标识符,以防止 IDOR。

查询集:数据库查询的惰性表示

QuerySet 是 Django ORM 的核心抽象。查询集最重要的特性之一是它们的“惰性”。这意味着创建或修改查询集的行为不会立即导致任何数据库活动。

查询集是惰性的。可以创建查询集、传递查询集,并与其他查询集实例组合,而无需实际访问数据库以获取其描述的项。

这是一个关键的思维模型。查询集不是数据——它是一个查询的描述。只有在查询集被评估时,数据库才会被访问。

评估触发器(Django 访问数据库的时刻):


迭代:          for order in Order.objects.all()
切片:            Order.objects.all()[0:10]
repr() / 打印:     print(Order.objects.all())
长度():              len(Order.objects.all())

list():             list(Order.objects.all())
bool():             if Order.objects.filter(pk=1):
.get():             Order.objects.get(pk=1)
.count():           Order.objects.count()            SELECT COUNT(*)

.exists():          Order.objects.filter(pk=1).exists()  SELECT 1 LIMIT 1

请注意,一旦执行查询,检索到的数据将存储在内存中,随后对该 QuerySet 的操作将使用“缓存”的数据,而不是再次查询数据库。这个内部缓存机制仅限于 QuerySet 实例的作用域,并且在该特定 QuerySet 的生命周期结束后不会持久化。

管理器:QuerySets 的入口

每个模型至少有一个 Manager —— 通过 Model.objects 访问。管理器是所有数据库查询的入口点。您可能想要自定义管理器的两个原因是:添加额外的管理器方法,和/或修改管理器返回的初始 QuerySet。添加额外的管理器方法是为您的模型添加“表级”功能的首选方式。


架构设计

以下是一个完整的心理模型,展示了 QuerySet 如何从 Python 传递到 SQL 再返回的过程:

Python 代码
    

  Order.objects.filter(user=request.user)
             .select_related("shipping_address")
             .prefetch_related("items__product")
             .annotate(item_count=Count("items"))
             .order_by("-created_at")
    
┌─────────────────────────────────────────────┐
            QuerySet 对象                   


_result_cache = None eval 之前 为空
query = Query(…) 正在 构建 SQL
model = Order
└──────────────────┬──────────────────────────┘
评估 已触发

┌─────────────────────────────────────────────┐
SQL 编译器


  解析 查找   过滤(用户=请求.用户)
  构建 连接       选择相关(...)
  构建 子查询  预取相关(...)
  添加 分组 依据      注释(计数(...))
  添加 排序 依据      排序依据("-创建时间")
└──────────────────┬──────────────────────────┘
                   
                   
┌─────────────────────────────────────────────┐

生成的 SQL (简化版)

选择 orders_order.*,
orders_address.*,
计数(orders_item.id) 作为 item_count
来自 orders_order
左连接 JOIN orders_address


    ON orders_order.shipping_address_id =    
       orders_address.id                     
  WHERE orders_order.user_id = 7             
  GROUP BY orders_order.id, orders_address.id
  ORDER BY orders_order.created_at DESC      
└──────────────────┬──────────────────────────┘
                   
                   
            PostgreSQL
                   

这段代码是一个SQL查询语句,主要用于从数据库中获取特定用户的订单信息。具体来说,它通过连接`orders_order`表和`orders_address`表,筛选出用户ID为7的订单,并根据订单ID和地址ID进行分组,最后按创建时间降序排列结果。

返回的行
┌─────────────────────────────────────────────┐
模型 实例化

每一 Order() 实例
结果 缓存 QuerySet._result_cache
└─────────────────────────────────────────────┘





 

 

 

理解这个管道的不同在于,编写的代码看起来干净但运行400个查询,和看起来相同但只运行2个查询的代码之间的区别。


分步实现

 

第一步:定义生产就绪模型

 

 

# apps/catalog/models.py
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.utils.text import slugify

class TimestampedModel(models.Model):

"""
    抽象基类为每个继承它的模型提供 created_at 和 updated_at。定义一次,随处使用。
    """
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

class Category(TimestampedModel):

name = models.CharField(max_length=150, unique=True)
slug = models.SlugField(max_length=150, unique=True, blank=True)
parent = models.ForeignKey(
self,
on_delete=models.CASCADE,
null=True,

blank=True,
related_name=children,
)
is_active = models.BooleanField(default=True, db_index=True)

class Meta:
app_label = catalog
verbose_name = _(Category)
verbose_name_plural = _(Categories)


ordering            = ["name"]

def save(self, *args, **kwargs):
    if not self.slug:
        self.slug = slugify(self.name)
    super().save(*args, **kwargs)

def __str__(self) -> str:

在这段代码中,`ordering` 被定义为一个包含字符串 `”name”` 的列表。接下来的 `save` 方法用于保存对象,如果 `slug` 属性尚未设置,则会使用 `slugify` 函数将 `name` 属性转换为 `slug`。最后,调用父类的 `save` 方法以确保对象的保存过程正常进行。`__str__` 方法则用于返回对象的字符串表示。


return self.name

步骤 2:定义复杂关系

# apps/catalog/models.py(续)

class Product(TimestampedModel):

    class Status(models.TextChoices):

DRAFT = 草稿, _(草稿)
ACTIVE = 活跃, _(活跃)
ARCHIVED = 归档, _(归档)

name = models.CharField(max_length=255)


slug        = models.SlugField(max_length=255, unique=True)
    category    = models.ForeignKey(
        Category,
        on_delete=models.PROTECT,          # ← PROTECT: 不能删除包含产品的类别
        related_name="products",
    )
    price       = models.DecimalField(
        max_digits=10, decimal_places=2,

help_text=_("以美元计价"),
    )
    stock       = models.PositiveIntegerField(default=0)
    status      = models.CharField(
        max_length=20,
        choices=Status.choices,
        default=Status.DRAFT,
        db_index=True,
    )
    tags        = models.ManyToManyField(

"标签",
        blank=True,
        related_name="产品",
    )

    class Meta:
        app_label = "目录"
        indexes   = [
            models.Index(fields=["状态", "类别"]),
            models.Index(fields=["别名"]),

models.Index(fields=["-created_at"]),
]
constraints = [
    models.CheckConstraint(
        check=models.Q(price__gte=0),
        name="catalog_product_price_non_negative",
    ),
    models.CheckConstraint(
        check=models.Q(stock__gte=0),

name="catalog_product_stock_non_negative",
),
]

def __str__(self) -> str:
return self.name

第3步:构建自定义管理器和查询集

使用管理器可以自由共享注释、聚合,并重用常见过滤器。最简洁的模式是定义一个自定义 QuerySet 并将其作为管理器暴露出来:

# apps/catalog/querysets.py
from django.db import models
from django.db.models import F, Q, Count, Avg, Sum

class ProductQuerySet(models.QuerySet):

    def active(self):
        """过滤出仅已发布且有库存的产品。"""


return self.filter(status="active", stock__gt=0)

    def by_category(self, category_slug: str):
        return self.filter(category__slug=category_slug)

    def with_stats(self):
        """
        为每个产品注释其订单统计信息。
        将计算推送到数据库 — 而不是 Python。
        """
        return self.annotate(</span

total_sold=Sum(order_items__quantity),
average_rating=Avg(reviews__rating),
review_count=Count(reviews, distinct=True),
)

def in_price_range(self, min_price=None, max_price=None):
qs = self
if min_price is not None:


qs = qs.filter(price__gte=min_price)
if max_price is not None:
    qs = qs.filter(price__lte=max_price)
return qs

def low_stock(self, threshold: int = 10):
    """库存低但不为零的产品。"""

return self.filter(stock__gt=0, stock__lte=threshold)

    def search(self, query: str):
        """在名称和描述中进行全文搜索。"""
        return self.filter(
            Q(name__icontains=query) |
            Q(description__icontains=query)
        )

 

# apps/catalog/models.py
from apps.catalog.querysets import ProductQuerySet

class Product(TimestampedModel):
    # ... 字段 ...

    # 将 QuerySet 作为默认管理器暴露
    objects = ProductQuerySet.as_manager()

 

使用变得流畅且可组合:

# 干净、可读,不显示 SQL
products = (
    Product.objects
    .active()
    .by_category()"electronics")
    .with_stats()

.in_price_range(min_price=50, max_price=500)
    .select_related("category")
    .order_by("-total_sold")
)

第4步:配置数据库性能设置

# config/settings/production.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": env("DB_NAME"),
        "USER": env("DB_USER"),


{
    "PASSWORD": env("DB_PASSWORD"),
    "HOST": env("DB_HOST"),
    "PORT": env("DB_PORT", default="5432"),
    "CONN_MAX_AGE": 60,           # 连接重用时间为60秒
    "OPTIONS": {

"connect_timeout": 5,
"options": "-c default_transaction_isolation=read committed",
},
}
}

代码示例

F 表达式:数据库端算术运算

F() 表达式在数据库层面引用模型字段的值,允许您在不先将对象加载到 Python 中的情况下执行操作。这既更快又安全,避免了竞争条件。

from django.db.models import F

# 不好 — 将对象加载到 Python 中,然后再保存回去
product = Product.objects.get(pk=42)
product.stock -= order_quantity
product.save()  # ← 两次数据库请求。读取和写入之间存在竞争条件。

# 好 — 在数据库层面进行单个原子更新



Product.objects.filter(pk=42).update(stock=F("stock") - order_quantity)
# SQL: UPDATE catalog_product SET stock = stock - 3 WHERE id = 42

# F 表达式在注解中
from django.db.models import ExpressionWrapper, DecimalField

orders = Order.objects.annotate(
    discount_amount=ExpressionWrapper(

F("subtotal") * F("discount_percentage") / 100,
        output_field=DecimalField(max_digits=10, decimal_places=2),
    )
)

 

Q 对象:复杂查找和 OR 逻辑

 

Q() 对象允许您构建复杂的、可组合的 WHERE 子句,使用 AND、OR 和 NOT 逻辑:

 

 

from django.db.models import Q

# 简单的 OR:处于活动状态或正在促销的产品
products = Product.objects.filter(
    Q(status="active") | Q(on_sale=True)
)

# 复杂的嵌套逻辑
# (活动状态或促销)并且(价格 < 100)并且 NOT(已归档)

products = Product.objects.filter(
    (Q(status="active") | Q(on_sale=True)) &
    Q(price__lt=100) &
    ~Q(status="archived")
)

# 可重用的 Q 对象
def active_and_affordable_q(max_price: float) -> Q:

return Q(status="active") & Q(price__lte=max_price)

budget_products = Product.objects.filter(active_and_affordable_q(50.00))

 

注解和聚合:将计算推送到数据库

aggregate() 不同,annotate() 不是一个终止子句。annotate() 子句的输出是一个 QuerySet;这个 QuerySet 可以使用任何其他 QuerySet 操作进行修改,包括 filter()order_by(),甚至是对 annotate() 的额外调用。

from django.db.models import (
    Count, Sum, Avg, Min, Max,
    Case, When, Value, IntegerField, DecimalField,
    ExpressionWrapper, F
)

from django.db.models.functions import Coalesce, TruncMonth

# annotate() → 为 QuerySet 中的每个对象添加一个字段
categories_with_counts = Category.objects.annotate(
    product_count=Count("products", distinct=True),
    active_count=Count(
        "products",
        filter=Q(products__status="active"),
        distinct=True,
    ),

avg_price=Coalesce(Avg("products__price"), 0.0),
)
# 每个类别现在都有 .product_count, .active_count, .avg_price

# aggregate() → 返回一个总结整个 QuerySet 的单一字典
summary = Order.objects.filter(
    created_at__year=2025
).aggregate(
    total_revenue=Sum("total"),
    average_order_value=Avg("total"),

order_count=Count(id),
max_order=Max(total),
min_order=Min(total),
)
# 返回: {“total_revenue”: Decimal(“125300.50”), “average_order_value”: …, …}

# 月度收入细分 — 按月分组
monthly_revenue = (
Order.objects
.annotate(month=TruncMonth(created_at))


.values("month")
    .annotate(revenue=Sum("total"), orders=Count("id"))
    .order_by("month")
)
# SQL: SELECT DATE_TRUNC('month', created_at) AS month,
#             SUM(total) AS revenue, COUNT(id) AS orders
#      FROM orders_order
#      GROUP BY month ORDER BY month

# 使用 Case/When 进行条件注释
products_with_tier = Product.objects.annotate(
    price_tier=Case(

(price__lt=25, =(预算)),
(price__lt=100, =(中档)),
(price__lt=500, =(高端)),
默认=(奢华),


output_field=models.CharField(),
    )
)
# 每个产品现在都有 .price_tier: "经济型"、"中档"、"高档" 或 "奢华"

select_related 和 prefetch_related: N+1 问题的解决方案

# 场景:显示订单及其用户、送货地址和订单项

# 不佳 — N+1 查询

orders = Order.objects.all()
for order in orders:           # 1 查询 (SELECT orders)
    user = order.user          # N 查询 (SELECT user WHERE id=X)
    addr = order.shipping_address  # N 查询
    for item in order.items.all():  # N 查询 (SELECT items WHERE order=X)
        name = item.product.name    # N*M 查询

# 好 — 总共 3 个查询,无论规模如何
orders = (

Order.objects
# select_related: SQL JOIN 用于外键/一对一关系
    .select_related("user", "shipping_address")
# prefetch_related: 针对多对多/反向外键的单独批量查询
    .prefetch_related("items__product__category")
.filter(status="processing")
.order_by("-created_at")
)
# 查询 1: SELECT orders + JOIN users + JOIN addresses WHERE status='processing'
# 查询 2: SELECT items WHERE order_id IN (1, 2, 3, ...)
# 查询 3: SELECT products + JOIN categories WHERE id IN (...)

自定义预取对象:带过滤器的预取

from django.db.models import Prefetch

# 仅预取活跃的评论,预先排序并过滤

active_reviews = Review.objects.filter(is_approved=True).order_by("-created_at")

products = Product.objects.prefetch_related(
    Prefetch("reviews", queryset=active_reviews, to_attr="approved_reviews")
).active()

# 现在 product.approved_reviews 是一个预加载的列表 — 无需额外查询
for product in products:

top_review = product.approved_reviews[0] if product.approved_reviews else None

批量操作:一次查询处理数十行


from django.db import transaction

# bulk_create: 在一个INSERT语句中插入多行
new_tags = Tag.objects.bulk_create([
    Tag(name="python"),
    Tag(name="django"),
    Tag(name="backend"),
], ignore_conflicts=True)  # ← 如果已存在则跳过
# bulk_update: 在一个UPDATE语句中更新多行

products_to_reactivate = Product.objects.filter(status="archived", stock__gt=0)
for product in products_to_reactivate:
    product.status = "active"

Product.objects.bulk_update(products_to_reactivate, fields=["status"])
# SQL: UPDATE catalog_product SET status=CASE WHEN id=1 THEN 'active' ... END

# update(): 单个 SQL 更新 — 无需模型实例化

Product.objects.filter(
category__slug="electronics",
stock=0,
).update(status="archived")
# SQL: UPDATE catalog_product SET status='archived'
#      WHERE category_id IN (SELECT id FROM category WHERE slug='electronics')
#      AND stock = 0



 

 

 

iterator() 方法:内存高效的大型查询集处理

 

 

# 不好 — 一次性加载所有 500,000 行到内存中
for order in Order.objects.filter(status="pending"):
    process_order(order)  # 在规模上存在 OOM 风险

# 好 — 分块获取,内存占用大大降低

for order in Order.objects.filter(status="pending").iterator(chunk_size=2000):
    process_order(order)

# 使用预取(手动,因为 prefetch_related 不适用于迭代器)

for order in Order.objects.filter(status="pending").select_related("user").iterator(chunk_size=2000):
    process_order(order)

在这段代码中,我们使用了一个循环来处理状态为“待处理”的订单。首先,通过 `Order.objects.filter(status=”pending”)` 过滤出所有状态为“待处理”的订单。接着,使用 `select_related(“user”)` 方法来预加载与订单相关的用户信息,以提高查询效率。最后,使用 `iterator(chunk_size=2000)` 方法以每次处理2000个订单的方式进行迭代处理。每个订单都会通过 `process_order(order)` 函数进行处理。




 

模型方法与管理器方法

 

 

class Order(TimestampedModel):
    # ... 字段 ...

    # ── 模型方法:行级,实例特定逻辑 ──
    def get_subtotal(self) -> Decimal:
        """所有商品价格的总和。需要预先获取商品。"""
        return sum(item.subtotal for item in self.items.all())

def can_be_cancelled(self) -> bool:
        return self.status in ("pending", "processing")

def cancel(self) -> None:
        if not self.can_be_cancelled():

raise ValueError(f"订单 #{self.pk} 无法从状态 '{self.status}'")
self.status = "已取消"
self.save(update_fields=["status", "updated_at"])

# ── 使用管理器/服务进行表级操作 ──
    # 不要在模型方法中放置批量查询或跨表逻辑。








 

事务:原子性保证

 

 

from django.db import transaction

def create_order_with_items(user, cart_items: list) -> Order:
    """
    原子性地创建一个订单及其订单项。
    如果任何操作失败,整个操作将回滚。
    不会有部分订单到达数据库。

with transaction.atomic():
    order = Order.objects.create(
        user=user,
        status=Order.Status.PENDING,
        total=sum(item["price"] * item["quantity"] for item in cart_items),
    )

    order_items = [

OrderItem(
    order=order,
    product_id=item["product_id"],
    quantity=item["quantity"],
    unit_price=item["price"],
)
for item in cart_items
]
OrderItem.objects.bulk_create(order_items)

# 原子性更新库存 — 无竞争条件

        for item in cart_items:
            updated = Product.objects.filter(
                pk=item["product_id"],
                stock__gte=item["quantity"],   # 仅在库存充足时更新
            ).update(stock=F("stock") - item["quantity"])

if updated == 0:
                raise ValueError(f"产品 {item['product_id']}")

        return order

性能优化

1. 使用 only()defer() 限制列的提取

加载不需要的列会浪费 I/O 带宽和内存。 only() 是白名单方法; defer() 是黑名单方法:

# 仅加载您实际使用的列
product_list = (
    Product.objects
    .active()
    .only("id", "name", "slug", "price", "stock")  # ← 5 列而不是 20 列


.order_by("name")
)
# SQL: SELECT id, name, slug, price, stock FROM catalog_product WHERE status='active'

# 延迟加载已知占用资源且很少需要的字段
products = Product.objects.defer("description", "metadata", "image_data")

 

重要:访问实例上的延迟字段会为每个实例触发额外的查询。当你知道所需的确切字段时,请使用 only()

2. 对于只读数据使用 values()values_list()

当你不需要模型实例时——例如在API、导出、聚合中——可以完全跳过对象实例化:

# 返回字典而不是模型实例——更快,内存占用更少
products = (
    Product.objects
    .active()


.values("id", "name", "price", "stock")
)
# [{"id": 1, "name": "Widget", "price": "29.99", "stock": 100}, ...]

# 返回元组 — 更轻量
product_ids = Product.objects.active().values_list("id", flat=True)
# (1, 2, 3, 4, ...)

# 直接获取字典: {id: name}

id_name_map = dict(Product.objects.values_list("id", "name"))

3. 适当使用 exists()count()


# 不好 — 加载整个 QuerySet 来检查存在性
if len(Order.objects.filter(user=user, status="pending")) > 0:
    ...

# 好 — SELECT 1 LIMIT 1 — 没有加载行
if Order.objects.filter(user=user, status="pending").exists():
    ...

# 不好 — 计算 QuerySet

count = len(Product.objects.filter(status="active"))

# 好 — SELECT COUNT(*) — 单个聚合查询
count = Product.objects.filter(status="active").count()




 

4. 在测试中强制执行查询预算

 

 

# apps/orders/tests/test_selectors.py
from django.test import TestCase
from apps.orders.selectors import get_orders_for_user
from apps.orders.tests.factories import OrderFactory, OrderItemFactory

class TestOrderSelectorQueryCount(TestCase):
    """
    锁定关键选择器的查询计数。
    如果有人添加了 N+1 查询,这个测试会立即失败
    — 而不是在凌晨 2 点的生产监控中。
    """

def setUp(self):
    self.user = UserFactory()
    orders = OrderFactory.create_batch(10, user=self.user)
    for order in orders:
        OrderItemFactory.create_batch(5, order=order)

def test_get_orders_query_count(self):
    with self.assertNumQueries(3):

这段代码定义了一个测试类的方法,其中 `setUp` 方法用于初始化测试环境,创建一个用户并生成10个订单,每个订单包含5个订单项。接着,`test_get_orders_query_count` 方法用于测试查询的数量,期望在执行过程中只产生3个数据库查询。


# 1: 订单 + JOIN 用户 + JOIN 地址
            # 2: 预取订单项
            # 3: 预取产品
            list(get_orders_for_user(user_id=self.user.id))

5. 数据库索引:最具影响力的单一变更


class Order(TimestampedModel):
    class Meta:
        indexes = [
            # 复合索引:最常见的查询 — “用户的订单状态”
            models.Index(
                fields=["user", "status"],
                name="idx_order_user_status",
            ),
            # 按创建时间降序排列以便默认排序
            models.Index(
                fields=["-created_at"],

name="idx_order_created_at_desc",
            ),
        ]

        # 部分索引(PostgreSQL):仅索引未取消的订单
        # 更小、更快的索引,适用于热点查询路径
        # 通过自定义迁移添加 PostgreSQL 特定语法:
        # CREATE INDEX idx_active_orders ON orders_order(user_id)
        # WHERE status != 'cancelled';

要识别缺失的索引,请在慢查询上使用 EXPLAIN ANALYZE


 

# 在 Django shell 中 — 检查慢查询选择器的查询计划
from django.db import connection

with connection.cursor() as cursor:
    cursor.execute("""
        EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
        SELECT * FROM orders_order
        WHERE user_id = 1 AND status = 'processing'
        ORDER BY created_at DESC
        LIMIT 20
    """)
    for row in cursor.fetchall():
        print(row[0])







 

 


安全最佳实践

 

1. SQL 注入:ORM 保护了什么,未保护什么

 

Django ORM 自动对所有查询进行参数化——字段查找和 filter() 调用从未受到 SQL 注入的威胁。然而,仍然存在一些需要手动处理的逃生通道:

 

 

# 安全 — ORM 对所有值进行参数化

Product.objects.filter(name=user_input)
# SQL: SELECT ... WHERE name = %s -- [user_input]

# 危险 — 原始 SQL 语句与字符串格式化
Product.objects.raw(f"SELECT * FROM catalog_product WHERE name = '{user_input}'")
# ← 永远不要这样做。 user_input = "'; DROP TABLE catalog_product; --"

# 安全 — 原始 SQL 语句与参数化
Product.objects.raw(
    "SELECT * FROM catalog_product WHERE name = %s",
    [user_input]  # ← 始终使用参数化占位符
)

# 危险 — 使用未转义的用户输入调用 extra()
Product.objects.extra(where=[f"name = '{user_input}'"])  # ← 永远不要

# 安全 — 使用参数调用 extra()
Product.objects.extra(where=["name = %s"], params=[user_input])






 

2. 防止批量赋值

 

切勿直接将 request.datarequest.POST 传递给 Model.objects.create():

 

 

# 危险 — 攻击者可以设置任何模型字段
order = Order.objects.create(**request.POST.dict())  # ← 切勿这样做

# 安全 — 通过序列化器白名单字段
from apps.orders.serializers import CreateOrderSerializer

serializer = CreateOrderSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
order = OrderService.create(**serializer.validated_data)

3. 通过对象级作用域防止 IDOR

# 危险 — 任何经过身份验证的用户都可以通过 ID 访问任何订单
order = get_object_or_404(Order, pk=pk)  # ← IDOR 漏洞

# 安全 — 始终限制在经过身份验证的用户范围内
order = get_object_or_404(Order, pk=pk, user=request.user)

# 更好 — 在选择器中使用范围查询集



def get_order_for_user(user, order_id: int) -> Order:
    return get_object_or_404(
        Order.objects.filter(user=user),
        pk=order_id,
    )

 

4. 在 .save() 中使用 update_fields 以提高精确度

 

 

# 不推荐 — 保存所有字段;有覆盖并发更新的风险
product.status = "archived"
product.save()  # ← 重写所有列,包括任何不在内存中的列

# 推荐 — 仅更新你更改的特定列
product.status = "archived"

product.save(update_fields=["status", "updated_at"])
# SQL: UPDATE catalog_product SET status='archived', updated_at=NOW() WHERE id=42

5. 在数据库层面使用约束以确保数据完整性

 

Django模型约束在数据库层面强制执行业务规则——最后的防线:

class OrderItem(models.Model):
    quantity   = models.PositiveIntegerField()
    unit_price = models.DecimalField(max_digits=10, decimal_places=2)

    class Meta:
        constraints = [
            models.CheckConstraint(

check=models.Q(quantity__gt=0),
                name="orderitem_positive_quantity",
            ),
            models.CheckConstraint(
                check=models.Q(unit_price__gte=0),
                name="orderitem_non_negative_price",
            ),
            models.UniqueConstraint(


fields=["order", "product"],
name="orderitem_unique_order_product",
),
]

常见开发者错误


❌ 错误 1:过早评估 QuerySets

 

 

# 不好 — 立即评估 QuerySet,失去链式调用能力
def get_active_products():
    return list(Product.objects.filter(status="active"))  # ← 已评估!

# 无法执行:get_active_products().filter(category=electronics)

# 好 — 返回 QuerySet,让调用者决定何时评估
def get_active_products():

return Product.objects.filter(status="active")  # ← 懒惰,链式调用

# 调用者现在可以进一步筛选:
products = get_active_products().filter(category=electronics).order_by("-price")




 

❌ 错误 2:使用 len() 而不是 .count()

 

 

# 不好 — 为了计数而将所有行加载到内存中
if len(Order.objects.filter(user=user)) > 0:
    ...
n = len(Product.objects.all())   # 可能加载数百万行


# 好 — 单个 COUNT(*) 查询,加载零行
if Order.objects.filter(user=user).exists():
    ...
n = Product.objects.count()

❌ 错误 3:错误地组合多个注解

 

使用 annotate() 组合多个聚合会产生错误的结果,因为使用的是连接而不是子查询。

 

 

# 不好 — 由于多个 JOIN 导致重复行;author__count 将是错误的
books = Book.objects.annotate(Count("authors"), Count("stores"))
# books[0].authors__count → 错误(膨胀)

# 好 — 使用 distinct=True 来修复重复计数
books = Book.objects.annotate(

author_count=Count("authors", distinct=True),
    store_count=Count("stores", distinct=True),
)


❌ 错误 4:在循环中调用 .save()

 

 

# 不好 — 对于 1,000 个产品执行 1,000 次 UPDATE 查询
products = Product.objects.filter(category=electronics)
for product in products:
    product.price *= 0.9  # 10% 折扣
    product.save()        # ← 每个产品一个查询

# 好 — 单个 UPDATE 查询

Product.objects.filter(category=electronics).update(
price=F("price") * Decimal("0.9")
)

❌ 错误 5:在未预取的情况下访问相关对象

# 不好 — 模板循环触发 N+1
{% for order in orders %}
    {{ order.user.email }}      {# 每个订单的数据库访问 #}
    {{ order.items.count() }}   {# 每个订单的数据库访问 #}
{% endfor %}

# 好 — 在传递给模板之前进行预取


orders = Order.objects.select_related("user").prefetch_related("items")

❌ 错误 6:在带注解的 .values() 后使用 .filter()


 

# 不良示例 — 注解时过滤顺序很重要
# 根据 Django 版本,这可能会产生意想不到的结果
result = (
    Order.objects
    .values("status")
    .annotate(total=Sum("amount"))
    .filter(status="active")  # ← 在这里是安全的,但如果顺序改变则很脆弱
)

# 良好示例 — 尽可能在注解之前进行过滤
result = (
    Order.objects

.filter(status="active")   # ← 首先进行过滤
    .values("status")
    .annotate(total=Sum("amount"))
)


实际生产使用案例

 

使用案例 1:电子商务仪表板 — 复杂聚合

 

产品经理的仪表板需要实时统计数据:按类别的收入、畅销产品、低库存警报、转化率。所有计算都在数据库中完成 — 不使用 Python 循环:

 

 

# apps/analytics/selectors.py
from django.db.models import Sum, Count, Avg, F, Q, ExpressionWrapper, DecimalField
from django.db.models.functions import TruncMonth, Coalesce

def get_dashboard_stats(start_date, end_date) -> dict:
    """
    返回完整的仪表板数据,共需4个查询。
    如果采用简单的方法,可能需要100次以上的Python迭代。
    """
    from apps.orders.models import Order, OrderItem
    from apps.catalog.models import Category

    # 查询1:本期按类别的收入
    revenue_by_category = (
        OrderItem.objects
        .filter(order__created_at__range=(start_date, end_date))

.values("product__category__name")
        .annotate(
            revenue=Sum(
                ExpressionWrapper(
                    F("quantity") * F("unit_price"),
                    output_field=DecimalField(max_digits=12, decimal_places=2),
                )
            ),

units_sold=Sum("quantity"),
        )
        .order_by("-revenue")[:10]
    )

    # 查询 2:月度趋势
    monthly_trend = (
        Order.objects
        .filter(created_at__range=(start_date, end_date))
        .annotate(month=TruncMonth("created_at"))

.values("month")
        .annotate(
            revenue=Sum("total"),
            order_count=Count("id"),
            avg_order_value=Avg("total"),
        )
        .order_by("month")
    )

    # 查询 3:按收入排序的热门产品
    top_products = (
        OrderItem.objects

.filter(order__created_at__range=(start_date, end_date))
.values("product__name", "product__id")
.annotate(
    total_revenue=Sum(F("quantity") * F("unit_price"),
                          output_field=DecimalField()),

total_units=Sum("quantity"),
        )
        .order_by("-total_revenue")[:20]
    )

    # 查询 4:低库存警报
    low_stock = (
        Product.objects
        .active()
        .filter(stock__lte=10)

.values("id", "name", "stock", "category__name")
        .order_by("stock")
    )

    return {
        "revenue_by_category": list(revenue_by_category),
        "monthly_trend": list(monthly_trend),
        "top_products": list(top_products),

"low_stock_alerts": list(low_stock),
    }

用例 2:大数据导出 — 内存高效迭代器模式

财务团队需要将 500,000 个订单导出为 CSV。将所有数据加载到内存中会导致服务器崩溃:

# apps/exports/services.py

import csv
from django.http import StreamingHttpResponse
from apps.orders.models import Order

class EchoWriter:
    """类似文件的对象,可以回显其输入。用于 StreamingHttpResponse。"""
    def write(self, value):
        return value

def stream_orders_csv(queryset) -> StreamingHttpResponse:
    """
    流式导出大量 CSV 数据,而不将所有内容加载到内存中。
    使用 Django 的 StreamingHttpResponse + ORM 的 iterator() 实现 O(1) 的内存使用。
    """

writer = csv.writer(EchoWriter())

def generate_rows():
    yield writer.writerow(["ID", "用户邮箱", "状态", "总计", "创建时间"])

    for order in (
        queryset
        .select_related("用户")

.only("id", "user__email", "status", "total", "created_at")
            .iterator(chunk_size=2000)
        ):
            yield writer.writerow([
                order.id,
                order.user.email,
                order.status,

str(order.total),
                order.created_at.isoformat(),
            ])

    response = StreamingHttpResponse(generate_rows(), content_type="text/csv")
    response["Content-Disposition"] = 'attachment; filename="orders.csv"'
    return response






 

用例 3:通过自定义管理器实现 SaaS 多租户数据隔离

 

 

# apps/core/managers.py
from django.db import models

class TenantQuerySet(models.QuerySet):
    def for_tenant(self, tenant_id: int):

return self.filter(tenant_id=tenant_id)

class TenantManager(models.Manager):
    def get_queryset(self):
        return TenantQuerySet(self.model, using=self._db)

    def for_tenant(self, tenant_id: int):
        return self.get_queryset().for_tenant(tenant_id)

# apps/projects/models.py
class Project(TimestampedModel):
    tenant = models.ForeignKey("tenants.Tenant", on_delete=models.CASCADE)
    name = models.CharField(max_length=255)
    # ...
    objects = TenantManager()

# 使用 — 在结构上不可能跨越租户边界

projects = Project.objects.for_tenant(request.tenant.id)

结论

Django ORM 是一款精密的工具。正确使用时,它是任何语言中最具表现力、安全性和性能的数据库接口之一。若使用不当,则会导致 N+1 查询、内存膨胀以及数据完整性错误,这些问题在规模扩大时表现不佳且不可预测。

本指南中的模式——懒惰的 QuerySet 链接、可组合 QuerySet 的自定义管理器、用于原子更新的 F 表达式、用于数据库端计算的 annotate()、用于关系加载的 select_relatedprefetch_related、用于内存高效批处理的 iterator() 以及用于数据完整性的模型级约束——并不是高级技巧。这些是生产质量 Django 代码的基础。

最重要的原则是:在最低层次上进行工作。聚合操作应在 SQL 中进行,而不是在 Python 循环中。对象图应在预取的 QuerySet 中,而不是重复的数据库调用中。业务规则应在数据库约束中,而不仅仅是在应用代码中。

故意编写你的模型。懒惰地构建你的 QuerySet。将计算推向更低层次。通过测试锁定你的查询计数。在假设查询足够快以用于生产之前,使用 EXPLAIN ANALYZE

数据库是基础。要像它重要一样构建它。


进一步阅读


由一位构建生产Django系统的Python后端工程师撰写。主题:Django ORM、QuerySet、N+1查询、F表达式、注解、聚合、select_related、prefetch_related、自定义管理器、数据库优化。

更多