fastapi-advanced

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

FastAPI Advanced Patterns ()

FastAPI 高级模式

Production-ready FastAPI patterns for modern Python backends.
适用于现代Python后端的生产级FastAPI模式。

Lifespan Management ()

生命周期管理

Modern Lifespan Context Manager

现代生命周期上下文管理器

python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
import redis.asyncio as redis

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan with resource management."""
    # Startup
    app.state.db_engine = create_async_engine(
        settings.database_url,
        pool_size=5,
        max_overflow=10,
    )
    app.state.redis = redis.from_url(settings.redis_url)

    # Health check connections
    async with app.state.db_engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
    await app.state.redis.ping()

    yield  # Application runs

    # Shutdown
    await app.state.db_engine.dispose()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)
python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
import redis.asyncio as redis

@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期与资源管理。"""
    # 启动阶段
    app.state.db_engine = create_async_engine(
        settings.database_url,
        pool_size=5,
        max_overflow=10,
    )
    app.state.redis = redis.from_url(settings.redis_url)

    # 健康检查连接
    async with app.state.db_engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
    await app.state.redis.ping()

    yield  # 应用运行阶段

    # 关闭阶段
    await app.state.db_engine.dispose()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)

Lifespan with Services

带服务的生命周期

python
from app.services import EmbeddingsService, LLMService

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize services
    app.state.embeddings = EmbeddingsService(
        model=settings.embedding_model,
        batch_size=100,
    )
    app.state.llm = LLMService(
        providers=["openai", "anthropic"],
        default="anthropic",
    )

    # Warm up models (optional)
    await app.state.embeddings.warmup()

    yield

    # Cleanup
    await app.state.embeddings.close()
    await app.state.llm.close()
python
from app.services import EmbeddingsService, LLMService

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 初始化服务
    app.state.embeddings = EmbeddingsService(
        model=settings.embedding_model,
        batch_size=100,
    )
    app.state.llm = LLMService(
        providers=["openai", "anthropic"],
        default="anthropic",
    )

    # 预热模型(可选)
    await app.state.embeddings.warmup()

    yield

    # 清理资源
    await app.state.embeddings.close()
    await app.state.llm.close()

Dependency Injection Patterns

依赖注入模式

Database Session

数据库会话

python
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends, Request

async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    """Yield database session from app state."""
    async with AsyncSession(
        request.app.state.db_engine,
        expire_on_commit=False,
    ) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
python
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends, Request

async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    """从应用状态中生成数据库会话。"""
    async with AsyncSession(
        request.app.state.db_engine,
        expire_on_commit=False,
    ) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Service Dependencies

服务依赖

python
from functools import lru_cache

class AnalysisService:
    def __init__(
        self,
        db: AsyncSession,
        embeddings: EmbeddingsService,
        llm: LLMService,
    ):
        self.db = db
        self.embeddings = embeddings
        self.llm = llm

def get_analysis_service(
    db: AsyncSession = Depends(get_db),
    request: Request = None,
) -> AnalysisService:
    return AnalysisService(
        db=db,
        embeddings=request.app.state.embeddings,
        llm=request.app.state.llm,
    )

@router.post("/analyses")
async def create_analysis(
    data: AnalysisCreate,
    service: AnalysisService = Depends(get_analysis_service),
):
    return await service.create(data)
python
from functools import lru_cache

class AnalysisService:
    def __init__(
        self,
        db: AsyncSession,
        embeddings: EmbeddingsService,
        llm: LLMService,
    ):
        self.db = db
        self.embeddings = embeddings
        self.llm = llm

def get_analysis_service(
    db: AsyncSession = Depends(get_db),
    request: Request = None,
) -> AnalysisService:
    return AnalysisService(
        db=db,
        embeddings=request.app.state.embeddings,
        llm=request.app.state.llm,
    )

@router.post("/analyses")
async def create_analysis(
    data: AnalysisCreate,
    service: AnalysisService = Depends(get_analysis_service),
):
    return await service.create(data)

Cached Dependencies

缓存依赖

python
from functools import lru_cache
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    redis_url: str
    api_key: str

    model_config = {"env_file": ".env"}

@lru_cache
def get_settings() -> Settings:
    return Settings()
python
from functools import lru_cache
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    redis_url: str
    api_key: str

    model_config = {"env_file": ".env"}

@lru_cache
def get_settings() -> Settings:
    return Settings()

Usage in dependencies

在依赖中使用

def get_db_url(settings: Settings = Depends(get_settings)) -> str: return settings.database_url
undefined
def get_db_url(settings: Settings = Depends(get_settings)) -> str: return settings.database_url
undefined

Authenticated User

已认证用户

python
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = credentials.credentials
    payload = decode_jwt(token)

    user = await db.get(User, payload["sub"])
    if not user:
        raise HTTPException(401, "Invalid credentials")
    return user

async def get_admin_user(
    user: User = Depends(get_current_user),
) -> User:
    if not user.is_admin:
        raise HTTPException(403, "Admin access required")
    return user
python
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    token = credentials.credentials
    payload = decode_jwt(token)

    user = await db.get(User, payload["sub"])
    if not user:
        raise HTTPException(401, "无效凭证")
    return user

async def get_admin_user(
    user: User = Depends(get_current_user),
) -> User:
    if not user.is_admin:
        raise HTTPException(403, "需要管理员权限")
    return user

Middleware Patterns

中间件模式

Request ID Middleware

请求ID中间件

python
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id

        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

app.add_middleware(RequestIDMiddleware)
python
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        request.state.request_id = request_id

        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

app.add_middleware(RequestIDMiddleware)

Timing Middleware

计时中间件

python
import time
from starlette.middleware.base import BaseHTTPMiddleware

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start

        response.headers["X-Response-Time"] = f"{duration:.3f}s"
        return response
python
import time
from starlette.middleware.base import BaseHTTPMiddleware

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start

        response.headers["X-Response-Time"] = f"{duration:.3f}s"
        return response

Structured Logging Middleware

结构化日志中间件

python
import structlog
from starlette.middleware.base import BaseHTTPMiddleware

logger = structlog.get_logger()

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        log = logger.bind(
            request_id=getattr(request.state, "request_id", None),
            method=request.method,
            path=request.url.path,
        )

        try:
            response = await call_next(request)
            log.info(
                "request_completed",
                status_code=response.status_code,
            )
            return response
        except Exception as exc:
            log.exception("request_failed", error=str(exc))
            raise
python
import structlog
from starlette.middleware.base import BaseHTTPMiddleware

logger = structlog.get_logger()

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        log = logger.bind(
            request_id=getattr(request.state, "request_id", None),
            method=request.method,
            path=request.url.path,
        )

        try:
            response = await call_next(request)
            log.info(
                "请求完成",
                status_code=response.status_code,
            )
            return response
        except Exception as exc:
            log.exception("请求失败", error=str(exc))
            raise

CORS Configuration

CORS配置

python
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["*"],
    expose_headers=["X-Request-ID", "X-Response-Time"],
)
python
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["*"],
    expose_headers=["X-Request-ID", "X-Response-Time"],
)

Settings with Pydantic

Pydantic配置

python
from pydantic import Field, field_validator, PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # Database
    database_url: PostgresDsn
    db_pool_size: int = Field(default=5, ge=1, le=20)
    db_max_overflow: int = Field(default=10, ge=0, le=50)

    # Redis
    redis_url: str = "redis://localhost:6379"

    # API
    api_key: str = Field(min_length=32)
    debug: bool = False

    # LLM
    openai_api_key: str | None = None
    anthropic_api_key: str | None = None

    @field_validator("database_url", mode="before")
    @classmethod
    def validate_database_url(cls, v: str) -> str:
        if v and "+asyncpg" not in v:
            return v.replace("postgresql://", "postgresql+asyncpg://")
        return v

    @property
    def async_database_url(self) -> str:
        return str(self.database_url)
python
from pydantic import Field, field_validator, PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # 数据库
    database_url: PostgresDsn
    db_pool_size: int = Field(default=5, ge=1, le=20)
    db_max_overflow: int = Field(default=10, ge=0, le=50)

    # Redis
    redis_url: str = "redis://localhost:6379"

    # API
    api_key: str = Field(min_length=32)
    debug: bool = False

    # LLM
    openai_api_key: str | None = None
    anthropic_api_key: str | None = None

    @field_validator("database_url", mode="before")
    @classmethod
    def validate_database_url(cls, v: str) -> str:
        if v and "+asyncpg" not in v:
            return v.replace("postgresql://", "postgresql+asyncpg://")
        return v

    @property
    def async_database_url(self) -> str:
        return str(self.database_url)

Exception Handlers

异常处理器

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from sqlalchemy.exc import IntegrityError
from app.core.exceptions import ProblemException

@app.exception_handler(ProblemException)
async def problem_exception_handler(request: Request, exc: ProblemException):
    return JSONResponse(
        status_code=exc.status_code,
        content=exc.to_problem_detail(),
        media_type="application/problem+json",
    )

@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
    return JSONResponse(
        status_code=409,
        content={
            "type": "https://api.example.com/problems/conflict",
            "title": "Conflict",
            "status": 409,
            "detail": "Resource already exists or constraint violated",
        },
        media_type="application/problem+json",
    )
python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from sqlalchemy.exc import IntegrityError
from app.core.exceptions import ProblemException

@app.exception_handler(ProblemException)
async def problem_exception_handler(request: Request, exc: ProblemException):
    return JSONResponse(
        status_code=exc.status_code,
        content=exc.to_problem_detail(),
        media_type="application/problem+json",
    )

@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
    return JSONResponse(
        status_code=409,
        content={
            "type": "https://api.example.com/problems/conflict",
            "title": "冲突",
            "status": 409,
            "detail": "资源已存在或违反约束",
        },
        media_type="application/problem+json",
    )

Response Optimization

响应优化

python
from fastapi.responses import ORJSONResponse
python
from fastapi.responses import ORJSONResponse

Use orjson for faster JSON serialization

使用orjson实现更快的JSON序列化

app = FastAPI(default_response_class=ORJSONResponse)
app = FastAPI(default_response_class=ORJSONResponse)

Streaming response

流式响应

from fastapi.responses import StreamingResponse
@router.get("/export") async def export_data(): async def generate(): async for chunk in fetch_large_dataset(): yield json.dumps(chunk) + "\n"
return StreamingResponse(
    generate(),
    media_type="application/x-ndjson",
)
undefined
from fastapi.responses import StreamingResponse
@router.get("/export") async def export_data(): async def generate(): async for chunk in fetch_large_dataset(): yield json.dumps(chunk) + "\n"
return StreamingResponse(
    generate(),
    media_type="application/x-ndjson",
)
undefined

Health Checks

健康检查

python
from fastapi import APIRouter

health_router = APIRouter(tags=["health"])

@health_router.get("/health")
async def health_check(request: Request):
    checks = {}

    # Database
    try:
        async with request.app.state.db_engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        checks["database"] = "healthy"
    except Exception as e:
        checks["database"] = f"unhealthy: {e}"

    # Redis
    try:
        await request.app.state.redis.ping()
        checks["redis"] = "healthy"
    except Exception as e:
        checks["redis"] = f"unhealthy: {e}"

    status = "healthy" if all(v == "healthy" for v in checks.values()) else "unhealthy"
    return {"status": status, "checks": checks}
python
from fastapi import APIRouter

health_router = APIRouter(tags=["health"])

@health_router.get("/health")
async def health_check(request: Request):
    checks = {}

    # 数据库检查
    try:
        async with request.app.state.db_engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        checks["database"] = "健康"
    except Exception as e:
        checks["database"] = f"不健康: {e}"

    # Redis检查
    try:
        await request.app.state.redis.ping()
        checks["redis"] = "健康"
    except Exception as e:
        checks["redis"] = f"不健康: {e}"

    status = "健康" if all(v == "健康" for v in checks.values()) else "不健康"
    return {"status": status, "checks": checks}

Anti-Patterns (FORBIDDEN)

反模式(禁止使用)

python
undefined
python
undefined

NEVER use global state

切勿使用全局状态

db_session = None # Global mutable state!
db_session = None # 全局可变状态!

NEVER block the event loop

切勿阻塞事件循环

def sync_db_query(): # Blocking in async context! return session.query(User).all()
def sync_db_query(): # 在异步上下文中阻塞! return session.query(User).all()

NEVER skip dependency injection

切勿跳过依赖注入

@router.get("/users") async def get_users(): db = create_session() # Creating session in route! return db.query(User).all()
@router.get("/users") async def get_users(): db = create_session() # 在路由中创建会话! return db.query(User).all()

NEVER ignore lifespan cleanup

切勿忽略生命周期清理

@asynccontextmanager async def lifespan(app: FastAPI): app.state.pool = create_pool() yield # Missing cleanup! Pool never closed
undefined
@asynccontextmanager async def lifespan(app: FastAPI): app.state.pool = create_pool() yield # 缺少清理!连接池从未关闭
undefined

Key Decisions

关键决策

DecisionRecommendation
LifespanUse
asynccontextmanager
(not events)
DependenciesClass-based services with DI
SettingsPydantic Settings with
.env
ResponseORJSONResponse for performance
MiddlewareOrder: CORS → RequestID → Timing → Logging
HealthCheck all critical dependencies
决策推荐方案
生命周期使用
asynccontextmanager
(而非事件)
依赖基于类的服务与依赖注入
配置结合
.env
的Pydantic Settings
响应使用ORJSONResponse提升性能
中间件顺序:CORS → RequestID → 计时 → 日志
健康检查检查所有关键依赖

Available Scripts

可用脚本

  • scripts/create-fastapi-app.md
    - Context-aware FastAPI application generator
    • Auto-detects: Python version, database type, Redis usage, project structure
    • Usage:
      /create-fastapi-app [app-name]
    • Uses
      $ARGUMENTS
      and
      !command
      for project-specific configuration
    • Generates production-ready app with detected dependencies
  • assets/fastapi-app-template.py
    - Static FastAPI application template
  • scripts/create-fastapi-app.md
    - 上下文感知的FastAPI应用生成器
    • 自动检测:Python版本、数据库类型、Redis使用情况、项目结构
    • 使用方式:
      /create-fastapi-app [app-name]
    • 利用
      $ARGUMENTS
      !command
      进行项目特定配置
    • 生成包含检测到的依赖的生产级应用
  • assets/fastapi-app-template.py
    - 静态FastAPI应用模板

Related Skills

相关技能

  • clean-architecture
    - Service layer patterns
  • database-schema-designer
    - SQLAlchemy models
  • observability-monitoring
    - Logging and metrics
  • clean-architecture
    - 服务层模式
  • database-schema-designer
    - SQLAlchemy模型
  • observability-monitoring
    - 日志与指标

Capability Details

能力详情

lifespan

lifespan

Keywords: lifespan, startup, shutdown, asynccontextmanager Solves:
  • FastAPI startup/shutdown
  • Resource management in FastAPI
关键词: lifespan, startup, shutdown, asynccontextmanager 解决问题:
  • FastAPI启动/关闭流程
  • FastAPI中的资源管理

dependencies

dependencies

Keywords: dependency injection, Depends, get_db, service dependency Solves:
  • FastAPI dependency injection patterns
  • Reusable dependencies
关键词: dependency injection, Depends, get_db, service dependency 解决问题:
  • FastAPI依赖注入模式
  • 可复用依赖

middleware

middleware

Keywords: middleware, request id, timing, cors, logging middleware Solves:
  • Custom FastAPI middleware
  • Request/response interceptors
关键词: middleware, request id, timing, cors, logging middleware 解决问题:
  • 自定义FastAPI中间件
  • 请求/响应拦截器

settings

settings

Keywords: settings, pydantic settings, env, configuration Solves:
  • FastAPI configuration management
  • Environment variables
关键词: settings, pydantic settings, env, configuration 解决问题:
  • FastAPI配置管理
  • 环境变量处理

health-checks

health-checks

Keywords: health check, readiness, liveness, health endpoint Solves:
  • Kubernetes health checks
  • Service health monitoring
关键词: health check, readiness, liveness, health endpoint 解决问题:
  • Kubernetes健康检查
  • 服务健康监控