data-engineering-medallion-pipeline


Data Engineering Medallion Pipeline Skill

Skill by ara.so — Data Skills collection.
This skill enables AI agents to work with a complete data engineering pipeline implementing the Medallion Architecture (Bronze → Silver → Gold) using modern open-source tools: MinIO (S3-compatible storage), Airbyte (data ingestion), PostgreSQL (data warehouse), DBT (transformations), Apache Airflow (orchestration), and Grafana (monitoring).

What This Project Does

The data-engineering-medallion project provides a complete end-to-end data pipeline that:
  1. Ingests raw data from MinIO object storage into PostgreSQL using Airbyte
  2. Transforms data through three layers (Bronze/Silver/Gold) using DBT
  3. Orchestrates the entire pipeline with Apache Airflow DAGs
  4. Validates data quality with automated DBT tests
  5. Monitors infrastructure health with Prometheus and Grafana
  6. Visualizes business metrics in Power BI dashboards
The architecture follows the ELT (Extract-Load-Transform) pattern with a clear separation of concerns:
  • Bronze: Raw immutable data from sources (JSONB format)
  • Silver: Cleaned, validated, and typed data
  • Gold: Business-ready aggregated metrics and KPIs
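To make the layer responsibilities concrete, here is a minimal in-memory sketch of the Bronze → Silver → Gold flow. It is not the project's implementation (that lives in Airbyte and DBT); the sample records and the function names `bronze`, `silver`, and `gold` are assumptions made for illustration only.

```python
import json
from collections import defaultdict
from decimal import Decimal

# Hypothetical raw payloads, standing in for rows landed by the ingestion step.
RAW = [
    '{"order_id": "1", "customer_id": "10", "total_amount": "120.50", "status": " completed "}',
    '{"order_id": "2", "customer_id": "10", "total_amount": "80.00", "status": "pending"}',
    '{"order_id": "3", "customer_id": "11", "total_amount": "40.00", "status": "COMPLETED"}',
]


def bronze(raw_rows):
    """Bronze: keep every field untouched, only parse the payload."""
    return [json.loads(row) for row in raw_rows]


def silver(bronze_rows):
    """Silver: cast types and normalise text fields."""
    return [
        {
            "order_id": int(r["order_id"]),
            "customer_id": int(r["customer_id"]),
            "total_amount": Decimal(r["total_amount"]),
            "status": r["status"].strip().upper(),
        }
        for r in bronze_rows
    ]


def gold(silver_rows):
    """Gold: aggregate completed revenue per customer."""
    revenue = defaultdict(Decimal)
    for r in silver_rows:
        if r["status"] == "COMPLETED":
            revenue[r["customer_id"]] += r["total_amount"]
    return dict(revenue)


if __name__ == "__main__":
    print(gold(silver(bronze(RAW))))
```

Each function only reads the output of the layer before it, which mirrors how the DBT models reference each other.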

Installation & Setup

Prerequisites

bash
# Required
docker --version          # 20.10+
docker-compose --version  # 2.0+

# 8GB RAM minimum, 16GB recommended

Clone and Initialize

bash
git clone https://github.com/LucasGoulartCouto/data-engineering-medallion.git
cd data-engineering-medallion

# Setup environment and start all services
make setup
make start

# Verify all containers are healthy
make status

Service URLs

After startup, access these interfaces:

Key Commands (Makefile)

bash
# Infrastructure
make setup                  # Create .env, directories, install dependencies
make start                  # Start all Docker services
make stop                   # Stop all services
make restart                # Restart all services
make status                 # Check container health
make logs SERVICE=airflow   # View logs for specific service

# DBT Operations
make dbt-run                # Run all DBT models (bronze → silver → gold)
make dbt-test               # Run data quality tests
make dbt-docs               # Generate and serve documentation
make dbt-snapshot           # Capture SCD Type 2 snapshots
make dbt-clean              # Clean compiled artifacts

# Data Pipeline
make upload-data                               # Upload sample data to MinIO
make trigger-dag DAG_ID=bronze_ingestion_dag   # Manually trigger Airflow DAG

# Development
make lint                   # Lint Python and SQL code
make format                 # Format Python code with black
make validate               # Validate Airflow DAGs and DBT models

# Cleanup
make clean                  # Remove volumes and stop services
make clean-all              # Full cleanup including Docker images

Project Structure

data-engineering-medallion/
├── airflow/
│   └── dags/
│       ├── bronze_ingestion_dag.py      # Triggers Airbyte sync
│       ├── silver_transformation_dag.py  # Runs DBT silver models
│       └── gold_aggregation_dag.py       # Runs DBT gold models
├── dbt/
│   ├── models/
│   │   ├── bronze/         # Extract JSONB → columnar
│   │   ├── silver/         # Clean, validate, dedupe
│   │   └── gold/           # Business metrics
│   ├── macros/             # Reusable SQL functions
│   ├── snapshots/          # SCD Type 2 history
│   └── tests/              # Custom data quality tests
├── scripts/
│   ├── upload_to_minio.py  # Upload CSV/JSON to MinIO
│   └── test_connections.py # Verify service connectivity
├── postgres/init/          # Database initialization SQL
├── monitoring/
│   ├── grafana/provisioning/
│   └── prometheus/
├── docker-compose.yml
├── Makefile
└── .env                    # Configuration (create from .env.example)

DBT Model Development

Bronze Layer (Raw Extraction)

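Bronze models extract fields from Airbyte's raw JSONB payload into named columns. The ->> operator returns text, so every extracted column is still a string at this stage; type casting happens in the silver layer: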
sql
-- dbt/models/bronze/bronze_orders.sql
{{ config(
    materialized='view',
    schema='bronze'
) }}

SELECT
    _airbyte_ab_id,
    _airbyte_emitted_at,
    _airbyte_data->>'order_id' AS order_id,
    _airbyte_data->>'customer_id' AS customer_id,
    _airbyte_data->>'order_date' AS order_date,
    _airbyte_data->>'total_amount' AS total_amount,
    _airbyte_data->>'status' AS status,
    _airbyte_data AS raw_data
FROM {{ source('airbyte_raw', '_airbyte_raw_orders') }}
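As a rough illustration of what the `->>` extraction in the bronze model does, the sketch below mimics it in Python: a field is pulled out of a JSON payload and always returned as text (or `None` when the key is missing or null). The helper name `extract_text` and the sample payload are assumptions for this example, not part of the project.

```python
import json


def extract_text(payload: str, key: str):
    """Mimic payload->>key: return the field as text, or None if absent/null."""
    value = json.loads(payload).get(key)
    if value is None:
        return None
    # Strings come back unquoted; other JSON values come back as their JSON text.
    return value if isinstance(value, str) else json.dumps(value)


if __name__ == "__main__":
    sample = '{"order_id": 42, "status": "pending", "total_amount": 99.5}'
    for field in ("order_id", "status", "total_amount", "missing"):
        print(field, repr(extract_text(sample, field)))
```

Because everything arrives as text, numeric and date fields need an explicit cast before they can be compared or aggregated safely.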

Silver Layer (Cleaned & Validated)

Silver models clean, cast types, deduplicate, and add calculated fields:
sql
-- dbt/models/silver/silver_orders.sql
{{ config(
    materialized='table',
    schema='silver',
    unique_key='order_id'
) }}

WITH deduplicated AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY order_id 
               ORDER BY _airbyte_emitted_at DESC
           ) AS rn
    FROM {{ ref('bronze_orders') }}
)

SELECT
    order_id::INTEGER,
    customer_id::INTEGER,
    order_date::DATE,
    total_amount::DECIMAL(10,2),
    UPPER(TRIM(status)) AS status,
    CASE 
        WHEN total_amount::DECIMAL > 1000 THEN 'high_value'
        WHEN total_amount::DECIMAL > 500 THEN 'medium_value'
        ELSE 'low_value'
    END AS order_value_segment,
    _airbyte_emitted_at AS ingested_at,
    CURRENT_TIMESTAMP AS transformed_at
FROM deduplicated
WHERE rn = 1
    AND order_id IS NOT NULL
    AND order_date::DATE <= CURRENT_DATE
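The two core ideas in the silver model, keeping only the latest row per `order_id` and bucketing orders by amount, can be sketched in plain Python as follows. The function names and sample rows are hypothetical; the thresholds (1000 and 500) are taken from the model above.

```python
from datetime import datetime
from decimal import Decimal


def latest_per_order(rows):
    """Keep the most recently emitted row per order_id (the rn = 1 filter)."""
    latest = {}
    for row in rows:
        current = latest.get(row["order_id"])
        if current is None or row["emitted_at"] > current["emitted_at"]:
            latest[row["order_id"]] = row
    return list(latest.values())


def value_segment(total_amount: Decimal) -> str:
    """Same thresholds as the CASE expression in silver_orders."""
    if total_amount > 1000:
        return "high_value"
    if total_amount > 500:
        return "medium_value"
    return "low_value"


if __name__ == "__main__":
    rows = [
        {"order_id": 1, "emitted_at": datetime(2024, 1, 1), "total_amount": Decimal("400")},
        {"order_id": 1, "emitted_at": datetime(2024, 1, 2), "total_amount": Decimal("1200")},
        {"order_id": 2, "emitted_at": datetime(2024, 1, 1), "total_amount": Decimal("750")},
    ]
    for row in latest_per_order(rows):
        print(row["order_id"], value_segment(row["total_amount"]))
```

Note that the comparisons are strict, so an order of exactly 1000 falls into `medium_value` and one of exactly 500 into `low_value`.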

Gold Layer (Business Metrics)

Gold models aggregate data for business consumption:
sql
-- dbt/models/gold/gold_product_performance.sql
{{ config(
    materialized='table',
    schema='gold'
) }}

SELECT
    p.product_id,
    p.product_name,
    p.category,
    p.supplier,
    COUNT(DISTINCT oi.order_id) AS total_orders,
    SUM(oi.quantity) AS total_quantity_sold,
    SUM(oi.line_total) AS total_revenue,
    SUM(oi.quantity * p.cost) AS total_cost,
    SUM(oi.line_total) - SUM(oi.quantity * p.cost) AS total_profit,
    ROUND(
        (SUM(oi.line_total) - SUM(oi.quantity * p.cost)) / 
        NULLIF(SUM(oi.line_total), 0) * 100, 
        2
    ) AS profit_margin_percentage,
    AVG(oi.unit_price) AS avg_unit_price,
    MAX(o.order_date) AS last_sale_date
FROM {{ ref('silver_products') }} p
INNER JOIN {{ ref('silver_order_items') }} oi 
    ON p.product_id = oi.product_id
INNER JOIN {{ ref('silver_orders') }} o 
    ON oi.order_id = o.order_id
WHERE o.status = 'COMPLETED'  -- silver_orders upper-cases status values
GROUP BY p.product_id, p.product_name, p.category, p.supplier
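The profit margin expression relies on `NULLIF(..., 0)` to avoid a division-by-zero error when a product has no revenue. A minimal Python sketch of the same arithmetic is shown below; the function name is an assumption, and `None` stands in for SQL `NULL`.

```python
from decimal import Decimal, ROUND_HALF_UP


def profit_margin_percentage(revenue: Decimal, cost: Decimal):
    """(revenue - cost) / revenue * 100, rounded to 2 places; None if revenue is 0."""
    if revenue == 0:
        # NULLIF(revenue, 0) turns the divisor into NULL, so the result is NULL.
        return None
    margin = (revenue - cost) / revenue * 100
    return margin.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


if __name__ == "__main__":
    print(profit_margin_percentage(Decimal("200"), Decimal("150")))
    print(profit_margin_percentage(Decimal("0"), Decimal("10")))
```

Returning a null margin rather than zero keeps products without sales distinguishable from products that sold at cost.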

DBT Testing & Data Quality

Schema Tests (schema.yml)

yaml
# dbt/models/silver/schema.yml
version: 2

models:
  - name: silver_orders
    description: "Cleaned and validated orders"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "Foreign key to customers"
        tests:
          - not_null
          - relationships:
              to: ref('silver_customers')
              field: customer_id
      - name: status
        tests:
          - accepted_values:
              values: ['PENDING', 'COMPLETED', 'CANCELLED', 'REFUNDED']
      - name: total_amount
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
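A DBT test passes when its query returns zero rows, so each schema test can be read as "select the rows that violate the rule". The sketch below expresses four of the checks configured for `silver_orders` in that style. It is a simplified Python analogue with hypothetical function names and sample data, not how DBT itself runs them.

```python
from collections import Counter


def failing_not_null(rows, column):
    return [r for r in rows if r[column] is None]


def failing_unique(rows, column):
    # Null values are ignored here; not_null covers them separately.
    counts = Counter(r[column] for r in rows if r[column] is not None)
    return [r for r in rows if r[column] is not None and counts[r[column]] > 1]


def failing_accepted_values(rows, column, values):
    return [r for r in rows if r[column] is not None and r[column] not in values]


def failing_accepted_range(rows, column, min_value, max_value):
    return [
        r for r in rows
        if r[column] is not None and not (min_value <= r[column] <= max_value)
    ]


if __name__ == "__main__":
    orders = [
        {"order_id": 1, "status": "COMPLETED", "total_amount": 120},
        {"order_id": 1, "status": "SHIPPED", "total_amount": -5},
        {"order_id": None, "status": "PENDING", "total_amount": 30},
    ]
    statuses = {"PENDING", "COMPLETED", "CANCELLED", "REFUNDED"}
    print(len(failing_unique(orders, "order_id")))
    print(len(failing_not_null(orders, "order_id")))
    print(len(failing_accepted_values(orders, "status", statuses)))
    print(len(failing_accepted_range(orders, "total_amount", 0, 1_000_000)))
```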

Custom Tests

sql
-- dbt/tests/assert_no_future_dates_in_sales.sql
SELECT order_id, order_date
FROM {{ ref('silver_orders') }}
WHERE order_date > CURRENT_DATE
sql
-- dbt/tests/assert_no_negative_profit.sql
SELECT product_id, total_profit
FROM {{ ref('gold_product_performance') }}
WHERE total_profit < 0

Running Tests

bash
# Run all tests
make dbt-test

# Run tests for specific model
docker-compose exec dbt dbt test --select silver_orders

# Run specific test type
docker-compose exec dbt dbt test --select test_type:generic
docker-compose exec dbt dbt test --select test_type:singular

Airflow DAG Development

Bronze Ingestion DAG

python
# airflow/dags/bronze_ingestion_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'bronze_ingestion_dag',
    default_args=default_args,
    description='Ingest data from MinIO to PostgreSQL Bronze layer',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['bronze', 'ingestion', 'airbyte'],
) as dag:

    # Trigger the Airbyte sync and wait for it to finish
    trigger_airbyte_sync = AirbyteTriggerSyncOperator(
        task_id='trigger_airbyte_orders_sync',
        airbyte_conn_id='airbyte_default',
        connection_id='{{ var.value.airbyte_connection_id }}',
        asynchronous=False,
        timeout=3600,
    )

    def validate_ingestion(**context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        pg_hook = PostgresHook(postgres_conn_id='postgres_default')

        # Check row count
        result = pg_hook.get_first(
            "SELECT COUNT(*) FROM airbyte_raw._airbyte_raw_orders"
        )
        if result[0] == 0:
            raise ValueError("No data ingested to bronze layer")

        print(f"Validated {result[0]} rows in bronze layer")

    validate_task = PythonOperator(
        task_id='validate_bronze_ingestion',
        python_callable=validate_ingestion,
    )

    trigger_airbyte_sync >> validate_task

Silver Transformation DAG

python
# airflow/dags/silver_transformation_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

with DAG(
    'silver_transformation_dag',
    default_args=default_args,
    description='Transform bronze to silver layer with DBT',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['silver', 'transformation', 'dbt'],
) as dag:

    # Build the silver models, then run their tests
    run_silver_models = BashOperator(
        task_id='run_dbt_silver_models',
        bash_command='cd /opt/dbt && dbt run --select silver.*',
    )

    test_silver_models = BashOperator(
        task_id='test_dbt_silver_models',
        bash_command='cd /opt/dbt && dbt test --select silver.*',
    )

    run_silver_models >> test_silver_models

Gold Aggregation DAG

python
# airflow/dags/gold_aggregation_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=3),
}

with DAG(
    'gold_aggregation_dag',
    default_args=default_args,
    description='Build gold layer business metrics',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gold', 'aggregation', 'metrics'],
) as dag:

    # Build the gold models, test them, then capture snapshots
    run_gold_models = BashOperator(
        task_id='run_dbt_gold_models',
        bash_command='cd /opt/dbt && dbt run --select gold.*',
    )

    test_gold_models = BashOperator(
        task_id='test_dbt_gold_models',
        bash_command='cd /opt/dbt && dbt test --select gold.*',
    )

    snapshot_gold = BashOperator(
        task_id='snapshot_gold_metrics',
        bash_command='cd /opt/dbt && dbt snapshot',
    )

    run_gold_models >> test_gold_models >> snapshot_gold

Data Upload to MinIO

python
# scripts/upload_to_minio.py
import os
from pathlib import Path

from minio import Minio
from minio.error import S3Error


def upload_data_to_minio():
    """Upload sample CSV data to MinIO bucket"""
    # Initialize MinIO client
    client = Minio(
        os.getenv('MINIO_ENDPOINT', 'localhost:9000'),
        access_key=os.getenv('MINIO_ACCESS_KEY', 'minioadmin'),
        secret_key=os.getenv('MINIO_SECRET_KEY'),
        secure=False
    )

    bucket_name = os.getenv('MINIO_BUCKET', 'raw-data')

    # Create bucket if not exists
    try:
        if not client.bucket_exists(bucket_name):
            client.make_bucket(bucket_name)
            print(f"Created bucket: {bucket_name}")
    except S3Error as e:
        print(f"Error creating bucket: {e}")
        return

    # Upload files from data directory
    data_dir = Path('data/raw')
    for file_path in data_dir.glob('*.csv'):
        try:
            client.fput_object(
                bucket_name,
                file_path.name,
                str(file_path),
                content_type='text/csv'
            )
            print(f"Uploaded: {file_path.name}")
        except S3Error as e:
            print(f"Error uploading {file_path.name}: {e}")


if __name__ == '__main__':
    upload_data_to_minio()

Run the upload:

bash
make upload-data

# or
python scripts/upload_to_minio.py

Configuration

Environment Variables (.env)

bash
# PostgreSQL
POSTGRES_USER=dataeng
POSTGRES_PASSWORD=<your-secure-password>
POSTGRES_DB=datawarehouse
POSTGRES_HOST=postgres
POSTGRES_PORT=5432

# MinIO
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=<your-secure-password>
MINIO_ENDPOINT=minio:9000
MINIO_BUCKET=raw-data

# Airflow
AIRFLOW_UID=50000
AIRFLOW_GID=0
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=<generate-with-python-cryptography>

# Airbyte
AIRBYTE_VERSION=0.50.0

# DBT
DBT_PROFILES_DIR=/opt/dbt
DBT_PROJECT_DIR=/opt/dbt

# Grafana
GF_SECURITY_ADMIN_PASSWORD=<your-secure-password>
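The `AIRFLOW__CORE__FERNET_KEY` placeholder needs a real key before Airflow can encrypt stored connections. A Fernet key is 32 random bytes encoded as URL-safe base64, so one way to produce a value in that format with only the standard library is sketched below. The function name is an assumption; if the `cryptography` package is installed, `Fernet.generate_key()` yields a key in the same format.

```python
import base64
import os


def generate_fernet_key() -> str:
    """Return 32 random bytes as URL-safe base64 text (the Fernet key format)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode()


if __name__ == "__main__":
    # Paste the printed value into .env; never commit it to version control.
    print(generate_fernet_key())
```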

DBT Profile Configuration

yaml
# dbt/profiles.yml
datawarehouse:
  target: dev
  outputs:
    dev:
      type: postgres
      host: "{{ env_var('POSTGRES_HOST') }}"
      port: "{{ env_var('POSTGRES_PORT') | int }}"
      user: "{{ env_var('POSTGRES_USER') }}"
      password: "{{ env_var('POSTGRES_PASSWORD') }}"
      dbname: "{{ env_var('POSTGRES_DB') }}"
      schema: public
      threads: 4
      keepalives_idle: 0

Airbyte Connection Setup

  1. Access Airbyte UI: http://localhost:8000
  2. Create MinIO source:
    • Connector: S3
    • Endpoint: http://minio:9000
    • Bucket: raw-data
    • Access Key: ${MINIO_ROOT_USER}
    • Secret Key: ${MINIO_ROOT_PASSWORD}
  3. Create PostgreSQL destination:
    • Host: postgres
    • Port: 5432
    • Database: datawarehouse
    • Schema: bronze
    • Username: ${POSTGRES_USER}
    • Password: ${POSTGRES_PASSWORD}
  4. Create connection with sync mode: Full Refresh | Overwrite

Common Patterns

Incremental Model Pattern

sql
-- dbt/models/silver/silver_orders_incremental.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    schema='silver',
    on_schema_change='append_new_columns'
) }}

SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    _airbyte_emitted_at AS ingested_at
FROM {{ ref('bronze_orders') }}
{% if is_incremental() %}
WHERE _airbyte_emitted_at > (SELECT MAX(ingested_at) FROM {{ this }})
{% endif %}
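The incremental pattern above boils down to a high-water mark: on later runs, only source rows newer than the latest `ingested_at` already in the target are processed, and rows sharing a `unique_key` replace the existing ones. The sketch below models this with a dictionary keyed by `order_id`. It is a simplification with hypothetical names; in particular it treats an empty target as a first run, whereas DBT decides based on whether the target table exists.

```python
from datetime import datetime


def incremental_merge(target: dict, source: list) -> int:
    """Apply new source rows to target (keyed by order_id); return rows applied."""
    if target:
        watermark = max(row["ingested_at"] for row in target.values())
        new_rows = [row for row in source if row["ingested_at"] > watermark]
    else:
        new_rows = list(source)  # first run: load everything
    for row in new_rows:
        target[row["order_id"]] = row  # unique_key: replace any existing row
    return len(new_rows)


if __name__ == "__main__":
    target = {}
    day1 = [
        {"order_id": 1, "status": "PENDING", "ingested_at": datetime(2024, 1, 1)},
        {"order_id": 2, "status": "PENDING", "ingested_at": datetime(2024, 1, 1)},
    ]
    day2 = day1 + [
        {"order_id": 1, "status": "COMPLETED", "ingested_at": datetime(2024, 1, 2)},
    ]
    print(incremental_merge(target, day1))
    print(incremental_merge(target, day2))
    print(target[1]["status"])
```

One consequence of the strict `>` comparison is that late-arriving rows with a timestamp equal to or older than the watermark are skipped until a full refresh.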

Macro for Common Transformations

sql
-- dbt/macros/clean_string.sql
{% macro clean_string(column_name) %}
    UPPER(TRIM(REGEXP_REPLACE({{ column_name }}, '\s+', ' ', 'g')))
{% endmacro %}

-- Usage in model
SELECT {{ clean_string('customer_name') }} AS customer_name
FROM {{ ref('bronze_customers') }}
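For readers who want to check what the `clean_string` macro produces, here is a rough Python equivalent: collapse whitespace runs to a single space, trim, and upper-case. The Python function is an illustration only and is not part of the DBT project.

```python
import re


def clean_string(value):
    """Collapse whitespace runs, trim the ends, and upper-case the result."""
    if value is None:
        return None  # SQL functions propagate NULL the same way
    return re.sub(r"\s+", " ", value).strip().upper()


if __name__ == "__main__":
    print(repr(clean_string("  jane \t\n doe ")))
```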

Snapshot (SCD Type 2)

sql
-- dbt/snapshots/snapshot_customer_segments.sql
{% snapshot snapshot_customer_segments %}

{{
    config(
      target_schema='snapshots',
      unique_key='customer_id',
      strategy='timestamp',
      updated_at='updated_at',
    )
}}

SELECT * FROM {{ ref('gold_customer_metrics') }}

{% endsnapshot %}
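The timestamp strategy can be pictured as follows: when a record's `updated_at` is newer than the open version in the history, the old version is closed and a new one is opened. The sketch below is a simplified model of that behaviour with hypothetical sample data. It uses the `dbt_valid_from` and `dbt_valid_to` column names that DBT adds to snapshot tables, and it ignores deleted records.

```python
def apply_snapshot(history, current_rows, key="customer_id", updated_at="updated_at"):
    """Append new versions to history and close superseded ones (SCD Type 2)."""
    open_rows = {h[key]: h for h in history if h["dbt_valid_to"] is None}
    for row in current_rows:
        existing = open_rows.get(row[key])
        if existing is not None and row[updated_at] <= existing[updated_at]:
            continue  # nothing newer than the open version
        if existing is not None:
            existing["dbt_valid_to"] = row[updated_at]  # close the old version
        new_version = {**row, "dbt_valid_from": row[updated_at], "dbt_valid_to": None}
        history.append(new_version)
        open_rows[row[key]] = new_version
    return history


if __name__ == "__main__":
    history = []
    apply_snapshot(history, [{"customer_id": 1, "segment": "new", "updated_at": 1}])
    apply_snapshot(history, [{"customer_id": 1, "segment": "loyal", "updated_at": 2}])
    for version in history:
        print(version)
```

Integer timestamps are used here only to keep the example short; the real column would hold timestamps.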

Custom Test Macro

sql
-- dbt/macros/test_no_orphans.sql
{% test no_orphan_records(model, column_name, parent_model, parent_column) %}

SELECT {{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} IS NOT NULL
  AND {{ column_name }} NOT IN (
      SELECT {{ parent_column }}
      FROM {{ parent_model }}
  )

{% endtest %}
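The orphan check returns child values that have no matching parent. One caveat worth knowing on the SQL side: if the parent column contains a `NULL`, a `NOT IN` subquery matches no rows at all, so the test would pass silently. The Python sketch below shows the intended logic with null parents ignored; the function name and sample values are assumptions for illustration.

```python
def orphan_values(child_values, parent_values):
    """Return non-null child values that do not appear among the parent values."""
    parents = {value for value in parent_values if value is not None}
    return [
        value for value in child_values
        if value is not None and value not in parents
    ]


if __name__ == "__main__":
    order_customer_ids = [10, 11, 12, None]
    customer_ids = [10, 11, None]
    print(orphan_values(order_customer_ids, customer_ids))
```

Filtering nulls out of the parent subquery, or rewriting the check with `NOT EXISTS`, would give the SQL test the same behaviour as this sketch.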

Querying the Data Warehouse

Bronze Layer Query

sql
-- Raw JSONB data from Airbyte
SELECT 
    _airbyte_ab_id,
    _airbyte_emitted_at,
    _airbyte_data->>'order_id' AS order_id,
    _airbyte_data
FROM airbyte_raw._airbyte_raw_orders
LIMIT 10;

Silver Layer Query

sql
-- Cleaned typed data
SELECT 
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    order_value_segment
FROM silver.silver_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
ORDER BY order_date DESC;

Gold Layer Query

sql
-- Business metrics
SELECT 
    product_name,
    category,
    total_revenue,
    total_profit,
    profit_margin_percentage,
    total_quantity_sold
FROM gold.gold_product_performance
WHERE profit_margin_percentage > 20
ORDER BY total_revenue DESC
LIMIT 10;

-- Customer segmentation
SELECT 
    customer_segment,
    COUNT(*) AS customer_count,
    AVG(total_spent) AS avg_lifetime_value,
    AVG(recency_days) AS avg_recency
FROM gold.gold_customer_metrics
GROUP BY customer_segment
ORDER BY avg_lifetime_value DESC;

Troubleshooting

Container Won't Start

bash
# Check logs
make logs SERVICE=postgres
make logs SERVICE=airflow-webserver

# Verify resource allocation
docker system df
docker system prune  # Clean up if needed

# Reset specific service
docker-compose restart postgres

Airbyte Connection Failing

bash
# Test MinIO connectivity
docker-compose exec airbyte-worker curl http://minio:9000/minio/health/live

# Test PostgreSQL connectivity
docker-compose exec airbyte-worker \
  psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c "SELECT 1"

# Check Airbyte logs
docker-compose logs airbyte-worker | grep ERROR

DBT Model Failing

bash
# Run with debug logging
docker-compose exec dbt dbt run --select silver_orders --debug

# Check compiled SQL
cat dbt/target/compiled/datawarehouse/models/silver/silver_orders.sql

# Rebuild a single model from scratch
docker-compose exec dbt dbt run --select silver_orders --full-refresh

# Validate the project without executing any models
docker-compose exec dbt dbt parse

Airflow DAG Not Appearing

bash
# Check DAG import errors
docker-compose exec airflow-webserver airflow dags list-import-errors

# Validate DAG Python syntax
python -m py_compile airflow/dags/bronze_ingestion_dag.py

# Trigger a run manually once the DAG is listed
docker-compose exec airflow-webserver airflow dags trigger <dag_id>
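
The single-file syntax check above can be extended to a whole DAG folder. The following is a minimal sketch, assuming the DAGs live under `airflow/dags` as in the command above; `find_broken_dags` is a hypothetical helper name, not part of the project. It only catches syntax errors, so missing imports or Airflow-level problems still need `airflow dags list-import-errors`.

```python
import py_compile
from pathlib import Path


def find_broken_dags(dag_dir: str) -> dict:
    """Byte-compile every .py file under dag_dir and collect syntax errors.

    Returns a mapping of file path -> error message; an empty dict means
    every file compiled. Hypothetical helper, not part of the project.
    """
    errors = {}
    for path in sorted(Path(dag_dir).rglob("*.py")):
        try:
            # doraise=True turns a compile failure into PyCompileError
            # instead of printing it to stderr.
            py_compile.compile(str(path), doraise=True)
        except py_compile.PyCompileError as exc:
            errors[str(path)] = exc.msg
    return errors
```

Calling `find_broken_dags("airflow/dags")` from the repository root and printing the returned mapping gives one line per file that fails to compile.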

PostgreSQL Connection Refused

bash
# Check whether PostgreSQL is accepting connections
docker-compose exec postgres pg_isready -U ${POSTGRES_USER}

# Check the connection from within the dbt container
docker-compose exec dbt psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB}

# Verify connection settings in .env
grep POSTGRES .env
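
`pg_isready` reports readiness once; a script that must actually wait for the database needs a retry loop. The sketch below polls a TCP port until it accepts connections or a timeout expires. `wait_for_port` is a hypothetical helper, and the host name `postgres` and port `5432` in the usage note are assumptions based on the service name used above and the PostgreSQL default port. An open port does not prove the server is ready for queries, so `pg_isready` remains the authoritative check.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds.

    Returns False if the timeout elapses first. Hypothetical helper.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("postgres", 5432, timeout=60)` could run inside a container on the same Docker network before starting a DBT job.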

Data Quality Test Failures

bash
# Run tests and store failing rows in the warehouse
docker-compose exec dbt dbt test --select silver_orders --store-failures

# Query failed test results (SQL; run it in psql)
# Note: by default dbt stores failures as one table per test in a schema
# suffixed dbt_test__audit, so adjust the table name to the project's configuration
SELECT * FROM silver.test_failures WHERE test_name = 'unique_order_id';

# Run a specific test
docker-compose exec dbt dbt test --select test_name:unique_order_id

MinIO Upload Failing

bash
# Check MinIO service status
docker-compose ps minio

# Test MinIO API directly
docker-compose exec minio mc alias set local http://localhost:9000 \
  ${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}
docker-compose exec minio mc ls local/${MINIO_BUCKET}

# Upload test file
docker-compose exec minio mc cp /tmp/test.csv local/${MINIO_BUCKET}/

Monitoring & Observability

Check Pipeline Health

bash
# Airflow task status
docker-compose exec airflow-webserver \
  airflow tasks state bronze_ingestion_dag trigger_airbyte_orders_sync 2024-01-01

# DBT run results
docker-compose exec dbt dbt run-operation log_run_results

# PostgreSQL query performance (requires the pg_stat_statements extension;
# on PostgreSQL 13+ the column is total_exec_time instead of total_time)
docker-compose exec postgres psql -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c \
  "SELECT query, calls, total_time FROM pg_stat_statements ORDER BY total_time DESC LIMIT 10;"

Grafana Dashboard Access

  1. Access http://localhost:3000
  2. Default dashboard: PostgreSQL Overview
  3. Key metrics:
    • Cache hit rate (should be >95%)
    • Active connections
    • TPS (transactions per second)
    • Query duration percentiles
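
For reference, the cache hit rate is usually derived from the `blks_hit` and `blks_read` counters in PostgreSQL's `pg_stat_database` view. The sketch below shows that arithmetic. The function names are hypothetical, and whether the bundled Grafana dashboard computes the metric exactly this way is an assumption.

```python
from typing import Optional


def cache_hit_ratio(blks_hit: int, blks_read: int) -> Optional[float]:
    """Percentage of block requests served from shared buffers.

    Returns None when there has been no block activity yet, since the
    ratio is undefined in that case.
    """
    total = blks_hit + blks_read
    if total == 0:
        return None
    return 100.0 * blks_hit / total


def meets_target(blks_hit: int, blks_read: int, target: float = 95.0) -> bool:
    """True when the ratio exceeds the target; no activity counts as passing."""
    ratio = cache_hit_ratio(blks_hit, blks_read)
    return ratio is None or ratio > target
```

With 9,700 buffer hits and 300 disk reads the ratio is 97.0, which clears the 95% guideline; 900 hits against 100 reads gives 90.0 and does not.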

Custom Alerts

Add to monitoring/prometheus/alerts.yml:
yaml
groups:
  - name: dbt_pipeline
    rules:
      - alert: DBTModelFailed
        expr: dbt_model_run_status{status="error"} > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "DBT model {{ $labels.model }} failed"
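
The alert rule relies on a `dbt_model_run_status` metric with `model` and `status` labels, which something has to publish, since dbt itself does not expose Prometheus metrics. One possible way to produce it, shown here as a sketch rather than the project's actual exporter, is to convert dbt's `run_results.json` artifact into Prometheus text-format samples. The function name is hypothetical, and the choice to emit one sample per model with value 1 is an assumption chosen to match the rule's expression.

```python
def run_results_to_metrics(run_results: dict) -> str:
    """Render dbt run results as Prometheus text-format samples.

    Expects the parsed content of dbt's run_results.json. Emits one
    dbt_model_run_status sample per model, labelled with the status the
    model finished with. Hypothetical helper, not part of the project.
    """
    lines = [
        "# HELP dbt_model_run_status Status each dbt model finished with in the last run",
        "# TYPE dbt_model_run_status gauge",
    ]
    for result in run_results.get("results", []):
        unique_id = result["unique_id"]
        # Model nodes have ids of the form model.<project>.<model_name>;
        # tests, seeds and snapshots are skipped.
        if not unique_id.startswith("model."):
            continue
        model = unique_id.split(".")[-1]
        status = result["status"]
        lines.append(f'dbt_model_run_status{{model="{model}",status="{status}"}} 1')
    return "\n".join(lines) + "\n"
```

The input would come from `json.load` on the `run_results.json` file in the DBT target directory. The rendered text could then be written to a file for a textfile collector or sent to a Pushgateway, depending on what the monitoring stack provides.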

This skill covers the complete data-engineering-medallion pipeline from setup through production operations, enabling AI agents to assist with implementation, troubleshooting, and extension of medallion architecture data pipelines.