python-backend-expert
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePython Backend Expert
Python后端专家
When to Use
适用场景
Activate this skill when:
- Creating or modifying FastAPI route handlers (endpoints)
- Defining or updating Pydantic v2 request/response schemas
- Writing SQLAlchemy 2.0 async models, queries, or relationships
- Implementing the repository pattern for data access
- Writing service layer business logic
- Creating or running Alembic migrations
- Setting up dependency injection chains with
Depends() - Handling errors across the route/service/repository layers
Do NOT use this skill for:
- Writing tests for backend code (use )
pytest-patterns - FastAPI framework mechanics — middleware, WebSockets, OpenAPI customization, CORS, lifespan (use )
fastapi-patterns - Deployment or CI/CD pipeline configuration (use )
deployment-pipeline - API contract design or endpoint planning (use )
api-design-patterns - Architecture decisions or layer design (use )
system-architecture
在以下场景激活此技能:
- 创建或修改FastAPI路由处理器(端点)
- 定义或更新Pydantic v2请求/响应 schema
- 编写SQLAlchemy 2.0异步模型、查询或关联关系
- 实现数据访问的仓储模式
- 编写服务层业务逻辑
- 创建或运行Alembic迁移
- 使用设置依赖注入链
Depends() - 处理路由/服务/仓储层的错误
请勿在以下场景使用此技能:
- 编写后端代码测试(请使用)
pytest-patterns - FastAPI框架机制——中间件、WebSockets、OpenAPI定制、CORS、生命周期(请使用)
fastapi-patterns - 部署或CI/CD流水线配置(请使用)
deployment-pipeline - API契约设计或端点规划(请使用)
api-design-patterns - 架构决策或分层设计(请使用)
system-architecture
Instructions
操作指南
Project Structure
项目结构
app/
├── main.py # FastAPI application factory
├── core/
│ ├── config.py # pydantic-settings configuration
│ ├── database.py # Async engine, session factory
│ └── security.py # Password hashing, JWT utilities
├── models/ # SQLAlchemy ORM models
│ ├── __init__.py
│ ├── base.py # Declarative base
│ └── user.py
├── schemas/ # Pydantic v2 schemas
│ ├── __init__.py
│ └── user.py
├── repositories/ # Data access layer
│ ├── __init__.py
│ └── user_repo.py
├── services/ # Business logic layer
│ ├── __init__.py
│ └── user_service.py
├── routes/ # FastAPI routers
│ ├── __init__.py
│ └── users.py
├── dependencies/ # Reusable Depends() providers
│ ├── __init__.py
│ └── auth.py
└── exceptions.py # Domain exception classesapp/
├── main.py # FastAPI应用工厂
├── core/
│ ├── config.py # pydantic-settings配置
│ ├── database.py # 异步引擎、会话工厂
│ └── security.py # 密码哈希、JWT工具
├── models/ # SQLAlchemy ORM模型
│ ├── __init__.py
│ ├── base.py # 声明式基类
│ └── user.py
├── schemas/ # Pydantic v2 schema
│ ├── __init__.py
│ └── user.py
├── repositories/ # 数据访问层
│ ├── __init__.py
│ └── user_repo.py
├── services/ # 业务逻辑层
│ ├── __init__.py
│ └── user_service.py
├── routes/ # FastAPI路由
│ ├── __init__.py
│ └── users.py
├── dependencies/ # 可复用的Depends()提供者
│ ├── __init__.py
│ └── auth.py
└── exceptions.py # 领域异常类FastAPI Endpoint Pattern
FastAPI端点模式
Every endpoint follows this structure:
python
router = APIRouter(prefix="/users", tags=["Users"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
data: UserCreate,
session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
service = UserService(session)
try:
user = await service.create_user(data)
return UserResponse.model_validate(user)
except ConflictError as e:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
service = UserService(session)
try:
user = await service.get_user(user_id)
return UserResponse.model_validate(user)
except NotFoundError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))Rules:
- Routes handle HTTP concerns only: status codes, , response formatting
HTTPException - Routes call services, never repositories directly
- Use for automatic response serialization and OpenAPI docs
response_model - Use constants, not bare integers
status.HTTP_* - Use for session, auth, and service injection
Depends()
每个端点遵循以下结构:
python
router = APIRouter(prefix="/users", tags=["Users"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
data: UserCreate,
session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
service = UserService(session)
try:
user = await service.create_user(data)
return UserResponse.model_validate(user)
except ConflictError as e:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
service = UserService(session)
try:
user = await service.get_user(user_id)
return UserResponse.model_validate(user)
except NotFoundError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))规则:
- 路由仅处理HTTP相关事宜:状态码、、响应格式化
HTTPException - 路由调用服务,绝不直接调用仓储
- 使用实现自动响应序列化和OpenAPI文档生成
response_model - 使用常量,而非裸整数
status.HTTP_* - 使用注入会话、认证和服务
Depends()
Repository Pattern
仓储模式
Repositories encapsulate all database access:
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.user import User
class UserRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, user_id: int) -> User | None:
result = await self._session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> User | None:
result = await self._session.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def list_with_posts(
self, *, offset: int = 0, limit: int = 20
) -> list[User]:
result = await self._session.execute(
select(User)
.options(selectinload(User.posts))
.offset(offset)
.limit(limit)
)
return list(result.scalars().all())
async def create(self, user: User) -> User:
self._session.add(user)
await self._session.flush()
await self._session.refresh(user)
return user
async def update(self, user: User, **kwargs: object) -> User:
for key, value in kwargs.items():
setattr(user, key, value)
await self._session.flush()
await self._session.refresh(user)
return user
async def delete(self, user: User) -> None:
await self._session.delete(user)
await self._session.flush()Rules:
- One repository per model (or aggregate root)
- Repositories return model instances or — never HTTP responses
None - No business logic in repositories
- Always +
flush()afterrefresh()to get generated fields (id, timestamps)add() - Use for eager loading relationships in async context
selectinload() - Never raise from repositories
HTTPException
仓储封装所有数据库访问逻辑:
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.user import User
class UserRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, user_id: int) -> User | None:
result = await self._session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> User | None:
result = await self._session.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def list_with_posts(
self, *, offset: int = 0, limit: int = 20
) -> list[User]:
result = await self._session.execute(
select(User)
.options(selectinload(User.posts))
.offset(offset)
.limit(limit)
)
return list(result.scalars().all())
async def create(self, user: User) -> User:
self._session.add(user)
await self._session.flush()
await self._session.refresh(user)
return user
async def update(self, user: User, **kwargs: object) -> User:
for key, value in kwargs.items():
setattr(user, key, value)
await self._session.flush()
await self._session.refresh(user)
return user
async def delete(self, user: User) -> None:
await self._session.delete(user)
await self._session.flush()规则:
- 每个模型(或聚合根)对应一个仓储
- 仓储返回模型实例或——绝不返回HTTP响应
None - 仓储中不包含业务逻辑
- 在后务必调用
add()+flush()以获取生成字段(id、时间戳)refresh() - 在异步环境中使用预加载关联关系
selectinload() - 绝不从仓储中抛出
HTTPException
Service Layer Pattern
服务层模式
Services contain business logic and orchestrate repositories:
python
from app.exceptions import ConflictError, NotFoundError
from app.models.user import User
from app.repositories.user_repo import UserRepository
from app.schemas.user import UserCreate, UserPatch
from app.core.security import hash_password
class UserService:
def __init__(self, session: AsyncSession) -> None:
self.repo = UserRepository(session)
async def create_user(self, data: UserCreate) -> User:
# Business rule: email must be unique
existing = await self.repo.get_by_email(data.email)
if existing:
raise ConflictError(f"Email {data.email} already registered")
# Business logic: hash password before storing
user = User(
email=data.email,
hashed_password=hash_password(data.password),
display_name=data.display_name,
)
return await self.repo.create(user)
async def get_user(self, user_id: int) -> User:
user = await self.repo.get_by_id(user_id)
if user is None:
raise NotFoundError(f"User {user_id} not found")
return user
async def update_user(self, user_id: int, data: UserPatch) -> User:
user = await self.get_user(user_id)
update_fields = data.model_dump(exclude_unset=True)
if "password" in update_fields:
update_fields["hashed_password"] = hash_password(update_fields.pop("password"))
return await self.repo.update(user, **update_fields)Rules:
- Services raise domain exceptions (,
NotFoundError), NEVERConflictErrorHTTPException - Services are the only place for business logic
- Services call repositories for data access, never run raw queries
- Services receive via constructor and create their own repository instances
AsyncSession - Services validate business rules before calling repositories
服务层包含业务逻辑并协调仓储:
python
from app.exceptions import ConflictError, NotFoundError
from app.models.user import User
from app.repositories.user_repo import UserRepository
from app.schemas.user import UserCreate, UserPatch
from app.core.security import hash_password
class UserService:
def __init__(self, session: AsyncSession) -> None:
self.repo = UserRepository(session)
async def create_user(self, data: UserCreate) -> User:
# 业务规则:邮箱必须唯一
existing = await self.repo.get_by_email(data.email)
if existing:
raise ConflictError(f"Email {data.email} already registered")
# 业务逻辑:存储前哈希密码
user = User(
email=data.email,
hashed_password=hash_password(data.password),
display_name=data.display_name,
)
return await self.repo.create(user)
async def get_user(self, user_id: int) -> User:
user = await self.repo.get_by_id(user_id)
if user is None:
raise NotFoundError(f"User {user_id} not found")
return user
async def update_user(self, user_id: int, data: UserPatch) -> User:
user = await self.get_user(user_id)
update_fields = data.model_dump(exclude_unset=True)
if "password" in update_fields:
update_fields["hashed_password"] = hash_password(update_fields.pop("password"))
return await self.repo.update(user, **update_fields)规则:
- 服务层抛出领域异常(、
NotFoundError),绝不抛出ConflictErrorHTTPException - 服务层是唯一存放业务逻辑的地方
- 服务层调用仓储进行数据访问,绝不执行原生查询
- 服务层通过构造函数接收并创建自己的仓储实例
AsyncSession - 服务层在调用仓储前验证业务规则
Domain Exceptions
领域异常
Define a hierarchy of domain exceptions:
python
class AppError(Exception):
"""Base application error."""
class NotFoundError(AppError):
"""Resource not found."""
class ConflictError(AppError):
"""Resource conflict (duplicate, version mismatch)."""
class ValidationError(AppError):
"""Business rule violation."""
class PermissionError(AppError):
"""Insufficient permissions."""Register global exception handlers in the FastAPI app:
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError) -> JSONResponse:
return JSONResponse(status_code=404, content={"detail": str(exc), "code": "NOT_FOUND"})
@app.exception_handler(ConflictError)
async def conflict_handler(request: Request, exc: ConflictError) -> JSONResponse:
return JSONResponse(status_code=409, content={"detail": str(exc), "code": "CONFLICT"})This allows services to raise domain exceptions without knowing about HTTP, and routes don't need try/except blocks.
定义领域异常的层级结构:
python
class AppError(Exception):
"""基础应用错误。"""
class NotFoundError(AppError):
"""资源未找到。"""
class ConflictError(AppError):
"""资源冲突(重复、版本不匹配)。"""
class ValidationError(AppError):
"""业务规则违反。"""
class PermissionError(AppError):
"""权限不足。"""在FastAPI应用中注册全局异常处理器:
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError) -> JSONResponse:
return JSONResponse(status_code=404, content={"detail": str(exc), "code": "NOT_FOUND"})
@app.exception_handler(ConflictError)
async def conflict_handler(request: Request, exc: ConflictError) -> JSONResponse:
return JSONResponse(status_code=409, content={"detail": str(exc), "code": "CONFLICT"})这使得服务层可以在不了解HTTP的情况下抛出领域异常,路由也无需编写try/except块。
Pydantic v2 Schema Conventions
Pydantic v2 Schema约定
python
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field
class UserCreate(BaseModel):
"""POST request body — writable fields only, no id/timestamps."""
email: EmailStr
password: str = Field(min_length=8, max_length=128)
display_name: str = Field(min_length=1, max_length=100)
class UserPatch(BaseModel):
"""PATCH request body — all fields Optional."""
email: EmailStr | None = None
password: str | None = Field(default=None, min_length=8, max_length=128)
display_name: str | None = Field(default=None, min_length=1, max_length=100)
class UserResponse(BaseModel):
"""Response body — all fields including id and timestamps."""
model_config = ConfigDict(from_attributes=True)
id: int
email: str
display_name: str
is_active: bool
created_at: datetime
updated_at: datetimeKey Pydantic v2 patterns:
- Use instead of
ConfigDict(from_attributes=True)class Config: orm_mode = True - Use instead of
model_validate()from_orm() - Use instead of
model_dump().dict() - Use for PATCH to distinguish "not sent" from "set to null"
model_dump(exclude_unset=True) - Use for validation constraints
Field() - Use syntax (Python 3.12+), not
str | NoneOptional[str]
python
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field
class UserCreate(BaseModel):
"""POST请求体——仅包含可写字段,无id/时间戳。"""
email: EmailStr
password: str = Field(min_length=8, max_length=128)
display_name: str = Field(min_length=1, max_length=100)
class UserPatch(BaseModel):
"""PATCH请求体——所有字段均为可选。"""
email: EmailStr | None = None
password: str | None = Field(default=None, min_length=8, max_length=128)
display_name: str | None = Field(default=None, min_length=1, max_length=100)
class UserResponse(BaseModel):
"""响应体——包含所有字段,包括id和时间戳。"""
model_config = ConfigDict(from_attributes=True)
id: int
email: str
display_name: str
is_active: bool
created_at: datetime
updated_at: datetimePydantic v2核心模式:
- 使用替代
ConfigDict(from_attributes=True)class Config: orm_mode = True - 使用替代
model_validate()from_orm() - 使用替代
model_dump().dict() - 使用处理PATCH请求,区分"未发送"和"设为null"
model_dump(exclude_unset=True) - 使用定义验证约束
Field() - 使用语法(Python 3.12+),而非
str | NoneOptional[str]
Async Session Management
异步会话管理
python
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from app.core.config import settings
engine = create_async_engine(
settings.database_url,
echo=settings.debug,
pool_size=5,
max_overflow=10,
pool_pre_ping=True,
)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
async with session.begin():
yield sessionRules:
- prevents detached instance errors after commit
expire_on_commit=False - context manager auto-commits on success, rolls back on exception
session.begin() - One session per request via
Depends(get_async_session) - Never share sessions across concurrent tasks
- For background tasks, create a new session — never reuse the request session
python
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from app.core.config import settings
engine = create_async_engine(
settings.database_url,
echo=settings.debug,
pool_size=5,
max_overflow=10,
pool_pre_ping=True,
)
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
async with session.begin():
yield session规则:
- 防止提交后出现实例分离错误
expire_on_commit=False - 上下文管理器会在成功时自动提交,异常时自动回滚
session.begin() - 通过为每个请求分配一个会话
Depends(get_async_session) - 绝不要在并发任务间共享会话
- 对于后台任务,创建新会话——绝不要复用请求会话
SQLAlchemy 2.0 Model Pattern
SQLAlchemy 2.0模型模式
python
from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255))
display_name: Mapped[str] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now()
)
# Relationships — ALWAYS use selectin or joined for async
posts: Mapped[list["Post"]] = relationship(
back_populates="author", lazy="selectin"
)Rules:
- Use annotations (SQLAlchemy 2.0 style)
Mapped[type] - Use instead of
mapped_column()Column() - Set on relationships for async compatibility
lazy="selectin" - Use for database-generated defaults
server_default - Always include and
created_attimestampsupdated_at
python
from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255))
display_name: Mapped[str] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now()
)
# 关联关系——异步环境下务必使用selectin或joined
posts: Mapped[list["Post"]] = relationship(
back_populates="author", lazy="selectin"
)规则:
- 使用注解(SQLAlchemy 2.0风格)
Mapped[type] - 使用替代
mapped_column()Column() - 为关联关系设置以兼容异步环境
lazy="selectin" - 使用定义数据库生成的默认值
server_default - 务必包含和
created_at时间戳updated_at
Alembic Migration Workflow
Alembic迁移工作流
bash
undefinedbash
undefinedGenerate migration from model changes
根据模型变更生成迁移文件
alembic revision --autogenerate -m "add_users_table"
alembic revision --autogenerate -m "add_users_table"
Review the generated migration file before applying
应用前检查生成的迁移文件
Apply migration
应用迁移
alembic upgrade head
alembic upgrade head
Rollback one step
回滚一步
alembic downgrade -1
alembic downgrade -1
Show current revision
查看当前版本
alembic current
alembic current
Show migration history
查看迁移历史
alembic history
**Migration naming convention:**
```pythonalembic history
**迁移命名约定:**
```pythonalembic/env.py
alembic/env.py
naming_convention = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
**Rules:**
- Always review autogenerated migrations before applying
- Every migration must have a working `downgrade()` function
- One migration per logical schema change
- Test both upgrade and downgrade
- Use descriptive migration messages: `"add_users_table"`, `"add_email_index_to_users"`naming_convention = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
}
**规则:**
- 应用前务必检查自动生成的迁移文件
- 每个迁移必须包含可正常工作的`downgrade()`函数
- 每个逻辑 schema 变更对应一个迁移文件
- 测试升级和回滚流程
- 使用描述性迁移消息:`"add_users_table"`、`"add_email_index_to_users"`Dependency Injection Pattern
依赖注入模式
python
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_async_session
from app.services.user_service import UserService
async def get_user_service(
session: AsyncSession = Depends(get_async_session),
) -> UserService:
return UserService(session)python
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_async_session
from app.services.user_service import UserService
async def get_user_service(
session: AsyncSession = Depends(get_async_session),
) -> UserService:
return UserService(session)Chain dependencies for auth
链式依赖实现认证
async def get_current_user(
token: str = Depends(oauth2_scheme),
session: AsyncSession = Depends(get_async_session),
) -> User:
user_id = decode_token(token)
service = UserService(session)
return await service.get_user(user_id)
async def require_admin(
user: User = Depends(get_current_user),
) -> User:
if user.role != "admin":
raise HTTPException(status_code=403, detail="Admin required")
return user
undefinedasync def get_current_user(
token: str = Depends(oauth2_scheme),
session: AsyncSession = Depends(get_async_session),
) -> User:
user_id = decode_token(token)
service = UserService(session)
return await service.get_user(user_id)
async def require_admin(
user: User = Depends(get_current_user),
) -> User:
if user.role != "admin":
raise HTTPException(status_code=403, detail="Admin required")
return user
undefinedExamples
示例
Complete Request Flow
完整请求流程
A request to flows through all layers:
POST /users- Route receives (Pydantic validates the request body)
UserCreate - Route calls via
UserService.create_user(data)Depends() - Service checks business rule (email uniqueness) via
UserRepository.get_by_email() - Service hashes password, creates model instance
User - Service calls to persist
UserRepository.create(user) - Repository adds to session, flushes, refreshes to get generated fields
- Route converts the ORM model to via
UserResponsemodel_validate()
If the email is duplicate, the service raises , the global exception handler returns . No needed in the route.
ConflictError409 Conflicttry/except向发送请求的流程如下:
POST /users- 路由接收(Pydantic验证请求体)
UserCreate - 路由通过调用
Depends()UserService.create_user(data) - 服务层通过检查业务规则(邮箱唯一性)
UserRepository.get_by_email() - 服务层哈希密码,创建模型实例
User - 服务层调用持久化数据
UserRepository.create(user) - 仓储将实例添加到会话,执行flush和refresh以获取生成字段
- 路由通过将ORM模型转换为
model_validate()UserResponse
如果邮箱重复,服务层抛出,全局异常处理器返回。路由中无需编写try/except块。
ConflictError409 ConflictEdge Cases
边缘场景
-
Detached instance errors: Always call+
flush()afterrefresh(). Setsession.add()on the session factory.expire_on_commit=False -
Async session in background tasks: Never reuse the request session. Create a new session:python
async def background_job(): async with async_session_factory() as session: async with session.begin(): # do work -
N+1 queries: Usein repository queries for relationships that will be accessed. Set
selectinload()as the default on model relationships.lazy="selectin" -
Bulk operations: Usefor bulk inserts instead of adding one by one.
session.execute(insert(User).values(list_of_dicts)) -
Transaction spanning multiple services: Pass the same session to all services. The session'scontext manager handles the transaction boundary.
begin() -
Pydantic v2 computed fields: Usefor derived values in response schemas. See
@computed_field.references/pydantic-v2-migration.md
See for advanced query optimization patterns.
references/sqlalchemy-patterns.md-
实例分离错误: 在后务必调用
session.add()+flush()。在会话工厂中设置refresh()。expire_on_commit=False -
后台任务中的异步会话: 绝不要复用请求会话。创建新会话:python
async def background_job(): async with async_session_factory() as session: async with session.begin(): # 执行任务 -
N+1查询问题: 在仓储查询中对需要访问的关联关系使用。在模型关联关系中设置
selectinload()作为默认值。lazy="selectin" -
批量操作: 使用执行批量插入,而非逐个添加。
session.execute(insert(User).values(list_of_dicts)) -
跨多个服务的事务: 向所有服务传递同一个会话。会话的上下文管理器处理事务边界。
begin() -
Pydantic v2计算字段: 在响应schema中使用定义派生值。请参考
@computed_field。references/pydantic-v2-migration.md
请查看获取高级查询优化模式。
references/sqlalchemy-patterns.md