cosmos-dbt-fusion
Cosmos + dbt Fusion: Implementation Checklist


Execute steps in order. This skill covers Fusion-specific constraints only.

**Version note**: dbt Fusion support was introduced in Cosmos 1.11.0. Requires Cosmos ≥ 1.11.

**Reference**: See reference/cosmos-config.md for ProfileConfig, operator_args, and Airflow 3 compatibility details.

Before starting, confirm: (1) dbt engine = Fusion (not Core → use cosmos-dbt-core), (2) warehouse = Snowflake or Databricks only, (3) `ExecutionMode.LOCAL` is acceptable (no containerized/async/virtualenv execution).

Fusion-Specific Constraints


| Constraint | Details |
| --- | --- |
| Execution mode | `ExecutionMode.LOCAL` only |
| No async | `AIRFLOW_ASYNC` not supported |
| No containerized | `DOCKER` / `KUBERNETES` / etc. not supported |
| No virtualenv | Fusion is a binary, not a Python package |
| Warehouse support | Snowflake + Databricks only (public beta) |


1. Confirm Cosmos Version


CRITICAL: Cosmos 1.11.0 introduced dbt Fusion compatibility.

```bash
# Check installed version
pip show astronomer-cosmos

# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"
```

**Validate**: `pip show astronomer-cosmos` reports version ≥ 1.11.0

---

2. Install the dbt Fusion Binary (REQUIRED)


dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.
Determine where to install the Fusion binary (Dockerfile / base image / runtime).

Example Dockerfile Install


```dockerfile
USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro
```

Common Install Paths


| Environment | Typical path |
| --- | --- |
| Astro Runtime | `/home/astro/.local/bin/dbt` |
| System-wide | `/usr/local/bin/dbt` |

**Validate**: The `dbt` binary exists at the chosen path and `dbt --version` succeeds.

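If you want to fail fast when the binary is missing (for example in a deployment smoke test), a stdlib-only sketch that probes the paths above; the helper name and candidate list are illustrative:

```python
import os
import shutil

# Candidate install locations, taken from the table above
CANDIDATES = [
    "/home/astro/.local/bin/dbt",  # Astro Runtime
    "/usr/local/bin/dbt",          # system-wide
]

def find_dbt_binary():
    """Return the first existing, executable candidate path, else fall back to a PATH lookup."""
    for path in CANDIDATES:
        if os.path.isfile(path) and os.access(path, os.X_OK):
            return path
    return shutil.which("dbt")  # None if dbt is not on PATH either
```

A deployment test can then assert `find_dbt_binary()` is not `None` before the DAG is allowed to ship.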

3. Choose Parsing Strategy (RenderConfig)


Parsing strategy is the same as dbt Core. Pick ONE:

| Load mode | When to use | Required inputs |
| --- | --- | --- |
| `dbt_manifest` | Large projects; fastest parsing | `ProjectConfig.manifest_path` |
| `dbt_ls` | Complex selectors; need dbt-native selection | Fusion binary accessible to scheduler |
| `automatic` | Simple setups; let Cosmos pick | (none) |

```python
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or DBT_MANIFEST, DBT_LS
)
```

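The decision in the table can be sketched as a small helper (the function and its inputs are illustrative, not a Cosmos API):

```python
def choose_load_mode(has_manifest: bool, needs_dbt_selectors: bool) -> str:
    """Pick a parsing strategy following the table above."""
    if has_manifest:
        return "dbt_manifest"   # fastest for large projects
    if needs_dbt_selectors:
        return "dbt_ls"         # requires the Fusion binary on the scheduler
    return "automatic"          # let Cosmos pick
```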

4. Configure Warehouse Connection (ProfileConfig)


Reference: See reference/cosmos-config.md for full ProfileConfig options and examples.

| Warehouse | ProfileMapping class |
| --- | --- |
| Snowflake | `SnowflakeUserPasswordProfileMapping` |
| Databricks | `DatabricksTokenProfileMapping` |

```python
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)
```

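For Databricks, the analogous configuration uses `DatabricksTokenProfileMapping`; a sketch assuming an Airflow connection named `databricks_default`:

```python
from cosmos import ProfileConfig
from cosmos.profiles import DatabricksTokenProfileMapping

# conn_id is an assumption: point it at your Databricks Airflow connection
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=DatabricksTokenProfileMapping(
        conn_id="databricks_default",
    ),
)
```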

5. Configure ExecutionConfig (LOCAL Only)


CRITICAL: dbt Fusion with Cosmos requires `ExecutionMode.LOCAL` with `dbt_executable_path` pointing to the Fusion binary.

```python
from cosmos import ExecutionConfig

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)
```

What "Local-Only" Means


| Allowed | Not Allowed |
| --- | --- |
| ✅ Install Fusion binary into Airflow image/runtime | `ExecutionMode.DOCKER` / `KUBERNETES` |
| ✅ `ExecutionMode.LOCAL` (default) | `ExecutionMode.AIRFLOW_ASYNC` |
| | `ExecutionMode.VIRTUALENV` (Fusion is a binary, not a Python package) |


6. Configure Project (ProjectConfig)


```python
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)
```

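When using `LoadMode.DBT_MANIFEST`, it can be useful to sanity-check the manifest before the scheduler parses it. A stdlib-only sketch that counts model nodes in a dbt `manifest.json` (the function name is illustrative):

```python
import json
from pathlib import Path

def manifest_model_count(manifest_path: str) -> int:
    """Count model nodes in a dbt manifest.json artifact."""
    manifest = json.loads(Path(manifest_path).read_text())
    # dbt manifests keep models, seeds, tests, etc. under "nodes",
    # each tagged with a "resource_type"
    return sum(
        1
        for node in manifest.get("nodes", {}).values()
        if node.get("resource_type") == "model"
    )
```

A count of zero usually means the manifest is stale or points at the wrong project.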

7. Assemble DAG / TaskGroup


Option A: DbtDag (Standalone)


```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```

Option B: DbtTaskGroup (Inside Existing DAG)


```python
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x

from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()
```

---

8. Final Validation


Before finalizing, verify:
  • Cosmos version: ≥1.11.0
  • Execution mode: LOCAL only
  • Fusion binary installed: Path exists and is executable
  • Warehouse supported: Snowflake or Databricks only
  • Secrets handling: Airflow connections or env vars, NOT plaintext
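The checklist above can be sketched as a preflight helper for CI (the function and its inputs are illustrative, not a Cosmos API):

```python
SUPPORTED_WAREHOUSES = {"snowflake", "databricks"}  # Fusion public beta

def preflight(cosmos_version: str, execution_mode: str,
              warehouse: str, dbt_path_exists: bool) -> list:
    """Return a list of human-readable problems; an empty list means ready to deploy."""
    problems = []
    if tuple(int(p) for p in cosmos_version.split(".")) < (1, 11, 0):
        problems.append(f"Cosmos {cosmos_version} is too old; need >= 1.11.0")
    if execution_mode.lower() != "local":
        problems.append("dbt Fusion requires ExecutionMode.LOCAL")
    if warehouse.lower() not in SUPPORTED_WAREHOUSES:
        problems.append(f"{warehouse} is not supported by the Fusion public beta")
    if not dbt_path_exists:
        problems.append("Fusion binary not found at dbt_executable_path")
    return problems
```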

Troubleshooting


If the user reports dbt Core regressions after enabling Fusion, set this environment variable in the Airflow environment to fall back to pre-Fusion behavior:

```bash
export AIRFLOW__COSMOS__PRE_DBT_FUSION=1
```

User Must Test


  • The DAG parses in the Airflow UI (no import/parse-time errors)
  • A manual run succeeds against the target warehouse (at least one model)


Reference


Related Skills


  • cosmos-dbt-core: For dbt Core projects (not Fusion)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation