cosmos-dbt-core
Cosmos + dbt Core: Implementation Checklist
Execute steps in order. Prefer the simplest configuration that meets the user's constraints.
Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).
Reference: Latest stable release: https://pypi.org/project/astronomer-cosmos/
Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup, (6) manifest availability.
1. Choose Parsing Strategy (RenderConfig)
Pick ONE load mode based on constraints:

| Load mode | When to use | Required inputs | Constraints |
|---|---|---|---|
| `DBT_MANIFEST` | Large projects; containerized execution; fastest | `manifest_path` | Remote manifests need extra configuration |
| `DBT_LS` | Complex selectors; need dbt-native selection | dbt installed OR `dbt_executable_path` | Cannot use with containerized execution |
| `DBT_LS_FILE` | dbt_ls selection without running dbt_ls every parse | File with saved `dbt ls` output | |
| `AUTOMATIC` | Simple setups; let Cosmos pick | (none) | Falls back: manifest → dbt_ls → custom |

CRITICAL: Containerized execution (KUBERNETES/DOCKER/etc.) → MUST use the `dbt_manifest` load mode.
```python
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)
```

2. Choose Execution Mode (ExecutionConfig)
Reference: See reference/cosmos-config.md for detailed configuration examples per mode.
Pick ONE execution mode:

| Execution mode | When to use | Speed | Required setup |
|---|---|---|---|
| `WATCHER` | Fastest; single dbt invocation for the whole project | Fastest | dbt adapter in env OR `dbt_executable_path` |
| `LOCAL` | dbt + adapter in Airflow env | Fast | dbt 1.5+ in `requirements.txt` |
| `LOCAL` + `dbt_executable_path` | dbt in venv baked into image | Medium | `dbt_executable_path` |
| `AIRFLOW_ASYNC` | BigQuery + long-running transforms | Varies | Airflow ≥2.8; provider deps |
| `VIRTUALENV` | Can't modify image; runtime venv | Slower | venv requirements via `operator_args` |
| Containerized (`KUBERNETES`/`DOCKER`) | Full isolation per task | Slowest | manifest required; container config |

CRITICAL: Containerized execution (KUBERNETES/DOCKER/etc.) → MUST use the `dbt_manifest` load mode.
```python
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.LOCAL,  # or WATCHER, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)
```

3. Configure Warehouse Connection (ProfileConfig)
Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.
Option A: Airflow Connection + ProfileMapping (Recommended)
```python
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)
```

Option B: Existing profiles.yml
CRITICAL: Do not hardcode secrets; use environment variables.
```python
from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)
```
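To keep credentials out of the file itself, profiles.yml can read them from environment variables using dbt's built-in `env_var()` function. A minimal sketch, assuming Snowflake and illustrative variable names (a real profile also needs fields such as `database` and `warehouse`):

```yaml
my_profile:
  target: dev
  outputs:
    dev:
      type: snowflake
      # Values resolved from environment variables at dbt runtime,
      # so no secrets are stored in the file.
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      schema: my_schema
```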
4. Configure Project (ProjectConfig)
| Approach | When to use | Required param |
|---|---|---|
| Project path | Files available locally | `dbt_project_path` |
| Manifest only | Using the `dbt_manifest` load mode | `manifest_path` + `project_name` |
```python
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)
```

5. Configure Testing Behavior (RenderConfig)
Reference: See reference/cosmos-config.md for detailed testing options.
| TestBehavior | Behavior |
|---|---|
| `AFTER_EACH` | Tests run immediately after each model (default) |
| `BUILD` | Combine run + test into a single `dbt build` task |
| `AFTER_ALL` | All tests after all models complete |
| `NONE` | Skip tests |
```python
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)
```

6. Configure operator_args
Reference: See reference/cosmos-config.md for detailed operator_args options.
```python
_operator_args = {
    # BaseOperator params
    "retries": 3,
    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,
    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}
```
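When the `vars` payload grows beyond one key, hand-escaping quotes inside a Python string literal becomes error-prone. One option is to build the string with `json.dumps`; the keys and values below are placeholders (Jinja templates can still be embedded as string values):

```python
import json

# Hypothetical runtime values; in a real DAG a value is often a Jinja
# template string, e.g. "{{ ti.xcom_pull(task_ids='pre_dbt') }}".
dbt_vars = {"my_var": "some_value", "run_date": "2025-01-01"}

# dbt's --vars flag accepts a YAML/JSON string, so serialize the dict
# instead of escaping quotes by hand.
vars_arg = json.dumps(dbt_vars)
print(vars_arg)
```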
7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)
```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```

Option B: DbtTaskGroup (Inside Existing DAG)
```python
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()
```

Setting Dependencies on Individual Cosmos Tasks
```python
from airflow.sdk import task, chain
from cosmos import DbtDag, DbtResourceType

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)
```

8. Safety Checks
Before finalizing, verify:
- Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only; containerized → manifest required)
- Warehouse adapter installed for chosen execution mode
- Secrets via Airflow connections or env vars, NOT plaintext
- Load mode matches execution (containerized → manifest; complex selectors → dbt_ls)
- Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)
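Parts of this checklist can be automated. For instance, manifest availability can be verified with a small helper before committing to a manifest-based load mode; this is an illustrative function, not part of Cosmos:

```python
from pathlib import Path

def manifest_is_usable(path: str) -> bool:
    """Return True if a dbt manifest file exists and is non-empty.

    Hypothetical pre-flight check for manifest-based load modes.
    """
    p = Path(path)
    return p.is_file() and p.stat().st_size > 0
```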
Appendix A: Airflow 3 Compatibility
Import Differences
| Airflow 3.x | Airflow 2.x |
|---|---|
| `from airflow.sdk import dag, task` | `from airflow.decorators import dag, task` |
| `from airflow.sdk import chain` | `from airflow.models.baseoperator import chain` |
Asset/Dataset URI Format Change
Cosmos ≤1.9 (Airflow 2 Datasets):

```
postgres://0.0.0.0:5434/postgres.public.orders
```

Cosmos ≥1.10 (Airflow 3 Assets):

```
postgres://0.0.0.0:5434/postgres/public/orders
```

CRITICAL: Update asset URIs when upgrading to Airflow 3.
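For projects with many assets, the dot-to-slash rewrite can be scripted. A sketch, assuming URIs follow the `scheme://host/dotted.path` shape shown above (illustrative helper, not a Cosmos API):

```python
def dataset_uri_to_asset_uri(uri: str) -> str:
    """Convert a Cosmos <=1.9 dot-delimited dataset URI to the
    >=1.10 slash-delimited asset form."""
    scheme, rest = uri.split("://", 1)
    # Split host (which may contain dots, e.g. an IP) from the path
    # before rewriting, so only the path's dots become slashes.
    host, _, path = rest.partition("/")
    return f"{scheme}://{host}/{path.replace('.', '/')}"

print(dataset_uri_to_asset_uri("postgres://0.0.0.0:5434/postgres.public.orders"))
```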
Appendix B: Operational Extras
Caching
Cosmos caches artifacts to speed up parsing. Enabled by default.
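If cached parses ever look stale (e.g. after changing the project outside of Airflow), caching can be disabled via Airflow configuration. This assumes the standard Cosmos setting name; check the Cosmos docs for your version:

```bash
# Disable Cosmos caching ([cosmos] enable_cache); useful when debugging stale parses
AIRFLOW__COSMOS__ENABLE_CACHE=False
```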
Memory-Optimized Imports
```bash
AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True
```

When enabled:

```python
from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag
```

Artifact Upload to Object Storage
```bash
AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
```

```python
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)
```

dbt Docs Hosting (Airflow 3.1+ / Cosmos 1.11+)
```bash
AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
  "my_project": {
    "dir": "s3://bucket/docs/",
    "index": "index.html",
    "conn_id": "aws_default",
    "name": "My Project"
  }
}'
```

Related Skills
- cosmos-dbt-fusion: For dbt Fusion projects (not dbt Core)
- authoring-dags: General DAG authoring patterns
- testing-dags: Testing DAGs after creation