retail-etl-pipeline-medallion
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseRetail ETL Pipeline - Medallion Architecture Skill
零售ETL管道 - Medallion架构技能
Overview
概述
The Retail ETL Pipeline project implements a complete data engineering solution for retail operations using the Medallion Architecture pattern (Bronze → Silver → Gold layers). It handles complex retail scenarios including:
- Inventory shrinkage resolution
- Recipe conversions for meat/poultry products
- Supplier rebate tier tracking
- Multi-branch sales consolidation
- Stock level management across locations
The pipeline processes raw CSV data from CRM/ERP systems through three progressive quality layers, ultimately delivering a "Single Version of Truth" for business intelligence.
零售ETL管道项目采用Medallion架构模式(青铜→白银→黄金层),为零售运营实现了完整的数据工程解决方案。它处理复杂的零售场景,包括:
- 库存损耗解决
- 肉/禽产品的配方转换
- 供应商返利层级追踪
- 多门店销售合并
- 跨地点的库存水平管理
该管道将来自CRM/ERP系统的原始CSV数据通过三个逐步提升质量的层进行处理,最终为商业智能提供"单一事实版本"。
Architecture Layers
架构层
Bronze Layer (Raw Ingestion)
青铜层(原始数据摄入)
- Raw data ingestion from CSV files
- Minimal transformation, preserving source format
- Audit columns: ,
_loaded_at_source_file
- 从CSV文件摄入原始数据
- 最小化转换,保留源格式
- 审计列:、
_loaded_at_source_file
Silver Layer (Cleaned & Standardized)
白银层(清洗与标准化)
- Data type enforcement
- Deduplication
- Standardization (dates, currencies, product codes)
- Business rule validation
- 数据类型强制校验
- 去重
- 标准化(日期、货币、产品编码)
- 业务规则验证
Gold Layer (Business-Ready Analytics)
黄金层(业务就绪分析)
- Aggregated metrics
- Calculated KPIs (inventory turnover, shrinkage %)
- Dimensional models for BI tools
- 聚合指标
- 计算KPI(库存周转率、损耗率)
- 用于BI工具的维度模型
Installation & Setup
安装与设置
Prerequisites
前提条件
bash
undefinedbash
undefinedRequired tools
所需工具
- Docker & Docker Compose
- SQL Server 2019+
- Python 3.8+
- PySpark 3.x
- Apache Airflow (optional, for orchestration)
undefined- Docker & Docker Compose
- SQL Server 2019+
- Python 3.8+
- PySpark 3.x
- Apache Airflow(可选,用于编排)
undefinedInfrastructure Setup
基础设施设置
bash
undefinedbash
undefinedClone the repository
克隆仓库
git clone https://github.com/EsraaSolimanMubarak/Retail-ETL-Pipeline.git
cd Retail-ETL-Pipeline
git clone https://github.com/EsraaSolimanMubarak/Retail-ETL-Pipeline.git
cd Retail-ETL-Pipeline
Start SQL Server container
启动SQL Server容器
docker-compose up -d
docker-compose up -d
Wait for SQL Server to be ready
等待SQL Server就绪
docker logs -f retail-sql-server
undefineddocker logs -f retail-sql-server
undefinedDatabase Initialization
数据库初始化
bash
undefinedbash
undefinedConnect to SQL Server and run schema setup
连接SQL Server并运行架构设置
sqlcmd -S localhost,1433 -U sa -P ${SQL_SERVER_PASSWORD}
-i sql_scripts/00_create_database_and_schemas.sql
-i sql_scripts/00_create_database_and_schemas.sql
undefinedsqlcmd -S localhost,1433 -U sa -P ${SQL_SERVER_PASSWORD}
-i sql_scripts/00_create_database_and_schemas.sql
-i sql_scripts/00_create_database_and_schemas.sql
undefinedKey SQL Scripts Execution Order
核心SQL脚本执行顺序
The pipeline consists of 13+ SQL scripts that must be executed sequentially:
bash
undefined管道包含13+个必须按顺序执行的SQL脚本:
bash
undefined1. Create database and schemas
1. 创建数据库和架构
sql_scripts/00_create_database_and_schemas.sql
sql_scripts/00_create_database_and_schemas.sql
2. Bronze layer ingestion
2. 青铜层数据摄入
sql_scripts/01_bronze_products.sql
sql_scripts/02_bronze_sales.sql
sql_scripts/03_bronze_stock.sql
sql_scripts/01_bronze_products.sql
sql_scripts/02_bronze_sales.sql
sql_scripts/03_bronze_stock.sql
3. Silver layer transformations
3. 白银层转换
sql_scripts/04_silver_products.sql
sql_scripts/05_silver_sales.sql
sql_scripts/06_silver_stock.sql
sql_scripts/04_silver_products.sql
sql_scripts/05_silver_sales.sql
sql_scripts/06_silver_stock.sql
4. Gold layer aggregations
4. 黄金层聚合
sql_scripts/07_gold_sales_summary.sql
sql_scripts/08_gold_inventory_metrics.sql
sql_scripts/09_gold_product_performance.sql
sql_scripts/07_gold_sales_summary.sql
sql_scripts/08_gold_inventory_metrics.sql
sql_scripts/09_gold_product_performance.sql
5. Rebuild pipeline (if needed)
5. 重建管道(如有需要)
sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
undefinedsql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
undefinedCore ETL Patterns
核心ETL模式
Pattern 1: Bronze Layer Ingestion (Raw CSV → SQL)
模式1:青铜层数据摄入(原始CSV → SQL)
sql
-- Example: Bronze Products Table
CREATE TABLE bronze.products (
product_id INT,
product_name NVARCHAR(255),
category NVARCHAR(100),
unit_price DECIMAL(10,2),
supplier_id INT,
_loaded_at DATETIME2 DEFAULT GETDATE(),
_source_file NVARCHAR(500)
);
-- Bulk insert from CSV
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
FIELDTERMINATOR = ',',
ROWTERMINATOR = '\n',
FIRSTROW = 2,
TABLOCK
);sql
-- 示例:青铜层产品表
CREATE TABLE bronze.products (
product_id INT,
product_name NVARCHAR(255),
category NVARCHAR(100),
unit_price DECIMAL(10,2),
supplier_id INT,
_loaded_at DATETIME2 DEFAULT GETDATE(),
_source_file NVARCHAR(500)
);
-- 从CSV批量插入
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
FIELDTERMINATOR = ',',
ROWTERMINATOR = '\n',
FIRSTROW = 2,
TABLOCK
);Pattern 2: Silver Layer Cleansing
模式2:白银层数据清洗
sql
-- Example: Silver Products with Data Quality Rules
CREATE PROCEDURE silver.usp_transform_products
AS
BEGIN
TRUNCATE TABLE silver.products;
INSERT INTO silver.products (
product_id,
product_name,
category,
unit_price,
supplier_id,
is_active,
processed_at
)
SELECT DISTINCT
product_id,
UPPER(TRIM(product_name)) AS product_name,
COALESCE(category, 'UNCATEGORIZED') AS category,
CASE
WHEN unit_price < 0 THEN 0
ELSE unit_price
END AS unit_price,
supplier_id,
1 AS is_active,
GETDATE() AS processed_at
FROM bronze.products
WHERE product_id IS NOT NULL
AND product_name IS NOT NULL;
END;sql
-- 示例:带有数据质量规则的白银层产品表
CREATE PROCEDURE silver.usp_transform_products
AS
BEGIN
TRUNCATE TABLE silver.products;
INSERT INTO silver.products (
product_id,
product_name,
category,
unit_price,
supplier_id,
is_active,
processed_at
)
SELECT DISTINCT
product_id,
UPPER(TRIM(product_name)) AS product_name,
COALESCE(category, 'UNCATEGORIZED') AS category,
CASE
WHEN unit_price < 0 THEN 0
ELSE unit_price
END AS unit_price,
supplier_id,
1 AS is_active,
GETDATE() AS processed_at
FROM bronze.products
WHERE product_id IS NOT NULL
AND product_name IS NOT NULL;
END;Pattern 3: Gold Layer Aggregations
模式3:黄金层数据聚合
sql
-- Example: Sales Summary by Branch and Product
CREATE PROCEDURE gold.usp_sales_summary
AS
BEGIN
TRUNCATE TABLE gold.sales_summary;
INSERT INTO gold.sales_summary (
branch_name,
product_category,
total_quantity_sold,
total_revenue,
avg_unit_price,
transaction_count,
report_date
)
SELECT
s.branch_name,
p.category AS product_category,
SUM(s.quantity) AS total_quantity_sold,
SUM(s.quantity * s.unit_price) AS total_revenue,
AVG(s.unit_price) AS avg_unit_price,
COUNT(DISTINCT s.transaction_id) AS transaction_count,
CAST(GETDATE() AS DATE) AS report_date
FROM silver.sales s
INNER JOIN silver.products p ON s.product_id = p.product_id
GROUP BY s.branch_name, p.category;
END;sql
-- 示例:按门店和产品汇总销售数据
CREATE PROCEDURE gold.usp_sales_summary
AS
BEGIN
TRUNCATE TABLE gold.sales_summary;
INSERT INTO gold.sales_summary (
branch_name,
product_category,
total_quantity_sold,
total_revenue,
avg_unit_price,
transaction_count,
report_date
)
SELECT
s.branch_name,
p.category AS product_category,
SUM(s.quantity) AS total_quantity_sold,
SUM(s.quantity * s.unit_price) AS total_revenue,
AVG(s.unit_price) AS avg_unit_price,
COUNT(DISTINCT s.transaction_id) AS transaction_count,
CAST(GETDATE() AS DATE) AS report_date
FROM silver.sales s
INNER JOIN silver.products p ON s.product_id = p.product_id
GROUP BY s.branch_name, p.category;
END;Pattern 4: Inventory Shrinkage Calculation
模式4:库存损耗计算
sql
-- Complex business logic: Detect inventory discrepancies
CREATE PROCEDURE gold.usp_inventory_shrinkage
AS
BEGIN
WITH StockLevels AS (
SELECT
product_id,
branch_name,
SUM(quantity_on_hand) AS current_stock
FROM silver.stock
GROUP BY product_id, branch_name
),
ExpectedStock AS (
SELECT
s.product_id,
s.branch_name,
sl.current_stock - COALESCE(SUM(s.quantity), 0) AS expected_stock
FROM StockLevels sl
LEFT JOIN silver.sales s
ON sl.product_id = s.product_id
AND sl.branch_name = s.branch_name
GROUP BY s.product_id, s.branch_name, sl.current_stock
)
INSERT INTO gold.inventory_shrinkage (
product_id,
branch_name,
current_stock,
expected_stock,
shrinkage_qty,
shrinkage_percent,
calculated_at
)
SELECT
sl.product_id,
sl.branch_name,
sl.current_stock,
es.expected_stock,
sl.current_stock - es.expected_stock AS shrinkage_qty,
CASE
WHEN es.expected_stock > 0
THEN ((sl.current_stock - es.expected_stock) * 100.0 / es.expected_stock)
ELSE 0
END AS shrinkage_percent,
GETDATE() AS calculated_at
FROM StockLevels sl
INNER JOIN ExpectedStock es
ON sl.product_id = es.product_id
AND sl.branch_name = es.branch_name;
END;sql
-- 复杂业务逻辑:检测库存差异
CREATE PROCEDURE gold.usp_inventory_shrinkage
AS
BEGIN
WITH StockLevels AS (
SELECT
product_id,
branch_name,
SUM(quantity_on_hand) AS current_stock
FROM silver.stock
GROUP BY product_id, branch_name
),
ExpectedStock AS (
SELECT
s.product_id,
s.branch_name,
sl.current_stock - COALESCE(SUM(s.quantity), 0) AS expected_stock
FROM StockLevels sl
LEFT JOIN silver.sales s
ON sl.product_id = s.product_id
AND sl.branch_name = s.branch_name
GROUP BY s.product_id, s.branch_name, sl.current_stock
)
INSERT INTO gold.inventory_shrinkage (
product_id,
branch_name,
current_stock,
expected_stock,
shrinkage_qty,
shrinkage_percent,
calculated_at
)
SELECT
sl.product_id,
sl.branch_name,
sl.current_stock,
es.expected_stock,
sl.current_stock - es.expected_stock AS shrinkage_qty,
CASE
WHEN es.expected_stock > 0
THEN ((sl.current_stock - es.expected_stock) * 100.0 / es.expected_stock)
ELSE 0
END AS shrinkage_percent,
GETDATE() AS calculated_at
FROM StockLevels sl
INNER JOIN ExpectedStock es
ON sl.product_id = es.product_id
AND sl.branch_name = es.branch_name;
END;PySpark Integration (Optional)
PySpark集成(可选)
For large-scale data processing, integrate PySpark for Silver/Gold transformations:
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when针对大规模数据处理,可集成PySpark进行白银/黄金层转换:
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, whenInitialize Spark session
初始化Spark会话
spark = SparkSession.builder
.appName("RetailETL-Silver")
.config("spark.sql.warehouse.dir", "/data/warehouse")
.getOrCreate()
.appName("RetailETL-Silver")
.config("spark.sql.warehouse.dir", "/data/warehouse")
.getOrCreate()
spark = SparkSession.builder
.appName("RetailETL-Silver")
.config("spark.sql.warehouse.dir", "/data/warehouse")
.getOrCreate()
.appName("RetailETL-Silver")
.config("spark.sql.warehouse.dir", "/data/warehouse")
.getOrCreate()
Read Bronze layer (Parquet or Delta)
读取青铜层数据(Parquet或Delta格式)
bronze_sales_df = spark.read.parquet("/data/bronze/sales/")
bronze_sales_df = spark.read.parquet("/data/bronze/sales/")
Silver transformations
白银层转换
silver_sales_df = bronze_sales_df
.dropDuplicates(["transaction_id"])
.filter(col("quantity") > 0)
.withColumn("unit_price", when(col("unit_price") < 0, 0).otherwise(col("unit_price")))
.withColumn("total_amount", col("quantity") * col("unit_price"))
.dropDuplicates(["transaction_id"])
.filter(col("quantity") > 0)
.withColumn("unit_price", when(col("unit_price") < 0, 0).otherwise(col("unit_price")))
.withColumn("total_amount", col("quantity") * col("unit_price"))
silver_sales_df = bronze_sales_df
.dropDuplicates(["transaction_id"])
.filter(col("quantity") > 0)
.withColumn("unit_price", when(col("unit_price") < 0, 0).otherwise(col("unit_price")))
.withColumn("total_amount", col("quantity") * col("unit_price"))
.dropDuplicates(["transaction_id"])
.filter(col("quantity") > 0)
.withColumn("unit_price", when(col("unit_price") < 0, 0).otherwise(col("unit_price")))
.withColumn("total_amount", col("quantity") * col("unit_price"))
Write to Silver layer
写入白银层
silver_sales_df.write
.mode("overwrite")
.partitionBy("branch_name", "sale_date")
.parquet("/data/silver/sales/")
.mode("overwrite")
.partitionBy("branch_name", "sale_date")
.parquet("/data/silver/sales/")
silver_sales_df.write
.mode("overwrite")
.partitionBy("branch_name", "sale_date")
.parquet("/data/silver/sales/")
.mode("overwrite")
.partitionBy("branch_name", "sale_date")
.parquet("/data/silver/sales/")
Gold aggregations
黄金层聚合
gold_sales_summary = silver_sales_df
.groupBy("branch_name", "product_category")
.agg( sum("quantity").alias("total_quantity"), sum("total_amount").alias("total_revenue"), avg("unit_price").alias("avg_unit_price"), count("transaction_id").alias("transaction_count") )
.groupBy("branch_name", "product_category")
.agg( sum("quantity").alias("total_quantity"), sum("total_amount").alias("total_revenue"), avg("unit_price").alias("avg_unit_price"), count("transaction_id").alias("transaction_count") )
gold_sales_summary.write
.mode("overwrite")
.parquet("/data/gold/sales_summary/")
.mode("overwrite")
.parquet("/data/gold/sales_summary/")
undefinedgold_sales_summary = silver_sales_df
.groupBy("branch_name", "product_category")
.agg( sum("quantity").alias("total_quantity"), sum("total_amount").alias("total_revenue"), avg("unit_price").alias("avg_unit_price"), count("transaction_id").alias("transaction_count") )
.groupBy("branch_name", "product_category")
.agg( sum("quantity").alias("total_quantity"), sum("total_amount").alias("total_revenue"), avg("unit_price").alias("avg_unit_price"), count("transaction_id").alias("transaction_count") )
gold_sales_summary.write
.mode("overwrite")
.parquet("/data/gold/sales_summary/")
.mode("overwrite")
.parquet("/data/gold/sales_summary/")
undefinedAirflow DAG Example
Airflow DAG示例
Orchestrate the entire pipeline with Apache Airflow:
python
from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'retail_etl_medallion',
default_args=default_args,
description='Daily Retail ETL Pipeline',
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
# Bronze ingestion
ingest_products = MsSqlOperator(
task_id='bronze_ingest_products',
mssql_conn_id='retail_sql_server',
sql='sql_scripts/01_bronze_products.sql',
)
ingest_sales = MsSqlOperator(
task_id='bronze_ingest_sales',
mssql_conn_id='retail_sql_server',
sql='sql_scripts/02_bronze_sales.sql',
)
ingest_stock = MsSqlOperator(
task_id='bronze_ingest_stock',
mssql_conn_id='retail_sql_server',
sql='sql_scripts/03_bronze_stock.sql',
)
# Silver transformations
transform_products = MsSqlOperator(
task_id='silver_transform_products',
mssql_conn_id='retail_sql_server',
sql='EXEC silver.usp_transform_products;',
)
transform_sales = MsSqlOperator(
task_id='silver_transform_sales',
mssql_conn_id='retail_sql_server',
sql='EXEC silver.usp_transform_sales;',
)
# Gold aggregations
aggregate_sales_summary = MsSqlOperator(
task_id='gold_sales_summary',
mssql_conn_id='retail_sql_server',
sql='EXEC gold.usp_sales_summary;',
)
aggregate_inventory_shrinkage = MsSqlOperator(
task_id='gold_inventory_shrinkage',
mssql_conn_id='retail_sql_server',
sql='EXEC gold.usp_inventory_shrinkage;',
)
# Define dependencies
[ingest_products, ingest_sales, ingest_stock] >> transform_products
[ingest_sales] >> transform_sales
[transform_products, transform_sales] >> aggregate_sales_summary
[transform_products, ingest_stock] >> aggregate_inventory_shrinkage使用Apache Airflow编排整个管道:
python
from airflow import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'retail_etl_medallion',
default_args=default_args,
description='Daily Retail ETL Pipeline',
schedule_interval='@daily',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
# 青铜层数据摄入
ingest_products = MsSqlOperator(
task_id='bronze_ingest_products',
mssql_conn_id='retail_sql_server',
sql='sql_scripts/01_bronze_products.sql',
)
ingest_sales = MsSqlOperator(
task_id='bronze_ingest_sales',
mssql_conn_id='retail_sql_server',
sql='sql_scripts/02_bronze_sales.sql',
)
ingest_stock = MsSqlOperator(
task_id='bronze_ingest_stock',
mssql_conn_id='retail_sql_server',
sql='sql_scripts/03_bronze_stock.sql',
)
# 白银层转换
transform_products = MsSqlOperator(
task_id='silver_transform_products',
mssql_conn_id='retail_sql_server',
sql='EXEC silver.usp_transform_products;',
)
transform_sales = MsSqlOperator(
task_id='silver_transform_sales',
mssql_conn_id='retail_sql_server',
sql='EXEC silver.usp_transform_sales;',
)
# 黄金层聚合
aggregate_sales_summary = MsSqlOperator(
task_id='gold_sales_summary',
mssql_conn_id='retail_sql_server',
sql='EXEC gold.usp_sales_summary;',
)
aggregate_inventory_shrinkage = MsSqlOperator(
task_id='gold_inventory_shrinkage',
mssql_conn_id='retail_sql_server',
sql='EXEC gold.usp_inventory_shrinkage;',
)
# 定义依赖关系
[ingest_products, ingest_sales, ingest_stock] >> transform_products
[ingest_sales] >> transform_sales
[transform_products, transform_sales] >> aggregate_sales_summary
[transform_products, ingest_stock] >> aggregate_inventory_shrinkageConfiguration
配置
Environment Variables
环境变量
bash
undefinedbash
undefinedSQL Server connection
SQL Server连接配置
export SQL_SERVER_HOST=localhost
export SQL_SERVER_PORT=1433
export SQL_SERVER_USER=sa
export SQL_SERVER_PASSWORD=${YOUR_SECURE_PASSWORD}
export SQL_SERVER_DATABASE=RetailDataWarehouse
export SQL_SERVER_HOST=localhost
export SQL_SERVER_PORT=1433
export SQL_SERVER_USER=sa
export SQL_SERVER_PASSWORD=${YOUR_SECURE_PASSWORD}
export SQL_SERVER_DATABASE=RetailDataWarehouse
Data paths
数据路径
export DATA_SOURCE_PATH=/data_source
export BRONZE_LAYER_PATH=/data/bronze
export SILVER_LAYER_PATH=/data/silver
export GOLD_LAYER_PATH=/data/gold
export DATA_SOURCE_PATH=/data_source
export BRONZE_LAYER_PATH=/data/bronze
export SILVER_LAYER_PATH=/data/silver
export GOLD_LAYER_PATH=/data/gold
Airflow (if used)
Airflow配置(如使用)
export AIRFLOW_CONN_RETAIL_SQL_SERVER="mssql://${SQL_SERVER_USER}:${SQL_SERVER_PASSWORD}@${SQL_SERVER_HOST}:${SQL_SERVER_PORT}/${SQL_SERVER_DATABASE}"
undefinedexport AIRFLOW_CONN_RETAIL_SQL_SERVER="mssql://${SQL_SERVER_USER}:${SQL_SERVER_PASSWORD}@${SQL_SERVER_HOST}:${SQL_SERVER_PORT}/${SQL_SERVER_DATABASE}"
undefinedDocker Compose Configuration
Docker Compose配置
yaml
undefinedyaml
undefineddocker-compose.yml
docker-compose.yml
version: '3.8'
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2019-latest
container_name: retail-sql-server
environment:
- ACCEPT_EULA=Y
- SA_PASSWORD=${SQL_SERVER_PASSWORD}
- MSSQL_PID=Developer
ports:
- "1433:1433"
volumes:
- ./data_source:/data_source
- ./sql_scripts:/sql_scripts
- sqlserver_data:/var/opt/mssql
volumes:
sqlserver_data:
undefinedversion: '3.8'
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2019-latest
container_name: retail-sql-server
environment:
- ACCEPT_EULA=Y
- SA_PASSWORD=${SQL_SERVER_PASSWORD}
- MSSQL_PID=Developer
ports:
- "1433:1433"
volumes:
- ./data_source:/data_source
- ./sql_scripts:/sql_scripts
- sqlserver_data:/var/opt/mssql
volumes:
sqlserver_data:
undefinedCommon Patterns & Use Cases
常见模式与用例
Use Case 1: Multi-Branch Sales Consolidation
用例1:多门店销售合并
sql
-- Combine sales from Alex, Cairo, and Giza branches
CREATE VIEW gold.vw_consolidated_sales AS
SELECT
'Alexandria' AS branch_name,
*
FROM bronze.alex_sales
UNION ALL
SELECT
'Cairo' AS branch_name,
*
FROM bronze.cairo_sales
UNION ALL
SELECT
'Giza' AS branch_name,
*
FROM bronze.giza_sales;sql
-- 合并亚历山大、开罗和吉萨门店的销售数据
CREATE VIEW gold.vw_consolidated_sales AS
SELECT
'Alexandria' AS branch_name,
*
FROM bronze.alex_sales
UNION ALL
SELECT
'Cairo' AS branch_name,
*
FROM bronze.cairo_sales
UNION ALL
SELECT
'Giza' AS branch_name,
*
FROM bronze.giza_sales;Use Case 2: Recipe Yield Tracking (Meat/Poultry)
用例2:配方产出追踪(肉/禽产品)
sql
-- Track conversion rates for processed meat products
CREATE TABLE silver.recipe_conversions (
recipe_id INT PRIMARY KEY,
raw_product_id INT,
finished_product_id INT,
conversion_ratio DECIMAL(5,2), -- e.g., 0.85 (15% waste)
effective_date DATE
);
-- Calculate actual yield vs. expected
SELECT
rc.recipe_id,
SUM(s.quantity * rc.conversion_ratio) AS expected_yield,
SUM(stock.quantity_on_hand) AS actual_yield,
(SUM(stock.quantity_on_hand) - SUM(s.quantity * rc.conversion_ratio)) AS yield_variance
FROM silver.sales s
INNER JOIN silver.recipe_conversions rc ON s.product_id = rc.raw_product_id
INNER JOIN silver.stock stock ON rc.finished_product_id = stock.product_id
GROUP BY rc.recipe_id;sql
-- 追踪加工肉制品的转化率
CREATE TABLE silver.recipe_conversions (
recipe_id INT PRIMARY KEY,
raw_product_id INT,
finished_product_id INT,
conversion_ratio DECIMAL(5,2), -- 例如:0.85(15%损耗)
effective_date DATE
);
-- 计算实际产出与预期产出
SELECT
rc.recipe_id,
SUM(s.quantity * rc.conversion_ratio) AS expected_yield,
SUM(stock.quantity_on_hand) AS actual_yield,
(SUM(stock.quantity_on_hand) - SUM(s.quantity * rc.conversion_ratio)) AS yield_variance
FROM silver.sales s
INNER JOIN silver.recipe_conversions rc ON s.product_id = rc.raw_product_id
INNER JOIN silver.stock stock ON rc.finished_product_id = stock.product_id
GROUP BY rc.recipe_id;Use Case 3: Supplier Rebate Tier Tracking
用例3:供应商返利层级追踪
sql
-- Track purchase volume for supplier rebate calculations
CREATE TABLE gold.supplier_rebate_tiers (
supplier_id INT,
total_purchases DECIMAL(15,2),
rebate_tier VARCHAR(20),
rebate_percentage DECIMAL(5,2),
calculated_at DATETIME2
);
INSERT INTO gold.supplier_rebate_tiers
SELECT
p.supplier_id,
SUM(s.quantity * s.unit_price) AS total_purchases,
CASE
WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 'Platinum'
WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 'Gold'
WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 'Silver'
ELSE 'Bronze'
END AS rebate_tier,
CASE
WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 5.0
WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 3.5
WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 2.0
ELSE 1.0
END AS rebate_percentage,
GETDATE() AS calculated_at
FROM silver.sales s
INNER JOIN silver.products p ON s.product_id = p.product_id
GROUP BY p.supplier_id;sql
-- 追踪采购量以计算供应商返利
CREATE TABLE gold.supplier_rebate_tiers (
supplier_id INT,
total_purchases DECIMAL(15,2),
rebate_tier VARCHAR(20),
rebate_percentage DECIMAL(5,2),
calculated_at DATETIME2
);
INSERT INTO gold.supplier_rebate_tiers
SELECT
p.supplier_id,
SUM(s.quantity * s.unit_price) AS total_purchases,
CASE
WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 'Platinum'
WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 'Gold'
WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 'Silver'
ELSE 'Bronze'
END AS rebate_tier,
CASE
WHEN SUM(s.quantity * s.unit_price) > 100000 THEN 5.0
WHEN SUM(s.quantity * s.unit_price) > 50000 THEN 3.5
WHEN SUM(s.quantity * s.unit_price) > 25000 THEN 2.0
ELSE 1.0
END AS rebate_percentage,
GETDATE() AS calculated_at
FROM silver.sales s
INNER JOIN silver.products p ON s.product_id = p.product_id
GROUP BY p.supplier_id;Troubleshooting
故障排查
Issue 1: CSV Bulk Insert Fails
问题1:CSV批量插入失败
sql
-- Check file path permissions
EXEC xp_cmdshell 'dir C:\data_source\*.csv';
-- Use ERRORFILE to capture bad rows
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
FIELDTERMINATOR = ',',
ROWTERMINATOR = '\n',
FIRSTROW = 2,
ERRORFILE = '/data_source/errors/products_errors.csv',
MAXERRORS = 10
);sql
-- 检查文件路径权限
EXEC xp_cmdshell 'dir C:\data_source\*.csv';
-- 使用ERRORFILE捕获错误行
BULK INSERT bronze.products
FROM '/data_source/000.Hypermarket Products.csv'
WITH (
FIELDTERMINATOR = ',',
ROWTERMINATOR = '\n',
FIRSTROW = 2,
ERRORFILE = '/data_source/errors/products_errors.csv',
MAXERRORS = 10
);Issue 2: Duplicate Records in Silver Layer
问题2:白银层存在重复记录
sql
-- Add deduplication logic with ROW_NUMBER
WITH DeduplicatedSales AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY transaction_id
ORDER BY _loaded_at DESC
) AS rn
FROM bronze.sales
)
INSERT INTO silver.sales
SELECT *
FROM DeduplicatedSales
WHERE rn = 1;sql
-- 使用ROW_NUMBER添加去重逻辑
WITH DeduplicatedSales AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY transaction_id
ORDER BY _loaded_at DESC
) AS rn
FROM bronze.sales
)
INSERT INTO silver.sales
SELECT *
FROM DeduplicatedSales
WHERE rn = 1;Issue 3: Performance Optimization
问题3:性能优化
sql
-- Create indexes on frequently joined columns
CREATE NONCLUSTERED INDEX idx_sales_product_id
ON silver.sales(product_id) INCLUDE (quantity, unit_price);
CREATE NONCLUSTERED INDEX idx_stock_product_branch
ON silver.stock(product_id, branch_name) INCLUDE (quantity_on_hand);
-- Partition large tables by date
CREATE PARTITION FUNCTION pf_sales_date (DATE)
AS RANGE RIGHT FOR VALUES (
'2025-01-01', '2025-02-01', '2025-03-01', '2025-04-01'
);sql
-- 为频繁关联的列创建索引
CREATE NONCLUSTERED INDEX idx_sales_product_id
ON silver.sales(product_id) INCLUDE (quantity, unit_price);
CREATE NONCLUSTERED INDEX idx_stock_product_branch
ON silver.stock(product_id, branch_name) INCLUDE (quantity_on_hand);
-- 按日期对大表进行分区
CREATE PARTITION FUNCTION pf_sales_date (DATE)
AS RANGE RIGHT FOR VALUES (
'2025-01-01', '2025-02-01', '2025-03-01', '2025-04-01'
);Issue 4: Rebuild Entire Pipeline
问题4:重建整个管道
bash
undefinedbash
-- 使用重建脚本重置并重新处理所有层
sqlcmd -S ${SQL_SERVER_HOST},${SQL_SERVER_PORT} \
-U ${SQL_SERVER_USER} -P ${SQL_SERVER_PASSWORD} \
-d RetailDataWarehouse \
-i sql_scripts/12_rebuild_inventory_pipeline_final_fix.sqlUse the rebuild script to reset and reprocess all layers
数据质量测试
sqlcmd -S ${SQL_SERVER_HOST},${SQL_SERVER_PORT}
-U ${SQL_SERVER_USER} -P ${SQL_SERVER_PASSWORD}
-d RetailDataWarehouse
-i sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
-U ${SQL_SERVER_USER} -P ${SQL_SERVER_PASSWORD}
-d RetailDataWarehouse
-i sql_scripts/12_rebuild_inventory_pipeline_final_fix.sql
undefinedsql
-- 白银层数据质量检查
SELECT 'Products' AS layer,
COUNT(*) AS total_records,
COUNT(DISTINCT product_id) AS unique_products,
SUM(CASE WHEN unit_price < 0 THEN 1 ELSE 0 END) AS negative_prices,
SUM(CASE WHEN product_name IS NULL THEN 1 ELSE 0 END) AS null_names
FROM silver.products
UNION ALL
SELECT 'Sales' AS layer,
COUNT(*) AS total_records,
COUNT(DISTINCT transaction_id) AS unique_transactions,
SUM(CASE WHEN quantity <= 0 THEN 1 ELSE 0 END) AS invalid_quantity,
SUM(CASE WHEN product_id NOT IN (SELECT product_id FROM silver.products) THEN 1 ELSE 0 END) AS orphaned_products
FROM silver.sales;Testing Data Quality
最佳实践
sql
-- Data quality checks for Silver layer
SELECT 'Products' AS layer,
COUNT(*) AS total_records,
COUNT(DISTINCT product_id) AS unique_products,
SUM(CASE WHEN unit_price < 0 THEN 1 ELSE 0 END) AS negative_prices,
SUM(CASE WHEN product_name IS NULL THEN 1 ELSE 0 END) AS null_names
FROM silver.products
UNION ALL
SELECT 'Sales' AS layer,
COUNT(*) AS total_records,
COUNT(DISTINCT transaction_id) AS unique_transactions,
SUM(CASE WHEN quantity <= 0 THEN 1 ELSE 0 END) AS invalid_quantity,
SUM(CASE WHEN product_id NOT IN (SELECT product_id FROM silver.products) THEN 1 ELSE 0 END) AS orphaned_products
FROM silver.sales;- 始终按顺序逐层处理:青铜→白银→黄金
- 使用存储过程实现可复用的转换逻辑
- 添加审计列(、
_loaded_at、_source_file)processed_at - 实现幂等性:采用截断加载或更新插入模式
- 对大表进行分区:按日期或门店分区以提升性能
- 创建全面的索引:针对关联和过滤列创建索引
- 使用CTE提升复杂业务逻辑的可读性
- 在每层转换时测试数据质量
- 版本控制所有SQL脚本和配置
- 使用Airflow或同类编排工具监控管道执行
Best Practices
—
- Always process through layers sequentially: Bronze → Silver → Gold
- Use stored procedures for reusable transformations
- Add audit columns (,
_loaded_at,_source_file)processed_at - Implement idempotency: Truncate-and-load or upsert patterns
- Partition large tables by date or branch for performance
- Create comprehensive indexes on join and filter columns
- Use CTEs for complex business logic readability
- Test data quality at each layer transition
- Version control all SQL scripts and configurations
- Monitor pipeline execution with Airflow or equivalent orchestrator
—