dbt-transformation-patterns

dbt Transformation Patterns

Production-ready patterns for dbt (data build tool) including model organization, testing strategies, documentation, and incremental processing.

When to Use This Skill

  • Building data transformation pipelines with dbt
  • Organizing models into staging, intermediate, and marts layers
  • Implementing data quality tests
  • Creating incremental models for large datasets
  • Documenting data models and lineage
  • Setting up dbt project structure

Core Concepts

1. Model Layers (Medallion Architecture)

sources/          Raw data definitions
staging/          1:1 with source, light cleaning
intermediate/     Business logic, joins, aggregations
marts/            Final analytics tables

2. Naming Conventions

Layer          Prefix        Example
Staging        stg_          stg_stripe__payments
Intermediate   int_          int_payments_pivoted
Marts          dim_, fct_    dim_customers, fct_orders
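
The prefix rules in the table above can be checked mechanically. The following is a minimal plain-Python sketch, not part of dbt; the `LAYER_PREFIXES` mapping and the `follows_convention` helper are hypothetical names introduced only for illustration.

```python
# Hypothetical helper: maps each layer to the prefixes listed in the table.
LAYER_PREFIXES = {
    "staging": ("stg_",),
    "intermediate": ("int_",),
    "marts": ("dim_", "fct_"),
}


def follows_convention(layer: str, model_name: str) -> bool:
    """Return True if model_name starts with a prefix allowed for its layer."""
    # str.startswith accepts a tuple and matches any of its entries.
    return model_name.startswith(LAYER_PREFIXES[layer])


print(follows_convention("staging", "stg_stripe__payments"))
print(follows_convention("marts", "stg_stripe__payments"))
```

A check like this could run in CI over the file names under `models/`, assuming the directory name identifies the layer.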

Quick Start

yaml
# dbt_project.yml
name: "analytics"
version: "1.0.0"
profile: "analytics"

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]

vars:
  start_date: "2020-01-01"

models:
  analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics

Project structure

models/
├── staging/
│   ├── stripe/
│   │   ├── _stripe__sources.yml
│   │   ├── _stripe__models.yml
│   │   ├── stg_stripe__customers.sql
│   │   └── stg_stripe__payments.sql
│   └── shopify/
│       ├── _shopify__sources.yml
│       └── stg_shopify__orders.sql
├── intermediate/
│   └── finance/
│       └── int_payments_pivoted_to_customer.sql
└── marts/
    ├── core/
    │   ├── _core__models.yml
    │   ├── dim_customers.sql
    │   └── fct_orders.sql
    └── finance/
        └── fct_revenue.sql

Patterns

Pattern 1: Source Definitions

yaml
# models/staging/stripe/_stripe__sources.yml
version: 2

sources:
  - name: stripe
    description: Raw Stripe data loaded via Fivetran
    database: raw
    schema: stripe
    loader: fivetran
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: { count: 12, period: hour }
      error_after: { count: 24, period: hour }
    tables:
      - name: customers
        description: Stripe customer records
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
          - name: email
            description: Customer email
          - name: created
            description: Account creation timestamp
      - name: payments
        description: Stripe payment transactions
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: source('stripe', 'customers')
                  field: id
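
To make the `freshness` thresholds concrete, here is a minimal plain-Python sketch of how the age of the newest `_fivetran_synced` value could map to a status. It is an illustration under stated assumptions, not dbt's implementation: the function name `freshness_status` is hypothetical, and the exact boundary comparison (strictly greater than the threshold) is assumed.

```python
from datetime import datetime, timedelta

# Thresholds copied from the source definition above.
WARN_AFTER = timedelta(hours=12)   # warn_after: { count: 12, period: hour }
ERROR_AFTER = timedelta(hours=24)  # error_after: { count: 24, period: hour }


def freshness_status(max_loaded_at: datetime, now: datetime) -> str:
    """Classify a source by the age of its most recent loaded_at_field value."""
    age = now - max_loaded_at
    if age > ERROR_AFTER:  # assumed boundary: strictly older than the threshold
        return "error"
    if age > WARN_AFTER:
        return "warn"
    return "pass"


now = datetime(2024, 1, 2, 0, 0)
print(freshness_status(now - timedelta(hours=13), now))
```

In a real project the check is run with `dbt source freshness`; the sketch only shows how the two thresholds partition the age of the data.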

Pattern 2: Staging Models

sql
-- models/staging/stripe/stg_stripe__customers.sql
with source as (
    select * from {{ source('stripe', 'customers') }}
),

renamed as (
    select
        -- ids
        id as customer_id,

        -- strings
        lower(email) as email,
        name as customer_name,

        -- timestamps
        created as created_at,

        -- metadata
        _fivetran_synced as _loaded_at

    from source
)

select * from renamed
sql
-- models/staging/stripe/stg_stripe__payments.sql
{{
    config(
        materialized='incremental',
        unique_key='payment_id',
        on_schema_change='append_new_columns'
    )
}}

with source as (
    select * from {{ source('stripe', 'payments') }}

    {% if is_incremental() %}
    where _fivetran_synced > (select max(_loaded_at) from {{ this }})
    {% endif %}
),

renamed as (
    select
        -- ids
        id as payment_id,
        customer_id,
        invoice_id,

        -- amounts (convert cents to dollars)
        amount / 100.0 as amount,
        amount_refunded / 100.0 as amount_refunded,

        -- status
        status as payment_status,

        -- timestamps
        created as created_at,

        -- metadata
        _fivetran_synced as _loaded_at

    from source
)

select * from renamed
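
The incremental filter in `stg_stripe__payments` can be easier to reason about outside SQL. The sketch below models it in plain Python under simplifying assumptions: timestamps are integers, only two columns are carried through, and `incremental_run` is a hypothetical name. It is not how dbt executes the model.

```python
def incremental_run(target: list[dict], source: list[dict]) -> list[dict]:
    """Model one dbt run: filter by watermark, then upsert on payment_id."""
    if target:
        # Equivalent of: select max(_loaded_at) from {{ this }}
        watermark = max(row["_loaded_at"] for row in target)
        new_rows = [r for r in source if r["_fivetran_synced"] > watermark]
    else:
        # First run (is_incremental() is false): take every source row.
        new_rows = list(source)

    by_key = {row["payment_id"]: row for row in target}
    for r in new_rows:
        # unique_key='payment_id': an existing row with the same key is replaced.
        by_key[r["id"]] = {"payment_id": r["id"], "_loaded_at": r["_fivetran_synced"]}
    return list(by_key.values())


first = incremental_run([], [{"id": 1, "_fivetran_synced": 10}])
second = incremental_run(first, [
    {"id": 1, "_fivetran_synced": 10},  # not newer than the watermark: skipped
    {"id": 2, "_fivetran_synced": 11},  # newer: inserted
])
print(second)
```

Because the comparison is strict (`>`), a row that arrives later with the same sync timestamp as the current watermark is skipped; some teams subtract a small lookback window from the watermark to guard against this.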

Pattern 3: Intermediate Models

sql
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
with payments as (
    select * from {{ ref('stg_stripe__payments') }}
),

customers as (
    select * from {{ ref('stg_stripe__customers') }}
),

payment_summary as (
    select
        customer_id,
        count(*) as total_payments,
        count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
        sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
        min(created_at) as first_payment_at,
        max(created_at) as last_payment_at
    from payments
    group by customer_id
)

select
    customers.customer_id,
    customers.email,
    customers.created_at as customer_created_at,
    coalesce(payment_summary.total_payments, 0) as total_payments,
    coalesce(payment_summary.successful_payments, 0) as successful_payments,
    coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
    payment_summary.first_payment_at,
    payment_summary.last_payment_at

from customers
left join payment_summary using (customer_id)

Pattern 4: Mart Models (Dimensions and Facts)

sql
-- models/marts/core/dim_customers.sql
{{
    config(
        materialized='table'
    )
}}

with customers as (
    select * from {{ ref('int_payments_pivoted_to_customer') }}
),

orders as (
    select * from {{ ref('stg_shopify__orders') }}
),

order_summary as (
    select
        customer_id,
        count(*) as total_orders,
        sum(total_price) as total_order_value,
        min(created_at) as first_order_at,
        max(created_at) as last_order_at
    from orders
    group by customer_id
),

final as (
    select
        -- surrogate key
        {{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,

        -- natural key
        customers.customer_id,

        -- attributes
        customers.email,
        customers.customer_created_at,

        -- payment metrics
        customers.total_payments,
        customers.successful_payments,
        customers.lifetime_value,
        customers.first_payment_at,
        customers.last_payment_at,

        -- order metrics
        coalesce(order_summary.total_orders, 0) as total_orders,
        coalesce(order_summary.total_order_value, 0) as total_order_value,
        order_summary.first_order_at,
        order_summary.last_order_at,

        -- calculated fields
        case
            when customers.lifetime_value >= 1000 then 'high'
            when customers.lifetime_value >= 100 then 'medium'
            else 'low'
        end as customer_tier,

        -- timestamps
        current_timestamp as _loaded_at

    from customers
    left join order_summary using (customer_id)
)

select * from final
sql
-- models/marts/core/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge'
    )
}}

with orders as (
    select * from {{ ref('stg_shopify__orders') }}

    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

final as (
    select
        -- keys
        orders.order_id,
        customers.customer_key,
        orders.customer_id,

        -- dimensions
        orders.order_status,
        orders.fulfillment_status,
        orders.payment_status,

        -- measures
        orders.subtotal,
        orders.tax,
        orders.shipping,
        orders.total_price,
        orders.total_discount,
        orders.item_count,

        -- timestamps
        orders.created_at,
        orders.updated_at,
        orders.fulfilled_at,

        -- metadata
        current_timestamp as _loaded_at

    from orders
    left join customers on orders.customer_id = customers.customer_id
)

select * from final

Pattern 5: Testing and Documentation

yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: dim_customers
    description: Customer dimension with payment and order metrics
    columns:
      - name: customer_key
        description: Surrogate key for the customer dimension
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Natural key from source system
        tests:
          - unique
          - not_null
      - name: email
        description: Customer email address
        tests:
          - not_null
      - name: customer_tier
        description: Customer value tier based on lifetime value
        tests:
          - accepted_values:
              values: ["high", "medium", "low"]
      - name: lifetime_value
        description: Total amount paid by customer
        tests:
          - dbt_utils.expression_is_true:
              expression: ">= 0"

  - name: fct_orders
    description: Order fact table with all order transactions
    tests:
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_key
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_key
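
Each generic test above amounts to a query that returns failing rows; the test passes when that query returns nothing. The plain-Python sketch below mirrors that idea for the four built-in tests. The function names follow the test names for readability, but the code is an illustration, not dbt's SQL, and the handling of `None` (nulls ignored by every test except `not_null`) is stated here as an assumption.

```python
def not_null(rows, column):
    """Failing rows: the column is null."""
    return [r for r in rows if r[column] is None]


def unique(rows, column):
    """Failing rows: a non-null value that has already been seen."""
    seen, dupes = set(), []
    for r in rows:
        value = r[column]
        if value is None:
            continue  # assumption: nulls are left to the not_null test
        if value in seen:
            dupes.append(r)
        seen.add(value)
    return dupes


def accepted_values(rows, column, values):
    """Failing rows: a non-null value outside the allowed set."""
    return [r for r in rows if r[column] is not None and r[column] not in values]


def relationships(child_rows, column, parent_rows, parent_column):
    """Failing rows: a non-null child key with no matching parent key."""
    parent_keys = {p[parent_column] for p in parent_rows}
    return [c for c in child_rows if c[column] is not None and c[column] not in parent_keys]


dim = [{"customer_key": "a", "customer_tier": "high"},
       {"customer_key": "b", "customer_tier": "vip"}]
fct = [{"order_id": 1, "customer_key": "a"},
       {"order_id": 2, "customer_key": "z"}]
print(accepted_values(dim, "customer_tier", {"high", "medium", "low"}))
print(relationships(fct, "customer_key", dim, "customer_key"))
```

Returning the offending rows rather than a boolean matches how a failed test is usually investigated: the rows themselves show what went wrong.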

Pattern 6: Macros and DRY Code

sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name }}
    {%- endif -%}
{% endmacro %}

-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev(column_name, days=3) %}
    {% if target.name == 'dev' %}
        where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
    {% endif %}
{% endmacro %}

-- Usage in a model
select * from {{ ref('stg_shopify__orders') }}
{{ limit_data_in_dev('created_at') }}
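
The branching in the `generate_schema_name` macro above decides where every model lands, so it helps to see its output for a few inputs. This is a plain-Python restatement of the macro's logic only; the target schema `dbt_alice` is a made-up example value.

```python
from typing import Optional


def generate_schema_name(target_schema: str, custom_schema_name: Optional[str]) -> str:
    """Mirror the macro: use the target schema alone, or suffix it with the custom name."""
    if custom_schema_name is None:
        return target_schema
    return f"{target_schema}_{custom_schema_name}"


# "dbt_alice" is a hypothetical developer target schema.
print(generate_schema_name("dbt_alice", "staging"))
print(generate_schema_name("dbt_alice", None))
```

With the `+schema` settings from the Quick Start, staging models would build into `<target schema>_staging` and marts into `<target schema>_analytics`, which keeps each developer's objects separate from production.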

Pattern 7: Incremental Strategies

sql
-- Delete+Insert (supported on adapters such as Snowflake, Postgres, and Redshift; the default strategy varies by adapter)
{{
    config(
        materialized='incremental',
        unique_key='id',
        incremental_strategy='delete+insert'
    )
}}

-- Merge (best for late-arriving data)
{{
    config(
        materialized='incremental',
        unique_key='id',
        incremental_strategy='merge',
        merge_update_columns=['status', 'amount', 'updated_at']
    )
}}

-- Insert Overwrite (partition-based; this example uses BigQuery syntax)
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            "field": "created_date",
            "data_type": "date",
            "granularity": "day"
        }
    )
}}

select
    *,
    date(created_at) as created_date
from {{ ref('stg_events') }}

{% if is_incremental() %}
-- Filter on the expression: a select alias cannot be referenced in WHERE
where date(created_at) >= date_sub(current_date(), interval 3 day)
{% endif %}
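
The effect of `merge_update_columns` is easy to miss in the config alone: matched rows have only the listed columns overwritten, while unmatched rows are inserted whole. The following plain-Python sketch models that behavior; `merge` here is a hypothetical helper, not a dbt or warehouse API, and the `note` column is invented to show a column that is left untouched.

```python
def merge(target, incoming, unique_key, update_columns):
    """Model a merge: update listed columns on key match, insert otherwise."""
    by_key = {row[unique_key]: dict(row) for row in target}
    for row in incoming:
        key = row[unique_key]
        if key in by_key:
            # Matched: overwrite only the columns named in merge_update_columns.
            for col in update_columns:
                by_key[key][col] = row[col]
        else:
            # Not matched: insert the full row.
            by_key[key] = dict(row)
    return list(by_key.values())


target = [{"id": 1, "status": "pending", "amount": 5, "updated_at": 1, "note": "keep"}]
incoming = [
    {"id": 1, "status": "paid", "amount": 5, "updated_at": 2, "note": "changed"},
    {"id": 2, "status": "paid", "amount": 9, "updated_at": 2, "note": "new"},
]
print(merge(target, incoming, "id", ["status", "amount", "updated_at"]))
```

By contrast, delete+insert would replace the whole matched row, so `note` for `id` 1 would become `"changed"`.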

dbt Commands

bash
# Development
dbt run                          # Run all models
dbt run --select staging         # Run staging models only
dbt run --select +fct_orders     # Run fct_orders and its upstream models
dbt run --select fct_orders+     # Run fct_orders and its downstream models
dbt run --full-refresh           # Rebuild incremental models

# Testing
dbt test                                   # Run all tests
dbt test --select stg_stripe__payments     # Test a specific model
dbt build                                  # Run + test in DAG order

# Documentation
dbt docs generate                # Generate docs
dbt docs serve                   # Serve docs locally

# Debugging
dbt compile                      # Compile SQL without running
dbt debug                        # Test connection
dbt ls --select tag:critical     # List models by tag
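
The `+model` and `model+` selectors walk the dependency graph in opposite directions. The sketch below shows the idea in plain Python over the `ref()` relationships that appear in this document's models; the `PARENTS` mapping and the `upstream` and `downstream` helpers are illustrative names, not dbt internals.

```python
# Dependency graph taken from the ref() calls in the models above: child -> parents.
PARENTS = {
    "stg_stripe__payments": [],
    "stg_stripe__customers": [],
    "stg_shopify__orders": [],
    "int_payments_pivoted_to_customer": ["stg_stripe__payments", "stg_stripe__customers"],
    "dim_customers": ["int_payments_pivoted_to_customer", "stg_shopify__orders"],
    "fct_orders": ["stg_shopify__orders", "dim_customers"],
}


def upstream(model: str) -> set:
    """Models selected by `+model`: the model plus all of its ancestors."""
    selected, stack = {model}, [model]
    while stack:
        for parent in PARENTS[stack.pop()]:
            if parent not in selected:
                selected.add(parent)
                stack.append(parent)
    return selected


def downstream(model: str) -> set:
    """Models selected by `model+`: the model plus all of its descendants."""
    selected = {model}
    changed = True
    while changed:
        changed = False
        for child, parents in PARENTS.items():
            if child not in selected and selected.intersection(parents):
                selected.add(child)
                changed = True
    return selected


print(sorted(upstream("fct_orders")))
print(sorted(downstream("stg_shopify__orders")))
```

In this graph `+fct_orders` selects every model, while `fct_orders+` selects only `fct_orders` itself because nothing depends on it.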

Best Practices

Do's

  • Use staging layer - Clean data once, use everywhere
  • Test aggressively - Not null, unique, relationships
  • Document everything - Column descriptions, model descriptions
  • Use incremental - For tables > 1M rows
  • Version control - dbt project in Git

Don'ts

  • Don't skip staging - Raw → mart is tech debt
  • Don't hardcode dates - Use {{ var('start_date') }}
  • Don't repeat logic - Extract to macros
  • Don't test in prod - Use dev target
  • Don't ignore freshness - Monitor source data

Resources
