enterprise-data-engineering-pipeline-ssis-pyspark
Enterprise Data Engineering Pipeline (SSIS + PySpark)
Overview
This project provides a complete enterprise data engineering solution that combines:
- SSIS (SQL Server Integration Services) for ETL orchestration
- SQL Server with Star Schema data warehouse design (fact and dimension tables)
- Python (Pandas) for data quality audits and visualization
- PySpark for big data analytics and aggregation
The pipeline ingests raw CSV files (Sales, Products, Customers), transforms them with SSIS, loads them into a dimensional model in SQL Server, and runs analytics on the result with Pandas and PySpark.
Architecture Components
- Source Layer: Raw CSV files containing transactional and master data
- ETL Layer: SSIS packages handle extraction, transformation, error handling
- Storage Layer: SQL Server Data Warehouse with Star Schema
- Analytics Layer: Python/PySpark scripts for business intelligence
Installation & Setup
Prerequisites
Required software:
- SQL Server 2019+ (Developer or Enterprise Edition)
- SQL Server Integration Services (SSIS)
- Visual Studio with SQL Server Data Tools (SSDT)
- Python 3.10+
- Java 8+ (for PySpark)
Python Dependencies
```bash
pip install pandas sqlalchemy pyodbc pyspark matplotlib
```
Database Setup
```sql
-- 01_Schema_Setup.sql
-- Create the data warehouse database
CREATE DATABASE EnterpriseDataWarehouse;
GO
USE EnterpriseDataWarehouse;
GO

-- Dimension: Customers
CREATE TABLE dim_Customers (
    CustomerID INT PRIMARY KEY,
    CustomerName NVARCHAR(100),
    Email NVARCHAR(100),
    Region NVARCHAR(50),
    RegistrationDate DATE
);

-- Dimension: Products
CREATE TABLE dim_Products (
    ProductID INT PRIMARY KEY,
    ProductName NVARCHAR(100),
    Category NVARCHAR(50),
    UnitPrice DECIMAL(10, 2)
);

-- Fact: Sales
CREATE TABLE fact_Sales (
    SaleID INT PRIMARY KEY,
    CustomerID INT FOREIGN KEY REFERENCES dim_Customers(CustomerID),
    ProductID INT FOREIGN KEY REFERENCES dim_Products(ProductID),
    Quantity INT,
    SaleDate DATE,
    TotalAmount DECIMAL(10, 2)
);
GO

-- Business Intelligence View: Revenue by Product
-- CREATE VIEW must be the first statement in its batch, hence the GO separators
CREATE VIEW vw_RevenueByProduct AS
SELECT
    p.ProductName,
    p.Category,
    SUM(s.TotalAmount) AS TotalRevenue,
    SUM(s.Quantity) AS TotalQuantity
FROM fact_Sales s
INNER JOIN dim_Products p ON s.ProductID = p.ProductID
GROUP BY p.ProductName, p.Category;
GO

-- Business Intelligence View: Customer Lifetime Value
CREATE VIEW vw_CustomerLTV AS
SELECT
    c.CustomerID,
    c.CustomerName,
    c.Region,
    COUNT(s.SaleID) AS TotalPurchases,
    SUM(s.TotalAmount) AS LifetimeValue
FROM dim_Customers c
LEFT JOIN fact_Sales s ON c.CustomerID = s.CustomerID
GROUP BY c.CustomerID, c.CustomerName, c.Region;
GO
```
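To see what the two views return, here is a minimal sketch that runs the same joins against an in-memory SQLite database. SQLite is only a stand-in for SQL Server, the DDL is simplified, and the sample rows are invented for illustration.

```python
import sqlite3

# SQLite stand-in; table definitions are trimmed from 01_Schema_Setup.sql.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_Customers (CustomerID INTEGER PRIMARY KEY, CustomerName TEXT, Region TEXT);
CREATE TABLE dim_Products (ProductID INTEGER PRIMARY KEY, ProductName TEXT, Category TEXT, UnitPrice REAL);
CREATE TABLE fact_Sales (
    SaleID INTEGER PRIMARY KEY,
    CustomerID INTEGER REFERENCES dim_Customers(CustomerID),
    ProductID INTEGER REFERENCES dim_Products(ProductID),
    Quantity INTEGER, SaleDate TEXT, TotalAmount REAL
);
-- Hypothetical sample data: Ben has no sales
INSERT INTO dim_Customers VALUES (1, 'Ada', 'North'), (2, 'Ben', 'South');
INSERT INTO dim_Products VALUES (10, 'Widget', 'Tools', 5.0), (11, 'Gadget', 'Tools', 20.0);
INSERT INTO fact_Sales VALUES
    (100, 1, 10, 2, '2024-01-05', 10.0),
    (101, 1, 11, 1, '2024-01-06', 20.0);
""")

# Same shape as vw_RevenueByProduct: the INNER JOIN keeps only products that sold.
revenue_by_product = conn.execute("""
    SELECT p.ProductName, SUM(s.TotalAmount) AS TotalRevenue
    FROM fact_Sales s
    INNER JOIN dim_Products p ON s.ProductID = p.ProductID
    GROUP BY p.ProductName
    ORDER BY p.ProductName
""").fetchall()

# Same shape as vw_CustomerLTV: the LEFT JOIN keeps customers without sales.
customer_ltv = conn.execute("""
    SELECT c.CustomerName, COUNT(s.SaleID) AS TotalPurchases, SUM(s.TotalAmount) AS LifetimeValue
    FROM dim_Customers c
    LEFT JOIN fact_Sales s ON c.CustomerID = s.CustomerID
    GROUP BY c.CustomerID, c.CustomerName
    ORDER BY c.CustomerID
""").fetchall()

print(revenue_by_product)
print(customer_ltv)
```

Note that a customer with no sales gets a purchase count of 0 but a NULL lifetime value, which downstream code has to handle.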
SSIS Package Configuration
Creating the SSIS Project
- Open Visual Studio with SSDT
- Create a new Integration Services Project: EnterpriseETL.sln
- Add Connection Managers:
  - Source_FlatFile: points to the CSV directory
  - Destination_OLEDB: SQL Server connection string
SSIS Package Flow
```text
Key SSIS components

Data Flow Task: Load dim_Customers
- Flat File Source (Customers.csv)
- Data Conversion (handle Unicode, trim strings)
- Derived Column (add audit columns)
- OLE DB Destination (dim_Customers)

Data Flow Task: Load dim_Products
- Flat File Source (Products.csv)
- Data Conversion (decimal precision for prices)
- OLE DB Destination (dim_Products)

Data Flow Task: Load fact_Sales
- Flat File Source (Sales.csv)
- Lookup Transformation (validate CustomerID, ProductID)
- Derived Column (calculate TotalAmount = Quantity * UnitPrice)
- OLE DB Destination (fact_Sales)
```
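The fact_Sales data flow can be sketched in pandas to show what the Lookup and Derived Column steps do to each row. This is an illustration only, not how SSIS runs it: the sample rows are invented, and taking UnitPrice from dim_Products is an assumption, since the source does not say where the Derived Column gets it.

```python
import pandas as pd

# Hypothetical rows standing in for Sales.csv and the two dimension tables.
sales = pd.DataFrame({
    "SaleID": [1, 2, 3],
    "CustomerID": [10, 11, 99],   # 99 has no match in dim_Customers
    "ProductID": [100, 101, 100],
    "Quantity": [2, 1, 5],
})
dim_customers = pd.DataFrame({"CustomerID": [10, 11]})
dim_products = pd.DataFrame({"ProductID": [100, 101], "UnitPrice": [5.0, 20.0]})

# Lookup: a row passes only if both keys exist in their dimension.
customer_ok = sales["CustomerID"].isin(dim_customers["CustomerID"])
product_ok = sales["ProductID"].isin(dim_products["ProductID"])
matched = sales[customer_ok & product_ok]
rejected = sales[~(customer_ok & product_ok)]  # candidates for an error output

# Derived Column: fetch UnitPrice (assumed to come from dim_Products) and compute TotalAmount.
loaded = matched.merge(dim_products, on="ProductID", how="left")
loaded["TotalAmount"] = loaded["Quantity"] * loaded["UnitPrice"]

print(loaded[["SaleID", "TotalAmount"]])
print(rejected[["SaleID", "CustomerID"]])
```

Rows that fail the lookup are kept in a separate frame rather than dropped, so they can be logged instead of silently lost.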
Error Handling in SSIS
```sql
-- Create error logging table
CREATE TABLE ETL_ErrorLog (
    ErrorID INT IDENTITY(1,1) PRIMARY KEY,
    PackageName NVARCHAR(100),
    TaskName NVARCHAR(100),
    ErrorDescription NVARCHAR(MAX),
    ErrorDate DATETIME DEFAULT GETDATE()
);
```
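The source does not show how rows reach this table. Inside SSIS, one common approach is an OnError event handler containing an Execute SQL Task. The sketch below shows the equivalent parameterized insert from Python, using SQLite as a stand-in for SQL Server; the helper name `log_etl_error` and the sample values are hypothetical.

```python
import sqlite3


def log_etl_error(conn, package_name, task_name, description):
    """Insert one row into ETL_ErrorLog using a parameterized statement."""
    conn.execute(
        "INSERT INTO ETL_ErrorLog (PackageName, TaskName, ErrorDescription) "
        "VALUES (?, ?, ?)",
        (package_name, task_name, description),
    )
    conn.commit()


# SQLite stand-in for the table above; column types and defaults are adapted.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE ETL_ErrorLog (
        ErrorID INTEGER PRIMARY KEY AUTOINCREMENT,
        PackageName TEXT,
        TaskName TEXT,
        ErrorDescription TEXT,
        ErrorDate TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

log_etl_error(conn, "Package.dtsx", "Load fact_Sales",
              "Lookup found no match for CustomerID 99")
rows = conn.execute(
    "SELECT ErrorID, PackageName, TaskName, ErrorDate FROM ETL_ErrorLog"
).fetchall()
print(rows)
```

ErrorID and ErrorDate are filled in by the database, so the caller supplies only the package name, task name, and description.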
Python Analytics
Data Quality Audit Script
```python
# project_audit.py
import os
from urllib.parse import quote_plus

import matplotlib.pyplot as plt
import pandas as pd
from sqlalchemy import create_engine  # loads pyodbc through the mssql+pyodbc dialect


# Database connection
def get_connection():
    odbc_str = (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={os.getenv('SQL_SERVER')};"
        f"DATABASE={os.getenv('SQL_DATABASE')};"
        "Trusted_Connection=yes;"
    )
    # URL-encode the ODBC string so it survives being embedded in the SQLAlchemy URL
    return create_engine(f"mssql+pyodbc:///?odbc_connect={quote_plus(odbc_str)}")


# Data quality checks
def run_audit():
    engine = get_connection()

    # Check 1: Null values in critical columns
    query_nulls = """
        SELECT
            'dim_Customers' AS TableName,
            SUM(CASE WHEN CustomerName IS NULL THEN 1 ELSE 0 END) AS NullCustomerName,
            SUM(CASE WHEN Email IS NULL THEN 1 ELSE 0 END) AS NullEmail
        FROM dim_Customers
        UNION ALL
        SELECT
            'fact_Sales',
            SUM(CASE WHEN CustomerID IS NULL THEN 1 ELSE 0 END),
            SUM(CASE WHEN ProductID IS NULL THEN 1 ELSE 0 END)
        FROM fact_Sales
    """
    df_nulls = pd.read_sql(query_nulls, engine)
    print("Null Value Audit:")
    print(df_nulls)

    # Check 2: Orphaned records (referential integrity)
    query_orphans = """
        SELECT COUNT(*) AS OrphanedSales
        FROM fact_Sales s
        WHERE NOT EXISTS (SELECT 1 FROM dim_Customers c WHERE c.CustomerID = s.CustomerID)
           OR NOT EXISTS (SELECT 1 FROM dim_Products p WHERE p.ProductID = s.ProductID)
    """
    df_orphans = pd.read_sql(query_orphans, engine)
    print("\nOrphaned Records:")
    print(df_orphans)

    # Check 3: Revenue distribution
    query_revenue = "SELECT * FROM vw_RevenueByProduct ORDER BY TotalRevenue DESC"
    df_revenue = pd.read_sql(query_revenue, engine)

    # Visualization: bar chart of the ten highest-revenue products
    plt.figure(figsize=(10, 6))
    plt.bar(df_revenue['ProductName'][:10], df_revenue['TotalRevenue'][:10])
    plt.xlabel('Product')
    plt.ylabel('Total Revenue')
    plt.title('Top 10 Products by Revenue')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.savefig('revenue_analysis.png')
    print("\nRevenue chart saved to revenue_analysis.png")


if __name__ == "__main__":
    run_audit()
```
Customer Segmentation Analysis
```python
# customer_segmentation.py
import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine


def segment_customers():
    odbc_str = (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={os.getenv('SQL_SERVER')};"
        f"DATABASE={os.getenv('SQL_DATABASE')};"
        "Trusted_Connection=yes;"
    )
    engine = create_engine(f"mssql+pyodbc:///?odbc_connect={quote_plus(odbc_str)}")

    # Load customer LTV data
    df = pd.read_sql("SELECT * FROM vw_CustomerLTV", engine)

    # Customers with no sales come back from the LEFT JOIN with a NULL LifetimeValue
    df['LifetimeValue'] = df['LifetimeValue'].fillna(0)

    # Value-tier segmentation (uses only the monetary part of RFM)
    df['Segment'] = pd.cut(
        df['LifetimeValue'],
        bins=[0, 1000, 5000, float('inf')],
        labels=['Bronze', 'Silver', 'Gold'],
        include_lowest=True,  # keep customers whose LifetimeValue is exactly 0
    )

    # Aggregate by segment
    segment_summary = df.groupby('Segment', observed=False).agg({
        'CustomerID': 'count',
        'LifetimeValue': 'sum',
        'TotalPurchases': 'mean'
    }).reset_index()

    print(segment_summary)
    return segment_summary
```
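The bin edges deserve a closer look. `pd.cut` uses right-closed intervals by default, so a value of exactly 0 falls outside the first bin unless `include_lowest=True` is passed. A small sketch with invented lifetime values shows the difference:

```python
import pandas as pd

# Hypothetical lifetime values chosen to sit on and around the bin edges.
ltv = pd.Series([0, 1000, 1000.01, 5000, 7500], name="LifetimeValue")
bins = [0, 1000, 5000, float("inf")]
labels = ["Bronze", "Silver", "Gold"]

# Default: intervals are (0, 1000], (1000, 5000], (5000, inf), so 0 is unassigned.
default_cut = pd.cut(ltv, bins=bins, labels=labels)

# include_lowest=True closes the first interval on the left, so 0 becomes Bronze.
inclusive_cut = pd.cut(ltv, bins=bins, labels=labels, include_lowest=True)

print(pd.DataFrame({"ltv": ltv, "default": default_cut, "inclusive": inclusive_cut}))
```

Values that sit exactly on an upper edge (1000, 5000) stay in the lower tier in both cases.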
PySpark Big Data Processing
High-Volume Sales Aggregation
```python
# pyspark_analytics.py
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg, col, count, countDistinct, month, sum as _sum, year,
)

# Initialize Spark
spark = (
    SparkSession.builder
    .appName("EnterpriseSalesAnalytics")
    .config("spark.jars", "mssql-jdbc-9.4.0.jre8.jar")
    .getOrCreate()
)

# JDBC connection properties
jdbc_url = (
    f"jdbc:sqlserver://{os.getenv('SQL_SERVER')}:1433;"
    f"databaseName={os.getenv('SQL_DATABASE')}"
)
connection_properties = {
    "user": os.getenv('SQL_USER'),
    "password": os.getenv('SQL_PASSWORD'),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Load data from SQL Server
df_sales = spark.read.jdbc(
    url=jdbc_url, table="fact_Sales", properties=connection_properties
)
df_customers = spark.read.jdbc(
    url=jdbc_url, table="dim_Customers", properties=connection_properties
)
df_products = spark.read.jdbc(
    url=jdbc_url, table="dim_Products", properties=connection_properties
)

# Join and aggregate
df_combined = (
    df_sales
    .join(df_customers, "CustomerID")
    .join(df_products, "ProductID")
)

# Monthly revenue analysis
df_monthly = (
    df_combined
    .withColumn("Year", year(col("SaleDate")))
    .withColumn("Month", month(col("SaleDate")))
    .groupBy("Year", "Month", "Region")
    .agg(
        _sum("TotalAmount").alias("MonthlyRevenue"),
        count("SaleID").alias("TransactionCount"),
        avg("TotalAmount").alias("AvgTransactionValue"),
    )
    .orderBy("Year", "Month", "Region")
)
df_monthly.show(20)

# Write results back to SQL Server
df_monthly.write.jdbc(
    url=jdbc_url,
    table="analytics_MonthlyRevenue",
    mode="overwrite",
    properties=connection_properties
)

# Product performance by category
df_category = (
    df_combined
    .groupBy("Category")
    .agg(
        _sum("TotalAmount").alias("CategoryRevenue"),
        _sum("Quantity").alias("TotalUnitsSold"),
        # countDistinct counts unique customers; count(...).distinct() is not valid
        countDistinct("CustomerID").alias("UniqueCustomers"),
    )
    .orderBy(col("CategoryRevenue").desc())
)
df_category.show()

spark.stop()
```
Parallel Processing for Large Datasets
```python
# batch_processing.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = (
    SparkSession.builder
    .appName("BatchDataProcessing")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)

# Read large CSV files
df_raw = spark.read.csv(
    "hdfs://data/sales/*.csv",
    header=True,
    inferSchema=True
)

# Data quality transformations: drop null or non-positive rows, flag
# high-value sales, and keep one row per SaleID
df_cleaned = (
    df_raw
    .filter(col("TotalAmount").isNotNull())
    .filter(col("Quantity") > 0)
    .withColumn(
        "IsHighValue",
        when(col("TotalAmount") > 1000, "Yes").otherwise("No")
    )
    .dropDuplicates(["SaleID"])
)

# Partition by date for efficient querying
(
    df_cleaned.write
    .partitionBy("SaleDate")
    .mode("overwrite")
    .parquet("hdfs://data/processed/sales_cleaned")
)

print(f"Processed {df_cleaned.count()} records")
```
Common Patterns
Incremental ETL (Load Only New Records)
```sql
-- Create staging table
CREATE TABLE stg_Sales (
    SaleID INT,
    CustomerID INT,
    ProductID INT,
    Quantity INT,
    SaleDate DATE,
    TotalAmount DECIMAL(10, 2),
    LoadDate DATETIME DEFAULT GETDATE()
);

-- Merge statement for incremental load
MERGE INTO fact_Sales AS target
USING stg_Sales AS source
ON target.SaleID = source.SaleID
WHEN MATCHED THEN
    UPDATE SET
        Quantity = source.Quantity,
        TotalAmount = source.TotalAmount
WHEN NOT MATCHED THEN
    INSERT (SaleID, CustomerID, ProductID, Quantity, SaleDate, TotalAmount)
    VALUES (source.SaleID, source.CustomerID, source.ProductID,
            source.Quantity, source.SaleDate, source.TotalAmount);
```
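The two branches of the MERGE can be traced row by row. The sketch below emulates them in Python against an in-memory SQLite database, which has no MERGE statement, so an UPDATE followed by a conditional INSERT stands in for it. The helper name `merge_sales`, the trimmed column list, and the sample rows are hypothetical.

```python
import sqlite3


def merge_sales(conn):
    """Apply stg_Sales to fact_Sales: update matched SaleIDs, insert the rest."""
    updated = inserted = 0
    staged = conn.execute(
        "SELECT SaleID, Quantity, TotalAmount FROM stg_Sales"
    ).fetchall()
    for sale_id, quantity, total in staged:
        cur = conn.execute(
            "UPDATE fact_Sales SET Quantity = ?, TotalAmount = ? WHERE SaleID = ?",
            (quantity, total, sale_id),
        )
        if cur.rowcount == 0:
            # WHEN NOT MATCHED: no existing row, so insert it
            conn.execute(
                "INSERT INTO fact_Sales (SaleID, Quantity, TotalAmount) VALUES (?, ?, ?)",
                (sale_id, quantity, total),
            )
            inserted += 1
        else:
            # WHEN MATCHED: the existing row was overwritten
            updated += 1
    conn.commit()
    return updated, inserted


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fact_Sales (SaleID INTEGER PRIMARY KEY, Quantity INTEGER, TotalAmount REAL);
CREATE TABLE stg_Sales (SaleID INTEGER, Quantity INTEGER, TotalAmount REAL);
INSERT INTO fact_Sales VALUES (1, 2, 10.0);
INSERT INTO stg_Sales VALUES (1, 3, 15.0), (2, 1, 20.0);
""")

first_run = merge_sales(conn)
fact_rows = conn.execute("SELECT * FROM fact_Sales ORDER BY SaleID").fetchall()
print(first_run, fact_rows)
```

Running the merge a second time with the same staging rows inserts nothing new, which is what makes the pattern safe to rerun after a failed load.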
Automated Data Refresh
```python
# scheduled_refresh.py
import subprocess
import sys
from datetime import datetime


def run_ssis_package():
    """Execute SSIS package via dtexec"""
    package_path = r"C:\SSIS\EnterpriseETL\EnterpriseETL\Package.dtsx"
    cmd = [
        "dtexec",
        "/FILE", package_path,
        "/REPORTING", "E"
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Keep dtexec output in a timestamped log file
    log_file = f"etl_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    with open(log_file, 'w') as f:
        f.write(result.stdout)
        f.write(result.stderr)

    return result.returncode == 0


def run_analytics():
    """Execute Python analytics after ETL"""
    # check=True raises if a script exits with a non-zero status
    subprocess.run([sys.executable, "project_audit.py"], check=True)
    subprocess.run([sys.executable, "pyspark_analytics.py"], check=True)


if __name__ == "__main__":
    if run_ssis_package():
        print("ETL completed successfully")
        run_analytics()
    else:
        print("ETL failed - check logs")
```
Troubleshooting
SSIS Connection Issues
```plaintext
Error: "Cannot acquire connection to SQL Server"
Solution:
1. Verify SQL Server service is running
2. Check Windows Authentication vs SQL Authentication
3. Update connection string in Connection Manager
4. Enable TCP/IP protocol in SQL Server Configuration Manager
```
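Steps 1 and 4 can be checked quickly from the machine that runs the package. The sketch below only tests whether a TCP connection can be opened; it says nothing about authentication. The helper name `can_reach` is hypothetical, and port 1433 is assumed because it is the port used in the JDBC URL elsewhere in this document.

```python
import socket


def can_reach(host, port=1433, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures
        return False


if __name__ == "__main__":
    print("SQL Server reachable:", can_reach("localhost"))
```

If this returns False while the service is running, TCP/IP is probably disabled for the instance or a firewall is blocking the port.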
Unicode/Encoding Errors
```plaintext
Error: "Cannot convert between unicode and non-unicode string data types"
Solution in SSIS:
1. Add a Data Conversion transformation to the data flow
2. Convert DT_STR to DT_WSTR for Unicode columns
3. Set CodePage to 1252 (Windows Latin 1) in the Flat File Connection Manager
```
PySpark JDBC Driver Not Found
```python
# Download Microsoft JDBC driver
# Add to Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars", "/path/to/mssql-jdbc-9.4.0.jre8.jar")
    .getOrCreate()
)
```
Performance Optimization
```python
# Enable broadcast join for small dimension tables
from pyspark.sql.functions import broadcast

df_result = df_sales.join(
    broadcast(df_products),
    "ProductID"
)

# Cache frequently accessed DataFrames
df_sales.cache()
df_sales.count()  # Trigger caching
```
Environment Variables
```bash
# .env file
SQL_SERVER=localhost
SQL_DATABASE=EnterpriseDataWarehouse
SQL_USER=your_username
SQL_PASSWORD=your_password

# For PySpark
SPARK_HOME=/path/to/spark
JAVA_HOME=/path/to/java
```
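The scripts read these values with `os.getenv`, which only sees the process environment, so a `.env` file has to be loaded before they run. The source does not say how that happens. One option is the `python-dotenv` package; another is a small standard-library loader like the sketch below, where the function names `parse_env_lines` and `load_env_file` are hypothetical.

```python
import os


def parse_env_lines(lines):
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values may contain "="
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def load_env_file(path=".env"):
    """Load a .env file into os.environ without overriding existing variables."""
    with open(path, encoding="utf-8") as f:
        values = parse_env_lines(f)
    for key, value in values.items():
        os.environ.setdefault(key, value)
    return values


sample = parse_env_lines([
    "# .env file",
    "SQL_SERVER=localhost",
    "",
    "SQL_DATABASE=EnterpriseDataWarehouse",
])
print(sample)
```

Calling `load_env_file()` at the top of each script, before the first `os.getenv`, keeps credentials out of the source files. Using `setdefault` means variables already exported in the shell take precedence over the file.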
Running the Complete Pipeline
```bash
# Step 1: Set up the database
sqlcmd -S localhost -i 01_Schema_Setup.sql

# Step 2: Run SSIS package (via Visual Studio or dtexec)
dtexec /FILE "EnterpriseETL\Package.dtsx"

# Step 3: Run data quality audit
python project_audit.py

# Step 4: Run PySpark analytics
spark-submit --jars mssql-jdbc-9.4.0.jre8.jar pyspark_analytics.py
```
This enterprise pipeline provides a complete solution for data warehousing, ETL automation, and big data analytics using industry-standard Microsoft and Apache technologies.