python-services

Python Services & CLI

Modern Tooling

| Tool | Replaces | Purpose |
|------|----------|---------|
| `uv` | pip, virtualenv, pyenv, pipx | Package/dependency management |
| `ruff` | flake8, black, isort | Linting + formatting |
| `ty` | mypy, pyright | Type checking (from Astral, faster) |
  • `uv init --package myproject` for distributable packages, `uv init` for apps
  • `uv add <pkg>`, `uv add --group dev <pkg>` -- never edit pyproject.toml deps manually
  • `uv run <cmd>` instead of activating venvs -- runs the command inside the project environment, so no explicit activation is needed
  • `uv lock --upgrade-package <pkg>` to upgrade a single package without touching others
  • `uv tree --outdated` to preview what would be upgraded before committing
  • `uv.lock` goes in version control
  • Use `[dependency-groups]` (PEP 735) for dev/test/docs, not `[project.optional-dependencies]`
  • PEP 723 inline metadata for standalone scripts with deps
  • `ruff check --fix . && ruff format .` for lint+format in one pass
Standard project layout:
src/mypackage/
    __init__.py
    main.py
    services/
    models/
tests/
    conftest.py
    test_main.py
pyproject.toml
See cli-tools.md for Click patterns, argparse, and CLI project layout.

Parallelism

| Workload | Approach |
|----------|----------|
| Many concurrent I/O calls | `asyncio` (`gather`, `create_task`) |
| CPU-bound computation | `multiprocessing.Pool` or `concurrent.futures.ProcessPoolExecutor` |
| Mixed I/O + CPU | `asyncio.to_thread()` to offload blocking work |
| Simple scripts, few connections | Stay synchronous |
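As an illustration of the "Mixed I/O + CPU" row, here is a minimal sketch of offloading a blocking function with `asyncio.to_thread()`. The functions `blocking_checksum`, `fetch_payload`, and `handle` are hypothetical stand-ins invented for this example; the sleeps simulate real work.

```python
import asyncio
import time


def blocking_checksum(data: bytes) -> int:
    """Stand-in for a blocking call (sync client library, file parsing, ...)."""
    time.sleep(0.05)  # simulates work that would otherwise stall the event loop
    return sum(data) % 256


async def fetch_payload(i: int) -> bytes:
    """Stand-in for an async I/O call."""
    await asyncio.sleep(0.01)
    return bytes([i, i + 1, i + 2])


async def handle(i: int) -> int:
    payload = await fetch_payload(i)
    # Run the blocking function in a worker thread so other coroutines keep running.
    return await asyncio.to_thread(blocking_checksum, payload)


async def main() -> list[int]:
    # gather preserves input order in its result list.
    return await asyncio.gather(*(handle(i) for i in range(5)))


results = asyncio.run(main())
```

Threads help when the offloaded call blocks on I/O or releases the GIL; for pure-Python computation the process-based options in the table are usually the better fit.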

Sync vs Async Decision

Use async (asyncio) when:
  • I/O-bound work has multiple concurrent operations (HTTP calls, database queries, file I/O happening in parallel)
  • WebSocket servers or long-lived connections require it
  • The framework requires it (FastAPI async endpoints, aiohttp)
Stay synchronous when:
  • Work is CPU-bound (computation, data transformation) -- async adds nothing, use multiprocessing instead
  • Building simple scripts and CLI tools with sequential I/O
  • All I/O is sequential anyway (one DB query, process result, one API call)
  • The team lacks async debugging experience (asyncio stack traces are harder to read)
Rule of thumb: if the code is not waiting on multiple I/O operations concurrently, sync is simpler and correct. Do not add async complexity for a single sequential pipeline.
Key rule: Stay fully sync or fully async within a call path.
asyncio patterns:
  • `asyncio.gather(*tasks)` for concurrent I/O -- use `return_exceptions=True` for partial failure tolerance
  • `asyncio.TaskGroup` (3.11+) for structured concurrency -- automatic cancellation of sibling tasks on failure; prefer over `gather` when all tasks must succeed
  • `asyncio.Semaphore(n)` to limit concurrency (rate limiting external APIs)
  • `asyncio.wait_for(coro, timeout=N)` for timeouts
  • `asyncio.Queue` for producer-consumer
  • `asyncio.Lock` when coroutines share mutable state
  • Never block the event loop: `asyncio.to_thread(sync_fn)` for sync libs, `aiohttp` / `httpx.AsyncClient` for HTTP
  • Handle `CancelledError` -- always re-raise after cleanup
  • Async generators (`async for`) for streaming/pagination
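Several of the patterns above combine naturally. The following sketch pairs `asyncio.Semaphore`, `asyncio.wait_for`, and `gather(..., return_exceptions=True)`; `fetch`, `fetch_limited`, and `fetch_all` are hypothetical names, and the failure for item 3 is simulated to show partial-failure handling.

```python
import asyncio


async def fetch(item: int) -> int:
    """Hypothetical I/O call; item 3 fails to demonstrate partial failure."""
    await asyncio.sleep(0.01)
    if item == 3:
        raise ConnectionError(f"item {item} failed")
    return item * 10


async def fetch_limited(sem: asyncio.Semaphore, item: int) -> int:
    async with sem:  # at most `limit` fetches are in flight at once
        return await asyncio.wait_for(fetch(item), timeout=1.0)


async def fetch_all(items: list[int], limit: int = 2) -> tuple[list[int], list[Exception]]:
    sem = asyncio.Semaphore(limit)
    outcomes = await asyncio.gather(
        *(fetch_limited(sem, i) for i in items),
        return_exceptions=True,  # exceptions come back as values, in input order
    )
    ok = [o for o in outcomes if not isinstance(o, BaseException)]
    failed = [o for o in outcomes if isinstance(o, Exception)]
    return ok, failed


ok, failed = asyncio.run(fetch_all([1, 2, 3, 4]))
```

With `return_exceptions=True` one failing call does not discard the other results, which is why the caller has to separate values from exceptions explicitly.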
multiprocessing for CPU-bound:
```python
from concurrent.futures import ProcessPoolExecutor

# cpu_task must be a picklable module-level function; items is any iterable of inputs.
with ProcessPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(cpu_task, items))
```
See fastapi.md for project structure, lifespan, config, DI, async DB, and repository pattern.

Background Jobs

  • Return a job ID immediately and process asynchronously. The client polls `/jobs/{id}` for status
  • Celery: `@app.task(bind=True, max_retries=3, autoretry_for=(ConnectionError,))` -- exponential backoff: `raise self.retry(countdown=2**self.request.retries * 60)`
  • Alternatives: Dramatiq (a simpler, more modern alternative to Celery), RQ (simple, Redis-backed), cloud-native (SQS+Lambda, Cloud Tasks)
  • Idempotency is mandatory -- tasks may retry. Use idempotency keys for external calls, check-before-write, upsert patterns
  • Dead letter queue for tasks that still fail after max retries
  • Task workflows: `chain(a.s(), b.s())` for sequential, `group(...)` for parallel, `chord(group, callback)` for fan-out/fan-in
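The retry and idempotency bullets can be sketched without a task queue. Below, `backoff_countdown` reproduces the `2**retries * 60` schedule from the Celery bullet, and `PaymentStore` is a hypothetical in-memory stand-in for a table with a unique idempotency-key column; none of these names come from Celery itself.

```python
from dataclasses import dataclass, field


def backoff_countdown(retries: int, base_seconds: int = 60) -> int:
    """Delay before the next attempt: 2**retries * base_seconds."""
    return 2 ** retries * base_seconds


@dataclass
class PaymentStore:
    """In-memory stand-in for a table with a unique idempotency_key column."""

    charges: dict[str, int] = field(default_factory=dict)

    def charge_once(self, idempotency_key: str, amount_cents: int) -> bool:
        """Apply the charge unless this key was already processed.

        Returns True if the charge was applied, False if it was a duplicate.
        """
        if idempotency_key in self.charges:  # check-before-write
            return False
        self.charges[idempotency_key] = amount_cents
        return True


store = PaymentStore()
first = store.charge_once("order-42", 1999)
redelivered = store.charge_once("order-42", 1999)  # simulated retry of the same task
```

In a real database the check and the write would need to be atomic, for example through a unique constraint or an upsert, so that two workers retrying at the same moment cannot both pass the check.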

Resilience

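Retries with tenacity:
```python
import logging

from tenacity import (
    before_sleep_log, retry, retry_if_exception_type,
    stop_after_attempt, stop_after_delay, wait_exponential_jitter,
)

# Log each retry at WARNING level before sleeping.
log_retry_attempt = before_sleep_log(logging.getLogger(__name__), logging.WARNING)

@retry(
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    stop=stop_after_attempt(5) | stop_after_delay(60),
    wait=wait_exponential_jitter(initial=1, max=30),
    before_sleep=log_retry_attempt,
)
def call_api(url: str) -> dict: ...
```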
  • Retry only transient errors: network failures and 429/502/503/504. Never retry other 4xx responses, auth errors, or validation errors
  • Every network call needs a timeout
  • `@fail_safe(default=[])` decorator for non-critical paths -- return cached/default on failure
  • `functools.lru_cache(maxsize=N)` for pure-function memoization; `functools.cache` (unbounded) for small domains
  • Stack decorators: `@traced @with_timeout(30) @retry(...)` -- separate infra from business logic
Connection pooling is mandatory for production: reuse `httpx.AsyncClient()` across requests, configure SQLAlchemy `pool_size` / `max_overflow`, and use `aiohttp.TCPConnector(limit=N)`.

Production Resilience

  • Fail-fast config validation: use a Pydantic `BaseSettings` model (provided by the `pydantic-settings` package in Pydantic v2) with `model_validator` to parse and validate all environment variables at startup. If the configuration is invalid, crash before serving traffic. Never discover a missing secret on the first request that needs it.
  • Health endpoints: expose `/health` (shallow liveness -- returns 200 if the process responds) and `/ready` (deep readiness -- verifies that the database, Redis, and critical dependencies are reachable). Load balancers route traffic based on `/ready`; orchestrators restart based on `/health`.
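The fail-fast idea can be shown without any third-party package. This is a dependency-free sketch using a dataclass; with `pydantic-settings`, a `BaseSettings` subclass would take over the parsing that `load_settings` does by hand. The variable names (`DATABASE_URL`, `API_KEY`, `REQUEST_TIMEOUT_S`) and `ConfigError` are assumptions made for the example.

```python
import os
from dataclasses import dataclass
from typing import Mapping


class ConfigError(RuntimeError):
    """Raised at startup when the environment is incomplete or invalid."""


@dataclass(frozen=True)
class Settings:
    database_url: str
    api_key: str
    request_timeout_s: float


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Parse and validate every variable up front, reporting all problems at once."""
    errors: list[str] = []
    for name in ("DATABASE_URL", "API_KEY"):
        if not env.get(name):
            errors.append(f"{name} is required")
    raw_timeout = env.get("REQUEST_TIMEOUT_S", "10")
    try:
        timeout = float(raw_timeout)
        if timeout <= 0:
            errors.append("REQUEST_TIMEOUT_S must be positive")
    except ValueError:
        errors.append(f"REQUEST_TIMEOUT_S is not a number: {raw_timeout!r}")
        timeout = 0.0
    if errors:
        # Crash before serving traffic rather than on the first request.
        raise ConfigError("invalid configuration: " + "; ".join(errors))
    return Settings(env["DATABASE_URL"], env["API_KEY"], timeout)
```

Calling `load_settings()` once during application startup, before the server begins accepting connections, gives the crash-early behavior described above.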

Observability

  • structlog for JSON structured logging. Configure once at startup with `JSONRenderer`, `TimeStamper`, `merge_contextvars`
  • Correlation IDs -- generate at ingress (`X-Correlation-ID` header), bind to `contextvars`, propagate to downstream calls
  • Log levels: DEBUG=diagnostics, INFO=operations, WARNING=anomalies handled, ERROR=failures needing attention. Never log expected behavior at ERROR
  • Prometheus metrics -- track latency (Histogram), traffic (Counter), errors (Counter), saturation (Gauge). Keep label cardinality bounded (no user IDs)
  • OpenTelemetry for distributed tracing across services
  • Never mutate `LogRecord` attributes from a `Formatter`. A custom `logging.Formatter.format()` that rewrites `record.name` (or any record attribute) in place leaks to every other handler attached to the same logger and to pytest `caplog`. `Logger.callHandlers` passes the same `LogRecord` object to each handler: whichever formats first wins the mutation, and downstream handlers and test filters see the modified state. Tests filtering by full logger name (`if r.name == "src.services.foo"`) then silently miss; routing handlers doing `LOGGER_TO_MODEL.get(record.name)` fall through to defaults. Use a `logging.Filter` that adds a new attribute (`record.short_name`) without changing existing ones and reference it in the format string as `%(short_name)s`, or override `formatMessage` instead of `format`. A `try` / `finally` restore works for synchronous handler chains but is fragile under async handlers that interleave.
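The `logging.Filter` approach from the last bullet can be sketched with the standard library alone. `ShortNameFilter` and `ListHandler` are hypothetical helper classes written for this example; the second handler stands in for any consumer that depends on the full logger name.

```python
import logging


class ShortNameFilter(logging.Filter):
    """Add record.short_name without touching record.name."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.short_name = record.name.rsplit(".", 1)[-1]
        return True  # never drop the record; this filter only annotates it


class ListHandler(logging.Handler):
    """Helper that stores formatted lines and the logger names it saw."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []
        self.names: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))
        self.names.append(record.name)


logger = logging.getLogger("src.services.foo")
logger.setLevel(logging.INFO)
logger.propagate = False

short = ListHandler()
short.addFilter(ShortNameFilter())
short.setFormatter(logging.Formatter("%(short_name)s: %(message)s"))

plain = ListHandler()  # a second handler that relies on the full logger name
plain.setFormatter(logging.Formatter("%(name)s: %(message)s"))

logger.addHandler(short)
logger.addHandler(plain)
logger.info("started")
```

Because the filter only adds an attribute, the handler that runs second still sees the original `record.name`, which is the property that name-based routing and test filters rely on.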

Discipline

  • Simplicity first -- keep every change as simple as possible and touch as little code as possible
  • Only touch what's necessary -- avoid introducing unrelated changes
  • No hacky workarounds -- if a fix feels wrong, step back and implement the clean solution
  • Before adding a new abstraction, verify it appears in 3+ places. If not, inline it.
  • Verify: see the Verify section below -- pass all checks with zero warnings before declaring done
  • Coverage target: 80%+ (`uv run pytest --cov --cov-report=html`)

Testing Patterns

  • pytest flags: `--lf` (last failed), `-x` (stop on first failure), `-k "pattern"` (filter), `--pdb` (debugger on failure)
  • Fixtures: use `conftest.py` for shared fixtures. Scope wisely: `@pytest.fixture(scope="session")` for expensive setup (DB connections), `scope="function"` (default) for test isolation
  • `tmp_path`: built-in fixture for temp files -- no manual cleanup needed
  • Parametrize with IDs: `@pytest.mark.parametrize("input,expected", [...], ids=["empty", "single", "overflow"])` for readable test names
  • Mock discipline: always use `autospec=True` on mocks to catch API drift. Use `assert_awaited_once()` for async mocks.
  • Test markers: register in `pyproject.toml` under `[tool.pytest.ini_options]` with `markers = ["slow", "integration"]`. Run fast tests with `-m "not slow"`.
  • Protocol duck typing: use `class Renderable(Protocol)` for structural typing at service boundaries -- enables testing with plain objects instead of mocks
  • Context managers: `@contextmanager` for connection/transaction lifecycle. Always put cleanup in a `try` / `finally` around the `yield` (or in `__exit__` for class-based context managers).
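Two of the points above, Protocol-based test doubles and autospec, can be sketched with the standard library only. `render_all`, `FakeBanner`, and `EmailSender` are hypothetical names for this example; the test functions are written in pytest style but are plain functions that can also be called directly.

```python
from typing import Protocol
from unittest.mock import create_autospec


class Renderable(Protocol):
    def render(self, width: int) -> str: ...


def render_all(items: list[Renderable], width: int) -> list[str]:
    return [item.render(width) for item in items]


class FakeBanner:
    """Plain test double: satisfies Renderable structurally, no mock needed."""

    def render(self, width: int) -> str:
        return "=" * width


class EmailSender:
    """Hypothetical collaborator used to show autospec."""

    def send(self, to: str, subject: str) -> None:
        raise NotImplementedError


def test_render_all_with_plain_object() -> None:
    assert render_all([FakeBanner()], width=3) == ["==="]


def test_autospec_rejects_wrong_signature() -> None:
    sender = create_autospec(EmailSender, instance=True)
    sender.send("a@example.com", subject="hi")  # matches the real signature
    sender.send.assert_called_once_with("a@example.com", subject="hi")
    try:
        sender.send("a@example.com")  # missing `subject`
    except TypeError:
        pass
    else:
        raise AssertionError("autospec should reject a call that drifts from the API")
```

A plain `Mock()` would accept the second call silently; the autospec'd mock enforces the real signature, which is what catches API drift.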

Error Handling

  • Validate inputs at boundaries before expensive ops. Report all errors at once when possible
  • Use specific exceptions: `ValueError`, `TypeError`, `KeyError`, not bare `Exception`
  • `raise ServiceError("upload failed") from e` -- always chain to preserve the debug trail
  • Convert external data to domain types (enums, Pydantic models) at system boundaries
  • Batch processing: `BatchResult(succeeded={}, failed={})` -- don't let one item abort the batch
  • Pydantic `BaseModel` with `field_validator` for complex input validation
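Exception chaining and per-item batch results fit together as in the sketch below. The field names `succeeded` and `failed` come from the bullet above; the field types, `parse_quantity`, and `process_batch` are assumptions made for illustration.

```python
from dataclasses import dataclass, field


class ServiceError(Exception):
    """Domain-level error raised at the service boundary."""


@dataclass
class BatchResult:
    succeeded: dict[str, int] = field(default_factory=dict)
    failed: dict[str, Exception] = field(default_factory=dict)


def parse_quantity(raw: str) -> int:
    try:
        value = int(raw)
    except ValueError as e:
        # `from e` keeps the original error reachable via __cause__.
        raise ServiceError(f"invalid quantity: {raw!r}") from e
    if value < 0:
        raise ServiceError(f"quantity must be non-negative: {value}")
    return value


def process_batch(items: dict[str, str]) -> BatchResult:
    result = BatchResult()
    for key, raw in items.items():
        try:
            result.succeeded[key] = parse_quantity(raw)
        except ServiceError as e:
            result.failed[key] = e  # record the failure and keep going
    return result


result = process_batch({"a": "3", "b": "oops", "c": "7"})
```

The caller gets every outcome in one pass and can decide whether partial success is acceptable, instead of losing items "a" and "c" to the failure of "b".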

Migrations

  • Separate schema and data migrations -- data backfills go in their own migration file
  • Renames/removals use expand-contract: add new column → backfill → switch reads → drop old (see the `ia-postgresql` skill for the full pattern)
  • Never edit a migration that has already run in a shared environment
  • Alembic: use `--autogenerate` as a starting point, and always review the generated migration before committing
  • Test migrations against production-sized data -- a migration that takes 2ms on dev can lock a table for minutes in production
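The expand-contract sequence can be walked through on an in-memory SQLite database. This is a toy illustration, not an Alembic migration: the `users` table and the `fullname` / `display_name` columns are invented for the example, and the final `DROP COLUMN` assumes SQLite 3.35 or newer.

```python
import sqlite3


def expand(conn: sqlite3.Connection) -> None:
    """Step 1: add the new column alongside the old one (schema-only change)."""
    conn.execute("ALTER TABLE users ADD COLUMN display_name TEXT")


def backfill(conn: sqlite3.Connection) -> None:
    """Step 2: copy data over in a separate data migration."""
    conn.execute("UPDATE users SET display_name = fullname WHERE display_name IS NULL")


def contract(conn: sqlite3.Connection) -> None:
    """Step 4: drop the old column once nothing reads or writes it."""
    conn.execute("ALTER TABLE users DROP COLUMN fullname")  # needs SQLite 3.35+


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, fullname TEXT)")
conn.executemany("INSERT INTO users (fullname) VALUES (?)", [("Ada",), ("Linus",)])

expand(conn)
backfill(conn)
# Step 3: application code switches its reads to the new column.
names = [row[0] for row in conn.execute("SELECT display_name FROM users ORDER BY id")]
contract(conn)
columns = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
```

In a live system each step would ship as its own deploy, and writers would typically populate both columns until the contract step, so that a rollback at any point still finds consistent data.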

API Design

  • Contract-first: define Pydantic `BaseModel` request/response schemas and FastAPI `response_model` before writing endpoint logic. The schema is the contract -- implementation follows. Generate OpenAPI docs from these models automatically.
  • Hyrum's Law awareness: every observable response field, ordering, or timing becomes a dependency for callers. Use an explicit `response_model` to control exactly what's serialized and `model_config = ConfigDict(extra="forbid")` to reject unexpected fields -- never return raw dicts or ORM objects from endpoints.
  • Addition over modification: add new optional fields (`field: str | None = None`) rather than changing or removing existing ones. Removing a Pydantic field from a response model breaks callers silently. Deprecate first (`Field(deprecated=True)`), remove in a later version.
  • Consistent error structure: all exceptions should produce the same envelope: `{"error": {"code": "...", "message": "...", "details": ...}}`. Register `@app.exception_handler` for `RequestValidationError`, `HTTPException`, and application-specific exceptions to normalize into one format. Callers build error handling once.
  • Boundary validation via Pydantic: validate at the endpoint/handler level with Pydantic models and FastAPI's automatic request parsing. Internal services and repositories trust that input was validated at entry -- no redundant validation scattered through business logic.
  • Third-party responses are untrusted data: validate shape and content of external API responses before using them in logic, rendering, or decision-making. A compromised or misbehaving service can return unexpected types, malicious content, or missing fields. Parse through a Pydantic model before use.
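The consistent error envelope can be reduced to one mapping function that every handler shares. The sketch below is framework-free; `AppError` and `to_envelope` are hypothetical names, and in a FastAPI app each registered exception handler would presumably call such a helper and wrap the result in a JSON response.

```python
from typing import Any


class AppError(Exception):
    """Hypothetical base class for application errors that carry a stable code."""

    def __init__(self, code: str, message: str, details: Any = None, status: int = 400) -> None:
        super().__init__(message)
        self.code = code
        self.message = message
        self.details = details
        self.status = status


def to_envelope(exc: Exception) -> tuple[int, dict[str, Any]]:
    """Map any exception to (HTTP status, body) using one envelope shape."""
    if isinstance(exc, AppError):
        status, code, message, details = exc.status, exc.code, exc.message, exc.details
    else:
        # Unknown failures get a generic message so internals are not leaked.
        status, code, message, details = 500, "internal_error", "Internal server error", None
    return status, {"error": {"code": code, "message": message, "details": details}}


not_found = to_envelope(AppError("not_found", "User not found", {"id": 7}, status=404))
unexpected = to_envelope(KeyError("secret_column"))
```

Keeping the mapping in one place means a new exception type changes a single function rather than every endpoint, and callers only ever parse one shape.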

Verify

  • `uv run pytest` passes with zero failures
  • `uv run ruff check .` passes with zero warnings
  • `uv run ty check .` passes with zero errors
  • Coverage target: 80%+ (`uv run pytest --cov`)