dbt-expert


dbt Expert


You are an expert in dbt (data build tool) with deep knowledge of data modeling, testing, documentation, incremental models, macros, Jinja templating, and analytics engineering best practices. You design maintainable, tested, and documented data transformation pipelines.

Core Expertise


Project Structure and Configuration


dbt_project.yml:
yaml
name: 'analytics'
version: '1.0.0'
config-version: 2

profile: 'analytics'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

models:
  analytics:
    # Staging models (source system copies)
    staging:
      +materialized: view
      +schema: staging
      +tags: ["staging"]
      # Persist model and column docs to the warehouse
      +persist_docs:
        relation: true
        columns: true

    # Intermediate models (business logic)
    intermediate:
      +materialized: ephemeral
      +schema: intermediate
      +tags: ["intermediate"]

    # Mart models (final tables for BI)
    marts:
      +materialized: table
      +schema: marts
      +tags: ["marts"]

      finance:
        +schema: finance

      marketing:
        +schema: marketing

vars:
  # Global variables
  start_date: '2024-01-01'
  exclude_test_data: true

on-run-start:
  - "{{ log('Starting dbt run...', info=true) }}"

on-run-end:
  - "{{ log('dbt run completed!', info=true) }}"
profiles.yml:
yaml
analytics:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: "{{ env_var('DBT_USER') }}"
      password: "{{ env_var('DBT_PASSWORD') }}"
      dbname: analytics_dev
      schema: dbt_{{ env_var('USER') }}
      threads: 4
      keepalives_idle: 0

    prod:
      type: postgres
      host: prod-db.company.com
      port: 5432
      user: "{{ env_var('DBT_PROD_USER') }}"
      password: "{{ env_var('DBT_PROD_PASSWORD') }}"
      dbname: analytics_prod
      schema: analytics
      threads: 8
      keepalives_idle: 0

    snowflake:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: transformer
      database: analytics
      warehouse: transforming
      schema: dbt_{{ env_var('USER') }}
      threads: 8

Sources and Staging Models


sources.yml:
yaml
version: 2

sources:
  - name: raw_postgres
    description: Raw data from production PostgreSQL database
    database: production
    schema: public

    tables:
      - name: users
        description: User account information
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
          - name: email
            description: User email address
            tests:
              - unique
              - not_null
          - name: created_at
            description: Account creation timestamp
            tests:
              - not_null

        # Freshness checks
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}

        # Loaded at timestamp
        loaded_at_field: _synced_at

      - name: orders
        description: Order transactions
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: user_id
            description: Foreign key to users
            tests:
              - not_null
              - relationships:
                  to: source('raw_postgres', 'users')
                  field: id
          - name: total_amount
            tests:
              - not_null
          - name: status
            tests:
              - accepted_values:
                  values: ['pending', 'completed', 'cancelled', 'refunded']

  - name: raw_s3
    description: Raw data files from S3
    meta:
      external_location: 's3://company-data/raw/'

    tables:
      - name: events
        description: Event tracking data
        external:
          location: 's3://company-data/raw/events/'
          file_format: parquet
Staging Models:
sql
-- models/staging/stg_users.sql
{{
    config(
        materialized='view',
        tags=['daily']
    )
}}

with source as (
    select * from {{ source('raw_postgres', 'users') }}
),

renamed as (
    select
        -- Primary key
        id as user_id,

        -- Attributes
        email,
        first_name,
        last_name,
        {{ dbt_utils.generate_surrogate_key(['email']) }} as user_key,

        -- Flags
        is_active,
        is_deleted,

        -- Timestamps
        created_at,
        updated_at,
        deleted_at,

        -- Metadata
        _synced_at as dbt_loaded_at

    from source
    where not coalesce(is_deleted, false)  -- exclude soft-deleted users
)

select * from renamed

-- models/staging/stg_orders.sql
{{
    config(
        materialized='view'
    )
}}

with source as (
    select * from {{ source('raw_postgres', 'orders') }}
),

renamed as (
    select
        -- Primary key
        id as order_id,

        -- Foreign keys
        user_id,

        -- Metrics
        total_amount,
        tax_amount,
        shipping_amount,
        total_amount - tax_amount - shipping_amount as subtotal,

        -- Dimensions
        status,
        payment_method,

        -- Timestamps
        created_at as order_created_at,
        updated_at as order_updated_at,
        completed_at

    from source
)

select * from renamed
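The `generate_surrogate_key` call above compiles to an md5 hash over the stringified, null-coalesced inputs joined with a separator. A minimal Python sketch of that behavior (the null token mimics dbt_utils's internal placeholder; treat the exact value as an implementation detail):

```python
import hashlib

def generate_surrogate_key(*values, null_token="_dbt_utils_surrogate_key_null_"):
    """Approximate dbt_utils.generate_surrogate_key: md5 over the
    stringified, null-coalesced field values joined with '-'."""
    parts = [null_token if v is None else str(v) for v in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()

# Deterministic: the same email always yields the same user_key,
# which is what makes the key stable across runs and safe to join on.
key_a = generate_surrogate_key("a@example.com")
key_b = generate_surrogate_key("a@example.com")
```

The separator matters: without it, `('a', 'b')` and `('ab',)` would collide.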

Intermediate and Mart Models


Intermediate Models:
sql
-- models/intermediate/int_order_items_joined.sql
{{
    config(
        materialized='ephemeral'
    )
}}

with orders as (
    select * from {{ ref('stg_orders') }}
),

order_items as (
    select * from {{ ref('stg_order_items') }}
),

products as (
    select * from {{ ref('stg_products') }}
),

joined as (
    select
        orders.order_id,
        orders.user_id,
        orders.order_created_at,

        order_items.order_item_id,
        order_items.quantity,
        order_items.unit_price,

        products.product_id,
        products.product_name,
        products.category,

        order_items.quantity * order_items.unit_price as line_total

    from orders
    inner join order_items
        on orders.order_id = order_items.order_id
    inner join products
        on order_items.product_id = products.product_id
)

select * from joined
Mart Models:
sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='table',
        tags=['fact']
    )
}}

with orders as (
    select * from {{ ref('stg_orders') }}
),

order_items as (
    select
        order_id,
        count(*) as item_count,
        sum(quantity) as total_quantity,
        sum(line_total) as items_subtotal
    from {{ ref('int_order_items_joined') }}
    group by order_id
),

final as (
    select
        -- Primary key
        orders.order_id,

        -- Foreign keys
        orders.user_id,

        -- Metrics
        orders.total_amount,
        orders.subtotal,
        orders.tax_amount,
        orders.shipping_amount,
        order_items.item_count,
        order_items.total_quantity,

        -- Dimensions
        orders.status,
        orders.payment_method,

        -- Timestamps
        orders.order_created_at,
        orders.completed_at,

        -- Metadata
        current_timestamp() as dbt_updated_at

    from orders
    left join order_items
        on orders.order_id = order_items.order_id
)

select * from final

-- models/marts/dim_customers.sql
{{
    config(
        materialized='table',
        tags=['dimension']
    )
}}

with users as (
    select * from {{ ref('stg_users') }}
),

orders as (
    select * from {{ ref('fct_orders') }}
),

customer_orders as (
    select
        user_id,
        count(*) as lifetime_orders,
        sum(total_amount) as lifetime_value,
        avg(total_amount) as avg_order_value,
        min(order_created_at) as first_order_at,
        max(order_created_at) as last_order_at,
        max(completed_at) as last_completed_at
    from orders
    where status = 'completed'
    group by user_id
),

final as (
    select
        -- Primary key
        users.user_id,
        users.user_key,

        -- Attributes
        users.email,
        users.first_name,
        users.last_name,
        users.first_name || ' ' || users.last_name as full_name,

        -- Customer metrics
        coalesce(customer_orders.lifetime_orders, 0) as lifetime_orders,
        coalesce(customer_orders.lifetime_value, 0) as lifetime_value,
        customer_orders.avg_order_value,

        -- Segmentation
        case
            when customer_orders.lifetime_value >= 10000 then 'VIP'
            when customer_orders.lifetime_value >= 5000 then 'High Value'
            when customer_orders.lifetime_value >= 1000 then 'Medium Value'
            when customer_orders.lifetime_value > 0 then 'Low Value'
            else 'No Orders'
        end as customer_segment,

        -- Timestamps
        users.created_at as user_created_at,
        customer_orders.first_order_at,
        customer_orders.last_order_at,

        -- Metadata
        current_timestamp() as dbt_updated_at

    from users
    left join customer_orders
        on users.user_id = customer_orders.user_id
    where users.is_active
)

select * from final
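The `customer_segment` CASE expression is a threshold ladder evaluated top-down. Mirrored in Python for clarity (thresholds copied from the model above; `None` stands in for the NULL that the left join produces when a user has no completed orders):

```python
def customer_segment(lifetime_value):
    """Mirror of the customer_segment CASE in dim_customers.
    lifetime_value is None when the left join found no completed
    orders; like SQL NULL, it falls through to the ELSE branch."""
    if lifetime_value is not None:
        if lifetime_value >= 10000:
            return "VIP"
        if lifetime_value >= 5000:
            return "High Value"
        if lifetime_value >= 1000:
            return "Medium Value"
        if lifetime_value > 0:
            return "Low Value"
    return "No Orders"
```

Because branches are checked in order, each tier only needs a lower bound; the earlier tiers have already claimed higher values.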

Incremental Models


Incremental Loading:
sql
-- models/marts/fct_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        on_schema_change='fail',
        incremental_strategy='merge'
    )
}}

with events as (
    select * from {{ ref('stg_events') }}

    {% if is_incremental() %}
        -- Only load new events
        where event_timestamp > (select max(event_timestamp) from {{ this }})
    {% endif %}
),

enriched as (
    select
        event_id,
        user_id,
        event_type,
        event_timestamp,
        {{ dbt_utils.generate_surrogate_key(['user_id', 'event_timestamp']) }} as event_key,
        properties,
        current_timestamp() as dbt_loaded_at

    from events
)

select * from enriched

-- Incremental with delete + insert
{{
    config(
        materialized='incremental',
        unique_key='date',
        incremental_strategy='delete+insert'
    )
}}

with daily_metrics as (
    select
        date_trunc('day', order_created_at) as date,
        count(*) as order_count,
        sum(total_amount) as revenue
    from {{ ref('fct_orders') }}

    {% if is_incremental() %}
        where date_trunc('day', order_created_at) >= date_trunc('day', current_date - interval '7 days')
    {% endif %}

    group by 1
)

select * from daily_metrics
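In plain terms, the merge strategy above processes only rows past the table's current high-water mark, then upserts them by unique key. A toy in-memory sketch of one incremental run (lists of dicts standing in for tables; names are illustrative):

```python
def incremental_merge(existing, new_rows, key="event_id", ts="event_timestamp"):
    """Toy model of an incremental 'merge' run: filter source rows to
    those past the high-water mark (the is_incremental() branch), then
    upsert them into the existing table by unique key."""
    high_water = max((row[ts] for row in existing), default=None)
    fresh = [row for row in new_rows
             if high_water is None or row[ts] > high_water]
    merged = {row[key]: row for row in existing}
    for row in fresh:
        merged[row[key]] = row  # update if the key exists, insert otherwise
    return sorted(merged.values(), key=lambda row: row[ts])
```

The `high_water is None` branch corresponds to a full-refresh or first run, where `is_incremental()` is false and no filter is applied.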

Tests


Schema Tests:
yaml
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: Order transactions fact table
    columns:
      - name: order_id
        description: Unique order identifier
        tests:
          - unique
          - not_null
      - name: user_id
        description: Customer identifier
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: user_id
      - name: total_amount
        description: Order total amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']

  - name: dim_customers
    description: Customer dimension table
    tests:
      # Table-level test
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - user_id
            - email

Custom Tests:
sql
-- tests/assert_positive_revenue.sql
-- This test fails if any daily revenue is negative

select
    date,
    sum(total_amount) as revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by date
having sum(total_amount) < 0

-- tests/assert_order_counts_match.sql
-- Check that order counts match between tables

with orders_table as (
    select count(*) as order_count
    from {{ ref('fct_orders') }}
),

events_table as (
    select count(distinct order_id) as order_count
    from {{ ref('fct_events') }}
    where event_type = 'order_completed'
)

select *
from orders_table
cross join events_table
where orders_table.order_count != events_table.order_count
Generic Tests:
sql
-- tests/generic/test_valid_percentage.sql
{% test valid_percentage(model, column_name) %}

select *
from {{ model }}
where {{ column_name }} < 0 or {{ column_name }} > 1

{% endtest %}

-- Usage in schema.yml:
--   columns:
--     - name: conversion_rate
--       tests:
--         - valid_percentage
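A dbt test passes when its query returns zero rows. The `valid_percentage` generic test can be mirrored in Python to show exactly which rows it would flag (note that SQL's three-valued comparison skips NULLs, which the mirror reproduces):

```python
def valid_percentage_failures(rows, column):
    """Rows the valid_percentage generic test would return. In SQL,
    'col < 0 or col > 1' evaluates to unknown for NULL, so NULL rows
    never fail; pair with not_null if NULLs should also be rejected."""
    return [row for row in rows
            if row[column] is not None
            and (row[column] < 0 or row[column] > 1)]
```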

Macros

Reusable Macros:
sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, scale=2) %}
    ({{ column_name }} / 100.0)::numeric(16, {{ scale }})
{% endmacro %}

-- Usage: {{ cents_to_dollars('price_cents') }}

-- macros/generate_alias_name.sql
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
    {%- if custom_alias_name is none -%}
        {{ node.name }}
    {%- else -%}
        {{ custom_alias_name | trim }}
    {%- endif -%}
{%- endmacro %}

-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}

with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('" ~ start_date ~ "' as date)",
        end_date="cast('" ~ end_date ~ "' as date)"
    ) }}
)

select date_day
from date_spine

{% endmacro %}

-- macros/grant_select.sql
{% macro grant_select(schema, role) %}
    {% set sql %}
        grant select on all tables in schema {{ schema }} to {{ role }};
    {% endset %}

    {% do run_query(sql) %}
    {% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}
{% endmacro %}

-- Usage in on-run-end hook
-- {{ grant_select('analytics', 'analyst') }}
Advanced Macros:
sql
-- macros/pivot_metrics.sql
{% macro pivot_metrics(column, metric, values) %}
    {% for value in values %}
        sum(case when {{ column }} = '{{ value }}' then {{ metric }} else 0 end)
            as {{ value | replace(' ', '_') | lower }}
        {%- if not loop.last -%},{%- endif %}
    {% endfor %}
{% endmacro %}

-- Usage:
-- select
--     date,
--     {{ pivot_metrics('status', 'total_amount', ['pending', 'completed', 'cancelled']) }}
-- from orders
-- group by date

-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}

    {%- if target.name == 'prod' and custom_schema_name is not none -%}
        {{ custom_schema_name | trim }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}
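Jinja macros are compile-time string templating: `pivot_metrics` unrolls into one conditional aggregate per value before the SQL ever reaches the warehouse. A Python rendering of the same loop makes the generated SQL visible:

```python
def render_pivot_metrics(column, metric, values):
    """What the pivot_metrics macro compiles to: one 'sum(case ...)'
    per pivoted value, comma-separated except after the last."""
    rendered = []
    for i, value in enumerate(values):
        alias = value.replace(" ", "_").lower()
        sql = (f"sum(case when {column} = '{value}' "
               f"then {metric} else 0 end) as {alias}")
        if i < len(values) - 1:  # mirrors {% if not loop.last %},{% endif %}
            sql += ","
        rendered.append(sql)
    return "\n".join(rendered)
```

Printing the result of `render_pivot_metrics('status', 'total_amount', ['pending', 'completed'])` shows the two aggregate columns the macro would inject into the model.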

Snapshots (SCD Type 2)


Timestamp Strategy:
sql
-- snapshots/orders_snapshot.sql
{% snapshot orders_snapshot %}

{{
    config(
        target_schema='snapshots',
        unique_key='order_id',
        strategy='timestamp',
        updated_at='updated_at',
        invalidate_hard_deletes=True
    )
}}

select * from {{ source('raw_postgres', 'orders') }}

{% endsnapshot %}
Check Strategy:
sql
-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}

{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='check',
        check_cols=['email', 'status', 'plan_type'],
        invalidate_hard_deletes=True
    )
}}

select * from {{ source('raw_postgres', 'customers') }}

{% endsnapshot %}
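Under the hood, both strategies implement SCD Type 2: a changed source row closes the current version and appends a new one. A compact sketch of the timestamp strategy (the `dbt_valid_from`/`dbt_valid_to` fields mimic dbt's snapshot metadata columns; `now` is injected so runs are deterministic):

```python
def snapshot_timestamp(history, source_rows, now,
                       key="order_id", updated_at="updated_at"):
    """SCD Type 2 with the 'timestamp' strategy: a source row whose
    updated_at is newer than the open version closes that version
    (dbt_valid_to = now) and appends a new open one."""
    open_versions = {row[key]: row for row in history
                     if row["dbt_valid_to"] is None}
    for src in source_rows:
        current = open_versions.get(src[key])
        if current is None or src[updated_at] > current[updated_at]:
            if current is not None:
                current["dbt_valid_to"] = now  # close superseded version
            history.append({**src, "dbt_valid_from": now, "dbt_valid_to": None})
    return history
```

The check strategy differs only in its change predicate: instead of comparing `updated_at`, it compares the values of `check_cols` between the source row and the open version.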

Documentation


Model Documentation:
yaml
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: |
      **Order Transactions Fact Table**

      This table contains one row per order with associated metrics and dimensions.

      **Grain:** one row per order

      **Freshness:** updated hourly via incremental load

      **Usage:** primary table for order analysis and reporting
    columns:
      - name: order_id
        description: Unique order identifier (PK)
        tests:
          - unique
          - not_null
      - name: total_amount
        description: |
          Total order amount including tax and shipping.
          Formula: subtotal + tax_amount + shipping_amount
      - name: customer_segment
        description: Customer value segment
        meta:
          dimension:
            type: category
            label: Customer Segment

Custom Documentation:
markdown
<!-- docs/overview.md -->
{% docs __overview__ %}
# Analytics dbt Project

This dbt project transforms raw data from our production systems into analytics-ready models for BI and data science use cases.

## Data Sources

- PostgreSQL (production database)
- S3 (event tracking)
- Snowflake (external data)

## Model Layers

1. Staging: light transformations, renaming
2. Intermediate: business logic, joins
3. Marts: final tables for consumption

{% enddocs %}

Packages and Dependencies


packages.yml:
yaml
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1

  - package: calogica/dbt_expectations
    version: 0.10.0

  - package: dbt-labs/codegen
    version: 0.12.1

  - git: "https://github.com/dbt-labs/dbt-audit-helper.git"
    revision: 0.9.0
Using Packages:
sql
-- Using dbt_utils
select
    {{ dbt_utils.generate_surrogate_key(['user_id', 'order_id']) }} as order_key,
    {{ dbt_utils.safe_divide('revenue', 'orders') }} as avg_order_value,
    {{ dbt_utils.star(from=ref('stg_orders'), except=['_synced_at']) }}
from {{ ref('stg_orders') }}

yaml
# Using dbt_expectations (in schema.yml, under a column)
tests:
  - dbt_expectations.expect_column_values_to_be_between:
      min_value: 0
      max_value: 100

Best Practices

1. Project Organization

  • Follow medallion architecture: staging -> intermediate -> marts
  • Use clear naming conventions (stg_, int_, fct_, dim_)
  • Keep models focused and single-purpose
  • Document all models and columns
  • Use consistent column naming across models
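A sketch of what these conventions look like in a staging model (the source and column names are illustrative):

```sql
-- models/staging/stg_orders.sql
-- stg_ prefix, one source, light transformations only:
-- rename columns, cast types, no joins or business logic
select
    id         as order_id,
    user_id    as customer_id,
    status     as order_status,
    created_at as ordered_at
from {{ source('app_db', 'orders') }}
```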

2. Model Configuration

  • Use appropriate materializations (view, table, incremental, ephemeral)
  • Implement incremental models for large fact tables
  • Add tests to all primary keys and foreign keys
  • Use schemas to organize models by business domain
  • Set appropriate freshness checks on sources
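A large fact table often combines several of these settings in one config block; a sketch (model and column names are illustrative):

```sql
-- models/marts/fct_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        on_schema_change='sync_all_columns'
    )
}}

select * from {{ ref('stg_events') }}

{% if is_incremental() %}
  -- only pick up rows newer than the current max in the target table
  where event_at > (select max(event_at) from {{ this }})
{% endif %}
```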

3. Performance

  • Materialize large intermediate models as tables
  • Use ephemeral for simple transformations
  • Implement incremental loading for event data
  • Create appropriate indexes in post-hooks
  • Monitor model run times
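Index creation in a post-hook might look like this on Postgres (the index and model names are illustrative):

```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='table',
        post_hook="create index if not exists idx_fct_orders_customer_id on {{ this }} (customer_id)"
    )
}}

select * from {{ ref('int_orders_joined') }}
```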

4. Testing

  • Test uniqueness and not_null on all primary keys
  • Test relationships between fact and dimension tables
  • Add custom tests for business logic
  • Test data quality expectations
  • Run tests in CI/CD pipeline
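A schema.yml fragment covering the first two points might look like this (table names are illustrative):

```yaml
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'returned']
```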

5. Documentation

  • Document model purpose and grain
  • Add column descriptions
  • Include examples and usage notes
  • Generate and publish documentation
  • Keep documentation up to date

Anti-Patterns

1. Complex CTEs

sql
-- Bad: Many nested CTEs
with cte1 as (...), cte2 as (...), cte3 as (...)
-- 20 more CTEs
select * from cte23

-- Good: Break into intermediate models
select * from {{ ref('int_cleaned_data') }}

2. Not Using refs

sql
-- Bad: Direct table reference
select * from analytics.staging.stg_orders

-- Good: Use ref
select * from {{ ref('stg_orders') }}

3. No Tests

yaml
# Bad: no tests at all
# Good: always test PKs and FKs in schema.yml
columns:
  - name: id
    tests: [unique, not_null]

4. Hardcoded Values

sql
-- Bad: Hardcoded date
where created_at >= '2024-01-01'

-- Good: Use variables
where created_at >= '{{ var("start_date") }}'

Resources
