transforming-data


Data Transformation

Transform raw data into analytical assets using modern transformation patterns, frameworks, and orchestration tools.

Purpose

Select and implement data transformation patterns across the modern data stack. Transform raw data into clean, tested, and documented analytical datasets using SQL (dbt), Python DataFrames (pandas, polars, PySpark), and pipeline orchestration (Airflow, Dagster, Prefect).

When to Use

Invoke this skill when:
  • Choosing between ETL and ELT transformation patterns
  • Building dbt models (staging, intermediate, marts)
  • Implementing incremental data loads and merge strategies
  • Migrating pandas code to polars for performance improvements
  • Orchestrating data pipelines with dependencies and retries
  • Adding data quality tests and validation
  • Processing large datasets with PySpark
  • Creating production-ready transformation workflows

Quick Start: Common Patterns

dbt Incremental Model

```sql
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}

{% if is_incremental() %}
-- Only process orders newer than the latest row already in this table
where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}

group by 1, 2, 3
```

polars High-Performance Transformation

```python
import polars as pl

result = (
    pl.scan_csv('large_dataset.csv')
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg(pl.col('revenue').sum())
    .collect()  # Execute lazy query
)
```

Airflow Data Pipeline

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# extract_data and transform_data are ordinary Python callables defined elsewhere
with DAG(
    dag_id='daily_sales_pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    extract >> transform
```

Decision Frameworks

ETL vs ELT Selection

**Use ELT (Extract, Load, Transform)** when:
  • Using a modern cloud data warehouse (Snowflake, BigQuery, Databricks)
  • Transformation logic changes frequently
  • Team includes SQL analysts
  • Data volume is 10GB-1TB+ (leverage warehouse parallelism)
Tools: dbt, Dataform, Snowflake tasks, BigQuery scheduled queries
**Use ETL (Extract, Transform, Load)** when:
  • Regulatory compliance requires pre-load data redaction (PII/PHI)
  • Target system lacks compute power
  • Real-time streaming with immediate transformation
  • Legacy systems without a cloud warehouse
Tools: AWS Glue, Azure Data Factory, custom Python scripts
**Use Hybrid** when combining sensitive data cleansing (ETL) with analytics transformations (ELT).
Default recommendation: ELT with dbt unless specific compliance or performance constraints require ETL.
For detailed patterns, see references/etl-vs-elt-patterns.md.

DataFrame Library Selection

Choose pandas when:
  • Data size < 500MB
  • Prototyping or exploratory analysis
  • Need compatibility with pandas-only libraries
Choose polars when:
  • Data size 500MB-100GB
  • Performance critical (10-100x faster than pandas)
  • Production pipelines with memory constraints
  • Want lazy evaluation with query optimization
Choose PySpark when:
  • Data size > 100GB
  • Need distributed processing across cluster
  • Existing Spark infrastructure (EMR, Databricks)
Migration path: pandas → polars (easier, similar API) or pandas → PySpark (requires cluster)
For comparisons and migration guides, see references/dataframe-comparison.md.

Orchestration Tool Selection

Choose Airflow when:
  • Enterprise production (proven at scale)
  • Need 5,000+ integrations
  • Managed services available (AWS MWAA, GCP Cloud Composer)
Choose Dagster when:
  • Heavy dbt usage (native dbt_assets integration)
  • Data lineage and asset-based workflows prioritized
  • ML pipelines requiring testability
Choose Prefect when:
  • Dynamic workflows (runtime task generation)
  • Cloud-native architecture preferred
  • Pythonic API with decorators
Safe default: Airflow (battle-tested) unless specific needs for Dagster/Prefect.
For detailed patterns, see references/orchestration-patterns.md.

SQL Transformations with dbt

Model Layer Structure

  1. Staging Layer (models/staging/)
    • 1:1 with source tables
    • Minimal transformations (renaming, type casting, basic filtering)
    • Materialized as views or ephemeral
  2. Intermediate Layer (models/intermediate/)
    • Business logic and complex joins
    • Not exposed to end users
    • Often ephemeral (CTEs only)
  3. Marts Layer (models/marts/)
    • Final models for reporting
    • Fact tables (events, transactions)
    • Dimension tables (customers, products)
    • Materialized as tables or incremental

dbt Materialization Types

View: Query is re-run each time the model is referenced. Use for fast queries and the staging layer.
Table: Full refresh on each run. Use for frequently queried models and expensive computations.
Incremental: Only processes new/changed records. Use for large fact tables and event logs.
Ephemeral: CTE only, not persisted. Use for intermediate calculations.

dbt Testing

dbt测试

```yaml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total_revenue
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
```
For comprehensive dbt patterns, see:
  • references/dbt-best-practices.md
  • references/incremental-strategies.md

Python DataFrame Transformations

pandas Transformation

```python
import pandas as pd

df = pd.read_csv('sales.csv')
result = (
    df
    .query('year == 2024')
    .assign(revenue=lambda x: x['quantity'] * x['price'])
    .groupby('region')
    .agg({'revenue': ['sum', 'mean']})
)
```

polars Transformation (10-100x Faster)

```python
import polars as pl

result = (
    pl.scan_csv('sales.csv')  # Lazy evaluation
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg([
        pl.col('revenue').sum().alias('revenue_sum'),
        pl.col('revenue').mean().alias('revenue_mean')
    ])
    .collect()  # Execute lazy query
)
```
Key differences:
  • polars uses scan_csv() (lazy) vs pandas read_csv() (eager)
  • polars uses with_columns() vs pandas assign()
  • polars uses pl.col() expressions vs pandas string references
  • polars requires collect() to execute lazy queries

PySpark for Distributed Processing

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)

result = (
    df
    .filter(F.col('year') == 2024)
    .withColumn('revenue', F.col('quantity') * F.col('price'))
    .groupBy('region')
    .agg(F.sum('revenue').alias('total_revenue'))
)
```
For migration guides, see references/dataframe-comparison.md.

Pipeline Orchestration

Airflow DAG Structure

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

# extract_fn and transform_fn are Python callables defined elsewhere
with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
    task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
    task1 >> task2  # Define dependency
```

Task Dependency Patterns

Linear: A >> B >> C (sequential)
Fan-out: A >> [B, C, D] (B, C, D run in parallel after A)
Fan-in: [A, B, C] >> D (D waits for all three)
For Airflow, Dagster, and Prefect patterns, see references/orchestration-patterns.md.
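Outside any specific scheduler, these dependency patterns are just a directed acyclic graph; Python's stdlib graphlib can sketch the resulting execution order (task names here are illustrative):

```python
from graphlib import TopologicalSorter

# Fan-out then fan-in: A >> [B, C, D] >> E, expressed as node -> predecessors
graph = {
    'B': {'A'}, 'C': {'A'}, 'D': {'A'},  # fan-out: B, C, D all depend on A
    'E': {'B', 'C', 'D'},                # fan-in: E waits for all three
}

order = list(TopologicalSorter(graph).static_order())
print(order)  # 'A' first, 'E' last; B/C/D may appear in any order between
```

Orchestrators do essentially this resolution, plus scheduling, retries, and state tracking on top.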

Data Quality and Testing

dbt Tests

Generic tests (reusable): unique, not_null, accepted_values, relationships
Singular tests (custom SQL):
```sql
-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0
```

Great Expectations

```python
import great_expectations as gx

context = gx.get_context()
suite = context.add_expectation_suite("orders_suite")

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_revenue", min_value=0
    )
)
```
For comprehensive testing patterns, see references/data-quality-testing.md.
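Where Great Expectations is not available, the same two checks (not-null, non-negative) can be sketched in plain Python — a minimal stand-in for illustration, not the GX API:

```python
# Illustrative rows; a real pipeline would pull these from the warehouse
rows = [
    {"order_id": 1, "total_revenue": 120.0},
    {"order_id": 2, "total_revenue": 0.0},
]

def check_not_null(rows, column):
    # Mirrors ExpectColumnValuesToNotBeNull: return offending rows
    return [r for r in rows if r.get(column) is None]

def check_min_value(rows, column, min_value):
    # Mirrors ExpectColumnValuesToBeBetween with only a lower bound
    return [r for r in rows if r[column] is not None and r[column] < min_value]

null_violations = check_not_null(rows, "order_id")
range_violations = check_min_value(rows, "total_revenue", 0)
assert not null_violations and not range_violations  # fail fast on bad data
```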

Advanced SQL Patterns

Window functions for analytics:
```sql
select
    order_date,
    daily_revenue,
    avg(daily_revenue) over (
        partition by region
        order by order_date
        rows between 6 preceding and current row
    ) as revenue_7d_ma,
    sum(daily_revenue) over (
        partition by region
        order by order_date
    ) as cumulative_revenue
from daily_sales
```
For advanced window functions, see references/window-functions-guide.md.
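The `rows between 6 preceding and current row` frame is simply a sliding window over the ordered rows; a plain-Python sketch of the 7-day moving average and running total for one region (values are illustrative):

```python
# daily_revenue ordered by order_date for a single region
daily_revenue = [100, 200, 150, 300, 250, 400, 350, 500]

revenue_7d_ma = []
cumulative_revenue = []
running_total = 0
for i, value in enumerate(daily_revenue):
    window = daily_revenue[max(0, i - 6):i + 1]  # up to 6 preceding rows + current
    revenue_7d_ma.append(sum(window) / len(window))
    running_total += value
    cumulative_revenue.append(running_total)

print(revenue_7d_ma[:2], cumulative_revenue[-1])  # [100.0, 150.0] 2250
```

Note how early rows get a shorter window, exactly as the SQL frame behaves near the start of a partition.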

Production Best Practices

Idempotency

Ensure transformations produce the same result when run multiple times:
  • Use merge statements in incremental models
  • Implement deduplication logic
  • Use unique_key in dbt incremental models
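A keyed upsert is the core of idempotency: because the target is keyed by the unique key, re-running the same batch cannot create duplicates. A minimal in-memory sketch (the dict stands in for the target table):

```python
# Target table keyed by unique_key ('order_id'); re-running a batch is a no-op
target = {}

def upsert(target, batch, unique_key='order_id'):
    for row in batch:
        target[row[unique_key]] = row  # insert new keys, overwrite existing ones

batch = [{'order_id': 1, 'revenue': 100}, {'order_id': 2, 'revenue': 200}]
upsert(target, batch)
upsert(target, batch)  # second run produces the same state — no duplicates
print(len(target))  # 2
```

dbt's `unique_key` + merge strategy and SQL `merge` statements implement the same semantics against warehouse tables.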

Incremental Loading

增量加载

```sql
{% if is_incremental() %}
    where created_at > (select max(created_at) from {{ this }})
{% endif %}
```
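The same high-watermark pattern outside dbt: record the maximum created_at already loaded, then process only newer rows. A stdlib sketch with illustrative data:

```python
from datetime import date

# Rows already in the target, plus a fresh source extract (illustrative data)
loaded = [{'id': 1, 'created_at': date(2024, 1, 1)},
          {'id': 2, 'created_at': date(2024, 1, 2)}]
source = loaded + [{'id': 3, 'created_at': date(2024, 1, 3)}]

# High watermark: max created_at already present in the target
watermark = max(row['created_at'] for row in loaded)

# Incremental batch: only rows strictly newer than the watermark
new_rows = [row for row in source if row['created_at'] > watermark]
print([row['id'] for row in new_rows])  # [3]
```

Strictly-greater-than comparison can miss late-arriving rows sharing the watermark timestamp; a lookback window or merge on unique_key covers that gap.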

Error Handling

```python
# perform_transformation, validate_result, log_error, and ValidationError
# are defined elsewhere in the pipeline code
try:
    result = perform_transformation()
    validate_result(result)  # raises ValidationError when checks fail
except ValidationError as e:
    log_error(e)
    raise  # re-raise so the orchestrator marks the task failed and can retry
```

Monitoring

  • Set up Airflow email/Slack alerts on task failure
  • Monitor dbt test failures
  • Track data freshness (SLAs)
  • Log row counts and data quality metrics
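Logging row counts and null rates can be as simple as emitting a small metrics dict after each run — a stdlib sketch with illustrative fields:

```python
import logging

logging.basicConfig(level=logging.INFO)

rows = [
    {'order_id': 1, 'revenue': 100},
    {'order_id': 2, 'revenue': None},  # a null worth surfacing
]

# Simple data-quality metrics to log after each pipeline run
metrics = {
    'row_count': len(rows),
    'null_revenue': sum(1 for r in rows if r['revenue'] is None),
}
logging.info("pipeline metrics: %s", metrics)
```

Shipping these numbers to your alerting system (or just grepping logs) makes silent data loss visible as a sudden row-count drop.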

Tool Recommendations

SQL Transformations: dbt Core (industry standard, multi-warehouse, rich ecosystem)
```bash
pip install dbt-core dbt-snowflake
```
Python DataFrames: polars (10-100x faster than pandas, multi-threaded, lazy evaluation)
```bash
pip install polars
```
Orchestration: Apache Airflow (battle-tested at scale, 5,000+ integrations)
```bash
pip install apache-airflow
```

Examples

Working examples in:
  • examples/python/pandas-basics.py - pandas transformations
  • examples/python/polars-migration.py - pandas to polars migration
  • examples/python/pyspark-transformations.py - PySpark operations
  • examples/python/airflow-data-pipeline.py - Complete Airflow DAG
  • examples/sql/dbt-staging-model.sql - dbt staging layer
  • examples/sql/dbt-intermediate-model.sql - dbt intermediate layer
  • examples/sql/dbt-incremental-model.sql - Incremental patterns
  • examples/sql/window-functions.sql - Advanced SQL

Scripts

  • scripts/generate_dbt_models.py - Generate dbt model boilerplate
  • scripts/benchmark_dataframes.py - Compare pandas vs polars performance

Related Skills

For data ingestion patterns, see ingesting-data. For data visualization, see visualizing-data. For database design, see databases-* skills. For real-time streaming, see streaming-data. For data platform architecture, see ai-data-engineering. For monitoring pipelines, see observability.