etl-designer

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

ETL Designer

ETL 管道设计

Design robust ETL/ELT pipelines for data processing.
为数据处理设计健壮的ETL/ELT管道。

Quick Start

快速入门

Use Airflow for orchestration, implement idempotent operations, add error handling, monitor pipeline health.
使用Airflow进行编排,实现幂等操作,添加错误处理,监控管道健康状态。

Instructions

操作指南

Airflow DAG Structure

Airflow DAG 结构

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com']
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source
    )
    
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse
    )
    
    extract >> transform >> load
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com']
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source
    )
    
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse
    )
    
    extract >> transform >> load

Incremental Processing

增量处理

python
def extract_incremental(last_run_date):
    query = f"""
        SELECT * FROM source_table
        WHERE updated_at > '{last_run_date}'
    """
    return pd.read_sql(query, conn)
python
def extract_incremental(last_run_date):
    query = f"""
        SELECT * FROM source_table
        WHERE updated_at > '{last_run_date}'
    """
    return pd.read_sql(query, conn)

Error Handling

错误处理

python
def safe_transform(data):
    try:
        transformed = transform_data(data)
        return transformed
    except Exception as e:
        logger.error(f"Transform failed: {e}")
        send_alert(f"Pipeline failed: {e}")
        raise
python
def safe_transform(data):
    try:
        transformed = transform_data(data)
        return transformed
    except Exception as e:
        logger.error(f"Transform failed: {e}")
        send_alert(f"Pipeline failed: {e}")
        raise

Best Practices

最佳实践

  • Make operations idempotent
  • Use incremental processing
  • Implement proper error handling
  • Add monitoring and alerts
  • Use data quality checks
  • Document pipeline logic
  • 确保操作具备幂等性
  • 使用增量处理
  • 实现完善的错误处理
  • 添加监控与告警
  • 执行数据质量检查
  • 编写管道逻辑文档