logging-observability


Logging & Observability Skill


Activate when working with logging systems, distributed tracing, debugging, monitoring, or any observability-related tasks across applications.

1. Logging Best Practices


Log Levels


Use the appropriate log level for each severity:

| Level | Severity | When to Use |
|-------|----------|-------------|
| DEBUG | Low | Development only: detailed info, variable states, control flow. Use sparingly in production. |
| INFO | Low | Important application lifecycle events: startup, shutdown, config loaded, user actions, key state changes. |
| WARN | Medium | Recoverable issues: deprecated usage, resource constraints, unexpected but handled conditions. Investigate later. |
| ERROR | High | Unrecoverable problems: exceptions, failed operations, missing required data. Requires immediate attention. |
| FATAL | Critical | System-level failures: abort conditions, out of memory, unrecoverable state. System may crash. |

General Principles

通用原则

  • Actionable: Logs should help diagnose problems, not just record events
  • Contextual: Include enough context to understand what happened without code inspection
  • Consistent: Use same terminology across codebase for same events
  • Sparse: Don't log everything - unnecessary noise obscures real issues
  • Sampling: In high-volume scenarios, sample logs (10%, 1%, etc.) rather than logging everything
  • Structured: Always use structured format (JSON) for programmatic parsing
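The sampling principle can be sketched as a thin wrapper that forwards only a fraction of high-volume calls while never dropping errors (the class name and rate are illustrative, not a library API):

```python
import logging
import random

class SampledLogger:
    """Forward roughly `rate` of info-level calls; errors always pass through."""

    def __init__(self, logger: logging.Logger, rate: float = 0.01):
        self.logger = logger
        self.rate = rate

    def info(self, msg, *args, **kwargs):
        # Emit only a sampled fraction of routine events
        if random.random() < self.rate:
            self.logger.info(msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        # Never sample away errors
        self.logger.error(msg, *args, **kwargs)
```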

2. Structured Logging Format


Standard Fields


Every log entry should include:
```json
{
  "timestamp": "2025-11-17T10:30:45.123Z",
  "level": "ERROR",
  "message": "Failed to process user request",
  "service": "auth-service",
  "version": "1.2.3",
  "environment": "production",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "span_id": "00f067aa0ba902b7",
  "parent_span_id": "0af7651916cd43dd",
  "user_id": "user-12345",
  "request_id": "req-98765",
  "path": "/api/users/authenticate",
  "method": "POST",
  "status_code": 500,
  "error": {
    "type": "InvalidCredentialsError",
    "message": "Provided credentials do not match",
    "stack": "Error: InvalidCredentialsError...",
    "code": "AUTH_INVALID_CREDS"
  },
  "context": {
    "ip_address": "192.168.1.100",
    "user_agent": "Mozilla/5.0...",
    "attempt_number": 3,
    "rate_limit_remaining": 2
  },
  "duration_ms": 245,
  "custom_field": "custom_value"
}
```

Required vs Optional Fields


Always include:
  • timestamp
  • level
  • message
  • trace_id
  • service
  • environment
When applicable:
  • span_id / parent_span_id (distributed tracing)
  • user_id (any user action)
  • request_id (any request)
  • error (on ERROR/FATAL)
  • duration_ms (operations)
  • context (relevant metadata)
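A sketch of enforcing the required fields at the point of emission (the function name follows the lists above and is illustrative, not a library API):

```python
import json
from datetime import datetime, timezone

REQUIRED_FIELDS = {"timestamp", "level", "message", "trace_id", "service", "environment"}

def make_log_entry(level, message, trace_id, service, environment, **optional):
    """Build a structured log entry; required fields are function parameters."""
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": level,
        "message": message,
        "trace_id": trace_id,
        "service": service,
        "environment": environment,
        **optional,  # span_id, user_id, error, duration_ms, context, ...
    }
    missing = REQUIRED_FIELDS - entry.keys()
    if missing:
        raise ValueError(f"missing required log fields: {missing}")
    return json.dumps(entry)
```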

3. What to Log


Application Lifecycle


```json
// Startup
{"timestamp": "...", "level": "INFO", "message": "Service starting", "service": "auth-service", "version": "1.2.3"}

// Configuration loaded
{"timestamp": "...", "level": "INFO", "message": "Configuration loaded", "config_source": "environment", "environment": "production"}

// Database connection established
{"timestamp": "...", "level": "INFO", "message": "Database connected", "host": "db.internal", "pool_size": 20}

// Shutdown
{"timestamp": "...", "level": "INFO", "message": "Service shutting down", "reason": "SIGTERM", "uptime_seconds": 3600}
```

User Actions


```json
// Login attempt
{"timestamp": "...", "level": "INFO", "message": "User login attempt", "user_id": "user-123", "method": "password"}

// Data modification
{"timestamp": "...", "level": "INFO", "message": "User updated profile", "user_id": "user-123", "fields_changed": ["email", "name"]}

// Permission check
{"timestamp": "...", "level": "INFO", "message": "Permission check", "user_id": "user-123", "resource": "report-456", "permission": "read", "granted": true}
```

External API Calls


```json
// API call started
{"timestamp": "...", "level": "DEBUG", "message": "External API call", "service": "my-service", "api": "stripe", "endpoint": "/charges", "method": "POST"}

// API response
{"timestamp": "...", "level": "DEBUG", "message": "API response received", "api": "stripe", "endpoint": "/charges", "status_code": 200, "duration_ms": 145}

// API error
{"timestamp": "...", "level": "WARN", "message": "External API error", "api": "stripe", "status_code": 429, "error": "rate_limit_exceeded", "retry_after_seconds": 60}
```
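The three cases above (call, response, error) typically occur inside a retry loop; a hedged sketch with exponential backoff (`do_request`, the logger name, attempt counts, and delays are illustrative):

```python
import logging
import time

logger = logging.getLogger("my-service")

def call_with_retry(do_request, max_attempts=3, base_delay=1.0):
    """Call an external API, logging each attempt and backing off on failure."""
    for attempt in range(1, max_attempts + 1):
        start = time.time()
        try:
            response = do_request()
            logger.debug("API response received", extra={
                "attempt": attempt,
                "duration_ms": (time.time() - start) * 1000,
            })
            return response
        except Exception as exc:
            logger.warning("External API error", extra={
                "attempt": attempt,
                "error": str(exc),
            })
            if attempt == max_attempts:
                raise
            # Exponential backoff: base_delay, 2x, 4x, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
```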

Errors and Exceptions


```json
{
  "timestamp": "...",
  "level": "ERROR",
  "message": "Payment processing failed",
  "service": "payment-service",
  "user_id": "user-456",
  "error": {
    "type": "PaymentGatewayError",
    "message": "Connection timeout",
    "code": "GATEWAY_TIMEOUT",
    "stack": "PaymentGatewayError: Connection timeout\n    at processPayment (payment.ts:45)\n    at ..."
  },
  "context": {
    "amount": 9999,
    "currency": "USD",
    "gateway": "stripe"
  }
}
```

Performance Metrics


```json
// Slow operation
{"timestamp": "...", "level": "WARN", "message": "Slow query detected", "duration_ms": 5234, "threshold_ms": 1000, "query": "SELECT * FROM orders WHERE..."}

// Resource usage
{"timestamp": "...", "level": "INFO", "message": "Memory usage high", "memory_used_mb": 2048, "memory_limit_mb": 2560, "percentage": 80}

// Cache statistics
{"timestamp": "...", "level": "DEBUG", "message": "Cache stats", "cache_hits": 4521, "cache_misses": 234, "hit_rate": 0.95}
```

4. What NOT to Log


NEVER log:
  • Passwords or authentication tokens
  • API keys or secrets
  • Private keys or certificates
  • Database credentials
  • OAuth tokens or refresh tokens
  • Credit card numbers
  • Social security numbers
  • Email addresses (unless redacted)
  • Personal identification numbers
  • Medical records
  • Raw HTTP request/response bodies (especially with auth headers)
Be careful with:
  • PII in general (name, phone, address) - redact or use anonymized IDs
  • Query parameters (may contain secrets)
  • Request/response headers (often contain authorization)
  • User input (may contain sensitive data)
Security rule: When in doubt, DON'T log it

```python
# BAD - logging credentials
logger.info(f"Login attempt for {username} with password {password}")

# GOOD - logging the action without sensitive data
logger.info("Login attempt", extra={"username": username, "method": "password"})

# BAD - logging the full request, including auth headers
logger.debug(f"Request: {request.headers}")

# GOOD - logging request metadata only
logger.debug("Incoming request", extra={
    "method": request.method,
    "path": request.path,
    "user_agent": request.headers.get('user-agent')
})
```
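One way to enforce the "when in doubt, don't log it" rule mechanically is a redaction pass over structured fields before they are emitted (the key list here is illustrative; extend it for your domain):

```python
SENSITIVE_KEYS = {"password", "token", "authorization", "api_key", "secret", "credit_card"}

def redact(record: dict) -> dict:
    """Recursively replace values of sensitive keys with a placeholder."""
    clean = {}
    for key, value in record.items():
        if key.lower() in SENSITIVE_KEYS:
            clean[key] = "[REDACTED]"
        elif isinstance(value, dict):
            clean[key] = redact(value)  # Descend into nested context objects
        else:
            clean[key] = value
    return clean
```

Wiring this into a logging filter or a structlog processor keeps redaction out of individual call sites.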

5. Distributed Tracing


Trace IDs and Span IDs


  • Trace ID: Unique identifier for entire request flow across services
  • Span ID: Unique identifier for single operation/service call
  • Parent Span ID: The span that initiated the current span (captures parent-child relationships)
Generated once at the entry point, then propagated through all downstream calls:
Request → [Service A, Trace: abc123]
  ├─ [Span: span1] Database query
  ├─ [Span: span2] → Service B
  │    └─ [Span: span3] Cache lookup (parent: span2)
  └─ [Span: span4] External API call

Implementation


Python example with trace context:

```python
import uuid

import requests

class RequestContext:
    def __init__(self, trace_id=None, span_id=None, parent_span_id=None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.span_id = span_id or str(uuid.uuid4())
        self.parent_span_id = parent_span_id

# Middleware/decorator: attach a fresh context to each request
def trace_request(func):
    def wrapper(*args, **kwargs):
        ctx = RequestContext()
        return func(*args, context=ctx, **kwargs)
    return wrapper

# Propagate to downstream services
def call_downstream_service(service_url, data, context):
    headers = {
        'X-Trace-ID': context.trace_id,       # Same trace for the whole flow
        'X-Span-ID': str(uuid.uuid4()),       # New span for the downstream hop
        'X-Parent-Span-ID': context.span_id,  # Current span becomes the parent
    }
    return requests.post(service_url, json=data, headers=headers)
```
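On the receiving side, a service rebuilds its context from those same headers, minting a fresh span ID for its own work (header names follow the propagation sketch above; a missing trace ID means this service is the entry point):

```python
import uuid

def extract_trace_context(headers):
    """Build the current request's trace context from incoming headers."""
    return {
        # Continue the caller's trace, or start a new one at the entry point
        "trace_id": headers.get("X-Trace-ID") or str(uuid.uuid4()),
        # Always mint a new span for this hop
        "span_id": str(uuid.uuid4()),
        # The caller's span, if any, becomes our parent
        "parent_span_id": headers.get("X-Span-ID"),
    }
```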

Sampling Strategies


  • No sampling: Log all traces (can be expensive for high-volume services)
  • Rate sampling: Log every Nth request (e.g., 1 in 100)
  • Adaptive sampling: Sample based on error rate, latency, or traffic volume
  • Tail sampling: Sample based on trace outcome (errors always sampled)

Adaptive sampling example

```python
import random

def should_sample(trace):
    # Always sample errors
    if trace.has_error:
        return True

    # Sample slow requests (>1s)
    if trace.duration_ms > 1000:
        return True

    # Sample 1% of normal requests
    return random.random() < 0.01
```

6. Performance Logging


Execution Time


```python
import logging
import time

logger = logging.getLogger(__name__)

def log_execution_time(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            result = func(*args, **kwargs)
            duration_ms = (time.time() - start) * 1000
            logger.info(f"{func.__name__} completed", extra={
                "duration_ms": duration_ms,
                "status": "success"
            })
            return result
        except Exception as e:
            duration_ms = (time.time() - start) * 1000
            logger.error(f"{func.__name__} failed", extra={
                "duration_ms": duration_ms,
                "error": str(e)
            })
            raise
    return wrapper
```

Resource Usage


```python
import psutil
import os

def log_resource_usage():
    process = psutil.Process(os.getpid())
    memory = process.memory_info()

    logger.info("Resource usage", extra={
        "memory_rss_mb": memory.rss / 1024 / 1024,
        "memory_vms_mb": memory.vms / 1024 / 1024,
        "cpu_percent": process.cpu_percent(interval=1),
        "num_threads": process.num_threads()
    })
```

Slow Query Logs


```python
import time

# Track database query performance
SLOW_QUERY_THRESHOLD_MS = 1000

def execute_query(cursor, query, params):
    start = time.time()
    cursor.execute(query, params)
    duration_ms = (time.time() - start) * 1000

    if duration_ms > SLOW_QUERY_THRESHOLD_MS:
        logger.warning("Slow query detected", extra={
            "query": query,
            "params_count": len(params),
            "duration_ms": duration_ms,
            "threshold_ms": SLOW_QUERY_THRESHOLD_MS
        })

    return cursor.fetchall()
```

7. Debugging Patterns


Debug Logging


Use DEBUG level for development/troubleshooting only:
```python
logger.debug("Function entry", extra={
    "function": "process_payment",
    "args": {"amount": 100, "currency": "USD"}
})

logger.debug("Intermediate state", extra={
    "processing_step": "validation",
    "validation_passed": True,
    "timestamp": time.time()
})

logger.debug("Function exit", extra={
    "function": "process_payment",
    "return_value": {"transaction_id": "txn-123", "status": "pending"}
})
```

Conditional Breakpoints

In an IDE debugger (VS Code, PyCharm, etc.), set a breakpoint with a condition; the debugger pauses only when the condition is true:

```python
if user_id == "debug-user-123":
    # Breakpoint here with condition: amount > 1000
    processor.process(order)
```
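The same effect is available without an IDE by guarding a programmatic `pdb` breakpoint with the condition of interest (function name, ids, and amounts are illustrative; `debug` defaults to off so the code runs normally):

```python
import pdb

def process_order(order, user_id, amount, debug=False):
    # Drop into the debugger only for the targeted user and amount
    if debug and user_id == "debug-user-123" and amount > 1000:
        pdb.set_trace()
    # Normal processing path (illustrative)
    return {"order": order, "amount": amount, "status": "processed"}
```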

Remote Debugging


Python example:

```python
# Start the remote debugger (debugpy)
import debugpy

debugpy.listen(("0.0.0.0", 5678))
print("Debugger listening on port 5678, waiting for client...")
debugpy.wait_for_client()
```

Then connect from your IDE on the same port.

Log Aggregation for Debugging


```python
# execute_log_query is a placeholder for your log store's query client
# (ELK, Loki, etc.); parameterize queries rather than interpolating values.

# Retrieve logs for a specific trace
def get_trace_logs(trace_id):
    query = "SELECT * FROM logs WHERE trace_id = %s ORDER BY timestamp"
    return execute_log_query(query, (trace_id,))

# Filter by user for debugging user-reported issues
def get_user_logs(user_id, hours=1):
    query = ("SELECT * FROM logs WHERE user_id = %s "
             "AND timestamp > now() - interval '%s hours'")
    return execute_log_query(query, (user_id, hours))
```

8. Log Management


Log Rotation


Prevent unbounded disk usage:
```python
# Python logging with rotation
from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler(
    filename='app.log',
    maxBytes=10485760,  # 10 MB
    backupCount=5       # Keep 5 rotated files
)

# Backup naming: app.log, app.log.1, app.log.2, etc.
```
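Rotation can also be time-based instead of size-based, which pairs naturally with day-granularity retention policies (the filename and counts here are illustrative):

```python
from logging.handlers import TimedRotatingFileHandler

# Rotate at midnight and keep the last 7 daily files
# (app.log.2025-11-17, app.log.2025-11-18, ...)
handler = TimedRotatingFileHandler(
    filename="app.log",
    when="midnight",
    backupCount=7,
    delay=True,  # Don't open the file until the first record is emitted
)
```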

Retention Policies


```json
{
  "retention_policy": {
    "DEBUG": "7 days",
    "INFO": "30 days",
    "WARN": "90 days",
    "ERROR": "1 year",
    "FATAL": "indefinite"
  }
}
```
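A retention policy like this reduces to a per-level age check when pruning stored logs; a minimal sketch (the day counts mirror the JSON above, with `None` marking indefinite retention):

```python
from datetime import datetime, timedelta, timezone

# Retention in days per level; None means keep indefinitely
RETENTION_DAYS = {"DEBUG": 7, "INFO": 30, "WARN": 90, "ERROR": 365, "FATAL": None}

def is_expired(level: str, logged_at: datetime, now: datetime) -> bool:
    """Return True when an entry is older than its level's retention window."""
    days = RETENTION_DAYS[level]
    if days is None:
        return False
    return now - logged_at > timedelta(days=days)
```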

Log Aggregation Tools


| Tool | Best For | Strengths |
|------|----------|-----------|
| ELK Stack (Elasticsearch, Logstash, Kibana) | On-premise, complex queries | Powerful search, rich dashboards, customizable |
| Grafana Loki | Simple log aggregation, cost-effective | Low overhead, integrates with Prometheus |
| Datadog | Cloud-first, all-in-one | Agent-based, excellent integrations |
| Splunk | Enterprise, security focus | Powerful search, alerting, compliance reports |
| CloudWatch | AWS native | Seamless AWS integration, log groups |
| Cloud Logging (formerly Stackdriver) | GCP native | Google Cloud integration |
| Azure Monitor | Azure native | Microsoft ecosystem |

9. Metrics and Monitoring


Application Metrics


```python
from prometheus_client import Counter, Histogram, Gauge

# Counter: monotonically increasing
login_attempts = Counter('login_attempts_total', 'Total login attempts', ['status'])
login_attempts.labels(status='success').inc()

# Histogram: observe value distribution
request_duration = Histogram('request_duration_seconds', 'Request duration')
request_duration.observe(0.5)

# Gauge: can go up or down
active_connections = Gauge('active_connections', 'Current active connections')
active_connections.set(42)
```

System Metrics

```python
import psutil

# CPU, memory, disk usage
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')

logger.info("System metrics", extra={
    "cpu_percent": cpu_percent,
    "memory_percent": memory.percent,
    "disk_percent": disk.percent
})
```

Alerting Rules


```yaml
# Prometheus alert rules
groups:
  - name: service-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(requests_total{status="500"}[5m]) > 0.05
        for: 5m
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} for {{ $labels.service }}"

      - alert: SlowRequestLatency
        expr: histogram_quantile(0.95, rate(request_duration_seconds_bucket[5m])) > 1
        for: 10m
        annotations:
          summary: "Slow requests detected (p95 > 1s)"
```

10. Common Libraries by Language


Python


```python
# Standard library logging
import logging

# Structured logging with structlog
import structlog

logger = structlog.get_logger()
logger.info("user_created", user_id="u123", email_domain="example.com")

# For distributed tracing
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
```

**Libraries:**
- `logging` - Built-in, basic structured support
- `structlog` - Structured logging, cleaner API
- `python-json-logger` - JSON formatter for standard logging
- `OpenTelemetry` - Distributed tracing standard
- `Jaeger` - Distributed tracing backend

Node.js / TypeScript


```javascript
// Winston
const winston = require('winston');

const logger = winston.createLogger({
  format: winston.format.json(),
  transports: [new winston.transports.Console()]
});

logger.info('User logged in', { userId: 'u123' });

// Pino (lightweight)
const pino = require('pino');
const pinoLogger = pino();
pinoLogger.info({ userId: 'u123' }, 'User logged in');
```

Libraries:
  • winston - Full-featured, very popular
  • pino - Lightweight, high performance
  • bunyan - JSON logging, stream-based
  • morgan - HTTP request logger for Express
  • OpenTelemetry - Distributed tracing
  • @opentelemetry/api - Standard tracing API

Go


```go
// Structured logging with zap
import "go.uber.org/zap"

logger, _ := zap.NewProduction()
defer logger.Sync()

logger.Info("user login",
    zap.String("user_id", "u123"),
    zap.Duration("duration", time.Second),
)

// Or logrus (JSON support)
import "github.com/sirupsen/logrus"

logger := logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{})
logger.WithFields(logrus.Fields{"user_id": "u123"}).Info("Login")
```

Libraries:
  • zap - High performance, structured
  • logrus - Popular, JSON output
  • slog - Standard library (Go 1.21+)
  • OpenTelemetry - Distributed tracing

Java / Kotlin


```java
// Logback with SLF4J
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.logstash.logback.marker.Markers;

Logger logger = LoggerFactory.getLogger(MyClass.class);

// Structured with logback-json-encoder
logger.info(Markers.append("user_id", "u123"), "User logged in");

// Spring Boot with logback (built-in)
@RestController
public class UserController {
    private static final Logger logger = LoggerFactory.getLogger(UserController.class);
}
```

Libraries:
  • SLF4J + Logback - Standard combo
  • Log4j2 - Enterprise, feature-rich
  • Logstash Logback Encoder - Structured output
  • OpenTelemetry - Distributed tracing

C# / .NET


```csharp
// Serilog (structured)
using Serilog;

Log.Logger = new LoggerConfiguration()
    .WriteTo.Console()
    .CreateLogger();

Log.Information("User {UserId} logged in", "u123");

// Built-in ILogger with dependency injection
public class UserService {
    private readonly ILogger<UserService> _logger;

    public UserService(ILogger<UserService> logger) {
        _logger = logger;
    }
}
```

Libraries:
  • Serilog - Excellent structured support
  • NLog - Enterprise logging
  • log4net - Classic Apache Log4j port
  • Microsoft.Extensions.Logging - Built-in DI support
  • OpenTelemetry.Exporter.Console - Tracing

11. Example Patterns


Complete Request Logging Pipeline (Python)


python
from datetime import datetime
from uuid import uuid4
import json
import time
import structlog
python
from datetime import datetime
from uuid import uuid4
import json
import time
import structlog
import sys

Configure structlog

配置structlog

structlog.configure( processors=[ structlog.stdlib.ProcessorFormatter.wrap_for_formatter, ], context_class=dict, logger_factory=structlog.PrintLoggerFactory(file=sys.stdout), )
class RequestLogger: def init(self): self.logger = structlog.get_logger()
def log_request_start(self, request):
    trace_id = request.headers.get('X-Trace-ID') or str(uuid4())
    span_id = str(uuid4())

    self.logger.info(
        "request_started",
        trace_id=trace_id,
        span_id=span_id,
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )

    return trace_id, span_id

def log_request_complete(self, trace_id, span_id, status, duration_ms):
    level = "info" if status < 400 else "warn" if status < 500 else "error"

    self.logger.log(
        level,
        "request_completed",
        trace_id=trace_id,
        span_id=span_id,
        status_code=status,
        duration_ms=duration_ms,
    )

def log_error(self, trace_id, span_id, error, context=None):
    self.logger.error(
        "request_error",
        trace_id=trace_id,
        span_id=span_id,
        error_type=type(error).__name__,
        error_message=str(error),
        error_context=context or {},
    )
structlog.configure(
    processors=[
        structlog.processors.JSONRenderer(),  # 将每条事件渲染为一行JSON
    ],
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
)

class RequestLogger:
    def __init__(self):
        self.logger = structlog.get_logger()

    def log_request_start(self, request):
        trace_id = request.headers.get('X-Trace-ID') or str(uuid4())
        span_id = str(uuid4())

        self.logger.info(
            "request_started",
            trace_id=trace_id,
            span_id=span_id,
            method=request.method,
            path=request.path,
            user_id=request.user_id,
        )

        return trace_id, span_id

    def log_request_complete(self, trace_id, span_id, status, duration_ms):
        level = "info" if status < 400 else "warning" if status < 500 else "error"

        # 按级别名称分发;适用于任何structlog绑定日志器
        getattr(self.logger, level)(
            "request_completed",
            trace_id=trace_id,
            span_id=span_id,
            status_code=status,
            duration_ms=duration_ms,
        )

    def log_error(self, trace_id, span_id, error, context=None):
        self.logger.error(
            "request_error",
            trace_id=trace_id,
            span_id=span_id,
            error_type=type(error).__name__,
            error_message=str(error),
            error_context=context or {},
        )

Flask integration

Flask集成

from flask import Flask, request

app = Flask(__name__)
req_logger = RequestLogger()

@app.before_request
def before_request():
    request.trace_id, request.span_id = req_logger.log_request_start(request)
    request.start_time = time.time()

@app.after_request
def after_request(response):
    duration_ms = (time.time() - request.start_time) * 1000
    req_logger.log_request_complete(
        request.trace_id, request.span_id, response.status_code, duration_ms
    )
    return response

@app.errorhandler(Exception)
def handle_error(error):
    req_logger.log_error(
        request.trace_id, request.span_id, error, context={"path": request.path}
    )
    return {"error": "Internal server error"}, 500
from flask import Flask, request

app = Flask(__name__)
req_logger = RequestLogger()

@app.before_request
def before_request():
    request.trace_id, request.span_id = req_logger.log_request_start(request)
    request.start_time = time.time()

@app.after_request
def after_request(response):
    duration_ms = (time.time() - request.start_time) * 1000
    req_logger.log_request_complete(
        request.trace_id, request.span_id, response.status_code, duration_ms
    )
    return response

@app.errorhandler(Exception)
def handle_error(error):
    req_logger.log_error(
        request.trace_id, request.span_id, error, context={"path": request.path}
    )
    return {"error": "Internal server error"}, 500
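The pipeline above logs every request; on high-volume endpoints, the sampling principle applies: keep a fixed fraction of routine events and never drop errors. A minimal sketch of a sampling wrapper (the `SampledLogger` name and the 1% default rate are illustrative, and it assumes a structlog-style logger that accepts keyword fields):

```python
import random

class SampledLogger:
    """Illustrative wrapper: log a fraction of routine events, all errors."""

    def __init__(self, logger, sample_rate=0.01):
        self.logger = logger
        self.sample_rate = sample_rate

    def info(self, event, **fields):
        # At a 1% rate, ~99% of routine events are dropped before emission
        if random.random() < self.sample_rate:
            # Record the rate so downstream tooling can re-weight counts
            self.logger.info(event, sample_rate=self.sample_rate, **fields)

    def error(self, event, **fields):
        # Errors are never sampled away
        self.logger.error(event, **fields)
```

Emitting `sample_rate` with each kept event lets aggregation queries multiply counts back up to an estimate of the true volume.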

Distributed Tracing Example (Node.js)

分布式追踪示例(Node.js)

typescript
import { trace, context, SpanStatusCode } from '@opentelemetry/api';
import { NodeSDK } from '@opentelemetry/sdk-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';

const sdk = new NodeSDK({
  traceExporter: new JaegerExporter({
    host: process.env.JAEGER_HOST || 'localhost',
    port: parseInt(process.env.JAEGER_PORT || '6831'),
  }),
});

sdk.start();

const tracer = trace.getTracer('my-service');

async function processPayment(userId: string, amount: number) {
  const span = tracer.startSpan('processPayment', {
    attributes: {
      'user_id': userId,
      'amount': amount,
      'currency': 'USD',
    }
  });

  return context.with(trace.setSpan(context.active(), span), async () => {
    try {
      // Nested span
      const validationSpan = tracer.startSpan('validatePayment');
      try {
        await validatePayment(userId, amount);
        validationSpan.setStatus({ code: SpanStatusCode.OK });
      } catch (error) {
        validationSpan.recordException(error);
        validationSpan.setStatus({ code: SpanStatusCode.ERROR });
        throw error;
      } finally {
        validationSpan.end();
      }

      // Call external service with trace propagation
      const result = await callPaymentGateway(amount);

      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.recordException(error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      throw error;
    } finally {
      span.end();
    }
  });
}
typescript
import { trace, context, SpanStatusCode } from '@opentelemetry/api';
import { NodeSDK } from '@opentelemetry/sdk-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';

const sdk = new NodeSDK({
  traceExporter: new JaegerExporter({
    host: process.env.JAEGER_HOST || 'localhost',
    port: parseInt(process.env.JAEGER_PORT || '6831'),
  }),
});

sdk.start();

const tracer = trace.getTracer('my-service');

async function processPayment(userId: string, amount: number) {
  const span = tracer.startSpan('processPayment', {
    attributes: {
      'user_id': userId,
      'amount': amount,
      'currency': 'USD',
    }
  });

  return context.with(trace.setSpan(context.active(), span), async () => {
    try {
      // 嵌套Span
      const validationSpan = tracer.startSpan('validatePayment');
      try {
        await validatePayment(userId, amount);
        validationSpan.setStatus({ code: SpanStatusCode.OK });
      } catch (error) {
        validationSpan.recordException(error);
        validationSpan.setStatus({ code: SpanStatusCode.ERROR });
        throw error;
      } finally {
        validationSpan.end();
      }

      // 调用外部服务并传播追踪上下文
      const result = await callPaymentGateway(amount);

      span.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.recordException(error);
      span.setStatus({ code: SpanStatusCode.ERROR });
      throw error;
    } finally {
      span.end();
    }
  });
}

Security-Conscious Logging (Go)

安全合规日志(Go)

go
package main

import (
  "go.uber.org/zap"
  "net/http"
)

// RedactSensitive removes sensitive fields from log data
func RedactSensitive(data map[string]interface{}) map[string]interface{} {
  sensitiveKeys := []string{"password", "api_key", "token", "credit_card", "ssn"}

  for _, key := range sensitiveKeys {
    if _, exists := data[key]; exists {
      data[key] = "[REDACTED]"
    }
  }
  return data
}

func LogRequest(logger *zap.Logger, r *http.Request) {
  // Extract safe headers only
  safeHeaders := map[string]string{
    "user-agent": r.Header.Get("User-Agent"),
    "content-type": r.Header.Get("Content-Type"),
  }

  logger.Info("incoming request",
    zap.String("method", r.Method),
    zap.String("path", r.URL.Path),
    zap.Any("headers", safeHeaders),
    zap.String("remote_addr", r.RemoteAddr),
  )
}

func LogError(logger *zap.Logger, err error, context map[string]interface{}) {
  logger.Error("operation failed",
    zap.Error(err),
    zap.Any("context", RedactSensitive(context)),
  )
}
go
package main

import (
  "go.uber.org/zap"
  "net/http"
)

// RedactSensitive 从日志数据中移除敏感字段
func RedactSensitive(data map[string]interface{}) map[string]interface{} {
  sensitiveKeys := []string{"password", "api_key", "token", "credit_card", "ssn"}

  for _, key := range sensitiveKeys {
    if _, exists := data[key]; exists {
      data[key] = "[已脱敏]"
    }
  }
  return data
}

func LogRequest(logger *zap.Logger, r *http.Request) {
  // 仅提取安全的请求头
  safeHeaders := map[string]string{
    "user-agent": r.Header.Get("User-Agent"),
    "content-type": r.Header.Get("Content-Type"),
  }

  logger.Info("收到请求",
    zap.String("method", r.Method),
    zap.String("path", r.URL.Path),
    zap.Any("headers", safeHeaders),
    zap.String("remote_addr", r.RemoteAddr),
  )
}

func LogError(logger *zap.Logger, err error, context map[string]interface{}) {
  logger.Error("操作失败",
    zap.Error(err),
    zap.Any("context", RedactSensitive(context)),
  )
}

12. Quick Reference Checklist

12. 快速参考检查清单

When implementing logging/observability:
  • Use structured JSON logging
  • Include trace_id and span_id in all logs
  • Set appropriate log levels (don't over-log)
  • Never log passwords, keys, tokens, PII
  • Add contextual fields (user_id, request_id, etc.)
  • Implement log rotation to prevent disk overflow
  • Include stack traces for errors
  • Log entry/exit for important functions
  • Track execution time for performance monitoring
  • Sample high-volume logs to prevent storage/bandwidth issues
  • Use existing libraries (structlog, pino, zap, etc.)
  • Set up log aggregation (ELK, Loki, Datadog, etc.)
  • Create alerting rules for critical errors
  • Document logging patterns in team guidelines
  • Review logs regularly to spot issues early
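Two of the checklist items above, structured JSON output and log rotation, can be sketched with Python's standard library alone. A minimal illustration (the file name, size limit, backup count, and logger name are illustrative choices, not prescriptions):

```python
import json
import logging
from logging.handlers import RotatingFileHandler

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON line for programmatic parsing."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Context passed via `extra=` lands as attributes on the record
        if hasattr(record, "trace_id"):
            payload["trace_id"] = record.trace_id
        return json.dumps(payload)

# Rotate at ~10 MB and keep 5 backups so logs cannot fill the disk
handler = RotatingFileHandler("app.log", maxBytes=10_000_000, backupCount=5)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("my-service")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("request_completed", extra={"trace_id": "abc-123"})
```

A production formatter would promote more fields (timestamp, span_id, duration_ms) into the payload; the shape here just mirrors the structured-logging principle from section 1.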

Activate this skill when: working with logging systems, distributed tracing, debugging, monitoring, performance analysis, or observability-related tasks.
Combine with: development-philosophy (fail-fast debugging), security-first-design (never log secrets), testing-workflow (use logs to verify behavior).
实现日志/可观测性时:
  • 使用结构化JSON日志
  • 所有日志中包含trace_id和span_id
  • 设置合适的日志级别(不要过度记录)
  • 绝不记录密码、密钥、令牌、个人可识别信息(PII)
  • 添加上下文字段(user_id、request_id等)
  • 实现日志轮转以防止磁盘溢出
  • 错误日志中包含堆栈跟踪
  • 重要函数记录入口/出口
  • 追踪执行时间以监控性能
  • 对高流量日志进行采样以避免存储/带宽问题
  • 使用现有成熟库(如structlog、pino、zap等)
  • 搭建日志聚合系统(如ELK、Loki、Datadog等)
  • 为关键错误设置告警规则
  • 在团队指南中记录日志模式
  • 定期查看日志以提前发现问题

激活本技能场景: 处理日志系统、分布式追踪、调试、监控、性能分析或可观测性相关任务时。
可结合技能: 开发理念(快速失败调试)、安全优先设计(绝不记录机密)、测试工作流(使用日志验证行为)。