dbt-transformation-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
Chinesedbt Transformation Patterns
dbt 数据转换模式
Production-ready patterns for dbt (data build tool) including model organization, testing strategies, documentation, and incremental processing.
可用于生产环境的dbt(data build tool)模式,包括模型组织、测试策略、文档编写和增量处理。
When to Use This Skill
何时使用该技能
- Building data transformation pipelines with dbt
- Organizing models into staging, intermediate, and marts layers
- Implementing data quality tests
- Creating incremental models for large datasets
- Documenting data models and lineage
- Setting up dbt project structure
- 使用dbt构建数据转换管道
- 将模型组织为staging(暂存)、intermediate(中间)和marts(集市)层
- 实施数据质量测试
- 为大型数据集创建增量模型
- 为数据模型和血缘关系编写文档
- 搭建dbt项目结构
Core Concepts
核心概念
1. Model Layers (Medallion Architecture)
1. 模型层(Medallion架构)
sources/ Raw data definitions
↓
staging/ 1:1 with source, light cleaning
↓
intermediate/ Business logic, joins, aggregations
↓
marts/ Final analytics tablessources/ 原始数据定义
↓
staging/ 与源数据1:1映射,轻量清洗
↓
intermediate/ 业务逻辑、关联、聚合
↓
marts/ 最终分析表2. Naming Conventions
2. 命名规范
| Layer | Prefix | Example |
|---|---|---|
| Staging | | |
| Intermediate | | |
| Marts | | |
| 层级 | 前缀 | 示例 |
|---|---|---|
| Staging | | |
| Intermediate | | |
| Marts | | |
Quick Start
快速开始
yaml
undefinedyaml
undefineddbt_project.yml
dbt_project.yml
name: "analytics"
version: "1.0.0"
profile: "analytics"
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
vars:
start_date: "2020-01-01"
models:
analytics:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: ephemeral
marts:
+materialized: table
+schema: analytics
undefinedname: "analytics"
version: "1.0.0"
profile: "analytics"
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
vars:
start_date: "2020-01-01"
models:
analytics:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: ephemeral
marts:
+materialized: table
+schema: analytics
undefinedProject structure
项目结构
models/
├── staging/
│ ├── stripe/
│ │ ├── _stripe__sources.yml
│ │ ├── _stripe__models.yml
│ │ ├── stg_stripe__customers.sql
│ │ └── stg_stripe__payments.sql
│ └── shopify/
│ ├── _shopify__sources.yml
│ └── stg_shopify__orders.sql
├── intermediate/
│ └── finance/
│ └── int_payments_pivoted.sql
└── marts/
├── core/
│ ├── _core__models.yml
│ ├── dim_customers.sql
│ └── fct_orders.sql
└── finance/
└── fct_revenue.sql
undefinedmodels/
├── staging/
│ ├── stripe/
│ │ ├── _stripe__sources.yml
│ │ ├── _stripe__models.yml
│ │ ├── stg_stripe__customers.sql
│ │ └── stg_stripe__payments.sql
│ └── shopify/
│ ├── _shopify__sources.yml
│ └── stg_shopify__orders.sql
├── intermediate/
│ └── finance/
│ └── int_payments_pivoted.sql
└── marts/
├── core/
│ ├── _core__models.yml
│ ├── dim_customers.sql
│ └── fct_orders.sql
└── finance/
└── fct_revenue.sql
undefinedPatterns
模式示例
Pattern 1: Source Definitions
模式1:源数据定义
yaml
undefinedyaml
undefinedmodels/staging/stripe/_stripe__sources.yml
models/staging/stripe/_stripe__sources.yml
version: 2
sources:
- name: stripe
description: Raw Stripe data loaded via Fivetran
database: raw
schema: stripe
loader: fivetran
loaded_at_field: _fivetran_synced
freshness:
warn_after: { count: 12, period: hour }
error_after: { count: 24, period: hour }
tables:
-
name: customers description: Stripe customer records columns:
- name: id
description: Primary key
tests:
- unique
- not_null
- name: email description: Customer email
- name: created description: Account creation timestamp
- name: id
description: Primary key
tests:
-
name: payments description: Stripe payment transactions columns:
- name: id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships: to: source('stripe', 'customers') field: id
- name: id
tests:
-
undefinedversion: 2
sources:
- name: stripe
description: 通过Fivetran加载的原始Stripe数据
database: raw
schema: stripe
loader: fivetran
loaded_at_field: _fivetran_synced
freshness:
warn_after: { count: 12, period: hour }
error_after: { count: 24, period: hour }
tables:
-
name: customers description: Stripe客户记录 columns:
- name: id
description: 主键
tests:
- unique
- not_null
- name: email description: 客户邮箱
- name: created description: 账户创建时间戳
- name: id
description: 主键
tests:
-
name: payments description: Stripe支付交易记录 columns:
- name: id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships: to: source('stripe', 'customers') field: id
- name: id
tests:
-
undefinedPattern 2: Staging Models
模式2:暂存模型
sql
-- models/staging/stripe/stg_stripe__customers.sql
with source as (
select * from {{ source('stripe', 'customers') }}
),
renamed as (
select
-- ids
id as customer_id,
-- strings
lower(email) as email,
name as customer_name,
-- timestamps
created as created_at,
-- metadata
_fivetran_synced as _loaded_at
from source
)
select * from renamedsql
-- models/staging/stripe/stg_stripe__payments.sql
{{
config(
materialized='incremental',
unique_key='payment_id',
on_schema_change='append_new_columns'
)
}}
with source as (
select * from {{ source('stripe', 'payments') }}
{% if is_incremental() %}
where _fivetran_synced > (select max(_loaded_at) from {{ this }})
{% endif %}
),
renamed as (
select
-- ids
id as payment_id,
customer_id,
invoice_id,
-- amounts (convert cents to dollars)
amount / 100.0 as amount,
amount_refunded / 100.0 as amount_refunded,
-- status
status as payment_status,
-- timestamps
created as created_at,
-- metadata
_fivetran_synced as _loaded_at
from source
)
select * from renamedsql
-- models/staging/stripe/stg_stripe__customers.sql
with source as (
select * from {{ source('stripe', 'customers') }}
),
renamed as (
select
-- 标识符
id as customer_id,
-- 字符串字段
lower(email) as email,
name as customer_name,
-- 时间戳
created as created_at,
-- 元数据
_fivetran_synced as _loaded_at
from source
)
select * from renamedsql
-- models/staging/stripe/stg_stripe__payments.sql
{{
config(
materialized='incremental',
unique_key='payment_id',
on_schema_change='append_new_columns'
)
}}
with source as (
select * from {{ source('stripe', 'payments') }}
{% if is_incremental() %}
where _fivetran_synced > (select max(_loaded_at) from {{ this }})
{% endif %}
),
renamed as (
select
-- 标识符
id as payment_id,
customer_id,
invoice_id,
-- 金额(转换为美元,原单位为美分)
amount / 100.0 as amount,
amount_refunded / 100.0 as amount_refunded,
-- 状态
status as payment_status,
-- 时间戳
created as created_at,
-- 元数据
_fivetran_synced as _loaded_at
from source
)
select * from renamedPattern 3: Intermediate Models
模式3:中间模型
sql
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
with payments as (
select * from {{ ref('stg_stripe__payments') }}
),
customers as (
select * from {{ ref('stg_stripe__customers') }}
),
payment_summary as (
select
customer_id,
count(*) as total_payments,
count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
min(created_at) as first_payment_at,
max(created_at) as last_payment_at
from payments
group by customer_id
)
select
customers.customer_id,
customers.email,
customers.created_at as customer_created_at,
coalesce(payment_summary.total_payments, 0) as total_payments,
coalesce(payment_summary.successful_payments, 0) as successful_payments,
coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
payment_summary.first_payment_at,
payment_summary.last_payment_at
from customers
left join payment_summary using (customer_id)sql
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
with payments as (
select * from {{ ref('stg_stripe__payments') }}
),
customers as (
select * from {{ ref('stg_stripe__customers') }}
),
payment_summary as (
select
customer_id,
count(*) as total_payments,
count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
min(created_at) as first_payment_at,
max(created_at) as last_payment_at
from payments
group by customer_id
)
select
customers.customer_id,
customers.email,
customers.created_at as customer_created_at,
coalesce(payment_summary.total_payments, 0) as total_payments,
coalesce(payment_summary.successful_payments, 0) as successful_payments,
coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
payment_summary.first_payment_at,
payment_summary.last_payment_at
from customers
left join payment_summary using (customer_id)Pattern 4: Mart Models (Dimensions and Facts)
模式4:集市模型(维度表和事实表)
sql
-- models/marts/core/dim_customers.sql
{{
config(
materialized='table',
unique_key='customer_id'
)
}}
with customers as (
select * from {{ ref('int_payments_pivoted_to_customer') }}
),
orders as (
select * from {{ ref('stg_shopify__orders') }}
),
order_summary as (
select
customer_id,
count(*) as total_orders,
sum(total_price) as total_order_value,
min(created_at) as first_order_at,
max(created_at) as last_order_at
from orders
group by customer_id
),
final as (
select
-- surrogate key
{{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,
-- natural key
customers.customer_id,
-- attributes
customers.email,
customers.customer_created_at,
-- payment metrics
customers.total_payments,
customers.successful_payments,
customers.lifetime_value,
customers.first_payment_at,
customers.last_payment_at,
-- order metrics
coalesce(order_summary.total_orders, 0) as total_orders,
coalesce(order_summary.total_order_value, 0) as total_order_value,
order_summary.first_order_at,
order_summary.last_order_at,
-- calculated fields
case
when customers.lifetime_value >= 1000 then 'high'
when customers.lifetime_value >= 100 then 'medium'
else 'low'
end as customer_tier,
-- timestamps
current_timestamp as _loaded_at
from customers
left join order_summary using (customer_id)
)
select * from finalsql
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
)
}}
with orders as (
select * from {{ ref('stg_shopify__orders') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
),
customers as (
select * from {{ ref('dim_customers') }}
),
final as (
select
-- keys
orders.order_id,
customers.customer_key,
orders.customer_id,
-- dimensions
orders.order_status,
orders.fulfillment_status,
orders.payment_status,
-- measures
orders.subtotal,
orders.tax,
orders.shipping,
orders.total_price,
orders.total_discount,
orders.item_count,
-- timestamps
orders.created_at,
orders.updated_at,
orders.fulfilled_at,
-- metadata
current_timestamp as _loaded_at
from orders
left join customers on orders.customer_id = customers.customer_id
)
select * from finalsql
-- models/marts/core/dim_customers.sql
{{
config(
materialized='table',
unique_key='customer_id'
)
}}
with customers as (
select * from {{ ref('int_payments_pivoted_to_customer') }}
),
orders as (
select * from {{ ref('stg_shopify__orders') }}
),
order_summary as (
select
customer_id,
count(*) as total_orders,
sum(total_price) as total_order_value,
min(created_at) as first_order_at,
max(created_at) as last_order_at
from orders
group by customer_id
),
final as (
select
-- 代理键
{{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,
-- 自然键
customers.customer_id,
-- 属性字段
customers.email,
customers.customer_created_at,
-- 支付指标
customers.total_payments,
customers.successful_payments,
customers.lifetime_value,
customers.first_payment_at,
customers.last_payment_at,
-- 订单指标
coalesce(order_summary.total_orders, 0) as total_orders,
coalesce(order_summary.total_order_value, 0) as total_order_value,
order_summary.first_order_at,
order_summary.last_order_at,
-- 计算字段
case
when customers.lifetime_value >= 1000 then 'high'
when customers.lifetime_value >= 100 then 'medium'
else 'low'
end as customer_tier,
-- 时间戳
current_timestamp as _loaded_at
from customers
left join order_summary using (customer_id)
)
select * from finalsql
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
)
}}
with orders as (
select * from {{ ref('stg_shopify__orders') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
),
customers as (
select * from {{ ref('dim_customers') }}
),
final as (
select
-- 键值
orders.order_id,
customers.customer_key,
orders.customer_id,
-- 维度字段
orders.order_status,
orders.fulfillment_status,
orders.payment_status,
-- 度量字段
orders.subtotal,
orders.tax,
orders.shipping,
orders.total_price,
orders.total_discount,
orders.item_count,
-- 时间戳
orders.created_at,
orders.updated_at,
orders.fulfilled_at,
-- 元数据
current_timestamp as _loaded_at
from orders
left join customers on orders.customer_id = customers.customer_id
)
select * from finalPattern 5: Testing and Documentation
模式5:测试与文档
yaml
undefinedyaml
undefinedmodels/marts/core/_core__models.yml
models/marts/core/_core__models.yml
version: 2
models:
-
name: dim_customers description: Customer dimension with payment and order metrics columns:
-
name: customer_key description: Surrogate key for the customer dimension tests:
- unique
- not_null
-
name: customer_id description: Natural key from source system tests:
- unique
- not_null
-
name: email description: Customer email address tests:
- not_null
-
name: customer_tier description: Customer value tier based on lifetime value tests:
- accepted_values: values: ["high", "medium", "low"]
-
name: lifetime_value description: Total amount paid by customer tests:
- dbt_utils.expression_is_true: expression: ">= 0"
-
-
name: fct_orders description: Order fact table with all order transactions tests:
- dbt_utils.recency: datepart: day field: created_at interval: 1 columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_key
tests:
- not_null
- relationships: to: ref('dim_customers') field: customer_key
undefinedversion: 2
models:
-
name: dim_customers description: 包含支付和订单指标的客户维度表 columns:
-
name: customer_key description: 客户维度表的代理键 tests:
- unique
- not_null
-
name: customer_id description: 源系统的自然键 tests:
- unique
- not_null
-
name: email description: 客户邮箱地址 tests:
- not_null
-
name: customer_tier description: 基于客户生命周期价值划分的客户层级 tests:
- accepted_values: values: ["high", "medium", "low"]
-
name: lifetime_value description: 客户累计支付金额 tests:
- dbt_utils.expression_is_true: expression: ">= 0"
-
-
name: fct_orders description: 包含所有订单交易记录的订单事实表 tests:
- dbt_utils.recency: datepart: day field: created_at interval: 1 columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_key
tests:
- not_null
- relationships: to: ref('dim_customers') field: customer_key
undefinedPattern 6: Macros and DRY Code
模式6:宏与DRY代码(避免重复)
sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name }}
{%- endif -%}
{% endmacro %}
-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev(column_name, days=3) %}
{% if target.name == 'dev' %}
where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
{% endif %}
{% endmacro %}
-- Usage in model
select * from {{ ref('stg_orders') }}
{{ limit_data_in_dev('created_at') }}sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name }}
{%- endif -%}
{% endmacro %}
-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev(column_name, days=3) %}
{% if target.name == 'dev' %}
where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
{% endif %}
{% endmacro %}
-- 在模型中使用
select * from {{ ref('stg_orders') }}
{{ limit_data_in_dev('created_at') }}Pattern 7: Incremental Strategies
模式7:增量策略
sql
-- Delete+Insert (default for most warehouses)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='delete+insert'
)
}}
-- Merge (best for late-arriving data)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='merge',
merge_update_columns=['status', 'amount', 'updated_at']
)
}}
-- Insert Overwrite (partition-based)
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "created_date",
"data_type": "date",
"granularity": "day"
}
)
}}
select
*,
date(created_at) as created_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
where created_date >= dateadd(day, -3, current_date)
{% endif %}sql
-- 删除+插入(多数数仓的默认策略)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='delete+insert'
)
}}
-- 合并(适用于延迟到达的数据)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='merge',
merge_update_columns=['status', 'amount', 'updated_at']
)
}}
-- 插入覆盖(基于分区的策略)
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "created_date",
"data_type": "date",
"granularity": "day"
}
)
}}
select
*,
date(created_at) as created_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
where created_date >= dateadd(day, -3, current_date)
{% endif %}dbt Commands
dbt 命令
bash
undefinedbash
undefinedDevelopment
开发阶段
dbt run # Run all models
dbt run --select staging # Run staging models only
dbt run --select +fct_orders # Run fct_orders and its upstream
dbt run --select fct_orders+ # Run fct_orders and its downstream
dbt run --full-refresh # Rebuild incremental models
dbt run # 运行所有模型
dbt run --select staging # 仅运行暂存层模型
dbt run --select +fct_orders # 运行fct_orders及其上游依赖模型
dbt run --select fct_orders+ # 运行fct_orders及其下游依赖模型
dbt run --full-refresh # 重建增量模型
Testing
测试阶段
dbt test # Run all tests
dbt test --select stg_stripe # Test specific models
dbt build # Run + test in DAG order
dbt test # 运行所有测试
dbt test --select stg_stripe # 测试指定模型
dbt build # 按DAG顺序运行模型并执行测试
Documentation
文档生成
dbt docs generate # Generate docs
dbt docs serve # Serve docs locally
dbt docs generate # 生成文档
dbt docs serve # 本地启动文档服务
Debugging
调试阶段
dbt compile # Compile SQL without running
dbt debug # Test connection
dbt ls --select tag:critical # List models by tag
undefineddbt compile # 编译SQL但不执行
dbt debug # 测试数据库连接
dbt ls --select tag:critical # 按标签列出模型
undefinedBest Practices
最佳实践
Do's
建议
- Use staging layer - Clean data once, use everywhere
- Test aggressively - Not null, unique, relationships
- Document everything - Column descriptions, model descriptions
- Use incremental - For tables > 1M rows
- Version control - dbt project in Git
- 使用暂存层 - 一次性清洗数据,多处复用
- 充分测试 - 非空、唯一性、关联关系等测试
- 全面文档 - 字段描述、模型描述
- 使用增量模型 - 适用于数据量超过100万行的表
- 版本控制 - 将dbt项目纳入Git管理
Don'ts
禁忌
- Don't skip staging - Raw → mart is tech debt
- Don't hardcode dates - Use
{{ var('start_date') }} - Don't repeat logic - Extract to macros
- Don't test in prod - Use dev target
- Don't ignore freshness - Monitor source data
- 不要跳过暂存层 - 直接从原始数据到集市表会导致技术债务
- 不要硬编码日期 - 使用变量
{{ var('start_date') }} - 不要重复编写逻辑 - 将重复逻辑提取为宏
- 不要在生产环境测试 - 使用开发环境目标
- 不要忽略数据新鲜度 - 监控源数据的新鲜度