# Data Engineering Observability
Observability and monitoring for data pipelines using OpenTelemetry (traces) and Prometheus (metrics). Covers instrumentation, dashboards, and alerting.
npx skill4agent add legout/data-platform-agent-skills data-engineering-observability

| Tool | Purpose | What it Measures |
|---|---|---|
| OpenTelemetry | Distributed tracing | Pipeline stages, latency, dependencies |
| Prometheus | Metrics | Throughput, error rates, resource utilization |
| Grafana | Visualization | Dashboards combining traces + metrics |
Related skills: @data-engineering-core, @data-engineering-orchestration, @data-engineering-streaming

Install: pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import logging
# --- Tracer setup: export spans to a local OTLP collector over gRPC ---
# Setup tracer provider
provider = TracerProvider()
# 4317 is the default OTLP/gRPC collector port; adjust for your deployment.
exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
# BatchSpanProcessor buffers spans and exports them asynchronously in batches.
provider.add_span_processor(BatchSpanProcessor(exporter))
# Install as the global provider so trace.get_tracer() hands out our tracer.
trace.set_tracer_provider(provider)
# Named tracer used by all pipeline spans below.
tracer = trace.get_tracer("data_pipeline")
def run_pipeline():
    """ETL pipeline instrumented with one OpenTelemetry span per stage.

    Stages: extract (read parquet), transform (aggregate), load (write to
    DuckDB). Each span carries attributes describing its inputs/outputs.
    Relies on module-level `tracer`, `pl` (polars) and `conn` (DB connection).
    """
    # Extract: read the raw sales file and record source metadata + row count.
    with tracer.start_as_current_span("extract") as extract_span:
        extract_span.set_attribute("source", "sales.parquet")
        extract_span.set_attribute("format", "parquet")
        frame = pl.scan_parquet("data/sales.parquet").collect()
        extract_span.set_attribute("rows_read", len(frame))

    # Transform: sum `value` per category.
    with tracer.start_as_current_span("transform") as transform_span:
        transform_span.set_attribute("operation", "aggregation")
        summary = frame.group_by("category").agg(pl.col("value").sum())

    # Load: replace the `summary` table in DuckDB via a pandas round-trip.
    with tracer.start_as_current_span("load") as load_span:
        load_span.set_attribute("target", "duckdb.summary")
        summary.to_pandas().to_sql("summary", conn, if_exists="replace")
        load_span.set_attribute("rows_written", len(summary))
if __name__ == "__main__":
    run_pipeline()


# --- Propagating trace context across process boundaries (Kafka, HTTP) ---
# FIX: the old `opentelemetry.propagators.B3Format` API no longer exists; the
# current class is B3MultiFormat in opentelemetry.propagators.b3. Also,
# inject() expects a *Context* (defaults to the current one), not the
# SpanContext returned by the deprecated get_context().
from opentelemetry.propagators.b3 import B3MultiFormat

propagator = B3MultiFormat()

# Producer side: inject the active trace context into a header carrier.
carrier = {}
propagator.inject(carrier)  # uses the current context by default

# Send the carrier with the message (e.g. as Kafka headers).
producer.produce(
    topic="events",
    key=key,
    value=json.dumps(data),
    headers=list(carrier.items()),
)

# Consumer side: extract the context and continue the same trace.
ctx = propagator.extract(carrier=carrier)
with tracer.start_as_current_span("process_message", context=ctx):
    process(data)

# pip install prometheus-client
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import psutil

# --- Metric definitions (module-level so they are registered once) ---

# Monotonic row counter, partitioned by data source and pipeline stage.
ROWS_PROCESSED = Counter(
    'etl_rows_processed_total',
    'Total rows processed by ETL',
    ['source', 'stage']
)
# Latency histogram; buckets chosen for sub-second through minute-scale batches.
PROCESSING_TIME = Histogram(
    'etl_processing_seconds',
    'Time spent processing',
    ['operation'],
    buckets=[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]
)
# Error counter partitioned by stage and exception class name.
PIPELINE_ERRORS = Counter(
    'etl_errors_total',
    'Total preprocessing errors',
    ['stage', 'error_type']
)
# Point-in-time resident memory of this process.
MEMORY_USAGE = Gauge(
    'etl_memory_bytes',
    'Process memory usage in bytes'
)

# Start metrics server (Prometheus scrapes this endpoint)
start_http_server(8000)

def process_batch(stage: str, batch_id: int):
    """Process one batch, recording latency, throughput, and errors.

    Returns the number of rows processed; re-raises any exception after
    counting it in PIPELINE_ERRORS.
    """
    # FIX: a labeled Histogram must be bound with .labels() before .time() --
    # Histogram.time() takes no keyword arguments. Use the stage name as the
    # label value: per-batch values like f"batch_{batch_id}" would create
    # unbounded label cardinality, which Prometheus cannot handle.
    with PROCESSING_TIME.labels(operation=stage).time():
        try:
            rows = extract_and_process(batch_id)
            ROWS_PROCESSED.labels(source="kafka", stage=stage).inc(rows)
            return rows
        except Exception as e:
            PIPELINE_ERRORS.labels(stage=stage, error_type=type(e).__name__).inc()
            raise

def update_memory():
    """Refresh the RSS gauge; call periodically (e.g. from a scheduler)."""
    process = psutil.Process()
    MEMORY_USAGE.set(process.memory_info().rss)

from prometheus_client import CollectorRegistry, Gauge
# Dedicated registry so these metrics can be pushed/scraped separately from
# the default global registry.
registry = CollectorRegistry()

# Gauge tracking consumer backlog, registered on the custom registry.
queue_size = Gauge(
    'kafka_queue_size',
    'Number of messages in queue',
    registry=registry,
)

def collect_queue_size():
    """Sample the consumer's max record lag into the queue-size gauge."""
    lag = kafka_consumer.metrics()['fetch-metrics']['records-lag-max']
    queue_size.set(lag)
# Register with push gateway or scrape

Orchestrator integration:

    prefect cloud login  # or prefect server start
    prefect agent start -q 'default'

    from dagster import asset  # (fixed: @asset requires this import)

    @asset
    def monitored_asset():
        # Dagster automatically records metrics
        pass

Useful PromQL queries:

- Throughput: rate(etl_rows_processed_total[5m])
- p95 latency: histogram_quantile(0.95, rate(etl_processing_seconds_bucket[5m]))
- Error rate: rate(etl_errors_total[5m])
- Memory (MB): etl_memory_bytes / 1024 / 1024

Alerting rules (Prometheus):

groups:
- name: etl-alerts
rules:
- alert: HighErrorRate
expr: rate(etl_errors_total[5m]) > 0.1
for: 2m
labels:
severity: critical
annotations:
summary: "ETL error rate elevated"
description: "{{ $labels.stage }} stage error rate: {{ $value }} errors/sec"

Tip: call span.record_exception(exc) inside except blocks so failures are attached to the trace.

Related: @data-engineering-orchestration