logging-observability


Logging and Observability

Overview

Observability means being able to understand system behavior from its logs, metrics, and traces. This skill provides patterns for:
  • Structured Logging: JSON logs with correlation IDs and contextual data
  • Distributed Tracing: Span-based request tracking across services (OpenTelemetry, Jaeger, Zipkin)
  • Metrics Collection: Counters, gauges, histograms for system health (Prometheus patterns)
  • Log Aggregation: Centralized log management (ELK, Loki, Datadog)
  • Alerting: Symptom-based alerts with runbooks

Instructions

1. Structured Logging (JSON Logs)

Python Implementation

python
import json
import logging
import sys
from datetime import datetime
from contextvars import ContextVar
from typing import Any

# Context variables for request tracking
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')

class StructuredFormatter(logging.Formatter):
    """JSON formatter for structured logging."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            "span_id": span_id.get(),
        }

        # Add exception info if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Add extra fields
        if hasattr(record, 'structured_data'):
            log_data.update(record.structured_data)

        return json.dumps(log_data)

def setup_logging():
    """Configure structured logging."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(StructuredFormatter())

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(handler)

Usage

python
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={
    "structured_data": {
        "user_id": "123",
        "ip_address": "192.168.1.1",
        "action": "login"
    }
})

TypeScript Implementation

typescript
interface LogContext {
  correlationId?: string;
  spanId?: string;
  [key: string]: unknown;
}

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  context: LogContext;
}

class StructuredLogger {
  private context: LogContext = {};

  withContext(context: LogContext): StructuredLogger {
    const child = new StructuredLogger();
    child.context = { ...this.context, ...context };
    return child;
  }

  private log(
    level: string,
    message: string,
    data?: Record<string, unknown>,
  ): void {
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      context: { ...this.context, ...data },
    };
    console.log(JSON.stringify(entry));
  }

  debug(message: string, data?: Record<string, unknown>): void {
    this.log("DEBUG", message, data);
  }

  info(message: string, data?: Record<string, unknown>): void {
    this.log("INFO", message, data);
  }

  warn(message: string, data?: Record<string, unknown>): void {
    this.log("WARN", message, data);
  }

  error(message: string, data?: Record<string, unknown>): void {
    this.log("ERROR", message, data);
  }
}

2. Log Levels and When to Use Each

Level | Usage | Examples
TRACE | Fine-grained debugging | Loop iterations, variable values
DEBUG | Diagnostic information | Function entry/exit, intermediate states
INFO | Normal operations | Request started, job completed, user action
WARN | Potential issues | Deprecated API usage, retry attempted, slow query
ERROR | Failures requiring attention | Exception caught, operation failed
FATAL | Critical failures | System cannot continue, data corruption

Log level usage examples

python
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Rate limit approaching", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})

3. Distributed Tracing

Correlation IDs and Spans

python
import uuid
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
import time
import logging

logger = logging.getLogger(__name__)  # used by Tracer._export below

@dataclass
class Span:
    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])  # 16 hex chars, no dashes
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)

    def end(self):
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0

current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)

class Tracer:
    def __init__(self, service_name: str):
        self.service_name = service_name

    def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
        parent = parent or current_span.get()
        trace_id = parent.trace_id if parent else uuid.uuid4().hex  # 32 hex chars, no dashes
        parent_span_id = parent.span_id if parent else None

        span = Span(
            name=name,
            trace_id=trace_id,
            parent_span_id=parent_span_id,
            attributes={"service": self.service_name}
        )
        current_span.set(span)
        return span

    def end_span(self, span: Span):
        span.end()
        self._export(span)
        # Restore parent span if exists
        # In production, use a span stack

    def _export(self, span: Span):
        """Export span to tracing backend."""
        logger.info(f"Span completed: {span.name}", extra={
            "structured_data": {
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes
            }
        })

Context manager for spans

python
from contextlib import contextmanager

@contextmanager
def trace_span(tracer: Tracer, name: str):
    span = tracer.start_span(name)
    try:
        yield span
    except Exception as e:
        span.attributes["error"] = True
        span.attributes["error.message"] = str(e)
        raise
    finally:
        tracer.end_span(span)

Usage

python
tracer = Tracer("order-service")

async def process_order(order_id: str):
    with trace_span(tracer, "process_order") as span:
        span.attributes["order_id"] = order_id

        with trace_span(tracer, "validate_order"):
            await validate(order_id)

        with trace_span(tracer, "charge_payment"):
            await charge(order_id)

4. Metrics Collection

python
from dataclasses import dataclass
from typing import Any, Dict, List
from enum import Enum
import time
import threading

class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"

@dataclass
class Counter:
    name: str
    labels: Dict[str, str]
    value: float = 0

    def inc(self, amount: float = 1):
        self.value += amount

@dataclass
class Gauge:
    name: str
    labels: Dict[str, str]
    value: float = 0

    def set(self, value: float):
        self.value = value

    def inc(self, amount: float = 1):
        self.value += amount

    def dec(self, amount: float = 1):
        self.value -= amount

@dataclass
class Histogram:
    name: str
    labels: Dict[str, str]
    buckets: List[float]
    values: List[float] = None

    def __post_init__(self):
        self.values = []
        self._bucket_counts = {b: 0 for b in self.buckets}
        self._bucket_counts[float('inf')] = 0
        self._sum = 0
        self._count = 0

    def observe(self, value: float):
        self.values.append(value)
        self._sum += value
        self._count += 1
        for bucket in sorted(self._bucket_counts.keys()):
            if value <= bucket:
                self._bucket_counts[bucket] += 1

class MetricsRegistry:
    def __init__(self):
        self._metrics: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        key = f"{name}:{labels}"
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Counter(name, labels or {})
            return self._metrics[key]

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        key = f"{name}:{labels}"
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Gauge(name, labels or {})
            return self._metrics[key]

    def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
        key = f"{name}:{labels}"
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Histogram(name, labels or {}, buckets)
            return self._metrics[key]

Usage

python
metrics = MetricsRegistry()

# Counter for requests
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()

# Gauge for active connections
active_connections = metrics.gauge("active_connections")
active_connections.inc()
# ... handle connection ...
active_connections.dec()

# Histogram for request duration
request_duration = metrics.histogram(
    "http_request_duration_seconds",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
start = time.time()
# ... handle request ...
request_duration.observe(time.time() - start)
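
For scraping, the registry contents can be rendered in the Prometheus text exposition format. A minimal sketch, assuming the registry above; the render_prometheus helper is illustrative and omits HELP/TYPE metadata and label-value escaping:

python
def render_prometheus(registry: MetricsRegistry) -> str:
    """Render registry contents in Prometheus text exposition format (sketch)."""
    lines = []
    for metric in registry._metrics.values():
        label_str = ",".join(f'{k}="{v}"' for k, v in metric.labels.items())
        suffix = f"{{{label_str}}}" if label_str else ""
        if isinstance(metric, (Counter, Gauge)):
            lines.append(f"{metric.name}{suffix} {metric.value}")
        elif isinstance(metric, Histogram):
            # Cumulative bucket counts, then the _sum and _count series
            for bucket in sorted(metric._bucket_counts):
                le = "+Inf" if bucket == float("inf") else bucket
                lines.append(f'{metric.name}_bucket{{le="{le}"}} {metric._bucket_counts[bucket]}')
            lines.append(f"{metric.name}_sum {metric._sum}")
            lines.append(f"{metric.name}_count {metric._count}")
    return "\n".join(lines)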

5. OpenTelemetry Patterns

python
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

def setup_opentelemetry(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry with OTLP export."""
    resource = Resource.create({"service.name": service_name})

    # Tracing setup
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    )
    trace.set_tracer_provider(trace_provider)

    # Metrics setup: a periodic reader pushes measurements to the OTLP exporter
    metric_provider = MeterProvider(
        resource=resource,
        metric_readers=[
            PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=otlp_endpoint))
        ],
    )
    metrics.set_meter_provider(metric_provider)

    # Auto-instrumentation for outgoing HTTP calls made with requests
    RequestsInstrumentor().instrument()

    return trace.get_tracer(service_name), metrics.get_meter(service_name)

Usage with FastAPI

python
from fastapi import FastAPI

# Initialize providers before instrumenting the app
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")

app = FastAPI()
FastAPIInstrumentor.instrument_app(app)

Custom spans

python
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    with tracer.start_as_current_span("fetch_order") as span:
        span.set_attribute("order.id", order_id)
        order = await order_repository.get(order_id)
        span.set_attribute("order.status", order.status)
        return order

6. Log Aggregation Patterns

ELK Stack (Elasticsearch, Logstash, Kibana)

conf
# Logstash pipeline configuration (Logstash uses its own .conf syntax, not YAML)
input {
  file {
    path => "/var/log/app/*.log"
    codec => json
  }
}

filter {
  # Parse structured JSON logs
  json {
    source => "message"
  }

  # Add Elasticsearch index based on date
  mutate {
    add_field => { "[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}" }
  }

  # Enrich with geolocation (if IP present)
  geoip {
    source => "ip_address"
    target => "geo"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
}

Grafana Loki

yaml
# Promtail scrape configuration
scrape_configs:
  - job_name: app-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: app-logs
          __path__: /var/log/app/*.log
    # Extract JSON fields as labels
    pipeline_stages:
      - json:
          expressions:
            level: level
            correlation_id: correlation_id
            service: service
      - labels:
          level:
          correlation_id:
          service:

Datadog Agent Configuration

yaml
# datadog.yaml
logs_enabled: true
logs_config:
  processing_rules:
    - type: exclude_at_match
      name: exclude_healthcheck
      pattern: "GET /health"
  # Auto-parse JSON logs
  auto_multi_line_detection: true

Log collection from files

yaml
logs:
  - type: file
    path: "/var/log/app/*.log"
    service: "order-service"
    source: "python"
    tags:
      - "env:production"

7. Alert Design

Prometheus Alerting Rules

yaml
# Prometheus alerting rules
groups:
  - name: service-alerts
    rules:
      # High error rate alert
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
            / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} over the last 5 minutes"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"

      # High latency alert
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95,
            sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is {{ $value }}s"

      # Service down alert
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Service {{ $labels.instance }} is down"
          description: "{{ $labels.job }} has been down for more than 1 minute"

Alert Severity Levels

Level | Response Time | Examples
Critical | Immediate | Service down, high error rate, data loss
Warning | Business hours | High latency, approaching limits, retry spikes
Info | Log only | Deployment started, config changed

Best Practices

Logging

  1. Log at Appropriate Levels: DEBUG for development, INFO for normal operations, WARN for potential issues, ERROR for failures, FATAL for critical failures.
  2. Include Context: Always include correlation IDs, trace IDs, user IDs, and relevant business identifiers in structured fields.
  3. Avoid Sensitive Data: Never log passwords, tokens, credit cards, or PII. Implement automatic redaction when necessary (see the sketch after this list).
  4. Use Structured Logging: JSON logs enable easy parsing and querying in log aggregation systems (ELK, Loki, Datadog).
  5. Consistent Field Names: Standardize field names across services (e.g., always use correlation_id, not sometimes request_id).
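
A minimal redaction sketch for point 3, assuming sensitive fields are known by name; SENSITIVE_KEYS and RedactionFilter are illustrative, not a standard library API:

python
import logging

SENSITIVE_KEYS = {"password", "token", "credit_card", "ssn"}

class RedactionFilter(logging.Filter):
    """Scrub known-sensitive structured fields before formatting."""

    def filter(self, record: logging.LogRecord) -> bool:
        data = getattr(record, "structured_data", None)
        if isinstance(data, dict):
            for key in data.keys() & SENSITIVE_KEYS:
                data[key] = "[REDACTED]"
        return True  # keep the record; only its fields are scrubbed

# Attach alongside the StructuredFormatter from section 1:
# handler.addFilter(RedactionFilter())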

Distributed Tracing

  1. Trace Boundaries: Create spans at service boundaries, database calls, external API calls, and significant operations.
  2. Propagate Context: Pass trace IDs and span IDs across service boundaries via HTTP headers (OpenTelemetry standards); a propagation sketch follows this list.
  3. Add Meaningful Attributes: Include business context (user_id, order_id) and technical context (db_query, cache_hit) in span attributes.
  4. Sample Appropriately: Use adaptive sampling - trace 100% of errors, sample successful requests based on traffic volume.
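
A propagation sketch built on the Span and Tracer classes from section 3. The header layout follows the W3C Trace Context traceparent format (version, 32-hex trace ID, 16-hex span ID, flags); inject_headers and extract_parent are illustrative helpers, not part of any SDK:

python
from typing import Optional

def inject_headers(span: Span, headers: dict) -> dict:
    """Attach the current span's identity to an outgoing request."""
    headers["traceparent"] = f"00-{span.trace_id}-{span.span_id}-01"
    return headers

def extract_parent(headers: dict) -> Optional[Span]:
    """Rebuild a parent span stub from an incoming traceparent header."""
    parts = headers.get("traceparent", "").split("-")
    if len(parts) != 4:
        return None
    _, trace_id, span_id, _ = parts
    # A stub parent is enough for child spans to join the same trace
    return Span(name="remote-parent", trace_id=trace_id, span_id=span_id)

# Client side: inject_headers(current_span.get(), request_headers)
# Server side: tracer.start_span("handle_request", parent=extract_parent(request_headers))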

Metrics

  1. Track Golden Signals: Monitor the Four Golden Signals - latency, traffic, errors, saturation.
  2. Use Correct Metric Types: Counters for totals (requests), Gauges for current values (memory), Histograms for distributions (latency).
  3. Label Cardinality: Keep label cardinality low - avoid high-cardinality values like user IDs in metric labels (see the sketch after this list).
  4. Naming Conventions: Follow Prometheus naming - http_requests_total (counter), process_memory_bytes (gauge), http_request_duration_seconds (histogram).
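
A short sketch of these naming and labeling rules, using the MetricsRegistry from section 4 (label values are illustrative):

python
# Good: Prometheus-style names with unit suffixes, low-cardinality labels
requests = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
memory = metrics.gauge("process_memory_bytes")
latency = metrics.histogram("http_request_duration_seconds", buckets=[0.01, 0.1, 1.0])

# Bad: a per-user label creates one time series per user and overwhelms the backend
# metrics.counter("http_requests_total", {"user_id": "8f3a..."})  # avoid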

Alerting

  1. Alert on Symptoms: Alert on user-impacting issues (error rate, latency), not causes (CPU usage). Symptoms indicate what is broken, causes explain why.
  2. Include Runbooks: Every alert must link to a runbook with investigation steps, common causes, and remediation procedures.
  3. Use Appropriate Thresholds: Set thresholds based on SLOs and historical data, not arbitrary values (see the burn-rate sketch after this list).
  4. Alert Fatigue: Ensure alerts are actionable. Non-actionable alerts lead to alert fatigue and ignored critical issues.
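
A small sketch of deriving a threshold from an SLO rather than picking one arbitrarily. The 14.4 burn-rate multiplier is a common starting point from SRE practice (at that rate, roughly 2% of a 30-day error budget burns in one hour); the numbers here are illustrative:

python
slo = 0.999                     # 99.9% availability objective
error_budget = 1 - slo          # 0.1% of requests may fail

# Page when errors consume the budget 14.4x faster than the SLO allows
burn_rate = 14.4
alert_threshold = burn_rate * error_budget   # 0.0144 -> alert if error rate > 1.44%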

Integration

  1. End-to-End Correlation: Link logs, traces, and metrics using correlation IDs to enable cross-system debugging (a log/trace correlation sketch follows this list).
  2. Centralize: Use centralized log aggregation (ELK, Loki) and trace collection (Jaeger, Zipkin) for cross-service visibility.
  3. Test Observability: Verify logging, tracing, and metrics in development - don't discover gaps in production.
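
A sketch that stamps the active OpenTelemetry trace and span IDs onto every log record, so the aggregation backend can join logs to traces; TraceContextFilter is an illustrative name:

python
import logging
from opentelemetry import trace

class TraceContextFilter(logging.Filter):
    """Copy the current span's IDs into structured_data on each record."""

    def filter(self, record: logging.LogRecord) -> bool:
        ctx = trace.get_current_span().get_span_context()
        if ctx.is_valid:
            data = getattr(record, "structured_data", {})
            data["trace_id"] = format(ctx.trace_id, "032x")  # 32-hex, W3C format
            data["span_id"] = format(ctx.span_id, "016x")
            record.structured_data = data
        return True

# handler.addFilter(TraceContextFilter())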

Examples

Complete Request Logging Middleware

python
import time
import uuid
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class ObservabilityMiddleware(BaseHTTPMiddleware):
    """Request middleware combining the patterns above.

    Assumes `tracer` is an OpenTelemetry tracer (section 5), `metrics` is the
    MetricsRegistry (section 4), and `correlation_id` is the ContextVar from
    section 1.
    """

    DURATION_BUCKETS = [0.01, 0.05, 0.1, 0.5, 1.0, 5.0]

    def __init__(self, app, tracer, metrics):
        super().__init__(app)
        self.tracer = tracer
        self.metrics = metrics

    async def dispatch(self, request: Request, call_next):
        # Extract or generate correlation ID
        corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        correlation_id.set(corr_id)

        start_time = time.time()

        with self.tracer.start_as_current_span(
            f"{request.method} {request.url.path}"
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            span.set_attribute("correlation_id", corr_id)

            try:
                response = await call_next(request)

                span.set_attribute("http.status_code", response.status_code)

                # Record metrics: one counter/histogram per label set
                labels = {
                    "method": request.method,
                    "path": request.url.path,
                    "status": str(response.status_code)
                }
                self.metrics.counter("http_requests_total", labels).inc()
                self.metrics.histogram(
                    "http_request_duration_seconds",
                    buckets=self.DURATION_BUCKETS,
                    labels=labels,
                ).observe(time.time() - start_time)

                # Add correlation ID to response
                response.headers["X-Correlation-ID"] = corr_id

                return response

            except Exception as e:
                span.set_attribute("error", True)
                span.record_exception(e)
                raise
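
Wiring it into an application, assuming the setup helpers from sections 1, 4, and 5:

python
from fastapi import FastAPI

setup_logging()  # JSON logs from section 1
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")
metrics_registry = MetricsRegistry()

app = FastAPI()
app.add_middleware(ObservabilityMiddleware, tracer=tracer, metrics=metrics_registry)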