etl-designer
ETL Designer
Design robust ETL/ELT pipelines for data processing.
Quick Start
Use Airflow for orchestration, make every operation idempotent, add error handling, and monitor pipeline health.
Instructions
Airflow DAG Structure
python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


# Placeholder callables; replace the bodies with real pipeline logic.
def extract_from_source():
    ...


def transform_data():
    ...


def load_to_warehouse():
    ...


# Defaults applied to every task in the DAG
default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com'],
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM (Airflow 2.4+ prefers the `schedule` argument)
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill runs between start_date and now
) as dag:
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source,
    )
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse,
    )

    # Run the tasks strictly in order
    extract >> transform >> load
Incremental Processing
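python
import pandas as pd
from sqlalchemy import text


def extract_incremental(conn, last_run_date):
    # conn is assumed to be a SQLAlchemy connection or engine.
    # Bind the watermark as a parameter instead of formatting it into the SQL string.
    query = text("""
        SELECT * FROM source_table
        WHERE updated_at > :last_run_date
    """)
    return pd.read_sql(query, conn, params={"last_run_date": last_run_date})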
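Incremental extraction only works if the pipeline remembers how far it has already read. The following is a minimal sketch of that watermark pattern, using the standard-library `sqlite3` module so it runs without a warehouse. The in-memory table, its columns, and the returned `new_watermark` value are assumptions made for illustration; they are not part of the original pipeline.

```python
import sqlite3


def extract_incremental(conn, last_run_date):
    """Return rows changed after the watermark, plus the new watermark."""
    rows = conn.execute(
        "SELECT id, payload, updated_at FROM source_table "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_run_date,),
    ).fetchall()
    # Advance the watermark only as far as the data actually read
    new_watermark = rows[-1][2] if rows else last_run_date
    return rows, new_watermark


# Hypothetical in-memory source used only for illustration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_table (id INTEGER, payload TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO source_table VALUES (?, ?, ?)",
    [
        (1, "a", "2024-01-01 00:00:00"),
        (2, "b", "2024-01-02 00:00:00"),
        (3, "c", "2024-01-03 00:00:00"),
    ],
)

rows, watermark = extract_incremental(conn, "2024-01-01 12:00:00")
```

Storing the returned watermark only after the load succeeds means a failed run re-reads the same rows on retry instead of skipping them.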
Error Handling
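python
import logging

logger = logging.getLogger(__name__)


def safe_transform(data):
    # transform_data and send_alert are assumed to be defined elsewhere in the pipeline
    try:
        return transform_data(data)
    except Exception as e:
        # logger.exception records the message together with the traceback
        logger.exception("Transform failed: %s", e)
        send_alert(f"Pipeline failed: {e}")
        raise  # re-raise so Airflow marks the task failed and applies retries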
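To see why the handler re-raises rather than swallowing the error, it may help to run it against stand-in helpers. In this sketch `send_alert` and `transform_data` are hypothetical stubs (an in-memory list and a toy transform), not the real pipeline functions.

```python
import logging

logger = logging.getLogger(__name__)
alerts = []  # stand-in for a real alerting channel


def send_alert(message):
    # Hypothetical helper: a real pipeline might send email or a chat message here
    alerts.append(message)


def transform_data(data):
    # Hypothetical transform: raises KeyError when a record lacks the "amount" field
    return [{"amount_cents": int(row["amount"] * 100)} for row in data]


def safe_transform(data):
    try:
        return transform_data(data)
    except Exception as e:
        logger.error(f"Transform failed: {e}")
        send_alert(f"Pipeline failed: {e}")
        raise


# A valid batch passes straight through
ok = safe_transform([{"amount": 1.5}])

# A malformed batch is logged, alerted on, and still raises to the caller
try:
    safe_transform([{"price": 2.0}])
    failed = False
except KeyError:
    failed = True
```

Because the exception still reaches the caller, an orchestrator such as Airflow can mark the task as failed and apply its configured `retries`.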
Best Practices
- Make operations idempotent
- Use incremental processing
- Implement proper error handling
- Add monitoring and alerts
- Use data quality checks
- Document pipeline logic
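One common way to make a load step idempotent is to upsert on a primary key, so that a retried task overwrites rows instead of appending duplicates. The sketch below assumes a hypothetical `warehouse_table` with an `id` key and uses SQLite's `INSERT OR REPLACE`; a real warehouse would use its own `MERGE` or upsert syntax.

```python
import sqlite3


def load_to_warehouse(conn, rows):
    """Upsert rows keyed on id so re-running a load does not duplicate data."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany(
            "INSERT OR REPLACE INTO warehouse_table (id, amount) VALUES (?, ?)",
            rows,
        )


# Hypothetical in-memory warehouse used only for illustration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE warehouse_table (id INTEGER PRIMARY KEY, amount REAL)")

batch = [(1, 10.0), (2, 20.0)]
load_to_warehouse(conn, batch)
load_to_warehouse(conn, batch)  # simulated retry of the same batch

row_count = conn.execute("SELECT COUNT(*) FROM warehouse_table").fetchone()[0]
```

Running the same batch twice leaves the table in the same state as running it once, which is what lets Airflow retries be safe.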
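A data quality check can be as simple as a function that inspects a batch before it is loaded and reports what is wrong with it. The following sketch is one possible shape; the `check_quality` name, the dict-based row format, and the three rules (non-empty batch, required fields present, unique key) are assumptions chosen for illustration.

```python
def check_quality(rows, required_fields, key_field):
    """Return a list of human-readable problems; an empty list means the batch passed."""
    problems = []
    if not rows:
        problems.append("batch is empty")
    seen = set()
    for i, row in enumerate(rows):
        # Every required field must be present and non-null
        for field in required_fields:
            if row.get(field) is None:
                problems.append(f"row {i}: missing {field}")
        # The key field must be unique within the batch
        key = row.get(key_field)
        if key in seen:
            problems.append(f"row {i}: duplicate {key_field}={key}")
        seen.add(key)
    return problems


good = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": 5.0}]
bad = [{"id": 1, "amount": None}, {"id": 1, "amount": 3.0}]

good_problems = check_quality(good, ["id", "amount"], "id")
bad_problems = check_quality(bad, ["id", "amount"], "id")
```

A pipeline task could raise an exception when the returned list is non-empty, so that a bad batch fails loudly before it reaches the warehouse.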