Production ETL patterns orchestrator. Routes to core reliability patterns and incremental load strategies.
```shell
npx skill4agent add majesticlabs-dev/majestic-marketplace etl-patterns
```

| Need | Skill | Content |
|---|---|---|
| Reliability patterns | | Idempotency, checkpointing, error handling, chunking, retry, logging |
| Load strategies | | Backfill, timestamp-based, CDC, pipeline orchestration |
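As a taste of the reliability patterns routed above, here is a minimal checkpointing sketch. The `Checkpoint` class name and its methods mirror the quick-start below but are illustrative, not the skill's actual API:

```python
import json
import os

class Checkpoint:
    """Persist the last-processed watermark so a failed run can resume."""

    def __init__(self, path):
        self.path = path
        self.state = {}
        if os.path.exists(path):
            with open(path) as f:
                self.state = json.load(f)

    def get_last_processed(self, key, default=None):
        # Return the stored watermark, or the default on a first run.
        return self.state.get(key, default)

    def set_last_processed(self, key, value):
        # Write-through to disk so the watermark survives a crash.
        self.state[key] = value
        with open(self.path, 'w') as f:
            json.dump(self.state, f)
```

A run reads the watermark, processes only newer records, and writes the new watermark only after a successful load.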

| Need | Pattern | Skill |
|---|---|---|
| Repeatable runs | Idempotency | |
| Resume after failure | Checkpointing | |
| Handle bad records | Error handling + DLQ | |
| Memory management | Chunked processing | |
| Network resilience | Retry with backoff | |
| Observability | Structured logging | |
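The retry-with-backoff row, for example, can be sketched as a decorator. This is a generic sketch of the pattern, not the skill's implementation:

```python
import random
import time
from functools import wraps

def retry_with_backoff(max_attempts=5, base_delay=1.0, max_delay=30.0):
    """Retry a flaky call with exponential backoff plus jitter."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the error
                    # Double the delay each attempt, capped at max_delay,
                    # with up to 10% jitter to avoid thundering herds.
                    delay = min(base_delay * 2 ** (attempt - 1), max_delay)
                    time.sleep(delay + random.uniform(0, delay * 0.1))
        return wrapper
    return decorator

# Usage: @retry_with_backoff(max_attempts=3) above any network-bound extract.
```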

| Scenario | Pattern | Skill |
|---|---|---|
| Small tables (<100K) | Full refresh | |
| Large tables | Timestamp incremental | |
| Real-time sync | CDC events | |
| Historical migration | Parallel backfill | |
| Zero-downtime refresh | Swap pattern | |
| Multi-step pipelines | Pipeline orchestration | |
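As a sketch of the timestamp-incremental strategy: filter the source on rows newer than the stored watermark. The signature and the `fetch_rows` callable are hypothetical; in practice the executor would be a database cursor or `pandas.read_sql`:

```python
def incremental_by_timestamp(table, ts_column, last_seen, fetch_rows):
    """Return only rows whose timestamp is newer than the checkpoint.

    fetch_rows is any callable that executes SQL with bound parameters
    and returns rows (hypothetical here, for illustration).
    """
    query = (
        f"SELECT * FROM {table} "
        f"WHERE {ts_column} > :last_seen "
        f"ORDER BY {ts_column}"
    )
    return fetch_rows(query, {"last_seen": last_seen})
```

Ordering by the timestamp makes it safe to checkpoint the maximum value seen after a successful load.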
```
# Small datasets: Delete-then-insert
# Large datasets: UPSERT on conflict
# Change detection: Row hash comparison
```

Is table < 100K rows?
→ Full refresh
Has reliable timestamp column?
→ Timestamp incremental
Source supports CDC?
→ CDC event processing
Need zero downtime?
→ Swap pattern (temp table → rename)
One-time historical load?
→ Parallel backfill with date ranges

```python
import pandas as pd

# Checkpoint, ETLProcessor, incremental_by_timestamp, and upsert_records
# are provided by the skills referenced above.

# 1. Setup
checkpoint = Checkpoint('.etl_checkpoint.json')
processor = ETLProcessor()

# 2. Extract (with incremental)
df = incremental_by_timestamp(source_table, 'updated_at')

# 3. Transform (with error handling)
transformed = processor.process_batch(df.to_dict('records'))

# 4. Load (with idempotency)
upsert_records(pd.DataFrame(transformed))

# 5. Checkpoint
checkpoint.set_last_processed('sync', df['updated_at'].max())

# 6. Handle failures
processor.save_failures('failures/')
```

Related skills: data-validation, data-quality, pandas-coder
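The UPSERT-on-conflict strategy from the decision guide can be sketched with SQLite's `ON CONFLICT ... DO UPDATE` clause. The table schema and function name are illustrative:

```python
import sqlite3

def upsert_records(conn, records):
    """Idempotent load: re-running with the same records leaves one copy."""
    conn.executemany(
        """
        INSERT INTO dim_customer (id, name, updated_at)
        VALUES (:id, :name, :updated_at)
        ON CONFLICT(id) DO UPDATE SET
            name = excluded.name,
            updated_at = excluded.updated_at
        """,
        records,
    )
    conn.commit()
```

Because the conflict target is the primary key, replaying a batch after a failure updates existing rows instead of duplicating them, which is what makes the load safely repeatable.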