Designs and builds ETL/ELT data pipelines. Takes data sources, destination, transformation requirements. Generates pipeline code (Python/SQL), scheduling config, error handling, monitoring setup, and data quality checks. Outputs data-pipeline-spec.md + implementation files.
npx skill4agent add onewave-ai/claude-skills data-pipeline-builder

pipeline_name/
  README.md
  data-pipeline-spec.md
  config/
    pipeline_config.yaml
    connections.yaml.example
    .env.example
  src/
    extractors/
      __init__.py
      base_extractor.py
      [source_name]_extractor.py
    transformers/
      __init__.py
      base_transformer.py
      [transform_name]_transformer.py
    loaders/
      __init__.py
      base_loader.py
      [destination_name]_loader.py
    quality/
      __init__.py
      checks.py
      expectations.py
    utils/
      __init__.py
      logging_config.py
      retry.py
      metrics.py
  sql/
    staging/
      [table_name]_staging.sql
    transformations/
      [transform_name].sql
    quality_checks/
      [check_name].sql
  orchestration/
    dags/
      [pipeline_name]_dag.py
    schedules/
      schedule_config.yaml
  tests/
    unit/
      test_extractors.py
      test_transformers.py
      test_loaders.py
      test_quality.py
    integration/
      test_pipeline_e2e.py
    fixtures/
      sample_data/
  monitoring/
    alerts/
      alert_rules.yaml
    dashboards/
      pipeline_dashboard.json
  docker/
    Dockerfile
    docker-compose.yaml
  requirements.txt
  pyproject.toml
  Makefile

"""Base extractor module providing the abstract interface for all data extractors."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Generator, Optional

import structlog

logger = structlog.get_logger(__name__)


@dataclass
class ExtractionResult:
    """Container for extraction results with metadata."""

    data: list[dict[str, Any]]
    source_name: str
    extraction_timestamp: datetime
    record_count: int
    watermark_value: Optional[str] = None
    schema_hash: Optional[str] = None

    def __post_init__(self) -> None:
        if self.record_count != len(self.data):
            raise ValueError(
                f"Record count mismatch: declared {self.record_count}, "
                f"actual {len(self.data)}"
            )
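
As a rough illustration of how a concrete extractor might fill one result per batch, the sketch below slices an in-memory list and carries the watermark forward. The names `Batch` and `iter_batches` are hypothetical stand-ins introduced here (a trimmed copy of `ExtractionResult`, not the skill's own code), and it assumes the watermark column holds ISO-8601 strings in a single format so that string comparison matches time order.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterator, Optional


@dataclass
class Batch:
    """Trimmed, hypothetical stand-in for ExtractionResult."""

    data: list[dict[str, Any]]
    record_count: int
    extraction_timestamp: datetime
    watermark_value: Optional[str] = None

    def __post_init__(self) -> None:
        # Same guard as ExtractionResult: declared count must match payload.
        if self.record_count != len(self.data):
            raise ValueError("Record count mismatch")


def iter_batches(
    rows: list[dict[str, Any]],
    watermark_column: str,
    watermark: Optional[str] = None,
    batch_size: int = 2,
) -> Iterator[Batch]:
    """Yield rows strictly newer than `watermark` in fixed-size batches."""
    pending = sorted(
        (r for r in rows if watermark is None or r[watermark_column] > watermark),
        key=lambda r: r[watermark_column],
    )
    for start in range(0, len(pending), batch_size):
        chunk = pending[start:start + batch_size]
        yield Batch(
            data=chunk,
            record_count=len(chunk),
            extraction_timestamp=datetime.now(timezone.utc),
            # The last row of a sorted chunk is the new resume point.
            watermark_value=chunk[-1][watermark_column],
        )


rows = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00"},
    {"id": 2, "updated_at": "2024-01-02T00:00:00"},
    {"id": 3, "updated_at": "2024-01-03T00:00:00"},
]
batches = list(iter_batches(rows, "updated_at", watermark="2024-01-01T00:00:00"))
```

Persisting `watermark_value` from the last batch and passing it back on the next run is what makes the extraction incremental.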


class BaseExtractor(ABC):
    """Abstract base class for all data extractors.

    Subclasses must implement `connect`, `extract`, and `close` methods.
    The base class provides context-manager support, bound logging, and a
    connection health check.
    """

    def __init__(self, config: dict[str, Any], source_name: str) -> None:
        self.config = config
        self.source_name = source_name
        self._connected = False
        self._log = logger.bind(source=source_name)

    @abstractmethod
    def connect(self) -> None:
        """Establish connection to the data source."""
        ...

    @abstractmethod
    def extract(
        self,
        watermark: Optional[str] = None,
        batch_size: int = 10000,
    ) -> Generator[ExtractionResult, None, None]:
        """Extract data from the source, yielding batches.

        Args:
            watermark: Resume point for incremental extraction.
            batch_size: Number of records per batch.

        Yields:
            ExtractionResult for each batch of extracted data.
        """
        ...

    @abstractmethod
    def close(self) -> None:
        """Clean up connections and resources."""
        ...

    def __enter__(self) -> "BaseExtractor":
        self.connect()
        self._connected = True
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.close()
        self._connected = False

    def validate_connection(self) -> bool:
        """Test that the source connection is alive and responsive."""
        try:
            self.connect()
            self.close()
            return True
        except Exception as exc:
            self._log.error("connection_validation_failed", error=str(exc))
            return False

"""Base transformer module providing the abstract interface for all transformations."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

import structlog

logger = structlog.get_logger(__name__)


@dataclass
class TransformationResult:
    """Container for transformation results with lineage metadata."""

    data: list[dict[str, Any]]
    transform_name: str
    input_record_count: int
    output_record_count: int
    transformation_timestamp: datetime
    dropped_records: int = 0
    error_records: list[dict[str, Any]] = field(default_factory=list)
    lineage: Optional[dict[str, Any]] = None

    @property
    def drop_rate(self) -> float:
        """Calculate the percentage of records dropped during transformation."""
        if self.input_record_count == 0:
            return 0.0
        return self.dropped_records / self.input_record_count * 100


class BaseTransformer(ABC):
    """Abstract base class for all data transformers.

    Subclasses must implement `transform` and `validate_output`.
    Provides bound logging and a lineage-metadata helper; per-record errors
    are collected on `TransformationResult.error_records`.
    """

    def __init__(self, config: dict[str, Any], transform_name: str) -> None:
        self.config = config
        self.transform_name = transform_name
        self._log = logger.bind(transform=transform_name)

    @abstractmethod
    def transform(
        self,
        data: list[dict[str, Any]],
    ) -> TransformationResult:
        """Apply transformation logic to the input data.

        Args:
            data: List of records to transform.

        Returns:
            TransformationResult containing transformed data and metadata.
        """
        ...

    @abstractmethod
    def validate_output(
        self,
        result: TransformationResult,
    ) -> list[str]:
        """Validate the transformation output against business rules.

        Args:
            result: The transformation result to validate.

        Returns:
            List of validation error messages. Empty list means valid.
        """
        ...

    def _track_lineage(
        self,
        input_sources: list[str],
        output_fields: list[str],
        logic_description: str,
    ) -> dict[str, Any]:
        """Generate lineage metadata for the transformation."""
        return {
            "transform_name": self.transform_name,
            "input_sources": input_sources,
            "output_fields": output_fields,
            "logic": logic_description,
            "timestamp": datetime.utcnow().isoformat(),
        }

"""Base loader module providing the abstract interface for all data loaders."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Optional

import structlog

logger = structlog.get_logger(__name__)


class LoadStrategy(Enum):
    """Supported load strategies."""

    APPEND = "append"
    OVERWRITE = "overwrite"
    UPSERT = "upsert"
    MERGE = "merge"
    SCD_TYPE_2 = "scd_type_2"


@dataclass
class LoadResult:
    """Container for load operation results."""

    destination_name: str
    strategy: LoadStrategy
    records_loaded: int
    records_updated: int
    records_rejected: int
    load_timestamp: datetime
    duration_seconds: float
    partition_key: Optional[str] = None
    # None is replaced with a fresh list in __post_init__, so instances
    # never share one mutable default.
    rejected_records: Optional[list[dict[str, Any]]] = None

    def __post_init__(self) -> None:
        if self.rejected_records is None:
            self.rejected_records = []

    @property
    def success_rate(self) -> float:
        """Calculate the percentage of records successfully loaded."""
        total = self.records_loaded + self.records_rejected
        if total == 0:
            return 100.0
        return self.records_loaded / total * 100
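
To make the difference between the `APPEND`, `OVERWRITE` and `UPSERT` strategies concrete, here is a minimal in-memory sketch. The `apply_load` function, its `key` parameter and the list-of-dicts "table" are illustrative assumptions rather than part of the skill's loaders; a real loader would normally delegate this to the destination's own insert or merge statements.

```python
from enum import Enum
from typing import Any

Rows = list[dict[str, Any]]


class LoadStrategy(Enum):
    """Subset of the strategies listed above."""

    APPEND = "append"
    OVERWRITE = "overwrite"
    UPSERT = "upsert"


def apply_load(
    table: Rows, rows: Rows, strategy: LoadStrategy, key: str = "id"
) -> tuple[Rows, int, int]:
    """Return (new_table, inserted_count, updated_count) without mutating input."""
    if strategy is LoadStrategy.APPEND:
        # Keep everything already there and add the new rows, duplicates included.
        return table + rows, len(rows), 0
    if strategy is LoadStrategy.OVERWRITE:
        # Discard the existing contents entirely.
        return list(rows), len(rows), 0
    if strategy is LoadStrategy.UPSERT:
        # Index by key: matching keys are replaced, new keys are added.
        merged = {row[key]: row for row in table}
        inserted = updated = 0
        for row in rows:
            if row[key] in merged:
                updated += 1
            else:
                inserted += 1
            merged[row[key]] = row
        return list(merged.values()), inserted, updated
    raise ValueError(f"Unsupported strategy: {strategy}")


existing = [{"id": 1, "name": "old"}]
incoming = [{"id": 1, "name": "new"}, {"id": 2, "name": "added"}]

appended, _, _ = apply_load(existing, incoming, LoadStrategy.APPEND)
upserted, inserted, updated = apply_load(existing, incoming, LoadStrategy.UPSERT)
```

With `APPEND` the table ends up holding two rows for `id` 1, which is why append-only loads usually need a downstream deduplication step or a `unique` quality check.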


class BaseLoader(ABC):
    """Abstract base class for all data loaders.

    Subclasses must implement `connect`, `load`, `close`, and
    `create_table_if_not_exists`.
    The base class provides context-manager support and bound logging.
    """

    def __init__(
        self,
        config: dict[str, Any],
        destination_name: str,
        strategy: LoadStrategy = LoadStrategy.APPEND,
    ) -> None:
        self.config = config
        self.destination_name = destination_name
        self.strategy = strategy
        self._connected = False
        self._log = logger.bind(destination=destination_name, strategy=strategy.value)

    @abstractmethod
    def connect(self) -> None:
        """Establish connection to the destination system."""
        ...

    @abstractmethod
    def load(
        self,
        data: list[dict[str, Any]],
        target_table: str,
    ) -> LoadResult:
        """Load data into the destination.

        Args:
            data: List of records to load.
            target_table: Target table or collection name.

        Returns:
            LoadResult with load operation metadata.
        """
        ...

    @abstractmethod
    def close(self) -> None:
        """Clean up connections and resources."""
        ...

    def __enter__(self) -> "BaseLoader":
        self.connect()
        self._connected = True
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.close()
        self._connected = False

    @abstractmethod
    def create_table_if_not_exists(
        self,
        table_name: str,
        schema: dict[str, str],
    ) -> None:
        """Ensure the target table exists with the correct schema.

        Args:
            table_name: Name of the table to create.
            schema: Column name to data type mapping.
        """
        ...

"""Data quality check module for validating pipeline data at every stage."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Optional

import structlog

logger = structlog.get_logger(__name__)


class CheckSeverity(Enum):
    """Severity level for quality check failures."""

    WARN = "warn"
    ERROR = "error"
    CRITICAL = "critical"


class CheckStatus(Enum):
    """Result status of a quality check."""

    PASSED = "passed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class QualityCheckResult:
    """Result of a single data quality check."""

    check_name: str
    status: CheckStatus
    severity: CheckSeverity
    message: str
    checked_at: datetime
    records_checked: int
    records_failed: int = 0
    failed_examples: list[dict[str, Any]] = field(default_factory=list)
    metadata: Optional[dict[str, Any]] = None

    @property
    def failure_rate(self) -> float:
        """Calculate the percentage of records that failed the check."""
        if self.records_checked == 0:
            return 0.0
        return self.records_failed / self.records_checked * 100


@dataclass
class QualitySuiteResult:
    """Aggregate result of running a full quality check suite."""

    suite_name: str
    results: list[QualityCheckResult]
    executed_at: datetime
    duration_seconds: float

    @property
    def passed(self) -> bool:
        """Return True if no ERROR or CRITICAL checks failed."""
        return not any(
            r.status == CheckStatus.FAILED
            and r.severity in (CheckSeverity.ERROR, CheckSeverity.CRITICAL)
            for r in self.results
        )

    @property
    def summary(self) -> dict[str, int]:
        """Return count of checks by status."""
        counts: dict[str, int] = {"passed": 0, "failed": 0, "skipped": 0}
        for r in self.results:
            counts[r.status.value] += 1
        return counts
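
The gating rule in `QualitySuiteResult.passed` is easy to misread: a failed check only fails the suite when its severity is `error` or `critical`, so a failed `warn` check is reported but does not block. A small sketch of that rule, using a hypothetical `Outcome` record in place of the full `QualityCheckResult`:

```python
from dataclasses import dataclass


@dataclass
class Outcome:
    """Hypothetical, trimmed stand-in for QualityCheckResult."""

    check_name: str
    failed: bool
    severity: str  # "warn", "error" or "critical"


# Severities that block the pipeline when a check fails.
BLOCKING = {"error", "critical"}


def suite_passed(outcomes: list[Outcome]) -> bool:
    """Mirror of the `passed` property: only blocking failures count."""
    return not any(o.failed and o.severity in BLOCKING for o in outcomes)


outcomes = [
    Outcome("not_null_id", failed=False, severity="critical"),
    Outcome("referential_integrity_category_id", failed=True, severity="warn"),
]
warn_only = suite_passed(outcomes)

outcomes.append(Outcome("unique_id", failed=True, severity="error"))
with_error = suite_passed(outcomes)
```

Here `warn_only` is `True` and `with_error` is `False`; the `summary` counts would still show the `warn` failure, so it remains visible in reporting.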


class QualityCheck:
    """A single configurable data quality check.

    Checks are composable and can be combined into suites.
    Each check wraps a function that takes the data and returns a
    `(passed, failed_records)` tuple.
    """

    def __init__(
        self,
        name: str,
        description: str,
        check_fn: Callable[[list[dict[str, Any]]], tuple[bool, list[dict[str, Any]]]],
        severity: CheckSeverity = CheckSeverity.ERROR,
    ) -> None:
        self.name = name
        self.description = description
        self.check_fn = check_fn
        self.severity = severity

    def run(self, data: list[dict[str, Any]]) -> QualityCheckResult:
        """Execute the quality check against the provided data.

        Args:
            data: List of records to check.

        Returns:
            QualityCheckResult with pass/fail status and details.
        """
        log = logger.bind(check=self.name)
        try:
            passed, failed_records = self.check_fn(data)
            status = CheckStatus.PASSED if passed else CheckStatus.FAILED
            log.info("check_completed", status=status.value, failures=len(failed_records))
            return QualityCheckResult(
                check_name=self.name,
                status=status,
                severity=self.severity,
                message=self.description,
                checked_at=datetime.utcnow(),
                records_checked=len(data),
                records_failed=len(failed_records),
                # Keep only a small sample of failing rows in the result.
                failed_examples=failed_records[:10],
            )
        except Exception as exc:
            # A check that raises is reported as a failure of every record.
            log.error("check_error", error=str(exc))
            return QualityCheckResult(
                check_name=self.name,
                status=CheckStatus.FAILED,
                severity=self.severity,
                message=f"Check raised exception: {exc}",
                checked_at=datetime.utcnow(),
                records_checked=len(data),
                records_failed=len(data),
            )


# -- Built-in Quality Checks ---------------------------------------------------

def not_null_check(column: str) -> QualityCheck:
    """Create a check that ensures a column has no null values."""

    def _check(data: list[dict[str, Any]]) -> tuple[bool, list[dict[str, Any]]]:
        failed = [row for row in data if row.get(column) is None]
        return len(failed) == 0, failed

    return QualityCheck(
        name=f"not_null_{column}",
        description=f"Column '{column}' must not contain null values",
        check_fn=_check,
        severity=CheckSeverity.ERROR,
    )


def unique_check(column: str) -> QualityCheck:
    """Create a check that ensures a column has unique values."""

    def _check(data: list[dict[str, Any]]) -> tuple[bool, list[dict[str, Any]]]:
        seen: dict[Any, int] = {}
        duplicates: list[dict[str, Any]] = []
        for row in data:
            val = row.get(column)
            if val in seen:
                duplicates.append(row)
            seen[val] = seen.get(val, 0) + 1
        return len(duplicates) == 0, duplicates

    return QualityCheck(
        name=f"unique_{column}",
        description=f"Column '{column}' must contain unique values",
        check_fn=_check,
        severity=CheckSeverity.ERROR,
    )
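
The built-in factories share one contract: the inner function receives the rows and returns `(passed, failed_rows)`. A custom check can follow the same shape. The sketch below is a hypothetical pattern check (`pattern_check_fn` is not one of the skill's built-ins) that returns only the bare function, so it runs without the `QualityCheck` wrapper; it skips nulls on the assumption that `not_null_check` covers them separately.

```python
import re
from typing import Any, Callable

Rows = list[dict[str, Any]]
CheckFn = Callable[[Rows], tuple[bool, Rows]]


def pattern_check_fn(column: str, pattern: str) -> CheckFn:
    """Build a check function requiring `column` values to fully match `pattern`."""
    compiled = re.compile(pattern)

    def _check(data: Rows) -> tuple[bool, Rows]:
        failed = [
            row
            for row in data
            # Nulls are skipped here; pair with a not-null check if needed.
            if row.get(column) is not None
            and not compiled.fullmatch(str(row[column]))
        ]
        return len(failed) == 0, failed

    return _check


check = pattern_check_fn("country", r"[A-Z]{2}")
passed, failed = check([{"country": "DE"}, {"country": "de"}, {"country": None}])
```

A function built this way could be passed as `check_fn` when constructing a `QualityCheck`, alongside a name, description and severity.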


def range_check(
    column: str,
    min_value: Optional[float] = None,
    max_value: Optional[float] = None,
) -> QualityCheck:
    """Create a check that ensures a numeric column falls within a range."""

    def _check(data: list[dict[str, Any]]) -> tuple[bool, list[dict[str, Any]]]:
        failed = []
        for row in data:
            val = row.get(column)
            if val is None:
                continue
            if min_value is not None and val < min_value:
                failed.append(row)
            elif max_value is not None and val > max_value:
                failed.append(row)
        return len(failed) == 0, failed

    bounds = []
    if min_value is not None:
        bounds.append(f">= {min_value}")
    if max_value is not None:
        bounds.append(f"<= {max_value}")
    desc = f"Column '{column}' must be {' and '.join(bounds)}"
    return QualityCheck(
        name=f"range_{column}",
        description=desc,
        check_fn=_check,
        severity=CheckSeverity.ERROR,
    )


def freshness_check(
    timestamp_column: str,
    max_age_hours: int,
) -> QualityCheck:
    """Create a check that ensures data is not older than a threshold."""

    def _check(data: list[dict[str, Any]]) -> tuple[bool, list[dict[str, Any]]]:
        if not data:
            return True, []
        now = datetime.utcnow()
        stale = []
        for row in data:
            ts = row.get(timestamp_column)
            if ts is None:
                continue
            if isinstance(ts, str):
                ts = datetime.fromisoformat(ts)
            age = (now - ts).total_seconds() / 3600
            if age > max_age_hours:
                stale.append(row)
        return len(stale) == 0, stale

    return QualityCheck(
        name=f"freshness_{timestamp_column}",
        description=f"Column '{timestamp_column}' must not be older than {max_age_hours} hours",
        check_fn=_check,
        severity=CheckSeverity.CRITICAL,
    )
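
One caveat with `freshness_check` as written: it compares against a naive `datetime.utcnow()`, so a timestamp that carries a UTC offset (for example an ISO string ending in `+00:00`) parses to an aware `datetime` and the subtraction raises `TypeError`. A possible way around this is to normalize both sides to aware UTC first. The helpers `to_utc` and `age_hours` below are hypothetical names, and treating naive values as UTC is an assumption that has to match how the source actually stores its timestamps.

```python
from datetime import datetime, timezone
from typing import Optional, Union


def to_utc(ts: Union[str, datetime]) -> datetime:
    """Parse ISO strings and return an aware UTC datetime.

    Naive values are assumed to already be in UTC.
    """
    if isinstance(ts, str):
        ts = datetime.fromisoformat(ts)
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def age_hours(ts: Union[str, datetime], now: Optional[datetime] = None) -> float:
    """Age of `ts` in hours relative to `now` (defaults to the current time)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - to_utc(ts)).total_seconds() / 3600


# A fixed reference time keeps the example deterministic.
now = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)
naive_age = age_hours("2024-01-01T12:00:00", now=now)
aware_age = age_hours("2024-01-01T14:00:00+02:00", now=now)
```

Both inputs describe the same instant, so both ages come out as 48 hours; with `max_age_hours: 48` neither row would be flagged, because the check uses a strict `>` comparison.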


def row_count_check(
    min_rows: int = 1,
    max_rows: Optional[int] = None,
) -> QualityCheck:
    """Create a check that ensures the dataset has an expected row count."""

    def _check(data: list[dict[str, Any]]) -> tuple[bool, list[dict[str, Any]]]:
        count = len(data)
        if count < min_rows:
            return False, []
        if max_rows is not None and count > max_rows:
            return False, []
        return True, []

    desc = f"Row count must be >= {min_rows}"
    if max_rows is not None:
        desc += f" and <= {max_rows}"
    return QualityCheck(
        name="row_count",
        description=desc,
        check_fn=_check,
        severity=CheckSeverity.CRITICAL,
    )


def referential_integrity_check(
    column: str,
    reference_values: set[Any],
) -> QualityCheck:
    """Create a check that ensures all values in a column exist in a reference set."""

    def _check(data: list[dict[str, Any]]) -> tuple[bool, list[dict[str, Any]]]:
        orphans = [row for row in data if row.get(column) not in reference_values]
        return len(orphans) == 0, orphans

    return QualityCheck(
        name=f"referential_integrity_{column}",
        description=f"All values in '{column}' must exist in the reference set",
        check_fn=_check,
        severity=CheckSeverity.ERROR,
    )


def schema_check(
    expected_columns: list[str],
) -> QualityCheck:
    """Create a check that ensures all expected columns are present."""

    def _check(data: list[dict[str, Any]]) -> tuple[bool, list[dict[str, Any]]]:
        if not data:
            return True, []
        # Only the first record's keys are inspected.
        actual_columns = set(data[0].keys())
        missing = set(expected_columns) - actual_columns
        if missing:
            return False, [{"missing_columns": list(missing)}]
        return True, []

    return QualityCheck(
        name="schema_check",
        description=f"Data must contain columns: {', '.join(expected_columns)}",
        check_fn=_check,
        severity=CheckSeverity.CRITICAL,
    )

"""Retry and error handling utilities for pipeline resilience."""
import functools
import time
from typing import Any, Callable, Optional, Type

import structlog

logger = structlog.get_logger(__name__)


class PipelineError(Exception):
    """Base exception for all pipeline errors."""

    def __init__(self, message: str, stage: str, details: Optional[dict] = None) -> None:
        super().__init__(message)
        self.stage = stage
        self.details = details or {}


class ExtractionError(PipelineError):
    """Raised when data extraction fails."""

    def __init__(self, message: str, details: Optional[dict] = None) -> None:
        super().__init__(message, stage="extraction", details=details)


class TransformationError(PipelineError):
    """Raised when data transformation fails."""

    def __init__(self, message: str, details: Optional[dict] = None) -> None:
        super().__init__(message, stage="transformation", details=details)


class LoadError(PipelineError):
    """Raised when data loading fails."""

    def __init__(self, message: str, details: Optional[dict] = None) -> None:
        super().__init__(message, stage="load", details=details)


class QualityCheckError(PipelineError):
    """Raised when a critical quality check fails."""

    def __init__(self, message: str, details: Optional[dict] = None) -> None:
        super().__init__(message, stage="quality_check", details=details)


def retry(
    max_attempts: int = 3,
    delay_seconds: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay_seconds: float = 300.0,
    retryable_exceptions: tuple[Type[Exception], ...] = (Exception,),
) -> Callable:
    """Decorator that retries a function with exponential backoff.

    Args:
        max_attempts: Maximum number of attempts before giving up.
        delay_seconds: Initial delay between retries in seconds.
        backoff_factor: Multiplier applied to delay after each retry.
        max_delay_seconds: Maximum delay between retries.
        retryable_exceptions: Tuple of exception types that trigger a retry.

    Returns:
        Decorated function with retry logic.
    """

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            log = logger.bind(function=func.__name__)
            current_delay = delay_seconds
            last_exception: Optional[Exception] = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as exc:
                    last_exception = exc
                    if attempt == max_attempts:
                        log.error(
                            "max_retries_exceeded",
                            attempt=attempt,
                            error=str(exc),
                        )
                        raise
                    log.warning(
                        "retrying",
                        attempt=attempt,
                        max_attempts=max_attempts,
                        delay=current_delay,
                        error=str(exc),
                    )
                    time.sleep(current_delay)
                    current_delay = min(
                        current_delay * backoff_factor,
                        max_delay_seconds,
                    )
            raise last_exception  # Should not reach here, but safety net

        return wrapper

    return decorator

"""Airflow DAG template for orchestrating the data pipeline."""
from datetime import datetime, timedelta
from typing import Any

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# -- DAG Default Arguments ------------------------------------------------------
default_args: dict[str, Any] = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=60),
    "execution_timeout": timedelta(hours=2),
}
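
The waiting times produced by the `retry` decorator can be worked out without sleeping. The sketch below reproduces only its delay arithmetic (multiply by `backoff_factor`, cap at `max_delay_seconds`, one sleep fewer than the number of attempts); `backoff_delays` is a hypothetical helper introduced for illustration and says nothing about how Airflow computes its own task retry delays.

```python
def backoff_delays(
    max_attempts: int = 3,
    delay_seconds: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay_seconds: float = 300.0,
) -> list[float]:
    """Return the sleep durations the decorator would use if every attempt failed."""
    delays: list[float] = []
    current = delay_seconds
    # The final attempt re-raises instead of sleeping, hence max_attempts - 1.
    for _ in range(max_attempts - 1):
        delays.append(current)
        current = min(current * backoff_factor, max_delay_seconds)
    return delays


defaults = backoff_delays()
capped = backoff_delays(max_attempts=6, delay_seconds=60.0)
total_wait = sum(capped)
```

With the defaults the decorator sleeps for 1 and then 2 seconds. Starting from 60 seconds with six attempts, the sequence is 60, 120, 240, 300, 300, so a fully failing call waits 1020 seconds in total, which is worth comparing against the pipeline's `execution_timeout`.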

# -- DAG Definition -------------------------------------------------------------
with DAG(
    dag_id="PIPELINE_NAME",
    default_args=default_args,
    description="PIPELINE_DESCRIPTION",
    schedule="CRON_EXPRESSION",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["data-pipeline", "PIPELINE_TAG"],
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end", trigger_rule="none_failed")

    # -- Extraction Tasks -------------------------------------------------------
    with TaskGroup("extraction") as extraction_group:
        pass  # Generated extraction tasks go here

    # -- Quality Checks (Pre-Transform) -----------------------------------------
    with TaskGroup("pre_transform_quality") as pre_quality_group:
        pass  # Generated pre-transform quality checks go here

    # -- Transformation Tasks ---------------------------------------------------
    with TaskGroup("transformation") as transformation_group:
        pass  # Generated transformation tasks go here

    # -- Quality Checks (Post-Transform) ----------------------------------------
    with TaskGroup("post_transform_quality") as post_quality_group:
        pass  # Generated post-transform quality checks go here

    # -- Load Tasks -------------------------------------------------------------
    with TaskGroup("loading") as loading_group:
        pass  # Generated load tasks go here

    # -- Final Quality Checks ---------------------------------------------------
    with TaskGroup("final_quality") as final_quality_group:
        pass  # Generated final quality checks go here

    # -- DAG Dependencies -------------------------------------------------------
    (
        start
        >> extraction_group
        >> pre_quality_group
        >> transformation_group
        >> post_quality_group
        >> loading_group
        >> final_quality_group
        >> end
    )

# pipeline_config.yaml -- Central configuration for the data pipeline
pipeline:
  name: "PIPELINE_NAME"
  version: "1.0.0"
  description: "PIPELINE_DESCRIPTION"
  owner: "data-engineering"
  schedule: "0 6 * * *"  # Daily at 6 AM UTC
  timezone: "UTC"
  max_runtime_minutes: 120
  tags:
    - data-pipeline

sources:
  - name: "SOURCE_NAME"
    type: "postgres"  # postgres, mysql, api, s3, gcs, sftp, kafka
    connection:
      host: "${SOURCE_HOST}"
      port: 5432
      database: "${SOURCE_DB}"
      username: "${SOURCE_USER}"
      password: "${SOURCE_PASSWORD}"
    extraction:
      strategy: "incremental"  # full, incremental, cdc
      watermark_column: "updated_at"
      batch_size: 10000
    tables:
      - schema: "public"
        table: "TABLE_NAME"
        primary_key: "id"
        columns: "*"  # or list specific columns

destination:
  name: "DESTINATION_NAME"
  type: "bigquery"  # bigquery, snowflake, redshift, postgres, s3, gcs
  connection:
    project: "${GCP_PROJECT}"
    dataset: "${BQ_DATASET}"
    location: "US"
  loading:
    strategy: "upsert"  # append, overwrite, upsert, merge, scd_type_2
    partition_field: "created_date"
    cluster_fields:
      - "category"
      - "region"
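
The connection blocks reference secrets as `${VAR}` placeholders, which a YAML parser leaves as literal strings; something has to substitute them after loading. The source does not say how the generated pipeline does this, so the following is only one possible approach, using the standard library's `string.Template`. The `expand_env` helper and the sample values are assumptions for illustration; in practice `os.environ` would be passed as `env`.

```python
from string import Template
from typing import Any, Mapping


def expand_env(node: Any, env: Mapping[str, str]) -> Any:
    """Recursively replace ${VAR} placeholders in an already-parsed config."""
    if isinstance(node, dict):
        return {key: expand_env(value, env) for key, value in node.items()}
    if isinstance(node, list):
        return [expand_env(value, env) for value in node]
    if isinstance(node, str):
        # substitute() raises KeyError for a missing variable, so a
        # misconfigured deployment fails at startup rather than at connect time.
        # Note: a literal "$" in a config value must be written as "$$".
        return Template(node).substitute(env)
    return node  # ints, bools, None pass through unchanged


connection = {
    "host": "${SOURCE_HOST}",
    "port": 5432,
    "database": "${SOURCE_DB}",
}
resolved = expand_env(
    connection,
    {"SOURCE_HOST": "db.internal", "SOURCE_DB": "sales"},
)
```

Failing fast on a missing variable is a design choice; `Template.safe_substitute` would instead leave unknown placeholders in place, which tends to surface later as a confusing connection error.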

transformations:
  - name: "TRANSFORM_NAME"
    type: "sql"  # sql, python, dbt
    description: "TRANSFORM_DESCRIPTION"
    input_tables:
      - "staging.SOURCE_TABLE"
    output_table: "analytics.TARGET_TABLE"
    sql_file: "sql/transformations/TRANSFORM_NAME.sql"

quality:
  pre_transform:
    - check: "not_null"
      column: "id"
      severity: "critical"
    - check: "row_count"
      min_rows: 1
      severity: "critical"
    - check: "freshness"
      column: "updated_at"
      max_age_hours: 48
      severity: "error"
  post_transform:
    - check: "unique"
      column: "id"
      severity: "error"
    - check: "not_null"
      columns:
        - "id"
        - "name"
        - "created_date"
      severity: "error"
    - check: "referential_integrity"
      column: "category_id"
      reference_table: "dim_category"
      reference_column: "id"
      severity: "warn"
  thresholds:
    max_failure_rate_percent: 1.0
    min_row_count_ratio: 0.9  # Must retain at least 90% of input rows
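
The two `thresholds` values can be read as pipeline-level gates on top of the individual checks. The configuration does not spell out the denominators, so the sketch below makes an explicit assumption: the row ratio is output rows over input rows, and the failure rate is failed rows over output rows. `evaluate_thresholds` is a hypothetical helper, not part of the generated code.

```python
def evaluate_thresholds(
    input_rows: int,
    output_rows: int,
    failed_rows: int,
    max_failure_rate_percent: float = 1.0,
    min_row_count_ratio: float = 0.9,
) -> list[str]:
    """Return a list of threshold violations; an empty list means the gate passes."""
    violations: list[str] = []
    if input_rows > 0:
        ratio = output_rows / input_rows
        if ratio < min_row_count_ratio:
            violations.append(
                f"row ratio {ratio:.2f} is below minimum {min_row_count_ratio}"
            )
    if output_rows > 0:
        failure_rate = failed_rows / output_rows * 100
        if failure_rate > max_failure_rate_percent:
            violations.append(
                f"failure rate {failure_rate:.2f}% exceeds {max_failure_rate_percent}%"
            )
    return violations


# 950 of 1000 rows retained, 5 failures: within both limits.
ok = evaluate_thresholds(input_rows=1000, output_rows=950, failed_rows=5)
# 850 of 1000 rows retained, 20 failures: both limits violated.
bad = evaluate_thresholds(input_rows=1000, output_rows=850, failed_rows=20)
```

Returning messages rather than a boolean makes it straightforward to forward the reasons to an alert channel.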

monitoring:
  alerts:
    channels:
      - type: "slack"
        webhook_url: "${SLACK_WEBHOOK_URL}"
        channel: "#data-pipeline-alerts"
      - type: "email"
        recipients:
          - "data-team@company.com"
    rules:
      - name: "pipeline_failure"
        condition: "pipeline_status == 'failed'"
        severity: "critical"
        channels: ["slack", "email"]
      - name: "quality_check_failure"
        condition: "quality_suite_passed == false"
        severity: "error"
        channels: ["slack"]
      - name: "slow_pipeline"
        condition: "duration_minutes > 90"
        severity: "warn"
        channels: ["slack"]
  metrics:
    - name: "records_processed"
      type: "counter"
    - name: "pipeline_duration_seconds"
      type: "histogram"
    - name: "quality_check_pass_rate"
      type: "gauge"
    - name: "extraction_lag_seconds"
      type: "gauge"

"""Pipeline monitoring and alerting utilities."""
import json
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional, Protocol

import structlog

logger = structlog.get_logger(__name__)


class AlertSeverity(Enum):
    """Alert severity levels."""

    INFO = "info"
    WARN = "warn"
    ERROR = "error"
    CRITICAL = "critical"


@dataclass
class PipelineMetrics:
    """Collected metrics for a single pipeline run."""

    pipeline_name: str
    run_id: str
    start_time: datetime
    end_time: Optional[datetime] = None
    status: str = "running"
    records_extracted: int = 0
    records_transformed: int = 0
    records_loaded: int = 0
    records_rejected: int = 0
    quality_checks_passed: int = 0
    quality_checks_failed: int = 0
    errors: list[dict[str, Any]] = field(default_factory=list)
    custom_metrics: dict[str, Any] = field(default_factory=dict)

    @property
    def duration_seconds(self) -> Optional[float]:
        """Calculate pipeline run duration in seconds."""
        if self.end_time is None:
            return None
        return (self.end_time - self.start_time).total_seconds()

    def to_dict(self) -> dict[str, Any]:
        """Serialize metrics to a dictionary for reporting."""
        return {
            "pipeline_name": self.pipeline_name,
            "run_id": self.run_id,
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "status": self.status,
            "duration_seconds": self.duration_seconds,
            "records": {
                "extracted": self.records_extracted,
                "transformed": self.records_transformed,
                "loaded": self.records_loaded,
                "rejected": self.records_rejected,
            },
            "quality": {
                "passed": self.quality_checks_passed,
                "failed": self.quality_checks_failed,
            },
            "errors": self.errors,
            "custom_metrics": self.custom_metrics,
        }


class AlertChannel(Protocol):
    """Protocol for alert delivery channels."""

    def send(self, severity: AlertSeverity, title: str, message: str) -> bool:
        """Send an alert through this channel."""
        ...
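
Because `AlertChannel` is a `typing.Protocol`, a channel does not need to inherit from it; any object with a matching `send` method qualifies. The sketch below shows this with a hypothetical `InMemoryChannel` (handy in tests) and a hypothetical `dispatch` helper; neither is part of the skill's monitoring module, and real Slack or email channels would implement `send` with their own delivery logic.

```python
from enum import Enum
from typing import Protocol


class AlertSeverity(Enum):
    """Same levels as the monitoring module above."""

    INFO = "info"
    WARN = "warn"
    ERROR = "error"
    CRITICAL = "critical"


class AlertChannel(Protocol):
    def send(self, severity: AlertSeverity, title: str, message: str) -> bool:
        ...


class InMemoryChannel:
    """Hypothetical channel that records alerts instead of delivering them."""

    def __init__(self) -> None:
        self.sent: list[tuple[AlertSeverity, str, str]] = []

    def send(self, severity: AlertSeverity, title: str, message: str) -> bool:
        self.sent.append((severity, title, message))
        return True


def dispatch(
    channels: list[AlertChannel],
    severity: AlertSeverity,
    title: str,
    message: str,
) -> int:
    """Send to every channel and return how many reported success."""
    delivered = 0
    for channel in channels:
        try:
            if channel.send(severity, title, message):
                delivered += 1
        except Exception:
            # One failing channel should not prevent the others from firing.
            continue
    return delivered


channel = InMemoryChannel()
delivered = dispatch(
    [channel], AlertSeverity.CRITICAL, "pipeline_failure", "Load stage failed"
)
```

Structural typing keeps channel implementations decoupled from the monitoring module, at the cost of signature mismatches only being caught by a type checker rather than at class definition time.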


class MetricsCollector:
    """Collects and exposes pipeline metrics."""

    def __init__(self, pipeline_name: str, run_id: str) -> None:
        self.metrics = PipelineMetrics(
            pipeline_name=pipeline_name,
            run_id=run_id,
            start_time=datetime.utcnow(),
        )
        self._log = logger.bind(pipeline=pipeline_name, run_id=run_id)
        self._timers: dict[str, float] = {}

    def start_timer(self, name: str) -> None:
        """Start a named timer."""
        self._timers[name] = time.monotonic()

    def stop_timer(self, name: str) -> float:
        """Stop a named timer and return elapsed seconds."""
        if name not in self._timers:
            return 0.0
        elapsed = time.monotonic() - self._timers.pop(name)
        self.metrics.custom_metrics[f"{name}_duration_seconds"] = elapsed
        return elapsed

    def record_extraction(self, count: int) -> None:
        """Record extracted record count."""
        self.metrics.records_extracted += count
        self._log.info("extraction_recorded", count=count, total=self.metrics.records_extracted)

    def record_transformation(self, input_count: int, output_count: int) -> None:
        """Record transformation counts."""
        self.metrics.records_transformed += output_count
        dropped = input_count - output_count
        self._log.info(
            "transformation_recorded",
            input=input_count,
            output=output_count,
            dropped=dropped,
        )

    def record_load(self, loaded: int, rejected: int = 0) -> None:
        """Record load counts."""
        self.metrics.records_loaded += loaded
        self.metrics.records_rejected += rejected
        self._log.info("load_recorded", loaded=loaded, rejected=rejected)

    def record_quality_check(self, passed: bool) -> None:
        """Record a quality check result."""
        if passed:
            self.metrics.quality_checks_passed += 1
        else:
            self.metrics.quality_checks_failed += 1

    def record_error(self, stage: str, error: str, details: Optional[dict] = None) -> None:
        """Record a pipeline error."""
        self.metrics.errors.append({
            "stage": stage,
            "error": error,
            "details": details or {},
            "timestamp": datetime.utcnow().isoformat(),
        })
        self._log.error("pipeline_error", stage=stage, error=error)

    def finalize(self, status: str = "success") -> PipelineMetrics:
        """Finalize metrics collection and return the result."""
        self.metrics.end_time = datetime.utcnow()
        self.metrics.status = status
        self._log.info(
            "pipeline_completed",
            status=status,
            duration=self.metrics.duration_seconds,
            records_loaded=self.metrics.records_loaded,
        )
        return self.metrics

data-pipeline-spec.md

# Data Pipeline Specification: [Pipeline Name]
## Overview
- **Pipeline Name**: [name]
- **Version**: [version]
- **Owner**: [team/person]
- **Created**: [date]
- **Last Updated**: [date]
- **Status**: [draft/active/deprecated]
## Purpose
[2-3 sentences describing what this pipeline does and why it exists.]
## Architecture
[Describe the chosen architecture pattern and why it was selected.]
### Data Flow Diagram
[ASCII or mermaid diagram showing the end-to-end data flow.]
## Data Sources
### [Source 1 Name]
- **Type**: [database/api/file/stream]
- **Connection**: [connection details, redacted secrets]
- **Tables/Endpoints**: [list of tables or API endpoints]
- **Extraction Strategy**: [full/incremental/cdc]
- **Volume**: [estimated rows/day, total size]
- **Schema**: [key columns, data types, primary keys]
## Destination
- **Type**: [warehouse/lake/database]
- **Target Tables**: [list]
- **Schema Design**: [star/snowflake/OBT/vault]
- **Partitioning**: [strategy]
- **Clustering**: [fields]
## Transformations
### [Transform 1 Name]
- **Input**: [source tables]
- **Output**: [target table]
- **Logic**: [description of business rules]
- **SQL/Code**: [reference to implementation file]
## Data Quality Checks
### Pre-Transform Checks
| Check | Column | Severity | Threshold |
|-------|--------|----------|-----------|
| not_null | id | critical | 0% null |
### Post-Transform Checks
| Check | Column | Severity | Threshold |
|-------|--------|----------|-----------|
## Scheduling
- **Frequency**: [cron expression and human-readable]
- **Timezone**: [timezone]
- **Dependencies**: [upstream pipelines]
- **SLA**: [maximum acceptable completion time]
## Error Handling
- **Retry Policy**: [attempts, backoff strategy]
- **Dead Letter Queue**: [where failed records go]
- **Alerting**: [channels and severity thresholds]
- **Manual Recovery**: [steps for manual intervention]
## Monitoring
- **Metrics**: [list of tracked metrics]
- **Dashboards**: [links to monitoring dashboards]
- **Alert Rules**: [conditions and channels]
## Operational Runbook
### Starting the Pipeline
[Steps to start/restart]
### Stopping the Pipeline
[Steps to gracefully stop]
### Backfilling
[Steps to backfill historical data]
### Common Issues and Resolutions
| Issue | Symptoms | Resolution |
|-------|----------|------------|
| [issue] | [symptoms] | [steps] |
## Change Log
| Date | Version | Change | Author |
|------|---------|--------|--------|
| [date] | 1.0.0 | Initial pipeline | [author] |

Pipeline Design Summary
=======================
Architecture: [pattern]
Sources: [list]
Destination: [target]
Schedule: [frequency]
Key Transformations: [summary]
Quality Gates: [summary]
Shall I proceed with implementation?