senior-data-engineer
Senior Data Engineer
Production-grade data engineering skill for building scalable, reliable data systems.
Table of Contents
Trigger Phrases
Activate this skill when you see:
Pipeline Design:
- "Design a data pipeline for..."
- "Build an ETL/ELT process..."
- "How should I ingest data from..."
- "Set up data extraction from..."
Architecture:
- "Should I use batch or streaming?"
- "Lambda vs Kappa architecture"
- "How to handle late-arriving data"
- "Design a data lakehouse"
Data Modeling:
- "Create a dimensional model..."
- "Star schema vs snowflake"
- "Implement slowly changing dimensions"
- "Design a data vault"
Data Quality:
- "Add data validation to..."
- "Set up data quality checks"
- "Monitor data freshness"
- "Implement data contracts"
Performance:
- "Optimize this Spark job"
- "Query is running slow"
- "Reduce pipeline execution time"
- "Tune Airflow DAG"
Quick Start
Core Tools
```bash
# Generate pipeline orchestration config
python scripts/pipeline_orchestrator.py generate \
    --type airflow \
    --source postgres \
    --destination snowflake \
    --schedule "0 5 * * *"

# Validate data quality
python scripts/data_quality_validator.py validate \
    --input data/sales.parquet \
    --schema schemas/sales.json \
    --checks freshness,completeness,uniqueness

# Optimize ETL performance
python scripts/etl_performance_optimizer.py analyze \
    --query queries/daily_aggregation.sql \
    --engine spark \
    --recommend
```
---
Workflows
Workflow 1: Building a Batch ETL Pipeline
Scenario: Extract data from PostgreSQL, transform with dbt, load to Snowflake.
Step 1: Define Source Schema
```sql
-- Document source tables
SELECT
    table_name,
    column_name,
    data_type,
    is_nullable
FROM information_schema.columns
WHERE table_schema = 'source_schema'
ORDER BY table_name, ordinal_position;
```
Step 2: Generate Extraction Config
```bash
python scripts/pipeline_orchestrator.py generate \
    --type airflow \
    --source postgres \
    --tables orders,customers,products \
    --mode incremental \
    --watermark updated_at \
    --output dags/extract_source.py
```
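The `--mode incremental --watermark updated_at` flags mean the generated job should pull only rows newer than the last successful run's high-water mark. A minimal sketch of that query logic (`build_extract_query` is a hypothetical helper for illustration, not part of `pipeline_orchestrator.py`):

```python
def build_extract_query(table: str, watermark_column: str, last_value: str) -> str:
    """Build an incremental extraction query that pulls only rows past the
    last recorded high-water mark. In production, pass last_value as a bound
    query parameter instead of interpolating it into the SQL string."""
    return (
        f"SELECT * FROM {table} "
        f"WHERE {watermark_column} > '{last_value}' "
        f"ORDER BY {watermark_column}"
    )
```

After each successful load, persist `MAX(updated_at)` from the extracted batch as the next run's `last_value`.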
Step 3: Create dbt Models
```sql
-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
),

renamed AS (
    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        status,
        _extracted_at
    FROM source
    WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)
)

SELECT * FROM renamed
```

```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        cluster_by=['order_date']
    )
}}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_segment,
    o.order_date,
    o.total_amount,
    o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}
```
Step 4: Configure Data Quality Tests
```yaml
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: "Order fact table"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: order_date
        tests:
          - not_null
          - dbt_utils.recency:
              datepart: day
              field: order_date
              interval: 1
```
Step 5: Create Airflow DAG
```python
# dags/daily_etl.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL from PostgreSQL to Snowflake',
    schedule_interval='0 5 * * *',
    start_date=days_ago(1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:
    extract = BashOperator(
        task_id='extract_source_data',
        bash_command='python /opt/airflow/scripts/extract.py --date {{ ds }}',
    )
    transform = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /opt/airflow/dbt && dbt run --select marts.*',
    )
    test = BashOperator(
        task_id='run_dbt_tests',
        bash_command='cd /opt/airflow/dbt && dbt test --select marts.*',
    )
    notify = BashOperator(
        task_id='send_notification',
        bash_command='python /opt/airflow/scripts/notify.py --status success',
        trigger_rule='all_success',
    )

    extract >> transform >> test >> notify
```
Step 6: Validate Pipeline
```bash
# Test locally
dbt run --select stg_orders fct_orders
dbt test --select fct_orders

# Validate data quality
python scripts/data_quality_validator.py validate \
    --table fct_orders \
    --checks all \
    --output reports/quality_report.json
```
---
Workflow 2: Implementing Real-Time Streaming
Scenario: Stream events from Kafka, process with Flink/Spark Streaming, sink to data lake.
Step 1: Define Event Schema
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "UserEvent",
  "type": "object",
  "required": ["event_id", "user_id", "event_type", "timestamp"],
  "properties": {
    "event_id": {"type": "string", "format": "uuid"},
    "user_id": {"type": "string"},
    "event_type": {"type": "string", "enum": ["page_view", "click", "purchase"]},
    "timestamp": {"type": "string", "format": "date-time"},
    "properties": {"type": "object"}
  }
}
```
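Producers can be held to this schema with full JSON Schema validation (e.g. the `jsonschema` package). As a dependency-free sketch, checking just the `required` fields and the `event_type` enum from the schema above:

```python
REQUIRED_FIELDS = ("event_id", "user_id", "event_type", "timestamp")
EVENT_TYPES = {"page_view", "click", "purchase"}

def is_valid_event(event: dict) -> bool:
    """True if the event carries every required field and a known event_type."""
    return (
        all(field in event for field in REQUIRED_FIELDS)
        and event.get("event_type") in EVENT_TYPES
    )
```

Events failing this check are candidates for the dead letter queue introduced later in this workflow.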
Step 2: Create Kafka Topic
```bash
# Create topic with appropriate partitions
kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --topic user-events \
    --partitions 12 \
    --replication-factor 3 \
    --config retention.ms=604800000 \
    --config cleanup.policy=delete

# Verify topic
kafka-topics.sh --describe \
    --bootstrap-server localhost:9092 \
    --topic user-events
```
Step 3: Implement Spark Streaming Job
```python
# streaming/user_events_processor.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, count,
    approx_count_distinct, to_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType, MapType
)

# Initialize Spark
spark = SparkSession.builder \
    .appName("UserEventsProcessor") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints/user-events") \
    .config("spark.sql.shuffle.partitions", "12") \
    .getOrCreate()

# Define schema
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("properties", MapType(StringType(), StringType()), True)
])

# Read from Kafka
events_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse JSON
parsed_df = events_df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_timestamp", to_timestamp(col("timestamp")))

# Windowed aggregation (the watermark bounds how late data may arrive)
aggregated_df = parsed_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(window(col("event_timestamp"), "5 minutes"), col("event_type")) \
    .agg(
        count("*").alias("event_count"),
        approx_count_distinct("user_id").alias("unique_users")
    )

# Write to Delta Lake
query = aggregated_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user-events-aggregated") \
    .option("path", "/data/lake/user_events_aggregated") \
    .trigger(processingTime="1 minute") \
    .start()

query.awaitTermination()
```
Step 4: Handle Late Data and Errors
```python
# Dead letter queue for failed records
import logging

from pyspark.sql.functions import col, current_timestamp, lit

logger = logging.getLogger(__name__)

def process_with_error_handling(batch_df, batch_id):
    try:
        # Split valid and invalid records
        valid_df = batch_df.filter(col("event_id").isNotNull())
        invalid_df = batch_df.filter(col("event_id").isNull())

        # Write valid records
        valid_df.write \
            .format("delta") \
            .mode("append") \
            .save("/data/lake/user_events")

        # Write invalid records to the dead letter queue
        if invalid_df.count() > 0:
            invalid_df \
                .withColumn("error_timestamp", current_timestamp()) \
                .withColumn("error_reason", lit("missing_event_id")) \
                .write \
                .format("delta") \
                .mode("append") \
                .save("/data/lake/dlq/user_events")
    except Exception as e:
        # Log the failure, then re-raise so the streaming query fails visibly
        logger.error(f"Batch {batch_id} failed: {e}")
        raise

# Use foreachBatch for custom processing
query = parsed_df.writeStream \
    .foreachBatch(process_with_error_handling) \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .start()
```
Step 5: Monitor Stream Health
```python
# monitoring/stream_metrics.py
from prometheus_client import Gauge, Counter, start_http_server

# Define metrics
RECORDS_PROCESSED = Counter(
    'stream_records_processed_total',
    'Total records processed',
    ['stream_name', 'status']
)
PROCESSING_LAG = Gauge(
    'stream_processing_lag_seconds',
    'Current processing lag',
    ['stream_name']
)
BATCH_DURATION = Gauge(
    'stream_batch_duration_seconds',
    'Last batch processing duration',
    ['stream_name']
)

def emit_metrics(query):
    """Emit Prometheus metrics from streaming query."""
    progress = query.lastProgress
    if progress:
        RECORDS_PROCESSED.labels(
            stream_name='user-events',
            status='success'
        ).inc(progress['numInputRows'])
        if progress['sources']:
            for source in progress['sources']:
                end_offset = source.get('endOffset', {})
                # Parse Kafka offsets and calculate lag
```
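One way to fill in that last step: recent Spark versions report both `endOffset` (what the stream has read) and `latestOffset` (what Kafka currently holds) per source in the progress JSON, so their difference gives a record-count lag. The sketch below assumes that field shape and measures records behind rather than seconds, a common proxy for the lag gauge:

```python
import json

def kafka_record_lag(source_progress: dict) -> int:
    """Total records the stream is behind, from one Kafka source's progress.

    Assumes 'endOffset' and 'latestOffset' are {topic: {partition: offset}}
    maps; Spark may report them as JSON strings, so decode first.
    """
    def as_map(value):
        return json.loads(value) if isinstance(value, str) else (value or {})

    end = as_map(source_progress.get("endOffset"))
    latest = as_map(source_progress.get("latestOffset"))
    lag = 0
    for topic, partitions in latest.items():
        for partition, latest_offset in partitions.items():
            end_offset = end.get(topic, {}).get(partition, 0)
            lag += max(0, latest_offset - end_offset)
    return lag
```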
---
Workflow 3: Data Quality Framework Setup
Scenario: Implement comprehensive data quality monitoring with Great Expectations.
Step 1: Initialize Great Expectations
```bash
# Install and initialize
pip install great_expectations
great_expectations init

# Connect to data source
great_expectations datasource new
```
Step 2: Create Expectation Suite
```python
# expectations/orders_suite.py
import great_expectations as gx

context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_quality_suite")

# Get a validator for the orders asset
validator = context.get_validator(
    batch_request={
        "datasource_name": "warehouse",
        "data_asset_name": "orders",
    },
    expectation_suite_name="orders_quality_suite"
)

# Schema expectations
validator.expect_table_columns_to_match_ordered_list(
    column_list=[
        "order_id", "customer_id", "order_date",
        "total_amount", "status", "created_at"
    ]
)

# Completeness expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("order_date")

# Uniqueness expectations
validator.expect_column_values_to_be_unique("order_id")

# Range expectations
validator.expect_column_values_to_be_between(
    "total_amount",
    min_value=0,
    max_value=1000000
)

# Categorical expectations
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "confirmed", "shipped", "delivered", "cancelled"]
)

# Freshness expectation
validator.expect_column_max_to_be_between(
    "order_date",
    min_value={"$PARAMETER": "now - timedelta(days=1)"},
    max_value={"$PARAMETER": "now"}
)

# Referential integrity
validator.expect_column_values_to_be_in_set(
    "customer_id",
    value_set={"$PARAMETER": "valid_customer_ids"}
)

validator.save_expectation_suite(discard_failed_expectations=False)
```
Step 3: Create Data Quality Checks with dbt
```yaml
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: "Order fact table with data quality checks"
    tests:
      # Row count check
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')
      # Freshness check
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24
    columns:
      - name: order_id
        description: "Unique order identifier"
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('dim_orders')
              field: order_id
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
              inclusive: true
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              row_condition: "status != 'cancelled'"
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
              severity: warn
```
Step 4: Implement Data Contracts
```yaml
# contracts/orders_contract.yaml
contract:
  name: orders_data_contract
  version: "1.0.0"
  owner: data-team@company.com

  schema:
    type: object
    properties:
      order_id:
        type: string
        format: uuid
        description: "Unique order identifier"
      customer_id:
        type: string
        not_null: true
      order_date:
        type: date
        not_null: true
      total_amount:
        type: decimal
        precision: 10
        scale: 2
        minimum: 0
      status:
        type: string
        enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]

  sla:
    freshness:
      max_delay_hours: 1
    completeness:
      min_percentage: 99.9
    accuracy:
      duplicate_tolerance: 0.01

  consumers:
    - name: analytics-team
      usage: "Daily reporting dashboards"
    - name: ml-team
      usage: "Churn prediction model"
```
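A contract file only helps if something enforces it. A minimal, hypothetical enforcement sketch — the rules below are hand-copied from a subset of the contract's `properties` for illustration; a real implementation would parse the YAML file itself:

```python
# Hand-copied subset of the contract's properties, for illustration only.
CONTRACT_RULES = {
    "customer_id": {"not_null": True},
    "total_amount": {"minimum": 0},
    "status": {"enum": ["pending", "confirmed", "shipped", "delivered", "cancelled"]},
}

def contract_violations(record: dict) -> list:
    """Return human-readable rule violations for one record."""
    violations = []
    for field, rules in CONTRACT_RULES.items():
        value = record.get(field)
        if rules.get("not_null") and value is None:
            violations.append(f"{field} is null")
            continue
        if value is None:
            continue
        if "minimum" in rules and value < rules["minimum"]:
            violations.append(f"{field} below minimum {rules['minimum']}")
        if "enum" in rules and value not in rules["enum"]:
            violations.append(f"{field} not in allowed set")
    return violations
```

Running this check in the producing pipeline, before publishing, is what makes the SLA section enforceable rather than aspirational.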
Step 5: Set Up Quality Monitoring Dashboard
```python
# monitoring/quality_dashboard.py
from datetime import datetime

def generate_quality_report(connection, table_name: str) -> dict:
    """Generate comprehensive data quality report."""
    report = {
        "table": table_name,
        "timestamp": datetime.now().isoformat(),
        "checks": {}
    }

    # Row count check
    row_count = connection.execute(
        f"SELECT COUNT(*) FROM {table_name}"
    ).fetchone()[0]
    report["checks"]["row_count"] = {
        "value": row_count,
        "status": "pass" if row_count > 0 else "fail"
    }

    # Freshness check
    max_date = connection.execute(
        f"SELECT MAX(created_at) FROM {table_name}"
    ).fetchone()[0]
    hours_old = (datetime.now() - max_date).total_seconds() / 3600
    report["checks"]["freshness"] = {
        "max_timestamp": max_date.isoformat(),
        "hours_old": round(hours_old, 2),
        "status": "pass" if hours_old < 24 else "fail"
    }

    # Null rate check
    null_query = f"""
        SELECT
            SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) AS null_order_id,
            SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customer_id,
            COUNT(*) AS total
        FROM {table_name}
    """
    null_result = connection.execute(null_query).fetchone()
    report["checks"]["null_rates"] = {
        "order_id": null_result[0] / null_result[2] if null_result[2] > 0 else 0,
        "customer_id": null_result[1] / null_result[2] if null_result[2] > 0 else 0,
        "status": "pass" if null_result[0] == 0 and null_result[1] == 0 else "fail"
    }

    # Duplicate check
    dup_query = f"""
        SELECT COUNT(*) - COUNT(DISTINCT order_id) AS duplicates
        FROM {table_name}
    """
    duplicates = connection.execute(dup_query).fetchone()[0]
    report["checks"]["duplicates"] = {
        "count": duplicates,
        "status": "pass" if duplicates == 0 else "fail"
    }

    # Overall status
    all_passed = all(
        check["status"] == "pass"
        for check in report["checks"].values()
    )
    report["overall_status"] = "pass" if all_passed else "fail"
    return report
```
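Downstream, a small helper can turn that report into an alert payload. `failing_checks` is illustrative, assuming the report shape produced by `generate_quality_report` above:

```python
def failing_checks(report: dict) -> list:
    """Names of the checks that failed, for alert routing."""
    return [
        name
        for name, check in report.get("checks", {}).items()
        if check.get("status") != "pass"
    ]
```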
---
Architecture Decision Framework
Use this framework to choose the right approach for your data pipeline.
Batch vs Streaming
| Criteria | Batch | Streaming |
|---|---|---|
| Latency requirement | Hours to days | Seconds to minutes |
| Data volume | Large historical datasets | Continuous event streams |
| Processing complexity | Complex transformations, ML | Simple aggregations, filtering |
| Cost sensitivity | More cost-effective | Higher infrastructure cost |
| Error handling | Easier to reprocess | Requires careful design |
Decision Tree:
Is real-time insight required?
├── Yes → Use streaming
│ └── Is exactly-once semantics needed?
│ ├── Yes → Kafka + Flink/Spark Structured Streaming
│ └── No → Kafka + consumer groups
└── No → Use batch
└── Is data volume > 1TB daily?
├── Yes → Spark/Databricks
└── No → dbt + warehouse compute
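As a rough sketch, the decision tree can be encoded as a function. The thresholds and stack names mirror the tree; treat them as starting points, not rules:

```python
# Hedged sketch: the batch-vs-streaming decision tree as a function.
# Threshold (1 TB/day) and stack names come from the tree above.
def choose_stack(realtime: bool, exactly_once: bool = False,
                 daily_tb: float = 0.0) -> str:
    if realtime:
        if exactly_once:
            return "Kafka + Flink/Spark Structured Streaming"
        return "Kafka + consumer groups"
    if daily_tb > 1.0:
        return "Spark/Databricks"
    return "dbt + warehouse compute"

print(choose_stack(realtime=False, daily_tb=2.5))  # → Spark/Databricks
```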
Lambda vs Kappa Architecture
| Aspect | Lambda | Kappa |
|---|---|---|
| Complexity | Two codebases (batch + stream) | Single codebase |
| Maintenance | Higher (sync batch/stream logic) | Lower |
| Reprocessing | Native batch layer | Replay from source |
| Use case | ML training + real-time serving | Pure event-driven |
When to choose Lambda:
- Need to train ML models on historical data
- Complex batch transformations not feasible in streaming
- Existing batch infrastructure
When to choose Kappa:
- Event-sourced architecture
- All processing can be expressed as stream operations
- Starting fresh without legacy systems
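A toy sketch of the structural difference (illustrative only — not a real serving layer or stream processor):

```python
# Lambda: serving merges a precomputed batch view with a real-time view.
def lambda_serve(batch_view: dict, speed_view: dict) -> dict:
    merged = dict(batch_view)
    merged.update(speed_view)  # fresher stream results override batch
    return merged

# Kappa: a single stream job; "reprocessing" means replaying the event
# log through the same code with fresh state.
def kappa_replay(event_log: list) -> dict:
    totals: dict = {}
    for key, amount in event_log:
        totals[key] = totals.get(key, 0) + amount
    return totals

print(lambda_serve({"u1": 10}, {"u1": 12, "u2": 3}))    # → {'u1': 12, 'u2': 3}
print(kappa_replay([("u1", 5), ("u1", 7), ("u2", 3)]))  # → {'u1': 12, 'u2': 3}
```

Both arrive at the same answer; Lambda pays with two codebases, Kappa with the cost of replaying the log.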
Data Warehouse vs Data Lakehouse
| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
|---|---|---|
| Best for | BI, SQL analytics | ML, unstructured data |
| Storage cost | Higher (proprietary format) | Lower (open formats) |
| Flexibility | Schema-on-write | Schema-on-read |
| Performance | Excellent for SQL | Good, improving |
| Ecosystem | Mature BI tools | Growing ML tooling |
Tech Stack
| Category | Technologies |
|---|---|
| Languages | Python, SQL, Scala |
| Orchestration | Airflow, Prefect, Dagster |
| Transformation | dbt, Spark, Flink |
| Streaming | Kafka, Kinesis, Pub/Sub |
| Storage | S3, GCS, Delta Lake, Iceberg |
| Warehouses | Snowflake, BigQuery, Redshift, Databricks |
| Quality | Great Expectations, dbt tests, Monte Carlo |
| Monitoring | Prometheus, Grafana, Datadog |
Reference Documentation
1. Data Pipeline Architecture
See references/data_pipeline_architecture.md for:
- Lambda vs Kappa architecture patterns
- Batch processing with Spark and Airflow
- Stream processing with Kafka and Flink
- Exactly-once semantics implementation
- Error handling and dead letter queues
2. Data Modeling Patterns
See references/data_modeling_patterns.md for:
- Dimensional modeling (Star/Snowflake)
- Slowly Changing Dimensions (SCD Types 1-6)
- Data Vault modeling
- dbt best practices
- Partitioning and clustering
3. DataOps Best Practices
See references/dataops_best_practices.md for:
- Data testing frameworks
- Data contracts and schema validation
- CI/CD for data pipelines
- Observability and lineage
- Incident response
Troubleshooting
Pipeline Failures
Symptom: Airflow DAG fails with timeout

Task exceeded max execution time

Solution:
- Check resource allocation
- Profile slow operations
- Add incremental processing

```python
from datetime import timedelta

# Increase timeout
default_args = {
    'execution_timeout': timedelta(hours=2),
}
```

```sql
-- Or use incremental loads
WHERE updated_at > '{{ prev_ds }}'
```
---
**Symptom:** Spark job OOM

java.lang.OutOfMemoryError: Java heap space

**Solution:**
1. Increase executor memory
2. Reduce partition size
3. Use disk spill

```python
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.memory.fraction", "0.8")
```

---

Symptom: Kafka consumer lag increasing

Consumer lag: 1000000 messages

Solution:
- Increase consumer parallelism
- Optimize processing logic
- Scale consumer group
undefinedAdd more partitions
Add more partitions
kafka-topics.sh --alter
--bootstrap-server localhost:9092
--topic user-events
--partitions 24
--bootstrap-server localhost:9092
--topic user-events
--partitions 24
---kafka-topics.sh --alter
--bootstrap-server localhost:9092
--topic user-events
--partitions 24
--bootstrap-server localhost:9092
--topic user-events
--partitions 24
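Before adding partitions, it helps to quantify the lag. A minimal sketch (offset numbers are made up; a real deployment would fetch them from the broker, e.g. via an admin client):

```python
# Illustrative only: lag per partition is the log-end offset minus the
# consumer group's committed offset; total lag is the sum across partitions.
def total_lag(end_offsets: dict, committed: dict) -> int:
    return sum(end_offsets[p] - committed.get(p, 0) for p in end_offsets)

end = {0: 500_000, 1: 600_000}   # broker log-end offsets (made up)
done = {0: 100_000, 1: 0}        # committed offsets (made up)
print(total_lag(end, done))  # → 1000000
```

If lag grows per partition rather than in total, more partitions alone won't help — the per-consumer processing logic is the bottleneck.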
---

Data Quality Issues
Symptom: Duplicate records appearing

Expected unique, found 150 duplicates

Solution:
- Add deduplication logic
- Use merge/upsert operations

```sql
-- dbt incremental with dedup
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

SELECT * FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY updated_at DESC
    ) as rn
  FROM {{ source('raw', 'orders') }}
) WHERE rn = 1
```

Symptom: Stale data in tables

Last update: 3 days ago

Solution:
- Check upstream pipeline status
- Verify source availability
- Add freshness monitoring
undefineddbt freshness check
dbt freshness check
sources:
- name: raw freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} loaded_at_field: _loaded_at
---
**Symptom:** Schema drift detected

Column 'new_field' not in expected schema

**Solution:**
1. Update data contract
2. Modify transformations
3. Communicate with producers
```python
# Handle schema evolution
df = (spark.read.format("delta")
    .option("mergeSchema", "true")
    .load("/data/orders"))
```

---

Performance Issues
Symptom: Query takes hours

Query runtime: 4 hours (expected: 30 minutes)

Solution:
- Check query plan
- Add proper partitioning
- Optimize joins

```sql
-- Before: Full table scan
SELECT * FROM orders WHERE order_date = '2024-01-15';

-- After: Partition pruning
-- Table partitioned by order_date
SELECT * FROM orders WHERE order_date = '2024-01-15';

-- Add clustering for frequent filters
ALTER TABLE orders CLUSTER BY (customer_id);
```

Symptom: dbt model takes too long
Model fct_orders completed in 45 minutes

Solution:
- Use incremental materialization
- Reduce upstream dependencies
- Pre-aggregate where possible

```sql
-- Convert to incremental
{{
  config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='sync_all_columns'
  )
}}

SELECT * FROM {{ ref('stg_orders') }}

{% if is_incremental() %}
  WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
{% endif %}
```