authoring-dags

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

DAG Authoring Skill

DAG编写技能

This skill guides you through creating and validating Airflow DAGs using best practices and MCP tools.
For testing and debugging DAGs, see the testing-dags skill which covers the full test → debug → fix → retest workflow.

本技能将指导你使用最佳实践和MCP工具创建并验证Airflow DAG。
有关DAG的测试与调试,请查看testing-dags技能,它涵盖了完整的测试→调试→修复→重新测试工作流。

⚠️ CRITICAL WARNING: Use MCP Tools, NOT CLI Commands ⚠️

⚠️ 重要警告:使用MCP工具,而非CLI命令 ⚠️

STOP! Before running ANY Airflow-related command, read this.
You MUST use MCP tools for ALL Airflow interactions. CLI commands like
astro dev run
,
airflow dags
, or shell commands to read logs are FORBIDDEN.
Why? MCP tools provide structured, reliable output. CLI commands are fragile, produce unstructured text, and often fail silently.

停!在运行任何Airflow相关命令前,请阅读以下内容。
所有Airflow交互操作必须使用MCP工具。
astro dev run
airflow dags
等CLI命令,或用于读取日志的Shell命令均禁止使用
原因? MCP工具可提供结构化、可靠的输出。CLI命令稳定性差,生成非结构化文本,且经常静默失败。

CLI vs MCP Quick Reference

CLI与MCP快速参考

ALWAYS use Airflow MCP tools. NEVER use CLI commands.
❌ DO NOT USE✅ USE INSTEAD
astro dev run dags list
list_dags
MCP tool
airflow dags list
list_dags
MCP tool
astro dev run dags test
trigger_dag_and_wait
MCP tool
airflow tasks test
trigger_dag_and_wait
MCP tool
cat
/
grep
on Airflow logs
get_task_logs
MCP tool
find
in dags folder
list_dags
or
explore_dag
MCP tool
Any
astro dev run ...
Equivalent MCP tool
Any
airflow ...
CLI
Equivalent MCP tool
ls
on
/usr/local/airflow/dags/
list_dags
or
explore_dag
MCP tool
cat ... | jq
to filter MCP results
Read the JSON directly from MCP response
Remember:
  • ✅ Airflow is ALREADY running — the MCP server handles the connection
  • ❌ Do NOT attempt to start, stop, or manage the Airflow environment
  • ❌ Do NOT use shell commands to check DAG status, logs, or errors
  • ❌ Do NOT use bash to parse or filter MCP tool results — read the JSON directly
  • ❌ Do NOT use
    ls
    ,
    find
    , or
    cat
    on Airflow container paths (
    /usr/local/airflow/...
    )
  • ✅ ALWAYS use MCP tools — they return structured JSON you can read directly
始终使用Airflow MCP工具。切勿使用CLI命令。
❌ 禁止使用✅ 建议使用
astro dev run dags list
list_dags
MCP工具
airflow dags list
list_dags
MCP工具
astro dev run dags test
trigger_dag_and_wait
MCP工具
airflow tasks test
trigger_dag_and_wait
MCP工具
cat
/
grep
读取Airflow日志
get_task_logs
MCP工具
在dags目录中使用
find
list_dags
explore_dag
MCP工具
任何
astro dev run ...
命令
对应的MCP工具
任何
airflow ...
CLI命令
对应的MCP工具
/usr/local/airflow/dags/
目录使用
ls
list_dags
explore_dag
MCP工具
使用
cat ... | jq
过滤MCP结果
直接读取MCP响应中的JSON数据
请记住:
  • ✅ Airflow已处于运行状态 — MCP服务器负责处理连接
  • ❌ 请勿尝试启动、停止或管理Airflow环境
  • ❌ 请勿使用Shell命令检查DAG状态、日志或错误
  • ❌ 请勿使用bash解析或过滤MCP工具结果 — 直接读取JSON数据
  • ❌ 请勿在Airflow容器路径(
    /usr/local/airflow/...
    )使用
    ls
    find
    cat
    命令
  • ✅ 始终使用MCP工具 — 它们会返回可直接读取的结构化JSON

Workflow Overview

工作流概述

┌─────────────────────────────────────┐
│ 1. DISCOVER                         │
│    Understand codebase & environment│
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 2. PLAN                             │
│    Propose structure, get approval  │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 3. IMPLEMENT                        │
│    Write DAG following patterns     │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 4. VALIDATE                         │
│    Check import errors, warnings    │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 5. TEST (with user consent)         │
│    Trigger, monitor, check logs     │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 6. ITERATE                          │
│    Fix issues, re-validate          │
└─────────────────────────────────────┘

┌─────────────────────────────────────┐
│ 1. 探索                         │
│    了解代码库与环境│
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 2. 规划                             │
│    提出结构方案,获得批准  │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 3. 实现                        │
│    遵循模式编写DAG     │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 4. 验证                         │
│    检查导入错误与警告    │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 5. 测试(需用户同意)         │
│    触发、监控、检查日志     │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ 6. 迭代                          │
│    修复问题,重新验证          │
└─────────────────────────────────────┘

Phase 1: Discover

阶段1:探索

Before writing code, understand the context.
编写代码前,先了解上下文。

Explore the Codebase

探索代码库

Use file tools to find existing patterns:
  • Glob
    for
    **/dags/**/*.py
    to find existing DAGs
  • Read
    similar DAGs to understand conventions
  • Check
    requirements.txt
    for available packages
使用文件工具查找现有模式:
  • 使用
    Glob
    查找
    **/dags/**/*.py
    以定位现有DAG
  • 使用
    Read
    读取类似DAG以了解规范
  • 查看
    requirements.txt
    了解可用包

Query the Airflow Environment

查询Airflow环境

Use MCP tools to understand what's available:
ToolPurpose
list_connections
What external systems are configured
list_variables
What configuration values exist
list_providers
What operator packages are installed
get_airflow_version
Version constraints and features
list_dags
Existing DAGs and naming conventions
list_pools
Resource pools for concurrency
Example discovery questions:
  • "Is there a Snowflake connection?" →
    list_connections
  • "What Airflow version?" →
    get_airflow_version
  • "Are S3 operators available?" →
    list_providers

使用MCP工具了解可用资源:
工具用途
list_connections
查看已配置的外部系统
list_variables
查看已存在的配置值
list_providers
查看已安装的Operator包
get_airflow_version
查看版本限制与功能
list_dags
查看现有DAG与命名规范
list_pools
查看用于并发控制的资源池
探索示例问题:
  • "是否配置了Snowflake连接?" → 使用
    list_connections
  • "Airflow版本是多少?" → 使用
    get_airflow_version
  • "是否有S3 Operator可用?" → 使用
    list_providers

Phase 2: Plan

阶段2:规划

Based on discovery, propose:
  1. DAG structure - Tasks, dependencies, schedule
  2. Operators to use - Based on available providers
  3. Connections needed - Existing or to be created
  4. Variables needed - Existing or to be created
  5. Packages needed - Additions to requirements.txt
Get user approval before implementing.

基于探索结果,提出以下方案:
  1. DAG结构 - 任务、依赖关系、调度计划
  2. 要使用的Operator - 基于已安装的提供者包
  3. 所需连接 - 已存在或需新建
  4. 所需变量 - 已存在或需新建
  5. 所需包 - 添加至requirements.txt
在实现前需获得用户批准。

Phase 3: Implement

阶段3:实现

Write the DAG following best practices (see below). Key steps:
  1. Create DAG file in appropriate location
  2. Update
    requirements.txt
    if needed
  3. Save the file

遵循最佳实践编写DAG(详见下文)。关键步骤:
  1. 在合适位置创建DAG文件
  2. 如有需要,更新
    requirements.txt
  3. 保存文件

Phase 4: Validate

阶段4:验证

Use the Airflow MCP as a feedback loop. Do NOT use CLI commands.
将Airflow MCP作为反馈循环。禁止使用CLI命令。

Step 1: Check Import Errors

步骤1:检查导入错误

After saving, call the MCP tool (Airflow will have already parsed the file):
MCP tool:
list_import_errors
  • If your file appears → fix and retry
  • If no errors → continue
Common causes: missing imports, syntax errors, missing packages.
保存文件后,调用MCP工具(Airflow已自动解析该文件):
MCP工具:
list_import_errors
  • 如果你的文件出现在结果中 → 修复后重试
  • 如果无错误 → 继续下一步
常见原因:缺失导入、语法错误、缺失依赖包。

Step 2: Verify DAG Exists

步骤2:验证DAG存在

MCP tool:
get_dag_details(dag_id="your_dag_id")
Check: DAG exists, schedule correct, tags set, paused status.
MCP工具:
get_dag_details(dag_id="your_dag_id")
检查:DAG是否存在、调度计划是否正确、标签是否设置、暂停状态是否正确。

Step 3: Check Warnings

步骤3:检查警告

MCP tool:
list_dag_warnings
Look for deprecation warnings or configuration issues.
MCP工具:
list_dag_warnings
查找弃用警告或配置问题。

Step 4: Explore DAG Structure

步骤4:探索DAG结构

MCP tool:
explore_dag(dag_id="your_dag_id")
Returns in one call: metadata, tasks, dependencies, source code.

MCP工具:
explore_dag(dag_id="your_dag_id")
一次调用即可返回:元数据、任务、依赖关系、源代码。

Phase 5: Test

阶段5:测试

📘 See the testing-dags skill for comprehensive testing guidance.
Once validation passes, test the DAG using the workflow in the testing-dags skill:
  1. Get user consent — Always ask before triggering
  2. Trigger and wait — Use
    trigger_dag_and_wait(dag_id, timeout=300)
  3. Analyze results — Check success/failure status
  4. Debug if needed — Use
    diagnose_dag_run
    and
    get_task_logs
📘 完整测试指南请查看testing-dags技能。
验证通过后,使用testing-dags技能中的工作流测试DAG:
  1. 获得用户同意 — 触发前务必询问用户
  2. 触发并等待 — 使用
    trigger_dag_and_wait(dag_id, timeout=300)
  3. 分析结果 — 检查成功/失败状态
  4. 如需调试 — 使用
    diagnose_dag_run
    get_task_logs

Quick Test (Minimal)

快速测试(最简版)

undefined
undefined

Ask user first, then:

先询问用户,然后执行:

trigger_dag_and_wait(dag_id="your_dag_id", timeout=300)

For the full test → debug → fix → retest loop, see **testing-dags**.

---
trigger_dag_and_wait(dag_id="your_dag_id", timeout=300)

完整的测试→调试→修复→重新测试流程,请查看**testing-dags**技能。

---

Phase 6: Iterate

阶段6:迭代

If issues found:
  1. Fix the code
  2. Check for import errors with
    list_import_errors
    MCP tool
  3. Re-validate using MCP tools (Phase 4)
  4. Re-test using the testing-dags skill workflow (Phase 5)
Never use CLI commands to check status or logs. Always use MCP tools.

如果发现问题:
  1. 修复代码
  2. 使用
    list_import_errors
    MCP工具检查导入错误
  3. 使用MCP工具重新验证(阶段4)
  4. 使用testing-dags技能的工作流重新测试(阶段5)
切勿使用CLI命令检查状态或日志。请始终使用MCP工具。

MCP Tools Quick Reference

MCP工具快速参考

PhaseToolPurpose
Discover
list_connections
Available connections
Discover
list_variables
Configuration values
Discover
list_providers
Installed operators
Discover
get_airflow_version
Version info
Validate
list_import_errors
Parse errors (check first!)
Validate
get_dag_details
Verify DAG config
Validate
list_dag_warnings
Configuration warnings
Validate
explore_dag
Full DAG inspection
Testing tools — See the testing-dags skill for
trigger_dag_and_wait
,
diagnose_dag_run
,
get_task_logs
, etc.

阶段工具用途
探索
list_connections
查看可用连接
探索
list_variables
查看配置值
探索
list_providers
查看已安装的Operator
探索
get_airflow_version
查看版本信息
验证
list_import_errors
检查解析错误(优先执行!)
验证
get_dag_details
验证DAG配置
验证
list_dag_warnings
检查配置警告
验证
explore_dag
完整DAG检查
测试工具 — 有关
trigger_dag_and_wait
diagnose_dag_run
get_task_logs
等工具,请查看testing-dags技能。

Best Practices & Anti-Patterns

最佳实践与反模式

For detailed code examples and patterns, see reference/best-practices.md.
Key topics covered:
  • TaskFlow API usage
  • Credentials management (connections, variables)
  • Provider operators
  • Idempotency patterns
  • Data intervals
  • Task groups
  • Setup/Teardown patterns
  • Data quality checks
  • Anti-patterns to avoid

有关详细代码示例与模式,请查看**reference/best-practices.md**。
涵盖的核心主题:
  • TaskFlow API的使用
  • 凭证管理(连接、变量)
  • 提供者Operator
  • 幂等性模式
  • 数据间隔
  • 任务组
  • 搭建/清理模式
  • 数据质量检查
  • 需避免的反模式

Related Skills

相关技能

  • testing-dags: For testing DAGs, debugging failures, and the test → fix → retest loop
  • debugging-dags: For troubleshooting failed DAGs
  • migrating-airflow-2-to-3: For migrating DAGs to Airflow 3
  • testing-dags:用于DAG测试、失败调试,以及测试→修复→重新测试循环
  • debugging-dags:用于排查失败的DAG
  • migrating-airflow-2-to-3:用于将DAG迁移至Airflow 3