amee-joshi-data-engineering-portfolio

Amee Joshi Data Engineering Portfolio

Skill by ara.so — Data Skills collection.
This portfolio showcases production-grade data engineering patterns and architectures for building scalable, cloud-native data platforms. It demonstrates end-to-end solutions covering data ingestion, transformation, modeling, and analytics using Azure services, Databricks, SQL Server, and BI tools.

What This Portfolio Demonstrates

This is a reference collection showing:
  • Medallion Architecture (Bronze-Silver-Gold) implementations
  • Azure cloud data platforms (ADF, ADLS Gen2, Databricks, Synapse Analytics)
  • Data lakehouse patterns with Delta Lake
  • Dimensional modeling (Star Schema, SCD Type 1 & 2)
  • Metadata-driven ingestion frameworks
  • Analytics-ready datasets for BI consumption
  • ETL/ELT pipeline design with incremental loading
  • Power BI and Tableau reporting solutions

Key Portfolio Projects

1. Azure Databricks Retail Lakehouse

Repository: azure-databricks-end-to-end-retail-lakehouse
Pattern: Enterprise Medallion Architecture with Delta Lake
Architecture: Bronze (Raw) → Silver (Cleansed) → Gold (Analytics-Ready)
Key Implementation Concepts:
python
# Bronze Layer - Raw Ingestion
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name
from delta.tables import DeltaTable

# Ingest raw data with metadata
# (spark and bronze_path are assumed to be defined by the notebook)
df_raw = (spark.read
    .format("parquet")
    .load(f"{bronze_path}/source_data/")
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", input_file_name())
)

# Write to Bronze Delta table
(df_raw.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(f"{bronze_path}/retail_transactions")
)

python
# Silver Layer - Data Quality & Transformation
from pyspark.sql.functions import col, when, trim, upper

# Read the Bronze table written in the previous step
df_bronze = spark.read.format("delta").load(f"{bronze_path}/retail_transactions")

# Cleanse and standardize
df_silver = (df_bronze
    .filter(col("transaction_id").isNotNull())
    .withColumn("customer_name", trim(upper(col("customer_name"))))
    .withColumn("transaction_amount",
                when(col("transaction_amount") < 0, 0)
                .otherwise(col("transaction_amount")))
    .dropDuplicates(["transaction_id"])
    # customer_name is kept so the standardization above is not discarded
    .select("transaction_id", "customer_id", "customer_name", "product_id",
            "transaction_amount", "transaction_date")
)

# Write with schema enforcement
(df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "false")
    .save(f"{silver_path}/transactions")
)

python
# Gold Layer - SCD Type 2 Dimension
def apply_scd_type2(target_table, source_df, key_columns, scd_columns):
    """
    Implements Slowly Changing Dimension Type 2
    """
    from delta.tables import DeltaTable
    from pyspark.sql.functions import lit, current_timestamp

    # Prepare source with SCD metadata
    source_prepared = (source_df
        .withColumn("effective_date", current_timestamp())
        .withColumn("end_date", lit(None).cast("timestamp"))
        .withColumn("is_current", lit(True))
    )

    # Read existing target
    target_delta = DeltaTable.forPath(spark, target_table)

    # Match on business keys, against the current version only
    merge_condition = " AND ".join(
        [f"target.{k} = source.{k}" for k in key_columns] + ["target.is_current = true"]
    )

    # Null-safe change detection; the OR chain is built separately so it
    # cannot escape the key match through operator precedence
    change_condition = " OR ".join(
        [f"NOT (target.{c} <=> source.{c})" for c in scd_columns]
    )

    # Step 1: expire current rows whose tracked attributes changed
    (target_delta.alias("target")
        .merge(source_prepared.alias("source"), merge_condition)
        .whenMatchedUpdate(
            condition=change_condition,
            set={
                "is_current": "false",
                "end_date": "current_timestamp()"
            }
        )
        .execute()
    )

    # Step 2: insert a new version for every key that has no current row,
    # which covers both brand-new keys and the rows expired in step 1
    still_current = (spark.read.format("delta").load(target_table)
        .filter("is_current = true")
    )
    new_versions = source_prepared.join(still_current, on=key_columns, how="left_anti")
    new_versions.write.format("delta").mode("append").save(target_table)

2. Metadata-Driven Ingestion Framework

Pattern: Dynamic, configuration-based pipeline generation
Configuration Schema:
json
{
  "pipeline_config": {
    "source_system": "SQL_SERVER",
    "target_layer": "bronze",
    "ingestion_type": "incremental",
    "watermark_column": "modified_date",
    "tables": [
      {
        "schema_name": "sales",
        "table_name": "orders",
        "partition_column": "order_date",
        "primary_key": ["order_id"],
        "target_path": "/bronze/sales/orders"
      }
    ]
  }
}
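To illustrate how a configuration like the one above might drive ingestion, here is a minimal plain-Python sketch that walks the `tables` list and builds one extraction query per table. The helper name `build_extract_queries`, the sample watermark value, and the query shape are all assumptions made for illustration, not the portfolio's actual implementation.

```python
import json

# Same shape as the configuration schema above.
CONFIG_JSON = """
{
  "pipeline_config": {
    "source_system": "SQL_SERVER",
    "target_layer": "bronze",
    "ingestion_type": "incremental",
    "watermark_column": "modified_date",
    "tables": [
      {
        "schema_name": "sales",
        "table_name": "orders",
        "partition_column": "order_date",
        "primary_key": ["order_id"],
        "target_path": "/bronze/sales/orders"
      }
    ]
  }
}
"""


def build_extract_queries(config, last_watermark):
    """Return {target_path: query} for every table in the config (hypothetical helper)."""
    pipeline = config["pipeline_config"]
    queries = {}
    for table in pipeline["tables"]:
        source = f"{table['schema_name']}.{table['table_name']}"
        query = f"SELECT * FROM {source}"
        # Only incremental pipelines filter on the pipeline-level watermark column.
        if pipeline["ingestion_type"] == "incremental":
            query += f" WHERE {pipeline['watermark_column']} > '{last_watermark}'"
        queries[table["target_path"]] = query
    return queries


config = json.loads(CONFIG_JSON)
queries = build_extract_queries(config, "2024-01-01T00:00:00")  # sample watermark (assumed)
for path, query in queries.items():
    print(path, "<-", query)
```

Adding a table then becomes a configuration change rather than a code change. A real pipeline would pass the watermark as a query parameter instead of interpolating it into the SQL string.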
Azure Data Factory Pattern:
python
# Dynamic pipeline parameter processing
# This represents the logic implemented in ADF
def generate_copy_activity(table_config, watermark_column):
    """
    Generates ADF copy activity from metadata.

    watermark_column is passed separately because the configuration schema
    defines it at the pipeline level, not on each table entry.
    """
    source_table = f"{table_config['schema_name']}.{table_config['table_name']}"
    return {
        "name": f"Copy_{table_config['table_name']}",
        "type": "Copy",
        "inputs": [{
            "referenceName": "SourceDataset",
            "type": "DatasetReference",
            "parameters": {
                "schemaName": table_config['schema_name'],
                "tableName": table_config['table_name']
            }
        }],
        "outputs": [{
            "referenceName": "SinkDataset",
            "type": "DatasetReference",
            "parameters": {
                "targetPath": table_config['target_path']
            }
        }],
        "typeProperties": {
            "source": {
                "type": "SqlServerSource",
                # The doubled braces emit a literal @{...} ADF expression
                "sqlReaderQuery": f"""
                    SELECT *
                    FROM {source_table}
                    WHERE {watermark_column} > '@{{pipeline().parameters.watermarkValue}}'
                """
            },
            "sink": {
                "type": "ParquetSink",
                "storeSettings": {
                    "type": "AzureBlobFSWriteSettings",
                    "copyBehavior": "PreserveHierarchy"
                }
            }
        }
    }

3. Star Schema Data Warehouse

Pattern: Dimensional Modeling with SQL Server
Dimension Table (SCD Type 1):
sql
-- Dimension: Product (SCD Type 1)
CREATE TABLE dim_product (
    product_key INT IDENTITY(1,1) PRIMARY KEY,
    product_id INT NOT NULL,
    product_name NVARCHAR(100),
    category NVARCHAR(50),
    subcategory NVARCHAR(50),
    unit_price DECIMAL(10,2),
    modified_date DATETIME DEFAULT GETDATE(),
    CONSTRAINT uk_product UNIQUE (product_id)
);

-- ETL Merge (SCD Type 1 - Overwrite)
MERGE INTO dim_product AS target
USING (
    SELECT 
        product_id,
        product_name,
        category,
        subcategory,
        unit_price
    FROM staging.products
) AS source
ON target.product_id = source.product_id
WHEN MATCHED AND (
    target.product_name != source.product_name OR
    target.category != source.category OR
    target.subcategory != source.subcategory OR
    target.unit_price != source.unit_price
)
THEN UPDATE SET
    target.product_name = source.product_name,
    target.category = source.category,
    target.subcategory = source.subcategory,
    target.unit_price = source.unit_price,
    target.modified_date = GETDATE()
WHEN NOT MATCHED BY TARGET
THEN INSERT (product_id, product_name, category, subcategory, unit_price)
VALUES (source.product_id, source.product_name, source.category, 
        source.subcategory, source.unit_price);
Dimension Table (SCD Type 2):
sql
-- Dimension: Customer (SCD Type 2)
CREATE TABLE dim_customer (
    customer_key INT IDENTITY(1,1) PRIMARY KEY,
    customer_id INT NOT NULL,
    customer_name NVARCHAR(100),
    email NVARCHAR(100),
    city NVARCHAR(50),
    state NVARCHAR(50),
    effective_date DATETIME NOT NULL,
    end_date DATETIME NULL,
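    is_current BIT DEFAULT 1
);

-- Allow any number of expired versions per customer, but only one current row.
-- (A UNIQUE constraint on (customer_id, is_current) would reject the second expired version.)
CREATE UNIQUE INDEX uk_customer_current
ON dim_customer (customer_id)
WHERE is_current = 1;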

-- ETL for SCD Type 2
-- Step 1: Expire changed records
UPDATE dim_customer
SET 
    end_date = GETDATE(),
    is_current = 0
WHERE is_current = 1  -- leave already-expired versions untouched
AND customer_id IN (
    SELECT s.customer_id
    FROM staging.customers s
    INNER JOIN dim_customer d ON s.customer_id = d.customer_id
    WHERE d.is_current = 1
    AND (s.city != d.city OR s.state != d.state)
);

-- Step 2: Insert new versions
INSERT INTO dim_customer (
    customer_id, customer_name, email, city, state, 
    effective_date, end_date, is_current
)
SELECT 
    s.customer_id,
    s.customer_name,
    s.email,
    s.city,
    s.state,
    GETDATE() AS effective_date,
    NULL AS end_date,
    1 AS is_current
FROM staging.customers s
LEFT JOIN dim_customer d ON s.customer_id = d.customer_id AND d.is_current = 1
WHERE d.customer_key IS NULL
   OR s.city != d.city
   OR s.state != d.state;
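The expire-then-insert logic above can be traced on a handful of rows. The following is a plain-Python sketch, not the SQL Server implementation: the function name `apply_scd2`, the trimmed column set, and the sample customers are hypothetical, and it assumes staging holds at most one row per customer.

```python
from datetime import datetime


def apply_scd2(dimension, staging, tracked, now):
    """Expire changed current rows, then append new versions (mutates dimension)."""
    current = {row["customer_id"]: row for row in dimension if row["is_current"]}

    for incoming in staging:
        existing = current.get(incoming["customer_id"])
        changed = existing is not None and any(
            existing[c] != incoming[c] for c in tracked
        )
        # Step 1: expire the current version when a tracked attribute changed.
        if changed:
            existing["end_date"] = now
            existing["is_current"] = False
        # Step 2: insert a new version for new or changed customers.
        if existing is None or changed:
            dimension.append(
                {**incoming, "effective_date": now, "end_date": None, "is_current": True}
            )
    return dimension


dim_customer = [
    {"customer_id": 1, "city": "Austin", "state": "TX",
     "effective_date": datetime(2023, 1, 1), "end_date": None, "is_current": True},
]
staging_customers = [
    {"customer_id": 1, "city": "Denver", "state": "CO"},  # moved: needs a new version
    {"customer_id": 2, "city": "Boston", "state": "MA"},  # brand-new customer
]

apply_scd2(dim_customer, staging_customers, ["city", "state"], datetime(2024, 6, 1))
for row in dim_customer:
    print(row["customer_id"], row["city"], row["is_current"], row["end_date"])
```

After the run, customer 1 has one expired row and one current row, so history is preserved while each customer still has exactly one current version.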
Fact Table:
sql
-- Fact: Sales Transactions
CREATE TABLE fact_sales (
    sales_key BIGINT IDENTITY(1,1) PRIMARY KEY,
    date_key INT NOT NULL,
    customer_key INT NOT NULL,
    product_key INT NOT NULL,
    store_key INT NOT NULL,
    quantity INT NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    discount_amount DECIMAL(10,2) DEFAULT 0,
    tax_amount DECIMAL(10,2) DEFAULT 0,
    total_amount DECIMAL(10,2) NOT NULL,
    CONSTRAINT fk_date FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
    CONSTRAINT fk_customer FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key),
    CONSTRAINT fk_product FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
    CONSTRAINT fk_store FOREIGN KEY (store_key) REFERENCES dim_store(store_key)
);

-- Create columnstore index for analytics
CREATE NONCLUSTERED COLUMNSTORE INDEX idx_fact_sales_cs
ON fact_sales (date_key, customer_key, product_key, store_key, 
               quantity, unit_price, total_amount);

-- ETL Load
INSERT INTO fact_sales (
    date_key, customer_key, product_key, store_key,
    quantity, unit_price, discount_amount, tax_amount, total_amount
)
SELECT 
    dd.date_key,
    dc.customer_key,
    dp.product_key,
    ds.store_key,
    st.quantity,
    st.unit_price,
    st.discount_amount,
    st.tax_amount,
    st.total_amount
FROM staging.transactions st
INNER JOIN dim_date dd ON CAST(st.transaction_date AS DATE) = dd.date
INNER JOIN dim_customer dc ON st.customer_id = dc.customer_id AND dc.is_current = 1
INNER JOIN dim_product dp ON st.product_id = dp.product_id
INNER JOIN dim_store ds ON st.store_id = ds.store_id;

4. Incremental Data Loading Pattern

Watermark-Based Incremental Load:
python
# Databricks notebook - Incremental load with watermark
from pyspark.sql.functions import col, max as spark_max
from delta.tables import DeltaTable

# Configuration
source_table = "source_database.transactions"
target_path = "/mnt/silver/transactions"
watermark_table = "control.watermark"
watermark_column = "modified_date"

# Get last watermark
last_watermark = (spark.table(watermark_table)
    .filter(col("table_name") == source_table)
    .select("watermark_value")
    .first()[0]
)

# Read incremental data
df_incremental = (spark.table(source_table)
    .filter(col(watermark_column) > last_watermark)
)

# Check if target exists
if DeltaTable.isDeltaTable(spark, target_path):
    # Merge into existing table
    target_table = DeltaTable.forPath(spark, target_path)
    (target_table.alias("target")
        .merge(
            df_incremental.alias("source"),
            "target.transaction_id = source.transaction_id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial load
    (df_incremental.write
        .format("delta")
        .mode("overwrite")
        .save(target_path)
    )

# Update watermark
# max() returns None when no new rows arrived, so keep the old watermark in that case
new_watermark = df_incremental.agg(spark_max(watermark_column)).first()[0]
if new_watermark is not None:
    spark.sql(f"""
        UPDATE {watermark_table}
        SET watermark_value = '{new_watermark}',
            last_updated = current_timestamp()
        WHERE table_name = '{source_table}'
    """)
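The watermark cycle (read the last watermark, take only newer rows, upsert, advance the watermark) can be exercised without Spark. This is a minimal plain-Python sketch under stated assumptions: the function name `incremental_load` and the sample rows are hypothetical, a dict keyed by `transaction_id` stands in for the Delta merge, and ISO date strings stand in for timestamps.

```python
def incremental_load(source_rows, target, last_watermark):
    """Upsert rows newer than the watermark and return the new watermark."""
    new_rows = [r for r in source_rows if r["modified_date"] > last_watermark]
    for row in new_rows:
        # Keyed by transaction_id, so an updated row overwrites instead of duplicating.
        target[row["transaction_id"]] = row
    # Keep the old watermark when nothing new arrived.
    return max((r["modified_date"] for r in new_rows), default=last_watermark)


source = [
    {"transaction_id": 1, "amount": 10, "modified_date": "2024-01-01"},
    {"transaction_id": 2, "amount": 20, "modified_date": "2024-01-02"},
]
target = {}

wm = incremental_load(source, target, "1900-01-01")  # initial load takes everything

# The source system later updates transaction 2.
source[1] = {"transaction_id": 2, "amount": 25, "modified_date": "2024-01-03"}
wm = incremental_load(source, target, wm)  # picks up only the changed row

wm_after_empty_run = incremental_load(source, target, wm)  # nothing new this time
print(wm, wm_after_empty_run, target[2]["amount"])
```

The third call shows why an empty batch should leave the watermark untouched: overwriting it with an empty maximum would lose the position in the source.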

5. Data Quality Framework

Quality Checks Pattern:
python
from pyspark.sql.functions import col

class DataQualityChecker:
    """
    Data quality validation framework
    """
    
    def __init__(self, dataframe, table_name):
        self.df = dataframe
        self.table_name = table_name
        self.quality_results = []
    
    def check_null_values(self, columns):
        """Check for null values in critical columns"""
        for column in columns:
            null_count = self.df.filter(col(column).isNull()).count()
            total_count = self.df.count()
            
            self.quality_results.append({
                "check_type": "null_check",
                "column": column,
                "null_count": null_count,
                "total_count": total_count,
                "null_percentage": (null_count / total_count * 100) if total_count > 0 else 0,
                "passed": null_count == 0
            })
        return self
    
    def check_duplicates(self, key_columns):
        """Check for duplicate records"""
        duplicate_count = (self.df
            .groupBy(key_columns)
            .count()
            .filter(col("count") > 1)
            .count()
        )
        
        self.quality_results.append({
            "check_type": "duplicate_check",
            "key_columns": key_columns,
            "duplicate_count": duplicate_count,
            "passed": duplicate_count == 0
        })
        return self
    
    def check_referential_integrity(self, foreign_key, reference_df, reference_key):
        """Check referential integrity"""
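        # Alias both sides so the join stays unambiguous when the two key columns share a name
        fk_values = self.df.select(col(foreign_key).alias("fk_value")).distinct()
        ref_values = reference_df.select(col(reference_key).alias("ref_value")).distinct()
        missing_references = (fk_values
            .join(ref_values, col("fk_value") == col("ref_value"), "left_anti")
            .count()
        )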
        
        self.quality_results.append({
            "check_type": "referential_integrity",
            "foreign_key": foreign_key,
            "missing_references": missing_references,
            "passed": missing_references == 0
        })
        return self
    
    def check_value_range(self, column, min_value=None, max_value=None):
        """Check if values are within expected range"""
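        # Build the filter only from the bounds that were supplied
        conditions = []
        if min_value is not None:
            conditions.append(col(column) < min_value)
        if max_value is not None:
            conditions.append(col(column) > max_value)
        out_of_range = 0  # with no bounds there is nothing to check
        if conditions:
            combined = conditions[0]
            for extra in conditions[1:]:
                combined = combined | extra
            out_of_range = self.df.filter(combined).count()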
        
        self.quality_results.append({
            "check_type": "range_check",
            "column": column,
            "min_value": min_value,
            "max_value": max_value,
            "out_of_range_count": out_of_range,
            "passed": out_of_range == 0
        })
        return self
    
    def get_results(self):
        """Return quality check results"""
        return self.quality_results

# Usage example
df_transactions = spark.read.format("delta").load("/mnt/silver/transactions")
df_customers = spark.read.format("delta").load("/mnt/gold/dim_customer")

quality_checker = DataQualityChecker(df_transactions, "transactions")

results = (quality_checker
    .check_null_values(["transaction_id", "customer_id", "transaction_date"])
    .check_duplicates(["transaction_id"])
    .check_referential_integrity("customer_id", df_customers, "customer_id")
    .check_value_range("transaction_amount", min_value=0, max_value=100000)
    .get_results()
)

# Log results
for result in results:
    print(f"{result['check_type']}: {'PASSED' if result['passed'] else 'FAILED'}")
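Printing results is useful for inspection, but a scheduled pipeline usually needs to stop when a check fails. One possible way to turn the result list into a gate is sketched below in plain Python. The helpers `summarize_quality` and `enforce_quality_gate` are hypothetical and not part of the framework above; the sample results only mimic the dictionary shape that `get_results()` returns.

```python
def summarize_quality(results):
    """Collapse individual check results into one overall summary."""
    failed = [r for r in results if not r["passed"]]
    return {
        "total_checks": len(results),
        "failed_checks": [r["check_type"] for r in failed],
        "all_passed": not failed,
    }


def enforce_quality_gate(results):
    """Raise if any check failed, so the job stops before publishing data."""
    summary = summarize_quality(results)
    if not summary["all_passed"]:
        raise ValueError(f"Data quality gate failed: {summary['failed_checks']}")
    return summary


# Sample results in the same shape as DataQualityChecker.get_results().
sample_results = [
    {"check_type": "null_check", "column": "transaction_id", "null_count": 0,
     "total_count": 100, "null_percentage": 0.0, "passed": True},
    {"check_type": "duplicate_check", "key_columns": ["transaction_id"],
     "duplicate_count": 3, "passed": False},
]

summary = summarize_quality(sample_results)
print(summary)

try:
    enforce_quality_gate(sample_results)
except ValueError as exc:
    print(exc)
```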

Power BI Analytics Patterns

DAX Measures for KPIs:
dax
// Total Sales
Total Sales = SUM(fact_sales[total_amount])

// Year-over-Year Growth
Sales YoY Growth = 
VAR CurrentYearSales = [Total Sales]
VAR PreviousYearSales = 
    CALCULATE(
        [Total Sales],
        DATEADD(dim_date[Date], -1, YEAR)
    )
RETURN
    DIVIDE(
        CurrentYearSales - PreviousYearSales,
        PreviousYearSales,
        0
    )

// Customer Lifetime Value
Customer LTV = 
CALCULATE(
    [Total Sales],
    ALLEXCEPT(dim_customer, dim_customer[customer_id])
)

// Moving Average (3 months)
Sales 3M MA = 
CALCULATE(
    [Total Sales],
    DATESINPERIOD(
        dim_date[Date],
        LASTDATE(dim_date[Date]),
        -3,
        MONTH
    )
) / 3

// Rank by Sales
Product Sales Rank = 
RANKX(
    ALL(dim_product[product_name]),
    [Total Sales],
    ,
    DESC,
    DENSE
)
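The arithmetic behind the growth and moving-average measures can be checked by hand. The plain-Python sketch below mirrors the formulas with made-up sales figures; the function names are hypothetical, and `divide` only imitates the zero-denominator behaviour of DAX `DIVIDE`, not its handling of blanks or filter context.

```python
def divide(numerator, denominator, alternate=0):
    """Return `alternate` instead of failing when the denominator is zero."""
    return alternate if denominator == 0 else numerator / denominator


def yoy_growth(current_year_sales, previous_year_sales):
    """(current - previous) / previous, as in the Sales YoY Growth measure."""
    return divide(current_year_sales - previous_year_sales, previous_year_sales, 0)


def moving_average_3m(monthly_sales):
    """Sum of the last three monthly totals divided by 3, as in Sales 3M MA."""
    return sum(monthly_sales[-3:]) / 3


growth = yoy_growth(120_000, 100_000)             # sales grew from 100k to 120k
growth_no_history = yoy_growth(50_000, 0)         # no prior-year sales to compare against
average = moving_average_3m([90, 100, 110, 120])  # uses the last three months only
print(growth, growth_no_history, average)
```

Because the moving average always divides by 3, a period with fewer than three months of data is understated, which is also how the measure above behaves.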

Common Architectural Patterns

Medallion Architecture Best Practices

Bronze Layer:
  • Raw data ingestion with minimal transformation
  • Add audit columns (ingestion_timestamp, source_file)
  • Preserve source schema with schema evolution enabled
  • Partition by ingestion date for performance
Silver Layer:
  • Data cleansing and standardization
  • Deduplication based on business keys
  • Data type conversions and validations
  • Enforce schema constraints
  • Join related datasets
Gold Layer:
  • Business-aggregated datasets
  • Dimensional models (Star/Snowflake schema)
  • Pre-calculated metrics and KPIs
  • Optimized for BI tool consumption
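The responsibilities of the three layers can be seen end to end on a few rows. This is a plain-Python illustration, not Spark or Delta code: the function names, the sample file name, and the sample rows are hypothetical, and lists of dictionaries stand in for tables.

```python
from collections import defaultdict
from datetime import datetime, timezone


def to_bronze(raw_rows, source_file):
    """Bronze: keep rows as delivered and add audit columns."""
    ingested_at = datetime.now(timezone.utc).isoformat()
    return [
        {**row, "ingestion_timestamp": ingested_at, "source_file": source_file}
        for row in raw_rows
    ]


def to_silver(bronze_rows):
    """Silver: drop rows without a key, standardize values, deduplicate on the key."""
    seen, silver = set(), []
    for row in bronze_rows:
        key = row.get("transaction_id")
        if key is None or key in seen:
            continue
        seen.add(key)
        silver.append({
            "transaction_id": key,
            "customer_name": row["customer_name"].strip().upper(),
            "transaction_amount": float(row["transaction_amount"]),  # type conversion
        })
    return silver


def to_gold(silver_rows):
    """Gold: pre-calculate one business metric (total sales per customer)."""
    totals = defaultdict(float)
    for row in silver_rows:
        totals[row["customer_name"]] += row["transaction_amount"]
    return dict(totals)


raw = [
    {"transaction_id": 1, "customer_name": " alice ", "transaction_amount": "10.5"},
    {"transaction_id": 1, "customer_name": " alice ", "transaction_amount": "10.5"},  # duplicate
    {"transaction_id": None, "customer_name": "bob", "transaction_amount": "3"},      # missing key
    {"transaction_id": 2, "customer_name": "Alice", "transaction_amount": "4.5"},
]

bronze = to_bronze(raw, "sales_2024_01.csv")
silver = to_silver(bronze)
gold = to_gold(silver)
print(len(bronze), len(silver), gold)
```

Bronze keeps all four rows, including the bad ones, so the raw input can always be replayed. Filtering and deduplication happen only from Silver onward.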

Delta Lake Optimization

python
# Optimize Delta tables
from delta.tables import DeltaTable

# Optimize with Z-ordering
deltaTable = DeltaTable.forPath(spark, "/mnt/gold/fact_sales")

# Optimize files and Z-order by common filter columns
deltaTable.optimize().executeZOrderBy("date_key", "customer_key")

# Vacuum old files (retention 168 hours = 7 days)
deltaTable.vacuum(168)

# Update table statistics
spark.sql("ANALYZE TABLE gold.fact_sales COMPUTE STATISTICS FOR ALL COLUMNS")

Unity Catalog Security

sql
-- Create catalog and schema
CREATE CATALOG IF NOT EXISTS retail_analytics;
CREATE SCHEMA IF NOT EXISTS retail_analytics.gold;

-- Grant permissions
GRANT USE CATALOG ON CATALOG retail_analytics TO `data_analysts`;
GRANT USE SCHEMA ON SCHEMA retail_analytics.gold TO `data_analysts`;
GRANT SELECT ON TABLE retail_analytics.gold.fact_sales TO `data_analysts`;

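-- Row-level security
-- Note: current_user_region() is a placeholder for a lookup you define yourself;
-- it is not a built-in Databricks SQL function.
CREATE FUNCTION retail_analytics.gold.customer_filter(customer_region STRING)
RETURN customer_region = current_user_region();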

ALTER TABLE retail_analytics.gold.fact_sales 
SET ROW FILTER retail_analytics.gold.customer_filter ON (region);
sql
-- Create catalog and schema
CREATE CATALOG IF NOT EXISTS retail_analytics;
CREATE SCHEMA IF NOT EXISTS retail_analytics.gold;

-- Grant permissions
GRANT USE CATALOG ON CATALOG retail_analytics TO `data_analysts`;
GRANT USE SCHEMA ON SCHEMA retail_analytics.gold TO `data_analysts`;
GRANT SELECT ON TABLE retail_analytics.gold.fact_sales TO `data_analysts`;

-- Row-level security
CREATE FUNCTION retail_analytics.gold.customer_filter(customer_region STRING)
RETURN customer_region = current_user_region();

ALTER TABLE retail_analytics.gold.fact_sales 
SET ROW FILTER retail_analytics.gold.customer_filter ON (region);
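`current_user_region()` is not defined in the snippet above, so the effect of the row filter can be hard to picture. The plain-Python model below is an assumption about what it might do: look up the caller's region in a mapping and keep only rows from that region. `USER_REGIONS`, `visible_rows`, and the sample users are made up for illustration. Unity Catalog evaluates the real filter inside the query engine.

```python
# Hypothetical user-to-region mapping; in practice this could be a lookup table
USER_REGIONS = {"ana@example.com": "EMEA", "raj@example.com": "APAC"}


def visible_rows(rows, user, user_regions=USER_REGIONS):
    """Keep rows whose region matches the user's region (none if unmapped)."""
    region = user_regions.get(user)
    if region is None:
        return []  # fail closed: unknown users see nothing
    return [row for row in rows if row["region"] == region]
```

Returning no rows for an unmapped user is a deliberate choice in this sketch, so a missing mapping cannot expose every region by accident.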

Environment Setup

**Azure Configuration:**

```bash
# Set Azure environment variables
export AZURE_SUBSCRIPTION_ID=your_subscription_id
export AZURE_RESOURCE_GROUP=rg-data-platform
export AZURE_STORAGE_ACCOUNT=datalakestorage
export AZURE_DATABRICKS_WORKSPACE=databricks-workspace

# ADF connection
export ADF_FACTORY_NAME=adf-data-ingestion
export ADF_LINKED_SERVICE_NAME=ls-sqlserver-source
```
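A script that depends on these variables may want to fail early when one is missing. The following is a minimal sketch of such a check. `PlatformConfig` and `load_config` are hypothetical names, not part of any Azure SDK.

```python
import os
from dataclasses import dataclass

REQUIRED_VARS = (
    "AZURE_SUBSCRIPTION_ID",
    "AZURE_RESOURCE_GROUP",
    "AZURE_STORAGE_ACCOUNT",
    "AZURE_DATABRICKS_WORKSPACE",
    "ADF_FACTORY_NAME",
    "ADF_LINKED_SERVICE_NAME",
)


@dataclass(frozen=True)
class PlatformConfig:
    subscription_id: str
    resource_group: str
    storage_account: str
    databricks_workspace: str
    adf_factory_name: str
    adf_linked_service_name: str


def load_config(env=None):
    """Read the required variables, raising KeyError listing any that are unset."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    # Field order in PlatformConfig matches the order of REQUIRED_VARS
    return PlatformConfig(*(env[name] for name in REQUIRED_VARS))
```

Passing a plain dictionary as `env` keeps the function easy to exercise without touching the real process environment.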

**Databricks Configuration:**

```python
# Mount ADLS Gen2 in Databricks
tenant_id = dbutils.secrets.get(scope="keyvault", key="tenant-id")

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="keyvault", key="client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="keyvault", key="client-secret"),
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

dbutils.fs.mount(
    source="abfss://bronze@datalakestorage.dfs.core.windows.net/",
    mount_point="/mnt/bronze",
    extra_configs=configs,
)
```
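Calling `dbutils.fs.mount` a second time for the same mount point raises an error, so setup notebooks are often written to be re-runnable. The sketch below shows one way to guard the call. `mount_if_absent` is a hypothetical helper, and `dbutils` is passed in as a parameter so the function can be exercised outside a Databricks notebook.

```python
def mount_if_absent(dbutils, source, mount_point, extra_configs):
    """Mount `source` at `mount_point` unless it is already mounted there.

    Returns True if a new mount was created, False if it already existed.
    """
    existing = {m.mountPoint: m.source for m in dbutils.fs.mounts()}
    if mount_point in existing:
        if existing[mount_point] != source:
            # Same mount point, different storage: refuse rather than guess
            raise RuntimeError(
                f"{mount_point} is already mounted from {existing[mount_point]}"
            )
        return False
    dbutils.fs.mount(
        source=source, mount_point=mount_point, extra_configs=extra_configs
    )
    return True
```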

Troubleshooting

**Issue: Delta Lake merge taking too long**

```python
# Solution: Optimize before merge
from delta.tables import DeltaTable

# target_path is the storage path of the Delta table being merged into
target_table = DeltaTable.forPath(spark, target_path)

# Compact small files first
target_table.optimize().executeCompaction()

# Enable auto-optimize and auto-compaction
spark.sql(f"""
    ALTER TABLE delta.`{target_path}`
    SET TBLPROPERTIES (
        delta.autoOptimize.optimizeWrite = true,
        delta.autoOptimize.autoCompact = true
    )
""")
```
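Compaction reduces the number of files a merge has to open, but the merge still scans every file that might match. A complementary technique, not shown in the original, is to add a partition predicate to the merge condition so that Delta can skip untouched partitions. The sketch below builds such a condition. It assumes the target is partitioned by an integer `date_key`, and `merge_condition` is a hypothetical helper.

```python
def merge_condition(key_cols, partition_col, partition_values):
    """Build a MERGE ON clause that also restricts the target partitions."""
    if not key_cols or not partition_values:
        raise ValueError("key_cols and partition_values must be non-empty")
    key_match = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    # Integer keys such as 20240131 need no quoting; sort for stable output
    values = ", ".join(str(int(v)) for v in sorted(set(partition_values)))
    return f"{key_match} AND t.{partition_col} IN ({values})"
```

The resulting string could be passed as the condition to `target_table.alias("t").merge(updates_df.alias("s"), ...)`, with the partition values collected from the incoming batch.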

**Issue: ADF pipeline timeout**

Increase the timeout on the ADF pipeline activity. Timeout values use the `d.hh:mm:ss` format:

```json
{
  "typeProperties": {
    "timeout": "0.12:00:00"
  },
  "policy": {
    "timeout": "7.00:00:00",
    "retry": 2,
    "retryIntervalInSeconds": 30
  }
}
```
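The `d.hh:mm:ss` strings are easy to misread: `0.12:00:00` is twelve hours, while `7.00:00:00` is seven days. The small parser below is a sketch for sanity-checking such values before deploying a pipeline definition. `parse_adf_timespan` is a hypothetical helper, not part of any ADF SDK.

```python
import re
from datetime import timedelta

# Optional day count, then hours, minutes, and seconds
_TIMESPAN = re.compile(r"^(?:(\d+)\.)?(\d{1,2}):(\d{2}):(\d{2})$")


def parse_adf_timespan(value):
    """Convert a 'd.hh:mm:ss' (or 'hh:mm:ss') string to a timedelta."""
    match = _TIMESPAN.match(value)
    if match is None:
        raise ValueError(f"not a d.hh:mm:ss timespan: {value!r}")
    days, hours, minutes, seconds = (int(g or 0) for g in match.groups())
    if hours > 23 or minutes > 59 or seconds > 59:
        raise ValueError(f"field out of range in {value!r}")
    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
```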

**Issue: Power BI slow refresh**

```dax
// Use incremental refresh configuration
// In Power BI Desktop: Table Tools > Incremental Refresh

// Or optimize DAX measures
Optimized Total Sales =
CALCULATE(
    SUM(fact_sales[total_amount]),
    KEEPFILTERS(VALUES(dim_date[Date]))  // Keep existing date filters instead of overriding them
)
```

**Issue: Schema evolution conflicts**

```python
# Enable schema merging in Delta writes
(
    df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(target_path)
)

# Or explicitly allow schema overwrite
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(target_path)
)
```
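To make the difference between the two options concrete, the sketch below models what `mergeSchema` does in the simplest case: new columns are added, and a column whose type disagrees is rejected. This is a simplified illustration in plain Python, not Delta's implementation. Delta's actual rules are more nuanced (for example, it tolerates a few safe type changes), and `merge_schemas` is a hypothetical name.

```python
def merge_schemas(table_schema, batch_schema):
    """Return the table schema extended with any new batch columns.

    Both arguments map column name -> type name, e.g. {"id": "bigint"}.
    """
    merged = dict(table_schema)
    for column, dtype in batch_schema.items():
        if column not in merged:
            merged[column] = dtype  # new column: existing rows read as NULL
        elif merged[column] != dtype:
            raise TypeError(
                f"column {column!r}: table has {merged[column]}, batch has {dtype}"
            )
    return merged
```

A type conflict like the one raised here is the case where `overwriteSchema` with `mode("overwrite")` becomes the relevant option, at the cost of replacing the table's data.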

Reference Architecture

This portfolio demonstrates a typical enterprise data platform architecture:
┌─────────────────┐
│  Source Systems │
│  (SQL Server,   │
│   APIs, Files)  │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Azure Data     │
│  Factory (ADF)  │ ◄──── Metadata-driven ingestion
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  ADLS Gen2      │
│  Bronze Layer   │ ◄──── Raw data landing
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Databricks     │
│  Silver Layer   │ ◄──── Cleansing & transformation
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Databricks     │
│  Gold Layer     │ ◄──── Analytics-ready datasets
└────────┬────────┘
         ├──────────────────┐
         ▼                  ▼
┌─────────────────┐  ┌─────────────────┐
│  Power BI       │  │  Synapse        │
│  Reporting      │  │  Analytics      │
└─────────────────┘  └─────────────────┘
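The "metadata-driven ingestion" step in the diagram can be pictured as a control table that drives one generic copy routine. The sketch below is an assumption about how such a table might look. The field names (`source_table`, `load_type`, `watermark_column`, `last_watermark`), the sample entries, and `build_extract_query` are hypothetical and not taken from the portfolio repositories.

```python
def build_extract_query(entry):
    """Build the source query for one control-table entry."""
    table = entry["source_table"]
    if entry["load_type"] == "full":
        return f"SELECT * FROM {table}"
    if entry["load_type"] == "incremental":
        column = entry["watermark_column"]
        last = entry["last_watermark"]
        # Only rows changed since the previous successful load
        return f"SELECT * FROM {table} WHERE {column} > '{last}'"
    raise ValueError(f"unknown load_type: {entry['load_type']!r}")


# Illustrative control table: one row per source object to ingest
CONTROL_TABLE = [
    {"source_table": "dbo.customers", "load_type": "full"},
    {
        "source_table": "dbo.orders",
        "load_type": "incremental",
        "watermark_column": "modified_at",
        "last_watermark": "2024-01-01T00:00:00",
    },
]
```

Adding a new source then means adding a row to the control table rather than building another pipeline.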
This skill provides patterns and code examples for building production-grade data platforms, following the practices demonstrated across the portfolio projects.