data-engineering-observability

Pipeline Observability


Tracing and metrics for data pipelines using OpenTelemetry and Prometheus. Instrument code for visibility into performance, errors, and data lineage.

Quick Reference


Tool          | Purpose             | What it Measures
--------------|---------------------|-----------------------------------------------
OpenTelemetry | Distributed tracing | Pipeline stages, latency, dependencies
Prometheus    | Metrics             | Throughput, error rates, resource utilization
Grafana       | Visualization       | Dashboards combining traces + metrics

Why Observable?


  • Debugging: Trace failed records through pipeline stages
  • Performance: Identify bottlenecks, optimize slow transformations
  • Reliability: Set alerts on error rates, SLA breaches
  • Cost: Track resource usage, optimize expensive operations
  • Compliance: Audit trail of data transformations

Skill Dependencies


  • @data-engineering-core
    - Pipeline structure to instrument
  • @data-engineering-orchestration
    - Prefect/Dagster have built-in observability
  • @data-engineering-streaming
    - Stream processing patterns need tracing


OpenTelemetry Integration


OpenTelemetry (OTel) provides a vendor-neutral standard for distributed tracing, metrics, and logs.

Installation


bash
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp

Basic Tracing

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import polars as pl

# Set up the tracer provider
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("data_pipeline")

def run_pipeline():
    with tracer.start_as_current_span("extract") as span:
        span.set_attribute("source", "sales.parquet")
        span.set_attribute("format", "parquet")
        df = pl.scan_parquet("data/sales.parquet").collect()
        span.set_attribute("rows_read", len(df))

    with tracer.start_as_current_span("transform") as span:
        span.set_attribute("operation", "aggregation")
        result = df.group_by("category").agg(pl.col("value").sum())

    with tracer.start_as_current_span("load") as span:
        span.set_attribute("target", "duckdb.summary")
        # conn: database connection created elsewhere (e.g. a SQLAlchemy engine)
        result.to_pandas().to_sql("summary", conn, if_exists="replace")
        span.set_attribute("rows_written", len(result))

if __name__ == "__main__":
    run_pipeline()

Trace Context Propagation

For multi-service pipelines, propagate the trace context along with the message. The high-level `opentelemetry.propagate` API applies the configured propagator (W3C Trace Context by default; B3 can be configured instead):
python
import json

from opentelemetry import trace
from opentelemetry.propagate import extract, inject

tracer = trace.get_tracer("data_pipeline")

# Inject the current trace context into message headers (Kafka, HTTP)
carrier = {}
inject(carrier)

# Send the carrier dict with the message (e.g., as Kafka headers)
producer.produce(
    topic="events",
    key=key,
    value=json.dumps(data),
    headers=list(carrier.items()),
)

# Consumer side: extract the context and continue the trace
ctx = extract(carrier)
with tracer.start_as_current_span("process_message", context=ctx):
    process(data)

---

Prometheus Metrics


Prometheus collects numeric time series data. Push or pull metrics from your application.

Installation


bash
pip install prometheus-client

Basic Instrumentation

python
import psutil
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
ROWS_PROCESSED = Counter(
    'etl_rows_processed_total',
    'Total rows processed by ETL',
    ['source', 'stage'],
)

PROCESSING_TIME = Histogram(
    'etl_processing_seconds',
    'Time spent processing',
    ['operation'],
    buckets=[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
)

PIPELINE_ERRORS = Counter(
    'etl_errors_total',
    'Total pipeline errors',
    ['stage', 'error_type'],
)

MEMORY_USAGE = Gauge(
    'etl_memory_bytes',
    'Process memory usage in bytes',
)

# Start metrics server (Prometheus scrapes this endpoint)
start_http_server(8000)

def process_batch(stage: str, batch_id: int):
    # Label with the stage, not batch_id: unbounded label values
    # (one per batch) would explode cardinality.
    with PROCESSING_TIME.labels(operation=stage).time():
        try:
            rows = extract_and_process(batch_id)
            ROWS_PROCESSED.labels(source="kafka", stage=stage).inc(rows)
            return rows
        except Exception as e:
            PIPELINE_ERRORS.labels(stage=stage, error_type=type(e).__name__).inc()
            raise

# Periodic gauge update
def update_memory():
    process = psutil.Process()
    MEMORY_USAGE.set(process.memory_info().rss)
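When Prometheus scrapes the endpoint started above, it reads the plain-text exposition format. As a rough illustration (hand-rolled for clarity, not the `prometheus_client` API, which renders this for you), the labeled counter above serializes like this:

```python
def render_counter(name: str, help_text: str, samples: dict) -> str:
    # Render a labeled counter in the Prometheus text exposition format:
    # a HELP line, a TYPE line, then one sample line per label combination.
    lines = [f"# HELP {name} {help_text}", f"# TYPE {name} counter"]
    for (source, stage), value in samples.items():
        lines.append(f'{name}{{source="{source}",stage="{stage}"}} {value}')
    return "\n".join(lines)

text = render_counter(
    "etl_rows_processed_total",
    "Total rows processed by ETL",
    {("kafka", "extract"): 1200.0},
)
print(text)
# # HELP etl_rows_processed_total Total rows processed by ETL
# # TYPE etl_rows_processed_total counter
# etl_rows_processed_total{source="kafka",stage="extract"} 1200.0
```

This is the text your Grafana queries ultimately draw from: each label combination becomes its own time series.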

Custom Collector

python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()

# Custom gauge that computes on demand
queue_size = Gauge(
    'kafka_queue_size',
    'Number of messages in queue',
    registry=registry,
)

def collect_queue_size():
    # records-lag-max: maximum consumer lag reported by the Kafka client
    size = kafka_consumer.metrics()['fetch-metrics']['records-lag-max']
    queue_size.set(size)

# Register with a push gateway, or expose this registry for scraping

---

Integration with Orchestration


Prefect Built-in Observability


Prefect automatically records:
  • Task run status (success/failure)
  • Duration
  • Retry counts
  • Parameters
Enable Prefect Cloud/Server for UI:
bash
prefect cloud login  # or prefect server start
prefect agent start -q 'default'

Dagster Observability


Dagster Dagit UI shows:
  • Asset materialization history
  • Run duration and status
  • Asset lineage graph
  • Resource usage
Enable metrics:
python
from dagster import asset

@asset
def monitored_asset():
    # Dagster automatically records run duration, status, and
    # materialization metadata for each asset
    pass


Dashboards & Alerting


Grafana Dashboard Example


Create a dashboard with these panels:
  • Throughput:
    rate(etl_rows_processed_total[5m])
  • Latency:
    histogram_quantile(0.95, rate(etl_processing_seconds_bucket[5m]))
  • Error Rate:
    rate(etl_errors_total[5m])
  • Memory:
    etl_memory_bytes / 1024 / 1024
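As a reminder of what these queries compute, `rate()` over a counter yields the per-second increase across the window. A toy sketch of the idea (real PromQL additionally handles counter resets and extrapolates to the window edges):

```python
def simple_rate(samples):
    # samples: (timestamp_seconds, counter_value) pairs, oldest first.
    # Per-second increase between the first and last sample in the window.
    (t0, v0) = samples[0]
    (t1, v1) = samples[-1]
    return (v1 - v0) / (t1 - t0)

# A counter that grew from 100 to 1600 rows over a 5-minute window:
rows_per_sec = simple_rate([(0, 100), (300, 1600)])
print(rows_per_sec)  # 5.0
```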

Alert Rules (Prometheus Alertmanager)


yaml
groups:
  - name: etl-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(etl_errors_total[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "ETL error rate elevated"
          description: "{{ $labels.stage }} stage error rate: {{ $value }} errors/sec"

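The `for: 2m` clause keeps the alert in a pending state until the expression has been continuously true for the whole hold period. A toy sketch of that inactive → pending → firing progression (illustrative only, not Alertmanager's implementation):

```python
class Alert:
    def __init__(self, threshold: float, hold_seconds: float):
        self.threshold = threshold
        self.hold_seconds = hold_seconds
        self._breach_start = None  # when the current breach began, if any

    def evaluate(self, now: float, value: float) -> str:
        # Below threshold: reset and stay inactive.
        if value <= self.threshold:
            self._breach_start = None
            return "inactive"
        # Above threshold: pending until held for hold_seconds, then firing.
        if self._breach_start is None:
            self._breach_start = now
        if now - self._breach_start >= self.hold_seconds:
            return "firing"
        return "pending"

alert = Alert(threshold=0.1, hold_seconds=120)
states = [alert.evaluate(t, v)
          for t, v in [(0, 0.05), (60, 0.2), (120, 0.3), (180, 0.3)]]
print(states)  # ['inactive', 'pending', 'pending', 'firing']
```

A brief dip back under the threshold resets the hold timer, which is exactly why `for:` suppresses one-scrape blips.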

Best Practices


Instrumentation


  1. Span every pipeline stage - extract, transform, load, validate
  2. Add attributes - dataset names, row counts, file paths
  3. Propagate context across async boundaries (threads, processes, network)
  4. Record errors in spans with
    span.record_exception()
  5. Sample judiciously - 100% in dev, lower in prod (sampling policy)
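On point 5, OTel's `TraceIdRatioBased` sampler makes the keep/drop decision deterministically from the trace ID, so every service in a trace agrees. A simplified sketch of the idea (not the exact OTel algorithm):

```python
def should_sample(trace_id: int, ratio: float) -> bool:
    # Keep the trace when its low 64 bits fall below ratio * 2**64.
    # Deterministic: the same trace ID always yields the same decision,
    # so no service samples half a trace.
    bound = int(ratio * (1 << 64))
    return (trace_id & ((1 << 64) - 1)) < bound

print(should_sample(0x00000000DEADBEEF, 1.0))  # True: ratio 1.0 keeps everything
print(should_sample(0xFFFFFFFFFFFFFFFF, 0.0))  # False: ratio 0.0 drops everything
```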

Metrics


  1. Use counters for events (rows processed, errors)
  2. Use histograms for durations (processing time, latency)
  3. Use gauges for state (queue size, memory usage)
  4. Label dimensions (stage, source, status) but avoid cardinality explosion
  5. Expose the metrics endpoint on a separate port (e.g., 8000), apart from the application port
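To make point 2 concrete: a Prometheus histogram stores cumulative bucket counts, which is what `histogram_quantile()` later interpolates over. A hand-rolled sketch of how observations land in the buckets defined earlier:

```python
import bisect

BUCKETS = [0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]  # upper bounds, as above

def observe(counts, value):
    # Buckets are cumulative: every bucket whose upper bound covers the
    # value is incremented (bisect finds the first such bucket).
    for i in range(bisect.bisect_left(BUCKETS, value), len(counts)):
        counts[i] += 1

counts = [0] * len(BUCKETS)
for duration in [0.05, 0.3, 0.7, 2.0, 2.0]:
    observe(counts, duration)
print(counts)  # [1, 2, 3, 5, 5, 5, 5]
```

The fixed bucket bounds are why histograms stay cheap regardless of traffic, at the cost of quantile precision near the bucket edges.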

Production


  1. Centralized logs - send structured logs to ELK/Datadog
  2. Correlation IDs - Include trace IDs in log entries
  3. Alert on SLA breaches - latency > threshold, error rate > X%
  4. Test observability - Simulate failures, verify traces/metrics
  5. Document schema - Define metric names and label values in README

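A minimal stdlib sketch of point 2: stamp every log record with the current trace ID via a `logging.Filter`, so log lines can be joined with traces. In a real OTel setup the ID would come from `trace.get_current_span().get_span_context().trace_id`; here it is passed in by hand:

```python
import logging

class CorrelationFilter(logging.Filter):
    """Attach a fixed trace ID to every record passing through."""

    def __init__(self, trace_id: str):
        super().__init__()
        self.trace_id = trace_id

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = self.trace_id
        return True  # never drop records, only annotate them

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s trace=%(trace_id)s %(message)s"))
logger = logging.getLogger("etl")
logger.addHandler(handler)
logger.addFilter(CorrelationFilter("4bf92f3577b34da6a3ce929d0e0e4736"))
logger.setLevel(logging.INFO)
logger.info("extract complete")  # emitted with trace=4bf92f35...
```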
