transforming-data
Data Transformation
Transform raw data into analytical assets using modern transformation patterns, frameworks, and orchestration tools.
Purpose
Select and implement data transformation patterns across the modern data stack. Transform raw data into clean, tested, and documented analytical datasets using SQL (dbt), Python DataFrames (pandas, polars, PySpark), and pipeline orchestration (Airflow, Dagster, Prefect).
When to Use
Invoke this skill when:
- Choosing between ETL and ELT transformation patterns
- Building dbt models (staging, intermediate, marts)
- Implementing incremental data loads and merge strategies
- Migrating pandas code to polars for performance improvements
- Orchestrating data pipelines with dependencies and retries
- Adding data quality tests and validation
- Processing large datasets with PySpark
- Creating production-ready transformation workflows
Quick Start: Common Patterns
dbt Incremental Model
```sql
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}
group by 1, 2, 3

{% if is_incremental() %}
where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}
```
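The incremental pattern above — only process rows newer than the current high-watermark in the target — can be sketched in plain Python with illustrative data:

```python
# Rows already loaded into the target table
existing = [('2024-01-01', 1), ('2024-01-02', 2)]
# Source now contains two newer rows
source = existing + [('2024-01-03', 3), ('2024-01-04', 4)]

# Equivalent of: select max(order_created_at) from {{ this }}
watermark = max(ts for ts, _ in existing)

# Equivalent of: where order_created_at > watermark
new_rows = [(ts, oid) for ts, oid in source if ts > watermark]
print(new_rows)  # only the 2024-01-03 and 2024-01-04 rows are processed
```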
polars High-Performance Transformation
```python
import polars as pl

result = (
    pl.scan_csv('large_dataset.csv')
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg(pl.col('revenue').sum())
    .collect()  # Execute lazy query
)
```
Airflow Data Pipeline
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# extract_data and transform_data are defined elsewhere
with DAG(
    dag_id='daily_sales_pipeline',
    schedule_interval='0 2 * * *',
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    extract >> transform
```
Decision Frameworks
ETL vs ELT Selection
Use ELT (Extract, Load, Transform) when:
- Using modern cloud data warehouse (Snowflake, BigQuery, Databricks)
- Transformation logic changes frequently
- Team includes SQL analysts
- Data volume 10GB-1TB+ (leverage warehouse parallelism)
Tools: dbt, Dataform, Snowflake tasks, BigQuery scheduled queries
Use ETL (Extract, Transform, Load) when:
- Regulatory compliance requires pre-load data redaction (PII/PHI)
- Target system lacks compute power
- Real-time streaming with immediate transformation
- Legacy systems without cloud warehouse
Tools: AWS Glue, Azure Data Factory, custom Python scripts
Use Hybrid when combining sensitive data cleansing (ETL) with analytics transformations (ELT).
Default recommendation: ELT with dbt unless specific compliance or performance constraints require ETL.
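The ELT flow can be sketched with sqlite3 standing in for a warehouse (table and column names are illustrative): raw rows are landed first, then transformed with SQL inside the engine.

```python
import sqlite3

# Extract + Load: land raw rows untransformed (the "EL" of ELT)
raw_orders = [
    (1, 'us-east', 2, 9.99),
    (2, 'us-east', 1, 19.99),
    (3, 'eu-west', 5, 4.99),
]
conn = sqlite3.connect(':memory:')
conn.execute("create table raw_orders (order_id int, region text, quantity int, price real)")
conn.executemany("insert into raw_orders values (?, ?, ?, ?)", raw_orders)

# Transform: run inside the "warehouse", leveraging its SQL engine (the "T")
rows = conn.execute("""
    select region, sum(quantity * price) as revenue
    from raw_orders
    group by region
    order by region
""").fetchall()
print(rows)
```

In a real ELT stack the transform step would be a dbt model rather than an inline query, but the ordering — load raw, then transform in place — is the same.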
For detailed patterns, see references/etl-vs-elt-patterns.md.
DataFrame Library Selection
Choose pandas when:
- Data size < 500MB
- Prototyping or exploratory analysis
- Need compatibility with pandas-only libraries
Choose polars when:
- Data size 500MB-100GB
- Performance critical (10-100x faster than pandas)
- Production pipelines with memory constraints
- Want lazy evaluation with query optimization
Choose PySpark when:
- Data size > 100GB
- Need distributed processing across cluster
- Existing Spark infrastructure (EMR, Databricks)
Migration path: pandas → polars (easier, similar API) or pandas → PySpark (requires cluster)
For comparisons and migration guides, see references/dataframe-comparison.md.
Orchestration Tool Selection
Choose Airflow when:
- Enterprise production (proven at scale)
- Need 5,000+ integrations
- Managed services available (AWS MWAA, GCP Cloud Composer)
Choose Dagster when:
- Heavy dbt usage (native dbt_assets integration)
- Data lineage and asset-based workflows prioritized
- ML pipelines requiring testability
Choose Prefect when:
- Dynamic workflows (runtime task generation)
- Cloud-native architecture preferred
- Pythonic API with decorators
Safe default: Airflow (battle-tested) unless specific needs for Dagster/Prefect.
For detailed patterns, see references/orchestration-patterns.md.
SQL Transformations with dbt
Model Layer Structure
Staging Layer (models/staging/)
- 1:1 with source tables
- Minimal transformations (renaming, type casting, basic filtering)
- Materialized as views or ephemeral

Intermediate Layer (models/intermediate/)
- Business logic and complex joins
- Not exposed to end users
- Often ephemeral (CTEs only)

Marts Layer (models/marts/)
- Final models for reporting
- Fact tables (events, transactions)
- Dimension tables (customers, products)
- Materialized as tables or incremental
dbt Materialization Types
View: Query re-run each time model referenced. Use for fast queries, staging layer.
Table: Full refresh on each run. Use for frequently queried models, expensive computations.
Incremental: Only processes new/changed records. Use for large fact tables, event logs.
Ephemeral: CTE only, not persisted. Use for intermediate calculations.
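The tradeoff between these materializations can be illustrated in plain Python (a loose analogy, not dbt internals): a view recomputes on every reference, a table computes once per refresh and goes stale, and an incremental model keeps state and folds in only new rows.

```python
source = [('2024-01-01', 10), ('2024-01-02', 20)]

# View: re-run the query every time the model is referenced
def view_total():
    return sum(v for _, v in source)

# Table: full refresh persists the result; stale until rebuilt
table_total = sum(v for _, v in source)

# Incremental: keep a watermark and process only new rows
state = {'max_date': '2024-01-02', 'total': 30}
def incremental_load(rows):
    fresh = [(d, v) for d, v in rows if d > state['max_date']]
    for d, v in fresh:
        state['total'] += v
        state['max_date'] = max(state['max_date'], d)
    return state['total']

source.append(('2024-01-03', 5))
print(view_total())              # view sees new data immediately
print(table_total)               # table is stale until the next full refresh
print(incremental_load(source))  # only the 2024-01-03 row is processed
```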
dbt Testing
```yaml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total_revenue
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
```
For comprehensive dbt patterns, see:
- references/dbt-best-practices.md
- references/incremental-strategies.md
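The assertions these generic tests make can also be expressed directly in Python for ad-hoc checks outside dbt — a sketch with illustrative table and column names:

```python
fct_orders = [
    {'order_id': 1, 'customer_id': 'a', 'total_revenue': 50.0},
    {'order_id': 2, 'customer_id': 'b', 'total_revenue': 0.0},
]
dim_customers = [{'customer_id': 'a'}, {'customer_id': 'b'}]

order_ids = [r['order_id'] for r in fct_orders]
# unique + not_null on order_id
assert len(order_ids) == len(set(order_ids)), "order_id not unique"
assert all(i is not None for i in order_ids), "order_id contains nulls"
# relationships: every customer_id must exist in dim_customers
known = {r['customer_id'] for r in dim_customers}
assert all(r['customer_id'] in known for r in fct_orders), "orphan customer_id"
# accepted_range: revenue must be non-negative
assert all(r['total_revenue'] >= 0 for r in fct_orders), "negative revenue"
print("all checks passed")
```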
Python DataFrame Transformations
pandas Transformation
```python
import pandas as pd

df = pd.read_csv('sales.csv')
result = (
    df
    .query('year == 2024')
    .assign(revenue=lambda x: x['quantity'] * x['price'])
    .groupby('region')
    .agg({'revenue': ['sum', 'mean']})
)
```
polars Transformation (10-100x Faster)
```python
import polars as pl

result = (
    pl.scan_csv('sales.csv')  # Lazy evaluation
    .filter(pl.col('year') == 2024)
    .with_columns([(pl.col('quantity') * pl.col('price')).alias('revenue')])
    .group_by('region')
    .agg([
        pl.col('revenue').sum().alias('revenue_sum'),
        pl.col('revenue').mean().alias('revenue_mean')
    ])
    .collect()  # Execute lazy query
)
```
Key differences:
- polars uses scan_csv() (lazy) vs pandas read_csv() (eager)
- polars uses with_columns() vs pandas assign()
- polars uses pl.col() expressions vs pandas string references
- polars requires collect() to execute lazy queries
PySpark for Distributed Processing
```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)
result = (
    df
    .filter(F.col('year') == 2024)
    .withColumn('revenue', F.col('quantity') * F.col('price'))
    .groupBy('region')
    .agg(F.sum('revenue').alias('total_revenue'))
)
```
For migration guides, see references/dataframe-comparison.md.
Pipeline Orchestration
Airflow DAG Structure
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

# extract_fn and transform_fn are defined elsewhere
with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
    task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
    task1 >> task2  # Define dependency
```
Task Dependency Patterns
Linear: A >> B >> C (sequential)
Fan-out: A >> [B, C, D] (parallel after A)
Fan-in: [A, B, C] >> D (D waits for all)
For Airflow, Dagster, and Prefect patterns, see references/orchestration-patterns.md.
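These shapes are plain DAG orderings, so the stdlib's graphlib can illustrate a valid execution order for a combined fan-out/fan-in graph (task names are illustrative, not Airflow APIs):

```python
from graphlib import TopologicalSorter

# A >> [B, C, D] >> E : fan-out after A, fan-in before E
deps = {
    'B': {'A'}, 'C': {'A'}, 'D': {'A'},  # fan-out: B, C, D all wait on A
    'E': {'B', 'C', 'D'},                # fan-in: E waits for all three
}
order = list(TopologicalSorter(deps).static_order())
print(order)  # A first, E last; B/C/D in between and free to run in parallel
```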
Data Quality and Testing
dbt Tests
Generic tests (reusable): unique, not_null, accepted_values, relationships
Singular tests (custom SQL):
```sql
-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0
```
Great Expectations
```python
import great_expectations as gx

context = gx.get_context()
suite = context.add_expectation_suite("orders_suite")
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_revenue", min_value=0
    )
)
```
For comprehensive testing patterns, see references/data-quality-testing.md.
Advanced SQL Patterns
Window functions for analytics:
```sql
select
    order_date,
    daily_revenue,
    avg(daily_revenue) over (
        partition by region
        order by order_date
        rows between 6 preceding and current row
    ) as revenue_7d_ma,
    sum(daily_revenue) over (
        partition by region
        order by order_date
    ) as cumulative_revenue
from daily_sales
```
For advanced window functions, see references/window-functions-guide.md.
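For intuition, the same trailing 7-row average and running total can be sketched in pure Python for a single region (illustrative data):

```python
from collections import deque
from itertools import accumulate

daily_revenue = [100, 200, 300, 400, 500, 600, 700, 800]

# rows between 6 preceding and current row -> trailing window of up to 7 values
window, moving_avg = deque(maxlen=7), []
for value in daily_revenue:
    window.append(value)
    moving_avg.append(sum(window) / len(window))

# unbounded preceding (default frame) -> cumulative running total
cumulative = list(accumulate(daily_revenue))
print(moving_avg[-1])  # mean of the last 7 values: 500.0
print(cumulative[-1])  # 3600
```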
Production Best Practices
Idempotency
Ensure transformations produce the same result when run multiple times:
- Use merge statements in incremental models
- Implement deduplication logic
- Use unique_key in dbt incremental models
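A minimal merge/dedup sketch in Python: replaying the same batch leaves the target unchanged, which is the idempotency property described above (key and field names are illustrative).

```python
def merge_batch(target, batch, unique_key='order_id'):
    """Upsert batch rows into target keyed on unique_key; re-runs are no-ops."""
    by_key = {row[unique_key]: row for row in target}
    by_key.update({row[unique_key]: row for row in batch})  # last write wins
    return sorted(by_key.values(), key=lambda r: r[unique_key])

target = [{'order_id': 1, 'status': 'open'}]
batch = [{'order_id': 1, 'status': 'shipped'}, {'order_id': 2, 'status': 'open'}]

once = merge_batch(target, batch)
twice = merge_batch(once, batch)  # replaying the same batch changes nothing
assert once == twice
print(once)
```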
Incremental Loading
```sql
{% if is_incremental() %}
where created_at > (select max(created_at) from {{ this }})
{% endif %}
```
Error Handling
```python
try:
    result = perform_transformation()
    validate_result(result)
except ValidationError as e:
    log_error(e)
    raise
```
Monitoring
- Set up Airflow email/Slack alerts on task failure
- Monitor dbt test failures
- Track data freshness (SLAs)
- Log row counts and data quality metrics
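Row counts and basic quality metrics from the last bullet can be logged with a small helper along these lines (function, field names, and metrics are illustrative):

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline.metrics")

def log_quality_metrics(rows, required_fields=('order_id', 'total_revenue')):
    """Log row count and per-field null rate; return the metrics for alerting."""
    metrics = {'row_count': len(rows)}
    for field in required_fields:
        nulls = sum(1 for r in rows if r.get(field) is None)
        metrics[f'{field}_null_rate'] = nulls / len(rows) if rows else 0.0
    log.info("quality metrics: %s", metrics)
    return metrics

rows = [
    {'order_id': 1, 'total_revenue': 10.0},
    {'order_id': 2, 'total_revenue': None},
]
m = log_quality_metrics(rows)
print(m['row_count'], m['total_revenue_null_rate'])  # 2 0.5
```

In production these metrics would feed the alerting channel (Slack, email) configured in the orchestrator rather than stdout.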
Tool Recommendations
SQL Transformations: dbt Core (industry standard, multi-warehouse, rich ecosystem)
```bash
pip install dbt-core dbt-snowflake
```
Python DataFrames: polars (10-100x faster than pandas, multi-threaded, lazy evaluation)
```bash
pip install polars
```
Orchestration: Apache Airflow (battle-tested at scale, 5,000+ integrations)
```bash
pip install apache-airflow
```
Examples
Working examples in:
- examples/python/pandas-basics.py - pandas transformations
- examples/python/polars-migration.py - pandas to polars migration
- examples/python/pyspark-transformations.py - PySpark operations
- examples/python/airflow-data-pipeline.py - Complete Airflow DAG
- examples/sql/dbt-staging-model.sql - dbt staging layer
- examples/sql/dbt-intermediate-model.sql - dbt intermediate layer
- examples/sql/dbt-incremental-model.sql - Incremental patterns
- examples/sql/window-functions.sql - Advanced SQL
Scripts
- scripts/generate_dbt_models.py - Generate dbt model boilerplate
- scripts/benchmark_dataframes.py - Compare pandas vs polars performance
Related Skills
For data ingestion patterns, see ingesting-data.
For data visualization, see visualizing-data.
For database design, see databases-* skills.
For real-time streaming, see streaming-data.
For data platform architecture, see ai-data-engineering.
For monitoring pipelines, see observability.