# Python Observability
Instrument Python applications with structured logs, metrics, and traces. When something breaks in production, you need to answer "what, where, and why" without deploying new code.
## When to Use This Skill
- Adding structured logging to applications
- Implementing metrics collection with Prometheus
- Setting up distributed tracing across services
- Propagating correlation IDs through request chains
- Debugging production issues
- Building observability dashboards
## Core Concepts
### 1. Structured Logging
Emit logs as JSON with consistent fields for production environments. Machine-readable logs enable powerful queries and alerts. For local development, consider human-readable formats.
### 2. The Four Golden Signals
Track latency, traffic, errors, and saturation for every service boundary.
### 3. Correlation IDs
Thread a unique ID through all logs and spans for a single request, enabling end-to-end tracing.
### 4. Bounded Cardinality
Keep metric label values bounded. Unbounded labels (like user IDs) explode storage costs.
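A back-of-envelope check makes the danger concrete (the counts below are hypothetical):

```python
# Each metric stores one time series per unique label combination,
# so the series count is the product of the label cardinalities.
methods, endpoints, statuses = 5, 40, 10  # a typical bounded label set
series = methods * endpoints * statuses
print(series)  # 2000 series: cheap

users = 1_000_000  # an unbounded label value
print(series * users)  # 2_000_000_000 series: storage explosion
```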
## Quick Start
```python
import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

logger = structlog.get_logger()
logger.info("Request processed", user_id="123", duration_ms=45)
```

## Fundamental Patterns
### Pattern 1: Structured Logging with Structlog
Configure structlog for JSON output with consistent fields.
```python
import logging

import structlog


def configure_logging(log_level: str = "INFO") -> None:
    """Configure structured logging for the application."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )


# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()
```

### Pattern 2: Consistent Log Fields
Every log entry should include standard fields for filtering and correlation.
```python
import time
from contextvars import ContextVar

import structlog

# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

logger = structlog.get_logger()


def process_request(request: Request) -> Response:
    """Process request with structured logging."""
    logger.info(
        "Request received",
        correlation_id=correlation_id.get(),
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )
    start = time.perf_counter()
    try:
        result = handle_request(request)
        logger.info(
            "Request completed",
            correlation_id=correlation_id.get(),
            status_code=200,
            duration_ms=round((time.perf_counter() - start) * 1000, 2),
        )
        return result
    except Exception as e:
        logger.error(
            "Request failed",
            correlation_id=correlation_id.get(),
            error_type=type(e).__name__,
            error_message=str(e),
        )
        raise
```

### Pattern 3: Semantic Log Levels
Use log levels consistently across the application.
| Level | Purpose | Examples |
|---|---|---|
| DEBUG | Development diagnostics | Variable values, internal state |
| INFO | Request lifecycle, operations | Request start/end, job completion |
| WARNING | Recoverable anomalies | Retry attempts, fallback used |
| ERROR | Failures needing attention | Exceptions, service unavailable |

```python
# DEBUG: Detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)

# INFO: Normal operational events
logger.info("Order created", order_id=order.id, total=order.total)

# WARNING: Abnormal but handled situations
logger.warning(
    "Rate limit approaching",
    current_rate=950,
    limit=1000,
    reset_seconds=30,
)

# ERROR: Failures requiring investigation
logger.error(
    "Payment processing failed",
    order_id=order.id,
    error=str(e),
    payment_provider="stripe",
)
```

Never log expected behavior at `ERROR`. A user entering a wrong password is `INFO`, not `ERROR`.

### Pattern 4: Correlation ID Propagation
Generate a unique ID at ingress and thread it through all operations.
```python
import uuid
from contextvars import ContextVar

import structlog
from fastapi import Request

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")


def set_correlation_id(cid: str | None = None) -> str:
    """Set correlation ID for current context."""
    cid = cid or str(uuid.uuid4())
    correlation_id.set(cid)
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    return cid


# FastAPI middleware example
async def correlation_middleware(request: Request, call_next):
    """Middleware to set and propagate correlation ID."""
    # Use incoming header or generate new
    cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
    set_correlation_id(cid)
    response = await call_next(request)
    response.headers["X-Correlation-ID"] = cid
    return response
```
Propagate to outbound requests:
```python
import httpx


async def call_downstream_service(endpoint: str, data: dict) -> dict:
    """Call downstream service with correlation ID."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json=data,
            headers={"X-Correlation-ID": correlation_id.get()},
        )
        return response.json()
```

## Advanced Patterns
### Pattern 5: The Four Golden Signals with Prometheus
Track these metrics for every service boundary:
```python
from prometheus_client import Counter, Gauge, Histogram

# Latency: how long requests take
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "Request latency in seconds",
    ["method", "endpoint", "status"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)

# Traffic: request rate
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)

# Errors: error rate
ERROR_COUNT = Counter(
    "http_errors_total",
    "Total HTTP errors",
    ["method", "endpoint", "error_type"],
)

# Saturation: resource utilization
DB_POOL_USAGE = Gauge(
    "db_connection_pool_used",
    "Number of database connections in use",
)
```
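Bucket edges matter because quantiles are interpolated from cumulative bucket counts, roughly as PromQL's `histogram_quantile` does. A simplified sketch with hypothetical counts:

```python
# Cumulative request counts per bucket upper edge (hypothetical data).
buckets = [0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
cumulative = [120, 340, 900, 1500, 1850, 1940, 1980, 1995, 1999, 2000]


def estimate_quantile(q: float) -> float:
    """Estimate a latency quantile by linear interpolation within a bucket."""
    rank = q * cumulative[-1]
    lower_edge, lower_count = 0.0, 0
    for edge, count in zip(buckets, cumulative):
        if count >= rank:
            frac = (rank - lower_count) / (count - lower_count)
            return lower_edge + frac * (edge - lower_edge)
        lower_edge, lower_count = edge, count
    return buckets[-1]


print(round(estimate_quantile(0.95), 3))  # ≈ 0.389 seconds
```

If your real p95 fell above the last bucket edge, every estimate would clamp to that edge, which is why edges should bracket your actual latencies.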
Instrument your endpoints:
```python
import time
from functools import wraps


def track_request(func):
    """Decorator to track request metrics."""
    @wraps(func)
    async def wrapper(request: Request, *args, **kwargs):
        method = request.method
        endpoint = request.url.path
        start = time.perf_counter()
        try:
            response = await func(request, *args, **kwargs)
            status = str(response.status_code)
            return response
        except Exception as e:
            status = "500"
            ERROR_COUNT.labels(
                method=method,
                endpoint=endpoint,
                error_type=type(e).__name__,
            ).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
            REQUEST_LATENCY.labels(
                method=method, endpoint=endpoint, status=status
            ).observe(duration)
    return wrapper
```

### Pattern 6: Bounded Cardinality
Avoid labels with unbounded values to prevent metric explosion.
```python
# BAD: user_id has potentially millions of values
REQUEST_COUNT.labels(method="GET", user_id=user.id)  # Don't do this!

# GOOD: bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")

# If you need per-user metrics, use a different approach:
#   - log the user_id and query logs
#   - use a separate analytics system
#   - bucket users by type/tier
REQUEST_COUNT.labels(
    method="GET",
    endpoint="/users",
    user_tier="premium",  # Bounded set of values
)
```

### Pattern 7: Timed Operations with Context Manager
Create a reusable timing context manager for operations.
```python
import time
from contextlib import contextmanager

import structlog

logger = structlog.get_logger()


@contextmanager
def timed_operation(name: str, **extra_fields):
    """Context manager for timing and logging operations."""
    start = time.perf_counter()
    logger.debug("Operation started", operation=name, **extra_fields)
    try:
        yield
    except Exception as e:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.error(
            "Operation failed",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            error=str(e),
            **extra_fields,
        )
        raise
    else:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "Operation completed",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            **extra_fields,
        )


# Usage
with timed_operation("fetch_user_orders", user_id=user.id):
    orders = await order_repository.get_by_user(user.id)
```

### Pattern 8: OpenTelemetry Tracing
Set up distributed tracing with OpenTelemetry.
**Note:** OpenTelemetry is actively evolving. Check the official Python documentation for the latest API patterns and best practices.
```python
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
    """Configure OpenTelemetry tracing."""
    # Attach the service name so backends can group spans by service
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)


tracer = trace.get_tracer(__name__)


async def process_order(order_id: str) -> Order:
    """Process order with tracing."""
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        with tracer.start_as_current_span("validate_order"):
            validate_order(order_id)
        with tracer.start_as_current_span("charge_payment"):
            charge_payment(order_id)
        with tracer.start_as_current_span("send_confirmation"):
            send_confirmation(order_id)
        return order
```

## Best Practices Summary
- Use structured logging - JSON logs with consistent fields
- Propagate correlation IDs - Thread through all requests and logs
- Track the four golden signals - Latency, traffic, errors, saturation
- Bound label cardinality - Never use unbounded values as metric labels
- Log at appropriate levels - Don't cry wolf with ERROR
- Include context - User ID, request ID, operation name in logs
- Use context managers - Consistent timing and error handling
- Separate concerns - Observability code shouldn't pollute business logic
- Test your observability - Verify logs and metrics in integration tests
- Set up alerts - Metrics are useless without alerting