enterprise-data-engineering-pipeline-ssis-pyspark

Enterprise Data Engineering Pipeline (SSIS + PySpark)

Skill by ara.so — Data Skills collection.

Overview

This project provides a complete enterprise data engineering solution that combines:
  • SSIS (SQL Server Integration Services) for ETL orchestration
  • SQL Server with Star Schema data warehouse design (fact and dimension tables)
  • Python (Pandas) for data quality audits and visualization
  • PySpark for big data analytics and aggregation
The pipeline ingests raw CSV files (Sales, Products, Customers), transforms them through SSIS, loads into a dimensional model, and performs analytics at scale.
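Because SSIS maps flat-file columns by name, a missing header in one of the raw CSV files tends to surface late, inside the data flow. A minimal pre-flight check, sketched in Python, can catch that earlier. The header names below are an assumption: the source lists the warehouse columns but not the CSV layout, so `EXPECTED_COLUMNS` and `missing_columns` are illustrative names only.

```python
import csv
import io

# Assumed header names (taken from the warehouse tables, not from the actual CSV files)
EXPECTED_COLUMNS = {
    "Customers.csv": ["CustomerID", "CustomerName", "Email", "Region", "RegistrationDate"],
    "Products.csv": ["ProductID", "ProductName", "Category", "UnitPrice"],
}


def missing_columns(csv_file, expected):
    """Return the expected column names that are absent from the CSV header row."""
    header = next(csv.reader(csv_file), [])  # an empty file yields an empty header
    present = {name.strip() for name in header}
    return [name for name in expected if name not in present]


if __name__ == "__main__":
    # In-memory sample standing in for Products.csv with the Category column missing
    sample = io.StringIO("ProductID,ProductName,UnitPrice\n1,Widget,9.99\n")
    print(missing_columns(sample, EXPECTED_COLUMNS["Products.csv"]))
```

For a real file, pass an open handle, for example `open("Products.csv", newline="")`.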

Architecture Components

  1. Source Layer: Raw CSV files containing transactional and master data
  2. ETL Layer: SSIS packages handle extraction, transformation, error handling
  3. Storage Layer: SQL Server Data Warehouse with Star Schema
  4. Analytics Layer: Python/PySpark scripts for business intelligence

Installation & Setup

Prerequisites

Required software

  • SQL Server 2019+ (Developer or Enterprise Edition)
  • SQL Server Integration Services (SSIS)
  • Visual Studio with SQL Server Data Tools (SSDT)
  • Python 3.10+
  • Java 8+ (for PySpark)

Python Dependencies

bash
pip install pandas sqlalchemy pyodbc pyspark matplotlib

Database Setup

sql
-- 01_Schema_Setup.sql
-- Create the data warehouse database
CREATE DATABASE EnterpriseDataWarehouse;
GO

USE EnterpriseDataWarehouse;
GO

-- Dimension: Customers
CREATE TABLE dim_Customers (
    CustomerID INT PRIMARY KEY,
    CustomerName NVARCHAR(100),
    Email NVARCHAR(100),
    Region NVARCHAR(50),
    RegistrationDate DATE
);

-- Dimension: Products
CREATE TABLE dim_Products (
    ProductID INT PRIMARY KEY,
    ProductName NVARCHAR(100),
    Category NVARCHAR(50),
    UnitPrice DECIMAL(10, 2)
);

-- Fact: Sales
CREATE TABLE fact_Sales (
    SaleID INT PRIMARY KEY,
    CustomerID INT FOREIGN KEY REFERENCES dim_Customers(CustomerID),
    ProductID INT FOREIGN KEY REFERENCES dim_Products(ProductID),
    Quantity INT,
    SaleDate DATE,
    TotalAmount DECIMAL(10, 2)
);

GO

-- Business Intelligence View: Revenue by Product
-- CREATE VIEW must be the first statement in its batch, so each view sits between GO separators
CREATE VIEW vw_RevenueByProduct AS
SELECT 
    p.ProductName,
    p.Category,
    SUM(s.TotalAmount) AS TotalRevenue,
    SUM(s.Quantity) AS TotalQuantity
FROM fact_Sales s
INNER JOIN dim_Products p ON s.ProductID = p.ProductID
GROUP BY p.ProductName, p.Category;
GO

-- Business Intelligence View: Customer Lifetime Value
-- The LEFT JOIN keeps customers without sales; their LifetimeValue is NULL
CREATE VIEW vw_CustomerLTV AS
SELECT 
    c.CustomerID,
    c.CustomerName,
    c.Region,
    COUNT(s.SaleID) AS TotalPurchases,
    SUM(s.TotalAmount) AS LifetimeValue
FROM dim_Customers c
LEFT JOIN fact_Sales s ON c.CustomerID = s.CustomerID
GROUP BY c.CustomerID, c.CustomerName, c.Region;
GO
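The LEFT JOIN in vw_CustomerLTV keeps customers who have never purchased. For those rows COUNT(s.SaleID) is 0 and SUM(s.TotalAmount) is NULL rather than 0. The sketch below reproduces that behavior with Python's built-in `sqlite3` module as a stand-in for SQL Server. That substitution is an assumption made only so the example runs anywhere, and the sample rows are invented.

```python
import sqlite3


def customer_ltv_rows():
    """Build a tiny in-memory copy of the schema and run the LTV query against it."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE dim_Customers (
            CustomerID INTEGER PRIMARY KEY, CustomerName TEXT, Region TEXT
        );
        CREATE TABLE fact_Sales (
            SaleID INTEGER PRIMARY KEY,
            CustomerID INTEGER REFERENCES dim_Customers(CustomerID),
            TotalAmount NUMERIC
        );
        -- Customer 1 has two sales, customer 2 has none
        INSERT INTO dim_Customers VALUES (1, 'Ada', 'North'), (2, 'Grace', 'South');
        INSERT INTO fact_Sales VALUES (10, 1, 120.5), (11, 1, 79.5);
    """)
    rows = conn.execute("""
        SELECT c.CustomerID,
               COUNT(s.SaleID)    AS TotalPurchases,
               SUM(s.TotalAmount) AS LifetimeValue
        FROM dim_Customers c
        LEFT JOIN fact_Sales s ON c.CustomerID = s.CustomerID
        GROUP BY c.CustomerID
        ORDER BY c.CustomerID
    """).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    for row in customer_ltv_rows():
        print(row)
```

Code that consumes the view should therefore treat a missing LifetimeValue as zero, or the view can wrap the sum in COALESCE.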

SSIS Package Configuration

Creating the SSIS Project

  1. Open Visual Studio with SSDT
  2. Create new Integration Services Project:
    EnterpriseETL.sln
  3. Add Connection Managers:
    • Source_FlatFile: Points to CSV directory
    • Destination_OLEDB: SQL Server connection string

SSIS Package Flow

xml
<!-- Key SSIS Components -->
<!-- Data Flow Task: Load dim_Customers -->
- Flat File Source (Customers.csv)
- Data Conversion (handle Unicode, trim strings)
- Derived Column (add audit columns)
- OLEDB Destination (dim_Customers)

<!-- Data Flow Task: Load dim_Products -->
- Flat File Source (Products.csv)
- Data Conversion (decimal precision for prices)
- OLEDB Destination (dim_Products)

<!-- Data Flow Task: Load fact_Sales -->
- Flat File Source (Sales.csv)
- Lookup Transformation (validate CustomerID, ProductID)
- Derived Column (calculate TotalAmount = Quantity * UnitPrice)
- OLEDB Destination (fact_Sales)
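The fact_Sales data flow is easier to reason about, and to unit-test, when its two key steps are written out as plain logic. The sketch below mirrors the Lookup and Derived Column steps in Python. It is not SSIS code. It assumes that UnitPrice is fetched from dim_Products during the lookup and that unmatched rows are redirected rather than failing the package; the source states neither. `build_fact_rows` is an illustrative name.

```python
from decimal import Decimal


def build_fact_rows(sales_rows, customer_ids, unit_prices):
    """Split sales rows into loadable and rejected rows.

    sales_rows   -- iterable of dicts with SaleID, CustomerID, ProductID, Quantity
    customer_ids -- set of CustomerID values present in dim_Customers
    unit_prices  -- dict mapping ProductID to UnitPrice (Decimal) from dim_Products
    """
    loaded, rejected = [], []
    for row in sales_rows:
        # Lookup step: both keys must exist in their dimension table
        if row["CustomerID"] not in customer_ids or row["ProductID"] not in unit_prices:
            rejected.append(row)
            continue
        # Derived Column step: TotalAmount = Quantity * UnitPrice, kept at two decimals
        total = (row["Quantity"] * unit_prices[row["ProductID"]]).quantize(Decimal("0.01"))
        loaded.append({**row, "TotalAmount": total})
    return loaded, rejected


if __name__ == "__main__":
    sales = [
        {"SaleID": 1, "CustomerID": 1, "ProductID": 100, "Quantity": 3},
        {"SaleID": 2, "CustomerID": 9, "ProductID": 100, "Quantity": 1},  # unknown customer
    ]
    ok, bad = build_fact_rows(sales, {1}, {100: Decimal("19.99")})
    print(ok)
    print(bad)
```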

Error Handling in SSIS

sql
-- Create error logging table
CREATE TABLE ETL_ErrorLog (
    ErrorID INT IDENTITY(1,1) PRIMARY KEY,
    PackageName NVARCHAR(100),
    TaskName NVARCHAR(100),
    ErrorDescription NVARCHAR(MAX),
    ErrorDate DATETIME DEFAULT GETDATE()
);
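The source does not show how rows reach ETL_ErrorLog. One option, sketched here as an assumption, is for a Python wrapper script to write a row whenever a step fails. The helper uses only DB-API calls with `?` placeholders, which both `pyodbc` and the built-in `sqlite3` accept. `sqlite3` appears below purely as a local stand-in so the example runs without SQL Server, and `log_etl_error` and `create_demo_log_table` are hypothetical names.

```python
import sqlite3


def log_etl_error(conn, package_name, task_name, description):
    """Insert one row into ETL_ErrorLog using a parameterized statement."""
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO ETL_ErrorLog (PackageName, TaskName, ErrorDescription) VALUES (?, ?, ?)",
        (package_name, task_name, description),
    )
    conn.commit()


def create_demo_log_table(conn):
    """SQLite approximation of the ETL_ErrorLog table, for local experiments only."""
    conn.execute(
        "CREATE TABLE ETL_ErrorLog ("
        "ErrorID INTEGER PRIMARY KEY AUTOINCREMENT, "
        "PackageName TEXT, TaskName TEXT, ErrorDescription TEXT, "
        "ErrorDate TEXT DEFAULT CURRENT_TIMESTAMP)"
    )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_demo_log_table(conn)
    log_etl_error(conn, "Package.dtsx", "Load fact_Sales", "No match for CustomerID 42")
    print(conn.execute("SELECT PackageName, TaskName FROM ETL_ErrorLog").fetchall())
```

ErrorID and ErrorDate are left out of the INSERT on purpose so the IDENTITY column and the GETDATE() default fill them in.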

Python Analytics

Data Quality Audit Script

python
# project_audit.py
import os
from urllib.parse import quote_plus

import matplotlib.pyplot as plt
import pandas as pd
import pyodbc  # noqa: F401 (not called directly; it is the driver behind mssql+pyodbc)
from sqlalchemy import create_engine


# Database connection
def get_connection():
    odbc_str = (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={os.getenv('SQL_SERVER')};"
        f"DATABASE={os.getenv('SQL_DATABASE')};"
        "Trusted_Connection=yes;"
    )
    # URL-encode the ODBC string so braces, spaces and semicolons survive URL parsing
    return create_engine(f"mssql+pyodbc:///?odbc_connect={quote_plus(odbc_str)}")


# Data Quality Checks
def run_audit():
    engine = get_connection()

    # Check 1: Null values in critical columns
    # UNION ALL keeps the first SELECT's column names; in the fact_Sales row
    # they hold the null counts for CustomerID and ProductID
    query_nulls = """
    SELECT 
        'dim_Customers' AS TableName,
        SUM(CASE WHEN CustomerName IS NULL THEN 1 ELSE 0 END) AS NullCustomerName,
        SUM(CASE WHEN Email IS NULL THEN 1 ELSE 0 END) AS NullEmail
    FROM dim_Customers
    UNION ALL
    SELECT 
        'fact_Sales',
        SUM(CASE WHEN CustomerID IS NULL THEN 1 ELSE 0 END),
        SUM(CASE WHEN ProductID IS NULL THEN 1 ELSE 0 END)
    FROM fact_Sales
    """
    df_nulls = pd.read_sql(query_nulls, engine)
    print("Null Value Audit:")
    print(df_nulls)

    # Check 2: Orphaned records (referential integrity)
    query_orphans = """
    SELECT COUNT(*) AS OrphanedSales
    FROM fact_Sales s
    WHERE NOT EXISTS (SELECT 1 FROM dim_Customers c WHERE c.CustomerID = s.CustomerID)
       OR NOT EXISTS (SELECT 1 FROM dim_Products p WHERE p.ProductID = s.ProductID)
    """
    df_orphans = pd.read_sql(query_orphans, engine)
    print("\nOrphaned Records:")
    print(df_orphans)

    # Check 3: Revenue distribution
    query_revenue = "SELECT * FROM vw_RevenueByProduct ORDER BY TotalRevenue DESC"
    df_revenue = pd.read_sql(query_revenue, engine)

    # Visualization
    plt.figure(figsize=(10, 6))
    plt.bar(df_revenue['ProductName'][:10], df_revenue['TotalRevenue'][:10])
    plt.xlabel('Product')
    plt.ylabel('Total Revenue')
    plt.title('Top 10 Products by Revenue')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.savefig('revenue_analysis.png')
    print("\nRevenue chart saved to revenue_analysis.png")


if __name__ == "__main__":
    run_audit()

Customer Segmentation Analysis

python
# customer_segmentation.py
import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine


def segment_customers():
    odbc_str = (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={os.getenv('SQL_SERVER')};"
        f"DATABASE={os.getenv('SQL_DATABASE')};"
        "Trusted_Connection=yes;"
    )
    # URL-encode the ODBC string before passing it through odbc_connect
    engine = create_engine(f"mssql+pyodbc:///?odbc_connect={quote_plus(odbc_str)}")

    # Load customer LTV data
    df = pd.read_sql("SELECT * FROM vw_CustomerLTV", engine)

    # Customers without sales come back with a NULL LifetimeValue; count them as 0
    df['LifetimeValue'] = df['LifetimeValue'].fillna(0)

    # Value-based segmentation (only the monetary dimension of RFM)
    df['Segment'] = pd.cut(
        df['LifetimeValue'],
        bins=[0, 1000, 5000, float('inf')],
        labels=['Bronze', 'Silver', 'Gold'],
        include_lowest=True,  # keep a LifetimeValue of exactly 0 in the Bronze bin
    )

    # Aggregate by segment (observed=False keeps segments that have no customers)
    segment_summary = df.groupby('Segment', observed=False).agg({
        'CustomerID': 'count',
        'LifetimeValue': 'sum',
        'TotalPurchases': 'mean'
    }).reset_index()

    print(segment_summary)
    return segment_summary
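`pd.cut` uses right-closed bins by default. With edges 0, 1000 and 5000, a lifetime value of exactly 1000 lands in the first tier, and a value of exactly 0 falls outside every bin unless `include_lowest=True` is passed. The sketch below spells out the same tier boundaries in plain Python so they can be checked in isolation. It assumes that customers with zero or missing lifetime value belong in the lowest tier, which the source does not state; `assign_segment` is an illustrative name.

```python
from bisect import bisect_left

SEGMENT_UPPER_BOUNDS = [1000, 5000]            # right-closed edges, as in pd.cut
SEGMENT_LABELS = ["Bronze", "Silver", "Gold"]  # one more label than there are edges


def assign_segment(lifetime_value):
    """Map a lifetime value to its tier; None (no purchases) is treated as 0."""
    value = 0 if lifetime_value is None else lifetime_value
    if value < 0:
        raise ValueError("lifetime value cannot be negative")
    # bisect_left returns the index of the first edge that is >= value
    return SEGMENT_LABELS[bisect_left(SEGMENT_UPPER_BOUNDS, value)]


if __name__ == "__main__":
    for sample in (None, 0, 1000, 1000.01, 5000, 7500):
        print(sample, assign_segment(sample))
```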

PySpark Big Data Processing

High-Volume Sales Aggregation

python
# pyspark_analytics.py
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, countDistinct, month, sum as _sum, year

# Initialize Spark
spark = (
    SparkSession.builder
    .appName("EnterpriseSalesAnalytics")
    .config("spark.jars", "mssql-jdbc-9.4.0.jre8.jar")
    .getOrCreate()
)

# JDBC connection properties
jdbc_url = (
    f"jdbc:sqlserver://{os.getenv('SQL_SERVER')}:1433;"
    f"databaseName={os.getenv('SQL_DATABASE')}"
)
connection_properties = {
    "user": os.getenv('SQL_USER'),
    "password": os.getenv('SQL_PASSWORD'),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Load data from SQL Server
df_sales = spark.read.jdbc(url=jdbc_url, table="fact_Sales", properties=connection_properties)
df_customers = spark.read.jdbc(url=jdbc_url, table="dim_Customers", properties=connection_properties)
df_products = spark.read.jdbc(url=jdbc_url, table="dim_Products", properties=connection_properties)

# Join and aggregate
df_combined = (
    df_sales
    .join(df_customers, "CustomerID")
    .join(df_products, "ProductID")
)

# Monthly revenue analysis
df_monthly = (
    df_combined
    .withColumn("Year", year(col("SaleDate")))
    .withColumn("Month", month(col("SaleDate")))
    .groupBy("Year", "Month", "Region")
    .agg(
        _sum("TotalAmount").alias("MonthlyRevenue"),
        count("SaleID").alias("TransactionCount"),
        avg("TotalAmount").alias("AvgTransactionValue"),
    )
    .orderBy("Year", "Month", "Region")
)
df_monthly.show(20)

# Write results back to SQL Server
df_monthly.write.jdbc(
    url=jdbc_url,
    table="analytics_MonthlyRevenue",
    mode="overwrite",
    properties=connection_properties,
)

# Product performance by category
df_category = (
    df_combined
    .groupBy("Category")
    .agg(
        _sum("TotalAmount").alias("CategoryRevenue"),
        _sum("Quantity").alias("TotalUnitsSold"),
        # countDistinct replaces count(...).distinct(), which is not a valid column expression
        countDistinct("CustomerID").alias("UniqueCustomers"),
    )
    .orderBy(col("CategoryRevenue").desc())
)
df_category.show()

spark.stop()

Parallel Processing for Large Datasets

python
# batch_processing.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = (
    SparkSession.builder
    .appName("BatchDataProcessing")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)

# Read large CSV files
df_raw = spark.read.csv(
    "hdfs://data/sales/*.csv",
    header=True,
    inferSchema=True,
)

# Data quality transformations
df_cleaned = (
    df_raw
    .filter(col("TotalAmount").isNotNull())
    .filter(col("Quantity") > 0)
    .withColumn(
        "IsHighValue",
        when(col("TotalAmount") > 1000, "Yes").otherwise("No"),
    )
    .dropDuplicates(["SaleID"])
)

# Partition by date for efficient querying
(
    df_cleaned.write
    .partitionBy("SaleDate")
    .mode("overwrite")
    .parquet("hdfs://data/processed/sales_cleaned")
)

# count() is a second action and re-reads the source unless df_cleaned is cached first
print(f"Processed {df_cleaned.count()} records")

Common Patterns

Incremental ETL (Load Only New Records)

sql
-- Create staging table
CREATE TABLE stg_Sales (
    SaleID INT,
    CustomerID INT,
    ProductID INT,
    Quantity INT,
    SaleDate DATE,
    TotalAmount DECIMAL(10, 2),
    LoadDate DATETIME DEFAULT GETDATE()
);

-- Merge statement for incremental load
MERGE INTO fact_Sales AS target
USING stg_Sales AS source
ON target.SaleID = source.SaleID
WHEN MATCHED THEN
    UPDATE SET
        Quantity = source.Quantity,
        TotalAmount = source.TotalAmount
WHEN NOT MATCHED THEN
    INSERT (SaleID, CustomerID, ProductID, Quantity, SaleDate, TotalAmount)
    VALUES (source.SaleID, source.CustomerID, source.ProductID, 
            source.Quantity, source.SaleDate, source.TotalAmount);
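The effect of this MERGE can be traced with a small in-memory model before it is run against the warehouse. The sketch below is a plain-Python illustration of the statement's semantics, not a replacement for it: matched rows have only Quantity and TotalAmount refreshed, unmatched rows are inserted whole. `merge_sales` is a hypothetical helper, and the fact table is modeled as a dict keyed by SaleID.

```python
def merge_sales(fact_sales, staged_rows):
    """Apply staged rows to fact_sales (dict keyed by SaleID).

    Returns a (updated, inserted) tuple of row counts.
    """
    updated = inserted = 0
    for row in staged_rows:
        existing = fact_sales.get(row["SaleID"])
        if existing is not None:
            # WHEN MATCHED: only Quantity and TotalAmount are overwritten
            existing["Quantity"] = row["Quantity"]
            existing["TotalAmount"] = row["TotalAmount"]
            updated += 1
        else:
            # WHEN NOT MATCHED: the full staged row is inserted
            fact_sales[row["SaleID"]] = dict(row)
            inserted += 1
    return updated, inserted


if __name__ == "__main__":
    fact = {1: {"SaleID": 1, "CustomerID": 7, "ProductID": 3,
                "Quantity": 1, "SaleDate": "2024-01-01", "TotalAmount": 10}}
    staged = [
        {"SaleID": 1, "CustomerID": 8, "ProductID": 3,
         "Quantity": 2, "SaleDate": "2024-01-01", "TotalAmount": 20},
        {"SaleID": 2, "CustomerID": 7, "ProductID": 4,
         "Quantity": 5, "SaleDate": "2024-01-02", "TotalAmount": 50},
    ]
    print(merge_sales(fact, staged))
    print(fact[1])
```

Two differences from SQL Server are worth keeping in mind. A corrected CustomerID, ProductID or SaleDate in staging is ignored for existing rows, exactly as in the statement above. And unlike this loop, MERGE raises an error when stg_Sales holds more than one row matching the same SaleID, so staging should be de-duplicated first.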

Automated Data Refresh

python
# scheduled_refresh.py
import subprocess
from datetime import datetime


def run_ssis_package():
    """Execute SSIS package via dtexec"""
    package_path = r"C:\SSIS\EnterpriseETL\EnterpriseETL\Package.dtsx"

    cmd = [
        "dtexec",
        "/FILE", package_path,
        "/REPORTING", "E"
    ]

    result = subprocess.run(cmd, capture_output=True, text=True)

    # Keep dtexec output in a timestamped log file for later inspection
    log_file = f"etl_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    with open(log_file, 'w') as f:
        f.write(result.stdout)
        f.write(result.stderr)

    return result.returncode == 0


def run_analytics():
    """Execute Python analytics after ETL"""
    subprocess.run(["python", "project_audit.py"])
    subprocess.run(["python", "pyspark_analytics.py"])


if __name__ == "__main__":
    if run_ssis_package():
        print("ETL completed successfully")
        run_analytics()
    else:
        print("ETL failed - check logs")
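The script above reduces the dtexec result to success or failure. When a run fails, the exit code itself narrows down the cause, so it can be worth logging a description alongside the output. A small sketch, using the exit codes documented for the dtexec utility (`describe_dtexec_exit` is an illustrative name):

```python
# Exit codes documented for the dtexec utility
DTEXEC_EXIT_CODES = {
    0: "package executed successfully",
    1: "package failed",
    3: "package was canceled by the user",
    4: "package could not be found",
    5: "package could not be loaded",
    6: "command line had syntax or semantic errors",
}


def describe_dtexec_exit(code):
    """Translate a dtexec return code into a short message for the ETL log."""
    return DTEXEC_EXIT_CODES.get(code, f"unrecognized exit code {code}")


if __name__ == "__main__":
    for code in (0, 4, 99):
        print(code, describe_dtexec_exit(code))
```

In `run_ssis_package`, the message for `result.returncode` could be appended to the log file before returning.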

Troubleshooting

SSIS Connection Issues

plaintext
Error: "Cannot acquire connection to SQL Server"
Solution:
1. Verify SQL Server service is running
2. Check Windows Authentication vs SQL Authentication
3. Update connection string in Connection Manager
4. Enable TCP/IP protocol in SQL Server Configuration Manager
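Steps 1 and 4 can be checked from the machine that runs the pipeline with a plain TCP probe. The sketch below assumes SQL Server listens on the default port 1433, as the JDBC URL in this document does; a named instance may use a different port. `can_reach` is an illustrative name.

```python
import socket


def can_reach(host, port=1433, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures
        return False


if __name__ == "__main__":
    print("SQL Server port reachable:", can_reach("localhost"))
```

A `False` result points to the service, the firewall or the TCP/IP setting; a `True` result suggests looking at authentication and the connection string instead.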

Unicode/Encoding Errors

plaintext
Error: "Cannot convert between unicode and non-unicode string data types"
Solution in SSIS:
1. Add a Data Conversion transformation to the data flow
2. Convert DT_STR to DT_WSTR for Unicode columns
3. Set CodePage to 1252 (Windows Latin 1) in Flat File Connection
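Code page 1252 is only the right setting when the CSV really is Windows Latin 1. A file saved as UTF-8 needs code page 65001 instead. The heuristic below, a sketch rather than a reliable detector, inspects the raw bytes and suggests one of the two. `suggest_code_page` is a hypothetical helper, and it assumes the file is in one of these two encodings.

```python
def suggest_code_page(raw_bytes):
    """Suggest a Flat File Connection code page for the given file bytes (heuristic)."""
    if raw_bytes.startswith(b"\xef\xbb\xbf"):
        return 65001  # UTF-8 byte order mark
    try:
        text = raw_bytes.decode("utf-8")
    except UnicodeDecodeError:
        return 1252  # not valid UTF-8, so assume Windows Latin 1
    # Pure ASCII reads the same under both code pages; keep the document's default
    return 1252 if text.isascii() else 65001


if __name__ == "__main__":
    print(suggest_code_page("Zoë,North".encode("utf-8")))
    print(suggest_code_page("Zoë,North".encode("cp1252")))
```

Pass the whole file (`open(path, "rb").read()`) rather than a fixed-size prefix, since a prefix cut in the middle of a multi-byte character would be misread as non-UTF-8.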

PySpark JDBC Driver Not Found

python
# Download the Microsoft JDBC driver, then add it to the Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars", "/path/to/mssql-jdbc-9.4.0.jre8.jar")
    .getOrCreate()
)

Performance Optimization

python
from pyspark.sql.functions import broadcast

# Enable broadcast join for small dimension tables
df_result = df_sales.join(broadcast(df_products), "ProductID")

# Cache frequently accessed DataFrames
df_sales.cache()
df_sales.count()  # Trigger caching

Environment Variables

bash
# .env file
SQL_SERVER=localhost
SQL_DATABASE=EnterpriseDataWarehouse
SQL_USER=your_username
SQL_PASSWORD=your_password

# For PySpark
SPARK_HOME=/path/to/spark
JAVA_HOME=/path/to/java
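The scripts in this document read these values with `os.getenv`, which returns `None` for an unset variable and does not read a `.env` file on its own. An unset `SQL_SERVER` therefore ends up as the literal text `None` in the connection string. A minimal guard, sketched below, fails early with a clear message instead. `require_env` and `REQUIRED_VARS` are illustrative names, and the values still have to be exported into the process environment by whatever mechanism the deployment uses.

```python
import os

REQUIRED_VARS = ("SQL_SERVER", "SQL_DATABASE", "SQL_USER", "SQL_PASSWORD")


def require_env(names=REQUIRED_VARS, environ=None):
    """Return the named variables as a dict, or raise listing every missing one."""
    environ = os.environ if environ is None else environ
    # Empty strings are treated as missing too
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in names}


if __name__ == "__main__":
    try:
        settings = require_env()
        print("Connecting to", settings["SQL_SERVER"])
    except RuntimeError as exc:
        print(exc)
```

Scripts that use Windows authentication (`Trusted_Connection=yes`) only need `SQL_SERVER` and `SQL_DATABASE`, so they can pass a shorter tuple of names.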

Running the Complete Pipeline

bash
# Step 1: Setup database
sqlcmd -S localhost -i 01_Schema_Setup.sql

# Step 2: Run SSIS package (via Visual Studio or dtexec)
dtexec /FILE "EnterpriseETL\Package.dtsx"

# Step 3: Run data quality audit
python project_audit.py

# Step 4: Run PySpark analytics
spark-submit --jars mssql-jdbc-9.4.0.jre8.jar pyspark_analytics.py

This enterprise pipeline provides a complete solution for data warehousing, ETL automation, and big data analytics using industry-standard Microsoft and Apache technologies.