python-backend-expert

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Python Backend Expert

Python后端专家

When to Use

适用场景

Activate this skill when:
  • Creating or modifying FastAPI route handlers (endpoints)
  • Defining or updating Pydantic v2 request/response schemas
  • Writing SQLAlchemy 2.0 async models, queries, or relationships
  • Implementing the repository pattern for data access
  • Writing service layer business logic
  • Creating or running Alembic migrations
  • Setting up dependency injection chains with
    Depends()
  • Handling errors across the route/service/repository layers
Do NOT use this skill for:
  • Writing tests for backend code (use
    pytest-patterns
    )
  • FastAPI framework mechanics — middleware, WebSockets, OpenAPI customization, CORS, lifespan (use
    fastapi-patterns
    )
  • Deployment or CI/CD pipeline configuration (use
    deployment-pipeline
    )
  • API contract design or endpoint planning (use
    api-design-patterns
    )
  • Architecture decisions or layer design (use
    system-architecture
    )
在以下场景激活此技能:
  • 创建或修改FastAPI路由处理器(端点)
  • 定义或更新Pydantic v2请求/响应 schema
  • 编写SQLAlchemy 2.0异步模型、查询或关联关系
  • 实现数据访问的仓储模式
  • 编写服务层业务逻辑
  • 创建或运行Alembic迁移
  • 使用
    Depends()
    设置依赖注入链
  • 处理路由/服务/仓储层的错误
请勿在以下场景使用此技能:
  • 编写后端代码测试(请使用
    pytest-patterns
  • FastAPI框架机制——中间件、WebSockets、OpenAPI定制、CORS、生命周期(请使用
    fastapi-patterns
  • 部署或CI/CD流水线配置(请使用
    deployment-pipeline
  • API契约设计或端点规划(请使用
    api-design-patterns
  • 架构决策或分层设计(请使用
    system-architecture

Instructions

操作指南

Project Structure

项目结构

app/
├── main.py              # FastAPI application factory
├── core/
│   ├── config.py        # pydantic-settings configuration
│   ├── database.py      # Async engine, session factory
│   └── security.py      # Password hashing, JWT utilities
├── models/              # SQLAlchemy ORM models
│   ├── __init__.py
│   ├── base.py          # Declarative base
│   └── user.py
├── schemas/             # Pydantic v2 schemas
│   ├── __init__.py
│   └── user.py
├── repositories/        # Data access layer
│   ├── __init__.py
│   └── user_repo.py
├── services/            # Business logic layer
│   ├── __init__.py
│   └── user_service.py
├── routes/              # FastAPI routers
│   ├── __init__.py
│   └── users.py
├── dependencies/        # Reusable Depends() providers
│   ├── __init__.py
│   └── auth.py
└── exceptions.py        # Domain exception classes
app/
├── main.py              # FastAPI应用工厂
├── core/
│   ├── config.py        # pydantic-settings配置
│   ├── database.py      # 异步引擎、会话工厂
│   └── security.py      # 密码哈希、JWT工具
├── models/              # SQLAlchemy ORM模型
│   ├── __init__.py
│   ├── base.py          # 声明式基类
│   └── user.py
├── schemas/             # Pydantic v2 schema
│   ├── __init__.py
│   └── user.py
├── repositories/        # 数据访问层
│   ├── __init__.py
│   └── user_repo.py
├── services/            # 业务逻辑层
│   ├── __init__.py
│   └── user_service.py
├── routes/              # FastAPI路由
│   ├── __init__.py
│   └── users.py
├── dependencies/        # 可复用的Depends()提供者
│   ├── __init__.py
│   └── auth.py
└── exceptions.py        # 领域异常类

FastAPI Endpoint Pattern

FastAPI端点模式

Every endpoint follows this structure:
python
router = APIRouter(prefix="/users", tags=["Users"])

@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    data: UserCreate,
    session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
    service = UserService(session)
    try:
        user = await service.create_user(data)
        return UserResponse.model_validate(user)
    except ConflictError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
    service = UserService(session)
    try:
        user = await service.get_user(user_id)
        return UserResponse.model_validate(user)
    except NotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
Rules:
  • Routes handle HTTP concerns only: status codes,
    HTTPException
    , response formatting
  • Routes call services, never repositories directly
  • Use
    response_model
    for automatic response serialization and OpenAPI docs
  • Use
    status.HTTP_*
    constants, not bare integers
  • Use
    Depends()
    for session, auth, and service injection
每个端点遵循以下结构:
python
router = APIRouter(prefix="/users", tags=["Users"])

@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    data: UserCreate,
    session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
    service = UserService(session)
    try:
        user = await service.create_user(data)
        return UserResponse.model_validate(user)
    except ConflictError as e:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))


@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    session: AsyncSession = Depends(get_async_session),
) -> UserResponse:
    service = UserService(session)
    try:
        user = await service.get_user(user_id)
        return UserResponse.model_validate(user)
    except NotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
规则:
  • 路由仅处理HTTP相关事宜:状态码、
    HTTPException
    、响应格式化
  • 路由调用服务,绝不直接调用仓储
  • 使用
    response_model
    实现自动响应序列化和OpenAPI文档生成
  • 使用
    status.HTTP_*
    常量,而非裸整数
  • 使用
    Depends()
    注入会话、认证和服务

Repository Pattern

仓储模式

Repositories encapsulate all database access:
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.user import User


class UserRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self._session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self._session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_posts(
        self, *, offset: int = 0, limit: int = 20
    ) -> list[User]:
        result = await self._session.execute(
            select(User)
            .options(selectinload(User.posts))
            .offset(offset)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, user: User) -> User:
        self._session.add(user)
        await self._session.flush()
        await self._session.refresh(user)
        return user

    async def update(self, user: User, **kwargs: object) -> User:
        for key, value in kwargs.items():
            setattr(user, key, value)
        await self._session.flush()
        await self._session.refresh(user)
        return user

    async def delete(self, user: User) -> None:
        await self._session.delete(user)
        await self._session.flush()
Rules:
  • One repository per model (or aggregate root)
  • Repositories return model instances or
    None
    — never HTTP responses
  • No business logic in repositories
  • Always
    flush()
    +
    refresh()
    after
    add()
    to get generated fields (id, timestamps)
  • Use
    selectinload()
    for eager loading relationships in async context
  • Never raise
    HTTPException
    from repositories
仓储封装所有数据库访问逻辑:
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.user import User


class UserRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self._session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self._session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list_with_posts(
        self, *, offset: int = 0, limit: int = 20
    ) -> list[User]:
        result = await self._session.execute(
            select(User)
            .options(selectinload(User.posts))
            .offset(offset)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def create(self, user: User) -> User:
        self._session.add(user)
        await self._session.flush()
        await self._session.refresh(user)
        return user

    async def update(self, user: User, **kwargs: object) -> User:
        for key, value in kwargs.items():
            setattr(user, key, value)
        await self._session.flush()
        await self._session.refresh(user)
        return user

    async def delete(self, user: User) -> None:
        await self._session.delete(user)
        await self._session.flush()
规则:
  • 每个模型(或聚合根)对应一个仓储
  • 仓储返回模型实例或
    None
    ——绝不返回HTTP响应
  • 仓储中不包含业务逻辑
  • add()
    后务必调用
    flush()
    +
    refresh()
    以获取生成字段(id、时间戳)
  • 在异步环境中使用
    selectinload()
    预加载关联关系
  • 绝不从仓储中抛出
    HTTPException

Service Layer Pattern

服务层模式

Services contain business logic and orchestrate repositories:
python
from app.exceptions import ConflictError, NotFoundError
from app.models.user import User
from app.repositories.user_repo import UserRepository
from app.schemas.user import UserCreate, UserPatch
from app.core.security import hash_password


class UserService:
    def __init__(self, session: AsyncSession) -> None:
        self.repo = UserRepository(session)

    async def create_user(self, data: UserCreate) -> User:
        # Business rule: email must be unique
        existing = await self.repo.get_by_email(data.email)
        if existing:
            raise ConflictError(f"Email {data.email} already registered")

        # Business logic: hash password before storing
        user = User(
            email=data.email,
            hashed_password=hash_password(data.password),
            display_name=data.display_name,
        )
        return await self.repo.create(user)

    async def get_user(self, user_id: int) -> User:
        user = await self.repo.get_by_id(user_id)
        if user is None:
            raise NotFoundError(f"User {user_id} not found")
        return user

    async def update_user(self, user_id: int, data: UserPatch) -> User:
        user = await self.get_user(user_id)
        update_fields = data.model_dump(exclude_unset=True)
        if "password" in update_fields:
            update_fields["hashed_password"] = hash_password(update_fields.pop("password"))
        return await self.repo.update(user, **update_fields)
Rules:
  • Services raise domain exceptions (
    NotFoundError
    ,
    ConflictError
    ), NEVER
    HTTPException
  • Services are the only place for business logic
  • Services call repositories for data access, never run raw queries
  • Services receive
    AsyncSession
    via constructor and create their own repository instances
  • Services validate business rules before calling repositories
服务层包含业务逻辑并协调仓储:
python
from app.exceptions import ConflictError, NotFoundError
from app.models.user import User
from app.repositories.user_repo import UserRepository
from app.schemas.user import UserCreate, UserPatch
from app.core.security import hash_password


class UserService:
    def __init__(self, session: AsyncSession) -> None:
        self.repo = UserRepository(session)

    async def create_user(self, data: UserCreate) -> User:
        # 业务规则:邮箱必须唯一
        existing = await self.repo.get_by_email(data.email)
        if existing:
            raise ConflictError(f"Email {data.email} already registered")

        # 业务逻辑:存储前哈希密码
        user = User(
            email=data.email,
            hashed_password=hash_password(data.password),
            display_name=data.display_name,
        )
        return await self.repo.create(user)

    async def get_user(self, user_id: int) -> User:
        user = await self.repo.get_by_id(user_id)
        if user is None:
            raise NotFoundError(f"User {user_id} not found")
        return user

    async def update_user(self, user_id: int, data: UserPatch) -> User:
        user = await self.get_user(user_id)
        update_fields = data.model_dump(exclude_unset=True)
        if "password" in update_fields:
            update_fields["hashed_password"] = hash_password(update_fields.pop("password"))
        return await self.repo.update(user, **update_fields)
规则:
  • 服务层抛出领域异常(
    NotFoundError
    ConflictError
    ),绝不抛出
    HTTPException
  • 服务层是唯一存放业务逻辑的地方
  • 服务层调用仓储进行数据访问,绝不执行原生查询
  • 服务层通过构造函数接收
    AsyncSession
    并创建自己的仓储实例
  • 服务层在调用仓储前验证业务规则

Domain Exceptions

领域异常

Define a hierarchy of domain exceptions:
python
class AppError(Exception):
    """Base application error."""

class NotFoundError(AppError):
    """Resource not found."""

class ConflictError(AppError):
    """Resource conflict (duplicate, version mismatch)."""

class ValidationError(AppError):
    """Business rule violation."""

class PermissionError(AppError):
    """Insufficient permissions."""
Register global exception handlers in the FastAPI app:
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError) -> JSONResponse:
    return JSONResponse(status_code=404, content={"detail": str(exc), "code": "NOT_FOUND"})

@app.exception_handler(ConflictError)
async def conflict_handler(request: Request, exc: ConflictError) -> JSONResponse:
    return JSONResponse(status_code=409, content={"detail": str(exc), "code": "CONFLICT"})
This allows services to raise domain exceptions without knowing about HTTP, and routes don't need try/except blocks.
定义领域异常的层级结构:
python
class AppError(Exception):
    """基础应用错误。"""

class NotFoundError(AppError):
    """资源未找到。"""

class ConflictError(AppError):
    """资源冲突(重复、版本不匹配)。"""

class ValidationError(AppError):
    """业务规则违反。"""

class PermissionError(AppError):
    """权限不足。"""
在FastAPI应用中注册全局异常处理器:
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError) -> JSONResponse:
    return JSONResponse(status_code=404, content={"detail": str(exc), "code": "NOT_FOUND"})

@app.exception_handler(ConflictError)
async def conflict_handler(request: Request, exc: ConflictError) -> JSONResponse:
    return JSONResponse(status_code=409, content={"detail": str(exc), "code": "CONFLICT"})
这使得服务层可以在不了解HTTP的情况下抛出领域异常,路由也无需编写try/except块。

Pydantic v2 Schema Conventions

Pydantic v2 Schema约定

python
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field


class UserCreate(BaseModel):
    """POST request body — writable fields only, no id/timestamps."""
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)
    display_name: str = Field(min_length=1, max_length=100)


class UserPatch(BaseModel):
    """PATCH request body — all fields Optional."""
    email: EmailStr | None = None
    password: str | None = Field(default=None, min_length=8, max_length=128)
    display_name: str | None = Field(default=None, min_length=1, max_length=100)


class UserResponse(BaseModel):
    """Response body — all fields including id and timestamps."""
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    display_name: str
    is_active: bool
    created_at: datetime
    updated_at: datetime
Key Pydantic v2 patterns:
  • Use
    ConfigDict(from_attributes=True)
    instead of
    class Config: orm_mode = True
  • Use
    model_validate()
    instead of
    from_orm()
  • Use
    model_dump()
    instead of
    .dict()
  • Use
    model_dump(exclude_unset=True)
    for PATCH to distinguish "not sent" from "set to null"
  • Use
    Field()
    for validation constraints
  • Use
    str | None
    syntax (Python 3.12+), not
    Optional[str]
python
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field


class UserCreate(BaseModel):
    """POST请求体——仅包含可写字段,无id/时间戳。"""
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)
    display_name: str = Field(min_length=1, max_length=100)


class UserPatch(BaseModel):
    """PATCH请求体——所有字段均为可选。"""
    email: EmailStr | None = None
    password: str | None = Field(default=None, min_length=8, max_length=128)
    display_name: str | None = Field(default=None, min_length=1, max_length=100)


class UserResponse(BaseModel):
    """响应体——包含所有字段,包括id和时间戳。"""
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    display_name: str
    is_active: bool
    created_at: datetime
    updated_at: datetime
Pydantic v2核心模式:
  • 使用
    ConfigDict(from_attributes=True)
    替代
    class Config: orm_mode = True
  • 使用
    model_validate()
    替代
    from_orm()
  • 使用
    model_dump()
    替代
    .dict()
  • 使用
    model_dump(exclude_unset=True)
    处理PATCH请求,区分"未发送"和"设为null"
  • 使用
    Field()
    定义验证约束
  • 使用
    str | None
    语法(Python 3.12+),而非
    Optional[str]

Async Session Management

异步会话管理

python
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from app.core.config import settings

engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,
)

async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        async with session.begin():
            yield session
Rules:
  • expire_on_commit=False
    prevents detached instance errors after commit
  • session.begin()
    context manager auto-commits on success, rolls back on exception
  • One session per request via
    Depends(get_async_session)
  • Never share sessions across concurrent tasks
  • For background tasks, create a new session — never reuse the request session
python
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from app.core.config import settings

engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,
)

async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        async with session.begin():
            yield session
规则:
  • expire_on_commit=False
    防止提交后出现实例分离错误
  • session.begin()
    上下文管理器会在成功时自动提交,异常时自动回滚
  • 通过
    Depends(get_async_session)
    为每个请求分配一个会话
  • 绝不要在并发任务间共享会话
  • 对于后台任务,创建新会话——绝不要复用请求会话

SQLAlchemy 2.0 Model Pattern

SQLAlchemy 2.0模型模式

python
from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    display_name: Mapped[str] = mapped_column(String(100))
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(), onupdate=func.now()
    )

    # Relationships — ALWAYS use selectin or joined for async
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author", lazy="selectin"
    )
Rules:
  • Use
    Mapped[type]
    annotations (SQLAlchemy 2.0 style)
  • Use
    mapped_column()
    instead of
    Column()
  • Set
    lazy="selectin"
    on relationships for async compatibility
  • Use
    server_default
    for database-generated defaults
  • Always include
    created_at
    and
    updated_at
    timestamps
python
from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    display_name: Mapped[str] = mapped_column(String(100))
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(), onupdate=func.now()
    )

    # 关联关系——异步环境下务必使用selectin或joined
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author", lazy="selectin"
    )
规则:
  • 使用
    Mapped[type]
    注解(SQLAlchemy 2.0风格)
  • 使用
    mapped_column()
    替代
    Column()
  • 为关联关系设置
    lazy="selectin"
    以兼容异步环境
  • 使用
    server_default
    定义数据库生成的默认值
  • 务必包含
    created_at
    updated_at
    时间戳

Alembic Migration Workflow

Alembic迁移工作流

bash
undefined
bash
undefined

Generate migration from model changes

根据模型变更生成迁移文件

alembic revision --autogenerate -m "add_users_table"
alembic revision --autogenerate -m "add_users_table"

Review the generated migration file before applying

应用前检查生成的迁移文件

Apply migration

应用迁移

alembic upgrade head
alembic upgrade head

Rollback one step

回滚一步

alembic downgrade -1
alembic downgrade -1

Show current revision

查看当前版本

alembic current
alembic current

Show migration history

查看迁移历史

alembic history

**Migration naming convention:**
```python
alembic history

**迁移命名约定:**
```python

alembic/env.py

alembic/env.py

naming_convention = { "ix": "ix_%(column_0_label)s", "uq": "uq_%(table_name)s_%(column_0_name)s", "ck": "ck_%(table_name)s_%(constraint_name)s", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", "pk": "pk_%(table_name)s", }

**Rules:**
- Always review autogenerated migrations before applying
- Every migration must have a working `downgrade()` function
- One migration per logical schema change
- Test both upgrade and downgrade
- Use descriptive migration messages: `"add_users_table"`, `"add_email_index_to_users"`
naming_convention = { "ix": "ix_%(column_0_label)s", "uq": "uq_%(table_name)s_%(column_0_name)s", "ck": "ck_%(table_name)s_%(constraint_name)s", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", "pk": "pk_%(table_name)s", }

**规则:**
- 应用前务必检查自动生成的迁移文件
- 每个迁移必须包含可正常工作的`downgrade()`函数
- 每个逻辑 schema 变更对应一个迁移文件
- 测试升级和回滚流程
- 使用描述性迁移消息:`"add_users_table"`、`"add_email_index_to_users"`

Dependency Injection Pattern

依赖注入模式

python
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_async_session
from app.services.user_service import UserService


async def get_user_service(
    session: AsyncSession = Depends(get_async_session),
) -> UserService:
    return UserService(session)
python
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_async_session
from app.services.user_service import UserService


async def get_user_service(
    session: AsyncSession = Depends(get_async_session),
) -> UserService:
    return UserService(session)

Chain dependencies for auth

链式依赖实现认证

async def get_current_user( token: str = Depends(oauth2_scheme), session: AsyncSession = Depends(get_async_session), ) -> User: user_id = decode_token(token) service = UserService(session) return await service.get_user(user_id)
async def require_admin( user: User = Depends(get_current_user), ) -> User: if user.role != "admin": raise HTTPException(status_code=403, detail="Admin required") return user
undefined
async def get_current_user( token: str = Depends(oauth2_scheme), session: AsyncSession = Depends(get_async_session), ) -> User: user_id = decode_token(token) service = UserService(session) return await service.get_user(user_id)
async def require_admin( user: User = Depends(get_current_user), ) -> User: if user.role != "admin": raise HTTPException(status_code=403, detail="Admin required") return user
undefined

Examples

示例

Complete Request Flow

完整请求流程

A request to
POST /users
flows through all layers:
  1. Route receives
    UserCreate
    (Pydantic validates the request body)
  2. Route calls
    UserService.create_user(data)
    via
    Depends()
  3. Service checks business rule (email uniqueness) via
    UserRepository.get_by_email()
  4. Service hashes password, creates
    User
    model instance
  5. Service calls
    UserRepository.create(user)
    to persist
  6. Repository adds to session, flushes, refreshes to get generated fields
  7. Route converts the ORM model to
    UserResponse
    via
    model_validate()
If the email is duplicate, the service raises
ConflictError
, the global exception handler returns
409 Conflict
. No
try/except
needed in the route.
POST /users
发送请求的流程如下:
  1. 路由接收
    UserCreate
    (Pydantic验证请求体)
  2. 路由通过
    Depends()
    调用
    UserService.create_user(data)
  3. 服务层通过
    UserRepository.get_by_email()
    检查业务规则(邮箱唯一性)
  4. 服务层哈希密码,创建
    User
    模型实例
  5. 服务层调用
    UserRepository.create(user)
    持久化数据
  6. 仓储将实例添加到会话,执行flush和refresh以获取生成字段
  7. 路由通过
    model_validate()
    将ORM模型转换为
    UserResponse
如果邮箱重复,服务层抛出
ConflictError
,全局异常处理器返回
409 Conflict
。路由中无需编写try/except块。

Edge Cases

边缘场景

  • Detached instance errors: Always call
    flush()
    +
    refresh()
    after
    session.add()
    . Set
    expire_on_commit=False
    on the session factory.
  • Async session in background tasks: Never reuse the request session. Create a new session:
    python
    async def background_job():
        async with async_session_factory() as session:
            async with session.begin():
                # do work
  • N+1 queries: Use
    selectinload()
    in repository queries for relationships that will be accessed. Set
    lazy="selectin"
    as the default on model relationships.
  • Bulk operations: Use
    session.execute(insert(User).values(list_of_dicts))
    for bulk inserts instead of adding one by one.
  • Transaction spanning multiple services: Pass the same session to all services. The session's
    begin()
    context manager handles the transaction boundary.
  • Pydantic v2 computed fields: Use
    @computed_field
    for derived values in response schemas. See
    references/pydantic-v2-migration.md
    .
See
references/sqlalchemy-patterns.md
for advanced query optimization patterns.
  • 实例分离错误:
    session.add()
    后务必调用
    flush()
    +
    refresh()
    。在会话工厂中设置
    expire_on_commit=False
  • 后台任务中的异步会话: 绝不要复用请求会话。创建新会话:
    python
    async def background_job():
        async with async_session_factory() as session:
            async with session.begin():
                # 执行任务
  • N+1查询问题: 在仓储查询中对需要访问的关联关系使用
    selectinload()
    。在模型关联关系中设置
    lazy="selectin"
    作为默认值。
  • 批量操作: 使用
    session.execute(insert(User).values(list_of_dicts))
    执行批量插入,而非逐个添加。
  • 跨多个服务的事务: 向所有服务传递同一个会话。会话的
    begin()
    上下文管理器处理事务边界。
  • Pydantic v2计算字段: 在响应schema中使用
    @computed_field
    定义派生值。请参考
    references/pydantic-v2-migration.md
请查看
references/sqlalchemy-patterns.md
获取高级查询优化模式。