cosmos-dbt-fusion
Cosmos + dbt Fusion: Implementation Checklist
Execute steps in order. This skill covers Fusion-specific constraints only.

Version note: dbt Fusion support was introduced in Cosmos 1.11.0; Cosmos ≥ 1.11 is required.

Reference: See reference/cosmos-config.md for ProfileConfig, operator_args, and Airflow 3 compatibility details.

Before starting, confirm: (1) the dbt engine is Fusion (for dbt Core, use cosmos-dbt-core), (2) the warehouse is Snowflake or Databricks, and (3) `ExecutionMode.LOCAL` is acceptable (no containerized/async/virtualenv execution).
Fusion-Specific Constraints
| Constraint | Details |
|---|---|
| Execution mode | `ExecutionMode.LOCAL` only |
| No async | Async execution modes are not supported |
| No containerized | Docker/Kubernetes execution modes are not supported |
| No virtualenv | Fusion is a binary, not a Python package |
| Warehouse support | Snowflake + Databricks only (public beta) |
1. Confirm Cosmos Version
CRITICAL: Cosmos 1.11.0 introduced dbt Fusion compatibility.

```bash
# Check installed version
pip show astronomer-cosmos

# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"
```

**Validate**: `pip show astronomer-cosmos` reports version ≥ 1.11.0
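The version gate above can be sketched as a small helper (a sketch only; it assumes plain `X.Y.Z` version strings with no pre-release tags):

```python
def meets_min_cosmos(installed: str, minimum: str = "1.11.0") -> bool:
    """Compare dotted version strings numerically (assumes plain X.Y.Z versions)."""
    as_tuple = lambda v: tuple(int(part) for part in v.split("."))
    return as_tuple(installed) >= as_tuple(minimum)

print(meets_min_cosmos("1.11.0"))  # True
print(meets_min_cosmos("1.10.2"))  # False
```

For real deployments, prefer `packaging.version.Version`, which also handles pre-release and post-release tags.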
---

2. Install the dbt Fusion Binary (REQUIRED)
dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.
Determine where to install the Fusion binary (Dockerfile / base image / runtime).
Example Dockerfile Install

```dockerfile
USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro
```

Common Install Paths
| Environment | Typical path |
|---|---|
| Astro Runtime | `/home/astro/.local/bin/dbt` |
| System-wide | Install-dependent; ensure the binary is on `PATH` |

Validate: The `dbt` binary exists at the chosen path and `dbt --version` succeeds.
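The existence check can also be scripted. A minimal sketch; the default path below is the Astro Runtime location used elsewhere in this checklist and may differ in your image:

```python
import os
import shutil

def fusion_binary_ok(path: str = "/home/astro/.local/bin/dbt") -> bool:
    """True if an executable file exists at `path`, or a `dbt` binary is found on PATH."""
    candidate = path if os.path.isfile(path) else shutil.which("dbt")
    return candidate is not None and os.access(candidate, os.X_OK)
```

After the path check passes, still run `dbt --version` to confirm the binary is Fusion rather than dbt Core.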
3. Choose Parsing Strategy (RenderConfig)
Parsing strategy is the same as dbt Core. Pick ONE:
| Load mode | When to use | Required inputs |
|---|---|---|
| `LoadMode.DBT_MANIFEST` | Large projects; fastest parsing | Precomputed `manifest.json` |
| `LoadMode.DBT_LS` | Complex selectors; need dbt-native selection | Fusion binary accessible to scheduler |
| `LoadMode.AUTOMATIC` | Simple setups; let Cosmos pick | (none) |
```python
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or LoadMode.DBT_MANIFEST, LoadMode.DBT_LS
)
```

4. Configure Warehouse Connection (ProfileConfig)
Reference: See reference/cosmos-config.md for full ProfileConfig options and examples.
| Warehouse | ProfileMapping Class |
|---|---|
| Snowflake | `SnowflakeUserPasswordProfileMapping` |
| Databricks | `DatabricksTokenProfileMapping` |
```python
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)
```

5. Configure ExecutionConfig (LOCAL Only)
CRITICAL: dbt Fusion with Cosmos requires `ExecutionMode.LOCAL`, with `dbt_executable_path` pointing to the Fusion binary.
```python
from cosmos import ExecutionConfig

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)
```

What "Local-Only" Means
| Allowed | Not Allowed |
|---|---|
| ✅ Install Fusion binary into Airflow image/runtime | ❌ `ExecutionMode.DOCKER` / `ExecutionMode.KUBERNETES` |
| ✅ `ExecutionMode.LOCAL` with `dbt_executable_path` set | ❌ `ExecutionMode.VIRTUALENV` |
| | ❌ Async execution modes |
6. Configure Project (ProjectConfig)
```python
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for DBT_MANIFEST load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)
```

7. Assemble DAG / TaskGroup
Option A: DbtDag (Standalone)
```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```

Option B: DbtTaskGroup (Inside Existing DAG)
```python
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()
```

---

8. Final Validation
Before finalizing, verify:
- Cosmos version: ≥1.11.0
- Execution mode: LOCAL only
- Fusion binary installed: Path exists and is executable
- Warehouse supported: Snowflake or Databricks only
- Secrets handling: Airflow connections or env vars, NOT plaintext
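The checklist above can be rolled into a small pre-flight helper. A sketch only: inputs are passed in explicitly (rather than read from the installed environment), and the version parser assumes plain `X.Y.Z` strings:

```python
import os

def preflight(cosmos_version: str, dbt_path: str,
              warehouse: str, execution_mode: str = "local") -> list[str]:
    """Return human-readable check failures; an empty list means all checks pass."""
    failures = []
    version = tuple(int(part) for part in cosmos_version.split(".")[:3])
    if version < (1, 11, 0):
        failures.append(f"Cosmos {cosmos_version} is older than 1.11.0")
    if execution_mode.lower() != "local":
        failures.append("Fusion requires ExecutionMode.LOCAL")
    if warehouse.lower() not in {"snowflake", "databricks"}:
        failures.append(f"unsupported warehouse: {warehouse}")
    if not (os.path.isfile(dbt_path) and os.access(dbt_path, os.X_OK)):
        failures.append(f"no executable dbt binary at {dbt_path}")
    return failures
```

Secrets handling is deliberately not checked here; verify separately that credentials come from Airflow connections or environment variables.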
Troubleshooting
If a user reports dbt Core regressions after enabling Fusion, set:

```bash
AIRFLOW__COSMOS__PRE_DBT_FUSION=1
```

User Must Test
- The DAG parses in the Airflow UI (no import/parse-time errors)
- A manual run succeeds against the target warehouse (at least one model)
Reference
- Cosmos dbt Fusion docs: https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion.html
- dbt Fusion install: https://docs.getdbt.com/docs/core/pip-install#dbt-fusion
Related Skills
- cosmos-dbt-core: For dbt Core projects (not Fusion)
- authoring-dags: General DAG authoring patterns
- testing-dags: Testing DAGs after creation