Loading...
Loading...
Complete guide for dbt data transformation including models, tests, documentation, incremental builds, macros, packages, and production workflows
npx skill4agent add manutej/luxor-claude-marketplace dbt-data-transformation1. Develop: Write SQL SELECT statements as models
2. Test: Define data quality tests
3. Document: Add descriptions and metadata
4. Build: dbt run compiles and executes models
5. Test: dbt test validates data quality
6. Deploy: CI/CD pipelines deploy to production.sql-- models/staging/stg_orders.sql
with source as (
select * from {{ source('jaffle_shop', 'orders') }}
),
renamed as (
select
id as order_id,
user_id as customer_id,
order_date,
status,
_etl_loaded_at
from source
)
select * from renamed{{ source() }}{{ ref() }}-- models/marts/fct_orders.sql
with orders as (
select * from {{ ref('stg_orders') }}
),
customers as (
select * from {{ ref('stg_customers') }}
),
joined as (
select
orders.order_id,
orders.order_date,
customers.customer_name,
orders.status
from orders
left join customers
on orders.customer_id = customers.customer_id
)
select * from joined# models/staging/sources.yml
version: 2
sources:
- name: jaffle_shop
description: Raw data from the Jaffle Shop application
database: raw
schema: jaffle_shop
tables:
- name: orders
description: One record per order
columns:
- name: id
description: Primary key for orders
tests:
- unique
- not_null
- name: user_id
description: Foreign key to customers
- name: order_date
description: Date order was placed
- name: status
description: Order status (completed, pending, cancelled)-- Reference the source
select * from {{ source('jaffle_shop', 'orders') }}freshnessmodels/
├── staging/ # One-to-one with source tables
│ ├── jaffle_shop/
│ │ ├── _jaffle_shop__sources.yml
│ │ ├── _jaffle_shop__models.yml
│ │ ├── stg_jaffle_shop__orders.sql
│ │ └── stg_jaffle_shop__customers.sql
│ └── stripe/
│ ├── _stripe__sources.yml
│ ├── _stripe__models.yml
│ └── stg_stripe__payments.sql
├── intermediate/ # Purpose-built transformations
│ └── int_orders_joined.sql
└── marts/ # Business-defined entities
├── core/
│ ├── _core__models.yml
│ ├── dim_customers.sql
│ └── fct_orders.sql
└── marketing/
└── fct_customer_sessions.sqlstg_int_fct_dim_{{ config(materialized='view') }}
select * from {{ ref('base_model') }}{{ config(materialized='table') }}
select * from {{ ref('base_model') }}{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='fail'
) }}
select * from {{ source('jaffle_shop', 'orders') }}
{% if is_incremental() %}
-- Only process new/updated records
where order_date > (select max(order_date) from {{ this }})
{% endif %}-- Append (default): Add new rows only
{{ config(
materialized='incremental',
incremental_strategy='append'
) }}
-- Merge: Upsert based on unique_key
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
) }}
-- Delete+Insert: Delete matching records, insert new
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='delete+insert'
) }}{{ config(materialized='ephemeral') }}
select * from {{ ref('base_model') }}| Materialization | Build Speed | Query Speed | Storage | Use Case |
|---|---|---|---|---|
| View | Fast | Slow | None | Small datasets, infrequent queries |
| Table | Slow | Fast | High | Medium datasets, frequent queries |
| Incremental | Fast* | Fast | High | Large datasets, time-series data |
| Ephemeral | N/A | Varies | None | Intermediate logic, CTEs |
# models/staging/stg_orders.yml
version: 2
models:
- name: stg_orders
description: Staged order data
columns:
- name: order_id
description: Primary key
tests:
- unique
- not_null
- name: customer_id
description: Foreign key to customers
tests:
- not_null
- relationships:
to: ref('stg_customers')
field: customer_id
- name: status
description: Order status
tests:
- accepted_values:
values: ['placed', 'shipped', 'completed', 'returned', 'cancelled']
- name: order_total
description: Total order amount
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"uniquenot_nullaccepted_valuesrelationshipstests/-- tests/assert_positive_order_totals.sql
select
order_id,
order_total
from {{ ref('fct_orders') }}
where order_total < 0-- Test for data freshness
-- tests/assert_orders_are_fresh.sql
with latest_order as (
select max(order_date) as max_date
from {{ ref('fct_orders') }}
)
select max_date
from latest_order
where max_date < current_date - interval '1 day'-- Test for referential integrity across time
-- tests/assert_no_orphaned_orders.sql
select
o.order_id,
o.customer_id
from {{ ref('fct_orders') }} o
left join {{ ref('dim_customers') }} c
on o.customer_id = c.customer_id
where c.customer_id is null# Requires dbt-utils package
models:
- name: stg_orders
columns:
- name: order_id
tests:
# Test for uniqueness across multiple columns
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- order_id
- order_date
# Test for sequential values
- dbt_utils.sequential_values:
interval: 1
# Test that values match regex
- dbt_utils.not_null_proportion:
at_least: 0.95models:
- name: stg_orders
columns:
- name: order_id
tests:
- unique:
severity: error # Fail build (default)
- not_null:
severity: warn # Warning only# models/marts/core/_core__models.yml
version: 2
models:
- name: fct_orders
description: |
Order fact table containing one row per order with associated
customer and payment information. This is the primary table for
order analytics and reporting.
**Grain:** One row per order
**Refresh:** Incremental, updates daily at 2 AM UTC
**Notes:**
- Includes cancelled orders (filter with status column)
- Payment info joined from Stripe data
- Customer info joined from application database
columns:
- name: order_id
description: Primary key for orders table
tests:
- unique
- not_null
- name: customer_id
description: |
Foreign key to dim_customers. Links to customer who placed the order.
**Note:** May be null for guest checkout orders.
- name: order_date
description: Date order was placed (UTC timezone)
- name: status
description: |
Current order status. Possible values:
- `placed`: Order received, not yet processed
- `shipped`: Order shipped to customer
- `completed`: Order delivered and confirmed
- `returned`: Order returned by customer
- `cancelled`: Order cancelled before shipment
- name: order_total
description: Total order amount in USD including tax and shipping<!-- models/docs.md -->
{% docs order_status %}
Order status indicates the current state of an order in our fulfillment pipeline.
| Status | Description | Next Steps |
|--------|-------------|------------|
| placed | Order received | Inventory check |
| shipped | En route to customer | Track shipment |
| completed | Delivered successfully | Request feedback |
| returned | Customer return initiated | Process refund |
| cancelled | Order cancelled | Update inventory |
{% enddocs %}
{% docs customer_id %}
Unique identifier for customers. This ID is:
- Generated by the application on account creation
- Persistent across orders
- Used to track customer lifetime value
- **Note:** NULL for guest checkouts
{% enddocs %}models:
- name: fct_orders
columns:
- name: status
description: "{{ doc('order_status') }}"
- name: customer_id
description: "{{ doc('customer_id') }}"# Generate documentation site
dbt docs generate
# Serve documentation locally
dbt docs serve --port 8001
# View in browser at http://localhost:8001{{ config(
materialized='incremental',
unique_key='event_id'
) }}
with source as (
select
event_id,
user_id,
event_timestamp,
event_type,
event_properties
from {{ source('analytics', 'events') }}
{% if is_incremental() %}
-- Only process events newer than existing data
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
)
select * from sourceis_incremental(){{ this }}unique_key{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
merge_update_columns=['status', 'updated_at'],
merge_exclude_columns=['created_at']
) }}
with orders as (
select
order_id,
customer_id,
order_date,
status,
order_total,
current_timestamp() as updated_at,
case
when status = 'placed' then current_timestamp()
else null
end as created_at
from {{ source('ecommerce', 'orders') }}
{% if is_incremental() %}
-- Look back 3 days to catch late-arriving updates
where order_date >= (select max(order_date) - interval '3 days' from {{ this }})
{% endif %}
)
select * from ordersunique_key{{ config(
materialized='incremental',
unique_key=['date', 'customer_id'],
incremental_strategy='delete+insert'
) }}
with daily_metrics as (
select
date_trunc('day', order_timestamp) as date,
customer_id,
count(*) as order_count,
sum(order_total) as total_revenue
from {{ ref('fct_orders') }}
{% if is_incremental() %}
where date_trunc('day', order_timestamp) >= (
select max(date) - interval '7 days' from {{ this }}
)
{% endif %}
group by 1, 2
)
select * from daily_metricsunique_key{{ config(
materialized='incremental',
unique_key='order_id'
) }}
select
order_id,
customer_id,
order_date,
status,
_loaded_at
from {{ source('ecommerce', 'orders') }}
{% if is_incremental() %}
-- Use _loaded_at instead of order_date to catch updates
where _loaded_at > (select max(_loaded_at) from {{ this }})
-- OR use a lookback window
-- where order_date > (select max(order_date) - interval '3 days' from {{ this }})
{% endif %}{{ config(
materialized='incremental',
unique_key='event_id',
partition_by={
'field': 'event_date',
'data_type': 'date',
'granularity': 'day'
},
cluster_by=['user_id', 'event_type']
) }}
select
event_id,
user_id,
event_type,
event_timestamp,
date(event_timestamp) as event_date
from {{ source('analytics', 'raw_events') }}
{% if is_incremental() %}
where date(event_timestamp) > (select max(event_date) from {{ this }})
{% endif %}# Force full rebuild of incremental models
dbt run --full-refresh
# Full refresh specific model
dbt run --select my_incremental_model --full-refresh-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}select
order_id,
{{ cents_to_dollars('amount_cents') }} as amount_dollars
from {{ ref('stg_orders') }}-- macros/test_not_negative.sql
{% macro test_not_negative(model, column_name) %}
select
{{ column_name }}
from {{ model }}
where {{ column_name }} < 0
{% endmacro %}-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}
with date_spine as (
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('" ~ start_date ~ "' as date)",
end_date="cast('" ~ end_date ~ "' as date)"
) }}
)
select
date_day
from date_spine
{% endmacro %}-- macros/pivot_metric.sql
{% macro pivot_metric(metric_column, group_by_column, values) %}
select
{{ group_by_column }},
{% for value in values %}
sum(case when status = '{{ value }}' then {{ metric_column }} else 0 end)
as {{ value }}_{{ metric_column }}
{% if not loop.last %},{% endif %}
{% endfor %}
from {{ ref('fct_orders') }}
group by 1
{% endmacro %}{{ pivot_metric(
metric_column='order_total',
group_by_column='customer_id',
values=['completed', 'pending', 'cancelled']
) }}-- macros/grant_select.sql
{% macro grant_select(schema, role) %}
{% set sql %}
grant select on all tables in schema {{ schema }} to {{ role }};
{% endset %}
{% do run_query(sql) %}
{% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}
{% endmacro %}# dbt_project.yml
on-run-end:
- "{{ grant_select(target.schema, 'analyst_role') }}"-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'prod' -%}
{%- if custom_schema_name is not none -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}
{%- endif -%}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}-- macros/add_audit_columns.sql
{% macro add_audit_columns() %}
current_timestamp() as dbt_updated_at,
current_timestamp() as dbt_created_at,
'{{ var("dbt_user") }}' as dbt_updated_by
{% endmacro %}select
order_id,
customer_id,
order_total,
{{ add_audit_columns() }}
from {{ ref('stg_orders') }}-- Conditionals
{% if target.name == 'prod' %}
from {{ source('production', 'orders') }}
{% else %}
from {{ source('development', 'orders') }}
{% endif %}
-- Loops
{% for status in ['placed', 'shipped', 'completed'] %}
sum(case when status = '{{ status }}' then 1 else 0 end) as {{ status }}_count
{% if not loop.last %},{% endif %}
{% endfor %}
-- Set variables
{% set payment_methods = ['credit_card', 'paypal', 'bank_transfer'] %}
{% for method in payment_methods %}
count(distinct case when payment_method = '{{ method }}'
then customer_id end) as {{ method }}_customers
{% if not loop.last %},{% endif %}
{% endfor %}# packages.yml
packages:
# dbt-utils: Essential utility macros
- package: dbt-labs/dbt_utils
version: 1.1.1
# Audit helper: Compare datasets
- package: dbt-labs/audit_helper
version: 0.9.0
# Codegen: Code generation utilities
- package: dbt-labs/codegen
version: 0.11.0
# Custom package from Git
- git: "https://github.com/your-org/dbt-custom-package.git"
revision: main
# Local package
- local: ../dbt-shared-macrosdbt deps-- Surrogate key generation
select
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} as order_line_id,
order_id,
line_item_id
from {{ ref('stg_order_lines') }}
-- Union multiple tables
{{ dbt_utils.union_relations(
relations=[
ref('orders_2022'),
ref('orders_2023'),
ref('orders_2024')
]
) }}
-- Get column values as list
{% set statuses = dbt_utils.get_column_values(
table=ref('stg_orders'),
column='status'
) %}
-- Pivot table
{{ dbt_utils.pivot(
column='metric_name',
values=dbt_utils.get_column_values(table=ref('metrics'), column='metric_name'),
agg='sum',
then_value='metric_value',
else_value=0,
prefix='',
suffix='_total'
) }}dbt-custom-package/
├── dbt_project.yml
├── macros/
│ ├── custom_test.sql
│ └── custom_macro.sql
├── models/
│ └── example_model.sql
└── README.md# dbt_project.yml for custom package
name: 'custom_package'
version: '1.0.0'
config-version: 2
require-dbt-version: [">=1.0.0", "<2.0.0"]# Semantic versioning
packages:
- package: dbt-labs/dbt_utils
version: [">=1.0.0", "<2.0.0"] # Any 1.x version
# Exact version
- package: dbt-labs/dbt_utils
version: 1.1.1
# Git branch/tag
- git: "https://github.com/org/package.git"
revision: v1.2.3
# Latest from branch
- git: "https://github.com/org/package.git"
revision: main# .github/workflows/dbt_ci.yml
name: dbt CI
on:
pull_request:
branches: [main]
jobs:
dbt_run:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dbt
run: |
pip install dbt-core dbt-snowflake
- name: Install dbt packages
run: dbt deps
- name: Run dbt debug
run: dbt debug
env:
DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
- name: Run dbt models (modified only)
run: dbt run --select state:modified+ --state ./prod_manifest
env:
DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
- name: Run dbt tests
run: dbt test --select state:modified+
env:
DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}# Store production manifest
dbt compile --target prod
cp target/manifest.json ./prod_manifest/
# In CI: Test only changed models and downstream dependencies
dbt test --select state:modified+ --state ./prod_manifest# .github/workflows/dbt_prod.yml
name: dbt Production Deploy
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dbt
run: pip install dbt-core dbt-snowflake
- name: Install packages
run: dbt deps
- name: Run dbt seed
run: dbt seed --target prod
- name: Run dbt run
run: dbt run --target prod
- name: Run dbt test
run: dbt test --target prod
- name: Generate docs
run: dbt docs generate --target prod
- name: Upload docs to S3
run: |
aws s3 sync target/ s3://dbt-docs-bucket/
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}# dags/dbt_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'analytics',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'dbt_production',
default_args=default_args,
description='Run dbt models in production',
schedule_interval='0 2 * * *', # 2 AM daily
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['dbt', 'analytics'],
) as dag:
dbt_deps = BashOperator(
task_id='dbt_deps',
bash_command='cd /opt/dbt && dbt deps',
)
dbt_seed = BashOperator(
task_id='dbt_seed',
bash_command='cd /opt/dbt && dbt seed --target prod',
)
dbt_run_staging = BashOperator(
task_id='dbt_run_staging',
bash_command='cd /opt/dbt && dbt run --select staging.* --target prod',
)
dbt_run_marts = BashOperator(
task_id='dbt_run_marts',
bash_command='cd /opt/dbt && dbt run --select marts.* --target prod',
)
dbt_test = BashOperator(
task_id='dbt_test',
bash_command='cd /opt/dbt && dbt test --target prod',
)
dbt_docs = BashOperator(
task_id='dbt_docs',
bash_command='cd /opt/dbt && dbt docs generate --target prod',
)
# Define task dependencies
dbt_deps >> dbt_seed >> dbt_run_staging >> dbt_run_marts >> dbt_test >> dbt_docs# dbt_cloud.yml
# Environment configuration
environments:
- name: Production
dbt_version: 1.7.latest
type: deployment
- name: Development
dbt_version: 1.7.latest
type: development
# Job configuration
jobs:
- name: Production Run
environment: Production
triggers:
schedule:
cron: "0 2 * * *" # 2 AM daily
commands:
- dbt deps
- dbt seed
- dbt run
- dbt test
- name: CI Check
environment: Development
triggers:
github_webhook: true
commands:
- dbt deps
- dbt run --select state:modified+
- dbt test --select state:modified+-- macros/post_hook_monitoring.sql
{% macro monitor_row_count(threshold=0) %}
{% if execute %}
{% set row_count_query %}
select count(*) as row_count from {{ this }}
{% endset %}
{% set results = run_query(row_count_query) %}
{% set row_count = results.columns[0].values()[0] %}
{% if row_count < threshold %}
{{ exceptions.raise_compiler_error("Row count " ~ row_count ~ " below threshold " ~ threshold) }}
{% endif %}
{{ log("Model " ~ this ~ " has " ~ row_count ~ " rows", info=True) }}
{% endif %}
{% endmacro %}{{ config(
post_hook="{{ monitor_row_count(threshold=1000) }}"
) }}
select * from {{ ref('stg_orders') }}stg_[source]__[entity].sql # Staging: stg_stripe__payments.sql
int_[entity]_[verb].sql # Intermediate: int_orders_joined.sql
fct_[entity].sql # Fact: fct_orders.sql
dim_[entity].sql # Dimension: dim_customers.sqlassert_[description].sql # assert_positive_order_totals.sql[verb]_[noun].sql # generate_surrogate_key.sql-- ✓ Good: Clear CTEs, proper formatting
with orders as (
select
order_id,
customer_id,
order_date,
status
from {{ ref('stg_orders') }}
where status != 'cancelled'
),
customers as (
select
customer_id,
customer_name,
customer_email
from {{ ref('dim_customers') }}
),
final as (
select
orders.order_id,
orders.order_date,
customers.customer_name,
orders.status
from orders
left join customers
on orders.customer_id = customers.customer_id
)
select * from final
-- ✗ Bad: Nested subqueries, poor formatting
select o.order_id, o.order_date, c.customer_name, o.status from (
select order_id, customer_id, order_date, status from {{ ref('stg_orders') }}
where status != 'cancelled') o left join (select customer_id, customer_name from
{{ ref('dim_customers') }}) c on o.customer_id = c.customer_id-- Process only new data
{{ config(materialized='incremental') }}
select * from {{ source('events', 'page_views') }}
{% if is_incremental() %}
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}{{ config(
materialized='table',
partition_by={'field': 'order_date', 'data_type': 'date'},
cluster_by=['customer_id', 'status']
) }}-- ✓ Good: Filter early
with source as (
select *
from {{ source('app', 'events') }}
where event_date >= '2024-01-01' -- Filter in source CTE
)
-- ✗ Bad: Filter late
with source as (
select * from {{ source('app', 'events') }}
)
select * from source
where event_date >= '2024-01-01' -- Filtering after full scan-- Avoid creating unnecessary views
{{ config(materialized='ephemeral') }}
select
order_id,
lower(trim(status)) as status_clean
from {{ ref('stg_orders') }}Staging → Intermediate → Marts
↓ ↓ ↓
1:1 Purpose-built Business
Sources Logic Entities-- Instead of one massive model, break it down:
-- intermediate/int_order_items_aggregated.sql
-- intermediate/int_customer_lifetime_value.sql
-- intermediate/int_payment_summaries.sql
-- marts/fct_orders.sql (combines intermediate models)models/
├── staging/
│ └── [source]/
│ ├── _[source]__sources.yml
│ ├── _[source]__models.yml
│ └── stg_[source]__[table].sql
├── intermediate/
│ └── int_[purpose].sql
└── marts/
└── [business_area]/
├── _[area]__models.yml
└── [model_type]_[entity].sql# Source tests: Data quality at ingestion
sources:
- name: raw_data
tables:
- name: orders
columns:
- name: id
tests: [unique, not_null]
# Model tests: Transformation logic
models:
- name: fct_orders
tests:
- dbt_utils.expression_is_true:
expression: "order_total >= 0"
columns:
- name: order_id
tests: [unique, not_null]
# Custom tests: Business logic
# tests/assert_revenue_reconciliation.sql# Critical tests: error (fail build)
# Nice-to-have tests: warn (log but don't fail)
tests:
- unique:
severity: error
- dbt_utils.not_null_proportion:
at_least: 0.95
severity: warnmodels:
- name: fct_orders
description: |
**Purpose:** [Why this model exists]
**Grain:** [One row represents...]
**Refresh:** [When and how often]
**Consumers:** [Who uses this]-- Use comments for complex business rules
select
order_id,
-- Revenue recognition: Only count completed orders
-- cancelled within 30 days (per finance policy 2024-03)
case
when status = 'completed'
and datediff('day', order_date, current_date) > 30
then order_total
else 0
end as recognized_revenue
from {{ ref('stg_orders') }}dbt docs generate-- models/staging/jaffle_shop/stg_jaffle_shop__customers.sql
with source as (
select * from {{ source('jaffle_shop', 'customers') }}
),
renamed as (
select
id as customer_id,
first_name,
last_name,
first_name || ' ' || last_name as customer_name,
email,
_loaded_at
from source
)
select * from renamed-- models/marts/core/fct_orders.sql
{{ config(
materialized='table',
tags=['core', 'daily']
) }}
with orders as (
select * from {{ ref('stg_jaffle_shop__orders') }}
),
customers as (
select * from {{ ref('dim_customers') }}
),
payments as (
select
order_id,
sum(amount) as total_payment_amount
from {{ ref('stg_stripe__payments') }}
where status = 'success'
group by 1
),
final as (
select
orders.order_id,
orders.customer_id,
customers.customer_name,
orders.order_date,
orders.status,
coalesce(payments.total_payment_amount, 0) as order_total,
{{ add_audit_columns() }}
from orders
left join customers
on orders.customer_id = customers.customer_id
left join payments
on orders.order_id = payments.order_id
)
select * from final-- models/marts/analytics/fct_page_views.sql
{{ config(
materialized='incremental',
unique_key='page_view_id',
partition_by={
'field': 'event_date',
'data_type': 'date',
'granularity': 'day'
},
cluster_by=['user_id', 'page_path']
) }}
with events as (
select
event_id as page_view_id,
user_id,
session_id,
event_timestamp,
date(event_timestamp) as event_date,
event_properties:page_path::string as page_path,
event_properties:referrer::string as referrer,
_loaded_at
from {{ source('analytics', 'raw_events') }}
where event_type = 'page_view'
{% if is_incremental() %}
-- Use _loaded_at to catch late-arriving data
and _loaded_at > (select max(_loaded_at) from {{ this }})
{% endif %}
),
enriched as (
select
page_view_id,
user_id,
session_id,
event_timestamp,
event_date,
page_path,
referrer,
-- Parse URL components
split_part(page_path, '?', 1) as page_path_clean,
case
when referrer like '%google%' then 'Google'
when referrer like '%facebook%' then 'Facebook'
when referrer is null then 'Direct'
else 'Other'
end as referrer_source,
_loaded_at
from events
)
select * from enriched-- models/marts/core/dim_customers.sql
{{ config(
materialized='table',
unique_key='customer_key'
) }}
with customers as (
select * from {{ ref('stg_jaffle_shop__customers') }}
),
customer_orders as (
select
customer_id,
min(order_date) as first_order_date,
max(order_date) as most_recent_order_date,
count(order_id) as total_orders
from {{ ref('fct_orders') }}
group by 1
),
final as (
select
{{ dbt_utils.generate_surrogate_key(['customers.customer_id', 'customers._loaded_at']) }}
as customer_key,
customers.customer_id,
customers.customer_name,
customers.email,
customer_orders.first_order_date,
customer_orders.most_recent_order_date,
customer_orders.total_orders,
case
when customer_orders.total_orders >= 10 then 'VIP'
when customer_orders.total_orders >= 5 then 'Regular'
when customer_orders.total_orders >= 1 then 'New'
else 'Prospect'
end as customer_segment,
customers._loaded_at as effective_from,
null as effective_to,
true as is_current
from customers
left join customer_orders
on customers.customer_id = customer_orders.customer_id
)
select * from final-- models/marts/analytics/daily_order_metrics.sql
{{ config(
materialized='incremental',
unique_key=['metric_date', 'status'],
incremental_strategy='delete+insert'
) }}
with orders as (
select * from {{ ref('fct_orders') }}
{% if is_incremental() %}
where order_date >= (select max(metric_date) - interval '7 days' from {{ this }})
{% endif %}
),
daily_metrics as (
select
date_trunc('day', order_date) as metric_date,
status,
count(distinct order_id) as order_count,
count(distinct customer_id) as unique_customers,
sum(order_total) as total_revenue,
avg(order_total) as avg_order_value,
min(order_total) as min_order_value,
max(order_total) as max_order_value,
percentile_cont(0.5) within group (order by order_total) as median_order_value
from orders
group by 1, 2
)
select * from daily_metrics-- models/marts/analytics/customer_order_status_summary.sql
with orders as (
select
customer_id,
status,
order_total
from {{ ref('fct_orders') }}
)
select
customer_id,
{% for status in ['placed', 'shipped', 'completed', 'returned', 'cancelled'] %}
sum(case when status = '{{ status }}' then 1 else 0 end)
as {{ status }}_count,
sum(case when status = '{{ status }}' then order_total else 0 end)
as {{ status }}_revenue
{% if not loop.last %},{% endif %}
{% endfor %}
from orders
group by 1-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
{{
config(
target_schema='snapshots',
target_database='analytics',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
select
customer_id,
customer_name,
email,
customer_segment,
updated_at
from {{ source('jaffle_shop', 'customers') }}
{% endsnapshot %}-- models/marts/analytics/conversion_funnel.sql
with page_views as (
select
user_id,
session_id,
min(event_timestamp) as session_start
from {{ ref('fct_page_views') }}
where event_date >= current_date - interval '30 days'
group by 1, 2
),
product_views as (
select distinct
user_id,
session_id
from {{ ref('fct_page_views') }}
where page_path like '/product/%'
and event_date >= current_date - interval '30 days'
),
add_to_cart as (
select distinct
user_id,
session_id
from {{ ref('fct_events') }}
where event_type = 'add_to_cart'
and event_date >= current_date - interval '30 days'
),
checkout_started as (
select distinct
user_id,
session_id
from {{ ref('fct_events') }}
where event_type = 'checkout_started'
and event_date >= current_date - interval '30 days'
),
orders as (
select distinct
customer_id as user_id,
session_id
from {{ ref('fct_orders') }}
where order_date >= current_date - interval '30 days'
and status = 'completed'
),
funnel as (
select
count(distinct page_views.session_id) as sessions,
count(distinct product_views.session_id) as product_views,
count(distinct add_to_cart.session_id) as add_to_cart,
count(distinct checkout_started.session_id) as checkout_started,
count(distinct orders.session_id) as completed_orders
from page_views
left join product_views using (session_id)
left join add_to_cart using (session_id)
left join checkout_started using (session_id)
left join orders using (session_id)
),
funnel_metrics as (
select
sessions,
product_views,
round(100.0 * product_views / nullif(sessions, 0), 2) as pct_product_views,
add_to_cart,
round(100.0 * add_to_cart / nullif(product_views, 0), 2) as pct_add_to_cart,
checkout_started,
round(100.0 * checkout_started / nullif(add_to_cart, 0), 2) as pct_checkout_started,
completed_orders,
round(100.0 * completed_orders / nullif(checkout_started, 0), 2) as pct_completed_orders,
round(100.0 * completed_orders / nullif(sessions, 0), 2) as overall_conversion_rate
from funnel
)
select * from funnel_metrics-- models/marts/analytics/cohort_retention.sql
with customer_orders as (
select
customer_id,
date_trunc('month', order_date) as order_month
from {{ ref('fct_orders') }}
where status = 'completed'
),
first_order as (
select
customer_id,
min(order_month) as cohort_month
from customer_orders
group by 1
),
cohort_data as (
select
f.cohort_month,
c.order_month,
datediff('month', f.cohort_month, c.order_month) as months_since_first_order,
count(distinct c.customer_id) as customer_count
from first_order f
join customer_orders c
on f.customer_id = c.customer_id
group by 1, 2, 3
),
cohort_size as (
select
cohort_month,
customer_count as cohort_size
from cohort_data
where months_since_first_order = 0
),
retention as (
select
cohort_data.cohort_month,
cohort_data.months_since_first_order,
cohort_data.customer_count,
cohort_size.cohort_size,
round(100.0 * cohort_data.customer_count / cohort_size.cohort_size, 2) as retention_pct
from cohort_data
join cohort_size
on cohort_data.cohort_month = cohort_size.cohort_month
)
select * from retention
order by cohort_month, months_since_first_order-- models/marts/analytics/revenue_attribution.sql
with touchpoints as (
select
user_id,
session_id,
event_timestamp,
case
when referrer like '%google%' then 'Google'
when referrer like '%facebook%' then 'Facebook'
when referrer like '%email%' then 'Email'
when referrer is null then 'Direct'
else 'Other'
end as channel
from {{ ref('fct_page_views') }}
),
customer_journeys as (
select
t.user_id,
o.order_id,
o.order_total,
t.channel,
t.event_timestamp,
o.order_date,
row_number() over (
partition by o.order_id
order by t.event_timestamp
) as touchpoint_number,
count(*) over (partition by o.order_id) as total_touchpoints
from touchpoints t
join {{ ref('fct_orders') }} o
on t.user_id = o.customer_id
and t.event_timestamp <= o.order_date
and t.event_timestamp >= dateadd('day', -30, o.order_date)
),
attributed_revenue as (
select
order_id,
channel,
order_total,
-- First touch attribution
case when touchpoint_number = 1
then order_total else 0 end as first_touch_revenue,
-- Last touch attribution
case when touchpoint_number = total_touchpoints
then order_total else 0 end as last_touch_revenue,
-- Linear attribution
order_total / total_touchpoints as linear_revenue,
-- Time decay (more recent touchpoints get more credit)
order_total * (power(2, touchpoint_number - 1) /
(power(2, total_touchpoints) - 1)) as time_decay_revenue
from customer_journeys
)
select
channel,
count(distinct order_id) as orders,
sum(first_touch_revenue) as first_touch_revenue,
sum(last_touch_revenue) as last_touch_revenue,
sum(linear_revenue) as linear_revenue,
sum(time_decay_revenue) as time_decay_revenue
from attributed_revenue
group by 1-- tests/assert_fct_orders_quality.sql
-- Test multiple data quality rules in one test
with order_quality_checks as (
select
order_id,
customer_id,
order_date,
order_total,
status,
-- Check 1: Order total should be positive
case when order_total < 0
then 'Negative order total' end as check_1,
-- Check 2: Order date should not be in future
case when order_date > current_date
then 'Future order date' end as check_2,
-- Check 3: Customer ID should exist
case when customer_id is null
then 'Missing customer ID' end as check_3,
-- Check 4: Status should be valid
case when status not in ('placed', 'shipped', 'completed', 'returned', 'cancelled')
then 'Invalid status' end as check_4
from {{ ref('fct_orders') }}
),
failed_checks as (
select
order_id,
check_1,
check_2,
check_3,
check_4
from order_quality_checks
where check_1 is not null
or check_2 is not null
or check_3 is not null
or check_4 is not null
)
select * from failed_checks-- models/marts/core/dim_products_scd.sql
{{ config(
materialized='incremental',
unique_key='product_key',
merge_update_columns=['product_name', 'category', 'price', 'effective_to', 'is_current']
) }}
with source_data as (
select
product_id,
product_name,
category,
price,
updated_at
from {{ source('ecommerce', 'products') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }} where is_current = true)
{% endif %}
),
{% if is_incremental() %}
existing_records as (
select *
from {{ this }}
where is_current = true
),
changed_records as (
select
s.product_id,
s.product_name,
s.category,
s.price,
s.updated_at
from source_data s
join existing_records e
on s.product_id = e.product_id
and (
s.product_name != e.product_name
or s.category != e.category
or s.price != e.price
)
),
expire_old_records as (
select
e.product_key,
e.product_id,
e.product_name,
e.category,
e.price,
e.effective_from,
c.updated_at as effective_to,
false as is_current,
e.updated_at
from existing_records e
join changed_records c
on e.product_id = c.product_id
),
new_versions as (
select
{{ dbt_utils.generate_surrogate_key(['c.product_id', 'c.updated_at']) }} as product_key,
c.product_id,
c.product_name,
c.category,
c.price,
c.updated_at as effective_from,
null::timestamp as effective_to,
true as is_current,
c.updated_at
from changed_records c
),
combined as (
select * from expire_old_records
union all
select * from new_versions
)
select * from combined
{% else %}
-- First load: all records are current
select
{{ dbt_utils.generate_surrogate_key(['product_id', 'updated_at']) }} as product_key,
product_id,
product_name,
category,
price,
updated_at as effective_from,
null::timestamp as effective_to,
true as is_current,
updated_at
from source_data
{% endif %}-- models/marts/analytics/customer_rfm_score.sql
with customer_metrics as (
select
customer_id,
max(order_date) as last_order_date,
count(order_id) as total_orders,
sum(order_total) as total_revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
),
rfm_calculations as (
select
customer_id,
-- Recency: Days since last order
datediff('day', last_order_date, current_date) as recency_days,
-- Frequency: Total orders
total_orders as frequency,
-- Monetary: Total revenue
total_revenue as monetary,
-- Recency score (1-5, lower days = higher score)
ntile(5) over (order by datediff('day', last_order_date, current_date) desc) as recency_score,
-- Frequency score (1-5, more orders = higher score)
ntile(5) over (order by total_orders) as frequency_score,
-- Monetary score (1-5, more revenue = higher score)
ntile(5) over (order by total_revenue) as monetary_score
from customer_metrics
),
rfm_segments as (
select
customer_id,
recency_days,
frequency,
monetary,
recency_score,
frequency_score,
monetary_score,
recency_score * 100 + frequency_score * 10 + monetary_score as rfm_score,
case
when recency_score >= 4 and frequency_score >= 4 and monetary_score >= 4
then 'Champions'
when recency_score >= 3 and frequency_score >= 3 and monetary_score >= 3
then 'Loyal Customers'
when recency_score >= 4 and frequency_score <= 2 and monetary_score <= 2
then 'Promising'
when recency_score >= 3 and frequency_score <= 2 and monetary_score <= 2
then 'Potential Loyalists'
when recency_score <= 2 and frequency_score >= 3 and monetary_score >= 3
then 'At Risk'
when recency_score <= 2 and frequency_score <= 2 and monetary_score <= 2
then 'Hibernating'
when recency_score <= 1
then 'Lost'
else 'Need Attention'
end as customer_segment
from rfm_calculations
)
select * from rfm_segments-- models/staging/stg_all_events.sql
{{ config(
materialized='view'
) }}
-- Union events from multiple sources using dbt_utils
{{
dbt_utils.union_relations(
relations=[
ref('stg_web_events'),
ref('stg_mobile_events'),
ref('stg_api_events')
],
exclude=['_loaded_at'], -- Exclude source-specific columns
source_column_name='event_source' -- Add column to track source
)
}}-- models/marts/core/fct_order_lines.sql
with order_lines as (
select
order_id,
line_number,
product_id,
quantity,
unit_price,
quantity * unit_price as line_total
from {{ source('ecommerce', 'order_lines') }}
)
select
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_number']) }} as order_line_key,
{{ dbt_utils.generate_surrogate_key(['order_id']) }} as order_key,
{{ dbt_utils.generate_surrogate_key(['product_id']) }} as product_key,
order_id,
line_number,
product_id,
quantity,
unit_price,
line_total
from order_lines-- models/marts/analytics/daily_revenue_complete.sql
-- Generate complete date spine to ensure no missing dates
with date_spine as (
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('2020-01-01' as date)",
end_date="cast(current_date as date)"
) }}
),
daily_revenue as (
select
date_trunc('day', order_date) as order_date,
sum(order_total) as revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
),
complete_series as (
select
date_spine.date_day,
coalesce(daily_revenue.revenue, 0) as revenue,
-- 7-day moving average
avg(coalesce(daily_revenue.revenue, 0)) over (
order by date_spine.date_day
rows between 6 preceding and current row
) as revenue_7d_ma,
-- Month-to-date revenue
sum(coalesce(daily_revenue.revenue, 0)) over (
partition by date_trunc('month', date_spine.date_day)
order by date_spine.date_day
) as revenue_mtd
from date_spine
left join daily_revenue
on date_spine.date_day = daily_revenue.order_date
)
select * from complete_series-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'prod' -%}
-- Production: Use custom schema names directly
{%- if custom_schema_name is not none -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}
{%- endif -%}
{%- elif target.name == 'dev' -%}
-- Development: Prefix with dev_username
{%- if custom_schema_name is not none -%}
dev_{{ env_var('DBT_USER', 'unknown') }}_{{ custom_schema_name | trim }}
{%- else -%}
dev_{{ env_var('DBT_USER', 'unknown') }}
{%- endif -%}
{%- else -%}
-- Default: Concatenate target schema with custom schema
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}-- macros/cross_db_concat.sql
-- Handle database-specific concat syntax
{% macro concat(fields) -%}
{{ return(adapter.dispatch('concat', 'dbt_utils')(fields)) }}
{%- endmacro %}
{% macro default__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
{% macro snowflake__concat(fields) -%}
{{ fields|join(' || ') }}
{%- endmacro %}
{% macro bigquery__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
{% macro redshift__concat(fields) -%}
{{ fields|join(' || ') }}
{%- endmacro %}select
{{ concat(['first_name', "' '", 'last_name']) }} as full_name
from {{ ref('stg_customers') }}-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
pre_hook=[
"delete from {{ this }} where order_date < dateadd('year', -3, current_date)",
"{{ log('Starting incremental load for fct_orders', info=True) }}"
],
post_hook=[
"create index if not exists idx_fct_orders_customer_id on {{ this }}(customer_id)",
"create index if not exists idx_fct_orders_order_date on {{ this }}(order_date)",
"{{ grant_select(this, 'analyst_role') }}",
"{{ log('Completed incremental load for fct_orders', info=True) }}"
],
tags=['core', 'incremental']
)
}}
select * from {{ ref('stg_orders') }}
{% if is_incremental() %}
where order_date > (select max(order_date) from {{ this }})
{% endif %}# models/exposures.yml
version: 2
exposures:
- name: customer_dashboard
description: |
Executive dashboard showing customer metrics including:
- Customer acquisition trends
- Customer lifetime value
- Retention rates
- RFM segmentation
type: dashboard
maturity: high
url: https://looker.company.com/dashboards/customer-metrics
owner:
name: Analytics Team
email: analytics@company.com
depends_on:
- ref('fct_orders')
- ref('dim_customers')
- ref('customer_rfm_score')
- ref('cohort_retention')
tags: ['executive', 'customer-analytics']
- name: revenue_forecast_model
description: |
Machine learning model for revenue forecasting.
Uses historical order data to predict future revenue.
type: ml
maturity: medium
url: https://mlflow.company.com/models/revenue-forecast
owner:
name: Data Science Team
email: datascience@company.com
depends_on:
- ref('fct_orders')
- ref('daily_revenue_complete')
tags: ['ml', 'forecasting']# Install dependencies
dbt deps
# Compile project (check for errors)
dbt compile
# Run all models
dbt run
# Run specific model
dbt run --select fct_orders
# Run model and downstream dependencies
dbt run --select fct_orders+
# Run model and upstream dependencies
dbt run --select +fct_orders
# Run model and all dependencies
dbt run --select +fct_orders+
# Run all models in a directory
dbt run --select staging.*
# Run models with specific tag
dbt run --select tag:daily
# Run models, exclude specific ones
dbt run --exclude staging.*
# Run with full refresh (incremental models)
dbt run --full-refresh
# Test all models
dbt test
# Test specific model
dbt test --select fct_orders
# Generate documentation
dbt docs generate
# Serve documentation
dbt docs serve
# Debug connection
dbt debug
# Clean compiled files
dbt clean
# Seed CSV files
dbt seed
# Snapshot models
dbt snapshot
# List resources
dbt ls --select staging.*
# Show compiled SQL
dbt show --select fct_orders
# Parse project
dbt parse# By name
--select model_name
# By path
--select staging.jaffle_shop.*
# By tag
--select tag:daily
# By resource type
--select resource_type:model
# By package
--select package:dbt_utils
# By status (modified, new)
--select state:modified+ --state ./prod_manifest
# Combinations (union)
--select model_a model_b
# Intersections
--select tag:daily,staging.*
# Graph operators
--select +model_name # Upstream dependencies
--select model_name+ # Downstream dependencies
--select +model_name+ # All dependencies
--select @model_name # Model + children/parents to nth degree