data-engineering-medallion-pipeline
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseData Engineering Medallion Pipeline Skill
数据工程Medallion管道Skill
Skill by ara.so — Data Skills collection.
This skill enables AI agents to work with a complete data engineering pipeline implementing the Medallion Architecture (Bronze → Silver → Gold) using modern open-source tools: MinIO (S3-compatible storage), Airbyte (data ingestion), PostgreSQL (data warehouse), DBT (transformations), Apache Airflow (orchestration), and Grafana (monitoring).
由ara.so提供的Skill —— 数据技能合集。
本Skill支持AI Agent使用现代开源工具构建完整的Medallion架构(青铜→白银→黄金)数据工程管道,涉及工具包括:MinIO(兼容S3的存储)、Airbyte(数据摄取)、PostgreSQL(数据仓库)、DBT(数据转换)、Apache Airflow(任务编排)和Grafana(监控)。
What This Project Does
项目功能
The data-engineering-medallion project provides a complete end-to-end data pipeline that:
- Ingests raw data from MinIO object storage into PostgreSQL using Airbyte
- Transforms data through three layers (Bronze/Silver/Gold) using DBT
- Orchestrates the entire pipeline with Apache Airflow DAGs
- Validates data quality with automated DBT tests
- Monitors infrastructure health with Prometheus and Grafana
- Visualizes business metrics in Power BI dashboards
The architecture follows ELT (Extract-Load-Transform) pattern with clear separation of concerns:
- Bronze: Raw immutable data from sources (JSONB format)
- Silver: Cleaned, validated, and typed data
- Gold: Business-ready aggregated metrics and KPIs
data-engineering-medallion项目提供一套完整的端到端数据管道,具备以下能力:
- 数据摄取:使用Airbyte将MinIO对象存储中的原始数据导入PostgreSQL
- 数据转换:通过DBT实现青铜/白银/黄金三层数据转换
- 任务编排:使用Apache Airflow DAGs编排整个管道流程
- 数据质量校验:通过自动化DBT测试验证数据质量
- 基础设施监控:使用Prometheus和Grafana监控基础设施健康状态
- 业务指标可视化:在Power BI仪表盘中展示业务指标
架构遵循ELT(抽取-加载-转换)模式,各层职责清晰:
- 青铜层:来自数据源的原始不可变数据(JSONB格式)
- 白银层:经过清洗、校验和类型转换的数据
- 黄金层:面向业务的聚合指标与关键绩效指标(KPI)
Installation & Setup
安装与配置
Prerequisites
前置依赖
bash
undefinedbash
undefinedRequired
必需依赖
docker --version # 20.10+
docker-compose --version # 2.0+
docker --version # 20.10+
docker-compose --version # 2.0+
8GB RAM minimum, 16GB recommended
最低8GB内存,推荐16GB
undefinedundefinedClone and Initialize
克隆与初始化
bash
git clone https://github.com/LucasGoulartCouto/data-engineering-medallion.git
cd data-engineering-medallionbash
git clone https://github.com/LucasGoulartCouto/data-engineering-medallion.git
cd data-engineering-medallionSetup environment and start all services
配置环境并启动所有服务
make setup
make start
make setup
make start
Verify all containers are healthy
验证所有容器状态健康
make status
undefinedmake status
undefinedService URLs
服务访问地址
After startup, access these interfaces:
- Airflow: http://localhost:8080 (admin/admin)
- MinIO: http://localhost:9001 (minioadmin/[from .env])
- Airbyte: http://localhost:8000 (create account on first visit)
- Grafana: http://localhost:3000 (admin/admin)
- Prometheus: http://localhost:9090
- DBT Docs: http://localhost:8085 (after )
make dbt-docs
启动完成后,可通过以下地址访问各服务界面:
- Airflow:http://localhost:8080(账号/密码:admin/admin)
- MinIO:http://localhost:9001(账号:minioadmin,密码来自.env文件)
- Airbyte:http://localhost:8000(首次访问需创建账号)
- Grafana:http://localhost:3000(账号/密码:admin/admin)
- Prometheus:http://localhost:9090
- DBT Docs:http://localhost:8085(执行`make dbt-docs`后可访问)
Key Commands (Makefile)
核心命令(Makefile)
bash
undefinedbash
undefinedInfrastructure
基础设施管理
make setup # Create .env, directories, install dependencies
make start # Start all Docker services
make stop # Stop all services
make restart # Restart all services
make status # Check container health
make logs SERVICE=airflow # View logs for specific service
make setup # 创建.env文件、目录并安装依赖
make start # 启动所有Docker服务
make stop # 停止所有服务
make restart # 重启所有服务
make status # 检查容器健康状态
make logs SERVICE=airflow # 查看指定服务的日志
DBT Operations
DBT操作
make dbt-run # Run all DBT models (bronze → silver → gold)
make dbt-test # Run data quality tests
make dbt-docs # Generate and serve documentation
make dbt-snapshot # Capture SCD Type 2 snapshots
make dbt-clean # Clean compiled artifacts
make dbt-run # 运行所有DBT模型(青铜→白银→黄金)
make dbt-test # 运行数据质量测试
make dbt-docs # 生成并启动文档服务
make dbt-snapshot # 捕获SCD Type 2快照
make dbt-clean # 清理编译产物
Data Pipeline
数据管道操作
make upload-data # Upload sample data to MinIO
make trigger-dag DAG_ID=bronze_ingestion_dag # Manually trigger Airflow DAG
make upload-data # 将示例数据上传至MinIO
make trigger-dag DAG_ID=bronze_ingestion_dag # 手动触发Airflow DAG
Development
开发相关
make lint # Lint Python and SQL code
make format # Format Python code with black
make validate # Validate Airflow DAGs and DBT models
make lint # 检查Python和SQL代码规范
make format # 使用black格式化Python代码
make validate # 验证Airflow DAGs和DBT模型
Cleanup
清理操作
make clean # Remove volumes and stop services
make clean-all # Full cleanup including Docker images
undefinedmake clean # 删除卷并停止服务
make clean-all # 完全清理,包括Docker镜像
undefinedProject Structure
项目结构
data-engineering-medallion/
├── airflow/
│ └── dags/
│ ├── bronze_ingestion_dag.py # Triggers Airbyte sync
│ ├── silver_transformation_dag.py # Runs DBT silver models
│ └── gold_aggregation_dag.py # Runs DBT gold models
├── dbt/
│ ├── models/
│ │ ├── bronze/ # Extract JSONB → columnar
│ │ ├── silver/ # Clean, validate, dedupe
│ │ └── gold/ # Business metrics
│ ├── macros/ # Reusable SQL functions
│ ├── snapshots/ # SCD Type 2 history
│ └── tests/ # Custom data quality tests
├── scripts/
│ ├── upload_to_minio.py # Upload CSV/JSON to MinIO
│ └── test_connections.py # Verify service connectivity
├── postgres/init/ # Database initialization SQL
├── monitoring/
│ ├── grafana/provisioning/
│ └── prometheus/
├── docker-compose.yml
├── Makefile
└── .env # Configuration (create from .env.example)data-engineering-medallion/
├── airflow/
│ └── dags/
│ ├── bronze_ingestion_dag.py # 触发Airbyte同步任务
│ ├── silver_transformation_dag.py # 运行DBT白银层模型
│ └── gold_aggregation_dag.py # 运行DBT黄金层模型
├── dbt/
│ ├── models/
│ │ ├── bronze/ # 将JSONB转换为列式存储
│ │ ├── silver/ # 清洗、校验、去重
│ │ └── gold/ # 生成业务指标
│ ├── macros/ # 可复用SQL函数
│ ├── snapshots/ # SCD Type 2历史快照
│ └── tests/ # 自定义数据质量测试
├── scripts/
│ ├── upload_to_minio.py # 将CSV/JSON文件上传至MinIO
│ └── test_connections.py # 验证服务连通性
├── postgres/init/ # 数据库初始化SQL脚本
├── monitoring/
│ ├── grafana/provisioning/
│ └── prometheus/
├── docker-compose.yml
├── Makefile
└── .env # 配置文件(从.env.example创建)DBT Model Development
DBT模型开发
Bronze Layer (Raw Extraction)
青铜层(原始数据抽取)
Bronze models extract JSONB data from Airbyte into typed columns:
sql
-- dbt/models/bronze/bronze_orders.sql
{{ config(
materialized='view',
schema='bronze'
) }}
SELECT
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_data->>'order_id' AS order_id,
_airbyte_data->>'customer_id' AS customer_id,
_airbyte_data->>'order_date' AS order_date,
_airbyte_data->>'total_amount' AS total_amount,
_airbyte_data->>'status' AS status,
_airbyte_data AS raw_data
FROM {{ source('airbyte_raw', '_airbyte_raw_orders') }}青铜层模型将Airbyte中的JSONB数据转换为带类型的列:
sql
-- dbt/models/bronze/bronze_orders.sql
{{ config(
materialized='view',
schema='bronze'
) }}
SELECT
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_data->>'order_id' AS order_id,
_airbyte_data->>'customer_id' AS customer_id,
_airbyte_data->>'order_date' AS order_date,
_airbyte_data->>'total_amount' AS total_amount,
_airbyte_data->>'status' AS status,
_airbyte_data AS raw_data
FROM {{ source('airbyte_raw', '_airbyte_raw_orders') }}Silver Layer (Cleaned & Validated)
白银层(数据清洗与校验)
Silver models clean, cast types, deduplicate, and add calculated fields:
sql
-- dbt/models/silver/silver_orders.sql
{{ config(
materialized='table',
schema='silver',
unique_key='order_id'
) }}
WITH deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY _airbyte_emitted_at DESC
) AS rn
FROM {{ ref('bronze_orders') }}
)
SELECT
order_id::INTEGER,
customer_id::INTEGER,
order_date::DATE,
total_amount::DECIMAL(10,2),
UPPER(TRIM(status)) AS status,
CASE
WHEN total_amount::DECIMAL > 1000 THEN 'high_value'
WHEN total_amount::DECIMAL > 500 THEN 'medium_value'
ELSE 'low_value'
END AS order_value_segment,
_airbyte_emitted_at AS ingested_at,
CURRENT_TIMESTAMP AS transformed_at
FROM deduplicated
WHERE rn = 1
AND order_id IS NOT NULL
AND order_date::DATE <= CURRENT_DATE白银层模型完成数据清洗、类型转换、去重,并添加计算字段:
sql
-- dbt/models/silver/silver_orders.sql
{{ config(
materialized='table',
schema='silver',
unique_key='order_id'
) }}
WITH deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY _airbyte_emitted_at DESC
) AS rn
FROM {{ ref('bronze_orders') }}
)
SELECT
order_id::INTEGER,
customer_id::INTEGER,
order_date::DATE,
total_amount::DECIMAL(10,2),
UPPER(TRIM(status)) AS status,
CASE
WHEN total_amount::DECIMAL > 1000 THEN 'high_value'
WHEN total_amount::DECIMAL > 500 THEN 'medium_value'
ELSE 'low_value'
END AS order_value_segment,
_airbyte_emitted_at AS ingested_at,
CURRENT_TIMESTAMP AS transformed_at
FROM deduplicated
WHERE rn = 1
AND order_id IS NOT NULL
AND order_date::DATE <= CURRENT_DATEGold Layer (Business Metrics)
黄金层(业务指标聚合)
Gold models aggregate data for business consumption:
sql
-- dbt/models/gold/gold_product_performance.sql
{{ config(
materialized='table',
schema='gold'
) }}
SELECT
p.product_id,
p.product_name,
p.category,
p.supplier,
COUNT(DISTINCT oi.order_id) AS total_orders,
SUM(oi.quantity) AS total_quantity_sold,
SUM(oi.line_total) AS total_revenue,
SUM(oi.quantity * p.cost) AS total_cost,
SUM(oi.line_total) - SUM(oi.quantity * p.cost) AS total_profit,
ROUND(
(SUM(oi.line_total) - SUM(oi.quantity * p.cost)) /
NULLIF(SUM(oi.line_total), 0) * 100,
2
) AS profit_margin_percentage,
AVG(oi.unit_price) AS avg_unit_price,
MAX(o.order_date) AS last_sale_date
FROM {{ ref('silver_products') }} p
INNER JOIN {{ ref('silver_order_items') }} oi
ON p.product_id = oi.product_id
INNER JOIN {{ ref('silver_orders') }} o
ON oi.order_id = o.order_id
WHERE o.status = 'completed'
GROUP BY p.product_id, p.product_name, p.category, p.supplier黄金层模型聚合数据以满足业务使用需求:
sql
-- dbt/models/gold/gold_product_performance.sql
{{ config(
materialized='table',
schema='gold'
) }}
SELECT
p.product_id,
p.product_name,
p.category,
p.supplier,
COUNT(DISTINCT oi.order_id) AS total_orders,
SUM(oi.quantity) AS total_quantity_sold,
SUM(oi.line_total) AS total_revenue,
SUM(oi.quantity * p.cost) AS total_cost,
SUM(oi.line_total) - SUM(oi.quantity * p.cost) AS total_profit,
ROUND(
(SUM(oi.line_total) - SUM(oi.quantity * p.cost)) /
NULLIF(SUM(oi.line_total), 0) * 100,
2
) AS profit_margin_percentage,
AVG(oi.unit_price) AS avg_unit_price,
MAX(o.order_date) AS last_sale_date
FROM {{ ref('silver_products') }} p
INNER JOIN {{ ref('silver_order_items') }} oi
ON p.product_id = oi.product_id
INNER JOIN {{ ref('silver_orders') }} o
ON oi.order_id = o.order_id
WHERE o.status = 'completed'
GROUP BY p.product_id, p.product_name, p.category, p.supplierDBT Testing & Data Quality
DBT测试与数据质量
Schema Tests (schema.yml)
Schema测试(schema.yml)
yaml
undefinedyaml
undefineddbt/models/silver/schema.yml
dbt/models/silver/schema.yml
version: 2
models:
- name: silver_orders
description: "Cleaned and validated orders"
columns:
- name: order_id
description: "Primary key"
tests:
- unique
- not_null
- name: customer_id
description: "Foreign key to customers"
tests:
- not_null
- relationships: to: ref('silver_customers') field: customer_id
- name: status
tests:
- accepted_values: values: ['PENDING', 'COMPLETED', 'CANCELLED', 'REFUNDED']
- name: total_amount
tests:
- dbt_utils.accepted_range: min_value: 0 max_value: 1000000
- name: order_id
description: "Primary key"
tests:
undefinedversion: 2
models:
- name: silver_orders
description: "经过清洗和校验的订单数据"
columns:
- name: order_id
description: "主键"
tests:
- unique
- not_null
- name: customer_id
description: "关联客户表的外键"
tests:
- not_null
- relationships: to: ref('silver_customers') field: customer_id
- name: status
tests:
- accepted_values: values: ['PENDING', 'COMPLETED', 'CANCELLED', 'REFUNDED']
- name: total_amount
tests:
- dbt_utils.accepted_range: min_value: 0 max_value: 1000000
- name: order_id
description: "主键"
tests:
undefinedCustom Tests
自定义测试
sql
-- dbt/tests/assert_no_future_dates_in_sales.sql
SELECT order_id, order_date
FROM {{ ref('silver_orders') }}
WHERE order_date > CURRENT_DATEsql
-- dbt/tests/assert_no_negative_profit.sql
SELECT product_id, total_profit
FROM {{ ref('gold_product_performance') }}
WHERE total_profit < 0sql
-- dbt/tests/assert_no_future_dates_in_sales.sql
SELECT order_id, order_date
FROM {{ ref('silver_orders') }}
WHERE order_date > CURRENT_DATEsql
-- dbt/tests/assert_no_negative_profit.sql
SELECT product_id, total_profit
FROM {{ ref('gold_product_performance') }}
WHERE total_profit < 0Running Tests
运行测试
bash
undefinedbash
undefinedRun all tests
运行所有测试
make dbt-test
make dbt-test
Run tests for specific model
运行指定模型的测试
docker-compose exec dbt dbt test --select silver_orders
docker-compose exec dbt dbt test --select silver_orders
Run specific test type
运行指定类型的测试
docker-compose exec dbt dbt test --select test_type:generic
docker-compose exec dbt dbt test --select test_type:singular
undefineddocker-compose exec dbt dbt test --select test_type:generic
docker-compose exec dbt dbt test --select test_type:singular
undefinedAirflow DAG Development
Airflow DAG开发
Bronze Ingestion DAG
青铜层摄取DAG
python
undefinedpython
undefinedairflow/dags/bronze_ingestion_dag.py
airflow/dags/bronze_ingestion_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'bronze_ingestion_dag',
default_args=default_args,
description='Ingest data from MinIO to PostgreSQL Bronze layer',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['bronze', 'ingestion', 'airbyte'],
) as dag:
trigger_airbyte_sync = AirbyteTriggerSyncOperator(
task_id='trigger_airbyte_orders_sync',
airbyte_conn_id='airbyte_default',
connection_id='{{ var.value.airbyte_connection_id }}',
asynchronous=False,
timeout=3600,
)
def validate_ingestion(**context):
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg_hook = PostgresHook(postgres_conn_id='postgres_default')
# Check row count
result = pg_hook.get_first(
"SELECT COUNT(*) FROM airbyte_raw._airbyte_raw_orders"
)
if result[0] == 0:
raise ValueError("No data ingested to bronze layer")
print(f"Validated {result[0]} rows in bronze layer")
validate_task = PythonOperator(
task_id='validate_bronze_ingestion',
python_callable=validate_ingestion,
)
trigger_airbyte_sync >> validate_taskundefinedfrom datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'bronze_ingestion_dag',
default_args=default_args,
description='将MinIO数据摄取到PostgreSQL青铜层',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['bronze', 'ingestion', 'airbyte'],
) as dag:
trigger_airbyte_sync = AirbyteTriggerSyncOperator(
task_id='trigger_airbyte_orders_sync',
airbyte_conn_id='airbyte_default',
connection_id='{{ var.value.airbyte_connection_id }}',
asynchronous=False,
timeout=3600,
)
def validate_ingestion(**context):
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg_hook = PostgresHook(postgres_conn_id='postgres_default')
# 检查数据行数
result = pg_hook.get_first(
"SELECT COUNT(*) FROM airbyte_raw._airbyte_raw_orders"
)
if result[0] == 0:
raise ValueError("青铜层未摄取到任何数据")
print(f"已验证青铜层中有{result[0]}条数据")
validate_task = PythonOperator(
task_id='validate_bronze_ingestion',
python_callable=validate_ingestion,
)
trigger_airbyte_sync >> validate_taskundefinedSilver Transformation DAG
白银层转换DAG
python
undefinedpython
undefinedairflow/dags/silver_transformation_dag.py
airflow/dags/silver_transformation_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtRunOperator
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=3),
}
with DAG(
'silver_transformation_dag',
default_args=default_args,
description='Transform bronze to silver layer with DBT',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['silver', 'transformation', 'dbt'],
) as dag:
run_silver_models = BashOperator(
task_id='run_dbt_silver_models',
bash_command='cd /opt/dbt && dbt run --select silver.*',
)
test_silver_models = BashOperator(
task_id='test_dbt_silver_models',
bash_command='cd /opt/dbt && dbt test --select silver.*',
)
run_silver_models >> test_silver_modelsundefinedfrom datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtRunOperator
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=3),
}
with DAG(
'silver_transformation_dag',
default_args=default_args,
description='使用DBT将青铜层数据转换为白银层',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['silver', 'transformation', 'dbt'],
) as dag:
run_silver_models = BashOperator(
task_id='run_dbt_silver_models',
bash_command='cd /opt/dbt && dbt run --select silver.*',
)
test_silver_models = BashOperator(
task_id='test_dbt_silver_models',
bash_command='cd /opt/dbt && dbt test --select silver.*',
)
run_silver_models >> test_silver_modelsundefinedGold Aggregation DAG
黄金层聚合DAG
python
undefinedpython
undefinedairflow/dags/gold_aggregation_dag.py
airflow/dags/gold_aggregation_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=3),
}
with DAG(
'gold_aggregation_dag',
default_args=default_args,
description='Build gold layer business metrics',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['gold', 'aggregation', 'metrics'],
) as dag:
run_gold_models = BashOperator(
task_id='run_dbt_gold_models',
bash_command='cd /opt/dbt && dbt run --select gold.*',
)
test_gold_models = BashOperator(
task_id='test_dbt_gold_models',
bash_command='cd /opt/dbt && dbt test --select gold.*',
)
snapshot_gold = BashOperator(
task_id='snapshot_gold_metrics',
bash_command='cd /opt/dbt && dbt snapshot',
)
run_gold_models >> test_gold_models >> snapshot_goldundefinedfrom datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=3),
}
with DAG(
'gold_aggregation_dag',
default_args=default_args,
description='构建黄金层业务指标',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['gold', 'aggregation', 'metrics'],
) as dag:
run_gold_models = BashOperator(
task_id='run_dbt_gold_models',
bash_command='cd /opt/dbt && dbt run --select gold.*',
)
test_gold_models = BashOperator(
task_id='test_dbt_gold_models',
bash_command='cd /opt/dbt && dbt test --select gold.*',
)
snapshot_gold = BashOperator(
task_id='snapshot_gold_metrics',
bash_command='cd /opt/dbt && dbt snapshot',
)
run_gold_models >> test_gold_models >> snapshot_goldundefinedData Upload to MinIO
数据上传至MinIO
python
undefinedpython
undefinedscripts/upload_to_minio.py
scripts/upload_to_minio.py
import os
from minio import Minio
from minio.error import S3Error
import pandas as pd
from pathlib import Path
def upload_data_to_minio():
"""Upload sample CSV data to MinIO bucket"""
# Initialize MinIO client
client = Minio(
os.getenv('MINIO_ENDPOINT', 'localhost:9000'),
access_key=os.getenv('MINIO_ACCESS_KEY', 'minioadmin'),
secret_key=os.getenv('MINIO_SECRET_KEY'),
secure=False
)
bucket_name = os.getenv('MINIO_BUCKET', 'raw-data')
# Create bucket if not exists
try:
if not client.bucket_exists(bucket_name):
client.make_bucket(bucket_name)
print(f"Created bucket: {bucket_name}")
except S3Error as e:
print(f"Error creating bucket: {e}")
return
# Upload files from data directory
data_dir = Path('data/raw')
for file_path in data_dir.glob('*.csv'):
try:
client.fput_object(
bucket_name,
file_path.name,
str(file_path),
content_type='text/csv'
)
print(f"Uploaded: {file_path.name}")
except S3Error as e:
print(f"Error uploading {file_path.name}: {e}")if name == 'main':
upload_data_to_minio()
Run the upload:
```bash
make upload-dataimport os
from minio import Minio
from minio.error import S3Error
import pandas as pd
from pathlib import Path
def upload_data_to_minio():
"""将示例CSV数据上传至MinIO存储桶"""
# 初始化MinIO客户端
client = Minio(
os.getenv('MINIO_ENDPOINT', 'localhost:9000'),
access_key=os.getenv('MINIO_ACCESS_KEY', 'minioadmin'),
secret_key=os.getenv('MINIO_SECRET_KEY'),
secure=False
)
bucket_name = os.getenv('MINIO_BUCKET', 'raw-data')
# 如果存储桶不存在则创建
try:
if not client.bucket_exists(bucket_name):
client.make_bucket(bucket_name)
print(f"已创建存储桶: {bucket_name}")
except S3Error as e:
print(f"创建存储桶出错: {e}")
return
# 上传data目录中的文件
data_dir = Path('data/raw')
for file_path in data_dir.glob('*.csv'):
try:
client.fput_object(
bucket_name,
file_path.name,
str(file_path),
content_type='text/csv'
)
print(f"已上传: {file_path.name}")
except S3Error as e:
print(f"上传{file_path.name}出错: {e}")if name == 'main':
upload_data_to_minio()
运行上传命令:
```bash
make upload-dataor
或
python scripts/upload_to_minio.py
undefinedpython scripts/upload_to_minio.py
undefinedConfiguration
配置说明
Environment Variables (.env)
环境变量(.env)
bash
undefinedbash
undefinedPostgreSQL
PostgreSQL
POSTGRES_USER=dataeng
POSTGRES_PASSWORD=<your-secure-password>
POSTGRES_DB=datawarehouse
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_USER=dataeng
POSTGRES_PASSWORD=<your-secure-password>
POSTGRES_DB=datawarehouse
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
MinIO
MinIO
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=<your-secure-password>
MINIO_ENDPOINT=minio:9000
MINIO_BUCKET=raw-data
MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=<your-secure-password>
MINIO_ENDPOINT=minio:9000
MINIO_BUCKET=raw-data
Airflow
Airflow
AIRFLOW_UID=50000
AIRFLOW_GID=0
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=<generate-with-python-cryptography>
AIRFLOW_UID=50000
AIRFLOW_GID=0
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=<generate-with-python-cryptography>
Airbyte
Airbyte
AIRBYTE_VERSION=0.50.0
AIRBYTE_VERSION=0.50.0
DBT
DBT
DBT_PROFILES_DIR=/opt/dbt
DBT_PROJECT_DIR=/opt/dbt
DBT_PROFILES_DIR=/opt/dbt
DBT_PROJECT_DIR=/opt/dbt
Grafana
Grafana
GF_SECURITY_ADMIN_PASSWORD=<your-secure-password>
undefinedGF_SECURITY_ADMIN_PASSWORD=<your-secure-password>
undefinedDBT Profile Configuration
DBT配置文件
yaml
undefinedyaml
undefineddbt/profiles.yml
dbt/profiles.yml
datawarehouse:
target: dev
outputs:
dev:
type: postgres
host: "{{ env_var('POSTGRES_HOST') }}"
port: "{{ env_var('POSTGRES_PORT') | int }}"
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: public
threads: 4
keepalives_idle: 0
undefineddatawarehouse:
target: dev
outputs:
dev:
type: postgres
host: "{{ env_var('POSTGRES_HOST') }}"
port: "{{ env_var('POSTGRES_PORT') | int }}"
user: "{{ env_var('POSTGRES_USER') }}"
password: "{{ env_var('POSTGRES_PASSWORD') }}"
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: public
threads: 4
keepalives_idle: 0
undefinedAirbyte Connection Setup
Airbyte连接配置
-
Access Airbyte UI: http://localhost:8000
-
Create MinIO source:
- Connector: S3
- Endpoint: http://minio:9000
- Bucket: raw-data
- Access Key: ${MINIO_ROOT_USER}
- Secret Key: ${MINIO_ROOT_PASSWORD}
-
Create PostgreSQL destination:
- Host: postgres
- Port: 5432
- Database: datawarehouse
- Schema: bronze
- Username: ${POSTGRES_USER}
- Password: ${POSTGRES_PASSWORD}
-
Create connection with sync mode: Full Refresh | Overwrite
-
访问Airbyte界面:http://localhost:8000
-
创建MinIO数据源:
- 连接器:S3
- 端点:http://minio:9000
- 存储桶:raw-data
- 访问密钥:${MINIO_ROOT_USER}
- 密钥:${MINIO_ROOT_PASSWORD}
-
创建PostgreSQL目标端:
- 主机:postgres
- 端口:5432
- 数据库:datawarehouse
- Schema:bronze
- 用户名:${POSTGRES_USER}
- 密码:${POSTGRES_PASSWORD}
-
创建连接,同步模式选择:Full Refresh | Overwrite
Common Patterns
常用模式
Incremental Model Pattern
增量模型模式
sql
-- dbt/models/silver/silver_orders_incremental.sql
{{ config(
materialized='incremental',
unique_key='order_id',
schema='silver',
on_schema_change='append_new_columns'
) }}
SELECT
order_id,
customer_id,
order_date,
total_amount,
status,
_airbyte_emitted_at AS ingested_at
FROM {{ ref('bronze_orders') }}
{% if is_incremental() %}
WHERE _airbyte_emitted_at > (SELECT MAX(ingested_at) FROM {{ this }})
{% endif %}sql
-- dbt/models/silver/silver_orders_incremental.sql
{{ config(
materialized='incremental',
unique_key='order_id',
schema='silver',
on_schema_change='append_new_columns'
) }}
SELECT
order_id,
customer_id,
order_date,
total_amount,
status,
_airbyte_emitted_at AS ingested_at
FROM {{ ref('bronze_orders') }}
{% if is_incremental() %}
WHERE _airbyte_emitted_at > (SELECT MAX(ingested_at) FROM {{ this }})
{% endif %}Macro for Common Transformations
通用转换宏
sql
-- dbt/macros/clean_string.sql
{% macro clean_string(column_name) %}
UPPER(TRIM(REGEXP_REPLACE({{ column_name }}, '\s+', ' ', 'g')))
{% endmacro %}
-- Usage in model
SELECT {{ clean_string('customer_name') }} AS customer_name
FROM {{ ref('bronze_customers') }}sql
-- dbt/macros/clean_string.sql
{% macro clean_string(column_name) %}
UPPER(TRIM(REGEXP_REPLACE({{ column_name }}, '\s+', ' ', 'g')))
{% endmacro %}
-- 在模型中使用
SELECT {{ clean_string('customer_name') }} AS customer_name
FROM {{ ref('bronze_customers') }}Snapshot (SCD Type 2)
快照(SCD Type 2)
sql
-- dbt/snapshots/snapshot_customer_segments.sql
{% snapshot snapshot_customer_segments %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
)
}}
SELECT * FROM {{ ref('gold_customer_metrics') }}
{% endsnapshot %}sql
-- dbt/snapshots/snapshot_customer_segments.sql
{% snapshot snapshot_customer_segments %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
)
}}
SELECT * FROM {{ ref('gold_customer_metrics') }}
{% endsnapshot %}Custom Test Macro
自定义测试宏
sql
-- dbt/macros/test_no_orphans.sql
{% test no_orphan_records(model, column_name, parent_model, parent_column) %}
SELECT {{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} IS NOT NULL
AND {{ column_name }} NOT IN (
SELECT {{ parent_column }}
FROM {{ parent_model }}
)
{% endtest %}sql
-- dbt/macros/test_no_orphans.sql
{% test no_orphan_records(model, column_name, parent_model, parent_column) %}
SELECT {{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} IS NOT NULL
AND {{ column_name }} NOT IN (
SELECT {{ parent_column }}
FROM {{ parent_model }}
)
{% endtest %}Querying the Data Warehouse
数据仓库查询示例
Bronze Layer Query
青铜层查询
sql
-- Raw JSONB data from Airbyte
SELECT
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_data->>'order_id' AS order_id,
_airbyte_data
FROM airbyte_raw._airbyte_raw_orders
LIMIT 10;sql
-- 来自Airbyte的原始JSONB数据
SELECT
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_data->>'order_id' AS order_id,
_airbyte_data
FROM airbyte_raw._airbyte_raw_orders
LIMIT 10;Silver Layer Query
白银层查询
sql
-- Cleaned typed data
SELECT
order_id,
customer_id,
order_date,
total_amount,
status,
order_value_segment
FROM silver.silver_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
ORDER BY order_date DESC;sql
-- 经过清洗的结构化数据
SELECT
order_id,
customer_id,
order_date,
total_amount,
status,
order_value_segment
FROM silver.silver_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
ORDER BY order_date DESC;Gold Layer Query
黄金层查询
sql
-- Business metrics
SELECT
product_name,
category,
total_revenue,
total_profit,
profit_margin_percentage,
total_quantity_sold
FROM gold.gold_product_performance
WHERE profit_margin_percentage > 20
ORDER BY total_revenue DESC
LIMIT 10;
-- Customer segmentation
SELECT
customer_segment,
COUNT(*) AS customer_count,
AVG(total_spent) AS avg_lifetime_value,
AVG(recency_days) AS avg_recency
FROM gold.gold_customer_metrics
GROUP BY customer_segment
ORDER BY avg_lifetime_value DESC;sql
-- 业务指标
SELECT
product_name,
category,
total_revenue,
total_profit,
profit_margin_percentage,
total_quantity_sold
FROM gold.gold_product_performance
WHERE profit_margin_percentage > 20
ORDER BY total_revenue DESC
LIMIT 10;
-- 客户分群
SELECT
customer_segment,
COUNT(*) AS customer_count,
AVG(total_spent) AS avg_lifetime_value,
AVG(recency_days) AS avg_recency
FROM gold.gold_customer_metrics
GROUP BY customer_segment
ORDER BY avg_lifetime_value DESC;Troubleshooting
问题排查
Container Won't Start
容器无法启动
bash
undefinedbash
undefinedCheck logs
查看日志
make logs SERVICE=postgres
make logs SERVICE=airflow-webserver
make logs SERVICE=postgres
make logs SERVICE=airflow-webserver
Verify resource allocation
验证资源分配
docker system df
docker system prune # Clean up if needed
docker system df
docker system prune # 必要时清理资源
Reset specific service
重启指定服务
docker-compose restart postgres
undefineddocker-compose restart postgres
undefinedAirbyte Connection Failing
Airbyte连接失败
bash
undefinedbash
undefinedTest MinIO connectivity
测试MinIO连通性
docker-compose exec airbyte-worker curl http://minio:9000/minio/health/live
docker-compose exec airbyte-worker curl http://minio:9000/minio/health/live
Test PostgreSQL connectivity
测试PostgreSQL连通性
docker-compose exec airbyte-worker
psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c "SELECT 1"
psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c "SELECT 1"
docker-compose exec airbyte-worker
psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c "SELECT 1"
psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c "SELECT 1"
Check Airbyte logs
查看Airbyte日志
docker-compose logs airbyte-worker | grep ERROR
undefineddocker-compose logs airbyte-worker | grep ERROR
undefinedDBT Model Failing
DBT模型运行失败
bash
undefinedbash
undefinedRun with debug logging
开启调试日志运行
docker-compose exec dbt dbt run --select silver_orders --debug
docker-compose exec dbt dbt run --select silver_orders --debug
Check compiled SQL
查看编译后的SQL
cat dbt/target/compiled/datawarehouse/models/silver/silver_orders.sql
cat dbt/target/compiled/datawarehouse/models/silver/silver_orders.sql
Test single model
重新运行单个模型
docker-compose exec dbt dbt run --select silver_orders --full-refresh
docker-compose exec dbt dbt run --select silver_orders --full-refresh
Validate syntax without execution
仅验证语法不执行
docker-compose exec dbt dbt parse
undefineddocker-compose exec dbt dbt parse
undefinedAirflow DAG Not Appearing
Airflow DAG未显示
bash
undefinedbash
undefinedCheck DAG parsing errors
检查DAG导入错误
docker-compose exec airflow-webserver airflow dags list-import-errors
docker-compose exec airflow-webserver airflow dags list-import-errors
Validate DAG Python syntax
验证DAG Python语法
python -m py_compile airflow/dags/bronze_ingestion_dag.py
python -m py_compile airflow/dags/bronze_ingestion_dag.py
Refresh DAGs
刷新DAG
docker-compose exec airflow-webserver airflow dags trigger <dag_id>
undefineddocker-compose exec airflow-webserver airflow dags trigger <dag_id>
undefinedPostgreSQL Connection Refused
PostgreSQL连接被拒绝
bash
undefinedbash
undefinedWait for PostgreSQL to be ready
等待PostgreSQL就绪
docker-compose exec postgres pg_isready -U ${POSTGRES_USER}
docker-compose exec postgres pg_isready -U ${POSTGRES_USER}
Check connection from within container
在容器内测试连接
docker-compose exec dbt psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB}
docker-compose exec dbt psql -h postgres -U ${POSTGRES_USER} -d ${POSTGRES_DB}
Verify connection string in .env
验证.env中的连接字符串
grep POSTGRES .env
undefinedgrep POSTGRES .env
undefinedData Quality Test Failures
数据质量测试失败
bash
undefinedbash
undefinedRun tests with detailed output
运行测试并保存失败结果
docker-compose exec dbt dbt test --select silver_orders --store-failures
docker-compose exec dbt dbt test --select silver_orders --store-failures
Query failed test results
查询测试失败结果
SELECT * FROM silver.test_failures
WHERE test_name = 'unique_order_id';
SELECT * FROM silver.test_failures
WHERE test_name = 'unique_order_id';
Run specific test
运行指定测试
docker-compose exec dbt dbt test --select test_name:unique_order_id
undefineddocker-compose exec dbt dbt test --select test_name:unique_order_id
undefinedMinIO Upload Failing
MinIO上传失败
bash
undefinedbash
undefinedCheck MinIO service status
检查MinIO服务状态
docker-compose ps minio
docker-compose ps minio
Test MinIO API directly
直接测试MinIO API
docker-compose exec minio mc alias set local http://localhost:9000
${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}
${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}
docker-compose exec minio mc ls local/${MINIO_BUCKET}
docker-compose exec minio mc alias set local http://localhost:9000
${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}
${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}
docker-compose exec minio mc ls local/${MINIO_BUCKET}
Upload test file
上传测试文件
docker-compose exec minio mc cp /tmp/test.csv local/${MINIO_BUCKET}/
undefineddocker-compose exec minio mc cp /tmp/test.csv local/${MINIO_BUCKET}/
undefinedMonitoring & Observability
监控与可观测性
Check Pipeline Health
检查管道健康状态
bash
undefinedbash
undefinedAirflow task status
Airflow任务状态
docker-compose exec airflow-webserver
airflow tasks state bronze_ingestion_dag trigger_airbyte_orders_sync 2024-01-01
airflow tasks state bronze_ingestion_dag trigger_airbyte_orders_sync 2024-01-01
docker-compose exec airflow-webserver
airflow tasks state bronze_ingestion_dag trigger_airbyte_orders_sync 2024-01-01
airflow tasks state bronze_ingestion_dag trigger_airbyte_orders_sync 2024-01-01
DBT run results
DBT运行结果
docker-compose exec dbt dbt run-operation log_run_results
docker-compose exec dbt dbt run-operation log_run_results
PostgreSQL query performance
PostgreSQL查询性能
docker-compose exec postgres psql -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c
"SELECT query, calls, total_time FROM pg_stat_statements ORDER BY total_time DESC LIMIT 10;"
"SELECT query, calls, total_time FROM pg_stat_statements ORDER BY total_time DESC LIMIT 10;"
undefineddocker-compose exec postgres psql -U ${POSTGRES_USER} -d ${POSTGRES_DB} -c
"SELECT query, calls, total_time FROM pg_stat_statements ORDER BY total_time DESC LIMIT 10;"
"SELECT query, calls, total_time FROM pg_stat_statements ORDER BY total_time DESC LIMIT 10;"
undefinedGrafana Dashboard Access
Grafana仪表板访问
- Access http://localhost:3000
- Default dashboard: PostgreSQL Overview
- Key metrics:
- Cache hit rate (should be >95%)
- Active connections
- TPS (transactions per second)
- Query duration percentiles
- 访问地址:http://localhost:3000
- 默认仪表板:PostgreSQL Overview
- 关键指标:
- 缓存命中率(应>95%)
- 活跃连接数
- TPS(每秒事务数)
- 查询时长百分位数
Custom Alerts
自定义告警
Add to :
monitoring/prometheus/alerts.ymlyaml
groups:
- name: dbt_pipeline
rules:
- alert: DBTModelFailed
expr: dbt_model_run_status{status="error"} > 0
for: 5m
labels:
severity: critical
annotations:
summary: "DBT model {{ $labels.model }} failed"This skill covers the complete data-engineering-medallion pipeline from setup through production operations, enabling AI agents to assist with implementation, troubleshooting, and extension of medallion architecture data pipelines.
将以下内容添加至:
monitoring/prometheus/alerts.ymlyaml
groups:
- name: dbt_pipeline
rules:
- alert: DBTModelFailed
expr: dbt_model_run_status{status="error"} > 0
for: 5m
labels:
severity: critical
annotations:
summary: "DBT模型{{ $labels.model }}运行失败"本Skill覆盖了Medallion架构数据管道从搭建到生产运维的完整流程,支持AI Agent协助实现管道部署、问题排查与功能扩展。