etl-patterns

Production ETL patterns orchestrator. Routes to core reliability patterns and incremental load strategies.


NPX Install

```bash
npx skill4agent add majesticlabs-dev/majestic-marketplace etl-patterns
```

ETL Patterns

Orchestrator for production-grade Extract-Transform-Load patterns.

Skill Routing

| Need | Skill | Content |
|---|---|---|
| Reliability patterns | etl-core-patterns | Idempotency, checkpointing, error handling, chunking, retry, logging |
| Load strategies | etl-incremental-patterns | Backfill, timestamp-based, CDC, pipeline orchestration |

Pattern Selection Guide

By Reliability Need

| Need | Pattern | Skill |
|---|---|---|
| Repeatable runs | Idempotency | etl-core-patterns |
| Resume after failure | Checkpointing | etl-core-patterns |
| Handle bad records | Error handling + DLQ | etl-core-patterns |
| Memory management | Chunked processing | etl-core-patterns |
| Network resilience | Retry with backoff | etl-core-patterns |
| Observability | Structured logging | etl-core-patterns |
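The retry-with-backoff pattern in the table can be sketched as a decorator. This is a minimal illustration under assumptions of my own: the decorator name, parameters, and jitter policy are not the skill's actual API.

```python
import random
import time
from functools import wraps


def retry_with_backoff(max_attempts=5, base_delay=1.0, max_delay=30.0):
    """Retry a flaky call with exponential backoff plus jitter (illustrative)."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the error
                    # Exponential backoff capped at max_delay, with random jitter
                    delay = min(base_delay * 2 ** (attempt - 1), max_delay)
                    time.sleep(delay + random.uniform(0, delay / 2))
        return wrapper
    return decorator
```

Jitter spreads out retries so many failing workers don't hammer the source in lockstep.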

By Load Strategy

| Scenario | Pattern | Skill |
|---|---|---|
| Small tables (<100K rows) | Full refresh | etl-incremental-patterns |
| Large tables | Timestamp incremental | etl-incremental-patterns |
| Real-time sync | CDC events | etl-incremental-patterns |
| Historical migration | Parallel backfill | etl-incremental-patterns |
| Zero-downtime refresh | Swap pattern | etl-incremental-patterns |
| Multi-step pipelines | Pipeline orchestration | etl-incremental-patterns |
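The zero-downtime swap pattern can be sketched against SQLite: load into a staging table, then swap it into place so readers see either the old data or the new, never a half-loaded table. The function name, table naming scheme, and SQLite backend are assumptions for illustration only.

```python
import sqlite3


def full_refresh_with_swap(conn, table, rows, columns):
    """Zero-downtime full refresh: load a staging table, then swap (illustrative)."""
    tmp = f"{table}__staging"  # assumed naming convention
    cols = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    cur = conn.cursor()
    cur.execute(f"DROP TABLE IF EXISTS {tmp}")
    cur.execute(f"CREATE TABLE {tmp} ({cols})")
    # Full load goes into the staging table, not the live one
    cur.executemany(f"INSERT INTO {tmp} ({cols}) VALUES ({placeholders})", rows)
    # Swap: replace the live table with the freshly loaded staging table
    cur.execute(f"DROP TABLE IF EXISTS {table}")
    cur.execute(f"ALTER TABLE {tmp} RENAME TO {table}")
    conn.commit()
```

In a warehouse with transactional DDL, the drop-and-rename would run inside one transaction so the swap is atomic.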

Quick Reference

Idempotency Options

```python
# Small datasets: delete-then-insert
# Large datasets: UPSERT on conflict
# Change detection: row-hash comparison
```
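The UPSERT option can be sketched with SQLite's `ON CONFLICT` clause; re-running the same batch converges to the same table state. The table name, key column, and function signature here are assumptions for illustration, not the skill's actual helpers.

```python
import sqlite3


def upsert_records(conn, rows):
    """Idempotent load: re-running the same batch leaves one row per key (illustrative)."""
    conn.executemany(
        """
        INSERT INTO events (id, value) VALUES (?, ?)
        ON CONFLICT(id) DO UPDATE SET value = excluded.value
        """,
        rows,
    )
    conn.commit()
```

`excluded` refers to the incoming row, so a conflicting key is updated in place instead of raising.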

Load Strategy Decision

Is table < 100K rows?
  → Full refresh

Has reliable timestamp column?
  → Timestamp incremental

Source supports CDC?
  → CDC event processing

Need zero downtime?
  → Swap pattern (temp table → rename)

One-time historical load?
  → Parallel backfill with date ranges
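The decision tree above can be folded into one function. The precedence among branches (backfill, then zero-downtime, then CDC, then size, then timestamp) is a judgment call of mine, not specified by the skill.

```python
def choose_load_strategy(row_count, has_timestamp, has_cdc,
                         zero_downtime=False, one_time_backfill=False):
    """Map the load-strategy decision tree onto a strategy name (illustrative)."""
    if one_time_backfill:
        return "parallel_backfill"      # one-time historical load
    if zero_downtime:
        return "swap"                   # temp table, then rename
    if has_cdc:
        return "cdc"                    # source emits change events
    if row_count < 100_000:
        return "full_refresh"           # small enough to reload wholesale
    if has_timestamp:
        return "timestamp_incremental"  # reliable updated_at column
    return "full_refresh"               # fallback when nothing better applies
```

Usage: `choose_load_strategy(5_000_000, has_timestamp=True, has_cdc=False)` picks the timestamp-incremental strategy.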

Common Pipeline Structure

```python
import pandas as pd

# Helpers (Checkpoint, ETLProcessor, incremental_by_timestamp, upsert_records)
# are provided by the routed skills above.

# 1. Setup
checkpoint = Checkpoint('.etl_checkpoint.json')
processor = ETLProcessor()

# 2. Extract (incrementally, by timestamp)
df = incremental_by_timestamp(source_table, 'updated_at')

# 3. Transform (with error handling)
transformed = processor.process_batch(df.to_dict('records'))

# 4. Load (idempotently)
upsert_records(pd.DataFrame(transformed))

# 5. Checkpoint the high-water mark
checkpoint.set_last_processed('sync', df['updated_at'].max())

# 6. Persist failed records for inspection
processor.save_failures('failures/')
```
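A `Checkpoint` like the one used above can be sketched as a small file-backed store. This is a minimal stand-in with an assumed interface, not the skill's actual implementation.

```python
import json
import os


class Checkpoint:
    """File-backed high-water-mark store so a failed run can resume (illustrative)."""

    def __init__(self, path):
        self.path = path
        self._state = {}
        if os.path.exists(path):
            with open(path) as f:
                self._state = json.load(f)

    def get_last_processed(self, key, default=None):
        return self._state.get(key, default)

    def set_last_processed(self, key, value):
        self._state[key] = value
        # Write-then-rename so a crash mid-write cannot corrupt the checkpoint
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump(self._state, f)
        os.replace(tmp, self.path)
```

On the next run, `get_last_processed('sync')` returns the last committed watermark, so extraction restarts from where the previous run stopped.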

Related Skills

  • data-validation
    - Validate data quality during ETL
  • data-quality
    - Monitor data quality metrics
  • pandas-coder
    - DataFrame transformations