# cosmos-dbt-core

## Cosmos + dbt Core: Implementation Checklist


Execute steps in order. Prefer the simplest configuration that meets the user's constraints.
Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).
Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup, (6) manifest availability.

Reference: latest stable release: https://pypi.org/project/astronomer-cosmos/

## 1. Choose Parsing Strategy (RenderConfig)


Pick ONE load mode based on constraints:

| Load mode | When to use | Required inputs | Constraints |
| --- | --- | --- | --- |
| `dbt_manifest` | Large projects; containerized execution; fastest | `ProjectConfig.manifest_path` | Remote manifest needs `manifest_conn_id` |
| `dbt_ls` | Complex selectors; need dbt-native selection | dbt installed OR `dbt_executable_path` | Cannot use with containerized execution |
| `dbt_ls_file` | dbt_ls selection without running `dbt ls` every parse | `RenderConfig.dbt_ls_path` | `select` / `exclude` won't work |
| `automatic` | Simple setups; let Cosmos pick | (none) | Falls back: manifest → dbt_ls → custom |

CRITICAL: Containerized execution (`DOCKER` / `KUBERNETES` / etc.) → MUST use the `dbt_manifest` load mode.

```python
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)
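`RenderConfig` also accepts dbt-style `select` / `exclude` lists (referenced in the table above; they do not work with `dbt_ls_file`). A minimal sketch, assuming the project defines a `daily` tag (the tag name is an assumption for illustration):

```python
from cosmos import RenderConfig, LoadMode

# Render only the subgraph selected by dbt ls; "tag:daily" is an assumed selector
_render_config = RenderConfig(
    load_method=LoadMode.DBT_LS,
    select=["tag:daily"],
)
```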


## 2. Choose Execution Mode (ExecutionConfig)


Reference: See reference/cosmos-config.md for detailed configuration examples per mode.
Pick ONE execution mode:

| Execution mode | When to use | Speed | Required setup |
| --- | --- | --- | --- |
| `WATCHER` | Fastest; single `dbt build` visibility | Fastest | dbt adapter in env OR `dbt_executable_path` |
| `LOCAL` + `DBT_RUNNER` | dbt + adapter in Airflow env | Fast | dbt 1.5+ in `requirements.txt` |
| `LOCAL` + `SUBPROCESS` | dbt in venv baked into image | Medium | `dbt_executable_path` |
| `AIRFLOW_ASYNC` | BigQuery + long-running transforms | Varies | Airflow ≥2.8; provider deps |
| `VIRTUALENV` | Can't modify image; runtime venv | Slower | `py_requirements` in operator_args |
| Containerized | Full isolation per task | Slowest | manifest required; container config |

CRITICAL: Containerized execution (`DOCKER` / `KUBERNETES` / etc.) → MUST use the `dbt_manifest` load mode.

```python
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.LOCAL,  # or WATCHER, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)
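For the `LOCAL` + `SUBPROCESS` row, the invocation mode and the dbt executable path are both set on `ExecutionConfig`. A configuration sketch, assuming a dbt venv baked into the image at `/usr/local/airflow/dbt_venv` (the path is an assumption):

```python
from cosmos import ExecutionConfig, ExecutionMode, InvocationMode

# Shell out to a venv-installed dbt instead of using the in-process dbtRunner
_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.LOCAL,
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/usr/local/airflow/dbt_venv/bin/dbt",  # assumed venv location
)
```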


## 3. Configure Warehouse Connection (ProfileConfig)


Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.

### Option A: Airflow Connection + ProfileMapping (Recommended)


```python
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)
```

### Option B: Existing profiles.yml


CRITICAL: Do not hardcode secrets; use environment variables.

```python
from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)
```


## 4. Configure Project (ProjectConfig)


| Approach | When to use | Required param |
| --- | --- | --- |
| Project path | Files available locally | `dbt_project_path` |
| Manifest only | `dbt_manifest` load; containerized | `manifest_path` + `project_name` |

```python
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)
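For the manifest-only row (required for containerized execution), the same object is built from `manifest_path` plus `project_name` instead of a project path. A configuration sketch; the manifest location is an assumption:

```python
from cosmos import ProjectConfig

# Containerized execution: no dbt project files in the Airflow env, only the manifest
_project_config = ProjectConfig(
    manifest_path="/usr/local/airflow/dbt/manifest.json",  # assumed path
    project_name="my_project",
)
```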


## 5. Configure Testing Behavior (RenderConfig)


Reference: See reference/cosmos-config.md for detailed testing options.

| TestBehavior | Behavior |
| --- | --- |
| `AFTER_EACH` | Tests run immediately after each model (default) |
| `BUILD` | Combine run + test into a single `dbt build` |
| `AFTER_ALL` | All tests run after all models complete |
| `NONE` | Skip tests |

```python
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)
```


## 6. Configure operator_args


Reference: See reference/cosmos-config.md for detailed operator_args options.

```python
_operator_args = {
    # BaseOperator params
    "retries": 3,

    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}


## 7. Assemble DAG / TaskGroup


### Option A: DbtDag (Standalone)


```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```

### Option B: DbtTaskGroup (Inside Existing DAG)


```python
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()
```

### Setting Dependencies on Individual Cosmos Tasks


```python
from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)
```


## 8. Safety Checks


Before finalizing, verify:

- Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only; containerized → manifest required)
- Warehouse adapter installed for chosen execution mode
- Secrets via Airflow connections or env vars, NOT plaintext
- Load mode matches execution (containerized → manifest; complex selectors → dbt_ls)
- Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)


## Appendix A: Airflow 3 Compatibility


### Import Differences


| Airflow 3.x | Airflow 2.x |
| --- | --- |
| `from airflow.sdk import dag, task` | `from airflow.decorators import dag, task` |
| `from airflow.sdk import chain` | `from airflow.models.baseoperator import chain` |

### Asset/Dataset URI Format Change


Cosmos ≤1.9 (Airflow 2 Datasets): `postgres://0.0.0.0:5434/postgres.public.orders`
Cosmos ≥1.10 (Airflow 3 Assets): `postgres://0.0.0.0:5434/postgres/public/orders`

CRITICAL: Update asset URIs when upgrading to Airflow 3.
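The change replaces the dots in the table path with slashes while leaving scheme and host untouched. A hypothetical helper (not a Cosmos API) for migrating hardcoded URIs:

```python
def dataset_to_asset_uri(uri: str) -> str:
    """Convert a Cosmos <=1.9 dataset URI to the >=1.10 asset form (illustrative)."""
    scheme, _, rest = uri.partition("://")
    host, _, path = rest.partition("/")  # host may contain dots; only the path changes
    return f"{scheme}://{host}/{path.replace('.', '/')}"
```

For example, `dataset_to_asset_uri("postgres://0.0.0.0:5434/postgres.public.orders")` yields the Airflow 3 form shown above.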


## Appendix B: Operational Extras


### Caching


Cosmos caches artifacts to speed up parsing. Enabled by default.

### Memory-Optimized Imports


```bash
AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True
```

When enabled:

```python
from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag
```

### Artifact Upload to Object Storage


```bash
AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
```

```python
from cosmos import DbtDag
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)
```

### dbt Docs Hosting (Airflow 3.1+ / Cosmos 1.11+)


```bash
AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'
```
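Because the value is a JSON string inside an environment variable, a quoting mistake only surfaces at deploy time. A quick stdlib sanity check, using the example value above (the required-key set is inferred from that example, not from Cosmos documentation):

```python
import json

# The same JSON value that would be placed in AIRFLOW__COSMOS__DBT_DOCS_PROJECTS
docs_projects_json = '''{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'''

projects = json.loads(docs_projects_json)  # raises ValueError on malformed JSON
for project, cfg in projects.items():
    missing = {"dir", "index", "conn_id", "name"} - cfg.keys()
    assert not missing, f"{project} missing keys: {missing}"
```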


## Related Skills


- cosmos-dbt-fusion: For dbt Fusion projects (not dbt Core)
- authoring-dags: General DAG authoring patterns
- testing-dags: Testing DAGs after creation