Loading...
Loading...
Complex DAG testing workflows with debugging and fixing cycles. Use for multi-step testing requests like "test this dag and fix it if it fails", "test and debug", "run the pipeline and troubleshoot issues". For simple test requests ("test dag", "run dag"), the airflow entrypoint skill handles it directly. This skill is for iterative test-debug-fix cycles.
npx skill4agent add astronomer/agents testing-dagsafafuvx --from astro-airflow-mcp@latest af <command>afuvx --from astro-airflow-mcp@latest afaf runs trigger-wait <dag_id>af dags listaf dags getaf dags errorsgrepls┌─────────────────────────────────────┐
│ 1. TRIGGER AND WAIT │
│ Run DAG, wait for completion │
└─────────────────────────────────────┘
↓
┌───────┴───────┐
↓ ↓
┌─────────┐ ┌──────────┐
│ SUCCESS │ │ FAILED │
│ Done! │ │ Debug... │
└─────────┘ └──────────┘
↓
┌─────────────────────────────────────┐
│ 2. DEBUG (only if failed) │
│ Get logs, identify root cause │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 3. FIX AND RETEST │
│ Apply fix, restart from step 1 │
└─────────────────────────────────────┘af runs trigger-waitaf runs trigger-wait <dag_id> --timeout 300af runs trigger-wait my_dag --timeout 300{
"dag_run": {
"dag_id": "my_dag",
"dag_run_id": "manual__2025-01-14T...",
"state": "success",
"start_date": "...",
"end_date": "..."
},
"timed_out": false,
"elapsed_seconds": 45.2
}{
"dag_run": {
"state": "failed"
},
"timed_out": false,
"elapsed_seconds": 30.1,
"failed_tasks": [
{
"task_id": "extract_data",
"state": "failed",
"try_number": 2
}
]
}{
"dag_id": "my_dag",
"dag_run_id": "manual__...",
"state": "running",
"timed_out": true,
"elapsed_seconds": 300.0,
"message": "Timed out after 300 seconds. DAG run is still running."
}# Step 1: Trigger
af runs trigger my_dag
# Returns: {"dag_run_id": "manual__...", "state": "queued"}
# Step 2: Check status
af runs get my_dag manual__2025-01-14T...
# Returns current stateaf runs get <dag_id> <dag_run_id>af runs diagnose <dag_id> <dag_run_id>af tasks logs <dag_id> <dag_run_id> <task_id>af tasks logs my_dag manual__2025-01-14T... extract_dataaf tasks logs my_dag manual__2025-01-14T... extract_data --try 2upstream_failedaf runs diagnoseaf dags errors| Issue | Fix |
|---|---|
| Missing import | Add to DAG file |
| Missing package | Add to |
| Connection error | Check |
| Variable missing | Check |
| Timeout | Increase task timeout or optimize query |
| Permission error | Check credentials in connection |
af runs trigger-wait <dag_id>| Phase | Command | Purpose |
|---|---|---|
| Test | | Primary test method — start here |
| Test | | Start run (alternative) |
| Test | | Check run status |
| Debug | | Comprehensive failure diagnosis |
| Debug | | Get task output/errors |
| Debug | | Check for parse errors (if DAG won't load) |
| Debug | | Verify DAG config |
| Debug | | Full DAG inspection |
| Config | | List connections |
| Config | | List variables |
af runs trigger-wait my_dag
# Success! Done.# 1. Run and wait
af runs trigger-wait my_dag
# Failed...
# 2. Find failed tasks
af runs diagnose my_dag manual__2025-01-14T...
# 3. Get error details
af tasks logs my_dag manual__2025-01-14T... extract_data
# 4. [Fix the issue in DAG code]
# 5. Retest
af runs trigger-wait my_dag# 1. Trigger fails - DAG not found
af runs trigger-wait my_dag
# Error: DAG not found
# 2. Find parse error
af dags errors
# 3. [Fix the issue in DAG code]
# 4. Retest
af runs trigger-wait my_dag# 1. Get failure summary
af runs diagnose my_dag scheduled__2025-01-14T...
# 2. Get error from failed task
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id
# 3. [Fix the issue]
# 4. Retest
af runs trigger-wait my_dagaf runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600# Wait up to 1 hour
af runs trigger-wait my_dag --timeout 3600
# If timed out, check current state
af runs get my_dag manual__2025-01-14T...af config connectionsrequirements.txt