data_transform

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Data Transformation

数据转换

Transform raw data into analytical assets using modern transformation patterns, frameworks, and orchestration tools.
借助现代转换模式、框架与编排工具,将原始数据转换为可用于分析的资产。

Purpose

用途

Select and implement data transformation patterns across the modern data stack. Transform raw data into clean, tested, and documented analytical datasets using SQL (dbt), Python DataFrames (pandas, polars, PySpark), and pipeline orchestration (Airflow, Dagster, Prefect).
在现代数据栈中选择并实施数据转换模式。借助SQL(dbt)、Python DataFrame(pandas、polars、PySpark)以及管道编排工具(Airflow、Dagster、Prefect),将原始数据转换为干净、经过测试且有文档记录的分析数据集。

When to Use

适用场景

Invoke this skill when:
  • Choosing between ETL and ELT transformation patterns
  • Building dbt models (staging, intermediate, marts)
  • Implementing incremental data loads and merge strategies
  • Migrating pandas code to polars for performance improvements
  • Orchestrating data pipelines with dependencies and retries
  • Adding data quality tests and validation
  • Processing large datasets with PySpark
  • Creating production-ready transformation workflows
在以下场景中使用本技能:
  • 选择ETL与ELT转换模式
  • 构建dbt模型(staging层、intermediate层、marts层)
  • 实现增量数据加载与合并策略
  • 将pandas代码迁移至polars以提升性能
  • 编排带有依赖关系与重试机制的数据管道
  • 添加数据质量测试与验证
  • 使用PySpark处理大型数据集
  • 创建可用于生产环境的转换工作流

Quick Start: Common Patterns

快速开始:常见模式

dbt Incremental Model

dbt增量模型

sql
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}
group by 1, 2, 3

{% if is_incremental() %}
    where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}
sql
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}
group by 1, 2, 3

{% if is_incremental() %}
    where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}

polars High-Performance Transformation

polars高性能转换

python
import polars as pl

result = (
    pl.scan_csv('large_dataset.csv')
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg(pl.col('revenue').sum())
    .collect()  # Execute lazy query
)
python
import polars as pl

result = (
    pl.scan_csv('large_dataset.csv')
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg(pl.col('revenue').sum())
    .collect()  # Execute lazy query
)

Airflow Data Pipeline

Airflow数据管道

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='daily_sales_pipeline',
    schedule_interval='0 2 * * *',
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    extract >> transform
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='daily_sales_pipeline',
    schedule_interval='0 2 * * *',
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    extract >> transform

Decision Frameworks

决策框架

ETL vs ELT Selection

ETL与ELT选择

Use ELT (Extract, Load, Transform) when:
  • Using modern cloud data warehouse (Snowflake, BigQuery, Databricks)
  • Transformation logic changes frequently
  • Team includes SQL analysts
  • Data volume 10GB-1TB+ (leverage warehouse parallelism)
Tools: dbt, Dataform, Snowflake tasks, BigQuery scheduled queries
Use ETL (Extract, Transform, Load) when:
  • Regulatory compliance requires pre-load data redaction (PII/PHI)
  • Target system lacks compute power
  • Real-time streaming with immediate transformation
  • Legacy systems without cloud warehouse
Tools: AWS Glue, Azure Data Factory, custom Python scripts
Use Hybrid when combining sensitive data cleansing (ETL) with analytics transformations (ELT).
Default recommendation: ELT with dbt unless specific compliance or performance constraints require ETL.
For detailed patterns, see
references/etl-vs-elt-patterns.md
.
选择ELT(抽取、加载、转换)的场景
  • 使用现代云数据仓库(Snowflake、BigQuery、Databricks)
  • 转换逻辑频繁变更
  • 团队包含SQL分析师
  • 数据量在10GB-1TB+(利用仓库并行计算能力)
工具:dbt、Dataform、Snowflake Tasks、BigQuery定时查询
选择ETL(抽取、转换、加载)的场景
  • 合规要求在加载前对数据进行脱敏(PII/PHI数据)
  • 目标系统计算能力不足
  • 需要实时流处理与即时转换
  • 使用无云数据仓库的遗留系统
工具:AWS Glue、Azure Data Factory、自定义Python脚本
混合模式:结合敏感数据清洗(ETL)与分析转换(ELT)时使用。
默认推荐:除非有特定合规或性能限制要求使用ETL,否则优先选择基于dbt的ELT模式。
详细模式请参考
references/etl-vs-elt-patterns.md

DataFrame Library Selection

DataFrame库选择

Choose pandas when:
  • Data size < 500MB
  • Prototyping or exploratory analysis
  • Need compatibility with pandas-only libraries
Choose polars when:
  • Data size 500MB-100GB
  • Performance critical (10-100x faster than pandas)
  • Production pipelines with memory constraints
  • Want lazy evaluation with query optimization
Choose PySpark when:
  • Data size > 100GB
  • Need distributed processing across cluster
  • Existing Spark infrastructure (EMR, Databricks)
Migration path: pandas → polars (easier, similar API) or pandas → PySpark (requires cluster)
For comparisons and migration guides, see
references/dataframe-comparison.md
.
选择pandas的场景
  • 数据量<500MB
  • 原型开发或探索性分析
  • 需要与仅支持pandas的库兼容
选择polars的场景
  • 数据量在500MB-100GB
  • 性能要求高(比pandas快10-100倍)
  • 生产管道存在内存限制
  • 希望使用延迟查询与查询优化
选择PySpark的场景
  • 数据量>100GB
  • 需要跨集群的分布式处理
  • 已有Spark基础设施(EMR、Databricks)
迁移路径:pandas → polars(更简单,API相似)或pandas → PySpark(需要集群支持)
对比与迁移指南请参考
references/dataframe-comparison.md

Orchestration Tool Selection

编排工具选择

Choose Airflow when:
  • Enterprise production (proven at scale)
  • Need 5,000+ integrations
  • Managed services available (AWS MWAA, GCP Cloud Composer)
Choose Dagster when:
  • Heavy dbt usage (native
    dbt_assets
    integration)
  • Data lineage and asset-based workflows prioritized
  • ML pipelines requiring testability
Choose Prefect when:
  • Dynamic workflows (runtime task generation)
  • Cloud-native architecture preferred
  • Pythonic API with decorators
Safe default: Airflow (battle-tested) unless specific needs for Dagster/Prefect.
For detailed patterns, see
references/orchestration-patterns.md
.
选择Airflow的场景
  • 企业级生产环境(经大规模验证)
  • 需要5000+集成能力
  • 有托管服务可用(AWS MWAA、GCP Cloud Composer)
选择Dagster的场景
  • 大量使用dbt(原生
    dbt_assets
    集成)
  • 优先考虑数据血缘与基于资产的工作流
  • 需要可测试的机器学习管道
选择Prefect的场景
  • 动态工作流(运行时生成任务)
  • 偏好云原生架构
  • 采用Python装饰器的API
安全默认选项:Airflow(久经考验),除非对Dagster/Prefect有特定需求。
详细模式请参考
references/orchestration-patterns.md

SQL Transformations with dbt

基于dbt的SQL转换

Model Layer Structure

模型分层结构

  1. Staging Layer (
    models/staging/
    )
    • 1:1 with source tables
    • Minimal transformations (renaming, type casting, basic filtering)
    • Materialized as views or ephemeral
  2. Intermediate Layer (
    models/intermediate/
    )
    • Business logic and complex joins
    • Not exposed to end users
    • Often ephemeral (CTEs only)
  3. Marts Layer (
    models/marts/
    )
    • Final models for reporting
    • Fact tables (events, transactions)
    • Dimension tables (customers, products)
    • Materialized as tables or incremental
  1. Staging层
    models/staging/
    • 与源表1:1对应
    • 仅做最小转换(重命名、类型转换、基础过滤)
    • 以视图或临时表形式物化
  2. Intermediate层
    models/intermediate/
    • 包含业务逻辑与复杂关联
    • 不向终端用户开放
    • 通常为临时表(仅CTE)
  3. Marts层
    models/marts/
    • 用于报表的最终模型
    • 事实表(事件、交易)
    • 维度表(客户、产品)
    • 以表或增量形式物化

dbt Materialization Types

dbt物化类型

View: Query re-run each time model referenced. Use for fast queries, staging layer.
Table: Full refresh on each run. Use for frequently queried models, expensive computations.
Incremental: Only processes new/changed records. Use for large fact tables, event logs.
Ephemeral: CTE only, not persisted. Use for intermediate calculations.
View:每次引用模型时重新执行查询。适用于快速查询、Staging层。
Table:每次运行时全量刷新。适用于频繁查询的模型、计算成本高的任务。
Incremental:仅处理新增/变更记录。适用于大型事实表、事件日志。
Ephemeral:仅作为CTE,不持久化。适用于中间计算。

dbt Testing

dbt测试

yaml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total_revenue
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
For comprehensive dbt patterns, see:
  • references/dbt-best-practices.md
  • references/incremental-strategies.md
yaml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total_revenue
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
完整dbt模式请参考:
  • references/dbt-best-practices.md
  • references/incremental-strategies.md

Python DataFrame Transformations

Python DataFrame转换

pandas Transformation

pandas转换

python
import pandas as pd

df = pd.read_csv('sales.csv')
result = (
    df
    .query('year == 2024')
    .assign(revenue=lambda x: x['quantity'] * x['price'])
    .groupby('region')
    .agg({'revenue': ['sum', 'mean']})
)
python
import pandas as pd

df = pd.read_csv('sales.csv')
result = (
    df
    .query('year == 2024')
    .assign(revenue=lambda x: x['quantity'] * x['price'])
    .groupby('region')
    .agg({'revenue': ['sum', 'mean']})
)

polars Transformation (10-100x Faster)

polars转换(快10-100倍)

python
import polars as pl

result = (
    pl.scan_csv('sales.csv')  # Lazy evaluation
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg([
        pl.col('revenue').sum().alias('revenue_sum'),
        pl.col('revenue').mean().alias('revenue_mean')
    ])
    .collect()  # Execute lazy query
)
Key differences:
  • polars uses
    scan_csv()
    (lazy) vs pandas
    read_csv()
    (eager)
  • polars uses
    with_columns()
    vs pandas
    assign()
  • polars uses
    pl.col()
    expressions vs pandas string references
  • polars requires
    collect()
    to execute lazy queries
python
import polars as pl

result = (
    pl.scan_csv('sales.csv')  # Lazy evaluation
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg([
        pl.col('revenue').sum().alias('revenue_sum'),
        pl.col('revenue').mean().alias('revenue_mean')
    ])
    .collect()  # Execute lazy query
)
核心差异
  • polars使用
    scan_csv()
    (延迟加载),pandas使用
    read_csv()
    (即时加载)
  • polars使用
    with_columns()
    ,pandas使用
    assign()
  • polars使用
    pl.col()
    表达式,pandas使用字符串引用
  • polars需要
    collect()
    执行延迟查询

PySpark for Distributed Processing

PySpark分布式处理

python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)

result = (
    df
    .filter(F.col('year') == 2024)
    .withColumn('revenue', F.col('quantity') * F.col('price'))
    .groupBy('region')
    .agg(F.sum('revenue').alias('total_revenue'))
)
For migration guides, see
references/dataframe-comparison.md
.
python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)

result = (
    df
    .filter(F.col('year') == 2024)
    .withColumn('revenue', F.col('quantity') * F.col('price'))
    .groupBy('region')
    .agg(F.sum('revenue').alias('total_revenue'))
)
迁移指南请参考
references/dataframe-comparison.md

Pipeline Orchestration

管道编排

Airflow DAG Structure

Airflow DAG结构

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
    task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
    task1 >> task2  # Define dependency
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
    task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
    task1 >> task2  # Define dependency

Task Dependency Patterns

任务依赖模式

Linear:
A >> B >> C
(sequential) Fan-out:
A >> [B, C, D]
(parallel after A) Fan-in:
[A, B, C] >> D
(D waits for all)
For Airflow, Dagster, and Prefect patterns, see
references/orchestration-patterns.md
.
线性
A >> B >> C
(顺序执行) 扇出
A >> [B, C, D]
(A执行后并行执行B、C、D) 扇入
[A, B, C] >> D
(D等待A、B、C全部完成)
Airflow、Dagster与Prefect模式请参考
references/orchestration-patterns.md

Data Quality and Testing

数据质量与测试

dbt Tests

dbt测试

Generic tests (reusable): unique, not_null, accepted_values, relationships
Singular tests (custom SQL):
sql
-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0
通用测试(可复用):unique、not_null、accepted_values、relationships
自定义测试(自定义SQL):
sql
-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0

Great Expectations

Great Expectations

python
import great_expectations as gx

context = gx.get_context()
suite = context.add_expectation_suite("orders_suite")

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_revenue", min_value=0
    )
)
For comprehensive testing patterns, see
references/data-quality-testing.md
.
python
import great_expectations as gx

context = gx.get_context()
suite = context.add_expectation_suite("orders_suite")

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_revenue", min_value=0
    )
)
完整测试模式请参考
references/data-quality-testing.md

Advanced SQL Patterns

高级SQL模式

Window functions for analytics:
sql
select
    order_date,
    daily_revenue,
    avg(daily_revenue) over (
        partition by region
        order by order_date
        rows between 6 preceding and current row
    ) as revenue_7d_ma,
    sum(daily_revenue) over (
        partition by region
        order by order_date
    ) as cumulative_revenue
from daily_sales
For advanced window functions, see
references/window-functions-guide.md
.
用于分析的窗口函数:
sql
select
    order_date,
    daily_revenue,
    avg(daily_revenue) over (
        partition by region
        order by order_date
        rows between 6 preceding and current row
    ) as revenue_7d_ma,
    sum(daily_revenue) over (
        partition by region
        order by order_date
    ) as cumulative_revenue
from daily_sales
高级窗口函数请参考
references/window-functions-guide.md

Production Best Practices

生产环境最佳实践

Idempotency

幂等性

Ensure transformations produce same result when run multiple times:
  • Use
    merge
    statements in incremental models
  • Implement deduplication logic
  • Use
    unique_key
    in dbt incremental models
确保转换任务多次运行结果一致:
  • 在增量模型中使用
    merge
    语句
  • 实现去重逻辑
  • 在dbt增量模型中使用
    unique_key

Incremental Loading

增量加载

sql
{% if is_incremental() %}
    where created_at > (select max(created_at) from {{ this }})
{% endif %}
sql
{% if is_incremental() %}
    where created_at > (select max(created_at) from {{ this }})
{% endif %}

Error Handling

错误处理

python
try:
    result = perform_transformation()
    validate_result(result)
except ValidationError as e:
    log_error(e)
    raise
python
try:
    result = perform_transformation()
    validate_result(result)
except ValidationError as e:
    log_error(e)
    raise

Monitoring

监控

  • Set up Airflow email/Slack alerts on task failure
  • Monitor dbt test failures
  • Track data freshness (SLAs)
  • Log row counts and data quality metrics
  • 配置Airflow任务失败时的邮件/Slack告警
  • 监控dbt测试失败情况
  • 跟踪数据新鲜度(SLA)
  • 记录行数与数据质量指标

Tool Recommendations

工具推荐

SQL Transformations: dbt Core (industry standard, multi-warehouse, rich ecosystem)
bash
pip install dbt-core dbt-snowflake
Python DataFrames: polars (10-100x faster than pandas, multi-threaded, lazy evaluation)
bash
pip install polars
Orchestration: Apache Airflow (battle-tested at scale, 5,000+ integrations)
bash
pip install apache-airflow
SQL转换:dbt Core(行业标准,支持多仓库,生态丰富)
bash
pip install dbt-core dbt-snowflake
Python DataFrame:polars(比pandas快10-100倍,多线程,延迟查询)
bash
pip install polars
编排工具:Apache Airflow(久经大规模验证,5000+集成)
bash
pip install apache-airflow

Examples

示例

Working examples in:
  • examples/python/pandas-basics.py
    - pandas transformations
  • examples/python/polars-migration.py
    - pandas to polars migration
  • examples/python/pyspark-transformations.py
    - PySpark operations
  • examples/python/airflow-data-pipeline.py
    - Complete Airflow DAG
  • examples/sql/dbt-staging-model.sql
    - dbt staging layer
  • examples/sql/dbt-intermediate-model.sql
    - dbt intermediate layer
  • examples/sql/dbt-incremental-model.sql
    - Incremental patterns
  • examples/sql/window-functions.sql
    - Advanced SQL
可运行示例位于:
  • examples/python/pandas-basics.py
    - pandas转换
  • examples/python/polars-migration.py
    - pandas转polars迁移
  • examples/python/pyspark-transformations.py
    - PySpark操作
  • examples/python/airflow-data-pipeline.py
    - 完整Airflow DAG
  • examples/sql/dbt-staging-model.sql
    - dbt Staging层
  • examples/sql/dbt-intermediate-model.sql
    - dbt Intermediate层
  • examples/sql/dbt-incremental-model.sql
    - 增量模式
  • examples/sql/window-functions.sql
    - 高级SQL

Scripts

脚本

  • scripts/generate_dbt_models.py
    - Generate dbt model boilerplate
  • scripts/benchmark_dataframes.py
    - Compare pandas vs polars performance
  • scripts/generate_dbt_models.py
    - 生成dbt模型模板
  • scripts/benchmark_dataframes.py
    - 对比pandas与polars性能

Related Skills

相关技能

For data ingestion patterns, see
ingesting-data
. For data visualization, see
visualizing-data
. For database design, see
databases-*
skills. For real-time streaming, see
streaming-data
. For data platform architecture, see
ai-data-engineering
. For monitoring pipelines, see
observability
.
数据抽取模式请参考
ingesting-data
。 数据可视化请参考
visualizing-data
。 数据库设计请参考
databases-*
技能。 实时流处理请参考
streaming-data
。 数据平台架构请参考
ai-data-engineering
。 管道监控请参考
observability

Merged Content from etl-pipelines

合并自etl-pipelines的内容


name: data_transform description: Design ETL/ELT pipelines with proper orchestration, error handling, and monitoring. Use when building data pipelines, designing data workflows, or implementing data transformations.


name: data_transform description: Design ETL/ELT pipelines with proper orchestration, error handling, and monitoring. Use when building data pipelines, designing data workflows, or implementing data transformations.

ETL Designer

ETL设计器

Design robust ETL/ELT pipelines for data processing.
设计稳健的ETL/ELT数据处理管道。

Quick Start

快速开始

Use Airflow for orchestration, implement idempotent operations, add error handling, monitor pipeline health.
使用Airflow进行编排,实现幂等操作,添加错误处理,监控管道健康状态。

Instructions

操作指南

Airflow DAG Structure

Airflow DAG结构

python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com']
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source
    )
    
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse
    )
    
    extract >> transform >> load
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com']
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source
    )
    
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse
    )
    
    extract >> transform >> load

Incremental Processing

增量处理

python
def extract_incremental(last_run_date):
    query = f"""
        SELECT * FROM source_table
        WHERE updated_at > '{last_run_date}'
    """
    return pd.read_sql(query, conn)
python
def extract_incremental(last_run_date):
    query = f"""
        SELECT * FROM source_table
        WHERE updated_at > '{last_run_date}'
    """
    return pd.read_sql(query, conn)

Error Handling

错误处理

python
def safe_transform(data):
    try:
        transformed = transform_data(data)
        return transformed
    except Exception as e:
        logger.error(f"Transform failed: {e}")
        send_alert(f"Pipeline failed: {e}")
        raise
python
def safe_transform(data):
    try:
        transformed = transform_data(data)
        return transformed
    except Exception as e:
        logger.error(f"Transform failed: {e}")
        send_alert(f"Pipeline failed: {e}")
        raise

Best Practices

最佳实践

🔄 Workflow

🔄 工作流

Aşama 1: Data Contract & Source Audit

步骤1:数据契约与源数据审计

  • Data Contracts: Veri kaynağı (Source) ve hedef (Target) arasındaki şemayı sabitle.
  • Profiling: Ham verideki eksikleri, null oranlarını ve tipleri (Profiling) analiz et.
  • Pattern Selection: Veri boyutuna göre ETL (Pandas/Polars) veya ELT (SQL/dbt) seçimi yap.
  • 数据契约:固定数据源(Source)与目标(Target)之间的 schema。
  • 数据探查:分析原始数据中的缺失值、空值占比与数据类型。
  • 模式选择:根据数据量选择ETL(Pandas/Polars)或ELT(SQL/dbt)。

Aşama 2: Transformation Engine Setup

步骤2:转换引擎设置

  • Infrastructure:
    dbt-core
    profilini kur veya Cloud IDE yapılandır.
  • Modular Modeling: Veriyi Staging (Renaming), Intermediate (Logic) ve Marts (Final) katmanlarına ayır.
  • Polars Optimization: Python tabanlı dönüşümlerde
    lazy
    modunu (
    scan_csv
    /
    collect
    ) kullanarak bellek ve hız optimizasyonu yap.
  • 基础设施:配置
    dbt-core
    环境或云IDE。
  • 模块化建模:将数据分为Staging(重命名)、Intermediate(逻辑处理)与Marts(最终输出)三层。
  • Polars优化:基于Python的转换中使用
    lazy
    模式(
    scan_csv
    /
    collect
    )优化内存与速度。

Aşama 3: Testing & Orchestration

步骤3:测试与编排

  • Unit Tests: Kritik dönüşüm mantığı için
    dbt tests
    veya
    Great Expectations
    ile validation yaz.
  • Idempotency: Boru hattının (Pipeline) hata durumunda tekrar çalıştırılabilir (Idempotent) olduğundan emin ol.
  • Orchestration: İş akışını Airflow veya Dagster üzerinde takvime bağla ve hata bildirimlerini kur.
  • 单元测试:使用
    dbt tests
    Great Expectations
    编写关键转换逻辑的验证用例。
  • 幂等性:确保管道在出错时可重复执行(幂等)。
  • 编排:将工作流与Airflow或Dagster调度关联,并配置错误通知。

Kontrol Noktaları

检查点

AşamaDoğrulama
1Dönüşüm sonrası veri kaybı yaşandı mı? (Check Sum)
2dbt modellerinde
ref
fonksiyonu dışında hardcoded tablo ismi kullanıldı mı?
3Pipeline başarısız olduğunda "Rollback" veya "Reprocessing" stratejisi var mı?

Data Transformation v2.0 - With Workflow
步骤验证项
1转换后是否出现数据丢失?(校验总和)
2dbt模型中是否使用了
ref
函数以外的硬编码表名?
3管道失败时是否有回滚或重处理策略?

Data Transformation v2.0 - With Workflow