senior-data-engineer


Senior Data Engineer


Production-grade data engineering skill for building scalable, reliable data systems.


Trigger Phrases


Activate this skill when you see:
Pipeline Design:
  • "Design a data pipeline for..."
  • "Build an ETL/ELT process..."
  • "How should I ingest data from..."
  • "Set up data extraction from..."
Architecture:
  • "Should I use batch or streaming?"
  • "Lambda vs Kappa architecture"
  • "How to handle late-arriving data"
  • "Design a data lakehouse"
Data Modeling:
  • "Create a dimensional model..."
  • "Star schema vs snowflake"
  • "Implement slowly changing dimensions"
  • "Design a data vault"
Data Quality:
  • "Add data validation to..."
  • "Set up data quality checks"
  • "Monitor data freshness"
  • "Implement data contracts"
Performance:
  • "Optimize this Spark job"
  • "Query is running slow"
  • "Reduce pipeline execution time"
  • "Tune Airflow DAG"


Quick Start


Core Tools


```bash
# Generate pipeline orchestration config
python scripts/pipeline_orchestrator.py generate \
  --type airflow \
  --source postgres \
  --destination snowflake \
  --schedule "0 5 * * *"

# Validate data quality
python scripts/data_quality_validator.py validate \
  --input data/sales.parquet \
  --schema schemas/sales.json \
  --checks freshness,completeness,uniqueness

# Optimize ETL performance
python scripts/etl_performance_optimizer.py analyze \
  --query queries/daily_aggregation.sql \
  --engine spark \
  --recommend
```

---

Workflows


Workflow 1: Building a Batch ETL Pipeline


Scenario: Extract data from PostgreSQL, transform with dbt, load to Snowflake.

Step 1: Define Source Schema


```sql
-- Document source tables
SELECT
    table_name,
    column_name,
    data_type,
    is_nullable
FROM information_schema.columns
WHERE table_schema = 'source_schema'
ORDER BY table_name, ordinal_position;
```

Step 2: Generate Extraction Config


```bash
python scripts/pipeline_orchestrator.py generate \
  --type airflow \
  --source postgres \
  --tables orders,customers,products \
  --mode incremental \
  --watermark updated_at \
  --output dags/extract_source.py
```

Step 3: Create dbt Models


```sql
-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
),

renamed AS (
    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        status,
        _extracted_at
    FROM source
    WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)
)

SELECT * FROM renamed
```

```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        cluster_by=['order_date']
    )
}}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_segment,
    o.order_date,
    o.total_amount,
    o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id

{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}
```

Step 4: Configure Data Quality Tests


```yaml
# models/marts/schema.yml
version: 2
models:
  - name: fct_orders
    description: "Order fact table"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: order_date
        tests:
          - not_null
          - dbt_utils.recency:
              datepart: day
              field: order_date
              interval: 1
```

Step 5: Create Airflow DAG


```python
# dags/daily_etl.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL from PostgreSQL to Snowflake',
    schedule_interval='0 5 * * *',
    start_date=days_ago(1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:
    extract = BashOperator(
        task_id='extract_source_data',
        bash_command='python /opt/airflow/scripts/extract.py --date {{ ds }}',
    )

    transform = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /opt/airflow/dbt && dbt run --select marts.*',
    )

    test = BashOperator(
        task_id='run_dbt_tests',
        bash_command='cd /opt/airflow/dbt && dbt test --select marts.*',
    )

    notify = BashOperator(
        task_id='send_notification',
        bash_command='python /opt/airflow/scripts/notify.py --status success',
        trigger_rule='all_success',
    )

    extract >> transform >> test >> notify
```

Step 6: Validate Pipeline


```bash
# Test locally
dbt run --select stg_orders fct_orders
dbt test --select fct_orders

# Validate data quality
python scripts/data_quality_validator.py validate \
  --table fct_orders \
  --checks all \
  --output reports/quality_report.json
```

---

Workflow 2: Implementing Real-Time Streaming


Scenario: Stream events from Kafka, process with Flink/Spark Streaming, sink to data lake.

Step 1: Define Event Schema


```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "UserEvent",
  "type": "object",
  "required": ["event_id", "user_id", "event_type", "timestamp"],
  "properties": {
    "event_id": {"type": "string", "format": "uuid"},
    "user_id": {"type": "string"},
    "event_type": {"type": "string", "enum": ["page_view", "click", "purchase"]},
    "timestamp": {"type": "string", "format": "date-time"},
    "properties": {"type": "object"}
  }
}
```
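Producers can gate events on this contract before publishing. A minimal stdlib-only sketch (the `is_valid_event` helper is hypothetical; in practice a full JSON Schema validator such as the `jsonschema` package would also enforce formats and nested types):

```python
# Sketch: producer-side validation against the UserEvent contract above.
# Checks only the required fields and the event_type enum; a real JSON
# Schema validator would cover formats (uuid, date-time) as well.
import json

REQUIRED = {"event_id", "user_id", "event_type", "timestamp"}
EVENT_TYPES = {"page_view", "click", "purchase"}

def is_valid_event(raw: str) -> bool:
    """Return True if the payload satisfies the UserEvent contract basics."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError:
        return False
    if not isinstance(event, dict) or not REQUIRED.issubset(event):
        return False
    return event["event_type"] in EVENT_TYPES
```

Rejected payloads would typically be routed to the dead letter queue described in Step 4 rather than dropped silently.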

Step 2: Create Kafka Topic


```bash
# Create topic with appropriate partitions
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

# Verify topic
kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic user-events
```

Step 3: Implement Spark Streaming Job


```python
# streaming/user_events_processor.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, count,
    approx_count_distinct, to_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType, MapType
)

# Initialize Spark
spark = (
    SparkSession.builder
    .appName("UserEventsProcessor")
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints/user-events")
    .config("spark.sql.shuffle.partitions", "12")
    .getOrCreate()
)

# Define schema
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("properties", MapType(StringType(), StringType()), True),
])

# Read from Kafka
events_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
)

# Parse JSON
parsed_df = (
    events_df
    .select(from_json(col("value").cast("string"), event_schema).alias("data"))
    .select("data.*")
    .withColumn("event_timestamp", to_timestamp(col("timestamp")))
)

# Windowed aggregation with a watermark for late data
aggregated_df = (
    parsed_df
    .withWatermark("event_timestamp", "10 minutes")
    .groupBy(window(col("event_timestamp"), "5 minutes"), col("event_type"))
    .agg(
        count("*").alias("event_count"),
        approx_count_distinct("user_id").alias("unique_users"),
    )
)

# Write to Delta Lake
query = (
    aggregated_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/user-events-aggregated")
    .option("path", "/data/lake/user_events_aggregated")
    .trigger(processingTime="1 minute")
    .start()
)
query.awaitTermination()
```

Step 4: Handle Late Data and Errors


```python
# Dead letter queue for failed records
import logging

from pyspark.sql.functions import col, current_timestamp, lit

logger = logging.getLogger(__name__)

def process_with_error_handling(batch_df, batch_id):
    try:
        # Split valid and invalid records
        valid_df = batch_df.filter(col("event_id").isNotNull())
        invalid_df = batch_df.filter(col("event_id").isNull())

        # Write valid records
        valid_df.write \
            .format("delta") \
            .mode("append") \
            .save("/data/lake/user_events")

        # Write invalid records to the dead letter queue
        if invalid_df.count() > 0:
            invalid_df \
                .withColumn("error_timestamp", current_timestamp()) \
                .withColumn("error_reason", lit("missing_event_id")) \
                .write \
                .format("delta") \
                .mode("append") \
                .save("/data/lake/dlq/user_events")

    except Exception as e:
        # Log the error, then re-raise so the stream surfaces the failure
        logger.error(f"Batch {batch_id} failed: {e}")
        raise

# Use foreachBatch for custom per-batch processing
query = parsed_df.writeStream \
    .foreachBatch(process_with_error_handling) \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .start()
```

Step 5: Monitor Stream Health


```python
# monitoring/stream_metrics.py
from prometheus_client import Gauge, Counter, start_http_server

# Define metrics
RECORDS_PROCESSED = Counter(
    'stream_records_processed_total',
    'Total records processed',
    ['stream_name', 'status'],
)

PROCESSING_LAG = Gauge(
    'stream_processing_lag_seconds',
    'Current processing lag',
    ['stream_name'],
)

BATCH_DURATION = Gauge(
    'stream_batch_duration_seconds',
    'Last batch processing duration',
    ['stream_name'],
)

def emit_metrics(query):
    """Emit Prometheus metrics from a streaming query."""
    progress = query.lastProgress
    if progress:
        RECORDS_PROCESSED.labels(
            stream_name='user-events',
            status='success',
        ).inc(progress['numInputRows'])

        if progress['sources']:
            # Calculate lag from the latest offset
            for source in progress['sources']:
                end_offset = source.get('endOffset', {})
                # Parse Kafka offsets and calculate lag
```

---

Workflow 3: Data Quality Framework Setup


Scenario: Implement comprehensive data quality monitoring with Great Expectations.

Step 1: Initialize Great Expectations


```bash
# Install and initialize
pip install great_expectations
great_expectations init

# Connect to a data source
great_expectations datasource new
```

Step 2: Create Expectation Suite


```python
# expectations/orders_suite.py
import great_expectations as gx

context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_quality_suite")

# Get a validator for the orders asset
validator = context.get_validator(
    batch_request={
        "datasource_name": "warehouse",
        "data_asset_name": "orders",
    },
    expectation_suite_name="orders_quality_suite",
)

# Schema expectations
validator.expect_table_columns_to_match_ordered_list(
    column_list=[
        "order_id", "customer_id", "order_date",
        "total_amount", "status", "created_at",
    ]
)

# Completeness expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("order_date")

# Uniqueness expectations
validator.expect_column_values_to_be_unique("order_id")

# Range expectations
validator.expect_column_values_to_be_between(
    "total_amount", min_value=0, max_value=1000000
)

# Categorical expectations
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "confirmed", "shipped", "delivered", "cancelled"],
)

# Freshness expectation
validator.expect_column_max_to_be_between(
    "order_date",
    min_value={"$PARAMETER": "now - timedelta(days=1)"},
    max_value={"$PARAMETER": "now"},
)

# Referential integrity
validator.expect_column_values_to_be_in_set(
    "customer_id",
    value_set={"$PARAMETER": "valid_customer_ids"},
)

validator.save_expectation_suite(discard_failed_expectations=False)
```

Step 3: Create Data Quality Checks with dbt


```yaml
# models/marts/schema.yml
version: 2
models:
  - name: fct_orders
    description: "Order fact table with data quality checks"
    tests:
      # Row count check
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')
      # Freshness check
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24
    columns:
      - name: order_id
        description: "Unique order identifier"
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('dim_orders')
              field: order_id
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
              inclusive: true
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              row_condition: "status != 'cancelled'"
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
              severity: warn
```

Step 4: Implement Data Contracts


```yaml
# contracts/orders_contract.yaml
contract:
  name: orders_data_contract
  version: "1.0.0"
  owner: data-team@company.com

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: "Unique order identifier"
    customer_id:
      type: string
      not_null: true
    order_date:
      type: date
      not_null: true
    total_amount:
      type: decimal
      precision: 10
      scale: 2
      minimum: 0
    status:
      type: string
      enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]

sla:
  freshness:
    max_delay_hours: 1
  completeness:
    min_percentage: 99.9
  accuracy:
    duplicate_tolerance: 0.01

consumers:
  - name: analytics-team
    usage: "Daily reporting dashboards"
  - name: ml-team
    usage: "Churn prediction model"
```
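A contract is only useful if it is enforced. A hedged sketch of checking observed table metrics against the `sla` block above (the `check_sla` helper and the shape of the `metrics` dict are assumptions; the threshold keys mirror the contract):

```python
# Sketch: enforce the contract's SLA thresholds against observed metrics.
# `metrics` is a hypothetical dict a profiling job would produce; the
# threshold keys mirror the sla section of orders_contract.yaml above.
def check_sla(sla: dict, metrics: dict) -> list[str]:
    """Return a list of SLA violations (empty list means the contract holds)."""
    violations = []
    if metrics["delay_hours"] > sla["freshness"]["max_delay_hours"]:
        violations.append(
            f"freshness: {metrics['delay_hours']}h delay exceeds "
            f"{sla['freshness']['max_delay_hours']}h"
        )
    if metrics["complete_pct"] < sla["completeness"]["min_percentage"]:
        violations.append(
            f"completeness: {metrics['complete_pct']}% below "
            f"{sla['completeness']['min_percentage']}%"
        )
    if metrics["duplicate_pct"] > sla["accuracy"]["duplicate_tolerance"]:
        violations.append(
            f"accuracy: duplicate rate {metrics['duplicate_pct']}% above "
            f"{sla['accuracy']['duplicate_tolerance']}%"
        )
    return violations

# Thresholds copied from the contract's sla block
sla = {
    "freshness": {"max_delay_hours": 1},
    "completeness": {"min_percentage": 99.9},
    "accuracy": {"duplicate_tolerance": 0.01},
}
```

A check like this would typically run after each pipeline load and alert the listed consumers on any violation.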

Step 5: Set Up Quality Monitoring Dashboard


```python
# monitoring/quality_dashboard.py
from datetime import datetime

def generate_quality_report(connection, table_name: str) -> dict:
    """Generate a comprehensive data quality report."""
    report = {
        "table": table_name,
        "timestamp": datetime.now().isoformat(),
        "checks": {},
    }

    # Row count check
    row_count = connection.execute(
        f"SELECT COUNT(*) FROM {table_name}"
    ).fetchone()[0]
    report["checks"]["row_count"] = {
        "value": row_count,
        "status": "pass" if row_count > 0 else "fail",
    }

    # Freshness check
    max_date = connection.execute(
        f"SELECT MAX(created_at) FROM {table_name}"
    ).fetchone()[0]
    hours_old = (datetime.now() - max_date).total_seconds() / 3600
    report["checks"]["freshness"] = {
        "max_timestamp": max_date.isoformat(),
        "hours_old": round(hours_old, 2),
        "status": "pass" if hours_old < 24 else "fail",
    }

    # Null rate check
    null_query = f"""
    SELECT
        SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) AS null_order_id,
        SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customer_id,
        COUNT(*) AS total
    FROM {table_name}
    """
    null_result = connection.execute(null_query).fetchone()
    report["checks"]["null_rates"] = {
        "order_id": null_result[0] / null_result[2] if null_result[2] > 0 else 0,
        "customer_id": null_result[1] / null_result[2] if null_result[2] > 0 else 0,
        "status": "pass" if null_result[0] == 0 and null_result[1] == 0 else "fail",
    }

    # Duplicate check
    dup_query = f"""
    SELECT COUNT(*) - COUNT(DISTINCT order_id) AS duplicates
    FROM {table_name}
    """
    duplicates = connection.execute(dup_query).fetchone()[0]
    report["checks"]["duplicates"] = {
        "count": duplicates,
        "status": "pass" if duplicates == 0 else "fail",
    }

    # Overall status
    all_passed = all(
        check["status"] == "pass"
        for check in report["checks"].values()
    )
    report["overall_status"] = "pass" if all_passed else "fail"

    return report
```

---

Architecture Decision Framework


Use this framework to choose the right approach for your data pipeline.

Batch vs Streaming


| Criteria | Batch | Streaming |
| --- | --- | --- |
| Latency requirement | Hours to days | Seconds to minutes |
| Data volume | Large historical datasets | Continuous event streams |
| Processing complexity | Complex transformations, ML | Simple aggregations, filtering |
| Cost sensitivity | More cost-effective | Higher infrastructure cost |
| Error handling | Easier to reprocess | Requires careful design |
Decision Tree:

```
Is real-time insight required?
├── Yes → Use streaming
│   └── Is exactly-once semantics needed?
│       ├── Yes → Kafka + Flink/Spark Structured Streaming
│       └── No → Kafka + consumer groups
└── No → Use batch
    └── Is data volume > 1TB daily?
        ├── Yes → Spark/Databricks
        └── No → dbt + warehouse compute
```
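The decision tree above can also be sketched as a small helper; the `choose_stack` function, its thresholds, and its return labels are illustrative only, not a prescriptive API:

```python
# Sketch of the batch-vs-streaming decision tree from above.
# Function name, parameters, and labels are illustrative assumptions.
def choose_stack(real_time: bool, exactly_once: bool = False,
                 daily_volume_tb: float = 0.0) -> str:
    """Walk the decision tree and return a suggested stack."""
    if real_time:
        # Streaming branch: semantics requirement picks the engine
        if exactly_once:
            return "Kafka + Flink/Spark Structured Streaming"
        return "Kafka + consumer groups"
    # Batch branch: daily volume picks the compute layer
    if daily_volume_tb > 1.0:
        return "Spark/Databricks"
    return "dbt + warehouse compute"
```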

Lambda vs Kappa Architecture


| Aspect | Lambda | Kappa |
| --- | --- | --- |
| Complexity | Two codebases (batch + stream) | Single codebase |
| Maintenance | Higher (sync batch/stream logic) | Lower |
| Reprocessing | Native batch layer | Replay from source |
| Use case | ML training + real-time serving | Pure event-driven |
When to choose Lambda:
  • Need to train ML models on historical data
  • Complex batch transformations not feasible in streaming
  • Existing batch infrastructure
When to choose Kappa:
  • Event-sourced architecture
  • All processing can be expressed as stream operations
  • Starting fresh without legacy systems

Data Warehouse vs Data Lakehouse


| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
| --- | --- | --- |
| Best for | BI, SQL analytics | ML, unstructured data |
| Storage cost | Higher (proprietary format) | Lower (open formats) |
| Flexibility | Schema-on-write | Schema-on-read |
| Performance | Excellent for SQL | Good, improving |
| Ecosystem | Mature BI tools | Growing ML tooling |


Tech Stack


| Category | Technologies |
|---|---|
| Languages | Python, SQL, Scala |
| Orchestration | Airflow, Prefect, Dagster |
| Transformation | dbt, Spark, Flink |
| Streaming | Kafka, Kinesis, Pub/Sub |
| Storage | S3, GCS, Delta Lake, Iceberg |
| Warehouses | Snowflake, BigQuery, Redshift, Databricks |
| Quality | Great Expectations, dbt tests, Monte Carlo |
| Monitoring | Prometheus, Grafana, Datadog |


Reference Documentation


1. Data Pipeline Architecture


See references/data_pipeline_architecture.md for:
  • Lambda vs Kappa architecture patterns
  • Batch processing with Spark and Airflow
  • Stream processing with Kafka and Flink
  • Exactly-once semantics implementation
  • Error handling and dead letter queues
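The dead-letter-queue pattern referenced above is simple enough to sketch without a broker. This is a minimal illustration only: the function names and the dict-based "queue" are hypothetical stand-ins, not a real Kafka API.

```python
def consume_with_dlq(messages, process, max_retries=3):
    """Try each message up to max_retries; route persistent failures to a DLQ."""
    dead_letters = []
    for msg in messages:
        for attempt in range(max_retries):
            try:
                process(msg)
                break
            except Exception as exc:
                if attempt == max_retries - 1:
                    # Keep the payload and the failure reason for inspection/replay
                    dead_letters.append({"payload": msg, "error": str(exc)})
    return dead_letters

# Usage: a processor that rejects negative amounts
def process(order):
    if order["amount"] < 0:
        raise ValueError("negative amount")

dlq = consume_with_dlq([{"amount": 10}, {"amount": -5}], process)
# dlq now holds only the rejected order, with its error message attached
```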

2. Data Modeling Patterns


See references/data_modeling_patterns.md for:
  • Dimensional modeling (Star/Snowflake)
  • Slowly Changing Dimensions (SCD Types 1-6)
  • Data Vault modeling
  • dbt best practices
  • Partitioning and clustering
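Of the SCD types listed above, Type 2 is the workhorse: when an attribute changes, expire the current row and append a new versioned one. A minimal in-memory sketch, assuming illustrative column names (`valid_from`, `valid_to`, `is_current`):

```python
from datetime import date

def scd2_upsert(dim_rows, key, new_attrs, as_of):
    """Apply an SCD Type 2 change: expire the current row, append the new version."""
    for row in dim_rows:
        if row["key"] == key and row["is_current"]:
            if all(row.get(k) == v for k, v in new_attrs.items()):
                return dim_rows  # attributes unchanged, nothing to do
            row["is_current"] = False
            row["valid_to"] = as_of
    dim_rows.append({"key": key, **new_attrs,
                     "valid_from": as_of, "valid_to": None, "is_current": True})
    return dim_rows

rows = [{"key": 1, "city": "Austin", "valid_from": date(2023, 1, 1),
         "valid_to": None, "is_current": True}]
scd2_upsert(rows, 1, {"city": "Denver"}, date(2024, 6, 1))
# rows now holds two versions: Austin (expired) and Denver (current)
```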

3. DataOps Best Practices


See references/dataops_best_practices.md for:
  • Data testing frameworks
  • Data contracts and schema validation
  • CI/CD for data pipelines
  • Observability and lineage
  • Incident response


Troubleshooting


Pipeline Failures


**Symptom:** Airflow DAG fails with timeout
Task exceeded max execution time

**Solution:**
1. Check resource allocation
2. Profile slow operations
3. Add incremental processing
```python
# Increase timeout
default_args = {
    'execution_timeout': timedelta(hours=2),
}

# Or use incremental loads in the extraction query:
# WHERE updated_at > '{{ prev_ds }}'
```
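The `{{ prev_ds }}` filter above is a watermark: each run pulls only rows newer than the last successful run. The same idea without Airflow, as a sketch with illustrative field names:

```python
def incremental_extract(rows, watermark):
    """Return rows updated after the watermark, plus the advanced watermark."""
    fresh = [r for r in rows if r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in fresh), default=watermark)
    return fresh, new_watermark

rows = [{"id": 1, "updated_at": "2024-01-14"},
        {"id": 2, "updated_at": "2024-01-16"}]
fresh, wm = incremental_extract(rows, "2024-01-15")
# fresh contains only id 2; the watermark advances to "2024-01-16"
```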

---

**Symptom:** Spark job OOM
java.lang.OutOfMemoryError: Java heap space

**Solution:**
1. Increase executor memory
2. Reduce partition size
3. Use disk spill
```python
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.memory.fraction", "0.8")
```

---

**Symptom:** Kafka consumer lag increasing
Consumer lag: 1000000 messages

**Solution:**
1. Increase consumer parallelism
2. Optimize processing logic
3. Scale consumer group
```bash
# Add more partitions
kafka-topics.sh --alter \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 24
```

---

Data Quality Issues


**Symptom:** Duplicate records appearing
Expected unique, found 150 duplicates

**Solution:**
1. Add deduplication logic
2. Use merge/upsert operations
```sql
-- dbt incremental with dedup
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

SELECT * FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY order_id
            ORDER BY updated_at DESC
        ) as rn
    FROM {{ source('raw', 'orders') }}
) WHERE rn = 1
```
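The keep-latest-row-per-key pattern in the ROW_NUMBER dedup can be mirrored in plain Python for quick checks outside the warehouse. A sketch with illustrative field names:

```python
def dedupe_latest(records, key="order_id", order_by="updated_at"):
    """Keep only the most recent record per key, mirroring the SQL window dedup."""
    latest = {}
    for rec in records:
        k = rec[key]
        if k not in latest or rec[order_by] > latest[k][order_by]:
            latest[k] = rec
    return list(latest.values())

orders = [{"order_id": 1, "updated_at": "2024-01-01", "status": "new"},
          {"order_id": 1, "updated_at": "2024-01-02", "status": "shipped"}]
deduped = dedupe_latest(orders)
# one row survives per order_id: the most recently updated version
```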

**Symptom:** Stale data in tables
Last update: 3 days ago

**Solution:**
1. Check upstream pipeline status
2. Verify source availability
3. Add freshness monitoring
```yaml
# dbt freshness check
sources:
  - name: raw
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    loaded_at_field: _loaded_at
```

---

**Symptom:** Schema drift detected
Column 'new_field' not in expected schema

**Solution:**
1. Update data contract
2. Modify transformations
3. Communicate with producers
```python
# Handle schema evolution
df = (spark.read.format("delta")
      .option("mergeSchema", "true")
      .load("/data/orders"))
```
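Before relying on automatic schema merging, it helps to detect drift against the data contract explicitly. A minimal comparison of expected vs. observed columns (the contract contents here are illustrative):

```python
def schema_drift(expected, observed):
    """Report columns added or removed relative to the contracted schema."""
    expected, observed = set(expected), set(observed)
    return {"added": sorted(observed - expected),
            "removed": sorted(expected - observed)}

drift = schema_drift(["order_id", "amount"], ["order_id", "amount", "new_field"])
# drift flags new_field as an addition not covered by the contract
```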

---

Performance Issues


**Symptom:** Query takes hours
Query runtime: 4 hours (expected: 30 minutes)

**Solution:**
1. Check query plan
2. Add proper partitioning
3. Optimize joins
```sql
-- Before: Full table scan
SELECT * FROM orders WHERE order_date = '2024-01-15';

-- After: Partition pruning
-- Table partitioned by order_date
SELECT * FROM orders WHERE order_date = '2024-01-15';

-- Add clustering for frequent filters
ALTER TABLE orders CLUSTER BY (customer_id);
```

**Symptom:** dbt model takes too long
Model fct_orders completed in 45 minutes

**Solution:**
1. Use incremental materialization
2. Reduce upstream dependencies
3. Pre-aggregate where possible
```sql
-- Convert to incremental
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        on_schema_change='sync_all_columns'
    )
}}

SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
{% endif %}
```