fastapi-advanced
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseFastAPI Advanced Patterns ()
FastAPI 高级模式
Production-ready FastAPI patterns for modern Python backends.
适用于现代Python后端的生产级FastAPI模式。
Lifespan Management ()
生命周期管理
Modern Lifespan Context Manager
现代生命周期上下文管理器
python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
import redis.asyncio as redis
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan with resource management."""
# Startup
app.state.db_engine = create_async_engine(
settings.database_url,
pool_size=5,
max_overflow=10,
)
app.state.redis = redis.from_url(settings.redis_url)
# Health check connections
async with app.state.db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
await app.state.redis.ping()
yield # Application runs
# Shutdown
await app.state.db_engine.dispose()
await app.state.redis.close()
app = FastAPI(lifespan=lifespan)python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
import redis.asyncio as redis
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期与资源管理。"""
# 启动阶段
app.state.db_engine = create_async_engine(
settings.database_url,
pool_size=5,
max_overflow=10,
)
app.state.redis = redis.from_url(settings.redis_url)
# 健康检查连接
async with app.state.db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
await app.state.redis.ping()
yield # 应用运行阶段
# 关闭阶段
await app.state.db_engine.dispose()
await app.state.redis.close()
app = FastAPI(lifespan=lifespan)Lifespan with Services
带服务的生命周期
python
from app.services import EmbeddingsService, LLMService
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize services
app.state.embeddings = EmbeddingsService(
model=settings.embedding_model,
batch_size=100,
)
app.state.llm = LLMService(
providers=["openai", "anthropic"],
default="anthropic",
)
# Warm up models (optional)
await app.state.embeddings.warmup()
yield
# Cleanup
await app.state.embeddings.close()
await app.state.llm.close()python
from app.services import EmbeddingsService, LLMService
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化服务
app.state.embeddings = EmbeddingsService(
model=settings.embedding_model,
batch_size=100,
)
app.state.llm = LLMService(
providers=["openai", "anthropic"],
default="anthropic",
)
# 预热模型(可选)
await app.state.embeddings.warmup()
yield
# 清理资源
await app.state.embeddings.close()
await app.state.llm.close()Dependency Injection Patterns
依赖注入模式
Database Session
数据库会话
python
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends, Request
async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
"""Yield database session from app state."""
async with AsyncSession(
request.app.state.db_engine,
expire_on_commit=False,
) as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raisepython
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends, Request
async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
"""从应用状态中生成数据库会话。"""
async with AsyncSession(
request.app.state.db_engine,
expire_on_commit=False,
) as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raiseService Dependencies
服务依赖
python
from functools import lru_cache
class AnalysisService:
def __init__(
self,
db: AsyncSession,
embeddings: EmbeddingsService,
llm: LLMService,
):
self.db = db
self.embeddings = embeddings
self.llm = llm
def get_analysis_service(
db: AsyncSession = Depends(get_db),
request: Request = None,
) -> AnalysisService:
return AnalysisService(
db=db,
embeddings=request.app.state.embeddings,
llm=request.app.state.llm,
)
@router.post("/analyses")
async def create_analysis(
data: AnalysisCreate,
service: AnalysisService = Depends(get_analysis_service),
):
return await service.create(data)python
from functools import lru_cache
class AnalysisService:
def __init__(
self,
db: AsyncSession,
embeddings: EmbeddingsService,
llm: LLMService,
):
self.db = db
self.embeddings = embeddings
self.llm = llm
def get_analysis_service(
db: AsyncSession = Depends(get_db),
request: Request = None,
) -> AnalysisService:
return AnalysisService(
db=db,
embeddings=request.app.state.embeddings,
llm=request.app.state.llm,
)
@router.post("/analyses")
async def create_analysis(
data: AnalysisCreate,
service: AnalysisService = Depends(get_analysis_service),
):
return await service.create(data)Cached Dependencies
缓存依赖
python
from functools import lru_cache
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
database_url: str
redis_url: str
api_key: str
model_config = {"env_file": ".env"}
@lru_cache
def get_settings() -> Settings:
return Settings()python
from functools import lru_cache
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
database_url: str
redis_url: str
api_key: str
model_config = {"env_file": ".env"}
@lru_cache
def get_settings() -> Settings:
return Settings()Usage in dependencies
在依赖中使用
def get_db_url(settings: Settings = Depends(get_settings)) -> str:
return settings.database_url
undefineddef get_db_url(settings: Settings = Depends(get_settings)) -> str:
return settings.database_url
undefinedAuthenticated User
已认证用户
python
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security),
db: AsyncSession = Depends(get_db),
) -> User:
token = credentials.credentials
payload = decode_jwt(token)
user = await db.get(User, payload["sub"])
if not user:
raise HTTPException(401, "Invalid credentials")
return user
async def get_admin_user(
user: User = Depends(get_current_user),
) -> User:
if not user.is_admin:
raise HTTPException(403, "Admin access required")
return userpython
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security),
db: AsyncSession = Depends(get_db),
) -> User:
token = credentials.credentials
payload = decode_jwt(token)
user = await db.get(User, payload["sub"])
if not user:
raise HTTPException(401, "无效凭证")
return user
async def get_admin_user(
user: User = Depends(get_current_user),
) -> User:
if not user.is_admin:
raise HTTPException(403, "需要管理员权限")
return userMiddleware Patterns
中间件模式
Request ID Middleware
请求ID中间件
python
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
class RequestIDMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
app.add_middleware(RequestIDMiddleware)python
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
class RequestIDMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
app.add_middleware(RequestIDMiddleware)Timing Middleware
计时中间件
python
import time
from starlette.middleware.base import BaseHTTPMiddleware
class TimingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
response.headers["X-Response-Time"] = f"{duration:.3f}s"
return responsepython
import time
from starlette.middleware.base import BaseHTTPMiddleware
class TimingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
response.headers["X-Response-Time"] = f"{duration:.3f}s"
return responseStructured Logging Middleware
结构化日志中间件
python
import structlog
from starlette.middleware.base import BaseHTTPMiddleware
logger = structlog.get_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
log = logger.bind(
request_id=getattr(request.state, "request_id", None),
method=request.method,
path=request.url.path,
)
try:
response = await call_next(request)
log.info(
"request_completed",
status_code=response.status_code,
)
return response
except Exception as exc:
log.exception("request_failed", error=str(exc))
raisepython
import structlog
from starlette.middleware.base import BaseHTTPMiddleware
logger = structlog.get_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
log = logger.bind(
request_id=getattr(request.state, "request_id", None),
method=request.method,
path=request.url.path,
)
try:
response = await call_next(request)
log.info(
"请求完成",
status_code=response.status_code,
)
return response
except Exception as exc:
log.exception("请求失败", error=str(exc))
raiseCORS Configuration
CORS配置
python
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["*"],
expose_headers=["X-Request-ID", "X-Response-Time"],
)python
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["*"],
expose_headers=["X-Request-ID", "X-Response-Time"],
)Settings with Pydantic
Pydantic配置
python
from pydantic import Field, field_validator, PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
# Database
database_url: PostgresDsn
db_pool_size: int = Field(default=5, ge=1, le=20)
db_max_overflow: int = Field(default=10, ge=0, le=50)
# Redis
redis_url: str = "redis://localhost:6379"
# API
api_key: str = Field(min_length=32)
debug: bool = False
# LLM
openai_api_key: str | None = None
anthropic_api_key: str | None = None
@field_validator("database_url", mode="before")
@classmethod
def validate_database_url(cls, v: str) -> str:
if v and "+asyncpg" not in v:
return v.replace("postgresql://", "postgresql+asyncpg://")
return v
@property
def async_database_url(self) -> str:
return str(self.database_url)python
from pydantic import Field, field_validator, PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
# 数据库
database_url: PostgresDsn
db_pool_size: int = Field(default=5, ge=1, le=20)
db_max_overflow: int = Field(default=10, ge=0, le=50)
# Redis
redis_url: str = "redis://localhost:6379"
# API
api_key: str = Field(min_length=32)
debug: bool = False
# LLM
openai_api_key: str | None = None
anthropic_api_key: str | None = None
@field_validator("database_url", mode="before")
@classmethod
def validate_database_url(cls, v: str) -> str:
if v and "+asyncpg" not in v:
return v.replace("postgresql://", "postgresql+asyncpg://")
return v
@property
def async_database_url(self) -> str:
return str(self.database_url)Exception Handlers
异常处理器
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from sqlalchemy.exc import IntegrityError
from app.core.exceptions import ProblemException
@app.exception_handler(ProblemException)
async def problem_exception_handler(request: Request, exc: ProblemException):
return JSONResponse(
status_code=exc.status_code,
content=exc.to_problem_detail(),
media_type="application/problem+json",
)
@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
return JSONResponse(
status_code=409,
content={
"type": "https://api.example.com/problems/conflict",
"title": "Conflict",
"status": 409,
"detail": "Resource already exists or constraint violated",
},
media_type="application/problem+json",
)python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from sqlalchemy.exc import IntegrityError
from app.core.exceptions import ProblemException
@app.exception_handler(ProblemException)
async def problem_exception_handler(request: Request, exc: ProblemException):
return JSONResponse(
status_code=exc.status_code,
content=exc.to_problem_detail(),
media_type="application/problem+json",
)
@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
return JSONResponse(
status_code=409,
content={
"type": "https://api.example.com/problems/conflict",
"title": "冲突",
"status": 409,
"detail": "资源已存在或违反约束",
},
media_type="application/problem+json",
)Response Optimization
响应优化
python
from fastapi.responses import ORJSONResponsepython
from fastapi.responses import ORJSONResponseUse orjson for faster JSON serialization
使用orjson实现更快的JSON序列化
app = FastAPI(default_response_class=ORJSONResponse)
app = FastAPI(default_response_class=ORJSONResponse)
Streaming response
流式响应
from fastapi.responses import StreamingResponse
@router.get("/export")
async def export_data():
async def generate():
async for chunk in fetch_large_dataset():
yield json.dumps(chunk) + "\n"
return StreamingResponse(
generate(),
media_type="application/x-ndjson",
)undefinedfrom fastapi.responses import StreamingResponse
@router.get("/export")
async def export_data():
async def generate():
async for chunk in fetch_large_dataset():
yield json.dumps(chunk) + "\n"
return StreamingResponse(
generate(),
media_type="application/x-ndjson",
)undefinedHealth Checks
健康检查
python
from fastapi import APIRouter
health_router = APIRouter(tags=["health"])
@health_router.get("/health")
async def health_check(request: Request):
checks = {}
# Database
try:
async with request.app.state.db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
checks["database"] = "healthy"
except Exception as e:
checks["database"] = f"unhealthy: {e}"
# Redis
try:
await request.app.state.redis.ping()
checks["redis"] = "healthy"
except Exception as e:
checks["redis"] = f"unhealthy: {e}"
status = "healthy" if all(v == "healthy" for v in checks.values()) else "unhealthy"
return {"status": status, "checks": checks}python
from fastapi import APIRouter
health_router = APIRouter(tags=["health"])
@health_router.get("/health")
async def health_check(request: Request):
checks = {}
# 数据库检查
try:
async with request.app.state.db_engine.connect() as conn:
await conn.execute(text("SELECT 1"))
checks["database"] = "健康"
except Exception as e:
checks["database"] = f"不健康: {e}"
# Redis检查
try:
await request.app.state.redis.ping()
checks["redis"] = "健康"
except Exception as e:
checks["redis"] = f"不健康: {e}"
status = "健康" if all(v == "健康" for v in checks.values()) else "不健康"
return {"status": status, "checks": checks}Anti-Patterns (FORBIDDEN)
反模式(禁止使用)
python
undefinedpython
undefinedNEVER use global state
切勿使用全局状态
db_session = None # Global mutable state!
db_session = None # 全局可变状态!
NEVER block the event loop
切勿阻塞事件循环
def sync_db_query(): # Blocking in async context!
return session.query(User).all()
def sync_db_query(): # 在异步上下文中阻塞!
return session.query(User).all()
NEVER skip dependency injection
切勿跳过依赖注入
@router.get("/users")
async def get_users():
db = create_session() # Creating session in route!
return db.query(User).all()
@router.get("/users")
async def get_users():
db = create_session() # 在路由中创建会话!
return db.query(User).all()
NEVER ignore lifespan cleanup
切勿忽略生命周期清理
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.pool = create_pool()
yield
# Missing cleanup! Pool never closed
undefined@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.pool = create_pool()
yield
# 缺少清理!连接池从未关闭
undefinedKey Decisions
关键决策
| Decision | Recommendation |
|---|---|
| Lifespan | Use |
| Dependencies | Class-based services with DI |
| Settings | Pydantic Settings with |
| Response | ORJSONResponse for performance |
| Middleware | Order: CORS → RequestID → Timing → Logging |
| Health | Check all critical dependencies |
| 决策 | 推荐方案 |
|---|---|
| 生命周期 | 使用 |
| 依赖 | 基于类的服务与依赖注入 |
| 配置 | 结合 |
| 响应 | 使用ORJSONResponse提升性能 |
| 中间件 | 顺序:CORS → RequestID → 计时 → 日志 |
| 健康检查 | 检查所有关键依赖 |
Available Scripts
可用脚本
-
- Context-aware FastAPI application generator
scripts/create-fastapi-app.md- Auto-detects: Python version, database type, Redis usage, project structure
- Usage:
/create-fastapi-app [app-name] - Uses and
$ARGUMENTSfor project-specific configuration!command - Generates production-ready app with detected dependencies
-
- Static FastAPI application template
assets/fastapi-app-template.py
-
- 上下文感知的FastAPI应用生成器
scripts/create-fastapi-app.md- 自动检测:Python版本、数据库类型、Redis使用情况、项目结构
- 使用方式:
/create-fastapi-app [app-name] - 利用和
$ARGUMENTS进行项目特定配置!command - 生成包含检测到的依赖的生产级应用
-
- 静态FastAPI应用模板
assets/fastapi-app-template.py
Related Skills
相关技能
- - Service layer patterns
clean-architecture - - SQLAlchemy models
database-schema-designer - - Logging and metrics
observability-monitoring
- - 服务层模式
clean-architecture - - SQLAlchemy模型
database-schema-designer - - 日志与指标
observability-monitoring
Capability Details
能力详情
lifespan
lifespan
Keywords: lifespan, startup, shutdown, asynccontextmanager
Solves:
- FastAPI startup/shutdown
- Resource management in FastAPI
关键词: lifespan, startup, shutdown, asynccontextmanager
解决问题:
- FastAPI启动/关闭流程
- FastAPI中的资源管理
dependencies
dependencies
Keywords: dependency injection, Depends, get_db, service dependency
Solves:
- FastAPI dependency injection patterns
- Reusable dependencies
关键词: dependency injection, Depends, get_db, service dependency
解决问题:
- FastAPI依赖注入模式
- 可复用依赖
middleware
middleware
Keywords: middleware, request id, timing, cors, logging middleware
Solves:
- Custom FastAPI middleware
- Request/response interceptors
关键词: middleware, request id, timing, cors, logging middleware
解决问题:
- 自定义FastAPI中间件
- 请求/响应拦截器
settings
settings
Keywords: settings, pydantic settings, env, configuration
Solves:
- FastAPI configuration management
- Environment variables
关键词: settings, pydantic settings, env, configuration
解决问题:
- FastAPI配置管理
- 环境变量处理
health-checks
health-checks
Keywords: health check, readiness, liveness, health endpoint
Solves:
- Kubernetes health checks
- Service health monitoring
关键词: health check, readiness, liveness, health endpoint
解决问题:
- Kubernetes健康检查
- 服务健康监控