Logging & Observability Skill
Activate when working with logging systems, distributed tracing, debugging, monitoring, or any other observability-related task across applications.
1. Logging Best Practices
Use the log level that matches the severity of the event:

| Level | Severity | When to Use |
| --- | --- | --- |
| DEBUG | Low | Development only - detailed info, variable states, control flow. Use sparingly in production. |
| INFO | Low | Important application lifecycle events - startup, shutdown, config loaded, user actions, key state changes. |
| WARN | Medium | Recoverable issues - deprecated usage, resource constraints, unexpected but handled conditions. Investigate later. |
| ERROR | High | Unrecoverable problems - exceptions, failed operations, missing required data. Requires immediate attention. |
| FATAL | Critical | System-level failures - abort conditions, out of memory, unrecoverable state. System may crash. |
- Actionable: Logs should help diagnose problems, not just record events
- Contextual: Include enough context to understand what happened without inspecting the code
- Consistent: Use the same terminology across the codebase for the same events
- Sparse: Don't log everything - unnecessary noise obscures real issues
- Sampling: In high-volume scenarios, sample logs (10%, 1%, etc.) rather than logging everything
- Structured: Always use a structured format (JSON) so logs can be parsed programmatically
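The sampling principle can be sketched with a standard-library `logging.Filter`. This is a minimal illustration, not a prescribed implementation: the class name `SamplingFilter`, the `sample_rate` parameter, and the choice to always keep WARNING and above are assumptions made for the example.

```python
import logging
import random


class SamplingFilter(logging.Filter):
    """Keep every record at WARNING or above; keep only a fraction of the rest."""

    def __init__(self, sample_rate, rng=None):
        super().__init__()
        if not 0.0 <= sample_rate <= 1.0:
            raise ValueError("sample_rate must be between 0 and 1")
        self.sample_rate = sample_rate
        # An injectable random source keeps the filter easy to test.
        self._rng = rng or random.Random()

    def filter(self, record):
        if record.levelno >= logging.WARNING:
            return True  # Never drop warnings or errors
        return self._rng.random() < self.sample_rate


# Usage: attach to a handler so only about 10% of DEBUG/INFO records are emitted.
handler = logging.StreamHandler()
handler.addFilter(SamplingFilter(sample_rate=0.1))
```

Attaching the filter to a handler rather than a logger means it applies to every record that reaches that output, whichever logger produced it.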
2. Structured Logging Format
Every log entry should include:
json
{
  "timestamp": "2025-11-17T10:30:45.123Z",
  "level": "ERROR",
  "message": "Failed to process user request",
  "service": "auth-service",
  "version": "1.2.3",
  "environment": "production",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "span_id": "00f067aa0ba902b7",
  "parent_span_id": "0af7651916cd43dd",
  "user_id": "user-12345",
  "request_id": "req-98765",
  "path": "/api/users/authenticate",
  "method": "POST",
  "status_code": 500,
  "error": {
    "type": "InvalidCredentialsError",
    "message": "Provided credentials do not match",
    "stack": "Error: InvalidCredentialsError...",
    "code": "AUTH_INVALID_CREDS"
  },
  "context": {
    "ip_address": "192.168.1.100",
    "user_agent": "Mozilla/5.0...",
    "attempt_number": 3,
    "rate_limit_remaining": 2
  },
  "duration_ms": 245,
  "custom_field": "custom_value"
}
Required vs Optional Fields
Always include:
- timestamp
- level
- message
- trace_id
- service
- environment

When applicable:
- span_id / parent_span_id (distributed tracing)
- user_id (any user action)
- request_id (any request)
- error (on ERROR/FATAL)
- duration_ms (timed operations)
- context (relevant metadata)
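One way to guarantee the always-present fields is a custom formatter for the standard `logging` module. The sketch below is illustrative: `JsonFormatter`, `SERVICE_NAME`, and `ENVIRONMENT` are hypothetical names, and it assumes `trace_id` and other optional fields arrive through the `extra` argument.

```python
import json
import logging
from datetime import datetime, timezone

# Hypothetical service metadata; in practice this would come from configuration.
SERVICE_NAME = "auth-service"
ENVIRONMENT = "production"

# Attributes every LogRecord carries; anything else was supplied via `extra`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", None, None)))
_STANDARD_ATTRS |= {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object with the required fields."""

    def format(self, record):
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        entry = {
            "timestamp": created.isoformat(timespec="milliseconds"),
            "level": record.levelname,
            "message": record.getMessage(),
            "service": SERVICE_NAME,
            "environment": ENVIRONMENT,
            "trace_id": getattr(record, "trace_id", None),
        }
        # Copy optional fields passed through `extra` (user_id, duration_ms, ...).
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS and key not in entry:
                entry[key] = value
        if record.exc_info:
            entry["error"] = {
                "type": record.exc_info[0].__name__,
                "stack": self.formatException(record.exc_info),
            }
        return json.dumps(entry, default=str)
```

A handler would use it via `handler.setFormatter(JsonFormatter())`, after which a call such as `logger.error("Failed", extra={"trace_id": "abc123"})` yields a single JSON line.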
Application Lifecycle
json
// Startup
{"timestamp": "...", "level": "INFO", "message": "Service starting", "service": "auth-service", "version": "1.2.3"}
// Configuration loaded
{"timestamp": "...", "level": "INFO", "message": "Configuration loaded", "config_source": "environment", "environment": "production"}
// Database connection established
{"timestamp": "...", "level": "INFO", "message": "Database connected", "host": "db.internal", "pool_size": 20}
// Shutdown
{"timestamp": "...", "level": "INFO", "message": "Service shutting down", "reason": "SIGTERM", "uptime_seconds": 3600}
json
// Login attempt
{"timestamp": "...", "level": "INFO", "message": "User login attempt", "user_id": "user-123", "method": "password"}
// Data modification
{"timestamp": "...", "level": "INFO", "message": "User updated profile", "user_id": "user-123", "fields_changed": ["email", "name"]}
// Permission check
{"timestamp": "...", "level": "INFO", "message": "Permission check", "user_id": "user-123", "resource": "report-456", "permission": "read", "granted": true}
External API Calls
json
// API call started
{"timestamp": "...", "level": "DEBUG", "message": "External API call", "service": "my-service", "api": "stripe", "endpoint": "/charges", "method": "POST"}
// API response
{"timestamp": "...", "level": "DEBUG", "message": "API response received", "api": "stripe", "endpoint": "/charges", "status_code": 200, "duration_ms": 145}
// API error
{"timestamp": "...", "level": "WARN", "message": "External API error", "api": "stripe", "status_code": 429, "error": "rate_limit_exceeded", "retry_after_seconds": 60}
Errors and Exceptions
json
{
  "timestamp": "...",
  "level": "ERROR",
  "message": "Payment processing failed",
  "service": "payment-service",
  "user_id": "user-456",
  "error": {
    "type": "PaymentGatewayError",
    "message": "Connection timeout",
    "code": "GATEWAY_TIMEOUT",
    "stack": "PaymentGatewayError: Connection timeout\n at processPayment (payment.ts:45)\n at ..."
  },
  "context": {
    "amount": 9999,
    "currency": "USD",
    "gateway": "stripe"
  }
}
json
// Slow operation
{"timestamp": "...", "level": "WARN", "message": "Slow query detected", "duration_ms": 5234, "threshold_ms": 1000, "query": "SELECT * FROM orders WHERE..."}
// Resource usage
{"timestamp": "...", "level": "INFO", "message": "Memory usage high", "memory_used_mb": 2048, "memory_limit_mb": 2560, "percentage": 80}
// Cache statistics
{"timestamp": "...", "level": "DEBUG", "message": "Cache stats", "cache_hits": 4521, "cache_misses": 234, "hit_rate": 0.95}
4. What NOT to Log
NEVER log:
- Passwords or authentication tokens
- API keys or secrets
- Private keys or certificates
- Database credentials
- OAuth tokens or refresh tokens
- Credit card numbers
- Social security numbers
- Email addresses (unless redacted)
- Personal identification numbers
- Medical records
- Raw HTTP request/response bodies (especially with auth headers)

Be careful with:
- PII in general (name, phone, address) - redact or use anonymized IDs
- Query parameters (may contain secrets)
- Request/response headers (often contain authorization data)
- User input (may contain sensitive data)

Security rule: When in doubt, DON'T log it.
python
# BAD - logging credentials
logger.info(f"Login attempt for {username} with password {password}")

# GOOD - logging action without sensitive data
logger.info("Login attempt", extra={"username": username, "method": "password"})

# BAD - logging full request with auth header
logger.debug(f"Request: {request.headers}")

# GOOD - logging request metadata
logger.debug("Incoming request", extra={
    "method": request.method,
    "path": request.path,
    "user_agent": request.headers.get('user-agent')
})
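Where metadata has to be logged as a dictionary, a redaction pass before logging is one way to enforce the rules above. The following is a rough sketch: the key list in `SENSITIVE_KEYS` and the `[REDACTED]` placeholder are assumptions, and the substring match is deliberately broad, so it may also mask harmless keys that happen to contain one of the markers.

```python
SENSITIVE_KEYS = {"password", "token", "api_key", "secret", "authorization", "cookie"}
REDACTED = "[REDACTED]"


def _is_sensitive(key):
    # Normalize so "Api-Key", "api_key" and "API_KEY" are treated alike.
    normalized = str(key).lower().replace("-", "_")
    return any(marker in normalized for marker in SENSITIVE_KEYS)


def redact(value):
    """Return a copy of value with sensitive dictionary entries masked."""
    if isinstance(value, dict):
        return {
            key: REDACTED if _is_sensitive(key) else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(item) for item in value]
    return value


# Usage: mask headers before attaching them to a log entry.
safe_headers = redact({"Authorization": "Bearer abc", "User-Agent": "curl/8.0"})
```

The function returns a new structure and leaves its input untouched, so the original request data stays usable by the application.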
5. Distributed Tracing
Trace IDs and Span IDs
- Trace ID: Unique identifier for the entire request flow across services
- Span ID: Unique identifier for a single operation or service call
- Parent Span ID: The span that initiated the current span (used to reconstruct parent-child relationships)

The trace ID is generated once at the entry point and propagated through all downstream calls:
Request → [Service A, Trace: abc123]
  ├─ [Span: span1] Database query
  ├─ [Span: span2] → Service B, parent: span2
  │  └─ [Span: span3] Cache lookup
  └─ [Span: span4] External API call
python
# Python example with trace context
import functools
import uuid

import requests


class RequestContext:
    def __init__(self, trace_id=None, span_id=None, parent_span_id=None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.span_id = span_id or str(uuid.uuid4())
        self.parent_span_id = parent_span_id


# Middleware/decorator
def trace_request(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        ctx = RequestContext()
        return func(*args, context=ctx, **kwargs)
    return wrapper


# Propagate to downstream services
def call_downstream_service(service_url, data, context):
    # The downstream service generates its own span ID,
    # so only the trace ID and the parent span are sent.
    headers = {
        'X-Trace-ID': context.trace_id,
        'X-Parent-Span-ID': context.span_id,  # Current span becomes the parent
    }
    response = requests.post(service_url, json=data, headers=headers)
    return response
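The receiving side of the propagation above can be sketched as follows. This is an assumption-laden illustration: `TraceContext` and `context_from_headers` are hypothetical names, the header names are the ones used in this section, and `headers` is treated as a plain mapping (real HTTP frameworks usually provide case-insensitive lookups).

```python
import uuid
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class TraceContext:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str] = None


def context_from_headers(headers: Mapping[str, str]) -> TraceContext:
    """Continue the caller's trace, or start a new one if no trace header is present."""
    trace_id = headers.get("X-Trace-ID") or uuid.uuid4().hex
    parent_span_id = headers.get("X-Parent-Span-ID")
    # Every service creates a fresh span ID for its own unit of work.
    return TraceContext(
        trace_id=trace_id,
        span_id=uuid.uuid4().hex[:16],
        parent_span_id=parent_span_id,
    )


# Usage: a downstream service receiving a propagated request.
ctx = context_from_headers({"X-Trace-ID": "abc123", "X-Parent-Span-ID": "span2"})
```

A request arriving without headers yields a root context with a new trace ID and no parent, which matches the "generated once at entry point" rule.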
- No sampling: Record all traces (can be expensive for high-volume services)
- Rate sampling: Record every Nth request (e.g., 1 in 100)
- Adaptive sampling: Sample based on error rate, latency, or traffic volume
- Tail sampling: Decide after the trace completes, based on its outcome (errors are always sampled)
python
# Adaptive sampling example
import random


def should_sample(trace):
    # Always sample errors
    if trace.has_error:
        return True
    # Sample slow requests (>1s)
    if trace.duration_ms > 1000:
        return True
    # Sample 1% of normal requests
    return random.random() < 0.01
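A purely random decision can differ between services, which leaves sampled traces with missing spans. One common alternative, sketched here as an assumption rather than something this document prescribes, is to derive the decision from the trace ID so every service reaches the same answer. The function name `sampled_by_trace_id` is hypothetical.

```python
import hashlib


def sampled_by_trace_id(trace_id: str, rate: float) -> bool:
    """Deterministically keep roughly `rate` of all traces, keyed on the trace ID."""
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to an integer in [0, 2**64).
    bucket = int.from_bytes(digest[:8], "big")
    # Integer comparison avoids floating-point rounding at the boundaries.
    return bucket < int(rate * 2**64)


# Usage: the same trace ID always yields the same decision.
decision = sampled_by_trace_id("4bf92f3577b34da6a3ce929d0e0e4736", 0.01)
```

This could replace the final `random.random() < 0.01` step while the error and latency checks stay unchanged.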
6. Performance Logging
python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def log_execution_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so clock adjustments cannot skew the duration
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            duration_ms = (time.perf_counter() - start) * 1000
            logger.info(f"{func.__name__} completed", extra={
                "duration_ms": duration_ms,
                "status": "success"
            })
            return result
        except Exception as e:
            duration_ms = (time.perf_counter() - start) * 1000
            logger.error(f"{func.__name__} failed", extra={
                "duration_ms": duration_ms,
                "error": str(e)
            })
            raise
    return wrapper
python
import os

import psutil


def log_resource_usage():
    process = psutil.Process(os.getpid())
    memory = process.memory_info()
    logger.info("Resource usage", extra={
        "memory_rss_mb": memory.rss / 1024 / 1024,
        "memory_vms_mb": memory.vms / 1024 / 1024,
        # interval=1 blocks for one second while CPU usage is measured
        "cpu_percent": process.cpu_percent(interval=1),
        "num_threads": process.num_threads()
    })
python
# Track database query performance
SLOW_QUERY_THRESHOLD_MS = 1000


def execute_query(cursor, query, params):
    # cursor is assumed to be an open DB-API cursor
    start = time.perf_counter()
    cursor.execute(query, params)
    duration_ms = (time.perf_counter() - start) * 1000
    if duration_ms > SLOW_QUERY_THRESHOLD_MS:
        logger.warning("Slow query detected", extra={
            "query": query,
            "params_count": len(params),
            "duration_ms": duration_ms,
            "threshold_ms": SLOW_QUERY_THRESHOLD_MS
        })
    return cursor.fetchall()
7. Debugging Patterns
Use DEBUG level for development/troubleshooting only:
python
logger.debug("Function entry", extra={
    "function": "process_payment",
    # "args" is a reserved LogRecord attribute and cannot be used as an extra key
    "arguments": {"amount": 100, "currency": "USD"}
})
logger.debug("Intermediate state", extra={
    "processing_step": "validation",
    "validation_passed": True,
    "timestamp": time.time()
})
logger.debug("Function exit", extra={
    "function": "process_payment",
    "return_value": {"transaction_id": "txn-123", "status": "pending"}
})
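When building the debug payload is itself costly, the standard `Logger.isEnabledFor` check keeps that cost out of production. A small sketch, in which the logger name `payments` and the helpers `summarize_order` and `process_order` are hypothetical:

```python
import logging

logger = logging.getLogger("payments")  # hypothetical logger name


def summarize_order(order):
    """Stand-in for an expensive serialization step (hypothetical helper)."""
    return {"id": order["id"], "line_count": len(order["lines"])}


def process_order(order):
    # Build the debug payload only when DEBUG output is actually enabled.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Function entry", extra={
            "function": "process_order",
            "order": summarize_order(order),
        })
    return len(order["lines"])
```

With the logger set to INFO or higher, `summarize_order` is never called, so the guard costs only a level comparison.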
Conditional Breakpoints
In an IDE debugger (VS Code, PyCharm, etc.):
python
# Set a breakpoint with a condition
# The debugger pauses only when the condition is true
if user_id == "debug-user-123":  # Breakpoint here with condition: amount > 1000
    processor.process(order)
python
# Start remote debugger (debugpy)
import debugpy

# 0.0.0.0 listens on all interfaces; only do this on a trusted network
debugpy.listen(("0.0.0.0", 5678))
print("Debugger listening on port 5678, waiting for a client to attach...")
debugpy.wait_for_client()
# Then connect from the IDE on the same port
Log Aggregation for Debugging
python
# Retrieve logs for specific trace
# log_store is a placeholder for a client of the log storage (ELK, Loki, etc.);
# the query syntax and the query() method depend on the backend in use.
def get_trace_logs(log_store, trace_id):
    # Bind values as parameters instead of formatting them into the query string
    query = "SELECT * FROM logs WHERE trace_id = :trace_id ORDER BY timestamp"
    return log_store.query(query, {"trace_id": trace_id})


# Filter by user for debugging user issues
def get_user_logs(log_store, user_id, hours=1):
    query = f"SELECT * FROM logs WHERE user_id = :user_id AND timestamp > now() - {int(hours)}h"
    return log_store.query(query, {"user_id": user_id})
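To make the trace lookup concrete, here is a runnable sketch that uses an in-memory SQLite database as a stand-in for the real log store. The table layout and the `conn` argument are assumptions for illustration; a production system would query ELK, Loki, or a similar backend through its own client. The value is bound as a query parameter rather than formatted into the SQL string, which avoids injection through a crafted trace ID.

```python
import sqlite3


def get_trace_logs(conn, trace_id):
    """Return (timestamp, message) rows for one trace, oldest first."""
    cursor = conn.execute(
        "SELECT timestamp, message FROM logs WHERE trace_id = ? ORDER BY timestamp",
        (trace_id,),
    )
    return cursor.fetchall()


# Usage with an in-memory database standing in for the real log store.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (timestamp TEXT, trace_id TEXT, message TEXT)")
conn.executemany(
    "INSERT INTO logs VALUES (?, ?, ?)",
    [
        ("2025-11-17T10:30:46Z", "abc123", "request_completed"),
        ("2025-11-17T10:30:45Z", "abc123", "request_started"),
        ("2025-11-17T10:30:45Z", "zzz999", "request_started"),
    ],
)
rows = get_trace_logs(conn, "abc123")
```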
Prevent unbounded disk usage:
python
# Python logging with rotation
import logging
from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler(
    filename='app.log',
    maxBytes=10485760,  # 10MB
    backupCount=5       # Keep 5 rotated files
)
logging.getLogger().addHandler(handler)
# Backup naming: app.log, app.log.1, app.log.2, etc.
json
{
  "retention_policy": {
    "DEBUG": "7 days",
    "INFO": "30 days",
    "WARN": "90 days",
    "ERROR": "1 year",
    "FATAL": "indefinite"
  }
}
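A retention policy like the one above could be enforced by a cleanup job along these lines. This is only a sketch: the `RETENTION` table and `is_expired` are hypothetical names, "1 year" is approximated as 365 days, and timestamps are assumed to be timezone-aware.

```python
from datetime import datetime, timedelta, timezone

# Mirrors the retention policy above; None means "keep indefinitely".
RETENTION = {
    "DEBUG": timedelta(days=7),
    "INFO": timedelta(days=30),
    "WARN": timedelta(days=90),
    "ERROR": timedelta(days=365),  # "1 year", approximated
    "FATAL": None,
}


def is_expired(level, logged_at, now=None):
    """Return True when an entry is older than the retention window for its level."""
    now = now or datetime.now(timezone.utc)
    window = RETENTION[level]
    if window is None:
        return False
    return now - logged_at > window


# Usage: an 8-day-old DEBUG entry is past its 7-day window.
reference = datetime(2025, 11, 17, tzinfo=timezone.utc)
expired = is_expired("DEBUG", reference - timedelta(days=8), now=reference)
```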
Log Aggregation Tools

| Tool | Best For | Strengths |
| --- | --- | --- |
| ELK Stack (Elasticsearch, Logstash, Kibana) | On-premise, complex queries | Powerful search, rich dashboards, customizable |
| Grafana Loki | Simple log aggregation, cost-effective | Low overhead, integrates with Prometheus |
| Datadog | Cloud-first, all-in-one | Agent-based, excellent integrations |
| Splunk | Enterprise, security focus | Powerful search, alerting, compliance reports |
| CloudWatch | AWS native | Seamless AWS integration, log groups |
| Google Cloud Logging (formerly Stackdriver) | GCP native | Google Cloud integration |
| Azure Monitor Logs | Azure native | Microsoft ecosystem |
9. Metrics and Monitoring
python
from prometheus_client import Counter, Histogram, Gauge

# Counter: monotonically increasing
login_attempts = Counter('login_attempts_total', 'Total login attempts', ['status'])
login_attempts.labels(status='success').inc()

# Histogram: observe value distribution
request_duration = Histogram('request_duration_seconds', 'Request duration')
request_duration.observe(0.5)

# Gauge: can go up or down
active_connections = Gauge('active_connections', 'Current active connections')
active_connections.set(42)
python
# CPU, memory, disk usage
import psutil

cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
logger.info("System metrics", extra={
    "cpu_percent": cpu_percent,
    "memory_percent": memory.percent,
    "disk_percent": disk.percent
})
yaml
# Prometheus alert rules
- alert: HighErrorRate
  # Ratio of 5xx responses to all responses, so the 0.05 threshold means 5%
  expr: sum(rate(requests_total{status=~"5.."}[5m])) by (service) / sum(rate(requests_total[5m])) by (service) > 0.05
  for: 5m
  annotations:
    summary: "High error rate detected"
    description: "Error rate is {{ $value | humanizePercentage }} for {{ $labels.service }}"
- alert: SlowRequestLatency
  # histogram_quantile operates on the rate of the _bucket series, grouped by le
  expr: histogram_quantile(0.95, sum(rate(request_duration_seconds_bucket[5m])) by (le)) > 1
  for: 10m
  annotations:
    summary: "Slow requests detected (p95 > 1s)"
10. Common Libraries by Language
python
# Standard library logging
import logging

# Structured logging with structlog
import structlog
logger = structlog.get_logger()
logger.info("user_created", user_id="u123", email_domain="example.com")

# For advanced tracing
from opentelemetry import trace
# The Jaeger Thrift exporter is deprecated in newer OpenTelemetry releases in favor of OTLP
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
**Libraries:**
- `logging` - Built-in, basic structured support
- `structlog` - Structured logging, cleaner API
- `python-json-logger` - JSON formatter for standard logging
- `OpenTelemetry` - Distributed tracing standard
- `Jaeger` - Distributed tracing backend
Node.js / TypeScript
javascript
// Winston
const winston = require('winston');
const logger = winston.createLogger({
  format: winston.format.json(),
  transports: [new winston.transports.Console()]
});
logger.info('User logged in', { userId: 'u123' });

// Pino (lightweight)
const pino = require('pino');
const pinoLogger = pino();
pinoLogger.info({ userId: 'u123' }, 'User logged in');
**Libraries:**
- `winston` - Full-featured, very popular
- `pino` - Lightweight, high performance
- JSON logging, stream-based
- HTTP request logger for Express
- Distributed tracing
- Standard tracing API
go
// Structured logging with zap
import (
    "time"

    "go.uber.org/zap"
)

logger, _ := zap.NewProduction()
defer logger.Sync()
logger.Info("user login",
    zap.String("user_id", "u123"),
    zap.Duration("duration", time.Second),
)

// Or logrus (JSON support)
import "github.com/sirupsen/logrus"

log := logrus.New()
log.SetFormatter(&logrus.JSONFormatter{})
log.WithFields(logrus.Fields{"user_id": "u123"}).Info("Login")
**Libraries:**
- `zap` - High performance, structured
- `logrus` - Popular, JSON output
- `log/slog` - Standard library (Go 1.21+)
- Distributed tracing
Java / Kotlin
java
// Logback with SLF4J
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.logstash.logback.marker.Markers;

Logger logger = LoggerFactory.getLogger(MyClass.class);

// Structured with logstash-logback-encoder
logger.info(Markers.append("user_id", "u123"), "User logged in");

// Spring Boot with Logback (built-in)
@RestController
public class UserController {
    private static final Logger logger = LoggerFactory.getLogger(UserController.class);
}
**Libraries:**
- `SLF4J` + `Logback` - Standard combo
- Enterprise feature-rich
- `logstash-logback-encoder` - Structured output
- Distributed tracing
csharp
// Serilog (structured)
using Serilog;

Log.Logger = new LoggerConfiguration()
    .WriteTo.Console()
    .CreateLogger();
Log.Information("User {UserId} logged in", "u123");

// Built-in ILogger with dependency injection
public class UserService {
    private readonly ILogger<UserService> _logger;

    public UserService(ILogger<UserService> logger) {
        _logger = logger;
    }
}
**Libraries:**
- `Serilog` - Excellent structured support
- Enterprise logging
- `log4net` - Classic Apache Log4j port
- `Microsoft.Extensions.Logging` - Built-in DI support
- `OpenTelemetry.Exporter.Console` - Tracing
11. Example Patterns
Complete Request Logging Pipeline (Python)
python
import sys
import time
from uuid import uuid4

import structlog
from flask import Flask, request

# Configure structlog to emit one JSON object per log call
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
)


class RequestLogger:
    def __init__(self):
        self.logger = structlog.get_logger()

    def log_request_start(self, request):
        # Reuse the caller's trace ID when present, otherwise start a new trace
        trace_id = request.headers.get('X-Trace-ID') or str(uuid4())
        span_id = str(uuid4())
        self.logger.info(
            "request_started",
            trace_id=trace_id,
            span_id=span_id,
            method=request.method,
            path=request.path,
            # Flask requests have no user_id unless auth middleware sets one
            user_id=getattr(request, "user_id", None),
        )
        return trace_id, span_id

    def log_request_complete(self, trace_id, span_id, status, duration_ms):
        level = "info" if status < 400 else "warning" if status < 500 else "error"
        # Look up the level method by name (info, warning, error)
        log_method = getattr(self.logger, level)
        log_method(
            "request_completed",
            trace_id=trace_id,
            span_id=span_id,
            status_code=status,
            duration_ms=duration_ms,
        )

    def log_error(self, trace_id, span_id, error, context=None):
        self.logger.error(
            "request_error",
            trace_id=trace_id,
            span_id=span_id,
            error_type=type(error).__name__,
            error_message=str(error),
            error_context=context or {},
        )


app = Flask(__name__)
req_logger = RequestLogger()


@app.before_request
def before_request():
    request.trace_id, request.span_id = req_logger.log_request_start(request)
    request.start_time = time.perf_counter()


@app.after_request
def after_request(response):
    duration_ms = (time.perf_counter() - request.start_time) * 1000
    req_logger.log_request_complete(
        request.trace_id,
        request.span_id,
        response.status_code,
        duration_ms
    )
    return response


@app.errorhandler(Exception)
def handle_error(error):
    req_logger.log_error(
        request.trace_id,
        request.span_id,
        error,
        context={"path": request.path}
    )
    return {"error": "Internal server error"}, 500
Distributed Tracing Example (Node.js)
分布式追踪示例(Node.js) typescript
import { trace, context, SpanStatusCode } from '@opentelemetry/api';
import { NodeSDK } from '@opentelemetry/sdk-node';
// The Jaeger exporter is published as '@opentelemetry/exporter-jaeger'. It is deprecated
// upstream in favour of the OTLP exporters, which recent Jaeger versions accept directly.
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';

// validatePayment and callPaymentGateway are application functions assumed to be defined elsewhere.

const sdk = new NodeSDK({
  traceExporter: new JaegerExporter({
    host: process.env.JAEGER_HOST || 'localhost',
    port: parseInt(process.env.JAEGER_PORT || '6831', 10), // Jaeger agent UDP port
  }),
});
sdk.start();

const tracer = trace.getTracer('my-service');

async function processPayment(userId: string, amount: number) {
  const span = tracer.startSpan('processPayment', {
    attributes: {
      'user_id': userId,
      'amount': amount,
      'currency': 'USD',
    },
  });

  // Make the span active so spans started inside the callback become its children.
  return context.with(trace.setSpan(context.active(), span), async () => {
    try {
      // Nested span
      const validationSpan = tracer.startSpan('validatePayment');
      try {
        await validatePayment(userId, amount);
        validationSpan.setStatus({ code: SpanStatusCode.OK });
      } catch (error) {
        // error is `unknown` under strict TypeScript, so narrow it before recording.
        validationSpan.recordException(error as Error);
        validationSpan.setStatus({ code: SpanStatusCode.ERROR });
        throw error;
      } finally {
        validationSpan.end();
      }

      // Call external service with trace propagation
      // (headers are injected only if an HTTP client instrumentation is registered with the SDK)
      const result = await callPaymentGateway(amount);
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.recordException(error as Error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      throw error;
    } finally {
      span.end();
    }
  });
}
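The "trace propagation" comment in the Node.js example depends on the trace context travelling with the outgoing request. The following is a minimal sketch of what that context looks like on the wire, assuming the W3C Trace Context `traceparent` header. `make_traceparent` and `parse_traceparent` are hypothetical helper names, and the sketch only accepts the four-field layout. In practice the OpenTelemetry propagators do this work, so hand-written code like this is mainly useful for understanding and debugging.

```python
import re
import secrets

# traceparent layout: version-traceid-parentid-flags, all lowercase hex.
_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def make_traceparent(trace_id=None, sampled=True):
    """Build a traceparent value for an outgoing request (hypothetical helper)."""
    trace_id = trace_id or secrets.token_hex(16)  # 32 hex chars; reuse the caller's trace if given
    span_id = secrets.token_hex(8)                # 16 hex chars; identifies this hop
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def parse_traceparent(header):
    """Return (trace_id, parent_span_id, sampled), or None if the header is malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, span_id, flags = match.groups()
    # Version ff and all-zero IDs are invalid.
    if version == "ff" or set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None
    return trace_id, span_id, bool(int(flags, 16) & 0x01)


if __name__ == "__main__":
    header = make_traceparent()
    print(header)
    print(parse_traceparent(header))
```

A downstream service that parses the header can log the same `trace_id` with its own `span_id`, which is what lets log lines from different services be joined into one trace.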
Security-Conscious Logging (Go)
go
package main

import (
	"net/http"

	"go.uber.org/zap"
)

// RedactSensitive returns a copy of data in which the values of sensitive
// top-level keys are replaced with "[REDACTED]". The input map is not modified.
func RedactSensitive(data map[string]interface{}) map[string]interface{} {
	sensitiveKeys := []string{"password", "api_key", "token", "credit_card", "ssn"}
	redacted := make(map[string]interface{}, len(data))
	for key, value := range data {
		redacted[key] = value
	}
	for _, key := range sensitiveKeys {
		if _, exists := redacted[key]; exists {
			redacted[key] = "[REDACTED]"
		}
	}
	return redacted
}

func LogRequest(logger *zap.Logger, r *http.Request) {
	// Extract safe headers only (never Authorization or Cookie)
	safeHeaders := map[string]string{
		"user-agent":   r.Header.Get("User-Agent"),
		"content-type": r.Header.Get("Content-Type"),
	}
	logger.Info("incoming request",
		zap.String("method", r.Method),
		zap.String("path", r.URL.Path), // path only; the query string may carry tokens
		zap.Any("headers", safeHeaders),
		zap.String("remote_addr", r.RemoteAddr),
	)
}

func LogError(logger *zap.Logger, err error, context map[string]interface{}) {
	logger.Error("operation failed",
		zap.Error(err),
		zap.Any("context", RedactSensitive(context)),
	)
}
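`RedactSensitive` in the Go example only checks top-level keys, so a secret nested inside another map or a list would still reach the log. The following is a minimal Python sketch of the same idea applied recursively. The key set mirrors the Go example. `redact` and `SENSITIVE_KEYS` are hypothetical names, and the case-insensitive key matching is an assumption that the original does not specify.

```python
SENSITIVE_KEYS = frozenset({"password", "api_key", "token", "credit_card", "ssn"})
REDACTED = "[REDACTED]"


def redact(value):
    """Return a redacted copy of value; the input is never mutated."""
    if isinstance(value, dict):
        return {
            # Replace the whole value under a sensitive key, otherwise recurse into it.
            key: REDACTED if str(key).lower() in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        # Tuples come back as lists, which is fine for data headed to a JSON log.
        return [redact(item) for item in value]
    return value


event = {"user": {"id": 7, "Password": "hunter2"}, "items": [{"token": "abc"}, "ok"]}

if __name__ == "__main__":
    print(redact(event))
```

Key-based redaction only catches secrets stored under known names. Free-text fields such as error messages can still leak them, so the rule of never logging secrets in the first place still applies.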
12. Quick Reference Checklist
When implementing logging/observability:
Use structured JSON logging
Include trace_id and span_id in all logs
Set appropriate log levels (don't over-log)
Never log passwords, keys, tokens, PII
Add contextual fields (user_id, request_id, etc.)
Implement log rotation to prevent disk overflow
Include stack traces for errors
Log entry/exit for important functions
Track execution time for performance monitoring
Sample high-volume logs to prevent storage/bandwidth issues
Use existing libraries (structlog, pino, zap, etc.)
Set up log aggregation (ELK, Loki, Datadog, etc.)
Create alerting rules for critical errors
Document logging patterns in team guidelines
Review logs regularly to spot issues early
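Two of the checklist items above, log rotation and sampling of high-volume logs, can be sketched with Python's standard library alone. This is an illustration under stated assumptions, not a recommended configuration: `SamplingFilter`, `build_logger`, the logger name `sampled-app`, the 10 MB size limit and the 10% default rate are all hypothetical choices.

```python
import logging
import random
from logging.handlers import RotatingFileHandler


class SamplingFilter(logging.Filter):
    """Keep every WARNING-or-higher record, but only a fraction of lower-severity ones."""

    def __init__(self, sample_rate, rng=None):
        super().__init__()
        self.sample_rate = sample_rate
        self._rng = rng or random.Random()

    def filter(self, record):
        if record.levelno >= logging.WARNING:
            return True  # never drop warnings or errors
        return self._rng.random() < self.sample_rate


def build_logger(path, sample_rate=0.1):
    """Create a file logger that rotates by size and samples DEBUG/INFO records."""
    logger = logging.getLogger("sampled-app")  # hypothetical logger name
    logger.setLevel(logging.DEBUG)
    # Rotate at roughly 10 MB and keep 5 old files, bounding total disk usage.
    handler = RotatingFileHandler(path, maxBytes=10 * 1024 * 1024, backupCount=5)
    handler.addFilter(SamplingFilter(sample_rate))
    logger.addHandler(handler)
    return logger
```

Sampling at the handler rather than at each call site keeps application code unchanged, and errors stay complete because the filter never drops records at WARNING or above. Calling `build_logger` more than once would attach duplicate handlers, so a real setup would run it once at startup.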
Activate this skill when: working with logging systems, distributed tracing, debugging, monitoring, performance analysis, or observability-related tasks.
Combine with: development-philosophy (fail-fast debugging), security-first-design (never log secrets), testing-workflow (use logs to verify behavior).