retail-etl-pipeline-medallion

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Retail ETL Pipeline - Medallion Architecture Skill

零售ETL管道 - Medallion架构技能

Skill by ara.so — Data Skills collection
ara.so提供的技能 — 数据技能合集

Overview

概述

The Retail ETL Pipeline project implements a complete data engineering solution for retail operations using the Medallion Architecture pattern (Bronze → Silver → Gold layers). It handles complex retail scenarios including:
  • Inventory shrinkage resolution
  • Recipe conversions for meat/poultry products
  • Supplier rebate tier tracking
  • Multi-branch sales consolidation
  • Stock level management across locations
The pipeline processes raw CSV data from CRM/ERP systems through three progressive quality layers, ultimately delivering a "Single Version of Truth" for business intelligence.
零售ETL管道项目采用Medallion架构模式(青铜→白银→黄金层),为零售运营实现了完整的数据工程解决方案。它处理复杂的零售场景,包括:
  • 库存损耗解决
  • 肉/禽产品的配方转换
  • 供应商返利层级追踪
  • 多门店销售合并
  • 跨地点的库存水平管理
该管道将来自CRM/ERP系统的原始CSV数据通过三个逐步提升质量的层进行处理,最终为商业智能提供"单一事实版本"。

Architecture Layers

架构层

Bronze Layer (Raw Ingestion)

青铜层(原始数据摄入)

  • Raw data ingestion from CSV files
  • Minimal transformation, preserving source format
  • Audit columns:
    _loaded_at
    ,
    _source_file
  • 从CSV文件摄入原始数据
  • 最小化转换,保留源格式
  • 审计列:
    _loaded_at
    _source_file

Silver Layer (Cleaned & Standardized)

白银层(清洗与标准化)

  • Data type enforcement
  • Deduplication
  • Standardization (dates, currencies, product codes)
  • Business rule validation
  • 数据类型强制校验
  • 去重
  • 标准化(日期、货币、产品编码)
  • 业务规则验证

Gold Layer (Business-Ready Analytics)

黄金层(业务就绪分析)

  • Aggregated metrics
  • Calculated KPIs (inventory turnover, shrinkage %)
  • Dimensional models for BI tools
  • 聚合指标
  • 计算KPI(库存周转率、损耗率)
  • 用于BI工具的维度模型

Installation & Setup

安装与设置

Prerequisites

前提条件

bash
undefined
bash
undefined

Required tools

所需工具

  • Docker & Docker Compose
  • SQL Server 2019+
  • Python 3.8+
  • PySpark 3.x
  • Apache Airflow (optional, for orchestration)
undefined
  • Docker & Docker Compose
  • SQL Server 2019+
  • Python 3.8+
  • PySpark 3.x
  • Apache Airflow(可选,用于编排)
undefined

Infrastructure Setup

基础设施设置

bash
undefined
bash
undefined

Clone the repository

克隆仓库

Start SQL Server container

启动SQL Server容器

docker-compose up -d
docker-compose up -d

Wait for SQL Server to be ready

等待SQL Server就绪

docker logs -f retail-sql-server
undefined
docker logs -f retail-sql-server
undefined

Database Initialization

数据库初始化

bash
undefined
bash
undefined

Connect to SQL Server and run schema setup

连接SQL Server并运行架构设置

sqlcmd -S localhost,1433 -U sa -P ${SQL_SERVER_PASSWORD}
-i sql_scripts/00_create_database_and_schemas.sql
undefined
sqlcmd -S localhost,1433 -U sa -P ${SQL_SERVER_PASSWORD}
-i sql_scripts/00_create_database_and_schemas.sql
undefined

Key SQL Scripts Execution Order

核心SQL脚本执行顺序

The pipeline consists of 13+ SQL scripts that must be executed sequentially:
bash
undefined
管道包含13+个必须按顺序执行的SQL脚本:
bash
undefined

1. Create database and schemas

1. 创建数据库和架构

sql_scripts/00_create_database_and_schemas.sql
sql_scripts/00_create_database_and_schemas.sql

2. Bronze layer ingestion

2. 青铜层数据摄入

sql_scripts/01_bronze_products.sql sql_scripts/02_bronze_sales.sql sql_scripts/03_bronze_stock.sql
sql_scripts/01_bronze_products.sql sql_scripts/02_bronze_sales.sql sql_scripts/03_bronze_stock.sql

3. Silver layer transformations

3. 白银层转换

sql_scripts/04_silver_products.sql sql_scripts/05_silver_sales.sql sql_scripts/06_silver_stock.sql
sql_scripts/04_silver_products.sql sql_scripts/05_silver_sales.sql sql_scripts/06_silver_stock.sql

4. Gold layer aggregations

4. 黄金层聚合

sql_scripts/07_gold_sales_summary.sql sql_scripts/08_gold_inventory_metrics.sql sql_scripts/09_gold_product_performance.sql
sql_scripts/07_gold_sales_summary.sql sql_scripts/08_gold_inventory_metrics.sql sql_scripts/09_gold_product_performance.sql

5. Rebuild pipeline (if needed)

5. 重建管道(如有需要)

sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
undefined
sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
undefined

Core ETL Patterns

核心ETL模式

Pattern 1: Bronze Layer Ingestion (Raw CSV → SQL)

模式1:青铜层数据摄入(原始CSV → SQL)

sql
-- Example: Bronze Products Table
CREATE TABLE bronze.products (
    product_id INT,
    product_name NVARCHAR(255),
    category NVARCHAR(100),
    unit_price DECIMAL(10,2),
    supplier_id INT,
    _loaded_at DATETIME2 DEFAULT GETDATE(),
    _source_file NVARCHAR(500)
);

-- Bulk insert from CSV
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n',
    FIRSTROW = 2,
    TABLOCK
);
sql
-- 示例:青铜层产品表
CREATE TABLE bronze.products (
    product_id INT,
    product_name NVARCHAR(255),
    category NVARCHAR(100),
    unit_price DECIMAL(10,2),
    supplier_id INT,
    _loaded_at DATETIME2 DEFAULT GETDATE(),
    _source_file NVARCHAR(500)
);

-- 从CSV批量插入
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n',
    FIRSTROW = 2,
    TABLOCK
);

Pattern 2: Silver Layer Cleansing

模式2:白银层数据清洗

sql
-- Example: Silver Products with Data Quality Rules
CREATE PROCEDURE silver.usp_transform_products
AS
BEGIN
    TRUNCATE TABLE silver.products;
    
    INSERT INTO silver.products (
        product_id,
        product_name,
        category,
        unit_price,
        supplier_id,
        is_active,
        processed_at
    )
    SELECT DISTINCT
        product_id,
        UPPER(TRIM(product_name)) AS product_name,
        COALESCE(category, 'UNCATEGORIZED') AS category,
        CASE 
            WHEN unit_price < 0 THEN 0 
            ELSE unit_price 
        END AS unit_price,
        supplier_id,
        1 AS is_active,
        GETDATE() AS processed_at
    FROM bronze.products
    WHERE product_id IS NOT NULL
      AND product_name IS NOT NULL;
END;
sql
-- 示例:带有数据质量规则的白银层产品表
CREATE PROCEDURE silver.usp_transform_products
AS
BEGIN
    TRUNCATE TABLE silver.products;
    
    INSERT INTO silver.products (
        product_id,
        product_name,
        category,
        unit_price,
        supplier_id,
        is_active,
        processed_at
    )
    SELECT DISTINCT
        product_id,
        UPPER(TRIM(product_name)) AS product_name,
        COALESCE(category, 'UNCATEGORIZED') AS category,
        CASE 
            WHEN unit_price < 0 THEN 0 
            ELSE unit_price 
        END AS unit_price,
        supplier_id,
        1 AS is_active,
        GETDATE() AS processed_at
    FROM bronze.products
    WHERE product_id IS NOT NULL
      AND product_name IS NOT NULL;
END;

Pattern 3: Gold Layer Aggregations

模式3:黄金层数据聚合

sql
-- Example: Sales Summary by Branch and Product
CREATE PROCEDURE gold.usp_sales_summary
AS
BEGIN
    TRUNCATE TABLE gold.sales_summary;
    
    INSERT INTO gold.sales_summary (
        branch_name,
        product_category,
        total_quantity_sold,
        total_revenue,
        avg_unit_price,
        transaction_count,
        report_date
    )
    SELECT 
        s.branch_name,
        p.category AS product_category,
        SUM(s.quantity) AS total_quantity_sold,
        SUM(s.quantity * s.unit_price) AS total_revenue,
        AVG(s.unit_price) AS avg_unit_price,
        COUNT(DISTINCT s.transaction_id) AS transaction_count,
        CAST(GETDATE() AS DATE) AS report_date
    FROM silver.sales s
    INNER JOIN silver.products p ON s.product_id = p.product_id
    GROUP BY s.branch_name, p.category;
END;
sql
-- 示例:按门店和产品汇总销售数据
CREATE PROCEDURE gold.usp_sales_summary
AS
BEGIN
    TRUNCATE TABLE gold.sales_summary;
    
    INSERT INTO gold.sales_summary (
        branch_name,
        product_category,
        total_quantity_sold,
        total_revenue,
        avg_unit_price,
        transaction_count,
        report_date
    )
    SELECT 
        s.branch_name,
        p.category AS product_category,
        SUM(s.quantity) AS total_quantity_sold,
        SUM(s.quantity * s.unit_price) AS total_revenue,
        AVG(s.unit_price) AS avg_unit_price,
        COUNT(DISTINCT s.transaction_id) AS transaction_count,
        CAST(GETDATE() AS DATE) AS report_date
    FROM silver.sales s
    INNER JOIN silver.products p ON s.product_id = p.product_id
    GROUP BY s.branch_name, p.category;
END;

Pattern 4: Inventory Shrinkage Calculation

模式4:库存损耗计算

sql
-- Complex business logic: Detect inventory discrepancies
CREATE PROCEDURE gold.usp_inventory_shrinkage
AS
BEGIN
    WITH StockLevels AS (
        SELECT 
            product_id,
            branch_name,
            SUM(quantity_on_hand) AS current_stock
        FROM silver.stock
        GROUP BY product_id, branch_name
    ),
    ExpectedStock AS (
        SELECT 
            s.product_id,
            s.branch_name,
            sl.current_stock - COALESCE(SUM(s.quantity), 0) AS expected_stock
        FROM StockLevels sl
        LEFT JOIN silver.sales s 
            ON sl.product_id = s.product_id 
            AND sl.branch_name = s.branch_name
        GROUP BY s.product_id, s.branch_name, sl.current_stock
    )
    INSERT INTO gold.inventory_shrinkage (
        product_id,
        branch_name,
        current_stock,
        expected_stock,
        shrinkage_qty,
        shrinkage_percent,
        calculated_at
    )
    SELECT 
        sl.product_id,
        sl.branch_name,
        sl.current_stock,
        es.expected_stock,
        sl.current_stock - es.expected_stock AS shrinkage_qty,
        CASE 
            WHEN es.expected_stock > 0 
            THEN ((sl.current_stock - es.expected_stock) * 100.0 / es.expected_stock)
            ELSE 0 
        END AS shrinkage_percent,
        GETDATE() AS calculated_at
    FROM StockLevels sl
    INNER JOIN ExpectedStock es 
        ON sl.product_id = es.product_id 
        AND sl.branch_name = es.branch_name;
END;
sql
-- 复杂业务逻辑:检测库存差异
CREATE PROCEDURE gold.usp_inventory_shrinkage
AS
BEGIN
    WITH StockLevels AS (
        SELECT 
            product_id,
            branch_name,
            SUM(quantity_on_hand) AS current_stock
        FROM silver.stock
        GROUP BY product_id, branch_name
    ),
    ExpectedStock AS (
        SELECT 
            s.product_id,
            s.branch_name,
            sl.current_stock - COALESCE(SUM(s.quantity), 0) AS expected_stock
        FROM StockLevels sl
        LEFT JOIN silver.sales s 
            ON sl.product_id = s.product_id 
            AND sl.branch_name = s.branch_name
        GROUP BY s.product_id, s.branch_name, sl.current_stock
    )
    INSERT INTO gold.inventory_shrinkage (
        product_id,
        branch_name,
        current_stock,
        expected_stock,
        shrinkage_qty,
        shrinkage_percent,
        calculated_at
    )
    SELECT 
        sl.product_id,
        sl.branch_name,
        sl.current_stock,
        es.expected_stock,
        sl.current_stock - es.expected_stock AS shrinkage_qty,
        CASE 
            WHEN es.expected_stock > 0 
            THEN ((sl.current_stock - es.expected_stock) * 100.0 / es.expected_stock)
            ELSE 0 
        END AS shrinkage_percent,
        GETDATE() AS calculated_at
    FROM StockLevels sl
    INNER JOIN ExpectedStock es 
        ON sl.product_id = es.product_id 
        AND sl.branch_name = es.branch_name;
END;

PySpark Integration (Optional)

PySpark集成(可选)

For large-scale data processing, integrate PySpark for Silver/Gold transformations:
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when
针对大规模数据处理,可集成PySpark进行白银/黄金层转换:
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when

Initialize Spark session

初始化Spark会话

spark = SparkSession.builder
.appName("RetailETL-Silver")
.config("spark.sql.warehouse.dir", "/data/warehouse")
.getOrCreate()
spark = SparkSession.builder
.appName("RetailETL-Silver")
.config("spark.sql.warehouse.dir", "/data/warehouse")
.getOrCreate()

Read Bronze layer (Parquet or Delta)

读取青铜层数据(Parquet或Delta格式)

bronze_sales_df = spark.read.parquet("/data/bronze/sales/")
bronze_sales_df = spark.read.parquet("/data/bronze/sales/")

Silver transformations

白银层转换

silver_sales_df = bronze_sales_df
.dropDuplicates(["transaction_id"])
.filter(col("quantity") > 0)
.withColumn("unit_price", when(col("unit_price") < 0, 0).otherwise(col("unit_price")))
.withColumn("total_amount", col("quantity") * col("unit_price"))
silver_sales_df = bronze_sales_df
.dropDuplicates(["transaction_id"])
.filter(col("quantity") > 0)
.withColumn("unit_price", when(col("unit_price") < 0, 0).otherwise(col("unit_price")))
.withColumn("total_amount", col("quantity") * col("unit_price"))

Write to Silver layer

写入白银层

silver_sales_df.write
.mode("overwrite")
.partitionBy("branch_name", "sale_date")
.parquet("/data/silver/sales/")
silver_sales_df.write
.mode("overwrite")
.partitionBy("branch_name", "sale_date")
.parquet("/data/silver/sales/")

Gold aggregations

黄金层聚合

gold_sales_summary = silver_sales_df
.groupBy("branch_name", "product_category")
.agg( sum("quantity").alias("total_quantity"), sum("total_amount").alias("total_revenue"), avg("unit_price").alias("avg_unit_price"), count("transaction_id").alias("transaction_count") )
gold_sales_summary.write
.mode("overwrite")
.parquet("/data/gold/sales_summary/")
undefined
gold_sales_summary = silver_sales_df
.groupBy("branch_name", "product_category")
.agg( sum("quantity").alias("total_quantity"), sum("total_amount").alias("total_revenue"), avg("unit_price").alias("avg_unit_price"), count("transaction_id").alias("transaction_count") )
gold_sales_summary.write
.mode("overwrite")
.parquet("/data/gold/sales_summary/")
undefined

Airflow DAG Example

Airflow DAG示例

Orchestrate the entire pipeline with Apache Airflow:
python
from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'retail_etl_medallion',
    default_args=default_args,
    description='Daily Retail ETL Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # Bronze ingestion
    ingest_products = MsSqlOperator(
        task_id='bronze_ingest_products',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/01_bronze_products.sql',
    )
    
    ingest_sales = MsSqlOperator(
        task_id='bronze_ingest_sales',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/02_bronze_sales.sql',
    )
    
    ingest_stock = MsSqlOperator(
        task_id='bronze_ingest_stock',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/03_bronze_stock.sql',
    )
    
    # Silver transformations
    transform_products = MsSqlOperator(
        task_id='silver_transform_products',
        mssql_conn_id='retail_sql_server',
        sql='EXEC silver.usp_transform_products;',
    )
    
    transform_sales = MsSqlOperator(
        task_id='silver_transform_sales',
        mssql_conn_id='retail_sql_server',
        sql='EXEC silver.usp_transform_sales;',
    )
    
    # Gold aggregations
    aggregate_sales_summary = MsSqlOperator(
        task_id='gold_sales_summary',
        mssql_conn_id='retail_sql_server',
        sql='EXEC gold.usp_sales_summary;',
    )
    
    aggregate_inventory_shrinkage = MsSqlOperator(
        task_id='gold_inventory_shrinkage',
        mssql_conn_id='retail_sql_server',
        sql='EXEC gold.usp_inventory_shrinkage;',
    )
    
    # Define dependencies
    [ingest_products, ingest_sales, ingest_stock] >> transform_products
    [ingest_sales] >> transform_sales
    [transform_products, transform_sales] >> aggregate_sales_summary
    [transform_products, ingest_stock] >> aggregate_inventory_shrinkage
使用Apache Airflow编排整个管道:
python
from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'retail_etl_medallion',
    default_args=default_args,
    description='Daily Retail ETL Pipeline',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # 青铜层数据摄入
    ingest_products = MsSqlOperator(
        task_id='bronze_ingest_products',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/01_bronze_products.sql',
    )
    
    ingest_sales = MsSqlOperator(
        task_id='bronze_ingest_sales',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/02_bronze_sales.sql',
    )
    
    ingest_stock = MsSqlOperator(
        task_id='bronze_ingest_stock',
        mssql_conn_id='retail_sql_server',
        sql='sql_scripts/03_bronze_stock.sql',
    )
    
    # 白银层转换
    transform_products = MsSqlOperator(
        task_id='silver_transform_products',
        mssql_conn_id='retail_sql_server',
        sql='EXEC silver.usp_transform_products;',
    )
    
    transform_sales = MsSqlOperator(
        task_id='silver_transform_sales',
        mssql_conn_id='retail_sql_server',
        sql='EXEC silver.usp_transform_sales;',
    )
    
    # 黄金层聚合
    aggregate_sales_summary = MsSqlOperator(
        task_id='gold_sales_summary',
        mssql_conn_id='retail_sql_server',
        sql='EXEC gold.usp_sales_summary;',
    )
    
    aggregate_inventory_shrinkage = MsSqlOperator(
        task_id='gold_inventory_shrinkage',
        mssql_conn_id='retail_sql_server',
        sql='EXEC gold.usp_inventory_shrinkage;',
    )
    
    # 定义依赖关系
    [ingest_products, ingest_sales, ingest_stock] >> transform_products
    [ingest_sales] >> transform_sales
    [transform_products, transform_sales] >> aggregate_sales_summary
    [transform_products, ingest_stock] >> aggregate_inventory_shrinkage

Configuration

配置

Environment Variables

环境变量

bash
undefined
bash
undefined

SQL Server connection

SQL Server连接配置

export SQL_SERVER_HOST=localhost export SQL_SERVER_PORT=1433 export SQL_SERVER_USER=sa export SQL_SERVER_PASSWORD=${YOUR_SECURE_PASSWORD} export SQL_SERVER_DATABASE=RetailDataWarehouse
export SQL_SERVER_HOST=localhost export SQL_SERVER_PORT=1433 export SQL_SERVER_USER=sa export SQL_SERVER_PASSWORD=${YOUR_SECURE_PASSWORD} export SQL_SERVER_DATABASE=RetailDataWarehouse

Data paths

数据路径

export DATA_SOURCE_PATH=/data_source export BRONZE_LAYER_PATH=/data/bronze export SILVER_LAYER_PATH=/data/silver export GOLD_LAYER_PATH=/data/gold
export DATA_SOURCE_PATH=/data_source export BRONZE_LAYER_PATH=/data/bronze export SILVER_LAYER_PATH=/data/silver export GOLD_LAYER_PATH=/data/gold

Airflow (if used)

Airflow配置(如使用)

export AIRFLOW_CONN_RETAIL_SQL_SERVER="mssql://${SQL_SERVER_USER}:${SQL_SERVER_PASSWORD}@${SQL_SERVER_HOST}:${SQL_SERVER_PORT}/${SQL_SERVER_DATABASE}"
undefined
export AIRFLOW_CONN_RETAIL_SQL_SERVER="mssql://${SQL_SERVER_USER}:${SQL_SERVER_PASSWORD}@${SQL_SERVER_HOST}:${SQL_SERVER_PORT}/${SQL_SERVER_DATABASE}"
undefined

Docker Compose Configuration

Docker Compose配置

yaml
undefined
yaml
undefined

docker-compose.yml

docker-compose.yml

version: '3.8'
services: sqlserver: image: mcr.microsoft.com/mssql/server:2019-latest container_name: retail-sql-server environment: - ACCEPT_EULA=Y - SA_PASSWORD=${SQL_SERVER_PASSWORD} - MSSQL_PID=Developer ports: - "1433:1433" volumes: - ./data_source:/data_source - ./sql_scripts:/sql_scripts - sqlserver_data:/var/opt/mssql
volumes: sqlserver_data:
undefined
version: '3.8'
services: sqlserver: image: mcr.microsoft.com/mssql/server:2019-latest container_name: retail-sql-server environment: - ACCEPT_EULA=Y - SA_PASSWORD=${SQL_SERVER_PASSWORD} - MSSQL_PID=Developer ports: - "1433:1433" volumes: - ./data_source:/data_source - ./sql_scripts:/sql_scripts - sqlserver_data:/var/opt/mssql
volumes: sqlserver_data:
undefined

Common Patterns & Use Cases

常见模式与用例

Use Case 1: Multi-Branch Sales Consolidation

用例1:多门店销售合并

sql
-- Combine sales from Alex, Cairo, and Giza branches
CREATE VIEW gold.vw_consolidated_sales AS
SELECT 
    'Alexandria' AS branch_name,
    * 
FROM bronze.alex_sales
UNION ALL
SELECT 
    'Cairo' AS branch_name,
    * 
FROM bronze.cairo_sales
UNION ALL
SELECT 
    'Giza' AS branch_name,
    * 
FROM bronze.giza_sales;
sql
-- 合并亚历山大、开罗和吉萨门店的销售数据
CREATE VIEW gold.vw_consolidated_sales AS
SELECT 
    'Alexandria' AS branch_name,
    * 
FROM bronze.alex_sales
UNION ALL
SELECT 
    'Cairo' AS branch_name,
    * 
FROM bronze.cairo_sales
UNION ALL
SELECT 
    'Giza' AS branch_name,
    * 
FROM bronze.giza_sales;

Use Case 2: Recipe Yield Tracking (Meat/Poultry)

用例2:配方产出追踪(肉/禽产品)

sql
-- Track conversion rates for processed meat products
CREATE TABLE silver.recipe_conversions (
    recipe_id INT PRIMARY KEY,
    raw_product_id INT,
    finished_product_id INT,
    conversion_ratio DECIMAL(5,2), -- e.g., 0.85 (15% waste)
    effective_date DATE
);

-- Calculate actual yield vs. expected
SELECT 
    rc.recipe_id,
    SUM(s.quantity * rc.conversion_ratio) AS expected_yield,
    SUM(stock.quantity_on_hand) AS actual_yield,
    (SUM(stock.quantity_on_hand) - SUM(s.quantity * rc.conversion_ratio)) AS yield_variance
FROM silver.sales s
INNER JOIN silver.recipe_conversions rc ON s.product_id = rc.raw_product_id
INNER JOIN silver.stock stock ON rc.finished_product_id = stock.product_id
GROUP BY rc.recipe_id;
sql
-- 追踪加工肉制品的转化率
CREATE TABLE silver.recipe_conversions (
    recipe_id INT PRIMARY KEY,
    raw_product_id INT,
    finished_product_id INT,
    conversion_ratio DECIMAL(5,2), -- 例如:0.85(15%损耗)
    effective_date DATE
);

-- 计算实际产出与预期产出
SELECT 
    rc.recipe_id,
    SUM(s.quantity * rc.conversion_ratio) AS expected_yield,
    SUM(stock.quantity_on_hand) AS actual_yield,
    (SUM(stock.quantity_on_hand) - SUM(s.quantity * rc.conversion_ratio)) AS yield_variance
FROM silver.sales s
INNER JOIN silver.recipe_conversions rc ON s.product_id = rc.raw_product_id
INNER JOIN silver.stock stock ON rc.finished_product_id = stock.product_id
GROUP BY rc.recipe_id;

Use Case 3: Supplier Rebate Tier Tracking

用例3:供应商返利层级追踪

sql
-- Track purchase volume for supplier rebate calculations
CREATE TABLE gold.supplier_rebate_tiers (
    supplier_id INT,
    total_purchases DECIMAL(15,2),
    rebate_tier VARCHAR(20),
    rebate_percentage DECIMAL(5,2),
    calculated_at DATETIME2
);

INSERT INTO gold.supplier_rebate_tiers
SELECT 
    p.supplier_id,
    SUM(s.quantity * s.unit_price) AS total_purchases,
    CASE 
        WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 'Platinum'
        WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 'Gold'
        WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 'Silver'
        ELSE 'Bronze'
    END AS rebate_tier,
    CASE 
        WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 5.0
        WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 3.5
        WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 2.0
        ELSE 1.0
    END AS rebate_percentage,
    GETDATE() AS calculated_at
FROM silver.sales s
INNER JOIN silver.products p ON s.product_id = p.product_id
GROUP BY p.supplier_id;
sql
-- 追踪采购量以计算供应商返利
CREATE TABLE gold.supplier_rebate_tiers (
    supplier_id INT,
    total_purchases DECIMAL(15,2),
    rebate_tier VARCHAR(20),
    rebate_percentage DECIMAL(5,2),
    calculated_at DATETIME2
);

INSERT INTO gold.supplier_rebate_tiers
SELECT 
    p.supplier_id,
    SUM(s.quantity * s.unit_price) AS total_purchases,
    CASE 
        WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 'Platinum'
        WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 'Gold'
        WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 'Silver'
        ELSE 'Bronze'
    END AS rebate_tier,
    CASE 
        WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 5.0
        WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 3.5
        WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 2.0
        ELSE 1.0
    END AS rebate_percentage,
    GETDATE() AS calculated_at
FROM silver.sales s
INNER JOIN silver.products p ON s.product_id = p.product_id
GROUP BY p.supplier_id;

Troubleshooting

故障排查

Issue 1: CSV Bulk Insert Fails

问题1:CSV批量插入失败

sql
-- Check file path permissions
EXEC xp_cmdshell 'dir C:\data_source\*.csv';

-- Use ERRORFILE to capture bad rows
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n',
    FIRSTROW = 2,
    ERRORFILE = '/data_source/errors/products_errors.csv',
    MAXERRORS = 10
);
sql
-- 检查文件路径权限
EXEC xp_cmdshell 'dir C:\data_source\*.csv';

-- 使用ERRORFILE捕获错误行
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
    FIELDTERMINATOR = ',',
    ROWTERMINATOR = '\n',
    FIRSTROW = 2,
    ERRORFILE = '/data_source/errors/products_errors.csv',
    MAXERRORS = 10
);

Issue 2: Duplicate Records in Silver Layer

问题2:白银层存在重复记录

sql
-- Add deduplication logic with ROW_NUMBER
WITH DeduplicatedSales AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY transaction_id 
            ORDER BY _loaded_at DESC
        ) AS rn
    FROM bronze.sales
)
INSERT INTO silver.sales
SELECT * 
FROM DeduplicatedSales
WHERE rn = 1;
sql
-- 使用ROW_NUMBER添加去重逻辑
WITH DeduplicatedSales AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY transaction_id 
            ORDER BY _loaded_at DESC
        ) AS rn
    FROM bronze.sales
)
INSERT INTO silver.sales
SELECT * 
FROM DeduplicatedSales
WHERE rn = 1;

Issue 3: Performance Optimization

问题3:性能优化

sql
-- Create indexes on frequently joined columns
CREATE NONCLUSTERED INDEX idx_sales_product_id 
    ON silver.sales(product_id) INCLUDE (quantity, unit_price);

CREATE NONCLUSTERED INDEX idx_stock_product_branch 
    ON silver.stock(product_id, branch_name) INCLUDE (quantity_on_hand);

-- Partition large tables by date
CREATE PARTITION FUNCTION pf_sales_date (DATE)
AS RANGE RIGHT FOR VALUES (
    '2025-01-01', '2025-02-01', '2025-03-01', '2025-04-01'
);
sql
-- 为频繁关联的列创建索引
CREATE NONCLUSTERED INDEX idx_sales_product_id 
    ON silver.sales(product_id) INCLUDE (quantity, unit_price);

CREATE NONCLUSTERED INDEX idx_stock_product_branch 
    ON silver.stock(product_id, branch_name) INCLUDE (quantity_on_hand);

-- 按日期对大表进行分区
CREATE PARTITION FUNCTION pf_sales_date (DATE)
AS RANGE RIGHT FOR VALUES (
    '2025-01-01', '2025-02-01', '2025-03-01', '2025-04-01'
);

Issue 4: Rebuild Entire Pipeline

问题4:重建整个管道

bash
undefined
bash
-- 使用重建脚本重置并重新处理所有层
sqlcmd -S ${SQL_SERVER_HOST},${SQL_SERVER_PORT} \
  -U ${SQL_SERVER_USER} -P ${SQL_SERVER_PASSWORD} \
  -d RetailDataWarehouse \
  -i sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql

Use the rebuild script to reset and reprocess all layers

数据质量测试

sqlcmd -S ${SQL_SERVER_HOST},${SQL_SERVER_PORT}
-U ${SQL_SERVER_USER} -P ${SQL_SERVER_PASSWORD}
-d RetailDataWarehouse
-i sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
undefined
sql
-- 白银层数据质量检查
SELECT 'Products' AS layer,
    COUNT(*) AS total_records,
    COUNT(DISTINCT product_id) AS unique_products,
    SUM(CASE WHEN unit_price < 0 THEN 1 ELSE 0 END) AS negative_prices,
    SUM(CASE WHEN product_name IS NULL THEN 1 ELSE 0 END) AS null_names
FROM silver.products

UNION ALL

SELECT 'Sales' AS layer,
    COUNT(*) AS total_records,
    COUNT(DISTINCT transaction_id) AS unique_transactions,
    SUM(CASE WHEN quantity <= 0 THEN 1 ELSE 0 END) AS invalid_quantity,
    SUM(CASE WHEN product_id NOT IN (SELECT product_id FROM silver.products) THEN 1 ELSE 0 END) AS orphaned_products
FROM silver.sales;

Testing Data Quality

最佳实践

sql
-- Data quality checks for Silver layer
SELECT 'Products' AS layer,
    COUNT(*) AS total_records,
    COUNT(DISTINCT product_id) AS unique_products,
    SUM(CASE WHEN unit_price < 0 THEN 1 ELSE 0 END) AS negative_prices,
    SUM(CASE WHEN product_name IS NULL THEN 1 ELSE 0 END) AS null_names
FROM silver.products

UNION ALL

SELECT 'Sales' AS layer,
    COUNT(*) AS total_records,
    COUNT(DISTINCT transaction_id) AS unique_transactions,
    SUM(CASE WHEN quantity <= 0 THEN 1 ELSE 0 END) AS invalid_quantity,
    SUM(CASE WHEN product_id NOT IN (SELECT product_id FROM silver.products) THEN 1 ELSE 0 END) AS orphaned_products
FROM silver.sales;
  1. 始终按顺序逐层处理:青铜→白银→黄金
  2. 使用存储过程实现可复用的转换逻辑
  3. 添加审计列
    _loaded_at
    _source_file
    processed_at
  4. 实现幂等性:采用截断加载或更新插入模式
  5. 对大表进行分区:按日期或门店分区以提升性能
  6. 创建全面的索引:针对关联和过滤列创建索引
  7. 使用CTE提升复杂业务逻辑的可读性
  8. 在每层转换时测试数据质量
  9. 版本控制所有SQL脚本和配置
  10. 使用Airflow或同类编排工具监控管道执行

Best Practices

  1. Always process through layers sequentially: Bronze → Silver → Gold
  2. Use stored procedures for reusable transformations
  3. Add audit columns (
    _loaded_at
    ,
    _source_file
    ,
    processed_at
    )
  4. Implement idempotency: Truncate-and-load or upsert patterns
  5. Partition large tables by date or branch for performance
  6. Create comprehensive indexes on join and filter columns
  7. Use CTEs for complex business logic readability
  8. Test data quality at each layer transition
  9. Version control all SQL scripts and configurations
  10. Monitor pipeline execution with Airflow or equivalent orchestrator