airflow-hitl
Airflow Human-in-the-Loop Operators
Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API.
Implementation Checklist
Execute steps in order. Prefer deferrable HITL operators over custom sensors/polling loops.
CRITICAL: Requires Airflow 3.1+. NOT available in Airflow 2.x.
Deferrable: All HITL operators are deferrable; they release their worker slot while waiting for human input.
UI Location: View pending actions at Browse → Required Actions in the Airflow UI. Respond via the task instance page's Required Actions tab or the REST API.
Cross-reference: For AI/LLM calls, see the airflow-ai skill.
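The version requirement in the notes above can be checked before any HITL import is attempted. The following is a minimal sketch, not part of Airflow: `supports_hitl` is a hypothetical helper that compares only the major and minor components of a version string against 3.1.

```python
import re


def supports_hitl(version: str, minimum: tuple[int, int] = (3, 1)) -> bool:
    """Return True when `version` is at least the minimum (major, minor) pair."""
    # Take the first two numeric components, e.g. "3.1.0rc1" -> [3, 1].
    numbers = [int(n) for n in re.findall(r"\d+", version)[:2]]
    # Pad short strings such as "3" so the comparison is always pairwise.
    numbers += [0] * (2 - len(numbers))
    return tuple(numbers) >= minimum
```

In a real environment the argument would typically be `airflow.__version__`; the helper itself has no Airflow dependency, so it can run in a CI check that guards DAG deployment.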
Step 1: Choose operator
| Operator | Human action | Outcome |
|---|---|---|
| `ApprovalOperator` | Approve or Reject | Reject causes downstream tasks to be skipped (the approval task itself succeeds) |
| `HITLOperator` | Select option(s) + form | Returns selections |
| `HITLBranchOperator` | Select downstream task(s) | Runs selected, skips others |
| `HITLEntryOperator` | Submit form | Returns form data |
Step 2: Implement operator
ApprovalOperator
```python
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",  # Optional: auto on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)


approval_example()
```
HITLOperator
Required parameters: `subject` and `options`.
```python
from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, chain, Param
from datetime import timedelta
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def hitl_example():
    hitl = HITLOperator(
        task_id="select_option",
        subject="Select Payment Method",
        body="Choose how to process payment",
        options=["ACH", "Wire", "Check"],  # REQUIRED
        defaults=["ACH"],
        multiple=False,
        execution_timeout=timedelta(hours=4),
        params={"amount": Param(1000, type="number")},
    )

    @task
    def process(result):
        print(f"Selected: {result['chosen_options']}")
        print(f"Amount: {result['params_input']['amount']}")

    process(hitl.output)


hitl_example()
```
HITLBranchOperator
IMPORTANT: Options can either:
- Directly match downstream task IDs (the simpler approach)
- Use `options_mapping` for human-friendly labels that map to task IDs
```python
from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain
from pendulum import datetime

DEPTS = ["marketing", "engineering", "sales"]


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def branch_example():
    branch = HITLBranchOperator(
        task_id="select_dept",
        subject="Select Departments",
        options=[f"Fund {d}" for d in DEPTS],
        options_mapping={f"Fund {d}": d for d in DEPTS},
        multiple=True,
    )

    for dept in DEPTS:
        @task(task_id=dept)
        def handle(dept_name: str = dept):
            # Bind the loop variable at definition time to avoid late-binding bugs
            print(f"Processing {dept_name}")

        chain(branch, handle())


branch_example()
```
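To illustrate the two ways options can relate to task IDs, here is a small standalone sketch. It is not the operator's internal code; `resolve_branch_targets` and `LABEL_TO_TASK` are hypothetical names used only to show how a label either maps through `options_mapping` or is taken as a task ID directly.

```python
def resolve_branch_targets(chosen_options, options_mapping=None):
    """Translate the labels a human picked into downstream task IDs."""
    mapping = options_mapping or {}
    # A label without a mapping entry is assumed to already be a task ID.
    return [mapping.get(label, label) for label in chosen_options]


DEPTS = ["marketing", "engineering", "sales"]
# Same mapping shape as the DAG example: "Fund <dept>" -> "<dept>".
LABEL_TO_TASK = {f"Fund {d}": d for d in DEPTS}
```

Calling `resolve_branch_targets(["Fund marketing", "Fund sales"], LABEL_TO_TASK)` yields the task IDs that would run; every other downstream task would be skipped.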
HITLEntryOperator
```python
from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def entry_example():
    entry = HITLEntryOperator(
        task_id="get_input",
        subject="Enter Details",
        body="Provide response",
        params={
            "response": Param("", type="string"),
            "priority": Param("p3", type="string"),
        },
    )

    @task
    def process(result):
        print(f"Response: {result['params_input']['response']}")

    process(entry.output)


entry_example()
```
Step 3: Optional features
Notifiers
```python
from airflow.sdk import BaseNotifier, Context
from airflow.providers.standard.operators.hitl import HITLOperator


class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message=""):
        self.message = message

    def notify(self, context: Context):
        if context["ti"].state == "running":
            url = HITLOperator.generate_link_to_ui_from_context(
                context, base_url="https://airflow.example.com"
            )
            self.log.info(f"Action needed: {url}")


hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")])
```
Restrict respondents
Format depends on your auth manager:
| Auth Manager | Format | Example |
|---|---|---|
| SimpleAuthManager | Username | |
| FabAuthManager | Email | `manager@example.com` |
| Astro | Astro ID | |
Astro Users: Find Astro ID at Organization → Access Management.
```python
hitl = HITLOperator(..., respondents=["manager@example.com"])  # FabAuthManager
```
Timeout behavior
- With `defaults`: Task succeeds, default option(s) selected
- Without `defaults`: Task fails on timeout
```python
hitl = HITLOperator(
    ...,
    options=["Option A", "Option B"],
    defaults=["Option A"],  # Auto-selected on timeout
    execution_timeout=timedelta(hours=4),
)
```
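The two timeout cases can be summarized as a tiny decision function. This is only a model of the behavior described above, written for illustration; `outcome_on_timeout` and the `"success"`/`"failed"` strings are assumptions, not Airflow APIs.

```python
def outcome_on_timeout(defaults=None):
    """Model what happens when nobody responds before execution_timeout."""
    # A single string default (as in the ApprovalOperator example) is
    # normalized to a one-element list.
    if isinstance(defaults, str):
        defaults = [defaults]
    if defaults:
        # Defaults configured: the task succeeds with those options selected.
        return {"state": "success", "chosen_options": list(defaults)}
    # No defaults: the task fails once the timeout elapses.
    return {"state": "failed", "chosen_options": []}
```

A practical consequence is that omitting `defaults` turns an unanswered request into a task failure, so downstream alerting or an `on_failure_callback` becomes the way to learn about it.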
Markdown in body
The `body` parameter supports Markdown formatting and is Jinja-templatable:
```python
hitl = HITLOperator(
    ...,
    body="""**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
""",
)
```
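When the table rows come from data rather than a hand-written string, the Markdown can be assembled in plain Python before it is passed as `body`. The sketch below assumes a hypothetical helper, `markdown_table`, which is not part of Airflow and does no escaping of pipe characters inside cells.

```python
def markdown_table(headers, rows):
    """Render a header list and row tuples as a Markdown table string."""
    lines = [
        "| " + " | ".join(headers) + " |",
        # One separator cell per column.
        "|" + "|".join("---" for _ in headers) + "|",
    ]
    for row in rows:
        lines.append("| " + " | ".join(str(cell) for cell in row) + " |")
    return "\n".join(lines)
```

The returned string could be concatenated with a templated line such as the `**Total Budget:**` header shown above, with a blank line between them so the table renders.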
Callbacks
All HITL operators support standard Airflow callbacks:
```python
def on_hitl_failure(context):
    print(f"HITL task failed: {context['task_instance'].task_id}")


def on_hitl_success(context):
    print(f"HITL task succeeded with: {context['task_instance'].xcom_pull()}")


hitl = HITLOperator(
    task_id="approval_required",
    subject="Review needed",
    options=["Approve", "Reject"],
    on_failure_callback=on_hitl_failure,
    on_success_callback=on_hitl_success,
)
```
Step 4: API integration
For external responders (Slack, custom app):
```python
import os

import requests

HOST = os.getenv("AIRFLOW_HOST")
TOKEN = os.getenv("AIRFLOW_API_TOKEN")

# Get pending actions
r = requests.get(
    f"{HOST}/api/v2/hitlDetails/?state=pending",
    headers={"Authorization": f"Bearer {TOKEN}"},
)

# Respond
requests.patch(
    f"{HOST}/api/v2/hitlDetails/{dag_id}/{run_id}/{task_id}",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"chosen_options": ["ACH"], "params_input": {"amount": 1500}},
)
```
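For an external responder, it can help to separate building the request from sending it, so the request can be inspected or unit-tested without a running Airflow instance. The sketch below reuses the endpoint path and payload shape from the snippet above; `build_hitl_response` is a hypothetical helper, and percent-encoding the path segments is an assumption made here because run IDs commonly contain `:` and `+`.

```python
from urllib.parse import quote


def build_hitl_response(host, token, dag_id, run_id, task_id,
                        chosen_options, params_input=None):
    """Assemble keyword arguments for requests.patch() without sending anything."""
    # Percent-encode each path segment so reserved characters survive the URL.
    path = "/".join(quote(part, safe="") for part in (dag_id, run_id, task_id))
    return {
        "url": f"{host.rstrip('/')}/api/v2/hitlDetails/{path}",
        "headers": {"Authorization": f"Bearer {token}"},
        "json": {
            "chosen_options": list(chosen_options),
            "params_input": dict(params_input or {}),
        },
    }
```

The result can be passed straight through as `requests.patch(**build_hitl_response(...))`, which keeps the Slack or custom-app handler itself free of URL formatting.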
---
Step 5: Safety checks
Before finalizing, verify:
- Airflow 3.1+ installed
- For `HITLBranchOperator`: options map to downstream task IDs
- `defaults` values are in the `options` list
- API token configured if using external responders
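Two of the checks in this list are mechanical and can be expressed as a small pre-flight function. This is a minimal sketch under stated assumptions: `check_hitl_config` is a hypothetical helper, not an Airflow API, and it only inspects plain Python values that mirror the operator arguments.

```python
def check_hitl_config(options, defaults=None, options_mapping=None,
                      downstream_task_ids=None):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    if isinstance(defaults, str):
        defaults = [defaults]
    # Every default must be one of the offered options.
    for value in defaults or []:
        if value not in options:
            problems.append(f"default {value!r} is not in options")
    # For branching, each option must resolve to a known downstream task ID.
    if downstream_task_ids is not None:
        mapping = options_mapping or {}
        for option in options:
            target = mapping.get(option, option)
            if target not in downstream_task_ids:
                problems.append(
                    f"option {option!r} maps to unknown task {target!r}"
                )
    return problems
```

Passing `downstream_task_ids` only for `HITLBranchOperator` configurations keeps the branch check from firing on operators whose options are not task IDs.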
Reference
- Airflow HITL Operators: https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/hitl.html
Related Skills
- airflow-ai: For AI/LLM task decorators and GenAI patterns
- authoring-dags: For general DAG writing best practices
- testing-dags: For testing DAGs with debugging cycles