fastapi-endpoint
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseFastAPI Endpoint Builder
FastAPI 端点构建指南
When to use
适用场景
Use this skill when you need to:
- Add new API endpoints to an existing FastAPI project
- Build CRUD operations with proper validation and error handling
- Set up authenticated endpoints with dependency injection
- Create async database queries with SQLAlchemy 2.0
- Generate complete test coverage for API routes
当你需要以下操作时,可使用本技能:
- 为现有FastAPI项目添加新的API端点
- 构建具备完善验证和错误处理的CRUD操作
- 通过依赖注入设置已认证的端点
- 使用SQLAlchemy 2.0创建异步数据库查询
- 为API路由生成完整的测试覆盖
Phase 1: Explore (Plan Mode)
阶段1:探索(规划模式)
Enter plan mode. Before writing any code, explore the existing project to understand:
进入规划模式。在编写任何代码之前,先探索现有项目以了解:
Project structure
项目结构
- Find the FastAPI app entry point (,
main.py, orapp.py)app/__init__.py - Identify the router organization pattern (single file vs directory)
routers/ - Check for existing ,
models/,schemas/, orcrud/directoriesservices/ - Look at or
pyproject.tomlfor installed dependenciesrequirements.txt
- 找到FastAPI应用的入口文件(、
main.py或app.py)app/__init__.py - 识别路由组织模式(单文件或目录)
routers/ - 检查是否存在现有的、
models/、schemas/或crud/目录services/ - 查看或
pyproject.toml了解已安装的依赖requirements.txt
Existing patterns
现有模式
- How are existing endpoints structured? (function-based vs class-based)
- What ORM is used? (SQLAlchemy 2.0 async, Tortoise, raw SQL, none)
- How is the database session managed? (, middleware, other)
Depends(get_db) - What auth pattern exists? (OAuth2PasswordBearer, API key header, custom)
- Are there existing Pydantic base models or shared schemas?
- What response format is standard? (direct model, wrapped )
{"data": ..., "meta": ...}
- 现有端点的结构是怎样的?(基于函数或基于类)
- 使用的是哪种ORM?(SQLAlchemy 2.0异步版、Tortoise、原生SQL、无ORM)
- 数据库会话如何管理?(、中间件或其他方式)
Depends(get_db) - 采用的是哪种认证模式?(OAuth2PasswordBearer、API密钥头、自定义认证)
- 是否存在现有的Pydantic基础模型或共享模式?
- 标准的响应格式是什么?(直接返回模型、包装为格式)
{"data": ..., "meta": ...}
Test patterns
测试模式
- Where do tests live? (,
tests/,test_*.py)*_test.py - What test client is used? (httpx AsyncClient, TestClient, pytest-asyncio)
- Are there test fixtures for database and auth?
- 测试文件存放位置?(目录、
tests/或test_*.py文件)*_test.py - 使用的是哪种测试客户端?(httpx AsyncClient、TestClient、pytest-asyncio)
- 是否有针对数据库和认证的测试夹具?
Phase 2: Interview (AskUserQuestion)
阶段2:沟通(询问用户问题)
Use AskUserQuestion to clarify requirements. Ask in rounds — do NOT dump all questions at once.
使用询问功能明确需求。分轮次提问——不要一次性抛出所有问题。
Round 1: Core endpoint
第一轮:核心端点
Question: "What resource does this endpoint manage?"
Header: "Resource"
Options:
- "New resource (I'll describe the fields)" — Creating a new data model from scratch
- "Existing model (extend it)" — Adding endpoints for a model that already exists in the codebase
- "Relationship endpoint (nested)" — e.g., /users/{id}/orders — endpoint on a related resource
Question: "Which HTTP methods do you need?"
Header: "Methods"
multiSelect: true
Options:
- "Full CRUD (GET list, GET detail, POST, PUT/PATCH, DELETE)" — All standard operations
- "Read-only (GET list + GET detail)" — No mutations
- "Custom action (POST /resource/{id}/action)" — Business logic endpoint, not standard CRUD问题:"该端点管理的是什么资源?"
标题:"资源类型"
选项:
- "新资源(我会描述字段)" —— 从零开始创建新的数据模型
- "现有模型(扩展)" —— 为代码库中已存在的模型添加端点
- "关联端点(嵌套)" —— 例如 /users/{id}/orders —— 关联资源的端点
问题:"你需要哪些HTTP方法?"
标题:"请求方法"
可多选:是
选项:
- "完整CRUD(列表查询、详情查询、创建、更新/部分更新、删除)" —— 所有标准操作
- "仅读取(列表查询 + 详情查询)" —— 不包含修改操作
- "自定义操作(POST /resource/{id}/action)" —— 业务逻辑端点,非标准CRUDRound 2: Data model (if new resource)
第二轮:数据模型(如果是新资源)
Question: "What fields does the resource have? (describe briefly)"
Header: "Fields"
Options:
- "Simple (< 6 fields, basic types)" — Strings, ints, booleans, dates
- "Medium (6-15 fields, some relations)" — Includes foreign keys or enums
- "Complex (nested objects, polymorphic)" — JSON fields, discriminated unions, computed fields问题:"该资源包含哪些字段?(简要描述)"
标题:"字段复杂度"
选项:
- "简单(少于6个字段,基础类型)" —— 字符串、整数、布尔值、日期
- "中等(6-15个字段,包含关联)" —— 包含外键或枚举类型
- "复杂(嵌套对象、多态)" —— JSON字段、鉴别联合、计算字段Round 3: Auth and access control
第三轮:认证与访问控制
Question: "How should this endpoint be authenticated?"
Header: "Auth"
Options:
- "JWT Bearer token (Recommended)" — OAuth2PasswordBearer with JWT decode
- "API Key header" — X-API-Key header validation
- "No auth (public)" — Open endpoint, no authentication required
- "Use existing auth" — Reuse the auth dependency already in the project
Question: "Do you need role-based access control?"
Header: "RBAC"
Options:
- "No — any authenticated user" — Single permission level
- "Yes — role check (admin, user, etc.)" — Require specific roles per endpoint
- "Yes — ownership check" — Users can only access their own resources问题:"该端点应采用哪种认证方式?"
标题:"认证方式"
选项:
- "JWT Bearer令牌(推荐)" —— 带JWT解码的OAuth2PasswordBearer
- "API密钥头" —— 验证X-API-Key请求头
- "无认证(公开)" —— 开放端点,无需认证
- "使用现有认证" —— 复用项目中已有的认证依赖
问题:"是否需要基于角色的访问控制?"
标题:"RBAC权限"
选项:
- "不需要 —— 任何已认证用户均可访问" —— 单一权限级别
- "需要 —— 角色校验(管理员、普通用户等)" —— 每个端点要求特定角色
- "需要 —— 归属校验" —— 用户仅能访问自己的资源Round 4: Pagination, filtering, caching
第四轮:分页、过滤、缓存
Question: "What pagination style for list endpoints?"
Header: "Pagination"
Options:
- "Cursor-based (Recommended)" — Best for real-time data, no offset drift
- "Offset/limit" — Simple, good for admin panels with page numbers
- "No pagination" — Small datasets, return all results
Question: "Do you need response caching?"
Header: "Caching"
Options:
- "No caching" — Fresh data on every request
- "Cache-Control headers" — Client-side caching via HTTP headers
- "Redis/in-memory cache" — Server-side caching with TTL问题:"列表端点采用哪种分页方式?"
标题:"分页策略"
选项:
- "基于游标(推荐)" —— 最适合实时数据,无偏移漂移问题
- "偏移/限制" —— 实现简单,适用于带页码的管理面板
- "不分页" —— 小型数据集,返回所有结果
问题:"是否需要响应缓存?"
标题:"缓存设置"
选项:
- "不缓存" —— 每次请求返回最新数据
- "Cache-Control头" —— 通过HTTP头实现客户端缓存
- "Redis/内存缓存" —— 带TTL的服务端缓存Phase 3: Plan (ExitPlanMode)
阶段3:规划(退出规划模式)
Write a concrete implementation plan covering:
- Files to create/modify — exact paths based on project structure discovered in Phase 1
- Pydantic schemas — ,
Create,Update, andResponseschemas with field typesList - SQLAlchemy model — table name, columns, relationships, indexes
- CRUD/service layer — async functions for each operation
- Router — endpoint signatures, status codes, response models
- Dependencies — auth, pagination, filtering dependencies
- Tests — test cases for happy path, validation errors, auth failures, not found
Present via ExitPlanMode for user approval.
编写具体的实现方案,涵盖:
- 需创建/修改的文件 —— 基于阶段1发现的项目结构给出准确路径
- Pydantic模式 —— 包含字段类型的、
Create、Update和Response模式List - SQLAlchemy模型 —— 表名、列、关联关系、索引
- CRUD/服务层 —— 每个操作对应的异步函数
- 路由 —— 端点签名、状态码、响应模型
- 依赖项 —— 认证、分页、过滤依赖
- 测试 —— 正常流程、验证错误、认证失败、资源不存在等测试用例
通过退出规划模式提交方案,等待用户确认。
Phase 4: Execute
阶段4:执行
After approval, implement following this order:
获得批准后,按照以下顺序实现:
Step 1: Pydantic schemas
步骤1:Pydantic模式
python
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from uuid import UUID
class ResourceBase(BaseModel):
"""Shared fields between create and response."""
name: str
# ... fields from interview
class ResourceCreate(ResourceBase):
"""Fields required to create the resource."""
pass
class ResourceUpdate(BaseModel):
"""All fields optional for partial updates."""
name: str | None = None
class ResourceResponse(ResourceBase):
"""Full resource with DB-generated fields."""
model_config = ConfigDict(from_attributes=True)
id: UUID
created_at: datetime
updated_at: datetime
class ResourceListResponse(BaseModel):
"""Paginated list response."""
data: list[ResourceResponse]
next_cursor: str | None = None
has_more: boolpython
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from uuid import UUID
class ResourceBase(BaseModel):
"""创建和响应共享的字段。"""
name: str
# ... 来自沟通阶段的字段
class ResourceCreate(ResourceBase):
"""创建资源所需的字段。"""
pass
class ResourceUpdate(BaseModel):
"""部分更新时所有字段可选。"""
name: str | None = None
class ResourceResponse(ResourceBase):
"""包含数据库生成字段的完整资源。"""
model_config = ConfigDict(from_attributes=True)
id: UUID
created_at: datetime
updated_at: datetime
class ResourceListResponse(BaseModel):
"""分页列表响应。"""
data: list[ResourceResponse]
next_cursor: str | None = None
has_more: boolStep 2: SQLAlchemy model
步骤2:SQLAlchemy模型
python
from sqlalchemy import Column, String, DateTime, func
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from app.database import Base
class Resource(Base):
__tablename__ = "resources"
id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, nullable=False, index=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())python
from sqlalchemy import Column, String, DateTime, func
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from app.database import Base
class Resource(Base):
__tablename__ = "resources"
id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, nullable=False, index=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())Step 3: CRUD/service layer
步骤3:CRUD/服务层
python
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from uuid import UUID
async def get_resource(db: AsyncSession, resource_id: UUID) -> Resource | None:
result = await db.execute(select(Resource).where(Resource.id == resource_id))
return result.scalar_one_or_none()
async def list_resources(
db: AsyncSession,
cursor: str | None = None,
limit: int = 20,
) -> tuple[list[Resource], str | None]:
query = select(Resource).order_by(Resource.created_at.desc()).limit(limit + 1)
if cursor:
query = query.where(Resource.created_at < decode_cursor(cursor))
result = await db.execute(query)
items = list(result.scalars().all())
next_cursor = encode_cursor(items[-1].created_at) if len(items) > limit else None
return items[:limit], next_cursor
async def create_resource(db: AsyncSession, data: ResourceCreate) -> Resource:
resource = Resource(**data.model_dump())
db.add(resource)
await db.commit()
await db.refresh(resource)
return resource
async def update_resource(
db: AsyncSession, resource_id: UUID, data: ResourceUpdate
) -> Resource | None:
resource = await get_resource(db, resource_id)
if not resource:
return None
for field, value in data.model_dump(exclude_unset=True).items():
setattr(resource, field, value)
await db.commit()
await db.refresh(resource)
return resource
async def delete_resource(db: AsyncSession, resource_id: UUID) -> bool:
resource = await get_resource(db, resource_id)
if not resource:
return False
await db.delete(resource)
await db.commit()
return Truepython
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from uuid import UUID
async def get_resource(db: AsyncSession, resource_id: UUID) -> Resource | None:
result = await db.execute(select(Resource).where(Resource.id == resource_id))
return result.scalar_one_or_none()
async def list_resources(
db: AsyncSession,
cursor: str | None = None,
limit: int = 20,
) -> tuple[list[Resource], str | None]:
query = select(Resource).order_by(Resource.created_at.desc()).limit(limit + 1)
if cursor:
query = query.where(Resource.created_at < decode_cursor(cursor))
result = await db.execute(query)
items = list(result.scalars().all())
next_cursor = encode_cursor(items[-1].created_at) if len(items) > limit else None
return items[:limit], next_cursor
async def create_resource(db: AsyncSession, data: ResourceCreate) -> Resource:
resource = Resource(**data.model_dump())
db.add(resource)
await db.commit()
await db.refresh(resource)
return resource
async def update_resource(
db: AsyncSession, resource_id: UUID, data: ResourceUpdate
) -> Resource | None:
resource = await get_resource(db, resource_id)
if not resource:
return None
for field, value in data.model_dump(exclude_unset=True).items():
setattr(resource, field, value)
await db.commit()
await db.refresh(resource)
return resource
async def delete_resource(db: AsyncSession, resource_id: UUID) -> bool:
resource = await get_resource(db, resource_id)
if not resource:
return False
await db.delete(resource)
await db.commit()
return TrueStep 4: Router with dependencies
步骤4:带依赖的路由
python
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
router = APIRouter(prefix="/resources", tags=["resources"])
@router.get("", response_model=ResourceListResponse)
async def list_resources_endpoint(
cursor: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user), # if auth required
):
items, next_cursor = await list_resources(db, cursor=cursor, limit=limit)
return ResourceListResponse(
data=items,
next_cursor=next_cursor,
has_more=next_cursor is not None,
)
@router.get("/{resource_id}", response_model=ResourceResponse)
async def get_resource_endpoint(
resource_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
resource = await get_resource(db, resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
return resource
@router.post("", response_model=ResourceResponse, status_code=status.HTTP_201_CREATED)
async def create_resource_endpoint(
data: ResourceCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
return await create_resource(db, data)
@router.patch("/{resource_id}", response_model=ResourceResponse)
async def update_resource_endpoint(
resource_id: UUID,
data: ResourceUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
resource = await update_resource(db, resource_id, data)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
return resource
@router.delete("/{resource_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_resource_endpoint(
resource_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
deleted = await delete_resource(db, resource_id)
if not deleted:
raise HTTPException(status_code=404, detail="Resource not found")python
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
router = APIRouter(prefix="/resources", tags=["resources"])
@router.get("", response_model=ResourceListResponse)
async def list_resources_endpoint(
cursor: str | None = Query(None),
limit: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user), # 如果需要认证
):
items, next_cursor = await list_resources(db, cursor=cursor, limit=limit)
return ResourceListResponse(
data=items,
next_cursor=next_cursor,
has_more=next_cursor is not None,
)
@router.get("/{resource_id}", response_model=ResourceResponse)
async def get_resource_endpoint(
resource_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
resource = await get_resource(db, resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
return resource
@router.post("", response_model=ResourceResponse, status_code=status.HTTP_201_CREATED)
async def create_resource_endpoint(
data: ResourceCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
return await create_resource(db, data)
@router.patch("/{resource_id}", response_model=ResourceResponse)
async def update_resource_endpoint(
resource_id: UUID,
data: ResourceUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
resource = await update_resource(db, resource_id, data)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
return resource
@router.delete("/{resource_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_resource_endpoint(
resource_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
deleted = await delete_resource(db, resource_id)
if not deleted:
raise HTTPException(status_code=404, detail="Resource not found")Step 5: Tests
步骤5:测试
python
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def client():
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
yield ac
@pytest.mark.asyncio
async def test_create_resource(client: AsyncClient, auth_headers: dict):
response = await client.post(
"/resources",
json={"name": "Test Resource"},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Resource"
assert "id" in data
@pytest.mark.asyncio
async def test_get_resource_not_found(client: AsyncClient, auth_headers: dict):
response = await client.get(
"/resources/00000000-0000-0000-0000-000000000000",
headers=auth_headers,
)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_list_resources_pagination(client: AsyncClient, auth_headers: dict):
# Create multiple resources first
for i in range(5):
await client.post(
"/resources",
json={"name": f"Resource {i}"},
headers=auth_headers,
)
response = await client.get("/resources?limit=2", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert len(data["data"]) == 2
assert data["has_more"] is True
assert data["next_cursor"] is not None
@pytest.mark.asyncio
async def test_create_resource_unauthorized(client: AsyncClient):
response = await client.post("/resources", json={"name": "Test"})
assert response.status_code in (401, 403)
@pytest.mark.asyncio
async def test_update_resource_partial(client: AsyncClient, auth_headers: dict):
# Create
create_resp = await client.post(
"/resources",
json={"name": "Original"},
headers=auth_headers,
)
resource_id = create_resp.json()["id"]
# Partial update
response = await client.patch(
f"/resources/{resource_id}",
json={"name": "Updated"},
headers=auth_headers,
)
assert response.status_code == 200
assert response.json()["name"] == "Updated"
@pytest.mark.asyncio
async def test_delete_resource(client: AsyncClient, auth_headers: dict):
create_resp = await client.post(
"/resources",
json={"name": "To Delete"},
headers=auth_headers,
)
resource_id = create_resp.json()["id"]
response = await client.delete(
f"/resources/{resource_id}", headers=auth_headers
)
assert response.status_code == 204
# Verify deleted
get_resp = await client.get(
f"/resources/{resource_id}", headers=auth_headers
)
assert get_resp.status_code == 404python
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def client():
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
yield ac
@pytest.mark.asyncio
async def test_create_resource(client: AsyncClient, auth_headers: dict):
response = await client.post(
"/resources",
json={"name": "Test Resource"},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Resource"
assert "id" in data
@pytest.mark.asyncio
async def test_get_resource_not_found(client: AsyncClient, auth_headers: dict):
response = await client.get(
"/resources/00000000-0000-0000-0000-000000000000",
headers=auth_headers,
)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_list_resources_pagination(client: AsyncClient, auth_headers: dict):
# 先创建多个资源
for i in range(5):
await client.post(
"/resources",
json={"name": f"Resource {i}"},
headers=auth_headers,
)
response = await client.get("/resources?limit=2", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert len(data["data"]) == 2
assert data["has_more"] is True
assert data["next_cursor"] is not None
@pytest.mark.asyncio
async def test_create_resource_unauthorized(client: AsyncClient):
response = await client.post("/resources", json={"name": "Test"})
assert response.status_code in (401, 403)
@pytest.mark.asyncio
async def test_update_resource_partial(client: AsyncClient, auth_headers: dict):
# 创建资源
create_resp = await client.post(
"/resources",
json={"name": "Original"},
headers=auth_headers,
)
resource_id = create_resp.json()["id"]
# 部分更新
response = await client.patch(
f"/resources/{resource_id}",
json={"name": "Updated"},
headers=auth_headers,
)
assert response.status_code == 200
assert response.json()["name"] == "Updated"
@pytest.mark.asyncio
async def test_delete_resource(client: AsyncClient, auth_headers: dict):
create_resp = await client.post(
"/resources",
json={"name": "To Delete"},
headers=auth_headers,
)
resource_id = create_resp.json()["id"]
response = await client.delete(
f"/resources/{resource_id}", headers=auth_headers
)
assert response.status_code == 204
# 验证已删除
get_resp = await client.get(
f"/resources/{resource_id}", headers=auth_headers
)
assert get_resp.status_code == 404Key patterns to follow
需遵循的核心模式
Dependency injection for auth
用于认证的依赖注入
python
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
payload = decode_jwt(token)
user = await db.get(User, payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
def require_role(*roles: str):
"""Factory for role-based access control."""
async def checker(current_user: User = Depends(get_current_user)):
if current_user.role not in roles:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return current_user
return checkerpython
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
payload = decode_jwt(token)
user = await db.get(User, payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
def require_role(*roles: str):
"""基于角色的访问控制工厂函数。"""
async def checker(current_user: User = Depends(get_current_user)):
if current_user.role not in roles:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return current_user
return checkerCursor-based pagination helper
基于游标的分页工具
python
import base64
from datetime import datetime
def encode_cursor(dt: datetime) -> str:
return base64.urlsafe_b64encode(dt.isoformat().encode()).decode()
def decode_cursor(cursor: str) -> datetime:
return datetime.fromisoformat(base64.urlsafe_b64decode(cursor).decode())python
import base64
from datetime import datetime
def encode_cursor(dt: datetime) -> str:
return base64.urlsafe_b64encode(dt.isoformat().encode()).decode()
def decode_cursor(cursor: str) -> datetime:
return datetime.fromisoformat(base64.urlsafe_b64decode(cursor).decode())Error responses
错误响应
Always use FastAPI's with consistent detail messages. For validation errors, Pydantic v2 handles them automatically via (422).
HTTPExceptionRequestValidationErrorpython
undefined始终使用FastAPI的并保持错误信息一致。对于验证错误,Pydantic v2会通过自动处理(状态码422)。
HTTPExceptionRequestValidationErrorpython
undefined404 — not found
404 — 资源不存在
raise HTTPException(status_code=404, detail="Resource not found")
raise HTTPException(status_code=404, detail="Resource not found")
409 — conflict (duplicate)
409 — 冲突(重复资源)
raise HTTPException(status_code=409, detail="Resource with this name already exists")
raise HTTPException(status_code=409, detail="Resource with this name already exists")
403 — forbidden
403 — 禁止访问
raise HTTPException(status_code=403, detail="Not allowed to modify this resource")
undefinedraise HTTPException(status_code=403, detail="Not allowed to modify this resource")
undefinedChecklist before finishing
完成前检查清单
- All endpoints return proper status codes (201 for POST, 204 for DELETE)
- Pydantic schemas use for ORM mode
model_config = ConfigDict(from_attributes=True) - List endpoint has pagination with configurable limit
- Auth dependency is applied to all non-public endpoints
- Tests cover: happy path, not found, unauthorized, validation errors
- Router is registered in the main FastAPI app
- Database model has proper indexes on filtered/sorted columns
- 所有端点返回正确的状态码(POST返回201,DELETE返回204)
- Pydantic模式使用开启ORM模式
model_config = ConfigDict(from_attributes=True) - 列表端点具备可配置限制的分页功能
- 所有非公开端点都应用了认证依赖
- 测试覆盖:正常流程、资源不存在、未授权、验证错误
- 路由已注册到主FastAPI应用
- 数据库模型在过滤/排序字段上设置了正确的索引