python-services
Python Services & CLI
Modern Tooling
| Tool | Replaces | Purpose |
|---|---|---|
| uv | pip, virtualenv, pyenv, pipx | Package/dependency management |
| ruff | flake8, black, isort | Linting + formatting |
| ty | mypy, pyright | Type checking (Astral, faster) |
- `uv init --package myproject` for distributable packages, `uv init` for apps
- `uv add <pkg>`, `uv add --group dev <pkg>`, never edit pyproject.toml deps manually
- `uv run <cmd>` instead of activating venvs -- runs inside the project venv without explicit activation
- `uv add --upgrade <pkg>` to upgrade a single package without touching others
- `uv tree --outdated` to preview what would be upgraded before committing
- `uv.lock` goes in version control
- Use `[dependency-groups]` (PEP 735) for dev/test/docs, not `[project.optional-dependencies]`
- PEP 723 inline metadata for standalone scripts with deps
- `ruff check --fix . && ruff format .` for lint+format in one pass
Standard project layout:
```
src/mypackage/
    __init__.py
    main.py
    services/
    models/
tests/
    conftest.py
    test_main.py
pyproject.toml
```
See cli-tools.md for Click patterns, argparse, and CLI project layout.
Parallelism
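| Workload | Approach |
|---|---|
| Many concurrent I/O calls | `asyncio` (`gather`, `TaskGroup`) |
| CPU-bound computation | `multiprocessing` / `ProcessPoolExecutor` |
| Mixed I/O + CPU | `asyncio` for I/O, `asyncio.to_thread()` or a process pool for blocking work |
| Simple scripts, few connections | Stay synchronous |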
Sync vs Async Decision
Use async (asyncio) when:
- I/O-bound work has multiple concurrent operations (HTTP calls, database queries, file I/O happening in parallel)
- WebSocket servers or long-lived connections require it
- The framework requires it (FastAPI async endpoints, aiohttp)
Stay synchronous when:
- Work is CPU-bound (computation, data transformation) -- async adds nothing, use multiprocessing instead
- Building simple scripts and CLI tools with sequential I/O
- All I/O is sequential anyway (one DB query, process result, one API call)
- The team lacks async debugging experience (asyncio stack traces are harder to read)
Rule of thumb: if the code is not waiting on multiple I/O operations concurrently, sync is simpler and correct. Do not add async complexity for a single sequential pipeline.
Key rule: Stay fully sync or fully async within a call path.
asyncio patterns:
- `asyncio.gather(*tasks)` for concurrent I/O -- use `return_exceptions=True` for partial failure tolerance
- `asyncio.TaskGroup` (3.11+) for structured concurrency -- automatic cancellation of sibling tasks on failure; prefer over `gather` when all tasks must succeed
- `asyncio.Semaphore(n)` to limit concurrency (rate limiting external APIs)
- `asyncio.wait_for(coro, timeout=N)` for timeouts
- `asyncio.Queue` for producer-consumer
- `asyncio.Lock` when coroutines share mutable state
- Never block the event loop: `asyncio.to_thread(sync_fn)` for sync libs, `aiohttp`/`httpx.AsyncClient` for HTTP
- Handle `CancelledError` -- always re-raise after cleanup
- Async generators (`async for`) for streaming/pagination
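Several of the patterns above compose naturally. The following is a minimal sketch, not a definitive implementation: `fetch`, `bounded_fetch`, and `fetch_all` are hypothetical names, and `fetch` uses `asyncio.sleep` as a stand-in for a real network call. It combines a semaphore (concurrency limit), `wait_for` (per-call timeout), and `gather(..., return_exceptions=True)` (partial failure tolerance).

```python
import asyncio


async def fetch(item: int) -> int:
    """Stand-in for an I/O call; item 3 simulates a failing request."""
    await asyncio.sleep(0.01)
    if item == 3:
        raise ConnectionError(f"item {item} failed")
    return item * 10


async def bounded_fetch(sem: asyncio.Semaphore, item: int) -> int:
    # The semaphore caps in-flight calls; wait_for bounds each call's duration.
    async with sem:
        return await asyncio.wait_for(fetch(item), timeout=1.0)


async def fetch_all(items: list[int], limit: int = 2) -> tuple[list[int], list[Exception]]:
    sem = asyncio.Semaphore(limit)
    # return_exceptions=True collects failures instead of raising the first one.
    results = await asyncio.gather(
        *(bounded_fetch(sem, i) for i in items), return_exceptions=True
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, Exception)]
    return ok, failed


ok, failed = asyncio.run(fetch_all([1, 2, 3, 4]))
print(ok, failed)
```

`gather` preserves input order, so the successful results line up with the items that produced them. When every task must succeed, `asyncio.TaskGroup` is the better fit, as noted above.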
multiprocessing for CPU-bound:
```python
from concurrent.futures import ProcessPoolExecutor

# cpu_task must be a picklable, module-level function; items is any iterable of inputs.
with ProcessPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(cpu_task, items))
```
See fastapi.md for project structure, lifespan, config, DI, async DB, and repository pattern.
Background Jobs
- Return job ID immediately, process async. Client polls `/jobs/{id}` for status
- Celery: `@app.task(bind=True, max_retries=3, autoretry_for=(ConnectionError,))` -- exponential backoff: `raise self.retry(countdown=2**self.request.retries * 60)`
- Alternatives: Dramatiq (modern Celery), RQ (simple Redis), cloud-native (SQS+Lambda, Cloud Tasks)
- Idempotency is mandatory -- tasks may retry. Use idempotency keys for external calls, check-before-write, upsert patterns
- Dead letter queue for permanently failed tasks after max retries
- Task workflows: `chain(a.s(), b.s())` for sequential, `group(...)` for parallel, `chord(group, callback)` for fan-out/fan-in
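To make the idempotency and backoff points concrete, here is a small framework-free sketch. `PaymentStore`, `charge`, and `backoff_seconds` are hypothetical names; the in-memory dict stands in for a database table with a unique constraint on the idempotency key, and `backoff_seconds` only reproduces the countdown arithmetic from the Celery bullet.

```python
from dataclasses import dataclass, field


@dataclass
class PaymentStore:
    """In-memory stand-in for a table keyed by idempotency key."""

    charges: dict[str, int] = field(default_factory=dict)
    external_calls: int = 0

    def charge(self, idempotency_key: str, amount_cents: int) -> int:
        # Check-before-write: a retried task with the same key is a no-op.
        if idempotency_key in self.charges:
            return self.charges[idempotency_key]
        self.external_calls += 1  # the side effect happens at most once per key
        self.charges[idempotency_key] = amount_cents
        return amount_cents


def backoff_seconds(retries: int) -> int:
    """Countdown used in the Celery bullet: 2**retries * 60."""
    return 2**retries * 60


store = PaymentStore()
store.charge("order-42", 1999)
store.charge("order-42", 1999)  # simulated retry of the same task
```

With three retries the countdown grows as 60, 120, then 240 seconds. In a real system the check and the write would need to be atomic (for example, an upsert), which this sketch does not attempt.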
Resilience
Retries with tenacity:
```python
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    stop_after_delay,
    wait_exponential_jitter,
)

@retry(
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    stop=stop_after_attempt(5) | stop_after_delay(60),
    wait=wait_exponential_jitter(initial=1, max=30),
    before_sleep=log_retry_attempt,  # user-defined callback that receives the retry state
)
def call_api(url: str) -> dict: ...
```
- Retry only transient errors: network, 429/502/503/504. Never retry 4xx (except 429), auth errors, validation errors
- Every network call needs a timeout
- `@fail_safe(default=[])` decorator for non-critical paths -- return cached/default on failure
- `functools.lru_cache(maxsize=N)` for pure-function memoization; `functools.cache` (unbounded) for small domains
- Stack decorators: `@traced @with_timeout(30) @retry(...)` -- separate infra from business logic

Connection pooling is mandatory for production: reuse `httpx.AsyncClient()` across requests, configure SQLAlchemy `pool_size`/`max_overflow`, use `aiohttp.TCPConnector(limit=N)`.
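The `@fail_safe` decorator mentioned above is not a standard-library or tenacity feature, so it has to be written locally. One possible shape is sketched below, assuming the signature `fail_safe(default=...)`; `load_recommendations` and `slugify` are hypothetical functions used only to exercise it alongside `functools.lru_cache`.

```python
import functools
import logging
from typing import Any, Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def fail_safe(default: T) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Return `default` instead of raising; intended for non-critical paths only."""

    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                return fn(*args, **kwargs)
            except Exception:
                # Log with traceback so the swallowed failure stays visible.
                logger.warning("%s failed; returning default", fn.__name__, exc_info=True)
                return default

        return wrapper

    return decorator


@fail_safe(default=[])
def load_recommendations(user_id: int) -> list[str]:
    # Simulated outage of a non-critical dependency.
    raise ConnectionError("recommendation service unreachable")


@functools.lru_cache(maxsize=128)
def slugify(title: str) -> str:
    """Pure function, so memoizing it is safe."""
    return title.strip().lower().replace(" ", "-")
```

Note that the same `default` object is returned on every failure, so callers should treat a mutable default such as `[]` as read-only.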
Production Resilience
- Fail-fast config validation: use a Pydantic `BaseSettings` model with `model_validator` to parse and validate all environment variables at startup. If invalid, crash before serving traffic. Never discover a missing secret on the first request that needs it.
- Health endpoints: expose `/health` (shallow liveness -- returns 200 if the process responds) and `/ready` (deep readiness -- verifies database, Redis, and critical dependencies are reachable). Load balancers route traffic based on `/ready`; orchestrators restart based on `/health`.
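The fail-fast idea can be illustrated without any third-party dependency. The sketch below swaps in a plain frozen dataclass as a stand-in for a Pydantic `BaseSettings` model; the variable names (`DATABASE_URL`, `API_KEY`, `REQUEST_TIMEOUT_S`) and the `ConfigError` class are assumptions made for the example.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


class ConfigError(RuntimeError):
    """Raised at startup when required settings are missing or invalid."""


@dataclass(frozen=True)
class Settings:
    database_url: str
    api_key: str
    request_timeout_s: float

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "Settings":
        env = os.environ if env is None else env
        errors = []
        for name in ("DATABASE_URL", "API_KEY"):
            if not env.get(name):
                errors.append(f"{name} is required")
        raw_timeout = env.get("REQUEST_TIMEOUT_S", "10")
        timeout = 0.0
        try:
            timeout = float(raw_timeout)
            if timeout <= 0:
                errors.append("REQUEST_TIMEOUT_S must be positive")
        except ValueError:
            errors.append(f"REQUEST_TIMEOUT_S is not a number: {raw_timeout!r}")
        if errors:
            # Report every problem at once, then refuse to start.
            raise ConfigError("; ".join(errors))
        return cls(env["DATABASE_URL"], env["API_KEY"], timeout)
```

Calling `Settings.from_env()` once in the application entry point, before the server starts accepting connections, gives the crash-before-traffic behaviour described above.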
Observability
- structlog for JSON structured logging. Configure once at startup with `JSONRenderer`, `TimeStamper`, `merge_contextvars`
- Correlation IDs -- generate at ingress (`X-Correlation-ID` header), bind to `contextvars`, propagate to downstream calls
- Log levels: DEBUG=diagnostics, INFO=operations, WARNING=anomalies handled, ERROR=failures needing attention. Never log expected behavior at ERROR
- Prometheus metrics -- track latency (Histogram), traffic (Counter), errors (Counter), saturation (Gauge). Keep label cardinality bounded (no user IDs)
- OpenTelemetry for distributed tracing across services
- Never mutate `LogRecord` attributes from a `Formatter`. A custom `logging.Formatter.format()` that rewrites `record.name` (or any record attribute) in place leaks to every other handler attached to the same logger and to pytest `caplog`. `Logger.callHandlers` passes the same `LogRecord` object to each handler -- whichever formats first wins the mutation, and downstream handlers and test filters see the modified state. Tests filtering by full logger name (`if r.name == "src.services.foo"`) then silently miss; routing handlers doing `LOGGER_TO_MODEL.get(record.name)` fall through to defaults. Use a `logging.Filter` that adds a non-mutating attribute (`record.short_name`) and reference it in the format string as `%(short_name)s`, or override `formatMessage` instead of `format`. `try`/`finally` restore works for synchronous handler chains but is fragile under async handlers that interleave.
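The `logging.Filter` approach from the last bullet can be sketched with the standard library alone. `ShortNameFilter` and `ListHandler` are hypothetical helper classes; the second handler plays the role of `caplog` or a routing handler that still needs the full logger name.

```python
import logging


class ShortNameFilter(logging.Filter):
    """Attach `short_name` to each record without touching `record.name`."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.short_name = record.name.rsplit(".", 1)[-1]
        return True  # never drop the record; only annotate it


class ListHandler(logging.Handler):
    """Test-style handler that keeps the records it receives."""

    def __init__(self) -> None:
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger("src.services.foo")
logger.setLevel(logging.INFO)
logger.propagate = False

pretty = ListHandler()
pretty.addFilter(ShortNameFilter())
pretty.setFormatter(logging.Formatter("%(short_name)s: %(message)s"))
capture = ListHandler()  # stands in for caplog or a routing handler

logger.addHandler(pretty)
logger.addHandler(capture)
logger.info("cache warmed")

formatted = pretty.format(pretty.records[0])
```

The first handler renders the short name, while the second still sees `record.name == "src.services.foo"`, so name-based test filters and routing keep working.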
Discipline
- Simplicity first -- keep every change as simple as possible and touch as little code as possible
- Only touch what's necessary -- avoid introducing unrelated changes
- No hacky workarounds -- if a fix feels wrong, step back and implement the clean solution
- Before adding a new abstraction, verify it appears in 3+ places. If not, inline it.
- Verify: see Verify section below -- pass all checks with zero warnings before declaring done
- Coverage target: 80%+ (`uv run pytest --cov --cov-report=html`)
Testing Patterns
- pytest flags: `--lf` (last failed), `-x` (stop on first failure), `-k "pattern"` (filter), `--pdb` (debugger on failure)
- Fixtures: use `conftest.py` for shared fixtures. Scope wisely: `@pytest.fixture(scope="session")` for expensive setup (DB connections), `scope="function"` (default) for test isolation
- `tmp_path`: built-in fixture for temp files -- no manual cleanup needed
- Parametrize with IDs: `@pytest.mark.parametrize("input,expected", [...], ids=["empty", "single", "overflow"])` for readable test names
- Mock discipline: always `autospec=True` on mocks to catch API drift. `assert_awaited_once()` for async mocks.
- Test markers: register in `pyproject.toml` under `[tool.pytest.ini_options]` with `markers = ["slow", "integration"]`. Run fast tests with `-m "not slow"`.
- Protocol duck typing: use `class Renderable(Protocol)` for structural typing at service boundaries -- enables testing with plain objects instead of mocks
- Context managers: `@contextmanager` for connection/transaction lifecycle. Always implement `__exit__` cleanup.
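Two of the points above, Protocol duck typing and autospec'd mocks, can be shown with the standard library only. In this sketch `render_page`, `FakeHeader`, `FakeBody`, and `send_email` are hypothetical names; `create_autospec` from `unittest.mock` is used as the function-level equivalent of passing `autospec=True` to `patch`.

```python
from typing import Protocol
from unittest.mock import create_autospec


class Renderable(Protocol):
    def render(self) -> str: ...


def render_page(parts: list[Renderable]) -> str:
    """Service-boundary function that depends only on the protocol."""
    return "\n".join(part.render() for part in parts)


class FakeHeader:
    """Plain test double: no inheritance and no mock library needed."""

    def render(self) -> str:
        return "<h1>Title</h1>"


class FakeBody:
    def render(self) -> str:
        return "<p>Body</p>"


page = render_page([FakeHeader(), FakeBody()])


def send_email(to: str, subject: str) -> None:
    """Real function whose signature the mock must follow."""


mock_send = create_autospec(send_email)
mock_send("a@example.com", "Welcome")  # matches the real signature

try:
    mock_send("a@example.com")  # missing `subject`
    drift_detected = False
except TypeError:
    drift_detected = True
```

A bare `Mock()` would have accepted the second call silently; the autospec'd version fails the same way the real function would.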
Error Handling
- Validate inputs at boundaries before expensive ops. Report all errors at once when possible
- Use specific exceptions: `ValueError`, `TypeError`, `KeyError`, not bare `Exception`
- `raise ServiceError("upload failed") from e` -- always chain to preserve debug trail
- Convert external data to domain types (enums, Pydantic models) at system boundaries
- Batch processing: `BatchResult(succeeded={}, failed={})` -- don't let one item abort the batch
- Pydantic `BaseModel` with `field_validator` for complex input validation
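A minimal sketch of the batch and exception-chaining bullets follows. The source names `BatchResult` and `ServiceError` but does not define them, so the field types and the `parse_quantity` / `process_batch` helpers here are assumptions chosen for illustration.

```python
from dataclasses import dataclass, field


class ServiceError(Exception):
    """Domain-level error raised at the service boundary."""


@dataclass
class BatchResult:
    succeeded: dict[str, int] = field(default_factory=dict)
    failed: dict[str, Exception] = field(default_factory=dict)


def parse_quantity(raw: str) -> int:
    try:
        return int(raw)
    except ValueError as e:
        # Chain with `from e` so the original traceback is preserved.
        raise ServiceError(f"invalid quantity: {raw!r}") from e


def process_batch(items: dict[str, str]) -> BatchResult:
    result = BatchResult()
    for key, raw in items.items():
        try:
            result.succeeded[key] = parse_quantity(raw)
        except ServiceError as e:
            result.failed[key] = e  # record the failure and keep going
    return result


result = process_batch({"a": "3", "b": "oops", "c": "7"})
```

The bad item ends up in `result.failed` with its original `ValueError` available as `__cause__`, and the other two items are still processed.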
Migrations
- Separate schema and data migrations -- data backfills in their own migration file
- Renames/removals use expand-contract: add new column → backfill → switch reads → drop old (see `ia-postgresql` skill for the full pattern)
- Never edit a migration that has already run in a shared environment
- Alembic: use `--autogenerate` as a starting point, always review generated SQL before committing
- Test migrations against production-sized data -- a migration that takes 2ms on dev can lock a table for minutes in production
API Design
- Contract-first: define Pydantic `BaseModel` request/response schemas and FastAPI `response_model` before writing endpoint logic. The schema is the contract -- implementation follows. Generate OpenAPI docs from these models automatically.
- Hyrum's Law awareness: every observable response field, ordering, or timing becomes a dependency for callers. Use explicit `response_model` and `model_config = ConfigDict(extra="forbid")` to control exactly what's serialized -- never return raw dicts or ORM objects from endpoints.
- Addition over modification: add new optional fields (`field: str | None = None`) rather than changing or removing existing ones. Removing a Pydantic field from a response model breaks callers silently. Deprecate first (`Field(deprecated=True)`), remove in a later version.
- Consistent error structure: all exceptions should produce the same envelope: `{"error": {"code": "...", "message": "...", "details": ...}}`. Register `@app.exception_handler` for `RequestValidationError`, `HTTPException`, and application-specific exceptions to normalize into one format. Callers build error handling once.
- Boundary validation via Pydantic: validate at the endpoint/handler level with Pydantic models and FastAPI's automatic request parsing. Internal services and repositories trust that input was validated at entry -- no redundant validation scattered through business logic.
- Third-party responses are untrusted data: validate shape and content of external API responses before using them in logic, rendering, or decision-making. A compromised or misbehaving service can return unexpected types, malicious content, or missing fields. Parse through a Pydantic model before use.
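The consistent error envelope can be expressed as one framework-agnostic function that every exception handler delegates to. This is a sketch under assumptions: `AppError`, `to_error_envelope`, and the specific error codes are hypothetical, and in a FastAPI application the function would be called from handlers registered with `@app.exception_handler`.

```python
from typing import Any, Optional


class AppError(Exception):
    """Hypothetical application exception carrying a stable error code."""

    def __init__(self, code: str, message: str, details: Optional[Any] = None) -> None:
        super().__init__(message)
        self.code = code
        self.message = message
        self.details = details


def to_error_envelope(exc: Exception) -> dict[str, Any]:
    """Normalize any exception into the single envelope shape described above."""
    if isinstance(exc, AppError):
        code, message, details = exc.code, exc.message, exc.details
    elif isinstance(exc, (ValueError, TypeError)):
        code, message, details = "validation_error", str(exc), None
    else:
        # Do not leak internals for unexpected failures.
        code, message, details = "internal_error", "Unexpected error", None
    return {"error": {"code": code, "message": message, "details": details}}
```

Because every path returns the same three keys, callers can write their error handling once, and unexpected exceptions never expose internal messages.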
Verify
- `uv run pytest` passes with zero failures
- `uv run ruff check .` passes with zero warnings
- `uv run ty check .` passes with zero errors
- Coverage target: 80%+ (`uv run pytest --cov`)