amee-joshi-data-engineering-portfolio
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAmee Joshi Data Engineering Portfolio
Amee Joshi 数据工程作品集
Skill by ara.so — Data Skills collection.
This portfolio showcases production-grade data engineering patterns and architectures for building scalable, cloud-native data platforms. It demonstrates end-to-end solutions covering data ingestion, transformation, modeling, and analytics using Azure services, Databricks, SQL Server, and BI tools.
由ara.so提供的技能——数据技能合集。
本作品集展示了用于构建可扩展云原生数据平台的生产级数据工程模式与架构,演示了使用Azure服务、Databricks、SQL Server及BI工具实现的从数据摄取、转换、建模到分析的端到端解决方案。
What This Portfolio Demonstrates
本作品集展示内容
This is a reference collection showing:
- Medallion Architecture (Bronze-Silver-Gold) implementations
- Azure cloud data platforms (ADF, ADLS Gen2, Databricks, Synapse Analytics)
- Data lakehouse patterns with Delta Lake
- Dimensional modeling (Star Schema, SCD Type 1 & 2)
- Metadata-driven ingestion frameworks
- Analytics-ready datasets for BI consumption
- ETL/ELT pipeline design with incremental loading
- Power BI and Tableau reporting solutions
这是一份参考合集,涵盖:
- Medallion架构(Bronze-Silver-Gold) 实现方案
- Azure云数据平台(ADF、ADLS Gen2、Databricks、Synapse Analytics)
- 基于Delta Lake的数据湖仓模式
- 维度建模(星型模型、SCD Type 1 & 2)
- 元数据驱动的摄取框架
- 供BI工具使用的分析就绪数据集
- 支持增量加载的ETL/ELT管道设计
- Power BI和Tableau 报表解决方案
Key Portfolio Projects
核心作品集项目
1. Azure Databricks Retail Lakehouse
1. Azure Databricks 零售湖仓
Repository:
azure-databricks-end-to-end-retail-lakehousePattern: Enterprise Medallion Architecture with Delta Lake
Architecture:
Bronze (Raw) → Silver (Cleansed) → Gold (Analytics-Ready)Key Implementation Concepts:
python
undefined代码仓库:
azure-databricks-end-to-end-retail-lakehouse模式: 基于Delta Lake的企业级Medallion架构
架构:
Bronze (原始层) → Silver (清洗层) → Gold (分析就绪层)核心实现概念:
python
undefinedBronze Layer - Raw Ingestion
Bronze Layer - Raw Ingestion
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
Ingest raw data with metadata
Ingest raw data with metadata
df_raw = (spark.read
.format("parquet")
.load(f"{bronze_path}/source_data/")
.withColumn("ingestion_timestamp", current_timestamp())
.withColumn("source_file", input_file_name())
)
df_raw = (spark.read
.format("parquet")
.load(f"{bronze_path}/source_data/")
.withColumn("ingestion_timestamp", current_timestamp())
.withColumn("source_file", input_file_name())
)
Write to Bronze Delta table
Write to Bronze Delta table
(df_raw.write
.format("delta")
.mode("append")
.option("mergeSchema", "true")
.save(f"{bronze_path}/retail_transactions")
)
```python(df_raw.write
.format("delta")
.mode("append")
.option("mergeSchema", "true")
.save(f"{bronze_path}/retail_transactions")
)
```pythonSilver Layer - Data Quality & Transformation
Silver Layer - Data Quality & Transformation
from pyspark.sql.functions import col, when, trim, upper
from pyspark.sql.functions import col, when, trim, upper
Cleanse and standardize
Cleanse and standardize
df_silver = (df_bronze
.filter(col("transaction_id").isNotNull())
.withColumn("customer_name", trim(upper(col("customer_name"))))
.withColumn("transaction_amount",
when(col("transaction_amount") < 0, 0)
.otherwise(col("transaction_amount")))
.dropDuplicates(["transaction_id"])
.select("transaction_id", "customer_id", "product_id",
"transaction_amount", "transaction_date")
)
df_silver = (df_bronze
.filter(col("transaction_id").isNotNull())
.withColumn("customer_name", trim(upper(col("customer_name"))))
.withColumn("transaction_amount",
when(col("transaction_amount") < 0, 0)
.otherwise(col("transaction_amount")))
.dropDuplicates(["transaction_id"])
.select("transaction_id", "customer_id", "product_id",
"transaction_amount", "transaction_date")
)
Write with schema enforcement
Write with schema enforcement
(df_silver.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "false")
.save(f"{silver_path}/transactions")
)
```python(df_silver.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "false")
.save(f"{silver_path}/transactions")
)
```pythonGold Layer - SCD Type 2 Dimension
Gold Layer - SCD Type 2 Dimension
def apply_scd_type2(target_table, source_df, key_columns, scd_columns):
"""
Implements Slowly Changing Dimension Type 2
"""
from delta.tables import DeltaTable
from pyspark.sql.functions import lit, current_timestamp
# Prepare source with SCD metadata
source_prepared = (source_df
.withColumn("effective_date", current_timestamp())
.withColumn("end_date", lit(None).cast("timestamp"))
.withColumn("is_current", lit(True))
)
# Read existing target
target_delta = DeltaTable.forPath(spark, target_table)
# Identify changes
merge_condition = " AND ".join([f"target.{k} = source.{k}" for k in key_columns])
# Perform SCD Type 2 merge
(target_delta.alias("target")
.merge(source_prepared.alias("source"), merge_condition)
.whenMatchedUpdate(
condition = "target.is_current = true AND " +
" OR ".join([f"target.{c} != source.{c}" for c in scd_columns]),
set = {
"is_current": "false",
"end_date": "current_timestamp()"
}
)
.whenNotMatchedInsertAll()
.execute()
)undefineddef apply_scd_type2(target_table, source_df, key_columns, scd_columns):
"""
Implements Slowly Changing Dimension Type 2
"""
from delta.tables import DeltaTable
from pyspark.sql.functions import lit, current_timestamp
# Prepare source with SCD metadata
source_prepared = (source_df
.withColumn("effective_date", current_timestamp())
.withColumn("end_date", lit(None).cast("timestamp"))
.withColumn("is_current", lit(True))
)
# Read existing target
target_delta = DeltaTable.forPath(spark, target_table)
# Identify changes
merge_condition = " AND ".join([f"target.{k} = source.{k}" for k in key_columns])
# Perform SCD Type 2 merge
(target_delta.alias("target")
.merge(source_prepared.alias("source"), merge_condition)
.whenMatchedUpdate(
condition = "target.is_current = true AND " +
" OR ".join([f"target.{c} != source.{c}" for c in scd_columns]),
set = {
"is_current": "false",
"end_date": "current_timestamp()"
}
)
.whenNotMatchedInsertAll()
.execute()
)undefined2. Metadata-Driven Ingestion Framework
2. 元数据驱动的摄取框架
Pattern: Dynamic, configuration-based pipeline generation
Configuration Schema:
json
{
"pipeline_config": {
"source_system": "SQL_SERVER",
"target_layer": "bronze",
"ingestion_type": "incremental",
"watermark_column": "modified_date",
"tables": [
{
"schema_name": "sales",
"table_name": "orders",
"partition_column": "order_date",
"primary_key": ["order_id"],
"target_path": "/bronze/sales/orders"
}
]
}
}Azure Data Factory Pattern:
python
undefined模式: 基于配置的动态管道生成
配置Schema:
json
{
"pipeline_config": {
"source_system": "SQL_SERVER",
"target_layer": "bronze",
"ingestion_type": "incremental",
"watermark_column": "modified_date",
"tables": [
{
"schema_name": "sales",
"table_name": "orders",
"partition_column": "order_date",
"primary_key": ["order_id"],
"target_path": "/bronze/sales/orders"
}
]
}
}Azure Data Factory模式:
python
undefinedDynamic pipeline parameter processing
Dynamic pipeline parameter processing
This represents the logic implemented in ADF
This represents the logic implemented in ADF
def generate_copy_activity(table_config):
"""
Generates ADF copy activity from metadata
"""
return {
"name": f"Copy_{table_config['table_name']}",
"type": "Copy",
"inputs": [{
"referenceName": "SourceDataset",
"type": "DatasetReference",
"parameters": {
"schemaName": table_config['schema_name'],
"tableName": table_config['table_name']
}
}],
"outputs": [{
"referenceName": "SinkDataset",
"type": "DatasetReference",
"parameters": {
"targetPath": table_config['target_path']
}
}],
"typeProperties": {
"source": {
"type": "SqlServerSource",
"sqlReaderQuery": f"""
SELECT * FROM {table_config['schema_name']}.{table_config['table_name']}
WHERE {table_config['watermark_column']} > '@{{pipeline().parameters.watermarkValue}}'
"""
},
"sink": {
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings",
"copyBehavior": "PreserveHierarchy"
}
}
}
}
undefineddef generate_copy_activity(table_config):
"""
Generates ADF copy activity from metadata
"""
return {
"name": f"Copy_{table_config['table_name']}",
"type": "Copy",
"inputs": [{
"referenceName": "SourceDataset",
"type": "DatasetReference",
"parameters": {
"schemaName": table_config['schema_name'],
"tableName": table_config['table_name']
}
}],
"outputs": [{
"referenceName": "SinkDataset",
"type": "DatasetReference",
"parameters": {
"targetPath": table_config['target_path']
}
}],
"typeProperties": {
"source": {
"type": "SqlServerSource",
"sqlReaderQuery": f"""
SELECT * FROM {table_config['schema_name']}.{table_config['table_name']}
WHERE {table_config['watermark_column']} > '@{{pipeline().parameters.watermarkValue}}'
"""
},
"sink": {
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings",
"copyBehavior": "PreserveHierarchy"
}
}
}
}
undefined3. Star Schema Data Warehouse
3. 星型模型数据仓库
Pattern: Dimensional Modeling with SQL Server
Dimension Table (SCD Type 1):
sql
-- Dimension: Product (SCD Type 1)
CREATE TABLE dim_product (
product_key INT IDENTITY(1,1) PRIMARY KEY,
product_id INT NOT NULL,
product_name NVARCHAR(100),
category NVARCHAR(50),
subcategory NVARCHAR(50),
unit_price DECIMAL(10,2),
modified_date DATETIME DEFAULT GETDATE(),
CONSTRAINT uk_product UNIQUE (product_id)
);
-- ETL Merge (SCD Type 1 - Overwrite)
MERGE INTO dim_product AS target
USING (
SELECT
product_id,
product_name,
category,
subcategory,
unit_price
FROM staging.products
) AS source
ON target.product_id = source.product_id
WHEN MATCHED AND (
target.product_name != source.product_name OR
target.category != source.category OR
target.unit_price != source.unit_price
)
THEN UPDATE SET
target.product_name = source.product_name,
target.category = source.category,
target.subcategory = source.subcategory,
target.unit_price = source.unit_price,
target.modified_date = GETDATE()
WHEN NOT MATCHED BY TARGET
THEN INSERT (product_id, product_name, category, subcategory, unit_price)
VALUES (source.product_id, source.product_name, source.category,
source.subcategory, source.unit_price);Dimension Table (SCD Type 2):
sql
-- Dimension: Customer (SCD Type 2)
CREATE TABLE dim_customer (
customer_key INT IDENTITY(1,1) PRIMARY KEY,
customer_id INT NOT NULL,
customer_name NVARCHAR(100),
email NVARCHAR(100),
city NVARCHAR(50),
state NVARCHAR(50),
effective_date DATETIME NOT NULL,
end_date DATETIME NULL,
is_current BIT DEFAULT 1,
CONSTRAINT uk_customer_current UNIQUE (customer_id, is_current)
);
-- ETL for SCD Type 2
-- Step 1: Expire changed records
UPDATE dim_customer
SET
end_date = GETDATE(),
is_current = 0
WHERE customer_id IN (
SELECT s.customer_id
FROM staging.customers s
INNER JOIN dim_customer d ON s.customer_id = d.customer_id
WHERE d.is_current = 1
AND (s.city != d.city OR s.state != d.state)
);
-- Step 2: Insert new versions
INSERT INTO dim_customer (
customer_id, customer_name, email, city, state,
effective_date, end_date, is_current
)
SELECT
s.customer_id,
s.customer_name,
s.email,
s.city,
s.state,
GETDATE() AS effective_date,
NULL AS end_date,
1 AS is_current
FROM staging.customers s
LEFT JOIN dim_customer d ON s.customer_id = d.customer_id AND d.is_current = 1
WHERE d.customer_key IS NULL
OR s.city != d.city
OR s.state != d.state;Fact Table:
sql
-- Fact: Sales Transactions
CREATE TABLE fact_sales (
sales_key BIGINT IDENTITY(1,1) PRIMARY KEY,
date_key INT NOT NULL,
customer_key INT NOT NULL,
product_key INT NOT NULL,
store_key INT NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
discount_amount DECIMAL(10,2) DEFAULT 0,
tax_amount DECIMAL(10,2) DEFAULT 0,
total_amount DECIMAL(10,2) NOT NULL,
CONSTRAINT fk_date FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
CONSTRAINT fk_customer FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key),
CONSTRAINT fk_product FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
CONSTRAINT fk_store FOREIGN KEY (store_key) REFERENCES dim_store(store_key)
);
-- Create columnstore index for analytics
CREATE NONCLUSTERED COLUMNSTORE INDEX idx_fact_sales_cs
ON fact_sales (date_key, customer_key, product_key, store_key,
quantity, unit_price, total_amount);
-- ETL Load
INSERT INTO fact_sales (
date_key, customer_key, product_key, store_key,
quantity, unit_price, discount_amount, tax_amount, total_amount
)
SELECT
dd.date_key,
dc.customer_key,
dp.product_key,
ds.store_key,
st.quantity,
st.unit_price,
st.discount_amount,
st.tax_amount,
st.total_amount
FROM staging.transactions st
INNER JOIN dim_date dd ON CAST(st.transaction_date AS DATE) = dd.date
INNER JOIN dim_customer dc ON st.customer_id = dc.customer_id AND dc.is_current = 1
INNER JOIN dim_product dp ON st.product_id = dp.product_id
INNER JOIN dim_store ds ON st.store_id = ds.store_id;模式: 基于SQL Server的维度建模
维度表(SCD Type 1):
sql
-- Dimension: Product (SCD Type 1)
CREATE TABLE dim_product (
product_key INT IDENTITY(1,1) PRIMARY KEY,
product_id INT NOT NULL,
product_name NVARCHAR(100),
category NVARCHAR(50),
subcategory NVARCHAR(50),
unit_price DECIMAL(10,2),
modified_date DATETIME DEFAULT GETDATE(),
CONSTRAINT uk_product UNIQUE (product_id)
);
-- ETL Merge (SCD Type 1 - Overwrite)
MERGE INTO dim_product AS target
USING (
SELECT
product_id,
product_name,
category,
subcategory,
unit_price
FROM staging.products
) AS source
ON target.product_id = source.product_id
WHEN MATCHED AND (
target.product_name != source.product_name OR
target.category != source.category OR
target.unit_price != source.unit_price
)
THEN UPDATE SET
target.product_name = source.product_name,
target.category = source.category,
target.subcategory = source.subcategory,
target.unit_price = source.unit_price,
target.modified_date = GETDATE()
WHEN NOT MATCHED BY TARGET
THEN INSERT (product_id, product_name, category, subcategory, unit_price)
VALUES (source.product_id, source.product_name, source.category,
source.subcategory, source.unit_price);维度表(SCD Type 2):
sql
-- Dimension: Customer (SCD Type 2)
CREATE TABLE dim_customer (
customer_key INT IDENTITY(1,1) PRIMARY KEY,
customer_id INT NOT NULL,
customer_name NVARCHAR(100),
email NVARCHAR(100),
city NVARCHAR(50),
state NVARCHAR(50),
effective_date DATETIME NOT NULL,
end_date DATETIME NULL,
is_current BIT DEFAULT 1,
CONSTRAINT uk_customer_current UNIQUE (customer_id, is_current)
);
-- ETL for SCD Type 2
-- Step 1: Expire changed records
UPDATE dim_customer
SET
end_date = GETDATE(),
is_current = 0
WHERE customer_id IN (
SELECT s.customer_id
FROM staging.customers s
INNER JOIN dim_customer d ON s.customer_id = d.customer_id
WHERE d.is_current = 1
AND (s.city != d.city OR s.state != d.state)
);
-- Step 2: Insert new versions
INSERT INTO dim_customer (
customer_id, customer_name, email, city, state,
effective_date, end_date, is_current
)
SELECT
s.customer_id,
s.customer_name,
s.email,
s.city,
s.state,
GETDATE() AS effective_date,
NULL AS end_date,
1 AS is_current
FROM staging.customers s
LEFT JOIN dim_customer d ON s.customer_id = d.customer_id AND d.is_current = 1
WHERE d.customer_key IS NULL
OR s.city != d.city
OR s.state != d.state;事实表:
sql
-- Fact: Sales Transactions
CREATE TABLE fact_sales (
sales_key BIGINT IDENTITY(1,1) PRIMARY KEY,
date_key INT NOT NULL,
customer_key INT NOT NULL,
product_key INT NOT NULL,
store_key INT NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
discount_amount DECIMAL(10,2) DEFAULT 0,
tax_amount DECIMAL(10,2) DEFAULT 0,
total_amount DECIMAL(10,2) NOT NULL,
CONSTRAINT fk_date FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
CONSTRAINT fk_customer FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key),
CONSTRAINT fk_product FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
CONSTRAINT fk_store FOREIGN KEY (store_key) REFERENCES dim_store(store_key)
);
-- Create columnstore index for analytics
CREATE NONCLUSTERED COLUMNSTORE INDEX idx_fact_sales_cs
ON fact_sales (date_key, customer_key, product_key, store_key,
quantity, unit_price, total_amount);
-- ETL Load
INSERT INTO fact_sales (
date_key, customer_key, product_key, store_key,
quantity, unit_price, discount_amount, tax_amount, total_amount
)
SELECT
dd.date_key,
dc.customer_key,
dp.product_key,
ds.store_key,
st.quantity,
st.unit_price,
st.discount_amount,
st.tax_amount,
st.total_amount
FROM staging.transactions st
INNER JOIN dim_date dd ON CAST(st.transaction_date AS DATE) = dd.date
INNER JOIN dim_customer dc ON st.customer_id = dc.customer_id AND dc.is_current = 1
INNER JOIN dim_product dp ON st.product_id = dp.product_id
INNER JOIN dim_store ds ON st.store_id = ds.store_id;4. Incremental Data Loading Pattern
4. 增量数据加载模式
Watermark-Based Incremental Load:
python
undefined基于水印的增量加载:
python
undefinedDatabricks notebook - Incremental load with watermark
Databricks notebook - Incremental load with watermark
from pyspark.sql.functions import col, max as spark_max
from delta.tables import DeltaTable
from pyspark.sql.functions import col, max as spark_max
from delta.tables import DeltaTable
Configuration
Configuration
source_table = "source_database.transactions"
target_path = "/mnt/silver/transactions"
watermark_table = "control.watermark"
watermark_column = "modified_date"
source_table = "source_database.transactions"
target_path = "/mnt/silver/transactions"
watermark_table = "control.watermark"
watermark_column = "modified_date"
Get last watermark
Get last watermark
last_watermark = (spark.table(watermark_table)
.filter(col("table_name") == source_table)
.select("watermark_value")
.first()[0]
)
last_watermark = (spark.table(watermark_table)
.filter(col("table_name") == source_table)
.select("watermark_value")
.first()[0]
)
Read incremental data
Read incremental data
df_incremental = (spark.table(source_table)
.filter(col(watermark_column) > last_watermark)
)
df_incremental = (spark.table(source_table)
.filter(col(watermark_column) > last_watermark)
)
Check if target exists
Check if target exists
if DeltaTable.isDeltaTable(spark, target_path):
# Merge into existing table
target_table = DeltaTable.forPath(spark, target_path)
(target_table.alias("target")
.merge(
df_incremental.alias("source"),
"target.transaction_id = source.transaction_id"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)else:
# Initial load
(df_incremental.write
.format("delta")
.mode("overwrite")
.save(target_path)
)
if DeltaTable.isDeltaTable(spark, target_path):
# Merge into existing table
target_table = DeltaTable.forPath(spark, target_path)
(target_table.alias("target")
.merge(
df_incremental.alias("source"),
"target.transaction_id = source.transaction_id"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)else:
# Initial load
(df_incremental.write
.format("delta")
.mode("overwrite")
.save(target_path)
)
Update watermark
Update watermark
new_watermark = df_incremental.agg(spark_max(watermark_column)).first()[0]
spark.sql(f"""
UPDATE {watermark_table}
SET watermark_value = '{new_watermark}',
last_updated = current_timestamp()
WHERE table_name = '{source_table}'
""")
undefinednew_watermark = df_incremental.agg(spark_max(watermark_column)).first()[0]
spark.sql(f"""
UPDATE {watermark_table}
SET watermark_value = '{new_watermark}',
last_updated = current_timestamp()
WHERE table_name = '{source_table}'
""")
undefined5. Data Quality Framework
5. 数据质量框架
Quality Checks Pattern:
python
from pyspark.sql.functions import col, count, sum as spark_sum, when
class DataQualityChecker:
"""
Data quality validation framework
"""
def __init__(self, dataframe, table_name):
self.df = dataframe
self.table_name = table_name
self.quality_results = []
def check_null_values(self, columns):
"""Check for null values in critical columns"""
for column in columns:
null_count = self.df.filter(col(column).isNull()).count()
total_count = self.df.count()
self.quality_results.append({
"check_type": "null_check",
"column": column,
"null_count": null_count,
"total_count": total_count,
"null_percentage": (null_count / total_count * 100) if total_count > 0 else 0,
"passed": null_count == 0
})
return self
def check_duplicates(self, key_columns):
"""Check for duplicate records"""
duplicate_count = (self.df
.groupBy(key_columns)
.count()
.filter(col("count") > 1)
.count()
)
self.quality_results.append({
"check_type": "duplicate_check",
"key_columns": key_columns,
"duplicate_count": duplicate_count,
"passed": duplicate_count == 0
})
return self
def check_referential_integrity(self, foreign_key, reference_df, reference_key):
"""Check referential integrity"""
missing_references = (self.df
.select(foreign_key)
.distinct()
.join(reference_df.select(reference_key),
col(foreign_key) == col(reference_key),
"left_anti")
.count()
)
self.quality_results.append({
"check_type": "referential_integrity",
"foreign_key": foreign_key,
"missing_references": missing_references,
"passed": missing_references == 0
})
return self
def check_value_range(self, column, min_value=None, max_value=None):
"""Check if values are within expected range"""
out_of_range = self.df.filter(
(col(column) < min_value if min_value is not None else False) |
(col(column) > max_value if max_value is not None else False)
).count()
self.quality_results.append({
"check_type": "range_check",
"column": column,
"min_value": min_value,
"max_value": max_value,
"out_of_range_count": out_of_range,
"passed": out_of_range == 0
})
return self
def get_results(self):
"""Return quality check results"""
return self.quality_results质量校验模式:
python
from pyspark.sql.functions import col, count, sum as spark_sum, when
class DataQualityChecker:
"""
Data quality validation framework
"""
def __init__(self, dataframe, table_name):
self.df = dataframe
self.table_name = table_name
self.quality_results = []
def check_null_values(self, columns):
"""Check for null values in critical columns"""
for column in columns:
null_count = self.df.filter(col(column).isNull()).count()
total_count = self.df.count()
self.quality_results.append({
"check_type": "null_check",
"column": column,
"null_count": null_count,
"total_count": total_count,
"null_percentage": (null_count / total_count * 100) if total_count > 0 else 0,
"passed": null_count == 0
})
return self
def check_duplicates(self, key_columns):
"""Check for duplicate records"""
duplicate_count = (self.df
.groupBy(key_columns)
.count()
.filter(col("count") > 1)
.count()
)
self.quality_results.append({
"check_type": "duplicate_check",
"key_columns": key_columns,
"duplicate_count": duplicate_count,
"passed": duplicate_count == 0
})
return self
def check_referential_integrity(self, foreign_key, reference_df, reference_key):
"""Check referential integrity"""
missing_references = (self.df
.select(foreign_key)
.distinct()
.join(reference_df.select(reference_key),
col(foreign_key) == col(reference_key),
"left_anti")
.count()
)
self.quality_results.append({
"check_type": "referential_integrity",
"foreign_key": foreign_key,
"missing_references": missing_references,
"passed": missing_references == 0
})
return self
def check_value_range(self, column, min_value=None, max_value=None):
"""Check if values are within expected range"""
out_of_range = self.df.filter(
(col(column) < min_value if min_value is not None else False) |
(col(column) > max_value if max_value is not None else False)
).count()
self.quality_results.append({
"check_type": "range_check",
"column": column,
"min_value": min_value,
"max_value": max_value,
"out_of_range_count": out_of_range,
"passed": out_of_range == 0
})
return self
def get_results(self):
"""Return quality check results"""
return self.quality_resultsUsage example
Usage example
df_transactions = spark.read.format("delta").load("/mnt/silver/transactions")
df_customers = spark.read.format("delta").load("/mnt/gold/dim_customer")
quality_checker = DataQualityChecker(df_transactions, "transactions")
results = (quality_checker
.check_null_values(["transaction_id", "customer_id", "transaction_date"])
.check_duplicates(["transaction_id"])
.check_referential_integrity("customer_id", df_customers, "customer_id")
.check_value_range("transaction_amount", min_value=0, max_value=100000)
.get_results()
)
df_transactions = spark.read.format("delta").load("/mnt/silver/transactions")
df_customers = spark.read.format("delta").load("/mnt/gold/dim_customer")
quality_checker = DataQualityChecker(df_transactions, "transactions")
results = (quality_checker
.check_null_values(["transaction_id", "customer_id", "transaction_date"])
.check_duplicates(["transaction_id"])
.check_referential_integrity("customer_id", df_customers, "customer_id")
.check_value_range("transaction_amount", min_value=0, max_value=100000)
.get_results()
)
Log results
Log results
for result in results:
print(f"{result['check_type']}: {'PASSED' if result['passed'] else 'FAILED'}")
undefinedfor result in results:
print(f"{result['check_type']}: {'PASSED' if result['passed'] else 'FAILED'}")
undefinedPower BI Analytics Patterns
Power BI分析模式
DAX Measures for KPIs:
dax
// Total Sales
Total Sales = SUM(fact_sales[total_amount])
// Year-over-Year Growth
Sales YoY Growth =
VAR CurrentYearSales = [Total Sales]
VAR PreviousYearSales =
CALCULATE(
[Total Sales],
DATEADD(dim_date[Date], -1, YEAR)
)
RETURN
DIVIDE(
CurrentYearSales - PreviousYearSales,
PreviousYearSales,
0
)
// Customer Lifetime Value
Customer LTV =
CALCULATE(
[Total Sales],
ALLEXCEPT(dim_customer, dim_customer[customer_id])
)
// Moving Average (3 months)
Sales 3M MA =
CALCULATE(
[Total Sales],
DATESINPERIOD(
dim_date[Date],
LASTDATE(dim_date[Date]),
-3,
MONTH
)
) / 3
// Rank by Sales
Product Sales Rank =
RANKX(
ALL(dim_product[product_name]),
[Total Sales],
,
DESC,
DENSE
)用于KPI的DAX度量值:
dax
// Total Sales
Total Sales = SUM(fact_sales[total_amount])
// Year-over-Year Growth
Sales YoY Growth =
VAR CurrentYearSales = [Total Sales]
VAR PreviousYearSales =
CALCULATE(
[Total Sales],
DATEADD(dim_date[Date], -1, YEAR)
)
RETURN
DIVIDE(
CurrentYearSales - PreviousYearSales,
PreviousYearSales,
0
)
// Customer Lifetime Value
Customer LTV =
CALCULATE(
[Total Sales],
ALLEXCEPT(dim_customer, dim_customer[customer_id])
)
// Moving Average (3 months)
Sales 3M MA =
CALCULATE(
[Total Sales],
DATESINPERIOD(
dim_date[Date],
LASTDATE(dim_date[Date]),
-3,
MONTH
)
) / 3
// Rank by Sales
Product Sales Rank =
RANKX(
ALL(dim_product[product_name]),
[Total Sales],
,
DESC,
DENSE
)Common Architectural Patterns
通用架构模式
Medallion Architecture Best Practices
Medallion架构最佳实践
Bronze Layer:
- Raw data ingestion with minimal transformation
- Add audit columns (ingestion_timestamp, source_file)
- Preserve source schema with schema evolution enabled
- Partition by ingestion date for performance
Silver Layer:
- Data cleansing and standardization
- Deduplication based on business keys
- Data type conversions and validations
- Enforce schema constraints
- Join related datasets
Gold Layer:
- Business-aggregated datasets
- Dimensional models (Star/Snowflake schema)
- Pre-calculated metrics and KPIs
- Optimized for BI tool consumption
Bronze层:
- 原始数据摄取,最小化转换
- 添加审计列(ingestion_timestamp、source_file)
- 启用Schema演进,保留源数据Schema
- 按摄取日期分区以提升性能
Silver层:
- 数据清洗与标准化
- 基于业务键去重
- 数据类型转换与校验
- 强制执行Schema约束
- 关联相关数据集
Gold层:
- 业务聚合数据集
- 维度模型(星型/雪花型Schema)
- 预计算指标与KPI
- 针对BI工具使用优化
Delta Lake Optimization
Delta Lake优化
python
undefinedpython
undefinedOptimize Delta tables
Optimize Delta tables
from delta.tables import DeltaTable
from delta.tables import DeltaTable
Optimize with Z-ordering
Optimize with Z-ordering
deltaTable = DeltaTable.forPath(spark, "/mnt/gold/fact_sales")
deltaTable = DeltaTable.forPath(spark, "/mnt/gold/fact_sales")
Optimize files and Z-order by common filter columns
Optimize files and Z-order by common filter columns
deltaTable.optimize().executeZOrderBy("date_key", "customer_key")
deltaTable.optimize().executeZOrderBy("date_key", "customer_key")
Vacuum old files (retention 168 hours = 7 days)
Vacuum old files (retention 168 hours = 7 days)
deltaTable.vacuum(168)
deltaTable.vacuum(168)
Update table statistics
Update table statistics
spark.sql("ANALYZE TABLE gold.fact_sales COMPUTE STATISTICS FOR ALL COLUMNS")
undefinedspark.sql("ANALYZE TABLE gold.fact_sales COMPUTE STATISTICS FOR ALL COLUMNS")
undefinedUnity Catalog Security
Unity Catalog安全
sql
-- Create catalog and schema
CREATE CATALOG IF NOT EXISTS retail_analytics;
CREATE SCHEMA IF NOT EXISTS retail_analytics.gold;
-- Grant permissions
GRANT USE CATALOG ON CATALOG retail_analytics TO `data_analysts`;
GRANT USE SCHEMA ON SCHEMA retail_analytics.gold TO `data_analysts`;
GRANT SELECT ON TABLE retail_analytics.gold.fact_sales TO `data_analysts`;
-- Row-level security
CREATE FUNCTION retail_analytics.gold.customer_filter(customer_region STRING)
RETURN customer_region = current_user_region();
ALTER TABLE retail_analytics.gold.fact_sales
SET ROW FILTER retail_analytics.gold.customer_filter ON (region);sql
-- Create catalog and schema
CREATE CATALOG IF NOT EXISTS retail_analytics;
CREATE SCHEMA IF NOT EXISTS retail_analytics.gold;
-- Grant permissions
GRANT USE CATALOG ON CATALOG retail_analytics TO `data_analysts`;
GRANT USE SCHEMA ON SCHEMA retail_analytics.gold TO `data_analysts`;
GRANT SELECT ON TABLE retail_analytics.gold.fact_sales TO `data_analysts`;
-- Row-level security
CREATE FUNCTION retail_analytics.gold.customer_filter(customer_region STRING)
RETURN customer_region = current_user_region();
ALTER TABLE retail_analytics.gold.fact_sales
SET ROW FILTER retail_analytics.gold.customer_filter ON (region);Environment Setup
环境配置
Azure Configuration:
bash
undefinedAzure配置:
bash
undefinedSet Azure environment variables
Set Azure environment variables
export AZURE_SUBSCRIPTION_ID=your_subscription_id
export AZURE_RESOURCE_GROUP=rg-data-platform
export AZURE_STORAGE_ACCOUNT=datalakestorage
export AZURE_DATABRICKS_WORKSPACE=databricks-workspace
export AZURE_SUBSCRIPTION_ID=your_subscription_id
export AZURE_RESOURCE_GROUP=rg-data-platform
export AZURE_STORAGE_ACCOUNT=datalakestorage
export AZURE_DATABRICKS_WORKSPACE=databricks-workspace
ADF connection
ADF connection
export ADF_FACTORY_NAME=adf-data-ingestion
export ADF_LINKED_SERVICE_NAME=ls-sqlserver-source
**Databricks Configuration:**
```pythonexport ADF_FACTORY_NAME=adf-data-ingestion
export ADF_LINKED_SERVICE_NAME=ls-sqlserver-source
**Databricks配置:**
```pythonMount ADLS Gen2 in Databricks
Mount ADLS Gen2 in Databricks
configs = {
"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="keyvault", key="client-id"),
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="keyvault", key="client-secret"),
"fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{dbutils.secrets.get(scope='keyvault', key='tenant-id')}/oauth2/token"
}
dbutils.fs.mount(
source = "abfss://bronze@datalakestorage.dfs.core.windows.net/",
mount_point = "/mnt/bronze",
extra_configs = configs
)
undefinedconfigs = {
"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="keyvault", key="client-id"),
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="keyvault", key="client-secret"),
"fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{dbutils.secrets.get(scope='keyvault', key='tenant-id')}/oauth2/token"
}
dbutils.fs.mount(
source = "abfss://bronze@datalakestorage.dfs.core.windows.net/",
mount_point = "/mnt/bronze",
extra_configs = configs
)
undefinedTroubleshooting
故障排查
Issue: Delta Lake merge taking too long
python
undefined问题:Delta Lake合并操作耗时过长
python
undefinedSolution: Optimize before merge
Solution: Optimize before merge
from delta.tables import DeltaTable
target_table = DeltaTable.forPath(spark, target_path)
from delta.tables import DeltaTable
target_table = DeltaTable.forPath(spark, target_path)
Compact small files first
Compact small files first
target_table.optimize().executeCompaction()
target_table.optimize().executeCompaction()
Enable auto-optimize and auto-compaction
Enable auto-optimize and auto-compaction
spark.sql(f"""
ALTER TABLE delta.
SET TBLPROPERTIES (
delta.autoOptimize.optimizeWrite = true,
delta.autoOptimize.autoCompact = true
)
""")
{target_path}
**Issue: ADF pipeline timeout**
```json
// Increase timeout in ADF pipeline activity
{
"typeProperties": {
"timeout": "0.12:00:00"
},
"policy": {
"timeout": "7.00:00:00",
"retry": 2,
"retryIntervalInSeconds": 30
}
}Issue: Power BI slow refresh
dax
// Use incremental refresh configuration
// In Power BI Desktop: Table Tools > Incremental Refresh
// Or optimize DAX measures
Optimized Total Sales =
CALCULATE(
SUM(fact_sales[total_amount]),
KEEPFILTERS(dim_date[Date]) // Reduce context transition overhead
)Issue: Schema evolution conflicts
python
undefinedspark.sql(f"""
ALTER TABLE delta.
SET TBLPROPERTIES (
delta.autoOptimize.optimizeWrite = true,
delta.autoOptimize.autoCompact = true
)
""")
{target_path}
**问题:ADF管道超时**
```json
// Increase timeout in ADF pipeline activity
{
"typeProperties": {
"timeout": "0.12:00:00"
},
"policy": {
"timeout": "7.00:00:00",
"retry": 2,
"retryIntervalInSeconds": 30
}
}问题:Power BI刷新缓慢
dax
// Use incremental refresh configuration
// In Power BI Desktop: Table Tools > Incremental Refresh
// Or optimize DAX measures
Optimized Total Sales =
CALCULATE(
SUM(fact_sales[total_amount]),
KEEPFILTERS(dim_date[Date]) // Reduce context transition overhead
)问题:Schema演进冲突
python
undefinedEnable schema merging in Delta writes
Enable schema merging in Delta writes
(df.write
.format("delta")
.mode("append")
.option("mergeSchema", "true")
.save(target_path)
)
(df.write
.format("delta")
.mode("append")
.option("mergeSchema", "true")
.save(target_path)
)
Or explicitly allow schema overwrite
Or explicitly allow schema overwrite
(df.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.save(target_path)
)
undefined(df.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.save(target_path)
)
undefinedReference Architecture
参考架构
This portfolio demonstrates a typical enterprise data platform architecture:
┌─────────────────┐
│ Source Systems │
│ (SQL Server, │
│ APIs, Files) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Azure Data │
│ Factory (ADF) │ ◄──── Metadata-driven ingestion
└────────┬────────┘
│
▼
┌─────────────────┐
│ ADLS Gen2 │
│ Bronze Layer │ ◄──── Raw data landing
└────────┬────────┘
│
▼
┌─────────────────┐
│ Databricks │
│ Silver Layer │ ◄──── Cleansing & transformation
└────────┬────────┘
│
▼
┌─────────────────┐
│ Databricks │
│ Gold Layer │ ◄──── Analytics-ready datasets
└────────┬────────┘
│
├──────────────────┐
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Power BI │ │ Synapse │
│ Reporting │ │ Analytics │
└─────────────────┘ └─────────────────┘This skill provides patterns and code examples for building production-grade data platforms following industry best practices demonstrated across the portfolio projects.
本作品集演示了典型的企业级数据平台架构:
┌─────────────────┐
│ 源系统 │
│ (SQL Server, │
│ APIs, 文件) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Azure Data │
│ Factory (ADF) │ ◄──── 元数据驱动的摄取
└────────┬────────┘
│
▼
┌─────────────────┐
│ ADLS Gen2 │
│ Bronze层 │ ◄──── 原始数据落地
└────────┬────────┘
│
▼
┌─────────────────┐
│ Databricks │
│ Silver层 │ ◄──── 数据清洗与转换
└────────┬────────┘
│
▼
┌─────────────────┐
│ Databricks │
│ Gold层 │ ◄──── 分析就绪数据集
└────────┬────────┘
│
├──────────────────┐
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Power BI │ │ Synapse │
│ 报表分析 │ │ 分析服务 │
└─────────────────┘ └─────────────────┘本技能提供了遵循行业最佳实践的生产级数据平台构建模式与代码示例,这些内容已在作品集项目中逐一演示。