logging-observability
Compare original and translation side by side
🇺🇸
Original
English
🇨🇳
Translation
Chinese
Logging and Observability
日志与可观测性
Overview
概述
Observability enables understanding system behavior through logs, metrics, and traces. This skill provides patterns for:
- Structured Logging: JSON logs with correlation IDs and contextual data
- Distributed Tracing: Span-based request tracking across services (OpenTelemetry, Jaeger, Zipkin)
- Metrics Collection: Counters, gauges, histograms for system health (Prometheus patterns)
- Log Aggregation: Centralized log management (ELK, Loki, Datadog)
- Alerting: Symptom-based alerts with runbooks
可观测性通过日志、指标和追踪数据帮助理解系统行为。本技能提供以下相关模式:
- 结构化日志:包含关联ID和上下文数据的JSON日志
- 分布式追踪:跨服务的基于Span的请求追踪(OpenTelemetry、Jaeger、Zipkin)
- 指标收集:用于系统健康监控的计数器、仪表盘、直方图(Prometheus模式)
- 日志聚合:集中式日志管理(ELK、Loki、Datadog)
- 告警:基于症状的告警及运行手册
Instructions
操作指南
1. Structured Logging (JSON Logs)
1. 结构化日志(JSON日志)
Python Implementation
Python 实现
python
import json
import logging
import sys
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Any
import json
import logging
import sys
from datetime import datetime
from contextvars import ContextVar
from typing import Any
Context variables for request tracking
# Context variables for request tracking; set per-request (e.g. by middleware)
# so every log line emitted while handling the request carries the same IDs.
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')


class StructuredFormatter(logging.Formatter):
    """JSON formatter for structured logging.

    Renders each record as a single-line JSON object containing timestamp,
    level, logger name, message, and the current correlation/span IDs.
    Extra fields passed via ``extra={"structured_data": {...}}`` are merged
    into the top level of the object.
    """

    def format(self, record: logging.LogRecord) -> str:
        # datetime.utcnow() is deprecated (Python 3.12+); use an aware UTC
        # timestamp and keep the original trailing-"Z" wire format.
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            "span_id": span_id.get(),
        }
        # Add exception info if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # Merge caller-supplied structured fields (from `extra=`)
        if hasattr(record, 'structured_data'):
            log_data.update(record.structured_data)
        return json.dumps(log_data)


def setup_logging():
    """Configure structured JSON logging on the root logger (stdout)."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(StructuredFormatter())
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(handler)


# First line of the duplicated translation pane below (side-by-side export artifact).
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')
class StructuredFormatter(logging.Formatter):
"""JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"correlation_id": correlation_id.get(),
"span_id": span_id.get(),
}
# Add exception info if present
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
# Add extra fields
if hasattr(record, 'structured_data'):
log_data.update(record.structured_data)
return json.dumps(log_data)def setup_logging():
"""Configure structured logging."""
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(StructuredFormatter())
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)Usage
Usage
# Usage: module-level logger; fields under "structured_data" are merged into
# the JSON document by StructuredFormatter.  (The rendered page stripped the
# dunder underscores: `getLogger(name)` was `getLogger(__name__)`.)
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={
    "structured_data": {
        "user_id": "123",
        "ip_address": "192.168.1.1",
        "action": "login"
    }
})
logger = logging.getLogger(name)
logger.info("User logged in", extra={
"structured_data": {
"user_id": "123",
"ip_address": "192.168.1.1",
"action": "login"
}
})
TypeScript Implementation
TypeScript 实现
typescript
interface LogContext {
  correlationId?: string;
  spanId?: string;
  [key: string]: unknown;
}

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  context: LogContext;
}

/**
 * Minimal structured logger: each entry is written to stdout as one JSON line.
 * `withContext` produces a child logger whose bound context is merged into
 * every entry it emits (per-call `data` wins on key collisions).
 */
class StructuredLogger {
  private context: LogContext = {};

  withContext(context: LogContext): StructuredLogger {
    const child = new StructuredLogger();
    child.context = { ...this.context, ...context };
    return child;
  }

  private log(
    level: string,
    message: string,
    data?: Record<string, unknown>,
  ): void {
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      context: { ...this.context, ...data },
    };
    console.log(JSON.stringify(entry));
  }

  debug(message: string, data?: Record<string, unknown>): void {
    this.log("DEBUG", message, data);
  }

  info(message: string, data?: Record<string, unknown>): void {
    this.log("INFO", message, data);
  }

  warn(message: string, data?: Record<string, unknown>): void {
    this.log("WARN", message, data);
  }

  error(message: string, data?: Record<string, unknown>): void {
    this.log("ERROR", message, data);
  }
}
interface LogContext {
correlationId?: string;
spanId?: string;
[key: string]: unknown;
}
interface LogEntry {
timestamp: string;
level: string;
message: string;
context: LogContext;
}
class StructuredLogger {
private context: LogContext = {};
withContext(context: LogContext): StructuredLogger {
const child = new StructuredLogger();
child.context = { ...this.context, ...context };
return child;
}
private log(
level: string,
message: string,
data?: Record<string, unknown>,
): void {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level,
message,
context: { ...this.context, ...data },
};
console.log(JSON.stringify(entry));
}
debug(message: string, data?: Record<string, unknown>): void {
this.log("DEBUG", message, data);
}
info(message: string, data?: Record<string, unknown>): void {
this.log("INFO", message, data);
}
warn(message: string, data?: Record<string, unknown>): void {
this.log("WARN", message, data);
}
error(message: string, data?: Record<string, unknown>): void {
this.log("ERROR", message, data);
}
}2. Log Levels and When to Use Each
2. 日志级别及其适用场景
| Level | Usage | Examples |
|---|---|---|
| TRACE | Fine-grained debugging | Loop iterations, variable values |
| DEBUG | Diagnostic information | Function entry/exit, intermediate states |
| INFO | Normal operations | Request started, job completed, user action |
| WARN | Potential issues | Deprecated API usage, retry attempted, slow query |
| ERROR | Failures requiring attention | Exception caught, operation failed |
| FATAL | Critical failures | System cannot continue, data corruption |
python
| 级别 | 适用场景 | 示例 |
|---|---|---|
| TRACE | 细粒度调试 | 循环迭代、变量值 |
| DEBUG | 诊断信息 | 函数进入/退出、中间状态 |
| INFO | 正常操作记录 | 请求启动、任务完成、用户操作 |
| WARN | 潜在问题预警 | 已弃用API使用、重试尝试、慢查询 |
| ERROR | 需要关注的失败情况 | 捕获到异常、操作失败 |
| FATAL | 严重故障 | 系统无法继续运行、数据损坏 |
python
Log level usage examples
Log level usage examples
# Log level usage examples.  Illustrative fragments: `item`, `order`, and `e`
# are assumed to come from the surrounding application code — TODO confirm
# against the caller before copying verbatim.
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Rate limit approaching", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Rate limit approaching", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})
3. Distributed Tracing
3. 分布式追踪
Correlation IDs and Spans
关联ID与Span
python
import uuid
import time
import logging  # was missing: `logger` is used by Tracer._export below
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class Span:
    """A single unit of work within a trace.

    `trace_id` is shared by all spans of one request; `parent_span_id`
    links a span to the span that started it.
    """
    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: str(uuid.uuid4())[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)

    def end(self):
        """Mark the span as finished."""
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        # 0 until end() is called.
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0


current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)


class Tracer:
    """Creates spans, maintains the current-span context, and exports on end."""

    def __init__(self, service_name: str):
        self.service_name = service_name
        # ContextVar tokens per span, so end_span can restore the parent
        # (the original left this as a "use a span stack in production" TODO).
        self._tokens: dict = {}

    def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
        """Start a span; inherits trace/parent from `parent` or the current span."""
        parent = parent or current_span.get()
        trace_id = parent.trace_id if parent else str(uuid.uuid4())[:32]
        parent_span_id = parent.span_id if parent else None
        span = Span(
            name=name,
            trace_id=trace_id,
            parent_span_id=parent_span_id,
            attributes={"service": self.service_name}
        )
        # Remember the token so end_span can restore the previous span.
        self._tokens[span.span_id] = current_span.set(span)
        return span

    def end_span(self, span: Span):
        """Finish `span`, export it, and restore the parent as current span."""
        span.end()
        self._export(span)
        token = self._tokens.pop(span.span_id, None)
        if token is not None:
            current_span.reset(token)

    def _export(self, span: Span):
        """Export span to tracing backend (here: structured log line)."""
        logger.info(f"Span completed: {span.name}", extra={
            "structured_data": {
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes
            }
        })
import uuid
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
import time
@dataclass
class Span:
name: str
trace_id: str
span_id: str = field(default_factory=lambda: str(uuid.uuid4())[:16])
parent_span_id: Optional[str] = None
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
attributes: dict = field(default_factory=dict)
def end(self):
self.end_time = time.time()
@property
def duration_ms(self) -> float:
if self.end_time:
return (self.end_time - self.start_time) * 1000
return 0
current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)
class Tracer:
def __init__(self, service_name: str):
self.service_name = service_name
def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
parent = parent or current_span.get()
trace_id = parent.trace_id if parent else str(uuid.uuid4())[:32]
parent_span_id = parent.span_id if parent else None
span = Span(
name=name,
trace_id=trace_id,
parent_span_id=parent_span_id,
attributes={"service": self.service_name}
)
current_span.set(span)
return span
def end_span(self, span: Span):
span.end()
self._export(span)
# Restore parent span if exists
# In production, use a span stack
def _export(self, span: Span):
"""Export span to tracing backend."""
logger.info(f"Span completed: {span.name}", extra={
"structured_data": {
"trace_id": span.trace_id,
"span_id": span.span_id,
"parent_span_id": span.parent_span_id,
"duration_ms": span.duration_ms,
"attributes": span.attributes
}
})Context manager for spans
Context manager for spans
from contextlib import contextmanager


@contextmanager
def trace_span(tracer: Tracer, name: str):
    """Run the enclosed block inside a span.

    On an exception the span is tagged with `error`/`error.message` and the
    exception propagates; the span is always ended, success or failure.
    """
    active = tracer.start_span(name)
    try:
        yield active
    except Exception as exc:
        # Record the failure on the span, then re-raise unchanged.
        active.attributes["error"] = True
        active.attributes["error.message"] = str(exc)
        raise
    finally:
        tracer.end_span(active)
from contextlib import contextmanager
@contextmanager
def trace_span(tracer: Tracer, name: str):
span = tracer.start_span(name)
try:
yield span
except Exception as e:
span.attributes["error"] = True
span.attributes["error.message"] = str(e)
raise
finally:
tracer.end_span(span)
Usage
Usage
tracer = Tracer("order-service")
async def process_order(order_id: str):
with trace_span(tracer, "process_order") as span:
span.attributes["order_id"] = order_id
with trace_span(tracer, "validate_order"):
await validate(order_id)
with trace_span(tracer, "charge_payment"):
await charge(order_id)
tracer = Tracer("order-service")
async def process_order(order_id: str):
with trace_span(tracer, "process_order") as span:
span.attributes["order_id"] = order_id
with trace_span(tracer, "validate_order"):
await validate(order_id)
with trace_span(tracer, "charge_payment"):
await charge(order_id)
4. Metrics Collection
4. 指标收集
python
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from enum import Enum
import time
import threading


class MetricType(Enum):
    """Supported metric kinds (Prometheus-style)."""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"


@dataclass
class Counter:
    """Monotonically increasing value (e.g. total requests served)."""
    name: str
    labels: Dict[str, str]
    value: float = 0

    def inc(self, amount: float = 1):
        self.value += amount


@dataclass
class Gauge:
    """Value that can go up and down (e.g. active connections)."""
    name: str
    labels: Dict[str, str]
    value: float = 0

    def set(self, value: float):
        self.value = value

    def inc(self, amount: float = 1):
        self.value += amount

    def dec(self, amount: float = 1):
        self.value -= amount


@dataclass
class Histogram:
    """Distribution of observations in cumulative buckets (Prometheus semantics)."""
    name: str
    labels: Dict[str, str]
    buckets: List[float]
    # default_factory instead of the original `= None` + __post_init__ reset:
    # keeps the declared type honest and avoids the mutable-default trap.
    values: List[float] = field(default_factory=list)

    def __post_init__(self):
        # Cumulative bucket counters; +inf catches every observation.
        self._bucket_counts = {b: 0 for b in self.buckets}
        self._bucket_counts[float('inf')] = 0
        self._sum = 0
        self._count = 0

    def observe(self, value: float):
        """Record one observation."""
        self.values.append(value)
        self._sum += value
        self._count += 1
        # Cumulative histogram: every bucket >= value is incremented.
        for bucket in self._bucket_counts:
            if value <= bucket:
                self._bucket_counts[bucket] += 1


class MetricsRegistry:
    """Thread-safe registry returning one shared metric object per name+labels."""

    def __init__(self):
        self._metrics: Dict[str, object] = {}
        self._lock = threading.Lock()

    @staticmethod
    def _key(name: str, labels: Optional[Dict[str, str]]) -> str:
        # Sort label items so {"a": "1", "b": "2"} and {"b": "2", "a": "1"}
        # resolve to the SAME metric; the original f"{name}:{labels}" key
        # depended on dict insertion order and silently split such metrics.
        return f"{name}:{sorted((labels or {}).items())}"

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Counter(name, labels or {})
            return self._metrics[key]

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Gauge(name, labels or {})
            return self._metrics[key]

    def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
        key = self._key(name, labels)
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Histogram(name, labels or {}, buckets)
            return self._metrics[key]
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
import time
import threading
class MetricType(Enum):
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
@dataclass
class Counter:
name: str
labels: Dict[str, str]
value: float = 0
def inc(self, amount: float = 1):
self.value += amount
@dataclass
class Gauge:
name: str
labels: Dict[str, str]
value: float = 0
def set(self, value: float):
self.value = value
def inc(self, amount: float = 1):
self.value += amount
def dec(self, amount: float = 1):
self.value -= amount
@dataclass
class Histogram:
name: str
labels: Dict[str, str]
buckets: List[float]
values: List[float] = None
def __post_init__(self):
self.values = []
self._bucket_counts = {b: 0 for b in self.buckets}
self._bucket_counts[float('inf')] = 0
self._sum = 0
self._count = 0
def observe(self, value: float):
self.values.append(value)
self._sum += value
self._count += 1
for bucket in sorted(self._bucket_counts.keys()):
if value <= bucket:
self._bucket_counts[bucket] += 1
class MetricsRegistry:
def __init__(self):
self._metrics: Dict[str, any] = {}
self._lock = threading.Lock()
def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
key = f"{name}:{labels}"
with self._lock:
if key not in self._metrics:
self._metrics[key] = Counter(name, labels or {})
return self._metrics[key]
def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
key = f"{name}:{labels}"
with self._lock:
if key not in self._metrics:
self._metrics[key] = Gauge(name, labels or {})
return self._metrics[key]
def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
key = f"{name}:{labels}"
with self._lock:
if key not in self._metrics:
self._metrics[key] = Histogram(name, labels or {}, buckets)
return self._metrics[key]Usage
Usage
metrics = MetricsRegistry()
metrics = MetricsRegistry()
Counter for requests
Counter for requests
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()
Gauge for active connections
Gauge for active connections
active_connections = metrics.gauge("active_connections")
active_connections.inc()
active_connections = metrics.gauge("active_connections")
active_connections.inc()
... handle connection ...
... handle connection ...
active_connections.dec()
active_connections.dec()
Histogram for request duration
Histogram for request duration
request_duration = metrics.histogram(
"http_request_duration_seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
start = time.time()
request_duration = metrics.histogram(
"http_request_duration_seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
start = time.time()
... handle request ...
... handle request ...
request_duration.observe(time.time() - start)
request_duration.observe(time.time() - start)
5. OpenTelemetry Patterns
5. OpenTelemetry 模式
python
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource  # was missing: Resource is used below
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor


def setup_opentelemetry(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry tracing and metrics with OTLP (gRPC) export.

    Args:
        service_name: value for the ``service.name`` resource attribute.
        otlp_endpoint: OTLP collector endpoint, e.g. ``http://otel-collector:4317``.

    Returns:
        ``(tracer, meter)`` for creating spans and metric instruments.
    """
    resource = Resource.create({"service.name": service_name})

    # Tracing setup: batch spans and ship them to the collector.
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    )
    trace.set_tracer_provider(trace_provider)

    # Metrics setup: the exporter must be attached through a reader —
    # without it the OTLPMetricExporter import was dead code and no
    # metrics were ever exported.
    metric_provider = MeterProvider(
        resource=resource,
        metric_readers=[
            PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=otlp_endpoint))
        ],
    )
    metrics.set_meter_provider(metric_provider)

    # Auto-instrument outbound HTTP calls made with `requests`.
    RequestsInstrumentor().instrument()

    return trace.get_tracer(service_name), metrics.get_meter(service_name)
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
def setup_opentelemetry(service_name: str, otlp_endpoint: str):
"""Initialize OpenTelemetry with OTLP export."""
# Tracing setup
trace_provider = TracerProvider(
resource=Resource.create({"service.name": service_name})
)
trace_provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
)
trace.set_tracer_provider(trace_provider)
# Metrics setup
metric_provider = MeterProvider(
resource=Resource.create({"service.name": service_name})
)
metrics.set_meter_provider(metric_provider)
# Auto-instrumentation
RequestsInstrumentor().instrument()
return trace.get_tracer(service_name), metrics.get_meter(service_name)Usage with FastAPI
Usage with FastAPI
from fastapi import FastAPI
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")
from fastapi import FastAPI
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")
Custom spans
Custom spans
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
with tracer.start_as_current_span("fetch_order") as span:
span.set_attribute("order.id", order_id)
order = await order_repository.get(order_id)
span.set_attribute("order.status", order.status)
return order
undefined@app.get("/orders/{order_id}")
async def get_order(order_id: str):
with tracer.start_as_current_span("fetch_order") as span:
span.set_attribute("order.id", order_id)
order = await order_repository.get(order_id)
span.set_attribute("order.status", order.status)
return order
6. Log Aggregation Patterns
6. 日志聚合模式
ELK Stack (Elasticsearch, Logstash, Kibana)
ELK 栈(Elasticsearch、Logstash、Kibana)
yaml
undefinedyaml
undefinedLogstash pipeline configuration
Logstash pipeline configuration
input {
file {
path => "/var/log/app/*.log"
codec => json
}
}
filter {
# Parse structured JSON logs
json {
source => "message"
}
# Add Elasticsearch index based on date
mutate {
add_field => {
"[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}"
}
}
# Enrich with geolocation (if IP present)
geoip {
source => "ip_address"
target => "geo"
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "%{[@metadata][index]}"
}
}
undefinedinput {
file {
path => "/var/log/app/*.log"
codec => json
}
}
filter {
Parse structured JSON logs
json {
source => "message"
}
Add Elasticsearch index based on date
mutate {
add_field => {
"[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}"
}
}
Enrich with geolocation (if IP present)
geoip {
source => "ip_address"
target => "geo"
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "%{[@metadata][index]}"
}
}
undefinedGrafana Loki
Grafana Loki
yaml
undefinedyaml
undefinedPromtail scrape configuration
Promtail scrape configuration
scrape_configs:
  - job_name: app-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: app-logs
          __path__: /var/log/app/*.log
    # Extract JSON fields as labels
    pipeline_stages:
      - json:
          expressions:
            level: level
            correlation_id: correlation_id
            service: service
      - labels:
          level:
          correlation_id:
          service:
undefinedscrape_configs:
-
job_name: app-logs static_configs:
- targets:
- localhost labels: job: app-logs path: /var/log/app/*.log
Extract JSON fields as labels
pipeline_stages:- json: expressions: level: level correlation_id: correlation_id service: service
- labels: level: correlation_id: service:
- targets:
undefinedDatadog Agent Configuration
Datadog Agent 配置
yaml
undefinedyaml
undefineddatadog.yaml
datadog.yaml
logs_enabled: true
logs_config:
processing_rules:
- type: exclude_at_match
name: exclude_healthcheck
pattern: "GET /health"
# Auto-parse JSON logs
auto_multi_line_detection: true
logs_enabled: true
logs_config:
processing_rules:
- type: exclude_at_match
name: exclude_healthcheck
pattern: "GET /health"
Auto-parse JSON logs
auto_multi_line_detection: true
Log collection from files
Log collection from files
logs:
- type: file
path: "/var/log/app/*.log"
service: "order-service"
source: "python"
tags:
- "env:production"
undefinedlogs:
- type: file
path: "/var/log/app/*.log"
service: "order-service"
source: "python"
tags:
- "env:production"
undefined7. Alert Design
7. 告警设计
Prometheus Alerting Rules
Prometheus 告警规则
yaml
undefinedyaml
undefinedPrometheus alerting rules
Prometheus alerting rules
groups:
  - name: service-alerts
    rules:
      # High error rate alert
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
          / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"
      # High latency alert
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is {{ $value }}s"
      # Service down alert
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service {{ $labels.instance }} is down"
          description: "{{ $labels.job }} has been down for more than 1 minute"
undefinedgroups:
-
name: service-alerts rules:
High error rate alert
- alert: HighErrorRate expr: | sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.05 for: 5m labels: severity: critical annotations: summary: "High error rate detected" description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes" runbook_url: "https://wiki.example.com/runbooks/high-error-rate"
High latency alert
- alert: HighLatency expr: | histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1 for: 10m labels: severity: warning annotations: summary: "High latency detected" description: "95th percentile latency is {{ $value }}s"
Service down alert
- alert: ServiceDown expr: up == 0 for: 1m labels: severity: critical annotations: summary: "Service {{ $labels.instance }} is down" description: "{{ $labels.job }} has been down for more than 1 minute"
undefinedAlert Severity Levels
告警严重级别
| Level | Response Time | Examples |
|---|---|---|
| Critical | Immediate | Service down, high error rate, data loss |
| Warning | Business hrs | High latency, approaching limits, retry spikes |
| Info | Log only | Deployment started, config changed |
| 级别 | 响应时间 | 示例 |
|---|---|---|
| Critical | 立即响应 | 服务宕机、高错误率、数据丢失 |
| Warning | 工作时间响应 | 高延迟、接近阈值、重试次数激增 |
| Info | 仅记录 | 部署启动、配置变更 |
Best Practices
最佳实践
Logging
日志
-
Log at Appropriate Levels: DEBUG for development, INFO for normal operations, WARN for potential issues, ERROR for failures, FATAL for critical failures.
-
Include Context: Always include correlation IDs, trace IDs, user IDs, and relevant business identifiers in structured fields.
-
Avoid Sensitive Data: Never log passwords, tokens, credit cards, or PII. Implement automatic redaction when necessary.
-
Use Structured Logging: JSON logs enable easy parsing and querying in log aggregation systems (ELK, Loki, Datadog).
-
Consistent Field Names: Standardize field names across services (e.g., always use, not sometimes
correlation_id).request_id
-
选择合适的日志级别:DEBUG用于开发调试,INFO用于正常操作记录,WARN用于潜在问题,ERROR用于失败情况,FATAL用于严重故障。
-
包含上下文信息:始终在结构化字段中包含关联ID、追踪ID、用户ID及相关业务标识符。
-
避免敏感数据:切勿记录密码、令牌、信用卡信息或个人身份信息(PII)。必要时实现自动脱敏。
-
使用结构化日志:JSON日志便于在日志聚合系统(ELK、Loki、Datadog)中解析和查询。
-
统一字段名称:跨服务标准化字段名称(例如,始终使用,而非偶尔使用
correlation_id)。request_id
Distributed Tracing
分布式追踪
-
Trace Boundaries: Create spans at service boundaries, database calls, external API calls, and significant operations.
-
Propagate Context: Pass trace IDs and span IDs across service boundaries via HTTP headers (OpenTelemetry standards).
-
Add Meaningful Attributes: Include business context (user_id, order_id) and technical context (db_query, cache_hit) in span attributes.
-
Sample Appropriately: Use adaptive sampling - trace 100% of errors, sample successful requests based on traffic volume.
-
明确追踪边界:在服务边界、数据库调用、外部API调用及重要操作处创建Span。
-
传播上下文:通过HTTP头(遵循OpenTelemetry标准)在服务间传递追踪ID和Span ID。
-
添加有意义的属性:在Span属性中包含业务上下文(user_id、order_id)和技术上下文(db_query、cache_hit)。
-
合理采样:使用自适应采样 - 100%追踪错误请求,根据流量对成功请求进行采样。
Metrics
指标
-
Track Golden Signals: Monitor the Four Golden Signals - latency, traffic, errors, saturation.
-
Use Correct Metric Types: Counters for totals (requests), Gauges for current values (memory), Histograms for distributions (latency).
-
Label Cardinality: Keep label cardinality low - avoid high-cardinality values like user IDs in metric labels.
-
Naming Conventions: Follow Prometheus naming -(counter),
http_requests_total(gauge),process_memory_bytes(histogram).http_request_duration_seconds
-
追踪黄金信号:监控四大黄金信号 - 延迟、流量、错误、饱和度。
-
使用正确的指标类型:计数器用于统计总数(请求数),仪表盘用于当前值(内存占用),直方图用于分布数据(延迟)。
-
控制标签基数:保持标签基数较低 - 避免在指标标签中使用高基数值(如用户ID)。
-
命名规范:遵循Prometheus命名规范 -(计数器)、
http_requests_total(仪表盘)、process_memory_bytes(直方图)。http_request_duration_seconds
Alerting
告警
-
Alert on Symptoms: Alert on user-impacting issues (error rate, latency), not causes (CPU usage). Symptoms indicate what is broken, causes explain why.
-
Include Runbooks: Every alert must link to a runbook with investigation steps, common causes, and remediation procedures.
-
Use Appropriate Thresholds: Set thresholds based on SLOs and historical data, not arbitrary values.
-
Alert Fatigue: Ensure alerts are actionable. Non-actionable alerts lead to alert fatigue and ignored critical issues.
-
基于症状告警:针对影响用户的问题(错误率、延迟)告警,而非直接针对原因(CPU使用率)。症状表明哪里出了问题,原因解释为什么出问题。
-
包含运行手册:每个告警必须链接到包含调查步骤、常见原因和修复流程的运行手册。
-
设置合理阈值:基于服务级别目标(SLO)和历史数据设置阈值,而非任意值。
-
避免告警疲劳:确保告警是可操作的。非可操作的告警会导致告警疲劳,进而忽略关键问题。
Integration
集成
-
End-to-End Correlation: Link logs, traces, and metrics using correlation IDs to enable cross-system debugging.
-
Centralize: Use centralized log aggregation (ELK, Loki) and trace collection (Jaeger, Zipkin) for cross-service visibility.
-
Test Observability: Verify logging, tracing, and metrics in development - don't discover gaps in production.
-
端到端关联:使用关联ID链接日志、追踪和指标,实现跨系统调试。
-
集中管理:使用集中式日志聚合(ELK、Loki)和追踪收集(Jaeger、Zipkin)实现跨服务可见性。
-
测试可观测性:在开发环境验证日志、追踪和指标功能 - 不要等到生产环境才发现漏洞。
Examples
示例
Complete Request Logging Middleware
完整的请求日志中间件
python
import time
import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware


class ObservabilityMiddleware(BaseHTTPMiddleware):
    """Ties the three signals together per request: correlation-ID
    propagation (logs), a server span (traces), and request count/duration
    metrics.  `correlation_id` is the module-level ContextVar defined with
    the structured-logging setup.
    """

    # Histogram buckets for request duration, in seconds.
    DURATION_BUCKETS = [0.01, 0.05, 0.1, 0.5, 1.0, 5.0]

    def __init__(self, app, tracer, metrics):
        super().__init__(app)
        self.tracer = tracer
        # Keep the registry itself: the label set (method/path/status) is only
        # known at dispatch time.  The original pre-created unlabeled metrics
        # and then called `.labels(**labels)` on them — a prometheus_client
        # API that the custom MetricsRegistry metrics do not have, so every
        # request raised AttributeError.
        self.metrics = metrics

    async def dispatch(self, request: Request, call_next):
        # Reuse the caller's correlation ID when present; otherwise mint one.
        corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        correlation_id.set(corr_id)
        start_time = time.time()
        with self.tracer.start_as_current_span(
            f"{request.method} {request.url.path}"
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            span.set_attribute("correlation_id", corr_id)
            try:
                response = await call_next(request)
                span.set_attribute("http.status_code", response.status_code)
                # Record metrics with the final status as a label.
                labels = {
                    "method": request.method,
                    "path": request.url.path,
                    "status": str(response.status_code)
                }
                self.metrics.counter("http_requests_total", labels).inc()
                self.metrics.histogram(
                    "http_request_duration_seconds",
                    buckets=self.DURATION_BUCKETS,
                    labels=labels,
                ).observe(time.time() - start_time)
                # Propagate the correlation ID back to the caller.
                response.headers["X-Correlation-ID"] = corr_id
                return response
            except Exception as e:
                span.set_attribute("error", True)
                span.record_exception(e)
                raise
import time
import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
class ObservabilityMiddleware(BaseHTTPMiddleware):
def __init__(self, app, tracer, metrics):
super().__init__(app)
self.tracer = tracer
self.request_counter = metrics.counter("http_requests_total")
self.request_duration = metrics.histogram(
"http_request_duration_seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
async def dispatch(self, request: Request, call_next):
# Extract or generate correlation ID
corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
correlation_id.set(corr_id)
start_time = time.time()
with self.tracer.start_as_current_span(
f"{request.method} {request.url.path}"
) as span:
span.set_attribute("http.method", request.method)
span.set_attribute("http.url", str(request.url))
span.set_attribute("correlation_id", corr_id)
try:
response = await call_next(request)
span.set_attribute("http.status_code", response.status_code)
# Record metrics
labels = {
"method": request.method,
"path": request.url.path,
"status": str(response.status_code)
}
self.request_counter.labels(**labels).inc()
self.request_duration.labels(**labels).observe(
time.time() - start_time
)
# Add correlation ID to response
response.headers["X-Correlation-ID"] = corr_id
return response
except Exception as e:
span.set_attribute("error", True)
span.record_exception(e)
raise