etl-patterns


ETL Patterns

Orchestrator for production-grade Extract-Transform-Load patterns.

Skill Routing

| Need | Skill | Content |
|------|-------|---------|
| Reliability patterns | etl-core-patterns | Idempotency, checkpointing, error handling, chunking, retry, logging |
| Load strategies | etl-incremental-patterns | Backfill, timestamp-based, CDC, pipeline orchestration |

Pattern Selection Guide

By Reliability Need

| Need | Pattern | Skill |
|------|---------|-------|
| Repeatable runs | Idempotency | etl-core-patterns |
| Resume after failure | Checkpointing | etl-core-patterns |
| Handle bad records | Error handling + DLQ | etl-core-patterns |
| Memory management | Chunked processing | etl-core-patterns |
| Network resilience | Retry with backoff | etl-core-patterns |
| Observability | Structured logging | etl-core-patterns |
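As an illustration of the "Retry with backoff" row, here is a minimal sketch using only the standard library. The function name `retry_with_backoff`, its parameters, and the choice of `ConnectionError` and `TimeoutError` as retryable errors are assumptions made for this example, not the API of etl-core-patterns.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient errors with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            # Delay doubles on each attempt, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Random jitter keeps many workers from retrying in lockstep.
            sleep(delay + random.uniform(0, delay / 2))
    raise ValueError("max_attempts must be at least 1")


# Usage: a hypothetical fetch that fails twice, then succeeds.
calls = {"n": 0}


def flaky_fetch() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary network failure")
    return "payload"


# sleep is stubbed out so the example runs instantly.
result = retry_with_backoff(flaky_fetch, sleep=lambda seconds: None)
```

Only errors listed in `retry_on` are retried, so a programming error such as a `KeyError` fails immediately instead of being masked by repeated attempts.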
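The "Chunked processing" and "Error handling + DLQ" rows can be combined in one loop. The sketch below is an assumption about how that might look, not code taken from etl-core-patterns: `chunked`, `process_in_chunks`, and `parse_amount` are hypothetical names, and the dead-letter queue is modeled as a plain list.

```python
from itertools import islice
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple


def chunked(records: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield successive lists of at most `size` records."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def process_in_chunks(
    records: Iterable[Any],
    transform: Callable[[Any], Any],
    chunk_size: int = 1000,
) -> Iterator[Tuple[List[Any], List[Dict[str, Any]]]]:
    """Yield (good, dead_letters) per chunk so memory stays bounded."""
    for chunk in chunked(records, chunk_size):
        good, dead_letters = [], []
        for record in chunk:
            try:
                good.append(transform(record))
            except Exception as exc:
                # A bad record is set aside with its error instead of
                # aborting the whole batch.
                dead_letters.append({"record": record, "error": repr(exc)})
        yield good, dead_letters


def parse_amount(row: dict) -> dict:
    """Hypothetical transform: coerce the amount field to float."""
    return {**row, "amount": float(row["amount"])}


rows = [
    {"id": 1, "amount": "10.5"},
    {"id": 2, "amount": "oops"},  # cannot be parsed, goes to the dead-letter list
    {"id": 3, "amount": "7"},
]
results = list(process_in_chunks(rows, parse_amount, chunk_size=2))
```

In a real pipeline each `good` list would be loaded and each `dead_letters` list persisted before the next chunk is read, rather than collecting everything with `list(...)` as the demo does.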

By Load Strategy

| Scenario | Pattern | Skill |
|----------|---------|-------|
| Small tables (<100K rows) | Full refresh | etl-incremental-patterns |
| Large tables | Timestamp incremental | etl-incremental-patterns |
| Real-time sync | CDC events | etl-incremental-patterns |
| Historical migration | Parallel backfill | etl-incremental-patterns |
| Zero-downtime refresh | Swap pattern | etl-incremental-patterns |
| Multi-step pipelines | Pipeline orchestration | etl-incremental-patterns |
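To make the swap pattern concrete, the following sketch rebuilds a table in a staging copy and renames it into place using the standard-library `sqlite3` module. The function `swap_refresh`, the `customers` table, and the fixed `SCHEMA` are assumptions chosen for illustration. The sketch also assumes the live table already exists and that table names come from trusted code, since they are interpolated into SQL.

```python
import sqlite3
from typing import Iterable, Tuple

SCHEMA = "id INTEGER PRIMARY KEY, name TEXT"  # hypothetical example schema


def swap_refresh(
    conn: sqlite3.Connection, table: str, rows: Iterable[Tuple[int, str]]
) -> None:
    """Load fresh data into a staging table, then rename it over the live one."""
    staging, old = f"{table}_staging", f"{table}_old"

    # 1. Build the staging table while readers keep using the live table.
    conn.execute(f"DROP TABLE IF EXISTS {staging}")
    conn.execute(f"DROP TABLE IF EXISTS {old}")
    conn.execute(f"CREATE TABLE {staging} ({SCHEMA})")
    conn.executemany(f"INSERT INTO {staging} (id, name) VALUES (?, ?)", rows)
    conn.commit()

    # 2. Swap inside one explicit transaction so a failure rolls back cleanly.
    try:
        conn.execute("BEGIN")
        conn.execute(f"ALTER TABLE {table} RENAME TO {old}")
        conn.execute(f"ALTER TABLE {staging} RENAME TO {table}")
        conn.execute(f"DROP TABLE {old}")
        conn.commit()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute(f"CREATE TABLE customers ({SCHEMA})")
conn.execute("INSERT INTO customers (id, name) VALUES (1, 'stale')")
conn.commit()

swap_refresh(conn, "customers", [(1, "Ada"), (2, "Grace")])
live_rows = conn.execute("SELECT id, name FROM customers ORDER BY id").fetchall()
tables = [r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")]
```

The slow part (loading the data) happens before the swap, so the live table is only unavailable for the duration of two renames. Other databases express the rename step differently, so treat the SQL here as SQLite-specific.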

Quick Reference

Idempotency Options

```python
# Small datasets: Delete-then-insert
# Large datasets: UPSERT on conflict
# Change detection: Row hash comparison
```
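The three options above could look roughly like this with the standard-library `sqlite3` and `hashlib` modules. This is a sketch under assumptions: the `users` table and the function names `delete_then_insert`, `upsert`, and `row_hash` are hypothetical, and the `ON CONFLICT ... DO UPDATE` syntax needs SQLite 3.24 or newer.

```python
import hashlib
import json
import sqlite3
from typing import Iterable, Tuple

Row = Tuple[int, str]


def delete_then_insert(conn: sqlite3.Connection, rows: Iterable[Row]) -> None:
    """Small datasets: replace the whole table inside one transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM users")
        conn.executemany("INSERT INTO users (id, name) VALUES (?, ?)", rows)


def upsert(conn: sqlite3.Connection, rows: Iterable[Row]) -> None:
    """Large datasets: insert new keys and update existing ones."""
    with conn:
        conn.executemany(
            "INSERT INTO users (id, name) VALUES (?, ?) "
            "ON CONFLICT(id) DO UPDATE SET name = excluded.name",
            rows,
        )


def row_hash(row: dict) -> str:
    """Change detection: a stable hash of a record's contents."""
    # sort_keys makes the hash independent of dict key order.
    payload = json.dumps(row, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

batch = [(1, "Ada"), (2, "Grace")]
upsert(conn, batch)
upsert(conn, batch)  # re-running the same batch leaves the table unchanged
upsert(conn, [(2, "Grace H.")])  # an existing key is updated in place
rows_after_upsert = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
```

With row hashes, a loader can store the hash next to each record and skip the write when the incoming hash matches, which keeps repeated runs cheap as well as safe.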

Load Strategy Decision

Is table < 100K rows?
  → Full refresh

Has reliable timestamp column?
  → Timestamp incremental

Source supports CDC?
  → CDC event processing

Need zero downtime?
  → Swap pattern (temp table → rename)

One-time historical load?
  → Parallel backfill with date ranges
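The checklist above can be encoded as a small helper. Because the questions are not mutually exclusive, this sketch returns every strategy whose condition holds, in the order listed, and leaves the final choice to the caller. `TableProfile`, its field names, and the strategy labels are assumptions introduced for the example.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TableProfile:
    """Hypothetical description of a source table and its requirements."""

    row_count: int
    has_reliable_timestamp: bool = False
    source_supports_cdc: bool = False
    needs_zero_downtime: bool = False
    one_time_historical_load: bool = False


def matching_strategies(profile: TableProfile, small_table_rows: int = 100_000) -> List[str]:
    """Return every load strategy from the checklist whose condition holds."""
    matches = []
    if profile.row_count < small_table_rows:
        matches.append("full_refresh")
    if profile.has_reliable_timestamp:
        matches.append("timestamp_incremental")
    if profile.source_supports_cdc:
        matches.append("cdc_event_processing")
    if profile.needs_zero_downtime:
        matches.append("swap_pattern")
    if profile.one_time_historical_load:
        matches.append("parallel_backfill")
    return matches


small = matching_strategies(TableProfile(row_count=50_000))
large = matching_strategies(
    TableProfile(row_count=5_000_000, has_reliable_timestamp=True, needs_zero_downtime=True)
)
```

An empty result means none of the listed conditions apply, which is a signal to revisit the source system rather than something the checklist resolves.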

Common Pipeline Structure

```python
import pandas as pd

# Checkpoint, ETLProcessor, incremental_by_timestamp, upsert_records and
# source_table are assumed to come from the etl-core-patterns and
# etl-incremental-patterns skills; they are not defined in this snippet.

# 1. Setup
checkpoint = Checkpoint('.etl_checkpoint.json')
processor = ETLProcessor()

# 2. Extract (with incremental)
df = incremental_by_timestamp(source_table, 'updated_at')

# 3. Transform (with error handling)
transformed = processor.process_batch(df.to_dict('records'))

# 4. Load (with idempotency)
upsert_records(pd.DataFrame(transformed))

# 5. Checkpoint (skip when nothing was extracted, so an empty run
#    does not overwrite the watermark with NaN/NaT)
if not df.empty:
    checkpoint.set_last_processed('sync', df['updated_at'].max())

# 6. Handle failures
processor.save_failures('failures/')
```
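The pipeline above relies on a `Checkpoint` helper and a timestamp filter that live in the sibling skills. For readers without those skills at hand, here is one possible stdlib-only shape for them. The JSON file layout, the `get_last_processed` method, and the `rows_since` function are assumptions made for this sketch, not the skills' actual definitions.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional


class Checkpoint:
    """Hypothetical JSON-file checkpoint store keyed by job name."""

    def __init__(self, path) -> None:
        self.path = Path(path)
        self.state: Dict[str, str] = (
            json.loads(self.path.read_text()) if self.path.exists() else {}
        )

    def get_last_processed(self, job: str) -> Optional[str]:
        return self.state.get(job)

    def set_last_processed(self, job: str, value: Any) -> None:
        self.state[job] = str(value)
        # Write to a temp file and rename it into place, so a crash
        # mid-write never leaves a truncated checkpoint file behind.
        tmp = self.path.with_name(self.path.name + ".tmp")
        tmp.write_text(json.dumps(self.state))
        os.replace(tmp, self.path)


def rows_since(rows: Iterable[dict], column: str, watermark: Optional[str]) -> List[dict]:
    """Return rows strictly newer than the watermark (all rows if it is None)."""
    if watermark is None:
        return list(rows)
    # ISO-8601 strings in one format and time zone sort chronologically.
    return [row for row in rows if row[column] > watermark]


source = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00"},
    {"id": 2, "updated_at": "2024-01-02T00:00:00"},
]
with tempfile.TemporaryDirectory() as tmpdir:
    path = Path(tmpdir) / ".etl_checkpoint.json"

    # First run: no checkpoint yet, so everything is extracted.
    first = rows_since(source, "updated_at", Checkpoint(path).get_last_processed("sync"))
    Checkpoint(path).set_last_processed("sync", max(r["updated_at"] for r in first))

    # Second run: only the row added after the stored watermark is picked up.
    source.append({"id": 3, "updated_at": "2024-01-03T00:00:00"})
    second = rows_since(source, "updated_at", Checkpoint(path).get_last_processed("sync"))
```

A strict `>` comparison can miss rows that share the watermark's exact timestamp but arrive later. Pairing a `>=` comparison with an idempotent load such as an upsert is one way to trade a little reprocessing for that safety.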

Related Skills

  • data-validation: Validate data quality during ETL
  • data-quality: Monitor data quality metrics
  • pandas-coder: DataFrame transformations