airflow

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Airflow Operations

Airflow 操作

Use
af
commands to query, manage, and troubleshoot Airflow workflows.
使用
af
命令查询、管理和排查Airflow工作流问题。

Running the CLI

运行CLI

Run all
af
commands using uvx (no installation required):
bash
uvx --from astro-airflow-mcp@latest af <command>
Throughout this document,
af
is shorthand for
uvx --from astro-airflow-mcp@latest af
.
使用uvx运行所有
af
命令(无需安装):
bash
uvx --from astro-airflow-mcp@latest af <command>
在本文档中,
af
uvx --from astro-airflow-mcp@latest af
的简写。

Instance Configuration

实例配置

Manage multiple Airflow instances with persistent configuration:
bash
undefined
通过持久化配置管理多个Airflow实例:
bash
undefined

Add a new instance

添加新实例

af instance add prod --url https://airflow.example.com --token "$API_TOKEN" af instance add staging --url https://staging.example.com --username admin --password admin
af instance add prod --url https://airflow.example.com --token "$API_TOKEN" af instance add staging --url https://staging.example.com --username admin --password admin

List and switch instances

列出并切换实例

af instance list # Shows all instances in a table af instance use prod # Switch to prod instance af instance current # Show current instance af instance delete old-instance
af instance list # 以表格形式显示所有实例 af instance use prod # 切换到prod实例 af instance current # 显示当前实例 af instance delete old-instance

Auto-discover instances (use --dry-run to preview first)

自动发现实例(先使用--dry-run预览)

af instance discover --dry-run # Preview all discoverable instances af instance discover # Discover from all backends (astro, local) af instance discover astro # Discover Astro deployments only af instance discover astro --all-workspaces # Include all accessible workspaces af instance discover local # Scan common local Airflow ports af instance discover local --scan # Deep scan all ports 1024-65535
af instance discover --dry-run # 预览所有可发现的实例 af instance discover # 从所有后端(Astro、本地)发现实例 af instance discover astro # 仅发现Astro部署实例 af instance discover astro --all-workspaces # 包含所有可访问的工作区 af instance discover local # 扫描常见的本地Airflow端口 af instance discover local --scan # 深度扫描所有1024-65535端口

IMPORTANT: Always run with --dry-run first and ask for user consent before

重要提示:在不使用--dry-run运行discover之前,务必先执行--dry-run并获取用户同意。非dry-run模式会在Astro Cloud中创建API令牌,这是需要明确批准的敏感操作。

running discover without it. The non-dry-run mode creates API tokens in

为单个命令覆盖当前实例

Astro Cloud, which is a sensitive action that requires explicit approval.

Override instance for a single command

af --instance staging dags list

Config file: `~/.af/config.yaml` (override with `--config` or `AF_CONFIG` env var)

Tokens in config can reference environment variables using `${VAR}` syntax:
```yaml
instances:
- name: prod
  url: https://airflow.example.com
  auth:
    token: ${AIRFLOW_API_TOKEN}
Or use environment variables directly (no config file needed):
bash
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
af --instance staging dags list

配置文件:`~/.af/config.yaml`(可通过`--config`或`AF_CONFIG`环境变量覆盖)

配置中的令牌可以使用`${VAR}`语法引用环境变量:
```yaml
instances:
- name: prod
  url: https://airflow.example.com
  auth:
    token: ${AIRFLOW_API_TOKEN}
也可以直接使用环境变量(无需配置文件):
bash
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here

Or username/password:

或使用用户名/密码:

export AIRFLOW_USERNAME=admin export AIRFLOW_PASSWORD=admin

Or CLI flags: `af --airflow-url http://localhost:8080 --token "$TOKEN" <command>`
export AIRFLOW_USERNAME=admin export AIRFLOW_PASSWORD=admin

或使用CLI标志:`af --airflow-url http://localhost:8080 --token "$TOKEN" <command>`

Quick Reference

快速参考

CommandDescription
af health
System health check
af dags list
List all DAGs
af dags get <dag_id>
Get DAG details
af dags explore <dag_id>
Full DAG investigation
af dags source <dag_id>
Get DAG source code
af dags pause <dag_id>
Pause DAG scheduling
af dags unpause <dag_id>
Resume DAG scheduling
af dags errors
List import errors
af dags warnings
List DAG warnings
af dags stats
DAG run statistics
af runs list
List DAG runs
af runs get <dag_id> <run_id>
Get run details
af runs trigger <dag_id>
Trigger a DAG run
af runs trigger-wait <dag_id>
Trigger and wait for completion
af runs diagnose <dag_id> <run_id>
Diagnose failed run
af tasks list <dag_id>
List tasks in DAG
af tasks get <dag_id> <task_id>
Get task definition
af tasks instance <dag_id> <run_id> <task_id>
Get task instance
af tasks logs <dag_id> <run_id> <task_id>
Get task logs
af config version
Airflow version
af config show
Full configuration
af config connections
List connections
af config variables
List variables
af config variable <key>
Get specific variable
af config pools
List pools
af config pool <name>
Get pool details
af config plugins
List plugins
af config providers
List providers
af config assets
List assets/datasets
af api <endpoint>
Direct REST API access
af api ls
List available API endpoints
af api ls --filter X
List endpoints matching pattern
命令说明
af health
系统健康检查
af dags list
列出所有DAG
af dags get <dag_id>
获取DAG详情
af dags explore <dag_id>
全面调查DAG
af dags source <dag_id>
获取DAG源代码
af dags pause <dag_id>
暂停DAG调度
af dags unpause <dag_id>
恢复DAG调度
af dags errors
列出导入错误
af dags warnings
列出DAG警告
af dags stats
DAG运行统计
af runs list
列出DAG运行记录
af runs get <dag_id> <run_id>
获取运行记录详情
af runs trigger <dag_id>
触发DAG运行
af runs trigger-wait <dag_id>
触发DAG运行并等待完成
af runs diagnose <dag_id> <run_id>
诊断失败的运行记录
af tasks list <dag_id>
列出DAG中的任务
af tasks get <dag_id> <task_id>
获取任务定义
af tasks instance <dag_id> <run_id> <task_id>
获取任务实例
af tasks logs <dag_id> <run_id> <task_id>
获取任务日志
af config version
Airflow版本信息
af config show
查看完整配置
af config connections
列出连接
af config variables
列出变量
af config variable <key>
获取指定变量
af config pools
列出资源池
af config pool <name>
获取资源池详情
af config plugins
列出插件
af config providers
列出提供者
af config assets
列出资产/数据集
af api <endpoint>
直接访问REST API
af api ls
列出可用的API端点
af api ls --filter X
列出匹配指定模式的端点

User Intent Patterns

用户意图场景

DAG Operations

DAG操作

  • "What DAGs exist?" / "List all DAGs" ->
    af dags list
  • "Tell me about DAG X" / "What is DAG Y?" ->
    af dags explore <dag_id>
  • "What's the schedule for DAG X?" ->
    af dags get <dag_id>
  • "Show me the code for DAG X" ->
    af dags source <dag_id>
  • "Stop DAG X" / "Pause this workflow" ->
    af dags pause <dag_id>
  • "Resume DAG X" ->
    af dags unpause <dag_id>
  • "Are there any DAG errors?" ->
    af dags errors
  • "有哪些DAG?" / "列出所有DAG" ->
    af dags list
  • "告诉我DAG X的情况" / "DAG Y是什么?" ->
    af dags explore <dag_id>
  • "DAG X的调度周期是什么?" ->
    af dags get <dag_id>
  • "显示DAG X的代码" ->
    af dags source <dag_id>
  • "停止DAG X" / "暂停这个工作流" ->
    af dags pause <dag_id>
  • "恢复DAG X" ->
    af dags unpause <dag_id>
  • "有没有DAG错误?" ->
    af dags errors

Run Operations

运行记录操作

  • "What runs have executed?" ->
    af runs list
  • "Run DAG X" / "Trigger the pipeline" ->
    af runs trigger <dag_id>
  • "Run DAG X and wait" ->
    af runs trigger-wait <dag_id>
  • "Why did this run fail?" ->
    af runs diagnose <dag_id> <run_id>
  • "有哪些已执行的运行记录?" ->
    af runs list
  • "运行DAG X" / "触发管道" ->
    af runs trigger <dag_id>
  • "运行DAG X并等待完成" ->
    af runs trigger-wait <dag_id>
  • "这个运行记录为什么失败了?" ->
    af runs diagnose <dag_id> <run_id>

Task Operations

任务操作

  • "What tasks are in DAG X?" ->
    af tasks list <dag_id>
  • "Get task logs" / "Why did task fail?" ->
    af tasks logs <dag_id> <run_id> <task_id>
  • "DAG X中有哪些任务?" ->
    af tasks list <dag_id>
  • "获取任务日志" / "任务为什么失败?" ->
    af tasks logs <dag_id> <run_id> <task_id>

System Operations

系统操作

  • "What version of Airflow?" ->
    af config version
  • "What connections exist?" ->
    af config connections
  • "Are pools full?" ->
    af config pools
  • "Is Airflow healthy?" ->
    af health
  • "Airflow的版本是多少?" ->
    af config version
  • "有哪些连接?" ->
    af config connections
  • "资源池是否已满?" ->
    af config pools
  • "Airflow是否健康?" ->
    af health

API Exploration

API探索

  • "What API endpoints are available?" ->
    af api ls
  • "Find variable endpoints" ->
    af api ls --filter variable
  • "Access XCom values" / "Get XCom" ->
    af api xcom-entries -F dag_id=X -F task_id=Y
  • "Get event logs" / "Audit trail" ->
    af api event-logs -F dag_id=X
  • "Create connection via API" ->
    af api connections -X POST --body '{...}'
  • "Create variable via API" ->
    af api variables -X POST -F key=name -f value=val
  • "有哪些可用的API端点?" ->
    af api ls
  • "查找变量相关的端点" ->
    af api ls --filter variable
  • "访问XCom值" / "获取XCom" ->
    af api xcom-entries -F dag_id=X -F task_id=Y
  • "获取事件日志" / "审计追踪" ->
    af api event-logs -F dag_id=X
  • "通过API创建连接" ->
    af api connections -X POST --body '{...}'
  • "通过API创建变量" ->
    af api variables -X POST -F key=name -f value=val

Common Workflows

常见工作流

Investigate a Failed Run

调查失败的运行记录

bash
undefined
bash
undefined

1. List recent runs to find failure

1. 列出最近的运行记录以找到失败项

af runs list --dag-id my_dag
af runs list --dag-id my_dag

2. Diagnose the specific run

2. 诊断特定的运行记录

af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00

3. Get logs for failed task (from diagnose output)

3. 获取失败任务的日志(来自诊断输出)

af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
undefined
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
undefined

Morning Health Check

晨间健康检查

bash
undefined
bash
undefined

1. Overall system health

1. 整体系统健康状况

af health
af health

2. Check for broken DAGs

2. 检查损坏的DAG

af dags errors
af dags errors

3. Check pool utilization

3. 检查资源池使用率

af config pools
undefined
af config pools
undefined

Understand a DAG

了解某个DAG

bash
undefined
bash
undefined

Get comprehensive overview (metadata + tasks + source)

获取全面概述(元数据 + 任务 + 源代码)

af dags explore my_dag
undefined
af dags explore my_dag
undefined

Check Why DAG Isn't Running

排查DAG未运行的原因

bash
undefined
bash
undefined

Check if paused

检查是否已暂停

af dags get my_dag
af dags get my_dag

Check for import errors

检查导入错误

af dags errors
af dags errors

Check recent runs

检查最近的运行记录

af runs list --dag-id my_dag
undefined
af runs list --dag-id my_dag
undefined

Trigger and Monitor

触发并监控

bash
undefined
bash
undefined

Option 1: Trigger and wait (blocking)

选项1:触发并等待(阻塞式)

af runs trigger-wait my_dag --timeout 1800
af runs trigger-wait my_dag --timeout 1800

Option 2: Trigger and check later

选项2:触发后稍后检查

af runs trigger my_dag af runs get my_dag <run_id>
undefined
af runs trigger my_dag af runs get my_dag <run_id>
undefined

Output Format

输出格式

All commands output JSON (except
instance
commands which use human-readable tables):
bash
af dags list
所有命令均输出JSON格式内容(
instance
命令除外,其使用易读的表格形式):
bash
af dags list

{

{

"total_dags": 5,

"total_dags": 5,

"returned_count": 5,

"returned_count": 5,

"dags": [...]

"dags": [...]

}

}


Use `jq` for filtering:

```bash

使用`jq`进行过滤:

```bash

Find failed runs

查找失败的运行记录

af runs list | jq '.dag_runs[] | select(.state == "failed")'
af runs list | jq '.dag_runs[] | select(.state == "failed")'

Get DAG IDs only

仅获取DAG ID

af dags list | jq '.dags[].dag_id'
af dags list | jq '.dags[].dag_id'

Find paused DAGs

查找已暂停的DAG

af dags list | jq '[.dags[] | select(.is_paused == true)]'
undefined
af dags list | jq '[.dags[] | select(.is_paused == true)]'
undefined

Task Logs Options

任务日志选项

bash
undefined
bash
undefined

Get logs for specific retry attempt

获取特定重试次数的日志

af tasks logs my_dag run_id task_id --try 2
af tasks logs my_dag run_id task_id --try 2

Get logs for mapped task index

获取映射任务指定索引的日志

af tasks logs my_dag run_id task_id --map-index 5
undefined
af tasks logs my_dag run_id task_id --map-index 5
undefined

Direct API Access with
af api

使用
af api
直接访问API

Use
af api
for endpoints not covered by high-level commands (XCom, event-logs, backfills, etc).
bash
undefined
对于高级命令未覆盖的端点(如XCom、事件日志、回填等),使用
af api
进行访问。
bash
undefined

Discover available endpoints

发现可用的端点

af api ls af api ls --filter variable
af api ls af api ls --filter variable

Basic usage

基本用法

af api dags af api dags -F limit=10 -F only_active=true af api variables -X POST -F key=my_var -f value="my value" af api variables/old_var -X DELETE

**Field syntax**: `-F key=value` auto-converts types, `-f key=value` keeps as string.

**Full reference**: See [api-reference.md](api-reference.md) for all options, common endpoints (XCom, event-logs, backfills), and examples.
af api dags af api dags -F limit=10 -F only_active=true af api variables -X POST -F key=my_var -f value="my value" af api variables/old_var -X DELETE

**字段语法**:`-F key=value`会自动转换类型,`-f key=value`保持字符串类型。

**完整参考**:查看[api-reference.md](api-reference.md)获取所有选项、常见端点(XCom、事件日志、回填等)及示例。

Related Skills

相关技能

  • testing-dags
    - Test DAGs with debugging and fixing cycles
  • debugging-dags
    - Comprehensive DAG failure diagnosis and root cause analysis
  • authoring-dags
    - Creating and editing DAG files with best practices
  • managing-astro-local-env
    - Starting/stopping local Airflow environment
  • testing-dags
    - 通过调试和修复周期测试DAG
  • debugging-dags
    - 全面的DAG失败诊断与根因分析
  • authoring-dags
    - 遵循最佳实践创建和编辑DAG文件
  • managing-astro-local-env
    - 启动/停止本地Airflow环境