data-engineering

<!-- Adapted from: claude-skills/engineering-team/senior-data-engineer -->

Data Engineering Guide


Data pipelines, warehousing, and modern data stack.

When to Use


  • Building data pipelines
  • Designing data warehouses
  • Implementing ETL/ELT processes
  • Setting up data lakes
  • Optimizing data infrastructure

Modern Data Stack


Components


Sources → Ingestion → Storage → Transform → Serve → Consume
| Layer | Tools |
|---|---|
| Ingestion | Fivetran, Airbyte, Stitch |
| Storage | S3, GCS, Snowflake, BigQuery |
| Transform | dbt, Spark, Airflow |
| Orchestration | Airflow, Dagster, Prefect |
| Serving | Looker, Tableau, Metabase |

Data Pipeline Patterns


Batch Processing


Airflow DAG example

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    'daily_etl',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
)

def extract():
    # Extract from source
    pass

def transform():
    # Transform data
    pass

def load():
    # Load to warehouse
    pass

extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load, dag=dag)

extract_task >> transform_task >> load_task
```

Streaming Processing


Kafka consumer example

```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    process_event(message.value)  # process_event is defined elsewhere
```

dbt Patterns


Model Structure


models/
├── staging/           # 1:1 with source
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/      # Business logic
│   └── int_order_items.sql
└── marts/             # Final models
    ├── dim_customers.sql
    └── fct_orders.sql

Example Model


```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

select
    o.order_id,
    o.customer_id,
    o.order_date,
    sum(oi.quantity * oi.unit_price) as order_total
from {{ ref('stg_orders') }} o
join {{ ref('stg_order_items') }} oi
    on o.order_id = oi.order_id
{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}
group by 1, 2, 3
```

Data Modeling


Dimensional Modeling


Fact Tables (events/transactions)
├── fct_orders
├── fct_page_views
└── fct_transactions

Dimension Tables (context)
├── dim_customers
├── dim_products
├── dim_dates
└── dim_locations

Star Schema


        dim_customers
dim_dates ── fct_orders ── dim_products
        dim_locations

Data Quality


Validation Rules


```yaml
# dbt tests (schema.yml)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - positive_value  # custom generic test, not a dbt built-in
```
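The same constraints can also be enforced outside dbt, e.g. as a pre-load check in pipeline code. A minimal sketch in plain Python; the `validate_orders` helper and the row shape are illustrative, not part of any library:

```python
def validate_orders(rows):
    """Check unique non-null order_id and positive order_total on row dicts."""
    errors = []
    seen = set()
    for row in rows:
        oid = row.get("order_id")
        if oid is None:
            errors.append("null order_id")
        elif oid in seen:
            errors.append(f"duplicate order_id: {oid}")
        else:
            seen.add(oid)
        total = row.get("order_total")
        if total is None or total <= 0:
            errors.append(f"bad order_total for order_id={oid}")
    return errors

rows = [
    {"order_id": 1, "order_total": 50.0},
    {"order_id": 1, "order_total": -5.0},  # duplicate id, negative total
]
print(validate_orders(rows))
```

Failing fast here keeps bad rows out of the warehouse instead of discovering them in a downstream dbt test run.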

Quality Metrics


| Metric | Description |
|---|---|
| Completeness | % non-null values |
| Uniqueness | % distinct values |
| Timeliness | Data freshness |
| Accuracy | Matches source |
| Consistency | Across systems |
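The first two metrics reduce to simple ratios over a column. A sketch over a plain list of values (the column data is illustrative):

```python
def completeness(values):
    # Fraction of non-null values in the column
    if not values:
        return 0.0
    return sum(v is not None for v in values) / len(values)

def uniqueness(values):
    # Fraction of distinct values among the non-null values
    non_null = [v for v in values if v is not None]
    if not non_null:
        return 0.0
    return len(set(non_null)) / len(non_null)

customer_ids = [1, 2, 2, None, 3]
print(completeness(customer_ids))  # 0.8
print(uniqueness(customer_ids))    # 0.75
```

In practice these are computed in SQL over the warehouse table; the Python form just makes the definitions explicit.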

Performance Optimization


Partitioning


```sql
-- BigQuery partitioned table
CREATE TABLE orders
PARTITION BY DATE(order_date)
CLUSTER BY customer_id
AS SELECT * FROM staging.orders
```

Query Optimization


| Technique | Impact |
|---|---|
| Partitioning | Reduce scanned data |
| Clustering | Improve filter speed |
| Materialization | Pre-compute joins |
| Caching | Reduce repeat queries |

Monitoring


Pipeline Metrics


| Metric | Alert Threshold |
|---|---|
| Runtime | >2x normal |
| Row count | ±20% variance |
| Freshness | >SLA |
| Failures | Any failure |
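These thresholds are easy to encode as pure functions that an orchestrator or monitoring job can call after each run. A sketch, with illustrative baseline values:

```python
def runtime_alert(runtime_s, baseline_s, factor=2.0):
    # Alert when a run takes more than `factor` times the normal runtime
    return runtime_s > factor * baseline_s

def row_count_alert(count, baseline, tolerance=0.20):
    # Alert when row count deviates more than +/- tolerance from baseline
    return abs(count - baseline) > tolerance * baseline

def freshness_alert(age_hours, sla_hours):
    # Alert when the newest data is older than the SLA allows
    return age_hours > sla_hours

print(runtime_alert(95, 40))        # True: 95s > 2 * 40s
print(row_count_alert(1150, 1000))  # False: within +/- 20%
print(freshness_alert(7, 6))        # True: breaches a 6-hour SLA
```

Keeping the thresholds as parameters (rather than hard-coding them) lets each pipeline tune its own baselines.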

Data Observability


Monte Carlo / Elementary example

```yaml
monitors:
  - table: fct_orders
    tests:
      - freshness:
          threshold: 6 hours
      - volume:
          threshold: 10%
      - schema_change: true
```

Best Practices


Pipeline Design


  • Idempotent operations
  • Incremental processing
  • Clear data lineage
  • Automated testing
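Idempotency usually means a re-run overwrites its own output partition rather than appending to it. A minimal sketch, with an in-memory dict standing in for a date-partitioned warehouse table (all names are illustrative):

```python
warehouse = {}  # partition key -> rows; stands in for a partitioned table

def load_partition(ds, rows):
    # Delete-and-replace the partition so re-running a day is safe:
    # the final state is identical no matter how many times it runs
    warehouse[ds] = list(rows)

load_partition("2024-01-01", [{"order_id": 1}])
load_partition("2024-01-01", [{"order_id": 1}])  # re-run: no duplicates
print(warehouse["2024-01-01"])  # [{'order_id': 1}]
```

The same pattern in a real warehouse is `DELETE WHERE ds = ...` followed by `INSERT`, or a partition-overwrite write mode.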

Data Governance


  • Document all models
  • Track data lineage
  • Implement access controls
  • Version control SQL

Cost Management


  • Monitor query costs
  • Use partitioning
  • Schedule off-peak
  • Archive old data