<!-- Adapted from: claude-skills/engineering-team/senior-data-engineer -->
# Data Engineering Guide

Data pipelines, warehousing, and the modern data stack.
## When to Use

- Building data pipelines
- Designing data warehouses
- Implementing ETL/ELT processes
- Setting up data lakes
- Optimizing data infrastructure
## Modern Data Stack

### Components

Sources → Ingestion → Storage → Transform → Serve → Consume

| Layer | Tools |
|---|---|
| Ingestion | Fivetran, Airbyte, Stitch |
| Storage | S3, GCS, Snowflake, BigQuery |
| Transform | dbt, Spark, Airflow |
| Orchestration | Airflow, Dagster, Prefect |
| Serving | Looker, Tableau, Metabase |
## Data Pipeline Patterns

### Batch Processing

```python
# Airflow DAG example
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    'daily_etl',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1)
)

def extract():
    # Extract from source
    pass

def transform():
    # Transform data
    pass

def load():
    # Load to warehouse
    pass

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag
)

extract_task >> transform_task >> load_task
```
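The `extract >> transform >> load` chain can also be exercised without a scheduler, which is useful for unit-testing task logic before wiring it into a DAG. A minimal sketch (the runner and the task bodies are illustrative assumptions, not Airflow APIs):

```python
# Minimal stand-in for a linear pipeline (illustrative only, no Airflow).
def run_pipeline(*tasks):
    """Run task callables in order, passing each result to the next."""
    result = None
    for task in tasks:
        result = task(result)
    return result

# Hypothetical task callables mirroring the DAG above
def extract(_):
    return [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 5.5}]

def transform(rows):
    return [r for r in rows if r["amount"] > 0]

def load(rows):
    return len(rows)  # e.g. number of rows written to the warehouse

loaded = run_pipeline(extract, transform, load)
print(loaded)  # 2
```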
### Streaming Processing

```python
# Kafka consumer example
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    process_event(message.value)  # application-defined handler
```
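The loop above handles one event at a time; many consumers instead drain small batches before processing to amortize per-call overhead. A pure-Python sketch of the batching pattern (a plain list stands in for the Kafka stream; the helper names are assumptions):

```python
from itertools import islice

def batches(stream, size):
    """Yield lists of up to `size` items from any iterable/stream."""
    it = iter(stream)
    while batch := list(islice(it, size)):
        yield batch

# A list stands in for the Kafka message stream
events = [{"id": i} for i in range(7)]

processed = []
def process_batch(batch):
    # Hypothetical handler: record each event id
    processed.extend(e["id"] for e in batch)

for batch in batches(events, 3):
    process_batch(batch)

print(processed)  # [0, 1, 2, 3, 4, 5, 6]
```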
## dbt Patterns
### Model Structure

```
models/
├── staging/        # 1:1 with source
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/   # Business logic
│   └── int_order_items.sql
└── marts/          # Final models
    ├── dim_customers.sql
    └── fct_orders.sql
```

### Example Model
```sql
-- models/marts/fct_orders.sql
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

select
    o.order_id,
    o.customer_id,
    o.order_date,
    sum(oi.quantity * oi.unit_price) as order_total
from {{ ref('stg_orders') }} o
join {{ ref('stg_order_items') }} oi
    on o.order_id = oi.order_id
{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}
group by 1, 2, 3
```

## Data Modeling
### Dimensional Modeling

```
Fact Tables (events/transactions)
├── fct_orders
├── fct_page_views
└── fct_transactions

Dimension Tables (context)
├── dim_customers
├── dim_products
├── dim_dates
└── dim_locations
```

### Star Schema
```
              dim_customers
                    │
dim_dates ── fct_orders ── dim_products
                    │
              dim_locations
```

## Data Quality
Validation Rules
验证规则
sql
-- dbt tests
models:
- name: fct_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: order_total
tests:
- not_null
- positive_valuesql
-- dbt tests
models:
- name: fct_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: order_total
tests:
- not_null
- positive_valueQuality Metrics
质量指标
| Metric | Description |
|---|---|
| Completeness | % non-null values |
| Uniqueness | % distinct values |
| Timeliness | Data freshness |
| Accuracy | Matches source |
| Consistency | Across systems |
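Completeness and uniqueness, as defined in the table above, are cheap to compute per column. A plain-Python sketch (the row data and column names are hypothetical):

```python
def completeness(rows, column):
    """% of rows with a non-null value in `column`."""
    if not rows:
        return 0.0
    non_null = sum(1 for r in rows if r.get(column) is not None)
    return 100.0 * non_null / len(rows)

def uniqueness(rows, column):
    """% of non-null values in `column` that are distinct."""
    values = [r.get(column) for r in rows if r.get(column) is not None]
    if not values:
        return 0.0
    return 100.0 * len(set(values)) / len(values)

rows = [
    {"order_id": 1, "email": "a@x.com"},
    {"order_id": 2, "email": None},
    {"order_id": 2, "email": "b@x.com"},
    {"order_id": 3, "email": "a@x.com"},
]
print(completeness(rows, "email"))   # 75.0
print(uniqueness(rows, "order_id"))  # 75.0
```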
## Performance Optimization

### Partitioning
```sql
-- BigQuery partitioned table
CREATE TABLE orders
PARTITION BY DATE(order_date)
CLUSTER BY customer_id
AS SELECT * FROM staging.orders;
```

### Query Optimization
| Technique | Impact |
|---|---|
| Partitioning | Reduce scanned data |
| Clustering | Improve filter speed |
| Materialization | Pre-compute joins |
| Caching | Reduce repeat queries |
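Why partitioning reduces scanned data can be illustrated with a toy model: rows are bucketed by date, so a date-filtered query reads only its bucket while a full scan reads everything (a sketch, not a real query engine):

```python
from collections import defaultdict

# Toy "table" partitioned by order_date
partitions = defaultdict(list)
rows = [
    {"order_date": "2024-01-01", "order_id": 1},
    {"order_date": "2024-01-01", "order_id": 2},
    {"order_date": "2024-01-02", "order_id": 3},
]
for row in rows:
    partitions[row["order_date"]].append(row)

def scan(partitions, order_date=None):
    """Return (matching_rows, rows_scanned)."""
    if order_date is not None:
        # Pruned: only the matching partition is read
        part = partitions.get(order_date, [])
        return part, len(part)
    # Full scan: every partition is read
    all_rows = [r for part in partitions.values() for r in part]
    return all_rows, len(all_rows)

_, scanned = scan(partitions, order_date="2024-01-02")
print(scanned)  # 1 row scanned instead of 3
```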
## Monitoring

### Pipeline Metrics
| Metric | Alert Threshold |
|---|---|
| Runtime | >2x normal |
| Row count | ±20% variance |
| Freshness | >SLA |
| Failures | Any failure |
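The alert thresholds above translate directly into a per-run check. A minimal sketch (the function name, baseline values, and alert labels are assumptions):

```python
def check_pipeline_run(runtime_s, baseline_runtime_s,
                       row_count, baseline_row_count,
                       freshness_h, sla_h, failed):
    """Return the list of triggered alerts for a single pipeline run."""
    alerts = []
    if runtime_s > 2 * baseline_runtime_s:  # runtime > 2x normal
        alerts.append("runtime")
    if abs(row_count - baseline_row_count) > 0.2 * baseline_row_count:  # ±20%
        alerts.append("row_count")
    if freshness_h > sla_h:  # data older than the SLA allows
        alerts.append("freshness")
    if failed:  # any failure alerts
        alerts.append("failure")
    return alerts

print(check_pipeline_run(runtime_s=900, baseline_runtime_s=400,
                         row_count=130, baseline_row_count=100,
                         freshness_h=2, sla_h=6, failed=False))
# ['runtime', 'row_count']
```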
Data Observability
数据可观测性
yaml
undefinedyaml
undefinedMonte Carlo / Elementary example
Monte Carlo / Elementary example
monitors:
- table: fct_orders
tests:
- freshness: threshold: 6 hours
- volume: threshold: 10%
- schema_change: true
undefinedmonitors:
- table: fct_orders
tests:
- freshness: threshold: 6 hours
- volume: threshold: 10%
- schema_change: true
undefinedBest Practices
最佳实践
### Pipeline Design

- Idempotent operations
- Incremental processing
- Clear data lineage
- Automated testing
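Idempotency in practice often means "replace the target slice, then insert", so a rerun leaves the warehouse in the same state. A sketch with SQLite standing in for the warehouse (table and column names are hypothetical):

```python
import sqlite3

def load_partition(conn, rows, ds):
    """Idempotent load: delete the `ds` partition, then re-insert it."""
    with conn:  # one transaction: delete + insert commit together
        conn.execute("DELETE FROM orders WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO orders (ds, order_id, total) VALUES (?, ?, ?)",
            [(ds, r["order_id"], r["total"]) for r in rows],
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (ds TEXT, order_id INTEGER, total REAL)")

rows = [{"order_id": 1, "total": 9.5}, {"order_id": 2, "total": 3.0}]
load_partition(conn, rows, "2024-01-01")
load_partition(conn, rows, "2024-01-01")  # rerun: same state, no duplicates

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # 2
```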
### Data Governance

- Document all models
- Track data lineage
- Implement access controls
- Version control SQL
### Cost Management

- Monitor query costs
- Use partitioning
- Schedule off-peak
- Archive old data