dbt Data Transformation
A comprehensive skill for mastering dbt (data build tool) for analytics engineering. This skill covers model development, testing strategies, documentation practices, incremental builds, Jinja templating, macro development, package management, and production deployment workflows.
When to Use This Skill
Use this skill when:
- Building data transformation pipelines for analytics and business intelligence
- Creating a data warehouse with modular, testable SQL transformations
- Implementing ELT (Extract, Load, Transform) workflows
- Developing dimensional models (facts, dimensions) for analytics
- Managing complex SQL dependencies and data lineage
- Creating reusable data transformation logic across projects
- Testing data quality and implementing data contracts
- Documenting data models and business logic
- Building incremental models for large datasets
- Orchestrating dbt with tools like Airflow, Dagster, or dbt Cloud
- Migrating legacy ETL processes to modern ELT architecture
- Implementing DataOps practices for analytics teams
Core Concepts
What is dbt?
dbt (data build tool) enables analytics engineers to transform data in their warehouse more effectively. It's a development framework that brings software engineering best practices to data transformation:
- Version Control: SQL transformations as code in Git
- Testing: Built-in data quality testing framework
- Documentation: Auto-generated, searchable data dictionary
- Modularity: Reusable SQL through refs and macros
- Lineage: Automatic dependency resolution and visualization
- Deployment: CI/CD for data transformations
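As a rough sketch, these pieces come together in a project's `dbt_project.yml`; the project and folder names below are illustrative, not prescribed:

```yaml
# dbt_project.yml (illustrative names)
name: jaffle_shop
version: '1.0.0'
profile: jaffle_shop          # matches a target in profiles.yml

model-paths: ["models"]
test-paths: ["tests"]
macro-paths: ["macros"]

models:
  jaffle_shop:
    staging:
      +materialized: view     # lightweight staging layer
    marts:
      +materialized: table    # queried heavily by BI tools
```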
The dbt Workflow
1. Develop: Write SQL SELECT statements as models
2. Test: Define data quality tests
3. Document: Add descriptions and metadata
4. Build: dbt run compiles and executes models
5. Test: dbt test validates data quality
6. Deploy: CI/CD pipelines deploy to production
Key dbt Entities
- Models: SQL SELECT statements that define data transformations
- Sources: Raw data tables in your warehouse
- Seeds: CSV files loaded into your warehouse
- Tests: Data quality assertions
- Macros: Reusable Jinja-SQL functions
- Snapshots: Type 2 slowly changing dimension captures
- Exposures: Downstream uses of dbt models (dashboards, ML models)
- Metrics: Business metric definitions
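Snapshots, for example, live in their own blocks rather than plain model files; a minimal sketch (schema and column names assumed):

```sql
-- snapshots/orders_snapshot.sql (illustrative)
{% snapshot orders_snapshot %}

{{
    config(
      target_schema='snapshots',
      unique_key='id',
      strategy='timestamp',
      updated_at='updated_at'
    )
}}

select * from {{ source('jaffle_shop', 'orders') }}

{% endsnapshot %}
```

Each run compares current rows to the snapshot table and records changes as Type 2 history.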
Model Development
Basic Model Structure
A dbt model is a SELECT statement saved as a file:
```sql
-- models/staging/stg_orders.sql
with source as (
    select * from {{ source('jaffle_shop', 'orders') }}
),

renamed as (
    select
        id as order_id,
        user_id as customer_id,
        order_date,
        status,
        _etl_loaded_at
    from source
)

select * from renamed
```

Key Points:
- Models are SELECT statements only (no DDL)
- Use CTEs (Common Table Expressions) for readability
- Reference sources with `{{ source() }}`
- dbt handles CREATE/INSERT logic based on materialization
The ref() Function
Reference other models using `{{ ref() }}`:

```sql
-- models/marts/fct_orders.sql
with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('stg_customers') }}
),

joined as (
    select
        orders.order_id,
        orders.order_date,
        customers.customer_name,
        orders.status
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
)

select * from joined
```

Benefits of ref():
- Builds dependency graph automatically
- Resolves to correct schema/database
- Enables testing in dev without affecting prod
- Powers lineage visualization
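Because ref() builds the dependency graph, the CLI's graph operators can select a model together with its ancestors or descendants (model names taken from the examples above):

```bash
# Build stg_orders and everything downstream of it
dbt run --select stg_orders+

# Build fct_orders and everything it depends on
dbt run --select +fct_orders

# List the lineage without running anything
dbt ls --select +fct_orders+
```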
The source() Function
Define and reference raw data sources:
```yaml
# models/staging/sources.yml
version: 2

sources:
  - name: jaffle_shop
    description: Raw data from the Jaffle Shop application
    database: raw
    schema: jaffle_shop
    tables:
      - name: orders
        description: One record per order
        columns:
          - name: id
            description: Primary key for orders
            tests:
              - unique
              - not_null
          - name: user_id
            description: Foreign key to customers
          - name: order_date
            description: Date order was placed
          - name: status
            description: Order status (completed, pending, cancelled)
```

Reference the source:

```sql
select * from {{ source('jaffle_shop', 'orders') }}
```

Source Features:
- Document raw data tables
- Test source data quality
- Track freshness with a `freshness` config
- Separate source definitions from transformations
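The freshness config mentioned above is declared on the source and checked with `dbt source freshness`; a sketch, assuming a `_etl_loaded_at` load timestamp column:

```yaml
sources:
  - name: jaffle_shop
    loaded_at_field: _etl_loaded_at
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: orders
```

```bash
# Compare max(loaded_at_field) to the thresholds above
dbt source freshness
```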
Model Organization
Recommended project structure:
```
models/
├── staging/                # One-to-one with source tables
│   ├── jaffle_shop/
│   │   ├── _jaffle_shop__sources.yml
│   │   ├── _jaffle_shop__models.yml
│   │   ├── stg_jaffle_shop__orders.sql
│   │   └── stg_jaffle_shop__customers.sql
│   └── stripe/
│       ├── _stripe__sources.yml
│       ├── _stripe__models.yml
│       └── stg_stripe__payments.sql
├── intermediate/           # Purpose-built transformations
│   └── int_orders_joined.sql
└── marts/                  # Business-defined entities
    ├── core/
    │   ├── _core__models.yml
    │   ├── dim_customers.sql
    │   └── fct_orders.sql
    └── marketing/
        └── fct_customer_sessions.sql
```

Naming Conventions:
- `stg_`: Staging models (one-to-one with sources)
- `int_`: Intermediate models (not exposed to end users)
- `fct_`: Fact tables
- `dim_`: Dimension tables
Materializations
Materializations determine how dbt builds models in your warehouse:
1. View (Default)
```sql
{{ config(materialized='view') }}

select * from {{ ref('base_model') }}
```

Characteristics:
- Lightweight, no data stored
- Query runs each time view is accessed
- Best for: Small datasets, models queried infrequently
- Fast to build, slower to query
2. Table
```sql
{{ config(materialized='table') }}

select * from {{ ref('base_model') }}
```

Characteristics:
- Full table rebuild on each run
- Data physically stored
- Best for: Small to medium datasets, heavily queried models
- Slower to build, faster to query
3. Incremental
```sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='fail'
) }}

select * from {{ source('jaffle_shop', 'orders') }}

{% if is_incremental() %}
  -- Only process new/updated records
  where order_date > (select max(order_date) from {{ this }})
{% endif %}
```

Characteristics:
- Only processes new data on subsequent runs
- First run builds full table
- Best for: Large datasets, event/time-series data
- Fast incremental builds, maintains historical data
Incremental Strategies:
```sql
-- Append (default): Add new rows only
{{ config(
    materialized='incremental',
    incremental_strategy='append'
) }}

-- Merge: Upsert based on unique_key
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

-- Delete+Insert: Delete matching records, insert new
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='delete+insert'
) }}
```

4. Ephemeral
```sql
{{ config(materialized='ephemeral') }}

select * from {{ ref('base_model') }}
```

Characteristics:
- Not built in warehouse
- Interpolated as CTE in dependent models
- Best for: Lightweight transformations, avoiding view proliferation
- No storage, compiled into downstream models
Configuration Comparison
| Materialization | Build Speed | Query Speed | Storage | Use Case |
|---|---|---|---|---|
| View | Fast | Slow | None | Small datasets, infrequent queries |
| Table | Slow | Fast | High | Medium datasets, frequent queries |
| Incremental | Fast* | Fast | High | Large datasets, time-series data |
| Ephemeral | N/A | Varies | None | Intermediate logic, CTEs |
*After initial full build
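Rather than configuring each model inline, materializations are often set per folder in `dbt_project.yml`, matching the project layout above (project name illustrative):

```yaml
# dbt_project.yml (project name illustrative)
models:
  my_project:
    staging:
      +materialized: view
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
```

Inline `{{ config(...) }}` blocks override these folder-level defaults, so only exceptions need per-model configuration.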
Testing
Schema Tests
Built-in generic tests defined in YAML:
```yaml
# models/staging/stg_orders.yml
version: 2

models:
  - name: stg_orders
    description: Staged order data
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Foreign key to customers
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: status
        description: Order status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'returned', 'cancelled']
      - name: order_total
        description: Total order amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
```

**Built-in Tests:**
- `unique`: No duplicate values
- `not_null`: No null values
- `accepted_values`: Value in specified list
- `relationships`: Foreign key validation
Custom Data Tests
Create custom tests in the `tests/` directory:

```sql
-- tests/assert_positive_order_totals.sql
select
    order_id,
    order_total
from {{ ref('fct_orders') }}
where order_total < 0
```

How it works:
- Test fails if query returns any rows
- Query should return failing records
- Can use any SQL logic
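A singular test like the one above can be generalized into a reusable generic test with a `{% test %}` block; a sketch (test and file names are illustrative):

```sql
-- tests/generic/assert_non_negative.sql
{% test assert_non_negative(model, column_name) %}

-- Fails if any value in the column is negative
select {{ column_name }}
from {{ model }}
where {{ column_name }} < 0

{% endtest %}
```

Once defined, it can be applied in YAML like any built-in test (`- assert_non_negative`) against any column.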
Advanced Testing Patterns
```sql
-- Test for data freshness
-- tests/assert_orders_are_fresh.sql
with latest_order as (
    select max(order_date) as max_date
    from {{ ref('fct_orders') }}
)

select max_date
from latest_order
where max_date < current_date - interval '1 day'
```

```sql
-- Test for referential integrity across time
-- tests/assert_no_orphaned_orders.sql
select
    o.order_id,
    o.customer_id
from {{ ref('fct_orders') }} o
left join {{ ref('dim_customers') }} c
    on o.customer_id = c.customer_id
where c.customer_id is null
```

Testing with dbt_utils
```yaml
# Requires the dbt-utils package
models:
  - name: stg_orders
    tests:
      # Test for uniqueness across a combination of columns (model-level test)
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - order_id
            - order_date
    columns:
      - name: order_id
        tests:
          # Test for sequential values
          - dbt_utils.sequential_values:
              interval: 1
          # Test that at least 95% of values are not null
          - dbt_utils.not_null_proportion:
              at_least: 0.95
```

Test Severity Levels
```yaml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique:
              severity: error  # Fail build (default)
          - not_null:
              severity: warn   # Warning only
```

Documentation
Model Documentation
```yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: |
      Order fact table containing one row per order with associated
      customer and payment information. This is the primary table for
      order analytics and reporting.

      Grain: One row per order
      Refresh: Incremental, updates daily at 2 AM UTC

      Notes:
      - Includes cancelled orders (filter with status column)
      - Payment info joined from Stripe data
      - Customer info joined from application database
    columns:
      - name: order_id
        description: Primary key for orders table
        tests:
          - unique
          - not_null
      - name: customer_id
        description: |
          Foreign key to dim_customers. Links to customer who placed the order.
          Note: May be null for guest checkout orders.
      - name: order_date
        description: Date order was placed (UTC timezone)
      - name: status
        description: |
          Current order status. Possible values:
          - placed: Order received, not yet processed
          - shipped: Order shipped to customer
          - completed: Order delivered and confirmed
          - returned: Order returned by customer
          - cancelled: Order cancelled before shipment
      - name: order_total
        description: Total order amount in USD including tax and shipping
```

Documentation Blocks
Create reusable documentation:
```markdown
<!-- models/docs.md -->
{% docs order_status %}
Order status indicates the current state of an order in our fulfillment pipeline.

| Status | Description | Next Steps |
|--------|-------------|------------|
| placed | Order received | Inventory check |
| shipped | En route to customer | Track shipment |
| completed | Delivered successfully | Request feedback |
| returned | Customer return initiated | Process refund |
| cancelled | Order cancelled | Update inventory |
{% enddocs %}

{% docs customer_id %}
Unique identifier for customers. This ID is:
- Generated by the application on account creation
- Persistent across orders
- Used to track customer lifetime value
- **Note:** NULL for guest checkouts
{% enddocs %}
```

Reference documentation blocks:
```yaml
models:
  - name: fct_orders
    columns:
      - name: status
        description: "{{ doc('order_status') }}"
      - name: customer_id
        description: "{{ doc('customer_id') }}"
```

Generating Documentation
```bash
# Generate documentation site
dbt docs generate

# Serve documentation locally
dbt docs serve --port 8001

# View in browser at http://localhost:8001
```
**Documentation Features:**
- Interactive lineage graph (DAG visualization)
- Searchable model catalog
- Column-level documentation
- Source freshness tracking
- Test coverage visibility
- Compiled SQL preview
Documentation Best Practices
- Document at all levels: Project, models, columns, sources
- Explain business logic: Why transformations exist
- Define grain explicitly: One row represents...
- Note refresh schedules: How often data updates
- Document assumptions: Edge cases, known issues
- Link to external resources: Confluence, wiki, dashboards
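Descriptions can also be pushed into the warehouse itself as table and column comments via the `persist_docs` config, so documentation remains visible to users querying outside dbt:

```yaml
# dbt_project.yml
models:
  +persist_docs:
    relation: true   # persist model descriptions as table comments
    columns: true    # persist column descriptions as column comments
```

Adapter support varies, so check your warehouse's dbt adapter before relying on this.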
Incremental Models
Basic Incremental Pattern
```sql
{{ config(
    materialized='incremental',
    unique_key='event_id'
) }}

with source as (
    select
        event_id,
        user_id,
        event_timestamp,
        event_type,
        event_properties
    from {{ source('analytics', 'events') }}

    {% if is_incremental() %}
      -- Only process events newer than existing data
      where event_timestamp > (select max(event_timestamp) from {{ this }})
    {% endif %}
)

select * from source
```

Key Components:
- `is_incremental()`: True after first run
- `{{ this }}`: References current model's table
- `unique_key`: Column(s) for deduplication
Incremental with Merge Strategy
```sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    merge_update_columns=['status', 'updated_at'],
    merge_exclude_columns=['created_at']
) }}

with orders as (
    select
        order_id,
        customer_id,
        order_date,
        status,
        order_total,
        current_timestamp() as updated_at,
        case
            when status = 'placed' then current_timestamp()
            else null
        end as created_at
    from {{ source('ecommerce', 'orders') }}

    {% if is_incremental() %}
      -- Look back 3 days to catch late-arriving updates
      where order_date >= (select max(order_date) - interval '3 days' from {{ this }})
    {% endif %}
)

select * from orders
```

Merge Strategy Features:
- Updates existing records based on `unique_key`
- Inserts new records
- Optional: Specify which columns to update/exclude
- Best for: Slowly changing data, updates to historical records
Incremental with Delete+Insert
```sql
{{ config(
    materialized='incremental',
    unique_key=['date', 'customer_id'],
    incremental_strategy='delete+insert'
) }}

with daily_metrics as (
    select
        date_trunc('day', order_timestamp) as date,
        customer_id,
        count(*) as order_count,
        sum(order_total) as total_revenue
    from {{ ref('fct_orders') }}

    {% if is_incremental() %}
      where date_trunc('day', order_timestamp) >= (
          select max(date) - interval '7 days' from {{ this }}
      )
    {% endif %}

    group by 1, 2
)

select * from daily_metrics
```

Delete+Insert Strategy:
- Deletes all rows matching `unique_key`
- Inserts new rows
- Best for: Aggregated data, full partition replacement
- More efficient than merge for bulk updates
Handling Late-Arriving Data
```sql
{{ config(
    materialized='incremental',
    unique_key='order_id'
) }}

select
    order_id,
    customer_id,
    order_date,
    status,
    _loaded_at
from {{ source('ecommerce', 'orders') }}

{% if is_incremental() %}
  -- Use _loaded_at instead of order_date to catch updates
  where _loaded_at > (select max(_loaded_at) from {{ this }})
  -- OR use a lookback window
  -- where order_date > (select max(order_date) - interval '3 days' from {{ this }})
{% endif %}
```
{{ config(
materialized='incremental',
unique_key='order_id'
) }}
select
order_id,
customer_id,
order_date,
status,
_loaded_at
from {{ source('ecommerce', 'orders') }}
{% if is_incremental() %}
-- 使用_loaded_at而非order_date来捕获更新
where _loaded_at > (select max(_loaded_at) from {{ this }})
-- 或者使用回溯窗口
-- where order_date > (select max(order_date) - interval '3 days' from {{ this }})
{% endif %}Incremental with Partitioning
```sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    partition_by={
        'field': 'event_date',
        'data_type': 'date',
        'granularity': 'day'
    },
    cluster_by=['user_id', 'event_type']
) }}

select
    event_id,
    user_id,
    event_type,
    event_timestamp,
    date(event_timestamp) as event_date
from {{ source('analytics', 'raw_events') }}

{% if is_incremental() %}
  where date(event_timestamp) > (select max(event_date) from {{ this }})
{% endif %}
```

Partition Benefits:
- Improved query performance
- Cost optimization (scan less data)
- Efficient incremental processing
- Better for time-series data
Full Refresh Capability
```bash
# Force full rebuild of incremental models
dbt run --full-refresh

# Full refresh a specific model
dbt run --select my_incremental_model --full-refresh
```
Macros & Jinja
Basic Macro Structure
```sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
```

Usage:

```sql
select
    order_id,
    {{ cents_to_dollars('amount_cents') }} as amount_dollars
from {{ ref('stg_orders') }}
```
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}使用方式:
sql
select
order_id,
{{ cents_to_dollars('amount_cents') }} as amount_dollars
from {{ ref('stg_orders') }}Reusable Data Quality Macros
```sql
-- macros/test_not_negative.sql
{% macro test_not_negative(model, column_name) %}

    select
        {{ column_name }}
    from {{ model }}
    where {{ column_name }} < 0

{% endmacro %}
```
-- macros/test_not_negative.sql
{% macro test_not_negative(model, column_name) %}
select
{{ column_name }}
from {{ model }}
where {{ column_name }} < 0
{% endmacro %}Date Spine Macro
```sql
-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}

    with date_spine as (
        {{ dbt_utils.date_spine(
            datepart="day",
            start_date="cast('" ~ start_date ~ "' as date)",
            end_date="cast('" ~ end_date ~ "' as date)"
        ) }}
    )

    select date_day
    from date_spine

{% endmacro %}
```
-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}
with date_spine as (
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('" ~ start_date ~ "' as date)",
end_date="cast('" ~ end_date ~ "' as date)"
) }}
)
select
date_day
from date_spine
{% endmacro %}Dynamic SQL Generation
```sql
-- macros/pivot_metric.sql
{% macro pivot_metric(metric_column, group_by_column, values) %}

    select
        {{ group_by_column }},
        {% for value in values %}
        sum(case when status = '{{ value }}' then {{ metric_column }} else 0 end)
            as {{ value }}_{{ metric_column }}
        {% if not loop.last %},{% endif %}
        {% endfor %}
    from {{ ref('fct_orders') }}
    group by 1

{% endmacro %}
```

Usage:

```sql
{{ pivot_metric(
    metric_column='order_total',
    group_by_column='customer_id',
    values=['completed', 'pending', 'cancelled']
) }}
```
-- macros/pivot_metric.sql
{% macro pivot_metric(metric_column, group_by_column, values) %}
select
{{ group_by_column }},
{% for value in values %}
sum(case when status = '{{ value }}' then {{ metric_column }} else 0 end)
as {{ value }}_{{ metric_column }}
{% if not loop.last %},{% endif %}
{% endfor %}
from {{ ref('fct_orders') }}
group by 1
{% endmacro %}使用方式:
sql
{{ pivot_metric(
metric_column='order_total',
group_by_column='customer_id',
values=['completed', 'pending', 'cancelled']
) }}Grant Permissions Macro
```sql
-- macros/grant_select.sql
{% macro grant_select(schema, role) %}

    {% set sql %}
        grant select on all tables in schema {{ schema }} to {{ role }};
    {% endset %}

    {% do run_query(sql) %}
    {% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}

{% endmacro %}
```

Usage in hooks:
```yaml
# dbt_project.yml
on-run-end:
  - "{{ grant_select(target.schema, 'analyst_role') }}"
```

Environment-Specific Logic
```sql
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}

    {%- if target.name == 'prod' -%}
        {%- if custom_schema_name is not none -%}
            {{ custom_schema_name | trim }}
        {%- else -%}
            {{ default_schema }}
        {%- endif -%}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}

{%- endmacro %}
```
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'prod' -%}
{%- if custom_schema_name is not none -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}
{%- endif -%}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}Audit Column Macro
```sql
-- macros/add_audit_columns.sql
{% macro add_audit_columns() %}
    current_timestamp() as dbt_updated_at,
    current_timestamp() as dbt_created_at,
    '{{ var("dbt_user") }}' as dbt_updated_by
{% endmacro %}
```

Usage:

```sql
select
    order_id,
    customer_id,
    order_total,
    {{ add_audit_columns() }}
from {{ ref('stg_orders') }}
```
-- macros/add_audit_columns.sql
{% macro add_audit_columns() %}
current_timestamp() as dbt_updated_at,
current_timestamp() as dbt_created_at,
'{{ var("dbt_user") }}' as dbt_updated_by
{% endmacro %}使用方式:
sql
select
order_id,
customer_id,
order_total,
{{ add_audit_columns() }}
from {{ ref('stg_orders') }}Jinja Control Structures
```sql
-- Conditionals
{% if target.name == 'prod' %}
    from {{ source('production', 'orders') }}
{% else %}
    from {{ source('development', 'orders') }}
{% endif %}

-- Loops
{% for status in ['placed', 'shipped', 'completed'] %}
    sum(case when status = '{{ status }}' then 1 else 0 end) as {{ status }}_count
    {% if not loop.last %},{% endif %}
{% endfor %}

-- Set variables
{% set payment_methods = ['credit_card', 'paypal', 'bank_transfer'] %}
{% for method in payment_methods %}
    count(distinct case when payment_method = '{{ method }}'
        then customer_id end) as {{ method }}_customers
    {% if not loop.last %},{% endif %}
{% endfor %}
```
-- 条件判断
{% if target.name == 'prod' %}
from {{ source('production', 'orders') }}
{% else %}
from {{ source('development', 'orders') }}
{% endif %}
-- 循环
{% for status in ['placed', 'shipped', 'completed'] %}
sum(case when status = '{{ status }}' then 1 else 0 end) as {{ status }}_count
{% if not loop.last %},{% endif %}
{% endfor %}
-- 设置变量
{% set payment_methods = ['credit_card', 'paypal', 'bank_transfer'] %}
{% for method in payment_methods %}
count(distinct case when payment_method = '{{ method }}'
then customer_id end) as {{ method }}_customers
{% if not loop.last %},{% endif %}
{% endfor %}
Package Management
包管理
Installing Packages
安装包
yaml
yaml
packages.yml
packages.yml
packages:
  # dbt-utils: Essential utility macros
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  # Audit helper: Compare datasets
  - package: dbt-labs/audit_helper
    version: 0.9.0
  # Codegen: Code generation utilities
  - package: dbt-labs/codegen
    version: 0.11.0
  # Custom package from Git
  - git: "https://github.com/your-org/dbt-custom-package.git"
    revision: main
  # Local package
  - local: ../dbt-shared-macros
Install packages:
```bash
dbt deps
packages:
  # dbt-utils:核心工具宏
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  # Audit helper:数据集对比
  - package: dbt-labs/audit_helper
    version: 0.9.0
  # Codegen:代码生成工具
  - package: dbt-labs/codegen
    version: 0.11.0
  # 来自Git的自定义包
  - git: "https://github.com/your-org/dbt-custom-package.git"
    revision: main
  # 本地包
  - local: ../dbt-shared-macros
安装包:
```bash
dbt deps
Using dbt_utils
使用dbt_utils
sql
-- Surrogate key generation
select
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} as order_line_id,
order_id,
line_item_id
from {{ ref('stg_order_lines') }}
-- Union multiple tables
{{ dbt_utils.union_relations(
relations=[
ref('orders_2022'),
ref('orders_2023'),
ref('orders_2024')
]
) }}
-- Get column values as list
{% set statuses = dbt_utils.get_column_values(
table=ref('stg_orders'),
column='status'
) %}
-- Pivot table
{{ dbt_utils.pivot(
column='metric_name',
values=dbt_utils.get_column_values(table=ref('metrics'), column='metric_name'),
agg='sum',
then_value='metric_value',
else_value=0,
prefix='',
suffix='_total'
) }}
sql
-- 生成代理键
select
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} as order_line_id,
order_id,
line_item_id
from {{ ref('stg_order_lines') }}
-- 合并多个表
{{ dbt_utils.union_relations(
relations=[
ref('orders_2022'),
ref('orders_2023'),
ref('orders_2024')
]
) }}
-- 获取列值列表
{% set statuses = dbt_utils.get_column_values(
table=ref('stg_orders'),
column='status'
) %}
-- 透视表
{{ dbt_utils.pivot(
column='metric_name',
values=dbt_utils.get_column_values(table=ref('metrics'), column='metric_name'),
agg='sum',
then_value='metric_value',
else_value=0,
prefix='',
suffix='_total'
) }}
Creating Custom Packages
创建自定义包
Project structure for a package:
dbt-custom-package/
├── dbt_project.yml
├── macros/
│ ├── custom_test.sql
│ └── custom_macro.sql
├── models/
│ └── example_model.sql
└── README.md
自定义包的项目结构:
dbt-custom-package/
├── dbt_project.yml
├── macros/
│ ├── custom_test.sql
│ └── custom_macro.sql
├── models/
│ └── example_model.sql
└── README.md
dbt_project.yml for custom package
自定义包的dbt_project.yml
name: 'custom_package'
version: '1.0.0'
config-version: 2
require-dbt-version: [">=1.0.0", "<2.0.0"]
name: 'custom_package'
version: '1.0.0'
config-version: 2
require-dbt-version: [">=1.0.0", "<2.0.0"]
Package Versioning
包版本控制
yaml
yaml
Semantic versioning
语义版本控制
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]  # Any 1.x version
  # Exact version
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  # Git branch/tag
  - git: "https://github.com/org/package.git"
    revision: v1.2.3
  # Latest from branch
  - git: "https://github.com/org/package.git"
    revision: main
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]  # 任意1.x版本
  # 精确版本
  - package: dbt-labs/dbt_utils
    version: 1.1.1
  # Git分支/标签
  - git: "https://github.com/org/package.git"
    revision: v1.2.3
  # 分支最新版本
  - git: "https://github.com/org/package.git"
    revision: main
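A range like `[">=1.0.0", "<2.0.0"]` is resolved by `dbt deps` when it picks a package version. As a rough illustration only (not dbt's actual resolver), a hypothetical Python matcher shows what such a range admits:

```python
# Hypothetical sketch of how a version range like [">=1.0.0", "<2.0.0"]
# constrains candidate versions. This is NOT dbt's resolver.
import operator

OPS = {">=": operator.ge, "<=": operator.le, "==": operator.eq,
       ">": operator.gt, "<": operator.lt}

def parse(version):
    # "1.2.3" -> (1, 2, 3), so tuples compare component-wise
    return tuple(int(p) for p in version.split("."))

def satisfies(version, constraints):
    # Every constraint string ("<op><version>") must hold for the candidate.
    for c in constraints:
        for sym in (">=", "<=", "==", ">", "<"):
            if c.startswith(sym):
                if not OPS[sym](parse(version), parse(c[len(sym):])):
                    return False
                break
    return True

candidates = ["0.9.0", "1.0.0", "1.1.1", "2.0.0"]
matching = [v for v in candidates if satisfies(v, [">=1.0.0", "<2.0.0"])]
print(matching)  # ['1.0.0', '1.1.1']
```

The upper bound `<2.0.0` is what keeps a range pin safe: a breaking 2.x release is never selected automatically.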
Production Workflows
生产工作流
CI/CD Pipeline (GitHub Actions)
CI/CD管道(GitHub Actions)
yaml
yaml
.github/workflows/dbt_ci.yml
.github/workflows/dbt_ci.yml
name: dbt CI
on:
  pull_request:
    branches: [main]
jobs:
  dbt_run:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dbt
        run: |
          pip install dbt-core dbt-snowflake
      - name: Install dbt packages
        run: dbt deps
      - name: Run dbt debug
        run: dbt debug
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
      - name: Run dbt models (modified only)
        run: dbt run --select state:modified+ --state ./prod_manifest
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
      - name: Run dbt tests
        run: dbt test --select state:modified+
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
name: dbt CI
on:
  pull_request:
    branches: [main]
jobs:
  dbt_run:
    runs-on: ubuntu-latest
    steps:
      - name: 拉取代码
        uses: actions/checkout@v3
      - name: 设置Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: 安装dbt
        run: |
          pip install dbt-core dbt-snowflake
      - name: 安装dbt包
        run: dbt deps
      - name: 运行dbt debug
        run: dbt debug
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
      - name: 运行dbt模型(仅修改的模型)
        run: dbt run --select state:modified+ --state ./prod_manifest
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
      - name: 运行dbt测试
        run: dbt test --select state:modified+
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
Slim CI (Test Changed Models Only)
轻量CI(仅测试修改的模型)
bash
bash
Store production manifest
存储生产清单
dbt compile --target prod
cp target/manifest.json ./prod_manifest/
dbt compile --target prod
cp target/manifest.json ./prod_manifest/
In CI: Test only changed models and downstream dependencies
在CI中:仅测试修改的模型和下游依赖
dbt test --select state:modified+ --state ./prod_manifest
dbt test --select state:modified+ --state ./prod_manifest
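`state:modified+` compares each node's checksum against the stored production manifest and then walks the downstream edges. A simplified Python sketch of that selection logic (the manifest shape here is hypothetical — real dbt manifests carry far more metadata):

```python
# Simplified sketch of how `state:modified+` picks nodes: diff checksums
# against the stored production manifest, then include all descendants.
# The dict shapes below are hypothetical, not dbt's actual manifest schema.
prod_manifest = {"stg_orders": "a1", "fct_orders": "b2", "dim_customers": "c3"}
dev_manifest = {"stg_orders": "a9", "fct_orders": "b2", "dim_customers": "c3"}
children = {"stg_orders": ["fct_orders"], "fct_orders": [], "dim_customers": []}

def modified_plus(prod, dev, children):
    # Nodes whose checksum changed (or that are new) count as modified.
    modified = {n for n, checksum in dev.items() if prod.get(n) != checksum}
    selected, stack = set(), list(modified)
    while stack:  # walk downstream edges -- the `+` suffix
        node = stack.pop()
        if node not in selected:
            selected.add(node)
            stack.extend(children.get(node, []))
    return selected

print(sorted(modified_plus(prod_manifest, dev_manifest, children)))
# ['fct_orders', 'stg_orders']
```

Only `stg_orders` changed, but `fct_orders` is rebuilt too because it sits downstream — untouched `dim_customers` is skipped, which is what makes the CI run "slim".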
Production Deployment
生产部署
yaml
yaml
.github/workflows/dbt_prod.yml
.github/workflows/dbt_prod.yml
name: dbt Production Deploy
on:
  push:
    branches: [main]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dbt
        run: pip install dbt-core dbt-snowflake
      - name: Install packages
        run: dbt deps
      - name: Run dbt seed
        run: dbt seed --target prod
      - name: Run dbt run
        run: dbt run --target prod
      - name: Run dbt test
        run: dbt test --target prod
      - name: Generate docs
        run: dbt docs generate --target prod
      - name: Upload docs to S3
        run: |
          aws s3 sync target/ s3://dbt-docs-bucket/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
name: dbt生产部署
on:
  push:
    branches: [main]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: 设置Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: 安装dbt
        run: pip install dbt-core dbt-snowflake
      - name: 安装包
        run: dbt deps
      - name: 运行dbt seed
        run: dbt seed --target prod
      - name: 运行dbt模型
        run: dbt run --target prod
      - name: 运行dbt测试
        run: dbt test --target prod
      - name: 生成文档
        run: dbt docs generate --target prod
      - name: 上传文档至S3
        run: |
          aws s3 sync target/ s3://dbt-docs-bucket/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
Orchestration with Airflow
使用Airflow编排
python
python
dags/dbt_dag.py
dags/dbt_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'dbt_production',
    default_args=default_args,
    description='Run dbt models in production',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dbt', 'analytics'],
) as dag:
    dbt_deps = BashOperator(
        task_id='dbt_deps',
        bash_command='cd /opt/dbt && dbt deps',
    )
    dbt_seed = BashOperator(
        task_id='dbt_seed',
        bash_command='cd /opt/dbt && dbt seed --target prod',
    )
    dbt_run_staging = BashOperator(
        task_id='dbt_run_staging',
        bash_command='cd /opt/dbt && dbt run --select staging.* --target prod',
    )
    dbt_run_marts = BashOperator(
        task_id='dbt_run_marts',
        bash_command='cd /opt/dbt && dbt run --select marts.* --target prod',
    )
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --target prod',
    )
    dbt_docs = BashOperator(
        task_id='dbt_docs',
        bash_command='cd /opt/dbt && dbt docs generate --target prod',
    )

    # Define task dependencies
    dbt_deps >> dbt_seed >> dbt_run_staging >> dbt_run_marts >> dbt_test >> dbt_docs
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'dbt_production',
    default_args=default_args,
    description='在生产环境中运行dbt模型',
    schedule_interval='0 2 * * *',  # 每天UTC时间2点
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dbt', 'analytics'],
) as dag:
    dbt_deps = BashOperator(
        task_id='dbt_deps',
        bash_command='cd /opt/dbt && dbt deps',
    )
    dbt_seed = BashOperator(
        task_id='dbt_seed',
        bash_command='cd /opt/dbt && dbt seed --target prod',
    )
    dbt_run_staging = BashOperator(
        task_id='dbt_run_staging',
        bash_command='cd /opt/dbt && dbt run --select staging.* --target prod',
    )
    dbt_run_marts = BashOperator(
        task_id='dbt_run_marts',
        bash_command='cd /opt/dbt && dbt run --select marts.* --target prod',
    )
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --target prod',
    )
    dbt_docs = BashOperator(
        task_id='dbt_docs',
        bash_command='cd /opt/dbt && dbt docs generate --target prod',
    )

    # 定义任务依赖
    dbt_deps >> dbt_seed >> dbt_run_staging >> dbt_run_marts >> dbt_test >> dbt_docs
dbt Cloud Integration
dbt Cloud集成
yaml
yaml
dbt_cloud.yml
dbt_cloud.yml
Environment configuration
环境配置
environments:
  - name: Production
    dbt_version: 1.7.latest
    type: deployment
  - name: Development
    dbt_version: 1.7.latest
    type: development
environments:
  - name: Production
    dbt_version: 1.7.latest
    type: deployment
  - name: Development
    dbt_version: 1.7.latest
    type: development
Job configuration
任务配置
jobs:
  - name: Production Run
    environment: Production
    triggers:
      schedule:
        cron: "0 2 * * *"  # 2 AM daily
    commands:
      - dbt deps
      - dbt seed
      - dbt run
      - dbt test
  - name: CI Check
    environment: Development
    triggers:
      github_webhook: true
    commands:
      - dbt deps
      - dbt run --select state:modified+
      - dbt test --select state:modified+
jobs:
  - name: 生产运行
    environment: Production
    triggers:
      schedule:
        cron: "0 2 * * *"  # 每天UTC时间2点
    commands:
      - dbt deps
      - dbt seed
      - dbt run
      - dbt test
  - name: CI检查
    environment: Development
    triggers:
      github_webhook: true
    commands:
      - dbt deps
      - dbt run --select state:modified+
      - dbt test --select state:modified+
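Both schedules above use the five-field cron form (minute, hour, day-of-month, month, day-of-week). A minimal matcher, supporting only literals and `*`, shows how `0 2 * * *` is checked against a timestamp (real cron also accepts ranges, lists, and steps, which this sketch omits):

```python
# Minimal sketch of matching a 5-field cron expression ("0 2 * * *")
# against a timestamp. Supports only "*" and literal values.
from datetime import datetime

def cron_matches(expr, ts):
    fields = expr.split()  # minute, hour, day-of-month, month, day-of-week
    actual = [ts.minute, ts.hour, ts.day, ts.month, ts.isoweekday() % 7]
    return all(f == "*" or int(f) == a for f, a in zip(fields, actual))

print(cron_matches("0 2 * * *", datetime(2024, 1, 1, 2, 0)))   # True
print(cron_matches("0 2 * * *", datetime(2024, 1, 1, 14, 0)))  # False
```

Reading it this way makes the job above concrete: it fires when minute is 0 and hour is 2, on any day.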
Monitoring & Alerting
监控与告警
sql
-- macros/post_hook_monitoring.sql
{% macro monitor_row_count(threshold=0) %}
{% if execute %}
{% set row_count_query %}
select count(*) as row_count from {{ this }}
{% endset %}
{% set results = run_query(row_count_query) %}
{% set row_count = results.columns[0].values()[0] %}
{% if row_count < threshold %}
{{ exceptions.raise_compiler_error("Row count " ~ row_count ~ " below threshold " ~ threshold) }}
{% endif %}
{{ log("Model " ~ this ~ " has " ~ row_count ~ " rows", info=True) }}
{% endif %}
{% endmacro %}
Usage:
sql
{{ config(
post_hook="{{ monitor_row_count(threshold=1000) }}"
) }}
select * from {{ ref('stg_orders') }}
sql
-- macros/post_hook_monitoring.sql
{% macro monitor_row_count(threshold=0) %}
{% if execute %}
{% set row_count_query %}
select count(*) as row_count from {{ this }}
{% endset %}
{% set results = run_query(row_count_query) %}
{% set row_count = results.columns[0].values()[0] %}
{% if row_count < threshold %}
{{ exceptions.raise_compiler_error("Row count " ~ row_count ~ " below threshold " ~ threshold) }}
{% endif %}
{{ log("Model " ~ this ~ " has " ~ row_count ~ " rows", info=True) }}
{% endif %}
{% endmacro %}
使用方式:
sql
{{ config(
post_hook="{{ monitor_row_count(threshold=1000) }}"
) }}
select * from {{ ref('stg_orders') }}
Best Practices
最佳实践
Naming Conventions
命名规范
Models:
stg_[source]__[entity].sql # Staging: stg_stripe__payments.sql
int_[entity]_[verb].sql # Intermediate: int_orders_joined.sql
fct_[entity].sql # Fact: fct_orders.sql
dim_[entity].sql # Dimension: dim_customers.sql
Tests:
assert_[description].sql # assert_positive_order_totals.sql
Macros:
[verb]_[noun].sql # generate_surrogate_key.sql
模型:
stg_[source]__[entity].sql # Staging模型:stg_stripe__payments.sql
int_[entity]_[verb].sql # 中间模型:int_orders_joined.sql
fct_[entity].sql # 事实表:fct_orders.sql
dim_[entity].sql # 维度表:dim_customers.sql
测试:
assert_[description].sql # assert_positive_order_totals.sql
宏:
[verb]_[noun].sql # generate_surrogate_key.sql
SQL Style Guide
SQL风格指南
sql
-- ✓ Good: Clear CTEs, proper formatting
with orders as (
select
order_id,
customer_id,
order_date,
status
from {{ ref('stg_orders') }}
where status != 'cancelled'
),
customers as (
select
customer_id,
customer_name,
customer_email
from {{ ref('dim_customers') }}
),
final as (
select
orders.order_id,
orders.order_date,
customers.customer_name,
orders.status
from orders
left join customers
on orders.customer_id = customers.customer_id
)
select * from final
-- ✗ Bad: Nested subqueries, poor formatting
select o.order_id, o.order_date, c.customer_name, o.status from (
select order_id, customer_id, order_date, status from {{ ref('stg_orders') }}
where status != 'cancelled') o left join (select customer_id, customer_name from
{{ ref('dim_customers') }}) c on o.customer_id = c.customer_id
sql
-- ✓ 良好:清晰的CTE、格式规范
with orders as (
select
order_id,
customer_id,
order_date,
status
from {{ ref('stg_orders') }}
where status != 'cancelled'
),
customers as (
select
customer_id,
customer_name,
customer_email
from {{ ref('dim_customers') }}
),
final as (
select
orders.order_id,
orders.order_date,
customers.customer_name,
orders.status
from orders
left join customers
on orders.customer_id = customers.customer_id
)
select * from final
-- ✗ 不良:嵌套子查询、格式混乱
select o.order_id, o.order_date, c.customer_name, o.status from (
select order_id, customer_id, order_date, status from {{ ref('stg_orders') }}
where status != 'cancelled') o left join (select customer_id, customer_name from
{{ ref('dim_customers') }}) c on o.customer_id = c.customer_id
Performance Optimization
性能优化
1. Use Incremental Models for Large Tables
sql
-- Process only new data
{{ config(materialized='incremental') }}
select * from {{ source('events', 'page_views') }}
{% if is_incremental() %}
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
2. Leverage Clustering and Partitioning
sql
{{ config(
materialized='table',
partition_by={'field': 'order_date', 'data_type': 'date'},
cluster_by=['customer_id', 'status']
) }}
3. Reduce Data Scanned
sql
-- ✓ Good: Filter early
with source as (
select *
from {{ source('app', 'events') }}
where event_date >= '2024-01-01' -- Filter in source CTE
)
-- ✗ Bad: Filter late
with source as (
select * from {{ source('app', 'events') }}
)
select * from source
where event_date >= '2024-01-01' -- Filtering after full scan
4. Use Ephemeral for Simple Transformations
sql
-- Avoid creating unnecessary views
{{ config(materialized='ephemeral') }}
select
order_id,
lower(trim(status)) as status_clean
from {{ ref('stg_orders') }}
1. 对大表使用增量模型
sql
-- 仅处理新数据
{{ config(materialized='incremental') }}
select * from {{ source('events', 'page_views') }}
{% if is_incremental() %}
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
2. 利用聚类和分区
sql
{{ config(
materialized='table',
partition_by={'field': 'order_date', 'data_type': 'date'},
cluster_by=['customer_id', 'status']
) }}
3. 减少扫描数据量
sql
-- ✓ 良好:尽早过滤
with source as (
select *
from {{ source('app', 'events') }}
where event_date >= '2024-01-01' -- 在源CTE中过滤
)
-- ✗ 不良:延迟过滤
with source as (
select * from {{ source('app', 'events') }}
)
select * from source
where event_date >= '2024-01-01' -- 全表扫描后过滤
4. 对简单转换使用临时模型
sql
-- 避免创建不必要的视图
{{ config(materialized='ephemeral') }}
select
order_id,
lower(trim(status)) as status_clean
from {{ ref('stg_orders') }}
Project Structure Best Practices
项目结构最佳实践
1. Layer Your Transformations
Staging   →   Intermediate   →   Marts
   ↓               ↓               ↓
  1:1        Purpose-built      Business
Sources          Logic          Entities
2. Modularize Complex Logic
sql
-- Instead of one massive model, break it down:
-- intermediate/int_order_items_aggregated.sql
-- intermediate/int_customer_lifetime_value.sql
-- intermediate/int_payment_summaries.sql
-- marts/fct_orders.sql (combines intermediate models)
3. Use Consistent File Organization
models/
├── staging/
│ └── [source]/
│ ├── _[source]__sources.yml
│ ├── _[source]__models.yml
│ └── stg_[source]__[table].sql
├── intermediate/
│ └── int_[purpose].sql
└── marts/
└── [business_area]/
├── _[area]__models.yml
└── [model_type]_[entity].sql
1. 分层转换
Staging     →     Intermediate     →     Marts
   ↓                    ↓                  ↓
与源表一一对应     特定用途逻辑          业务实体
2. 拆分复杂逻辑
sql
-- 不要使用一个庞大的模型,而是拆分为多个:
-- intermediate/int_order_items_aggregated.sql
-- intermediate/int_customer_lifetime_value.sql
-- intermediate/int_payment_summaries.sql
-- marts/fct_orders.sql(组合中间模型)
3. 一致的文件组织
models/
├── staging/
│ └── [source]/
│ ├── _[source]__sources.yml
│ ├── _[source]__models.yml
│ └── stg_[source]__[table].sql
├── intermediate/
│ └── int_[purpose].sql
└── marts/
└── [business_area]/
├── _[area]__models.yml
└── [model_type]_[entity].sql
Testing Strategy
测试策略
1. Test at Multiple Levels
yaml
1. 多级别测试
yaml
Source tests: Data quality at ingestion
源测试:数据摄取阶段的数据质量
sources:
  - name: raw_data
    tables:
      - name: orders
        columns:
          - name: id
            tests: [unique, not_null]
sources:
  - name: raw_data
    tables:
      - name: orders
        columns:
          - name: id
            tests: [unique, not_null]
Model tests: Transformation logic
模型测试:转换逻辑
models:
  - name: fct_orders
    tests:
      - dbt_utils.expression_is_true:
          expression: "order_total >= 0"
    columns:
      - name: order_id
        tests: [unique, not_null]
models:
  - name: fct_orders
    tests:
      - dbt_utils.expression_is_true:
          expression: "order_total >= 0"
    columns:
      - name: order_id
        tests: [unique, not_null]
Custom tests: Business logic
自定义测试:业务逻辑
tests/assert_revenue_reconciliation.sql
tests/assert_revenue_reconciliation.sql
**2. Use Appropriate Test Severity**
```yaml
**2. 使用合适的测试严重级别**
```yaml
Critical tests: error (fail build)
关键测试:error(构建失败)
Nice-to-have tests: warn (log but don't fail)
可选测试:warn(仅记录不失败)
tests:
  - unique:
      severity: error
  - dbt_utils.not_null_proportion:
      at_least: 0.95
      severity: warn
**3. Test Coverage Goals**
- 100% of primary keys: unique + not_null
- 100% of foreign keys: relationships tests
- All business logic: custom data tests
- Critical calculations: expression tests
tests:
  - unique:
      severity: error
  - dbt_utils.not_null_proportion:
      at_least: 0.95
      severity: warn
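The severity semantics can be sketched in a few lines: `error` failures fail the build, `warn` failures only log. The result records below are a hypothetical shape for illustration, not dbt internals:

```python
# Sketch of how test severity gates a build: `error` failures fail the run,
# `warn` failures only log. Record shape is hypothetical, not dbt's.
def build_outcome(test_results):
    errors = [t for t in test_results
              if t["failures"] > 0 and t["severity"] == "error"]
    warnings = [t for t in test_results
                if t["failures"] > 0 and t["severity"] == "warn"]
    return ("failed" if errors else "passed", len(warnings))

results = [
    {"name": "unique_order_id", "severity": "error", "failures": 0},
    {"name": "not_null_proportion_email", "severity": "warn", "failures": 12},
]
print(build_outcome(results))  # ('passed', 1)
```

The warn-level failure surfaces in logs without blocking deployment, which is exactly why nice-to-have checks are downgraded to `warn`.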
**3. 测试覆盖率目标**
- 100%的主键:unique + not_null
- 100%的外键:relationships测试
- 所有业务逻辑:自定义数据测试
- 关键计算:expression测试
Documentation Standards
文档标准
1. Document Every Model
yaml
models:
- name: fct_orders
description: |
**Purpose:** [Why this model exists]
**Grain:** [One row represents...]
**Refresh:** [When and how often]
**Consumers:** [Who uses this]
2. Document Complex Logic
sql
-- Use comments for complex business rules
select
order_id,
-- Revenue recognition: Only count completed orders
-- that are older than 30 days (per finance policy 2024-03)
case
when status = 'completed'
and datediff('day', order_date, current_date) > 30
then order_total
else 0
end as recognized_revenue
from {{ ref('stg_orders') }}
3. Keep Docs Updated
- Update docs when logic changes
- Review docs during code reviews
- Generate docs regularly:
dbt docs generate
1. 为所有模型编写文档
yaml
models:
- name: fct_orders
description: |
**用途**:[该模型存在的原因]
**粒度**:[每条记录代表什么]
**刷新频率**:[更新时间和频率]
**使用者**:[谁使用该模型]
2. 记录复杂逻辑
sql
-- 对复杂业务规则使用注释
select
order_id,
-- 收入确认:仅统计完成且超过30天的订单(根据2024-03的财务政策)
case
when status = 'completed'
and datediff('day', order_date, current_date) > 30
then order_total
else 0
end as recognized_revenue
from {{ ref('stg_orders') }}
3. 保持文档更新
- 逻辑变更时更新文档
- 代码审查时检查文档
- 定期生成文档:
dbt docs generate
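One way to keep the documentation standard honest is to scan the artifacts `dbt docs generate` already produces. Here is a sketch that flags undocumented models from a manifest-shaped dict; the field layout is a simplification of dbt's `manifest.json`, assumed here for illustration:

```python
# Sketch: flag models missing descriptions by scanning a manifest-shaped
# dict. Field layout is simplified from dbt's manifest.json.
import json

manifest_json = """
{
  "nodes": {
    "model.proj.fct_orders":    {"resource_type": "model", "description": "One row per order."},
    "model.proj.dim_customers": {"resource_type": "model", "description": ""},
    "test.proj.unique_order_id": {"resource_type": "test", "description": ""}
  }
}
"""

def undocumented_models(manifest):
    # Only models count; tests and other resource types are skipped.
    return sorted(
        name for name, node in manifest["nodes"].items()
        if node["resource_type"] == "model" and not node["description"].strip()
    )

print(undocumented_models(json.loads(manifest_json)))
# ['model.proj.dim_customers']
```

A check like this can run in CI next to `dbt test`, turning "keep docs updated" from a convention into a gate.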
20 Detailed Examples
20个详细示例
Example 1: Basic Staging Model
示例1:基础Staging模型
sql
-- models/staging/jaffle_shop/stg_jaffle_shop__customers.sql
with source as (
select * from {{ source('jaffle_shop', 'customers') }}
),
renamed as (
select
id as customer_id,
first_name,
last_name,
first_name || ' ' || last_name as customer_name,
email,
_loaded_at
from source
)
select * from renamed
sql
-- models/staging/jaffle_shop/stg_jaffle_shop__customers.sql
with source as (
select * from {{ source('jaffle_shop', 'customers') }}
),
renamed as (
select
id as customer_id,
first_name,
last_name,
first_name || ' ' || last_name as customer_name,
email,
_loaded_at
from source
)
select * from renamed
Example 2: Fact Table with Multiple Joins
示例2:多关联的事实表
sql
-- models/marts/core/fct_orders.sql
{{ config(
materialized='table',
tags=['core', 'daily']
) }}
with orders as (
select * from {{ ref('stg_jaffle_shop__orders') }}
),
customers as (
select * from {{ ref('dim_customers') }}
),
payments as (
select
order_id,
sum(amount) as total_payment_amount
from {{ ref('stg_stripe__payments') }}
where status = 'success'
group by 1
),
final as (
select
orders.order_id,
orders.customer_id,
customers.customer_name,
orders.order_date,
orders.status,
coalesce(payments.total_payment_amount, 0) as order_total,
{{ add_audit_columns() }}
from orders
left join customers
on orders.customer_id = customers.customer_id
left join payments
on orders.order_id = payments.order_id
)
select * from final
sql
-- models/marts/core/fct_orders.sql
{{ config(
materialized='table',
tags=['core', 'daily']
) }}
with orders as (
select * from {{ ref('stg_jaffle_shop__orders') }}
),
customers as (
select * from {{ ref('dim_customers') }}
),
payments as (
select
order_id,
sum(amount) as total_payment_amount
from {{ ref('stg_stripe__payments') }}
where status = 'success'
group by 1
),
final as (
select
orders.order_id,
orders.customer_id,
customers.customer_name,
orders.order_date,
orders.status,
coalesce(payments.total_payment_amount, 0) as order_total,
{{ add_audit_columns() }}
from orders
left join customers
on orders.customer_id = customers.customer_id
left join payments
on orders.order_id = payments.order_id
)
select * from final
Example 3: Incremental Event Table
示例3:增量事件表
sql
-- models/marts/analytics/fct_page_views.sql
{{ config(
materialized='incremental',
unique_key='page_view_id',
partition_by={
'field': 'event_date',
'data_type': 'date',
'granularity': 'day'
},
cluster_by=['user_id', 'page_path']
) }}
with events as (
select
event_id as page_view_id,
user_id,
session_id,
event_timestamp,
date(event_timestamp) as event_date,
event_properties:page_path::string as page_path,
event_properties:referrer::string as referrer,
_loaded_at
from {{ source('analytics', 'raw_events') }}
where event_type = 'page_view'
{% if is_incremental() %}
-- Use _loaded_at to catch late-arriving data
and _loaded_at > (select max(_loaded_at) from {{ this }})
{% endif %}
),
enriched as (
select
page_view_id,
user_id,
session_id,
event_timestamp,
event_date,
page_path,
referrer,
-- Parse URL components
split_part(page_path, '?', 1) as page_path_clean,
case
when referrer like '%google%' then 'Google'
when referrer like '%facebook%' then 'Facebook'
when referrer is null then 'Direct'
else 'Other'
end as referrer_source,
_loaded_at
from events
)
select * from enriched
sql
-- models/marts/analytics/fct_page_views.sql
{{ config(
materialized='incremental',
unique_key='page_view_id',
partition_by={
'field': 'event_date',
'data_type': 'date',
'granularity': 'day'
},
cluster_by=['user_id', 'page_path']
) }}
with events as (
select
event_id as page_view_id,
user_id,
session_id,
event_timestamp,
date(event_timestamp) as event_date,
event_properties:page_path::string as page_path,
event_properties:referrer::string as referrer,
_loaded_at
from {{ source('analytics', 'raw_events') }}
where event_type = 'page_view'
{% if is_incremental() %}
-- 使用_loaded_at捕获延迟到达的数据
and _loaded_at > (select max(_loaded_at) from {{ this }})
{% endif %}
),
enriched as (
select
page_view_id,
user_id,
session_id,
event_timestamp,
event_date,
page_path,
referrer,
-- 解析URL组件
split_part(page_path, '?', 1) as page_path_clean,
case
when referrer like '%google%' then 'Google'
when referrer like '%facebook%' then 'Facebook'
when referrer is null then 'Direct'
else 'Other'
end as referrer_source,
_loaded_at
from events
)
select * from enriched
Example 4: Customer Dimension with SCD Type 2
示例4:Type 2缓慢变化维度的客户维度表
sql
-- models/marts/core/dim_customers.sql
{{ config(
materialized='table',
unique_key='customer_key'
) }}
with customers as (
select * from {{ ref('stg_jaffle_shop__customers') }}
),
customer_orders as (
select
customer_id,
min(order_date) as first_order_date,
max(order_date) as most_recent_order_date,
count(order_id) as total_orders
from {{ ref('fct_orders') }}
group by 1
),
final as (
select
{{ dbt_utils.generate_surrogate_key(['customers.customer_id', 'customers._loaded_at']) }}
as customer_key,
customers.customer_id,
customers.customer_name,
customers.email,
customer_orders.first_order_date,
customer_orders.most_recent_order_date,
customer_orders.total_orders,
case
when customer_orders.total_orders >= 10 then 'VIP'
when customer_orders.total_orders >= 5 then 'Regular'
when customer_orders.total_orders >= 1 then 'New'
else 'Prospect'
end as customer_segment,
customers._loaded_at as effective_from,
null as effective_to,
true as is_current
from customers
left join customer_orders
on customers.customer_id = customer_orders.customer_id
)
select * from final
sql
-- models/marts/core/dim_customers.sql
{{ config(
materialized='table',
unique_key='customer_key'
) }}
with customers as (
select * from {{ ref('stg_jaffle_shop__customers') }}
),
customer_orders as (
select
customer_id,
min(order_date) as first_order_date,
max(order_date) as most_recent_order_date,
count(order_id) as total_orders
from {{ ref('fct_orders') }}
group by 1
),
final as (
select
{{ dbt_utils.generate_surrogate_key(['customers.customer_id', 'customers._loaded_at']) }}
as customer_key,
customers.customer_id,
customers.customer_name,
customers.email,
customer_orders.first_order_date,
customer_orders.most_recent_order_date,
customer_orders.total_orders,
case
when customer_orders.total_orders >= 10 then 'VIP'
when customer_orders.total_orders >= 5 then 'Regular'
when customer_orders.total_orders >= 1 then 'New'
else 'Prospect'
end as customer_segment,
customers._loaded_at as effective_from,
null as effective_to,
true as is_current
from customers
left join customer_orders
on customers.customer_id = customer_orders.customer_id
)
select * from final
Example 5: Aggregated Metrics Table
示例5:聚合指标表
sql
-- models/marts/analytics/daily_order_metrics.sql
{{ config(
materialized='incremental',
unique_key=['metric_date', 'status'],
incremental_strategy='delete+insert'
) }}
with orders as (
select * from {{ ref('fct_orders') }}
{% if is_incremental() %}
where order_date >= (select max(metric_date) - interval '7 days' from {{ this }})
{% endif %}
),
daily_metrics as (
select
date_trunc('day', order_date) as metric_date,
status,
count(distinct order_id) as order_count,
count(distinct customer_id) as unique_customers,
sum(order_total) as total_revenue,
avg(order_total) as avg_order_value,
min(order_total) as min_order_value,
max(order_total) as max_order_value,
percentile_cont(0.5) within group (order by order_total) as median_order_value
from orders
group by 1, 2
)
select * from daily_metrics
sql
-- models/marts/analytics/daily_order_metrics.sql
{{ config(
materialized='incremental',
unique_key=['metric_date', 'status'],
incremental_strategy='delete+insert'
) }}
with orders as (
select * from {{ ref('fct_orders') }}
{% if is_incremental() %}
where order_date >= (select max(metric_date) - interval '7 days' from {{ this }})
{% endif %}
),
daily_metrics as (
select
date_trunc('day', order_date) as metric_date,
status,
count(distinct order_id) as order_count,
count(distinct customer_id) as unique_customers,
sum(order_total) as total_revenue,
avg(order_total) as avg_order_value,
min(order_total) as min_order_value,
max(order_total) as max_order_value,
percentile_cont(0.5) within group (order by order_total) as median_order_value
from orders
group by 1, 2
)
select * from daily_metrics
Example 6: Pivoted Metrics Using Macro
示例6:使用宏的透视指标
sql
-- models/marts/analytics/customer_order_status_summary.sql
with orders as (
select
customer_id,
status,
order_total
from {{ ref('fct_orders') }}
)
select
customer_id,
{% for status in ['placed', 'shipped', 'completed', 'returned', 'cancelled'] %}
sum(case when status = '{{ status }}' then 1 else 0 end)
as {{ status }}_count,
sum(case when status = '{{ status }}' then order_total else 0 end)
as {{ status }}_revenue
{% if not loop.last %},{% endif %}
{% endfor %}
from orders
group by 1
sql
-- models/marts/analytics/customer_order_status_summary.sql
with orders as (
select
customer_id,
status,
order_total
from {{ ref('fct_orders') }}
)
select
customer_id,
{% for status in ['placed', 'shipped', 'completed', 'returned', 'cancelled'] %}
sum(case when status = '{{ status }}' then 1 else 0 end)
as {{ status }}_count,
sum(case when status = '{{ status }}' then order_total else 0 end)
as {{ status }}_revenue
{% if not loop.last %},{% endif %}
{% endfor %}
from orders
group by 1
Example 7: Snapshot for SCD Type 2
示例7:Type 2缓慢变化维度的快照
sql
-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
{{
config(
target_schema='snapshots',
target_database='analytics',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
select
customer_id,
customer_name,
email,
customer_segment,
updated_at
from {{ source('jaffle_shop', 'customers') }}
{% endsnapshot %}
sql
-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
{{
config(
target_schema='snapshots',
target_database='analytics',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
select
customer_id,
customer_name,
email,
customer_segment,
updated_at
from {{ source('jaffle_shop', 'customers') }}
{% endsnapshot %}
Example 8: Funnel Analysis Model
示例8:漏斗分析模型
sql
-- models/marts/analytics/conversion_funnel.sql
with page_views as (
select
user_id,
session_id,
min(event_timestamp) as session_start
from {{ ref('fct_page_views') }}
where event_date >= current_date - interval '30 days'
group by 1, 2
),
product_views as (
select distinct
user_id,
session_id
from {{ ref('fct_page_views') }}
where page_path like '/product/%'
and event_date >= current_date - interval '30 days'
),
add_to_cart as (
select distinct
user_id,
session_id
from {{ ref('fct_events') }}
where event_type = 'add_to_cart'
and event_date >= current_date - interval '30 days'
),
checkout_started as (
select distinct
user_id,
session_id
from {{ ref('fct_events') }}
where event_type = 'checkout_started'
and event_date >= current_date - interval '30 days'
),
orders as (
select distinct
customer_id as user_id,
session_id
from {{ ref('fct_orders') }}
where order_date >= current_date - interval '30 days'
and status = 'completed'
),
funnel as (
select
count(distinct page_views.session_id) as sessions,
count(distinct product_views.session_id) as product_views,
count(distinct add_to_cart.session_id) as add_to_cart,
count(distinct checkout_started.session_id) as checkout_started,
count(distinct orders.session_id) as completed_orders
from page_views
left join product_views using (session_id)
left join add_to_cart using (session_id)
left join checkout_started using (session_id)
left join orders using (session_id)
),
funnel_metrics as (
select
sessions,
product_views,
round(100.0 * product_views / nullif(sessions, 0), 2) as pct_product_views,
add_to_cart,
round(100.0 * add_to_cart / nullif(product_views, 0), 2) as pct_add_to_cart,
checkout_started,
round(100.0 * checkout_started / nullif(add_to_cart, 0), 2) as pct_checkout_started,
completed_orders,
round(100.0 * completed_orders / nullif(checkout_started, 0), 2) as pct_completed_orders,
round(100.0 * completed_orders / nullif(sessions, 0), 2) as overall_conversion_rate
from funnel
)
select * from funnel_metrics
sql
-- models/marts/analytics/conversion_funnel.sql
with page_views as (
select
user_id,
session_id,
min(event_timestamp) as session_start
from {{ ref('fct_page_views') }}
where event_date >= current_date - interval '30 days'
group by 1, 2
),
product_views as (
select distinct
user_id,
session_id
from {{ ref('fct_page_views') }}
where page_path like '/product/%'
and event_date >= current_date - interval '30 days'
),
add_to_cart as (
select distinct
user_id,
session_id
from {{ ref('fct_events') }}
where event_type = 'add_to_cart'
and event_date >= current_date - interval '30 days'
),
checkout_started as (
select distinct
user_id,
session_id
from {{ ref('fct_events') }}
where event_type = 'checkout_started'
and event_date >= current_date - interval '30 days'
),
orders as (
select distinct
customer_id as user_id,
session_id
from {{ ref('fct_orders') }}
where order_date >= current_date - interval '30 days'
and status = 'completed'
),
funnel as (
select
count(distinct page_views.session_id) as sessions,
count(distinct product_views.session_id) as product_views,
count(distinct add_to_cart.session_id) as add_to_cart,
count(distinct checkout_started.session_id) as checkout_started,
count(distinct orders.session_id) as completed_orders
from page_views
left join product_views using (session_id)
left join add_to_cart using (session_id)
left join checkout_started using (session_id)
left join orders using (session_id)
),
funnel_metrics as (
select
sessions,
product_views,
round(100.0 * product_views / nullif(sessions, 0), 2) as pct_product_views,
add_to_cart,
round(100.0 * add_to_cart / nullif(product_views, 0), 2) as pct_add_to_cart,
checkout_started,
round(100.0 * checkout_started / nullif(add_to_cart, 0), 2) as pct_checkout_started,
completed_orders,
round(100.0 * completed_orders / nullif(checkout_started, 0), 2) as pct_completed_orders,
round(100.0 * completed_orders / nullif(sessions, 0), 2) as overall_conversion_rate
from funnel
)
select * from funnel_metrics
Example 9: Cohort Retention Analysis
示例9:同期群留存分析
sql
-- models/marts/analytics/cohort_retention.sql
with customer_orders as (
select
customer_id,
date_trunc('month', order_date) as order_month
from {{ ref('fct_orders') }}
where status = 'completed'
),
first_order as (
select
customer_id,
min(order_month) as cohort_month
from customer_orders
group by 1
),
cohort_data as (
select
f.cohort_month,
c.order_month,
datediff('month', f.cohort_month, c.order_month) as months_since_first_order,
count(distinct c.customer_id) as customer_count
from first_order f
join customer_orders c
on f.customer_id = c.customer_id
group by 1, 2, 3
),
cohort_size as (
select
cohort_month,
customer_count as cohort_size
from cohort_data
where months_since_first_order = 0
),
retention as (
select
cohort_data.cohort_month,
cohort_data.months_since_first_order,
cohort_data.customer_count,
cohort_size.cohort_size,
round(100.0 * cohort_data.customer_count / cohort_size.cohort_size, 2) as retention_pct
from cohort_data
join cohort_size
on cohort_data.cohort_month = cohort_size.cohort_month
)
select * from retention
order by cohort_month, months_since_first_order
sql
-- models/marts/analytics/cohort_retention.sql
with customer_orders as (
select
customer_id,
date_trunc('month', order_date) as order_month
from {{ ref('fct_orders') }}
where status = 'completed'
),
first_order as (
select
customer_id,
min(order_month) as cohort_month
from customer_orders
group by 1
),
cohort_data as (
select
f.cohort_month,
c.order_month,
datediff('month', f.cohort_month, c.order_month) as months_since_first_order,
count(distinct c.customer_id) as customer_count
from first_order f
join customer_orders c
on f.customer_id = c.customer_id
group by 1, 2, 3
),
cohort_size as (
select
cohort_month,
customer_count as cohort_size
from cohort_data
where months_since_first_order = 0
),
retention as (
select
cohort_data.cohort_month,
cohort_data.months_since_first_order,
cohort_data.customer_count,
cohort_size.cohort_size,
round(100.0 * cohort_data.customer_count / cohort_size.cohort_size, 2) as retention_pct
from cohort_data
join cohort_size
on cohort_data.cohort_month = cohort_size.cohort_month
)
select * from retention
order by cohort_month, months_since_first_order
Example 10: Revenue Attribution Model
示例10:收入归因模型
sql
-- models/marts/analytics/revenue_attribution.sql
with touchpoints as (
select
user_id,
session_id,
event_timestamp,
case
when referrer like '%google%' then 'Google'
when referrer like '%facebook%' then 'Facebook'
when referrer like '%email%' then 'Email'
when referrer is null then 'Direct'
else 'Other'
end as channel
from {{ ref('fct_page_views') }}
),
customer_journeys as (
select
t.user_id,
o.order_id,
o.order_total,
t.channel,
t.event_timestamp,
o.order_date,
row_number() over (
partition by o.order_id
order by t.event_timestamp
) as touchpoint_number,
count(*) over (partition by o.order_id) as total_touchpoints
from touchpoints t
join {{ ref('fct_orders') }} o
on t.user_id = o.customer_id
and t.event_timestamp <= o.order_date
and t.event_timestamp >= dateadd('day', -30, o.order_date)
),
attributed_revenue as (
select
order_id,
channel,
order_total,
-- First touch attribution
case when touchpoint_number = 1
then order_total else 0 end as first_touch_revenue,
-- Last touch attribution
case when touchpoint_number = total_touchpoints
then order_total else 0 end as last_touch_revenue,
-- Linear attribution
order_total / total_touchpoints as linear_revenue,
-- Time decay (more recent touchpoints get more credit)
order_total * (power(2, touchpoint_number - 1) /
(power(2, total_touchpoints) - 1)) as time_decay_revenue
from customer_journeys
)
select
channel,
count(distinct order_id) as orders,
sum(first_touch_revenue) as first_touch_revenue,
sum(last_touch_revenue) as last_touch_revenue,
sum(linear_revenue) as linear_revenue,
sum(time_decay_revenue) as time_decay_revenue
from attributed_revenue
group by 1
sql
-- models/marts/analytics/revenue_attribution.sql
with touchpoints as (
select
user_id,
session_id,
event_timestamp,
case
when referrer like '%google%' then 'Google'
when referrer like '%facebook%' then 'Facebook'
when referrer like '%email%' then 'Email'
when referrer is null then 'Direct'
else 'Other'
end as channel
from {{ ref('fct_page_views') }}
),
customer_journeys as (
select
t.user_id,
o.order_id,
o.order_total,
t.channel,
t.event_timestamp,
o.order_date,
row_number() over (
partition by o.order_id
order by t.event_timestamp
) as touchpoint_number,
count(*) over (partition by o.order_id) as total_touchpoints
from touchpoints t
join {{ ref('fct_orders') }} o
on t.user_id = o.customer_id
and t.event_timestamp <= o.order_date
and t.event_timestamp >= dateadd('day', -30, o.order_date)
),
attributed_revenue as (
select
order_id,
channel,
order_total,
-- 首次触点归因
case when touchpoint_number = 1
then order_total else 0 end as first_touch_revenue,
-- 末次触点归因
case when touchpoint_number = total_touchpoints
then order_total else 0 end as last_touch_revenue,
-- 线性归因
order_total / total_touchpoints as linear_revenue,
-- 时间衰减(越近的触点获得更多权重)
order_total * (power(2, touchpoint_number - 1) /
(power(2, total_touchpoints) - 1)) as time_decay_revenue
from customer_journeys
)
select
channel,
count(distinct order_id) as orders,
sum(first_touch_revenue) as first_touch_revenue,
sum(last_touch_revenue) as last_touch_revenue,
sum(linear_revenue) as linear_revenue,
sum(time_decay_revenue) as time_decay_revenue
from attributed_revenue
group by 1
Example 11: Data Quality Test Suite
示例11:数据质量测试套件
sql
-- tests/assert_fct_orders_quality.sql
-- Test multiple data quality rules in one test
with order_quality_checks as (
select
order_id,
customer_id,
order_date,
order_total,
status,
-- Check 1: Order total should be positive
case when order_total < 0
then 'Negative order total' end as check_1,
-- Check 2: Order date should not be in future
case when order_date > current_date
then 'Future order date' end as check_2,
-- Check 3: Customer ID should exist
case when customer_id is null
then 'Missing customer ID' end as check_3,
-- Check 4: Status should be valid
case when status not in ('placed', 'shipped', 'completed', 'returned', 'cancelled')
then 'Invalid status' end as check_4
from {{ ref('fct_orders') }}
),
failed_checks as (
select
order_id,
check_1,
check_2,
check_3,
check_4
from order_quality_checks
where check_1 is not null
or check_2 is not null
or check_3 is not null
or check_4 is not null
)
select * from failed_checks
sql
-- tests/assert_fct_orders_quality.sql
-- 在一个测试中测试多个数据质量规则
with order_quality_checks as (
select
order_id,
customer_id,
order_date,
order_total,
status,
-- 检查1:订单总金额应为正数
case when order_total < 0
then 'Negative order total' end as check_1,
-- 检查2:订单日期不应在未来
case when order_date > current_date
then 'Future order date' end as check_2,
-- 检查3:客户ID不应为空
case when customer_id is null
then 'Missing customer ID' end as check_3,
-- 检查4:状态应有效
case when status not in ('placed', 'shipped', 'completed', 'returned', 'cancelled')
then 'Invalid status' end as check_4
from {{ ref('fct_orders') }}
),
failed_checks as (
select
order_id,
check_1,
check_2,
check_3,
check_4
from order_quality_checks
where check_1 is not null
or check_2 is not null
or check_3 is not null
or check_4 is not null
)
select * from failed_checks
Example 12: Slowly Changing Dimension Merge
示例12:缓慢变化维度的合并
sql
-- models/marts/core/dim_products_scd.sql
{{ config(
materialized='incremental',
unique_key='product_key',
merge_update_columns=['product_name', 'category', 'price', 'effective_to', 'is_current']
) }}
with source_data as (
select
product_id,
product_name,
category,
price,
updated_at
from {{ source('ecommerce', 'products') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} where is_current = true)
{% endif %}
),
{% if is_incremental() %}
existing_records as (
select *
from {{ this }}
where is_current = true
),
changed_records as (
select
s.product_id,
s.product_name,
s.category,
s.price,
s.updated_at
from source_data s
left join existing_records e
on s.product_id = e.product_id
-- Include brand-new products (no current version yet) as well as changes
where e.product_id is null
or s.product_name != e.product_name
or s.category != e.category
or s.price != e.price
),
expire_old_records as (
select
e.product_key,
e.product_id,
e.product_name,
e.category,
e.price,
e.effective_from,
c.updated_at as effective_to,
false as is_current,
e.updated_at
from existing_records e
join changed_records c
on e.product_id = c.product_id
),
new_versions as (
select
{{ dbt_utils.generate_surrogate_key(['c.product_id', 'c.updated_at']) }} as product_key,
c.product_id,
c.product_name,
c.category,
c.price,
c.updated_at as effective_from,
null::timestamp as effective_to,
true as is_current,
c.updated_at
from changed_records c
),
combined as (
select * from expire_old_records
union all
select * from new_versions
)
select * from combined
{% else %}
-- First load: all records are current
select
{{ dbt_utils.generate_surrogate_key(['product_id', 'updated_at']) }} as product_key,
product_id,
product_name,
category,
price,
updated_at as effective_from,
null::timestamp as effective_to,
true as is_current,
updated_at
from source_data
{% endif %}
sql
-- models/marts/core/dim_products_scd.sql
{{ config(
materialized='incremental',
unique_key='product_key',
merge_update_columns=['product_name', 'category', 'price', 'effective_to', 'is_current']
) }}
with source_data as (
select
product_id,
product_name,
category,
price,
updated_at
from {{ source('ecommerce', 'products') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} where is_current = true)
{% endif %}
),
{% if is_incremental() %}
existing_records as (
select *
from {{ this }}
where is_current = true
),
changed_records as (
select
s.product_id,
s.product_name,
s.category,
s.price,
s.updated_at
from source_data s
left join existing_records e
on s.product_id = e.product_id
-- 新产品(尚无当前版本)和发生变化的记录都需要生成新版本
where e.product_id is null
or s.product_name != e.product_name
or s.category != e.category
or s.price != e.price
),
expire_old_records as (
select
e.product_key,
e.product_id,
e.product_name,
e.category,
e.price,
e.effective_from,
c.updated_at as effective_to,
false as is_current,
e.updated_at
from existing_records e
join changed_records c
on e.product_id = c.product_id
),
new_versions as (
select
{{ dbt_utils.generate_surrogate_key(['c.product_id', 'c.updated_at']) }} as product_key,
c.product_id,
c.product_name,
c.category,
c.price,
c.updated_at as effective_from,
null::timestamp as effective_to,
true as is_current,
c.updated_at
from changed_records c
),
combined as (
select * from expire_old_records
union all
select * from new_versions
)
select * from combined
{% else %}
-- 首次加载:所有记录都是当前版本
select
{{ dbt_utils.generate_surrogate_key(['product_id', 'updated_at']) }} as product_key,
product_id,
product_name,
category,
price,
updated_at as effective_from,
null::timestamp as effective_to,
true as is_current,
updated_at
from source_data
{% endif %}
Example 13: Window Functions for Rankings
示例13:用于排名的窗口函数
sql
-- models/marts/analytics/customer_rfm_score.sql
with customer_metrics as (
select
customer_id,
max(order_date) as last_order_date,
count(order_id) as total_orders,
sum(order_total) as total_revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
),
rfm_calculations as (
select
customer_id,
-- Recency: Days since last order
datediff('day', last_order_date, current_date) as recency_days,
-- Frequency: Total orders
total_orders as frequency,
-- Monetary: Total revenue
total_revenue as monetary,
-- Recency score (1-5, lower days = higher score)
ntile(5) over (order by datediff('day', last_order_date, current_date) desc) as recency_score,
-- Frequency score (1-5, more orders = higher score)
ntile(5) over (order by total_orders) as frequency_score,
-- Monetary score (1-5, more revenue = higher score)
ntile(5) over (order by total_revenue) as monetary_score
from customer_metrics
),
rfm_segments as (
select
customer_id,
recency_days,
frequency,
monetary,
recency_score,
frequency_score,
monetary_score,
recency_score * 100 + frequency_score * 10 + monetary_score as rfm_score,
case
when recency_score >= 4 and frequency_score >= 4 and monetary_score >= 4
then 'Champions'
when recency_score >= 3 and frequency_score >= 3 and monetary_score >= 3
then 'Loyal Customers'
when recency_score >= 4 and frequency_score <= 2 and monetary_score <= 2
then 'Promising'
when recency_score >= 3 and frequency_score <= 2 and monetary_score <= 2
then 'Potential Loyalists'
when recency_score <= 2 and frequency_score >= 3 and monetary_score >= 3
then 'At Risk'
when recency_score <= 2 and frequency_score <= 2 and monetary_score <= 2
then 'Hibernating'
when recency_score <= 1
then 'Lost'
else 'Need Attention'
end as customer_segment
from rfm_calculations
)
select * from rfm_segments
sql
-- models/marts/analytics/customer_rfm_score.sql
with customer_metrics as (
select
customer_id,
max(order_date) as last_order_date,
count(order_id) as total_orders,
sum(order_total) as total_revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
),
rfm_calculations as (
select
customer_id,
-- 最近一次购买:距离上次购买的天数
datediff('day', last_order_date, current_date) as recency_days,
-- 购买频率:总订单数
total_orders as frequency,
-- 消费金额:总收入
total_revenue as monetary,
-- 最近一次购买得分(1-5,天数越少得分越高)
ntile(5) over (order by datediff('day', last_order_date, current_date) desc) as recency_score,
-- 购买频率得分(1-5,订单数越多得分越高)
ntile(5) over (order by total_orders) as frequency_score,
-- 消费金额得分(1-5,收入越高得分越高)
ntile(5) over (order by total_revenue) as monetary_score
from customer_metrics
),
rfm_segments as (
select
customer_id,
recency_days,
frequency,
monetary,
recency_score,
frequency_score,
monetary_score,
recency_score * 100 + frequency_score * 10 + monetary_score as rfm_score,
case
when recency_score >= 4 and frequency_score >= 4 and monetary_score >= 4
then 'Champions'
when recency_score >= 3 and frequency_score >= 3 and monetary_score >= 3
then 'Loyal Customers'
when recency_score >= 4 and frequency_score <= 2 and monetary_score <= 2
then 'Promising'
when recency_score >= 3 and frequency_score <= 2 and monetary_score <= 2
then 'Potential Loyalists'
when recency_score <= 2 and frequency_score >= 3 and monetary_score >= 3
then 'At Risk'
when recency_score <= 2 and frequency_score <= 2 and monetary_score <= 2
then 'Hibernating'
when recency_score <= 1
then 'Lost'
else 'Need Attention'
end as customer_segment
from rfm_calculations
)
select * from rfm_segments
Example 14: Union Multiple Sources
示例14:合并多个数据源
sql
-- models/staging/stg_all_events.sql
{{ config(
materialized='view'
) }}
-- Union events from multiple sources using dbt_utils
{{
dbt_utils.union_relations(
relations=[
ref('stg_web_events'),
ref('stg_mobile_events'),
ref('stg_api_events')
],
exclude=['_loaded_at'], -- Exclude source-specific columns
source_column_name='event_source' -- Add column to track source
)
}}
sql
-- models/staging/stg_all_events.sql
{{ config(
materialized='view'
) }}
-- 使用dbt_utils合并多个数据源的事件
{{
dbt_utils.union_relations(
relations=[
ref('stg_web_events'),
ref('stg_mobile_events'),
ref('stg_api_events')
],
exclude=['_loaded_at'], -- 排除源特定列
source_column_name='event_source' -- 添加列跟踪数据源
)
}}
Example 15: Surrogate Key Generation
示例15:代理键生成
sql
-- models/marts/core/fct_order_lines.sql
with order_lines as (
select
order_id,
line_number,
product_id,
quantity,
unit_price,
quantity * unit_price as line_total
from {{ source('ecommerce', 'order_lines') }}
)
select
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_number']) }} as order_line_key,
{{ dbt_utils.generate_surrogate_key(['order_id']) }} as order_key,
{{ dbt_utils.generate_surrogate_key(['product_id']) }} as product_key,
order_id,
line_number,
product_id,
quantity,
unit_price,
line_total
from order_lines
sql
-- models/marts/core/fct_order_lines.sql
with order_lines as (
select
order_id,
line_number,
product_id,
quantity,
unit_price,
quantity * unit_price as line_total
from {{ source('ecommerce', 'order_lines') }}
)
select
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_number']) }} as order_line_key,
{{ dbt_utils.generate_surrogate_key(['order_id']) }} as order_key,
{{ dbt_utils.generate_surrogate_key(['product_id']) }} as product_key,
order_id,
line_number,
product_id,
quantity,
unit_price,
line_total
from order_lines
Example 16: Date Spine for Time Series
示例16:时间序列的日期序列
sql
-- models/marts/analytics/daily_revenue_complete.sql
-- Generate complete date spine to ensure no missing dates
with date_spine as (
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('2020-01-01' as date)",
end_date="cast(current_date as date)"
) }}
),
daily_revenue as (
select
date_trunc('day', order_date) as order_date,
sum(order_total) as revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
),
complete_series as (
select
date_spine.date_day,
coalesce(daily_revenue.revenue, 0) as revenue,
-- 7-day moving average
avg(coalesce(daily_revenue.revenue, 0)) over (
order by date_spine.date_day
rows between 6 preceding and current row
) as revenue_7d_ma,
-- Month-to-date revenue
sum(coalesce(daily_revenue.revenue, 0)) over (
partition by date_trunc('month', date_spine.date_day)
order by date_spine.date_day
) as revenue_mtd
from date_spine
left join daily_revenue
on date_spine.date_day = daily_revenue.order_date
)
select * from complete_series
sql
-- models/marts/analytics/daily_revenue_complete.sql
-- 生成完整的日期序列以确保无缺失日期
with date_spine as (
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('2020-01-01' as date)",
end_date="cast(current_date as date)"
) }}
),
daily_revenue as (
select
date_trunc('day', order_date) as order_date,
sum(order_total) as revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
),
complete_series as (
select
date_spine.date_day,
coalesce(daily_revenue.revenue, 0) as revenue,
-- 7天移动平均
avg(coalesce(daily_revenue.revenue, 0)) over (
order by date_spine.date_day
rows between 6 preceding and current row
) as revenue_7d_ma,
-- 当月累计收入
sum(coalesce(daily_revenue.revenue, 0)) over (
partition by date_trunc('month', date_spine.date_day)
order by date_spine.date_day
) as revenue_mtd
from date_spine
left join daily_revenue
on date_spine.date_day = daily_revenue.order_date
)
select * from complete_series
Example 17: Custom Schema Macro Override
示例17:自定义模式宏覆盖
sql
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'prod' -%}
-- Production: Use custom schema names directly
{%- if custom_schema_name is not none -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}
{%- endif -%}
{%- elif target.name == 'dev' -%}
-- Development: Prefix with dev_username
{%- if custom_schema_name is not none -%}
dev_{{ env_var('DBT_USER', 'unknown') }}_{{ custom_schema_name | trim }}
{%- else -%}
dev_{{ env_var('DBT_USER', 'unknown') }}
{%- endif -%}
{%- else -%}
-- Default: Concatenate target schema with custom schema
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
sql
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'prod' -%}
-- 生产环境:直接使用自定义模式名
{%- if custom_schema_name is not none -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}
{%- endif -%}
{%- elif target.name == 'dev' -%}
-- 开发环境:前缀为dev_username
{%- if custom_schema_name is not none -%}
dev_{{ env_var('DBT_USER', 'unknown') }}_{{ custom_schema_name | trim }}
{%- else -%}
dev_{{ env_var('DBT_USER', 'unknown') }}
{%- endif -%}
{%- else -%}
-- 默认:拼接目标模式和自定义模式
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
Example 18: Cross-Database Query Macro
示例18:跨数据库查询宏
sql
-- macros/cross_db_concat.sql
-- Handle database-specific concat syntax
{% macro concat(fields) -%}
{{ return(adapter.dispatch('concat')(fields)) }}
{%- endmacro %}
{% macro default__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
{% macro snowflake__concat(fields) -%}
{{ fields|join(' || ') }}
{%- endmacro %}
{% macro bigquery__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
{% macro redshift__concat(fields) -%}
{{ fields|join(' || ') }}
{%- endmacro %}
Usage:
sql
select
{{ concat(['first_name', "' '", 'last_name']) }} as full_name
from {{ ref('stg_customers') }}
sql
-- macros/cross_db_concat.sql
-- 处理数据库特定的拼接语法
{% macro concat(fields) -%}
{{ return(adapter.dispatch('concat')(fields)) }}
{%- endmacro %}
{% macro default__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
{% macro snowflake__concat(fields) -%}
{{ fields|join(' || ') }}
{%- endmacro %}
{% macro bigquery__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
{% macro redshift__concat(fields) -%}
{{ fields|join(' || ') }}
{%- endmacro %}
使用方式:
sql
select
{{ concat(['first_name', "' '", 'last_name']) }} as full_name
from {{ ref('stg_customers') }}
Example 19: Pre-Hook and Post-Hook Configuration
示例19:预钩子和后钩子配置
sql
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
pre_hook=[
"delete from {{ this }} where order_date < dateadd('year', -3, current_date)",
"{{ log('Starting incremental load for fct_orders', info=True) }}"
],
post_hook=[
"create index if not exists idx_fct_orders_customer_id on {{ this }}(customer_id)",
"create index if not exists idx_fct_orders_order_date on {{ this }}(order_date)",
"{{ grant_select(this, 'analyst_role') }}",
"{{ log('Completed incremental load for fct_orders', info=True) }}"
],
tags=['core', 'incremental']
)
}}
select * from {{ ref('stg_orders') }}
{% if is_incremental() %}
where order_date > (select max(order_date) from {{ this }})
{% endif %}
sql
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
pre_hook=[
"delete from {{ this }} where order_date < dateadd('year', -3, current_date)",
"{{ log('Starting incremental load for fct_orders', info=True) }}"
],
post_hook=[
"create index if not exists idx_fct_orders_customer_id on {{ this }}(customer_id)",
"create index if not exists idx_fct_orders_order_date on {{ this }}(order_date)",
"{{ grant_select(this, 'analyst_role') }}",
"{{ log('Completed incremental load for fct_orders', info=True) }}"
],
tags=['core', 'incremental']
)
}}
select * from {{ ref('stg_orders') }}
{% if is_incremental() %}
where order_date > (select max(order_date) from {{ this }})
{% endif %}
Example 20: Exposure Definition
示例20:暴露对象定义
yaml
models/exposures.yml
version: 2

exposures:
  - name: customer_dashboard
    description: |
      Executive dashboard showing customer metrics including:
      - Customer acquisition trends
      - Customer lifetime value
      - Retention rates
      - RFM segmentation
    type: dashboard
    maturity: high
    url: https://looker.company.com/dashboards/customer-metrics
    owner:
      name: Analytics Team
      email: analytics@company.com
    depends_on:
      - ref('fct_orders')
      - ref('dim_customers')
      - ref('customer_rfm_score')
      - ref('cohort_retention')
    tags: ['executive', 'customer-analytics']

  - name: revenue_forecast_model
    description: |
      Machine learning model for revenue forecasting.
      Uses historical order data to predict future revenue.
    type: ml
    maturity: medium
    url: https://mlflow.company.com/models/revenue-forecast
    owner:
      name: Data Science Team
      email: datascience@company.com
    depends_on:
      - ref('fct_orders')
      - ref('daily_revenue_complete')
    tags: ['ml', 'forecasting']
yaml
models/exposures.yml
version: 2

exposures:
  - name: customer_dashboard
    description: |
      高管仪表盘,显示客户指标,包括:
      - 客户获取趋势
      - 客户生命周期价值
      - 留存率
      - RFM细分
    type: dashboard
    maturity: high
    url: https://looker.company.com/dashboards/customer-metrics
    owner:
      name: Analytics Team
      email: analytics@company.com
    depends_on:
      - ref('fct_orders')
      - ref('dim_customers')
      - ref('customer_rfm_score')
      - ref('cohort_retention')
    tags: ['executive', 'customer-analytics']

  - name: revenue_forecast_model
    description: |
      收入预测的机器学习模型。
      使用历史订单数据预测未来收入。
    type: ml
    maturity: medium
    url: https://mlflow.company.com/models/revenue-forecast
    owner:
      name: Data Science Team
      email: datascience@company.com
    depends_on:
      - ref('fct_orders')
      - ref('daily_revenue_complete')
    tags: ['ml', 'forecasting']
Quick Reference Commands
快速参考命令
Essential dbt Commands
核心dbt命令
bash
bash
Install dependencies
安装依赖
dbt deps
dbt deps
Compile project (check for errors)
编译项目(检查错误)
dbt compile
dbt compile
Run all models
运行所有模型
dbt run
dbt run
Run specific model
运行特定模型
dbt run --select fct_orders
dbt run --select fct_orders
Run model and downstream dependencies
运行模型和下游依赖
dbt run --select fct_orders+
dbt run --select fct_orders+
Run model and upstream dependencies
运行模型和上游依赖
dbt run --select +fct_orders
dbt run --select +fct_orders
Run model and all dependencies
运行模型和所有依赖
dbt run --select +fct_orders+
dbt run --select +fct_orders+
Run all models in a directory
运行目录中的所有模型
dbt run --select staging.*
dbt run --select staging.*
Run models with specific tag
运行带有特定标签的模型
dbt run --select tag:daily
dbt run --select tag:daily
Run models, exclude specific ones
运行模型,排除特定模型
dbt run --exclude staging.*
dbt run --exclude staging.*
Run with full refresh (incremental models)
全量刷新运行(增量模型)
dbt run --full-refresh
dbt run --full-refresh
Test all models
测试所有模型
dbt test
dbt test
Test specific model
测试特定模型
dbt test --select fct_orders
dbt test --select fct_orders
Generate documentation
生成文档
dbt docs generate
dbt docs generate
Serve documentation
启动文档服务
dbt docs serve
dbt docs serve
Debug connection
调试连接
dbt debug
dbt debug
Clean compiled files
清理编译文件
dbt clean
dbt clean
Seed CSV files
加载CSV种子数据
dbt seed
dbt seed
Snapshot models
快照模型
dbt snapshot
dbt snapshot
List resources
列出资源
dbt ls --select staging.*
dbt ls --select staging.*
Show compiled SQL
显示编译后的SQL
dbt show --select fct_orders
dbt show --select fct_orders
Parse project
解析项目
dbt parse
dbt parse
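The commands above are typically chained into a short development loop: install packages, compile to fail fast, build a model with its downstream dependents, then test what was built. A minimal sketch is below; the `DRY_RUN` guard and the `fct_orders` model name are illustrative assumptions, not part of any real project — with `DRY_RUN=1` (the default here) each command is only printed, so the sketch runs safely outside a dbt project.

```shell
# Sketch of a routine local dev loop (DRY_RUN=1 only echoes the commands;
# unset it inside a real dbt project to actually execute them).
DRY_RUN=${DRY_RUN:-1}

run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "would run: $*"
  else
    "$@"
  fi
}

run dbt deps                       # install packages from packages.yml
run dbt compile                    # fail fast on Jinja/SQL errors
run dbt run --select fct_orders+   # build the model and its downstream deps
run dbt test --select fct_orders+  # test exactly what was just built
run dbt docs generate              # refresh documentation artifacts
```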
Model Selection Syntax
模型选择语法
bash
bash
By name
按名称
--select model_name
--select model_name
By path
按路径
--select staging.jaffle_shop.*
--select staging.jaffle_shop.*
By tag
按标签
--select tag:daily
--select tag:daily
By resource type
按资源类型
--select resource_type:model
--select resource_type:model
By package
按包
--select package:dbt_utils
--select package:dbt_utils
By status (modified, new)
按状态(修改、新增)
--select state:modified+ --state ./prod_manifest
--select state:modified+ --state ./prod_manifest
Combinations (union)
组合(并集)
--select model_a model_b
--select model_a model_b
Intersections
交集
--select tag:daily,staging.*
--select tag:daily,staging.*
Graph operators
图操作符
--select +model_name # Upstream dependencies
--select model_name+ # Downstream dependencies
--select +model_name+ # All dependencies
--select @model_name # Model + children/parents to nth degree
--select +model_name # 上游依赖
--select model_name+ # 下游依赖
--select +model_name+ # 所有依赖
--select @model_name # 模型及其n级子/父依赖
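The graph operators above are just reachability over the dependency DAG: `+model` walks parents transitively, `model+` walks children. A toy Python sketch (an illustration of the semantics, not dbt's internals; the model names are hypothetical):

```python
# Illustrate +model / model+ / +model+ as graph reachability.
from collections import defaultdict

# parents[child] = direct upstream models (a tiny hypothetical DAG)
parents = {
    "stg_orders": set(),
    "stg_customers": set(),
    "fct_orders": {"stg_orders"},
    "dim_customers": {"stg_customers", "fct_orders"},
}

# Invert the edges to get direct downstream models.
children = defaultdict(set)
for child, ups in parents.items():
    for up in ups:
        children[up].add(child)

def closure(start, edges):
    """All nodes reachable from start via edges, including start itself."""
    seen, stack = set(), [start]
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(edges.get(node, ()))
    return seen

upstream = closure("fct_orders", parents)      # --select +fct_orders
downstream = closure("fct_orders", children)   # --select fct_orders+
both = upstream | downstream                   # --select +fct_orders+
print(sorted(both))
```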
Resources
资源
- Official dbt Documentation: https://docs.getdbt.com/
- dbt Discourse Community: https://discourse.getdbt.com/
- dbt GitHub Repository: https://github.com/dbt-labs/dbt-core
- dbt Package Hub: https://hub.getdbt.com/
- dbt Learn: https://courses.getdbt.com/
- dbt Style Guide: https://github.com/dbt-labs/corp/blob/main/dbt_style_guide.md
- Analytics Engineering Guide: https://www.getdbt.com/analytics-engineering/
- dbt Slack Community: https://www.getdbt.com/community/join-the-community/
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Data Engineering, Analytics Engineering, Data Transformation
Compatible With: dbt Core 1.0+, dbt Cloud, Snowflake, BigQuery, Redshift, Postgres, Databricks
- 官方dbt文档:https://docs.getdbt.com/
- dbt Discourse社区:https://discourse.getdbt.com/
- dbt GitHub仓库:https://github.com/dbt-labs/dbt-core
- dbt包中心:https://hub.getdbt.com/
- dbt学习平台:https://courses.getdbt.com/
- dbt风格指南:https://github.com/dbt-labs/corp/blob/main/dbt_style_guide.md
- 分析工程指南:https://www.getdbt.com/analytics-engineering/
- dbt Slack社区:https://www.getdbt.com/community/join-the-community/
技能版本:1.0.0
最后更新:2025年10月
技能分类:数据工程、分析工程、数据转换
兼容版本:dbt Core 1.0+, dbt Cloud, Snowflake, BigQuery, Redshift, Postgres, Databricks