python-backend
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePython Backend
Python后端
Patterns for building production Python backends with asyncio, FastAPI, SQLAlchemy 2.0, and connection pooling. Each category has individual rule files in loaded on-demand.
rules/用于构建基于asyncio、FastAPI、SQLAlchemy 2.0和连接池的生产级Python后端的模式。每个分类在目录下都有独立的规则文件,可按需加载。
rules/Quick Reference
快速参考
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Asyncio | 3 | HIGH | TaskGroup, structured concurrency, cancellation handling |
| FastAPI | 3 | HIGH | Dependencies, middleware, background tasks |
| SQLAlchemy | 3 | HIGH | Async sessions, relationships, migrations |
| Pooling | 3 | MEDIUM | Database pools, HTTP sessions, tuning |
Total: 12 rules across 4 categories
| 分类 | 规则数量 | 影响级别 | 使用场景 |
|---|---|---|---|
| Asyncio | 3 | 高 | TaskGroup、结构化并发、取消处理 |
| FastAPI | 3 | 高 | 依赖项、中间件、后台任务 |
| SQLAlchemy | 3 | 高 | 异步会话、关系映射、迁移 |
| Pooling | 3 | 中 | 数据库池、HTTP会话、调优 |
总计:4个分类共12条规则
Quick Start
快速开始
python
undefinedpython
undefinedFastAPI + SQLAlchemy async session
FastAPI + SQLAlchemy 异步会话
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
```pythonasync def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
```pythonAsyncio TaskGroup with timeout
带超时的Asyncio TaskGroup
async def fetch_all(urls: list[str]) -> list[dict]:
async with asyncio.timeout(30):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
return [t.result() for t in tasks]
undefinedasync def fetch_all(urls: list[str]) -> list[dict]:
async with asyncio.timeout(30):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(url)) for url in urls]
return [t.result() for t in tasks]
undefinedAsyncio
Asyncio
Modern Python asyncio patterns using structured concurrency, TaskGroup, and Python 3.11+ features.
基于Python 3.11+特性的现代asyncio模式,使用结构化并发、TaskGroup等。
Key Patterns
核心模式
- TaskGroup replaces with structured concurrency and auto-cancellation
gather() - context manager for composable timeouts
asyncio.timeout() - Semaphore for concurrency limiting (rate-limit HTTP requests)
- with ExceptionGroup for handling multiple task failures
except* - for bridging sync code to async
asyncio.to_thread()
- TaskGroup 替代,支持结构化并发和自动取消
gather() - 上下文管理器,用于可组合的超时控制
asyncio.timeout() - Semaphore 用于并发限制(如HTTP请求限流)
- 结合ExceptionGroup处理多任务失败
except* - 用于同步代码到异步代码的桥接
asyncio.to_thread()
Key Decisions
核心决策
| Decision | Recommendation |
|---|---|
| Task spawning | TaskGroup not gather() |
| Timeouts | asyncio.timeout() context manager |
| Concurrency limit | asyncio.Semaphore |
| Sync bridge | asyncio.to_thread() |
| Cancellation | Always re-raise CancelledError |
| 决策项 | 推荐方案 |
|---|---|
| 任务生成 | 使用TaskGroup而非gather() |
| 超时控制 | 使用asyncio.timeout()上下文管理器 |
| 并发限制 | 使用asyncio.Semaphore |
| 同步桥接 | 使用asyncio.to_thread() |
| 取消处理 | 始终重新抛出CancelledError |
FastAPI
FastAPI
Production-ready FastAPI patterns for lifespan, dependencies, middleware, and settings.
用于生产环境的FastAPI模式,涵盖生命周期、依赖项、中间件和配置。
Key Patterns
核心模式
- Lifespan with for startup/shutdown resource management
asynccontextmanager - Dependency injection with class-based services and
Depends() - Middleware stack: CORS -> RequestID -> Timing -> Logging
- Pydantic Settings with and field validation
.env - Exception handlers with RFC 7807 Problem Details
- Lifespan 结合用于启动/关闭资源管理
asynccontextmanager - 依赖注入 基于类的服务和
Depends() - 中间件栈:CORS -> RequestID -> 计时 -> 日志
- Pydantic Settings 结合文件和字段验证
.env - 异常处理器 遵循RFC 7807问题详情规范
Key Decisions
核心决策
| Decision | Recommendation |
|---|---|
| Lifespan | asynccontextmanager (not events) |
| Dependencies | Class-based services with DI |
| Settings | Pydantic Settings with .env |
| Response | ORJSONResponse for performance |
| Health | Check all critical dependencies |
| 决策项 | 推荐方案 |
|---|---|
| 生命周期 | 使用asynccontextmanager(而非事件) |
| 依赖项 | 基于类的服务结合DI |
| 配置 | 使用Pydantic Settings和.env文件 |
| 响应 | 使用ORJSONResponse提升性能 |
| 健康检查 | 检查所有关键依赖项 |
SQLAlchemy
SQLAlchemy
Async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.
基于SQLAlchemy 2.0、AsyncSession并与FastAPI集成的异步数据库模式。
Key Patterns
核心模式
- One AsyncSession per request with
expire_on_commit=False - on relationships to prevent accidental N+1 queries
lazy="raise" - for eager loading collections
selectinload - Repository pattern with generic async CRUD
- Bulk inserts chunked 1000-10000 rows for memory management
- 每个请求一个AsyncSession,设置
expire_on_commit=False - 关系映射使用****,防止意外的N+1查询
lazy="raise" - 用于预加载集合
selectinload - 仓库模式 结合通用异步CRUD
- 批量插入 按1000-10000行分块,优化内存管理
Key Decisions
核心决策
| Decision | Recommendation |
|---|---|
| Session scope | One AsyncSession per request |
| Lazy loading | lazy="raise" + explicit loads |
| Eager loading | selectinload for collections |
| expire_on_commit | False (prevents lazy load errors) |
| Pool | pool_pre_ping=True |
| 决策项 | 推荐方案 |
|---|---|
| 会话作用域 | 每个请求一个AsyncSession |
| 延迟加载 | lazy="raise" + 显式加载 |
| 预加载 | 集合使用selectinload |
| expire_on_commit | 设置为False(避免延迟加载错误) |
| 连接池 | 设置pool_pre_ping=True |
Pooling
连接池
Database and HTTP connection pooling for high-performance async Python applications.
用于高性能异步Python应用的数据库和HTTP连接池。
Key Patterns
核心模式
- SQLAlchemy pool with ,
pool_size,max_overflowpool_pre_ping - Direct asyncpg pool with /
min_sizeand connection lifecyclemax_size - aiohttp session with limits and DNS caching
TCPConnector - FastAPI lifespan creating and closing pools at startup/shutdown
- Pool monitoring with Prometheus metrics
- SQLAlchemy连接池 配置、
pool_size、max_overflowpool_pre_ping - 直接使用asyncpg连接池 配置/
min_size和连接生命周期max_size - aiohttp会话 结合限制和DNS缓存
TCPConnector - FastAPI Lifespan 在启动时创建连接池,关闭时销毁
- 连接池监控 使用Prometheus指标
Pool Sizing Formula
连接池大小计算公式
pool_size = (concurrent_requests / avg_queries_per_request) * 1.5pool_size = (并发请求数 / 平均每个请求的查询数) * 1.5Anti-Patterns (FORBIDDEN)
反模式(禁止使用)
python
undefinedpython
undefinedNEVER use gather() for new code - no structured concurrency
新代码中绝对不要使用gather() - 无结构化并发
NEVER swallow CancelledError - breaks TaskGroup and timeout
绝对不要吞掉CancelledError - 会破坏TaskGroup和超时机制
NEVER block the event loop with sync calls (time.sleep, requests.get)
绝对不要用同步调用阻塞事件循环(如time.sleep、requests.get)
NEVER use global mutable state for db sessions
绝对不要用全局可变状态存储数据库会话
NEVER skip dependency injection (create sessions in routes)
绝对不要跳过依赖注入(在路由中直接创建会话)
NEVER share AsyncSession across tasks (race condition)
绝对不要在多任务间共享AsyncSession(会出现竞态条件)
NEVER use sync Session in async code (blocks event loop)
绝对不要在异步代码中使用同步Session(阻塞事件循环)
NEVER create engine/pool per request
绝对不要每个请求创建引擎/连接池
NEVER forget to close pools on shutdown
绝对不要在关闭时忘记销毁连接池
undefinedundefinedRelated Skills
相关技能
- - Clean architecture and layer separation
architecture-patterns - - Celery/ARQ for background processing
async-jobs - - SSE/WebSocket async patterns
streaming-api-patterns - - Database schema design
database-patterns
- - 整洁架构和分层设计
architecture-patterns - - 用于后台处理的Celery/ARQ
async-jobs - - SSE/WebSocket异步模式
streaming-api-patterns - - 数据库 schema 设计
database-patterns