Logging & Observability Skill
Activate when working with logging systems, distributed tracing, debugging, monitoring, or any observability-related tasks across applications.
1. Logging Best Practices
Log Levels
Use appropriate log levels for different severity:
| Level | Severity | When to Use |
|---|---|---|
| DEBUG | Low | Detailed diagnostic info - variable states, control flow. Intended for development; enable sparingly in production. |
| INFO | Low | Important application lifecycle events - startup, shutdown, config loaded, user actions, key state changes. |
| WARN | Medium | Recoverable issues - deprecated usage, resource constraints, unexpected but handled conditions. Investigate later. |
| ERROR | High | Unrecoverable problems - exceptions, failed operations, missing required data. Requires immediate attention. |
| FATAL | Critical | System-level failures - abort conditions, out of memory, unrecoverable state. System may crash. |
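As a quick illustration in Python's standard library (where FATAL corresponds to CRITICAL), the levels order and filter as:

```python
import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records are filtered out at INFO
log = logging.getLogger("payments")

log.debug("cart contents: %s", {"sku": "A1"})  # development-only detail
log.info("service started")                    # lifecycle event
log.warning("retrying after rate limit")       # recoverable issue
log.error("payment failed: gateway timeout")   # needs attention
log.critical("out of memory, aborting")        # Python's FATAL equivalent
```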
General Principles
- Actionable: Logs should help diagnose problems, not just record events
- Contextual: Include enough context to understand what happened without code inspection
- Consistent: Use same terminology across codebase for same events
- Sparse: Don't log everything - unnecessary noise obscures real issues
- Sampling: In high-volume scenarios, sample logs (10%, 1%, etc.) rather than logging everything
- Structured: Always use structured format (JSON) for programmatic parsing
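The sampling principle can be sketched with a tiny helper (illustrative only; the `sampled_debug` name and `SAMPLE_RATE` constant are made up here):

```python
import logging
import random

logger = logging.getLogger("http")
SAMPLE_RATE = 0.01  # keep roughly 1% of high-volume debug logs

def sampled_debug(message, **fields):
    # Drop most calls up front so formatting cost is only paid for kept logs
    if random.random() < SAMPLE_RATE:
        logger.debug(message, extra=fields)
```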
2. Structured Logging Format
Standard Fields
Every log entry should include:
```json
{
  "timestamp": "2025-11-17T10:30:45.123Z",
  "level": "ERROR",
  "message": "Failed to process user request",
  "service": "auth-service",
  "version": "1.2.3",
  "environment": "production",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "span_id": "00f067aa0ba902b7",
  "parent_span_id": "0af7651916cd43dd",
  "user_id": "user-12345",
  "request_id": "req-98765",
  "path": "/api/users/authenticate",
  "method": "POST",
  "status_code": 500,
  "error": {
    "type": "InvalidCredentialsError",
    "message": "Provided credentials do not match",
    "stack": "Error: InvalidCredentialsError...",
    "code": "AUTH_INVALID_CREDS"
  },
  "context": {
    "ip_address": "192.168.1.100",
    "user_agent": "Mozilla/5.0...",
    "attempt_number": 3,
    "rate_limit_remaining": 2
  },
  "duration_ms": 245,
  "custom_field": "custom_value"
}
```

Required vs Optional Fields
Always include:
- timestamp
- level
- message
- trace_id
- service
- environment
When applicable:
- span_id / parent_span_id (distributed tracing)
- user_id (any user action)
- request_id (any request)
- error (on ERROR/FATAL)
- duration_ms (operations)
- context (relevant metadata)
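One way to guarantee the always-required fields is a custom formatter; this is a minimal sketch, with `service` and `environment` hard-coded where a real setup would read configuration:

```python
import json
import logging
import uuid

class JsonFormatter(logging.Formatter):
    # Always-required fields; in practice these come from config/env
    SERVICE = "auth-service"
    ENVIRONMENT = "production"

    def format(self, record):
        entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "service": self.SERVICE,
            "environment": self.ENVIRONMENT,
            # Reuse a propagated trace id if present, otherwise mint one
            "trace_id": getattr(record, "trace_id", None) or str(uuid.uuid4()),
        }
        return json.dumps(entry)
```

Attach it with `handler.setFormatter(JsonFormatter())` on whichever handler writes your logs.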
3. What to Log
Application Lifecycle
```json
// Startup
{"timestamp": "...", "level": "INFO", "message": "Service starting", "service": "auth-service", "version": "1.2.3"}

// Configuration loaded
{"timestamp": "...", "level": "INFO", "message": "Configuration loaded", "config_source": "environment", "environment": "production"}

// Database connection established
{"timestamp": "...", "level": "INFO", "message": "Database connected", "host": "db.internal", "pool_size": 20}

// Shutdown
{"timestamp": "...", "level": "INFO", "message": "Service shutting down", "reason": "SIGTERM", "uptime_seconds": 3600}
```

User Actions
```json
// Login attempt
{"timestamp": "...", "level": "INFO", "message": "User login attempt", "user_id": "user-123", "method": "password"}

// Data modification
{"timestamp": "...", "level": "INFO", "message": "User updated profile", "user_id": "user-123", "fields_changed": ["email", "name"]}

// Permission check
{"timestamp": "...", "level": "INFO", "message": "Permission check", "user_id": "user-123", "resource": "report-456", "permission": "read", "granted": true}
```

External API Calls
```json
// API call started
{"timestamp": "...", "level": "DEBUG", "message": "External API call", "service": "my-service", "api": "stripe", "endpoint": "/charges", "method": "POST"}

// API response
{"timestamp": "...", "level": "DEBUG", "message": "API response received", "api": "stripe", "endpoint": "/charges", "status_code": 200, "duration_ms": 145}

// API error
{"timestamp": "...", "level": "WARN", "message": "External API error", "api": "stripe", "status_code": 429, "error": "rate_limit_exceeded", "retry_after_seconds": 60}
```

Errors and Exceptions
```json
{
  "timestamp": "...",
  "level": "ERROR",
  "message": "Payment processing failed",
  "service": "payment-service",
  "user_id": "user-456",
  "error": {
    "type": "PaymentGatewayError",
    "message": "Connection timeout",
    "code": "GATEWAY_TIMEOUT",
    "stack": "PaymentGatewayError: Connection timeout\n    at processPayment (payment.ts:45)\n    at ..."
  },
  "context": {
    "amount": 9999,
    "currency": "USD",
    "gateway": "stripe"
  }
}
```

Performance Metrics
```json
// Slow operation
{"timestamp": "...", "level": "WARN", "message": "Slow query detected", "duration_ms": 5234, "threshold_ms": 1000, "query": "SELECT * FROM orders WHERE..."}

// Resource usage
{"timestamp": "...", "level": "INFO", "message": "Memory usage high", "memory_used_mb": 2048, "memory_limit_mb": 2560, "percentage": 80}

// Cache statistics
{"timestamp": "...", "level": "DEBUG", "message": "Cache stats", "cache_hits": 4521, "cache_misses": 234, "hit_rate": 0.95}
```

4. What NOT to Log
NEVER log:
- Passwords or authentication tokens
- API keys or secrets
- Private keys or certificates
- Database credentials
- OAuth tokens or refresh tokens
- Credit card numbers
- Social security numbers
- Email addresses (unless redacted)
- Personal identification numbers
- Medical records
- Raw HTTP request/response bodies (especially with auth headers)
Be careful with:
- PII in general (name, phone, address) - redact or use anonymized IDs
- Query parameters (may contain secrets)
- Request/response headers (often contain authorization)
- User input (may contain sensitive data)
Security rule: When in doubt, DON'T log it
```python
# BAD - logging credentials
logger.info(f"Login attempt for {username} with password {password}")

# GOOD - logging action without sensitive data
logger.info("Login attempt", extra={"username": username, "method": "password"})

# BAD - logging full request with auth header
logger.debug(f"Request: {request.headers}")

# GOOD - logging request metadata
logger.debug("Incoming request", extra={
    "method": request.method,
    "path": request.path,
    "user_agent": request.headers.get('user-agent')
})
```
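A small redaction helper can enforce the rule before anything reaches the logger; `SENSITIVE_KEYS` here is an illustrative, non-exhaustive set:

```python
SENSITIVE_KEYS = {"password", "token", "api_key", "secret", "authorization"}

def redact(payload):
    # Mask values whose keys look sensitive; recurse into nested dicts
    return {
        k: redact(v) if isinstance(v, dict)
        else "[REDACTED]" if k.lower() in SENSITIVE_KEYS
        else v
        for k, v in payload.items()
    }
```

Call `redact(...)` on any dict destined for a log line's `extra` or `context` field.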
5. Distributed Tracing
Trace IDs and Span IDs
- Trace ID: Unique identifier for entire request flow across services
- Span ID: Unique identifier for single operation/service call
- Parent Span ID: Span that initiated current span (for tracing parent-child relationships)
Generated once at the entry point, propagated through all downstream calls:

```
Request → [Service A, Trace: abc123]
  ├─ [Span: span1] Database query
  ├─ [Span: span2] → Service B, parent: span2
  └─ [Span: span3] Cache lookup
      └─ [Span: span4] External API call
```

Implementation
```python
# Python example with trace context
import uuid

import requests

class RequestContext:
    def __init__(self, trace_id=None, span_id=None, parent_span_id=None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.span_id = span_id or str(uuid.uuid4())
        self.parent_span_id = parent_span_id

# Middleware/decorator
def trace_request(func):
    def wrapper(*args, **kwargs):
        ctx = RequestContext()
        return func(*args, context=ctx, **kwargs)
    return wrapper

# Propagate to downstream services
def call_downstream_service(service_url, data, context):
    headers = {
        'X-Trace-ID': context.trace_id,
        'X-Span-ID': context.span_id,
        'X-Parent-Span-ID': context.span_id  # current span becomes the downstream span's parent
    }
    response = requests.post(service_url, json=data, headers=headers)
    return response
```
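On the receiving side, the downstream service rebuilds its context from those headers, reusing the trace id and minting a fresh span id for its own hop. A minimal sketch (the `SpanContext` dataclass and `context_from_headers` name are illustrative):

```python
import uuid
from dataclasses import dataclass
from typing import Optional

@dataclass
class SpanContext:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str] = None

def context_from_headers(headers):
    # Reuse the caller's trace id; mint a fresh span id for this hop
    return SpanContext(
        trace_id=headers.get('X-Trace-ID') or str(uuid.uuid4()),
        span_id=str(uuid.uuid4()),
        parent_span_id=headers.get('X-Parent-Span-ID'),
    )
```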
Sampling Strategies
- No sampling: Log all traces (high volume services may be expensive)
- Rate sampling: Log every Nth request (e.g., 1 in 100)
- Adaptive sampling: Sample based on error rate, latency, or traffic volume
- Tail sampling: Sample based on trace outcome (errors always sampled)
```python
# Adaptive sampling example
import random

def should_sample(trace):
    # Always sample errors
    if trace.has_error:
        return True
    # Always sample slow requests (>1s)
    if trace.duration_ms > 1000:
        return True
    # Sample 1% of normal requests
    return random.random() < 0.01
```

6. Performance Logging
Execution Time
执行时间
```python
import functools
import logging
import time

logger = logging.getLogger(__name__)

def log_execution_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            result = func(*args, **kwargs)
            duration_ms = (time.time() - start) * 1000
            logger.info(f"{func.__name__} completed", extra={
                "duration_ms": duration_ms,
                "status": "success"
            })
            return result
        except Exception as e:
            duration_ms = (time.time() - start) * 1000
            logger.error(f"{func.__name__} failed", extra={
                "duration_ms": duration_ms,
                "error": str(e)
            })
            raise
    return wrapper
```

Resource Usage
```python
import os

import psutil

def log_resource_usage():
    process = psutil.Process(os.getpid())
    memory = process.memory_info()
    logger.info("Resource usage", extra={
        "memory_rss_mb": memory.rss / 1024 / 1024,
        "memory_vms_mb": memory.vms / 1024 / 1024,
        "cpu_percent": process.cpu_percent(interval=1),
        "num_threads": process.num_threads()
    })
```

Slow Query Logs
```python
# Track database query performance (assumes an open `cursor` from a DB connection)
SLOW_QUERY_THRESHOLD_MS = 1000

def execute_query(query, params):
    start = time.time()
    cursor.execute(query, params)
    duration_ms = (time.time() - start) * 1000
    if duration_ms > SLOW_QUERY_THRESHOLD_MS:
        logger.warning("Slow query detected", extra={
            "query": query,
            "params_count": len(params),
            "duration_ms": duration_ms,
            "threshold_ms": SLOW_QUERY_THRESHOLD_MS
        })
    return cursor.fetchall()
```

7. Debugging Patterns
Debug Logging
调试日志
Use DEBUG level for development/troubleshooting only:
```python
logger.debug("Function entry", extra={
    "function": "process_payment",
    "args": {"amount": 100, "currency": "USD"}
})
logger.debug("Intermediate state", extra={
    "processing_step": "validation",
    "validation_passed": True,
    "timestamp": time.time()
})
logger.debug("Function exit", extra={
    "function": "process_payment",
    "return_value": {"transaction_id": "txn-123", "status": "pending"}
})
```

Conditional Breakpoints
In IDE debugger (VS Code, PyCharm, etc.):
```python
# Set a breakpoint with a condition - the debugger pauses only when it is true
if user_id == "debug-user-123":  # Breakpoint here with condition: amount > 1000
    processor.process(order)
```

Remote Debugging
Python example:
```python
# Start remote debugger (debugpy)
import debugpy

debugpy.listen(("0.0.0.0", 5678))
print("Debugger listening, waiting for connection...")
debugpy.wait_for_client()
# Then connect from the IDE on the same port
```

Log Aggregation for Debugging
```python
# Retrieve logs for a specific trace
def get_trace_logs(trace_id):
    query = f"SELECT * FROM logs WHERE trace_id = '{trace_id}' ORDER BY timestamp"
    # Execute against log storage (ELK, Loki, etc.)
    return results

# Filter by user for debugging user issues
def get_user_logs(user_id, hours=1):
    query = f"SELECT * FROM logs WHERE user_id = '{user_id}' AND timestamp > now() - {hours}h"
    return results
```

8. Log Management
Log Rotation
Prevent unbounded disk usage:
```python
# Python logging with rotation
from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler(
    filename='app.log',
    maxBytes=10485760,  # 10MB
    backupCount=5       # Keep 5 rotated files
)
# Backup naming: app.log, app.log.1, app.log.2, etc.
```
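Size-based rotation can be swapped for time-based rotation using the standard library's `TimedRotatingFileHandler`; a minimal sketch:

```python
from logging.handlers import TimedRotatingFileHandler

# Time-based alternative: rotate at midnight, keep 14 daily files
handler = TimedRotatingFileHandler(
    filename='app.log',
    when='midnight',
    backupCount=14,
    delay=True,  # don't open the file until the first write
)
```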
Retention Policies
```json
{
  "retention_policy": {
    "DEBUG": "7 days",
    "INFO": "30 days",
    "WARN": "90 days",
    "ERROR": "1 year",
    "FATAL": "indefinite"
  }
}
```

Log Aggregation Tools
| Tool | Best For | Strengths |
|---|---|---|
| ELK Stack (Elasticsearch, Logstash, Kibana) | On-premise, complex queries | Powerful search, rich dashboards, customizable |
| Grafana Loki | Simple log aggregation, cost-effective | Low overhead, integrates with Prometheus |
| Datadog | Cloud-first, all-in-one | Agent-based, excellent integrations |
| Splunk | Enterprise, security focus | Powerful search, alerting, compliance reports |
| CloudWatch | AWS native | Seamless AWS integration, log groups |
| Cloud Logging (formerly Stackdriver) | GCP native | Google Cloud integration |
| Azure Monitor Logs | Azure native | Microsoft ecosystem |
9. Metrics and Monitoring
Application Metrics
```python
from prometheus_client import Counter, Histogram, Gauge

# Counter: monotonically increasing
login_attempts = Counter('login_attempts_total', 'Total login attempts', ['status'])
login_attempts.labels(status='success').inc()

# Histogram: observe value distribution
request_duration = Histogram('request_duration_seconds', 'Request duration')
request_duration.observe(0.5)

# Gauge: can go up or down
active_connections = Gauge('active_connections', 'Current active connections')
active_connections.set(42)
```

System Metrics
```python
import psutil

# CPU, memory, disk usage
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
logger.info("System metrics", extra={
    "cpu_percent": cpu_percent,
    "memory_percent": memory.percent,
    "disk_percent": disk.percent
})
```

Alerting Rules
```yaml
# Prometheus alert rules
groups:
  - name: service-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(requests_total{status="500"}[5m]) > 0.05
        for: 5m
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} for {{ $labels.service }}"
      - alert: SlowRequestLatency
        expr: histogram_quantile(0.95, rate(request_duration_seconds_bucket[5m])) > 1
        for: 10m
        annotations:
          summary: "Slow requests detected (p95 > 1s)"
```

10. Common Libraries by Language
Python
```python
# Standard library logging
import logging

# Structured logging with structlog
import structlog
logger = structlog.get_logger()
logger.info("user_created", user_id="u123", email_domain="example.com")

# For advanced tracing
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
```

**Libraries:**
- `logging` - Built-in, basic structured support
- `structlog` - Structured logging, cleaner API
- `python-json-logger` - JSON formatter for standard logging
- `OpenTelemetry` - Distributed tracing standard
- `Jaeger` - Distributed tracing backend

Node.js / TypeScript
```javascript
// Winston
const winston = require('winston');
const logger = winston.createLogger({
  format: winston.format.json(),
  transports: [new winston.transports.Console()]
});
logger.info('User logged in', { userId: 'u123' });

// Pino (lightweight)
const pino = require('pino');
const pinoLogger = pino();
pinoLogger.info({ userId: 'u123' }, 'User logged in');
```

**Libraries:**
- `winston` - Full-featured, very popular
- `pino` - Lightweight, high performance
- `bunyan` - JSON logging, stream-based
- `morgan` - HTTP request logger for Express
- `OpenTelemetry` - Distributed tracing
- `@opentelemetry/api` - Standard tracing API

Go
```go
// Structured logging with zap
import "go.uber.org/zap"

logger, _ := zap.NewProduction()
defer logger.Sync()
logger.Info("user login",
    zap.String("user_id", "u123"),
    zap.Duration("duration", time.Second),
)

// Or logrus (JSON support)
import "github.com/sirupsen/logrus"

logger := logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{})
logger.WithFields(logrus.Fields{"user_id": "u123"}).Info("Login")
```

**Libraries:**
- `zap` - High performance, structured
- `logrus` - Popular, JSON output
- `slog` - Standard library (Go 1.21+)
- `OpenTelemetry` - Distributed tracing

Java / Kotlin
```java
// Logback with SLF4J
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.logstash.logback.marker.Markers;

Logger logger = LoggerFactory.getLogger(MyClass.class);

// Structured with logstash-logback-encoder
logger.info(Markers.append("user_id", "u123"), "User logged in");

// Spring Boot with Logback (built-in)
@RestController
public class UserController {
    private static final Logger logger = LoggerFactory.getLogger(UserController.class);
}
```

**Libraries:**
- `SLF4J` + `Logback` - Standard combo
- `Log4j2` - Enterprise, feature-rich
- `Logstash Logback Encoder` - Structured output
- `OpenTelemetry` - Distributed tracing

C# / .NET
```csharp
// Serilog (structured)
using Serilog;

Log.Logger = new LoggerConfiguration()
    .WriteTo.Console()
    .CreateLogger();
Log.Information("User {UserId} logged in", "u123");

// Built-in ILogger with dependency injection
public class UserService {
    private readonly ILogger<UserService> _logger;
    public UserService(ILogger<UserService> logger) {
        _logger = logger;
    }
}
```

**Libraries:**
- `Serilog` - Excellent structured support
- `NLog` - Enterprise logging
- `log4net` - Classic Apache Log4j port
- `Microsoft.Extensions.Logging` - Built-in DI support
- `OpenTelemetry.Exporter.Console` - Tracing

11. Example Patterns
Complete Request Logging Pipeline (Python)
```python
from datetime import datetime
from uuid import uuid4
import json
import sys
import time

import structlog

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
)

class RequestLogger:
    def __init__(self):
        self.logger = structlog.get_logger()

    def log_request_start(self, request):
        trace_id = request.headers.get('X-Trace-ID') or str(uuid4())
        span_id = str(uuid4())
        self.logger.info(
            "request_started",
            trace_id=trace_id,
            span_id=span_id,
            method=request.method,
            path=request.path,
            user_id=request.user_id,
        )
        return trace_id, span_id

    def log_request_complete(self, trace_id, span_id, status, duration_ms):
        level = "info" if status < 400 else "warning" if status < 500 else "error"
        getattr(self.logger, level)(
            "request_completed",
            trace_id=trace_id,
            span_id=span_id,
            status_code=status,
            duration_ms=duration_ms,
        )

    def log_error(self, trace_id, span_id, error, context=None):
        self.logger.error(
            "request_error",
            trace_id=trace_id,
            span_id=span_id,
            error_type=type(error).__name__,
            error_message=str(error),
            error_context=context or {},
        )
```

Flask integration
```python
from flask import Flask, request

app = Flask(__name__)
req_logger = RequestLogger()

@app.before_request
def before_request():
    request.trace_id, request.span_id = req_logger.log_request_start(request)
    request.start_time = time.time()

@app.after_request
def after_request(response):
    duration_ms = (time.time() - request.start_time) * 1000
    req_logger.log_request_complete(
        request.trace_id,
        request.span_id,
        response.status_code,
        duration_ms
    )
    return response

@app.errorhandler(Exception)
def handle_error(error):
    req_logger.log_error(
        request.trace_id,
        request.span_id,
        error,
        context={"path": request.path}
    )
    return {"error": "Internal server error"}, 500
```

Distributed Tracing Example (Node.js)
import { trace, context, SpanStatusCode } from '@opentelemetry/api';
import { NodeSDK } from '@opentelemetry/sdk-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';

const sdk = new NodeSDK({
  traceExporter: new JaegerExporter({
    host: process.env.JAEGER_HOST || 'localhost',
    port: parseInt(process.env.JAEGER_PORT || '6831', 10),
  }),
});
sdk.start();

const tracer = trace.getTracer('my-service');

async function processPayment(userId: string, amount: number) {
  const span = tracer.startSpan('processPayment', {
    attributes: {
      'user_id': userId,
      'amount': amount,
      'currency': 'USD',
    },
  });
  return context.with(trace.setSpan(context.active(), span), async () => {
    try {
      // Nested span
      const validationSpan = tracer.startSpan('validatePayment');
      try {
        await validatePayment(userId, amount);
        validationSpan.setStatus({ code: SpanStatusCode.OK });
      } catch (error) {
        validationSpan.recordException(error);
        validationSpan.setStatus({ code: SpanStatusCode.ERROR });
        throw error;
      } finally {
        validationSpan.end();
      }
      // Call external service with trace propagation
      const result = await callPaymentGateway(amount);
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.recordException(error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      throw error;
    } finally {
      span.end();
    }
  });
}
Security-Conscious Logging (Go)
package main

import (
	"net/http"

	"go.uber.org/zap"
)

// RedactSensitive removes sensitive fields from log data.
func RedactSensitive(data map[string]interface{}) map[string]interface{} {
	sensitiveKeys := []string{"password", "api_key", "token", "credit_card", "ssn"}
	for _, key := range sensitiveKeys {
		if _, exists := data[key]; exists {
			data[key] = "[REDACTED]"
		}
	}
	return data
}

func LogRequest(logger *zap.Logger, r *http.Request) {
	// Extract safe headers only
	safeHeaders := map[string]string{
		"user-agent":   r.Header.Get("User-Agent"),
		"content-type": r.Header.Get("Content-Type"),
	}
	logger.Info("incoming request",
		zap.String("method", r.Method),
		zap.String("path", r.URL.Path),
		zap.Any("headers", safeHeaders),
		zap.String("remote_addr", r.RemoteAddr),
	)
}

func LogError(logger *zap.Logger, err error, context map[string]interface{}) {
	logger.Error("operation failed",
		zap.Error(err),
		zap.Any("context", RedactSensitive(context)),
	)
}
12. Quick Reference Checklist
When implementing logging/observability:
- Use structured JSON logging
- Include trace_id and span_id in all logs
- Set appropriate log levels (don't over-log)
- Never log passwords, keys, tokens, PII
- Add contextual fields (user_id, request_id, etc.)
- Implement log rotation to prevent disk overflow
- Include stack traces for errors
- Log entry/exit for important functions
- Track execution time for performance monitoring
- Sample high-volume logs to prevent storage/bandwidth issues
- Use existing libraries (structlog, pino, zap, etc.)
- Set up log aggregation (ELK, Loki, Datadog, etc.)
- Create alerting rules for critical errors
- Document logging patterns in team guidelines
- Review logs regularly to spot issues early
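The sampling bullet above can be sketched with a standard `logging.Filter`. The 10% rate is illustrative; WARNING-and-above records bypass sampling so failures are never dropped:

```python
import logging
import random

class SamplingFilter(logging.Filter):
    """Pass a fixed fraction of low-severity records; always keep WARNING+."""

    def __init__(self, rate=0.1, seed=None):
        super().__init__()
        self.rate = rate
        self.rng = random.Random(seed)  # seedable for deterministic tests

    def filter(self, record):
        if record.levelno >= logging.WARNING:
            return True  # never drop warnings or errors
        return self.rng.random() < self.rate  # sample ~rate of DEBUG/INFO

logger = logging.getLogger("sampled")
logger.addFilter(SamplingFilter(rate=0.1))
```

Attach the filter to a handler instead of the logger if you want sampling to apply only to one destination (e.g. sample the remote sink, keep the local file complete).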
Activate this skill when: working with logging systems, distributed tracing, debugging, monitoring, performance analysis, or observability-related tasks.
Combine with: development-philosophy (fail-fast debugging), security-first-design (never log secrets), testing-workflow (use logs to verify behavior).
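As a companion to the Go `RedactSensitive` above, which masks only top-level keys, here is a hedged Python sketch of a recursive variant that also catches secrets nested inside payloads; the key list and placeholder mirror the Go example, and the function name is illustrative:

```python
SENSITIVE_KEYS = {"password", "api_key", "token", "credit_card", "ssn"}

def redact(data):
    """Return a copy of data with sensitive values masked, descending into
    nested dicts and lists so secrets inside payloads are also caught."""
    if isinstance(data, dict):
        return {
            k: "[REDACTED]" if k.lower() in SENSITIVE_KEYS else redact(v)
            for k, v in data.items()
        }
    if isinstance(data, list):
        return [redact(item) for item in data]
    return data  # scalars pass through unchanged

# redact({"user": "bob", "auth": {"token": "abc"}})
# -> {"user": "bob", "auth": {"token": "[REDACTED]"}}
```

Returning a copy (rather than mutating in place, as the Go version does) keeps the original payload intact for the actual request handling.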