testing-dags

DAG Testing Skill

Use `af` commands to test, debug, and fix DAGs in iterative cycles.

Running the CLI

Run all `af` commands using uvx (no installation required):

```bash
uvx --from astro-airflow-mcp@latest af <command>
```

Throughout this document, `af` is shorthand for `uvx --from astro-airflow-mcp@latest af`.
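
When scripting around the CLI, the shorthand can be expanded programmatically. The following is a minimal Python sketch; `AF_PREFIX` and `af_command` are hypothetical helper names introduced here for illustration and are not part of the tool.

```python
import shlex

# The full invocation that `af` abbreviates in this document.
AF_PREFIX = ["uvx", "--from", "astro-airflow-mcp@latest", "af"]


def af_command(*args: str) -> list[str]:
    """Return the full argv for an `af` subcommand (hypothetical helper)."""
    return AF_PREFIX + list(args)


# Render the expanded command as a shell-safe string for display.
print(shlex.join(af_command("runs", "trigger-wait", "my_dag")))
```

Keeping the command as a list of arguments (rather than one string) avoids quoting problems if it is later passed to `subprocess.run`.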

FIRST ACTION: Just Trigger the DAG

When the user asks to test a DAG, your FIRST AND ONLY action should be:

```bash
af runs trigger-wait <dag_id>
```

DO NOT:
  • Call `af dags list` first
  • Call `af dags get` first
  • Call `af dags errors` first
  • Use `grep`, `ls`, or any other bash command
  • Do any "pre-flight checks"

Just trigger the DAG. If it fails, THEN debug.

Testing Workflow Overview

┌─────────────────────────────────────┐
│ 1. TRIGGER AND WAIT                 │
│    Run DAG, wait for completion     │
└─────────────────────────────────────┘
        ┌───────┴───────┐
        ↓               ↓
   ┌─────────┐    ┌──────────┐
   │ SUCCESS │    │ FAILED   │
   │ Done!   │    │ Debug... │
   └─────────┘    └──────────┘
        ┌─────────────────────────────────────┐
        │ 2. DEBUG (only if failed)           │
        │    Get logs, identify root cause    │
        └─────────────────────────────────────┘
        ┌─────────────────────────────────────┐
        │ 3. FIX AND RETEST                   │
        │    Apply fix, restart from step 1   │
        └─────────────────────────────────────┘
Philosophy: Try first, debug on failure. Don't waste time on pre-flight checks — just run the DAG and diagnose if something goes wrong.

Phase 1: Trigger and Wait

Use `af runs trigger-wait` to test the DAG:

Primary Method: Trigger and Wait

```bash
af runs trigger-wait <dag_id> --timeout 300
```

Example:

```bash
af runs trigger-wait my_dag --timeout 300
```

Why this is the preferred method:
  • Single command handles trigger + monitoring
  • Returns immediately when DAG completes (success or failure)
  • Includes failed task details if run fails
  • No manual polling required
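
If the command is driven from a script rather than typed by hand, a thin wrapper can run it and parse the result. The sketch below assumes the CLI prints a single JSON object on stdout (as the response examples in this document suggest); `trigger_wait` and its `runner` parameter are hypothetical names, and `runner` is injectable only so the function can be exercised without a live Airflow.

```python
import json
import subprocess

# Expanded form of the `af` shorthand used throughout this document.
AF_PREFIX = ["uvx", "--from", "astro-airflow-mcp@latest", "af"]


def trigger_wait(dag_id, timeout=300, runner=subprocess.run):
    """Trigger a DAG, block until it finishes or times out, return parsed JSON.

    Assumption: the CLI writes one JSON object to stdout.
    """
    argv = AF_PREFIX + ["runs", "trigger-wait", dag_id, "--timeout", str(timeout)]
    # check=False: a failed DAG run is a normal outcome here, not an exception.
    completed = runner(argv, capture_output=True, text=True, check=False)
    return json.loads(completed.stdout)
```

A call such as `trigger_wait("my_dag", timeout=300)` would then return a dictionary whose shape matches the responses described in the next section, provided the stdout assumption holds.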

Response Interpretation

Success:

```json
{
  "dag_run": {
    "dag_id": "my_dag",
    "dag_run_id": "manual__2025-01-14T...",
    "state": "success",
    "start_date": "...",
    "end_date": "..."
  },
  "timed_out": false,
  "elapsed_seconds": 45.2
}
```

Failure:

```json
{
  "dag_run": {
    "state": "failed"
  },
  "timed_out": false,
  "elapsed_seconds": 30.1,
  "failed_tasks": [
    {
      "task_id": "extract_data",
      "state": "failed",
      "try_number": 2
    }
  ]
}
```

Timeout:

```json
{
  "dag_id": "my_dag",
  "dag_run_id": "manual__...",
  "state": "running",
  "timed_out": true,
  "elapsed_seconds": 300.0,
  "message": "Timed out after 300 seconds. DAG run is still running."
}
```
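
The three response shapes above can be told apart with a few dictionary lookups. This is a minimal sketch based only on the fields shown in the examples; `classify_response` is a hypothetical helper, and the real output may carry additional fields.

```python
def classify_response(response: dict) -> str:
    """Map a trigger-wait response to 'success', 'failed', 'timed_out', or 'unknown'."""
    # Check the timeout flag first: in the timeout example the state is still "running".
    if response.get("timed_out"):
        return "timed_out"
    # Success and failure examples nest the state under "dag_run";
    # fall back to a top-level "state" as seen in the timeout example.
    state = response.get("dag_run", {}).get("state") or response.get("state")
    if state in ("success", "failed"):
        return state
    return "unknown"


def failed_task_ids(response: dict) -> list[str]:
    """Return the task IDs listed under 'failed_tasks', if any."""
    return [task["task_id"] for task in response.get("failed_tasks", [])]
```

Checking `timed_out` before the state keeps a still-running DAG from being reported as an unknown outcome.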

Alternative: Trigger and Monitor Separately

Use this only when you need more control:

```bash
# Step 1: Trigger
af runs trigger my_dag
# Returns: {"dag_run_id": "manual__...", "state": "queued"}

# Step 2: Check status
af runs get my_dag manual__2025-01-14T...
# Returns current state
```
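
When triggering and monitoring separately, the status check is typically repeated until the run reaches a terminal state. The loop below is a rough sketch of that polling pattern: `get_state` is a hypothetical callable standing in for whatever runs `af runs get` and extracts the state string, and the `sleep` and `clock` parameters exist only to make the loop testable.

```python
import time

# DAG run states after which no further polling is needed.
TERMINAL_STATES = {"success", "failed"}


def wait_for_run(get_state, timeout=300.0, interval=5.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until a terminal state is seen or the timeout elapses.

    Returns the terminal state, or None if the deadline passed first.
    """
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            return None
        sleep(interval)
```

This is essentially what `af runs trigger-wait` does in a single command, which is why the separate approach is only worth using when finer control is needed.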


---


Handling Results

If Success

The DAG ran successfully. Summarize for the user:
  • Total elapsed time
  • Number of tasks completed
  • Any notable outputs (if visible in logs)
You're done!

If Timed Out

The DAG is still running. Options:
  1. Check current status: `af runs get <dag_id> <dag_run_id>`
  2. Ask user if they want to continue waiting
  3. Increase timeout and try again

If Failed

Move to Phase 2 (Debug) to identify the root cause.

Phase 2: Debug Failures (Only If Needed)

When a DAG run fails, use these commands to diagnose:

Get Comprehensive Diagnosis

```bash
af runs diagnose <dag_id> <dag_run_id>
```

Returns in one call:
  • Run metadata (state, timing)
  • All task instances with states
  • Summary of failed tasks
  • State counts (success, failed, skipped, etc.)

Get Task Logs

```bash
af tasks logs <dag_id> <dag_run_id> <task_id>
```

Example:

```bash
af tasks logs my_dag manual__2025-01-14T... extract_data
```

For a specific retry attempt:

```bash
af tasks logs my_dag manual__2025-01-14T... extract_data --try 2
```

Look for:
  • Exception messages and stack traces
  • Connection errors (database, API, S3)
  • Permission errors
  • Timeout errors
  • Missing dependencies

Check Upstream Tasks

If a task shows `upstream_failed`, the root cause is in an upstream task. Use `af runs diagnose` to find which task actually failed.
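
The distinction between a task that failed and one that was merely blocked can be expressed as a simple filter. This sketch assumes the diagnosis has already been reduced to a list of dictionaries with `task_id` and `state` keys; that shape is an assumption for illustration, not the documented output of `af runs diagnose`, and `split_failures` is a hypothetical helper.

```python
def split_failures(task_instances):
    """Separate root-cause failures from tasks that were only blocked upstream.

    Assumes each item looks like {"task_id": ..., "state": ...}.
    """
    root_causes = [t["task_id"] for t in task_instances if t.get("state") == "failed"]
    blocked = [t["task_id"] for t in task_instances if t.get("state") == "upstream_failed"]
    return root_causes, blocked


# Illustrative data only: one real failure and one downstream casualty.
tasks = [
    {"task_id": "extract_data", "state": "failed"},
    {"task_id": "transform_data", "state": "upstream_failed"},
    {"task_id": "notify", "state": "success"},
]
root_causes, blocked = split_failures(tasks)
print(root_causes, blocked)
```

Logs are worth fetching for the tasks in `root_causes`; tasks in `blocked` never ran, so their logs will not explain the failure.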

Check Import Errors (If DAG Didn't Run)

If the trigger failed because the DAG doesn't exist:

```bash
af dags errors
```

This reveals syntax errors or missing dependencies that prevented the DAG from loading.

Phase 3: Fix and Retest

Once you identify the issue:

Common Fixes

| Issue | Fix |
|---|---|
| Missing import | Add to DAG file |
| Missing package | Add to `requirements.txt` |
| Connection error | Check `af config connections`, verify credentials |
| Variable missing | Check `af config variables`, create if needed |
| Timeout | Increase task timeout or optimize query |
| Permission error | Check credentials in connection |

After Fixing

  1. Save the file
  2. Retest: `af runs trigger-wait <dag_id>`
Repeat the test → debug → fix loop until the DAG succeeds.

CLI Quick Reference

| Phase | Command | Purpose |
|---|---|---|
| Test | `af runs trigger-wait <dag_id>` | Primary test method; start here |
| Test | `af runs trigger <dag_id>` | Start run (alternative) |
| Test | `af runs get <dag_id> <run_id>` | Check run status |
| Debug | `af runs diagnose <dag_id> <run_id>` | Comprehensive failure diagnosis |
| Debug | `af tasks logs <dag_id> <run_id> <task_id>` | Get task output/errors |
| Debug | `af dags errors` | Check for parse errors (if DAG won't load) |
| Debug | `af dags get <dag_id>` | Verify DAG config |
| Debug | `af dags explore <dag_id>` | Full DAG inspection |
| Config | `af config connections` | List connections |
| Config | `af config variables` | List variables |

Testing Scenarios

Scenario 1: Test a DAG (Happy Path)

```bash
af runs trigger-wait my_dag
# Success! Done.
```

Scenario 2: Test a DAG (With Failure)

```bash
# 1. Run and wait
af runs trigger-wait my_dag
# Failed...

# 2. Find failed tasks
af runs diagnose my_dag manual__2025-01-14T...

# 3. Get error details
af tasks logs my_dag manual__2025-01-14T... extract_data

# 4. [Fix the issue in DAG code]

# 5. Retest
af runs trigger-wait my_dag
```

Scenario 3: DAG Doesn't Exist / Won't Load

```bash
# 1. Trigger fails - DAG not found
af runs trigger-wait my_dag
# Error: DAG not found

# 2. Find parse error
af dags errors

# 3. [Fix the issue in DAG code]

# 4. Retest
af runs trigger-wait my_dag
```

Scenario 4: Debug a Failed Scheduled Run

```bash
# 1. Get failure summary
af runs diagnose my_dag scheduled__2025-01-14T...

# 2. Get error from failed task
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id

# 3. [Fix the issue]

# 4. Retest
af runs trigger-wait my_dag
```

Scenario 5: Test with Custom Configuration

```bash
af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600
```
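
Hand-writing JSON inside shell quotes is easy to get wrong, so when the configuration comes from a script it may be safer to serialize it. The sketch below builds the argument list for the same command; `trigger_wait_argv` is a hypothetical helper, and only the `--conf` and `--timeout` flags shown in this document are used.

```python
import json
import shlex


def trigger_wait_argv(dag_id, conf=None, timeout=None):
    """Build the argv for `af runs trigger-wait` with optional conf and timeout."""
    argv = ["af", "runs", "trigger-wait", dag_id]
    if conf is not None:
        # json.dumps guarantees valid JSON regardless of the values' types.
        argv += ["--conf", json.dumps(conf)]
    if timeout is not None:
        argv += ["--timeout", str(timeout)]
    return argv


argv = trigger_wait_argv("my_dag", conf={"env": "staging", "batch_size": 100}, timeout=600)
# shlex.join adds the shell quoting needed around the JSON argument.
print(shlex.join(argv))
```

Passing `argv` as a list to a process runner sidesteps shell quoting entirely; `shlex.join` is only needed when the command must be shown or pasted as a single string.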

Scenario 6: Long-Running DAG

```bash
# Wait up to 1 hour
af runs trigger-wait my_dag --timeout 3600

# If timed out, check current state
af runs get my_dag manual__2025-01-14T...
```

---

Debugging Tips

Common Error Patterns

Connection Refused / Timeout:
  • Check `af config connections` for correct host/port
  • Verify network connectivity to external system
  • Check if connection credentials are correct

ModuleNotFoundError:
  • Package missing from `requirements.txt`
  • After adding, may need environment restart

PermissionError:
  • Check IAM roles, database grants, API keys
  • Verify connection has correct credentials

Task Timeout:
  • Query or operation taking too long
  • Consider adding timeout parameter to task
  • Optimize underlying query/operation
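
These patterns can be turned into a rough first-pass triage of log text. The following is an illustrative heuristic only: the regular expressions are assumptions about what typical error messages contain, `suggest_checks` is a hypothetical helper, and a match is a hint about where to look, not a diagnosis.

```python
import re

# (label, pattern, advice) triples mirroring the error patterns listed above.
HINTS = [
    ("connection", re.compile(r"connection refused", re.IGNORECASE),
     "Check `af config connections` for host/port; verify network and credentials."),
    ("module", re.compile(r"ModuleNotFoundError"),
     "Add the package to requirements.txt; the environment may need a restart."),
    ("permission", re.compile(r"PermissionError"),
     "Check IAM roles, database grants, API keys, and connection credentials."),
    ("timeout", re.compile(r"timeout|timed out", re.IGNORECASE),
     "Look for a slow query or operation; consider a task timeout or optimization."),
]


def suggest_checks(log_text: str) -> list[str]:
    """Return the labels of every known error pattern found in the log text."""
    return [label for label, pattern, _advice in HINTS if pattern.search(log_text)]


def advice_for(label: str) -> str:
    """Look up the advice string for a label returned by suggest_checks."""
    return next(advice for name, _pattern, advice in HINTS if name == label)
```

More than one label can match the same log, for example a connection attempt that eventually timed out, so the function returns a list rather than a single answer.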

Reading Task Logs

Task logs typically show:
  1. Task start timestamp
  2. Any print/log statements from task code
  3. Return value (for @task decorated functions)
  4. Exception + full stack trace (if failed)
  5. Task end timestamp and duration
Focus on the exception at the bottom of failed task logs.
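
For long logs, jumping straight to the final traceback saves scrolling. The sketch below relies on the standard header Python prints before a stack trace; `last_traceback` is a hypothetical helper, and since log formats vary, any lines the logger writes after the exception will be included in the result.

```python
# Header line Python emits at the start of every printed stack trace.
TRACEBACK_MARKER = "Traceback (most recent call last):"


def last_traceback(log_text: str):
    """Return the log text from the last traceback header onward, or None if absent."""
    start = log_text.rfind(TRACEBACK_MARKER)
    if start == -1:
        return None
    return log_text[start:].rstrip()


# Illustrative log content only.
sample_log = (
    "INFO - Starting task\n"
    "Traceback (most recent call last):\n"
    '  File "dag.py", line 10, in extract\n'
    "ValueError: bad input\n"
)
print(last_traceback(sample_log))
```

Using `rfind` rather than `find` picks the most recent traceback, which matters when a task logged and recovered from earlier exceptions before the one that caused the failure.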

Related Skills

  • authoring-dags: For creating new DAGs (includes validation before testing)
  • debugging-dags: For general Airflow troubleshooting