fastapi-endpoint

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

FastAPI Endpoint Builder

FastAPI 端点构建指南

When to use

适用场景

Use this skill when you need to:
  • Add new API endpoints to an existing FastAPI project
  • Build CRUD operations with proper validation and error handling
  • Set up authenticated endpoints with dependency injection
  • Create async database queries with SQLAlchemy 2.0
  • Generate complete test coverage for API routes
当你需要以下操作时,可使用本技能:
  • 为现有FastAPI项目添加新的API端点
  • 构建具备完善验证和错误处理的CRUD操作
  • 通过依赖注入设置已认证的端点
  • 使用SQLAlchemy 2.0创建异步数据库查询
  • 为API路由生成完整的测试覆盖

Phase 1: Explore (Plan Mode)

阶段1:探索(规划模式)

Enter plan mode. Before writing any code, explore the existing project to understand:
进入规划模式。在编写任何代码之前,先探索现有项目以了解:

Project structure

项目结构

  • Find the FastAPI app entry point (
    main.py
    ,
    app.py
    , or
    app/__init__.py
    )
  • Identify the router organization pattern (single file vs
    routers/
    directory)
  • Check for existing
    models/
    ,
    schemas/
    ,
    crud/
    , or
    services/
    directories
  • Look at
    pyproject.toml
    or
    requirements.txt
    for installed dependencies
  • 找到FastAPI应用的入口文件(
    main.py
    app.py
    app/__init__.py
  • 识别路由组织模式(单文件或
    routers/
    目录)
  • 检查是否存在现有的
    models/
    schemas/
    crud/
    services/
    目录
  • 查看
    pyproject.toml
    requirements.txt
    了解已安装的依赖

Existing patterns

现有模式

  • How are existing endpoints structured? (function-based vs class-based)
  • What ORM is used? (SQLAlchemy 2.0 async, Tortoise, raw SQL, none)
  • How is the database session managed? (
    Depends(get_db)
    , middleware, other)
  • What auth pattern exists? (OAuth2PasswordBearer, API key header, custom)
  • Are there existing Pydantic base models or shared schemas?
  • What response format is standard? (direct model, wrapped
    {"data": ..., "meta": ...}
    )
  • 现有端点的结构是怎样的?(基于函数或基于类)
  • 使用的是哪种ORM?(SQLAlchemy 2.0异步版、Tortoise、原生SQL、无ORM)
  • 数据库会话如何管理?(
    Depends(get_db)
    、中间件或其他方式)
  • 采用的是哪种认证模式?(OAuth2PasswordBearer、API密钥头、自定义认证)
  • 是否存在现有的Pydantic基础模型或共享模式?
  • 标准的响应格式是什么?(直接返回模型、包装为
    {"data": ..., "meta": ...}
    格式)

Test patterns

测试模式

  • Where do tests live? (
    tests/
    ,
    test_*.py
    ,
    *_test.py
    )
  • What test client is used? (httpx AsyncClient, TestClient, pytest-asyncio)
  • Are there test fixtures for database and auth?
  • 测试文件存放位置?(
    tests/
    目录、
    test_*.py
    *_test.py
    文件)
  • 使用的是哪种测试客户端?(httpx AsyncClient、TestClient、pytest-asyncio)
  • 是否有针对数据库和认证的测试夹具?

Phase 2: Interview (AskUserQuestion)

阶段2:沟通(询问用户问题)

Use AskUserQuestion to clarify requirements. Ask in rounds — do NOT dump all questions at once.
使用询问功能明确需求。分轮次提问——不要一次性抛出所有问题。

Round 1: Core endpoint

第一轮:核心端点

Question: "What resource does this endpoint manage?"
Header: "Resource"
Options:
  - "New resource (I'll describe the fields)" — Creating a new data model from scratch
  - "Existing model (extend it)" — Adding endpoints for a model that already exists in the codebase
  - "Relationship endpoint (nested)" — e.g., /users/{id}/orders — endpoint on a related resource

Question: "Which HTTP methods do you need?"
Header: "Methods"
multiSelect: true
Options:
  - "Full CRUD (GET list, GET detail, POST, PUT/PATCH, DELETE)" — All standard operations
  - "Read-only (GET list + GET detail)" — No mutations
  - "Custom action (POST /resource/{id}/action)" — Business logic endpoint, not standard CRUD
问题:"该端点管理的是什么资源?"
标题:"资源类型"
选项:
  - "新资源(我会描述字段)" —— 从零开始创建新的数据模型
  - "现有模型(扩展)" —— 为代码库中已存在的模型添加端点
  - "关联端点(嵌套)" —— 例如 /users/{id}/orders —— 关联资源的端点

问题:"你需要哪些HTTP方法?"
标题:"请求方法"
可多选:是
选项:
  - "完整CRUD(列表查询、详情查询、创建、更新/部分更新、删除)" —— 所有标准操作
  - "仅读取(列表查询 + 详情查询)" —— 不包含修改操作
  - "自定义操作(POST /resource/{id}/action)" —— 业务逻辑端点,非标准CRUD

Round 2: Data model (if new resource)

第二轮:数据模型(如果是新资源)

Question: "What fields does the resource have? (describe briefly)"
Header: "Fields"
Options:
  - "Simple (< 6 fields, basic types)" — Strings, ints, booleans, dates
  - "Medium (6-15 fields, some relations)" — Includes foreign keys or enums
  - "Complex (nested objects, polymorphic)" — JSON fields, discriminated unions, computed fields
问题:"该资源包含哪些字段?(简要描述)"
标题:"字段复杂度"
选项:
  - "简单(少于6个字段,基础类型)" —— 字符串、整数、布尔值、日期
  - "中等(6-15个字段,包含关联)" —— 包含外键或枚举类型
  - "复杂(嵌套对象、多态)" —— JSON字段、鉴别联合、计算字段

Round 3: Auth and access control

第三轮:认证与访问控制

Question: "How should this endpoint be authenticated?"
Header: "Auth"
Options:
  - "JWT Bearer token (Recommended)" — OAuth2PasswordBearer with JWT decode
  - "API Key header" — X-API-Key header validation
  - "No auth (public)" — Open endpoint, no authentication required
  - "Use existing auth" — Reuse the auth dependency already in the project

Question: "Do you need role-based access control?"
Header: "RBAC"
Options:
  - "No — any authenticated user" — Single permission level
  - "Yes — role check (admin, user, etc.)" — Require specific roles per endpoint
  - "Yes — ownership check" — Users can only access their own resources
问题:"该端点应采用哪种认证方式?"
标题:"认证方式"
选项:
  - "JWT Bearer令牌(推荐)" —— 带JWT解码的OAuth2PasswordBearer
  - "API密钥头" —— 验证X-API-Key请求头
  - "无认证(公开)" —— 开放端点,无需认证
  - "使用现有认证" —— 复用项目中已有的认证依赖

问题:"是否需要基于角色的访问控制?"
标题:"RBAC权限"
选项:
  - "不需要 —— 任何已认证用户均可访问" —— 单一权限级别
  - "需要 —— 角色校验(管理员、普通用户等)" —— 每个端点要求特定角色
  - "需要 —— 归属校验" —— 用户仅能访问自己的资源

Round 4: Pagination, filtering, caching

第四轮:分页、过滤、缓存

Question: "What pagination style for list endpoints?"
Header: "Pagination"
Options:
  - "Cursor-based (Recommended)" — Best for real-time data, no offset drift
  - "Offset/limit" — Simple, good for admin panels with page numbers
  - "No pagination" — Small datasets, return all results

Question: "Do you need response caching?"
Header: "Caching"
Options:
  - "No caching" — Fresh data on every request
  - "Cache-Control headers" — Client-side caching via HTTP headers
  - "Redis/in-memory cache" — Server-side caching with TTL
问题:"列表端点采用哪种分页方式?"
标题:"分页策略"
选项:
  - "基于游标(推荐)" —— 最适合实时数据,无偏移漂移问题
  - "偏移/限制" —— 实现简单,适用于带页码的管理面板
  - "不分页" —— 小型数据集,返回所有结果

问题:"是否需要响应缓存?"
标题:"缓存设置"
选项:
  - "不缓存" —— 每次请求返回最新数据
  - "Cache-Control头" —— 通过HTTP头实现客户端缓存
  - "Redis/内存缓存" —— 带TTL的服务端缓存

Phase 3: Plan (ExitPlanMode)

阶段3:规划(退出规划模式)

Write a concrete implementation plan covering:
  1. Files to create/modify — exact paths based on project structure discovered in Phase 1
  2. Pydantic schemas
    Create
    ,
    Update
    ,
    Response
    , and
    List
    schemas with field types
  3. SQLAlchemy model — table name, columns, relationships, indexes
  4. CRUD/service layer — async functions for each operation
  5. Router — endpoint signatures, status codes, response models
  6. Dependencies — auth, pagination, filtering dependencies
  7. Tests — test cases for happy path, validation errors, auth failures, not found
Present via ExitPlanMode for user approval.
编写具体的实现方案,涵盖:
  1. 需创建/修改的文件 —— 基于阶段1发现的项目结构给出准确路径
  2. Pydantic模式 —— 包含字段类型的
    Create
    Update
    Response
    List
    模式
  3. SQLAlchemy模型 —— 表名、列、关联关系、索引
  4. CRUD/服务层 —— 每个操作对应的异步函数
  5. 路由 —— 端点签名、状态码、响应模型
  6. 依赖项 —— 认证、分页、过滤依赖
  7. 测试 —— 正常流程、验证错误、认证失败、资源不存在等测试用例
通过退出规划模式提交方案,等待用户确认。

Phase 4: Execute

阶段4:执行

After approval, implement following this order:
获得批准后,按照以下顺序实现:

Step 1: Pydantic schemas

步骤1:Pydantic模式

python
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from uuid import UUID

class ResourceBase(BaseModel):
    """Shared fields between create and response."""
    name: str
    # ... fields from interview

class ResourceCreate(ResourceBase):
    """Fields required to create the resource."""
    pass

class ResourceUpdate(BaseModel):
    """All fields optional for partial updates."""
    name: str | None = None

class ResourceResponse(ResourceBase):
    """Full resource with DB-generated fields."""
    model_config = ConfigDict(from_attributes=True)
    id: UUID
    created_at: datetime
    updated_at: datetime

class ResourceListResponse(BaseModel):
    """Paginated list response."""
    data: list[ResourceResponse]
    next_cursor: str | None = None
    has_more: bool
python
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from uuid import UUID

class ResourceBase(BaseModel):
    """创建和响应共享的字段。"""
    name: str
    # ... 来自沟通阶段的字段

class ResourceCreate(ResourceBase):
    """创建资源所需的字段。"""
    pass

class ResourceUpdate(BaseModel):
    """部分更新时所有字段可选。"""
    name: str | None = None

class ResourceResponse(ResourceBase):
    """包含数据库生成字段的完整资源。"""
    model_config = ConfigDict(from_attributes=True)
    id: UUID
    created_at: datetime
    updated_at: datetime

class ResourceListResponse(BaseModel):
    """分页列表响应。"""
    data: list[ResourceResponse]
    next_cursor: str | None = None
    has_more: bool

Step 2: SQLAlchemy model

步骤2:SQLAlchemy模型

python
from sqlalchemy import Column, String, DateTime, func
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from app.database import Base

class Resource(Base):
    __tablename__ = "resources"

    id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String, nullable=False, index=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
python
from sqlalchemy import Column, String, DateTime, func
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from app.database import Base

class Resource(Base):
    __tablename__ = "resources"

    id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String, nullable=False, index=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

Step 3: CRUD/service layer

步骤3:CRUD/服务层

python
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from uuid import UUID

async def get_resource(db: AsyncSession, resource_id: UUID) -> Resource | None:
    result = await db.execute(select(Resource).where(Resource.id == resource_id))
    return result.scalar_one_or_none()

async def list_resources(
    db: AsyncSession,
    cursor: str | None = None,
    limit: int = 20,
) -> tuple[list[Resource], str | None]:
    query = select(Resource).order_by(Resource.created_at.desc()).limit(limit + 1)
    if cursor:
        query = query.where(Resource.created_at < decode_cursor(cursor))
    result = await db.execute(query)
    items = list(result.scalars().all())
    next_cursor = encode_cursor(items[-1].created_at) if len(items) > limit else None
    return items[:limit], next_cursor

async def create_resource(db: AsyncSession, data: ResourceCreate) -> Resource:
    resource = Resource(**data.model_dump())
    db.add(resource)
    await db.commit()
    await db.refresh(resource)
    return resource

async def update_resource(
    db: AsyncSession, resource_id: UUID, data: ResourceUpdate
) -> Resource | None:
    resource = await get_resource(db, resource_id)
    if not resource:
        return None
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(resource, field, value)
    await db.commit()
    await db.refresh(resource)
    return resource

async def delete_resource(db: AsyncSession, resource_id: UUID) -> bool:
    resource = await get_resource(db, resource_id)
    if not resource:
        return False
    await db.delete(resource)
    await db.commit()
    return True
python
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from uuid import UUID

async def get_resource(db: AsyncSession, resource_id: UUID) -> Resource | None:
    result = await db.execute(select(Resource).where(Resource.id == resource_id))
    return result.scalar_one_or_none()

async def list_resources(
    db: AsyncSession,
    cursor: str | None = None,
    limit: int = 20,
) -> tuple[list[Resource], str | None]:
    query = select(Resource).order_by(Resource.created_at.desc()).limit(limit + 1)
    if cursor:
        query = query.where(Resource.created_at < decode_cursor(cursor))
    result = await db.execute(query)
    items = list(result.scalars().all())
    next_cursor = encode_cursor(items[-1].created_at) if len(items) > limit else None
    return items[:limit], next_cursor

async def create_resource(db: AsyncSession, data: ResourceCreate) -> Resource:
    resource = Resource(**data.model_dump())
    db.add(resource)
    await db.commit()
    await db.refresh(resource)
    return resource

async def update_resource(
    db: AsyncSession, resource_id: UUID, data: ResourceUpdate
) -> Resource | None:
    resource = await get_resource(db, resource_id)
    if not resource:
        return None
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(resource, field, value)
    await db.commit()
    await db.refresh(resource)
    return resource

async def delete_resource(db: AsyncSession, resource_id: UUID) -> bool:
    resource = await get_resource(db, resource_id)
    if not resource:
        return False
    await db.delete(resource)
    await db.commit()
    return True

Step 4: Router with dependencies

步骤4:带依赖的路由

python
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID

router = APIRouter(prefix="/resources", tags=["resources"])

@router.get("", response_model=ResourceListResponse)
async def list_resources_endpoint(
    cursor: str | None = Query(None),
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),  # if auth required
):
    items, next_cursor = await list_resources(db, cursor=cursor, limit=limit)
    return ResourceListResponse(
        data=items,
        next_cursor=next_cursor,
        has_more=next_cursor is not None,
    )

@router.get("/{resource_id}", response_model=ResourceResponse)
async def get_resource_endpoint(
    resource_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    resource = await get_resource(db, resource_id)
    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")
    return resource

@router.post("", response_model=ResourceResponse, status_code=status.HTTP_201_CREATED)
async def create_resource_endpoint(
    data: ResourceCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    return await create_resource(db, data)

@router.patch("/{resource_id}", response_model=ResourceResponse)
async def update_resource_endpoint(
    resource_id: UUID,
    data: ResourceUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    resource = await update_resource(db, resource_id, data)
    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")
    return resource

@router.delete("/{resource_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_resource_endpoint(
    resource_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    deleted = await delete_resource(db, resource_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Resource not found")
python
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID

router = APIRouter(prefix="/resources", tags=["resources"])

@router.get("", response_model=ResourceListResponse)
async def list_resources_endpoint(
    cursor: str | None = Query(None),
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),  # 如果需要认证
):
    items, next_cursor = await list_resources(db, cursor=cursor, limit=limit)
    return ResourceListResponse(
        data=items,
        next_cursor=next_cursor,
        has_more=next_cursor is not None,
    )

@router.get("/{resource_id}", response_model=ResourceResponse)
async def get_resource_endpoint(
    resource_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    resource = await get_resource(db, resource_id)
    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")
    return resource

@router.post("", response_model=ResourceResponse, status_code=status.HTTP_201_CREATED)
async def create_resource_endpoint(
    data: ResourceCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    return await create_resource(db, data)

@router.patch("/{resource_id}", response_model=ResourceResponse)
async def update_resource_endpoint(
    resource_id: UUID,
    data: ResourceUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    resource = await update_resource(db, resource_id, data)
    if not resource:
        raise HTTPException(status_code=404, detail="Resource not found")
    return resource

@router.delete("/{resource_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_resource_endpoint(
    resource_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    deleted = await delete_resource(db, resource_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Resource not found")

Step 5: Tests

步骤5:测试

python
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app

@pytest.fixture
async def client():
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as ac:
        yield ac

@pytest.mark.asyncio
async def test_create_resource(client: AsyncClient, auth_headers: dict):
    response = await client.post(
        "/resources",
        json={"name": "Test Resource"},
        headers=auth_headers,
    )
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Test Resource"
    assert "id" in data

@pytest.mark.asyncio
async def test_get_resource_not_found(client: AsyncClient, auth_headers: dict):
    response = await client.get(
        "/resources/00000000-0000-0000-0000-000000000000",
        headers=auth_headers,
    )
    assert response.status_code == 404

@pytest.mark.asyncio
async def test_list_resources_pagination(client: AsyncClient, auth_headers: dict):
    # Create multiple resources first
    for i in range(5):
        await client.post(
            "/resources",
            json={"name": f"Resource {i}"},
            headers=auth_headers,
        )
    response = await client.get("/resources?limit=2", headers=auth_headers)
    assert response.status_code == 200
    data = response.json()
    assert len(data["data"]) == 2
    assert data["has_more"] is True
    assert data["next_cursor"] is not None

@pytest.mark.asyncio
async def test_create_resource_unauthorized(client: AsyncClient):
    response = await client.post("/resources", json={"name": "Test"})
    assert response.status_code in (401, 403)

@pytest.mark.asyncio
async def test_update_resource_partial(client: AsyncClient, auth_headers: dict):
    # Create
    create_resp = await client.post(
        "/resources",
        json={"name": "Original"},
        headers=auth_headers,
    )
    resource_id = create_resp.json()["id"]
    # Partial update
    response = await client.patch(
        f"/resources/{resource_id}",
        json={"name": "Updated"},
        headers=auth_headers,
    )
    assert response.status_code == 200
    assert response.json()["name"] == "Updated"

@pytest.mark.asyncio
async def test_delete_resource(client: AsyncClient, auth_headers: dict):
    create_resp = await client.post(
        "/resources",
        json={"name": "To Delete"},
        headers=auth_headers,
    )
    resource_id = create_resp.json()["id"]
    response = await client.delete(
        f"/resources/{resource_id}", headers=auth_headers
    )
    assert response.status_code == 204
    # Verify deleted
    get_resp = await client.get(
        f"/resources/{resource_id}", headers=auth_headers
    )
    assert get_resp.status_code == 404
python
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app

@pytest.fixture
async def client():
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as ac:
        yield ac

@pytest.mark.asyncio
async def test_create_resource(client: AsyncClient, auth_headers: dict):
    response = await client.post(
        "/resources",
        json={"name": "Test Resource"},
        headers=auth_headers,
    )
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Test Resource"
    assert "id" in data

@pytest.mark.asyncio
async def test_get_resource_not_found(client: AsyncClient, auth_headers: dict):
    response = await client.get(
        "/resources/00000000-0000-0000-0000-000000000000",
        headers=auth_headers,
    )
    assert response.status_code == 404

@pytest.mark.asyncio
async def test_list_resources_pagination(client: AsyncClient, auth_headers: dict):
    # 先创建多个资源
    for i in range(5):
        await client.post(
            "/resources",
            json={"name": f"Resource {i}"},
            headers=auth_headers,
        )
    response = await client.get("/resources?limit=2", headers=auth_headers)
    assert response.status_code == 200
    data = response.json()
    assert len(data["data"]) == 2
    assert data["has_more"] is True
    assert data["next_cursor"] is not None

@pytest.mark.asyncio
async def test_create_resource_unauthorized(client: AsyncClient):
    response = await client.post("/resources", json={"name": "Test"})
    assert response.status_code in (401, 403)

@pytest.mark.asyncio
async def test_update_resource_partial(client: AsyncClient, auth_headers: dict):
    # 创建资源
    create_resp = await client.post(
        "/resources",
        json={"name": "Original"},
        headers=auth_headers,
    )
    resource_id = create_resp.json()["id"]
    # 部分更新
    response = await client.patch(
        f"/resources/{resource_id}",
        json={"name": "Updated"},
        headers=auth_headers,
    )
    assert response.status_code == 200
    assert response.json()["name"] == "Updated"

@pytest.mark.asyncio
async def test_delete_resource(client: AsyncClient, auth_headers: dict):
    create_resp = await client.post(
        "/resources",
        json={"name": "To Delete"},
        headers=auth_headers,
    )
    resource_id = create_resp.json()["id"]
    response = await client.delete(
        f"/resources/{resource_id}", headers=auth_headers
    )
    assert response.status_code == 204
    # 验证已删除
    get_resp = await client.get(
        f"/resources/{resource_id}", headers=auth_headers
    )
    assert get_resp.status_code == 404

Key patterns to follow

需遵循的核心模式

Dependency injection for auth

用于认证的依赖注入

python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    payload = decode_jwt(token)
    user = await db.get(User, payload["sub"])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

def require_role(*roles: str):
    """Factory for role-based access control."""
    async def checker(current_user: User = Depends(get_current_user)):
        if current_user.role not in roles:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return current_user
    return checker
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    payload = decode_jwt(token)
    user = await db.get(User, payload["sub"])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

def require_role(*roles: str):
    """基于角色的访问控制工厂函数。"""
    async def checker(current_user: User = Depends(get_current_user)):
        if current_user.role not in roles:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return current_user
    return checker

Cursor-based pagination helper

基于游标的分页工具

python
import base64
from datetime import datetime

def encode_cursor(dt: datetime) -> str:
    return base64.urlsafe_b64encode(dt.isoformat().encode()).decode()

def decode_cursor(cursor: str) -> datetime:
    return datetime.fromisoformat(base64.urlsafe_b64decode(cursor).decode())
python
import base64
from datetime import datetime

def encode_cursor(dt: datetime) -> str:
    return base64.urlsafe_b64encode(dt.isoformat().encode()).decode()

def decode_cursor(cursor: str) -> datetime:
    return datetime.fromisoformat(base64.urlsafe_b64decode(cursor).decode())

Error responses

错误响应

Always use FastAPI's
HTTPException
with consistent detail messages. For validation errors, Pydantic v2 handles them automatically via
RequestValidationError
(422).
python
undefined
始终使用FastAPI的
HTTPException
并保持错误信息一致。对于验证错误,Pydantic v2会通过
RequestValidationError
自动处理(状态码422)。
python
undefined

404 — not found

404 — 资源不存在

raise HTTPException(status_code=404, detail="Resource not found")
raise HTTPException(status_code=404, detail="Resource not found")

409 — conflict (duplicate)

409 — 冲突(重复资源)

raise HTTPException(status_code=409, detail="Resource with this name already exists")
raise HTTPException(status_code=409, detail="Resource with this name already exists")

403 — forbidden

403 — 禁止访问

raise HTTPException(status_code=403, detail="Not allowed to modify this resource")
undefined
raise HTTPException(status_code=403, detail="Not allowed to modify this resource")
undefined

Checklist before finishing

完成前检查清单

  • All endpoints return proper status codes (201 for POST, 204 for DELETE)
  • Pydantic schemas use
    model_config = ConfigDict(from_attributes=True)
    for ORM mode
  • List endpoint has pagination with configurable limit
  • Auth dependency is applied to all non-public endpoints
  • Tests cover: happy path, not found, unauthorized, validation errors
  • Router is registered in the main FastAPI app
  • Database model has proper indexes on filtered/sorted columns
  • 所有端点返回正确的状态码(POST返回201,DELETE返回204)
  • Pydantic模式使用
    model_config = ConfigDict(from_attributes=True)
    开启ORM模式
  • 列表端点具备可配置限制的分页功能
  • 所有非公开端点都应用了认证依赖
  • 测试覆盖:正常流程、资源不存在、未授权、验证错误
  • 路由已注册到主FastAPI应用
  • 数据库模型在过滤/排序字段上设置了正确的索引