airflow-hitl

Airflow Human-in-the-Loop Operators

Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API.

Implementation Checklist

Execute steps in order. Prefer deferrable HITL operators over custom sensors/polling loops.
CRITICAL: Requires Airflow 3.1+. NOT available in Airflow 2.x.
Deferrable: All HITL operators are deferrable—they release their worker slot while waiting for human input.
UI Location: View pending actions at Browse → Required Actions in Airflow UI. Respond via the task instance page's Required Actions tab or the REST API.
Cross-reference: For AI/LLM calls, see the airflow-ai skill.
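
The version requirement above can be checked before any HITL import is attempted. The following is a minimal sketch, not part of the original skill: `supports_hitl` and `MIN_HITL_VERSION` are hypothetical names, and the function only compares a version string against the 3.1 minimum stated in the checklist.

```python
import re

# Minimum Airflow version for HITL operators, taken from the checklist above.
MIN_HITL_VERSION = (3, 1)


def supports_hitl(version: str) -> bool:
    """Return True if a version string such as '3.1.0' is at least 3.1."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    major, minor = int(match.group(1)), int(match.group(2))
    # Tuple comparison handles both the major and the minor component.
    return (major, minor) >= MIN_HITL_VERSION
```

In a real environment the string would typically come from `airflow.__version__`; here it is passed in so the check can be exercised without Airflow installed.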

Step 1: Choose operator

| Operator | Human action | Outcome |
|----------|--------------|---------|
| `ApprovalOperator` | Approve or Reject | Reject causes downstream tasks to be skipped (approval task itself succeeds) |
| `HITLOperator` | Select option(s) + form | Returns selections |
| `HITLBranchOperator` | Select downstream task(s) | Runs selected, skips others |
| `HITLEntryOperator` | Submit form | Returns form data |

Step 2: Implement operator

ApprovalOperator

python
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",  # Optional: auto on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()

HITLOperator

Required parameters: `subject` and `options`.
python
from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, chain, Param
from datetime import timedelta
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def hitl_example():
    hitl = HITLOperator(
        task_id="select_option",
        subject="Select Payment Method",
        body="Choose how to process payment",
        options=["ACH", "Wire", "Check"],  # REQUIRED
        defaults=["ACH"],
        multiple=False,
        execution_timeout=timedelta(hours=4),
        params={"amount": Param(1000, type="number")},
    )

    @task
    def process(result):
        print(f"Selected: {result['chosen_options']}")
        print(f"Amount: {result['params_input']['amount']}")

    process(hitl.output)

hitl_example()
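
The downstream task in the example above reads two keys from the operator's output: `chosen_options` and `params_input`. As a rough sketch of handling that payload defensively, the hypothetical helper below (`summarize_response` is not part of Airflow) formats such a dictionary and tolerates missing keys. The payload shape is assumed from the example only.

```python
from __future__ import annotations

from typing import Any


def summarize_response(result: dict[str, Any]) -> str:
    """Format a HITL result dict as a one-line summary."""
    chosen = result.get("chosen_options") or []
    params = result.get("params_input") or {}
    choice_text = ", ".join(chosen) if chosen else "(no option chosen)"
    # Sort the form fields so the summary is stable across runs.
    param_text = ", ".join(f"{key}={value!r}" for key, value in sorted(params.items()))
    return f"{choice_text} [{param_text}]" if param_text else choice_text
```

Inside a `@task` function, a helper like this could replace the two `print` calls when the same summary is needed in logs and notifications.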

HITLBranchOperator

IMPORTANT: Options can either:
  1. Directly match downstream task IDs (the simpler approach)
  2. Use `options_mapping` to map human-friendly labels to task IDs
python
from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain
from pendulum import datetime

DEPTS = ["marketing", "engineering", "sales"]

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def branch_example():
    branch = HITLBranchOperator(
        task_id="select_dept",
        subject="Select Departments",
        options=[f"Fund {d}" for d in DEPTS],
        options_mapping={f"Fund {d}": d for d in DEPTS},
        multiple=True,
    )

    for dept in DEPTS:
        @task(task_id=dept)
        def handle(dept_name: str = dept):
            # Bind the loop variable at definition time to avoid late-binding bugs
            print(f"Processing {dept_name}")
        chain(branch, handle())

branch_example()
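
To make the two option styles concrete, the sketch below shows how a chosen label could be translated into a task ID under each approach. It illustrates the idea only and is not the operator's internal implementation; `resolve_task_ids` is a hypothetical helper.

```python
from __future__ import annotations


def resolve_task_ids(
    chosen: list[str], options_mapping: dict[str, str] | None = None
) -> list[str]:
    """Translate chosen option labels into downstream task IDs.

    Labels missing from the mapping are assumed to already be task IDs
    (approach 1); mapped labels are translated (approach 2).
    """
    mapping = options_mapping or {}
    return [mapping.get(label, label) for label in chosen]


DEPTS = ["marketing", "engineering", "sales"]
MAPPING = {f"Fund {d}": d for d in DEPTS}

# Approach 2: human-friendly labels mapped to task IDs.
selected = resolve_task_ids(["Fund marketing", "Fund sales"], MAPPING)
```

With the mapping from the example, the labels "Fund marketing" and "Fund sales" resolve to the task IDs `marketing` and `sales`, which are the tasks that would run.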

HITLEntryOperator

python
from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def entry_example():
    entry = HITLEntryOperator(
        task_id="get_input",
        subject="Enter Details",
        body="Provide response",
        params={
            "response": Param("", type="string"),
            "priority": Param("p3", type="string"),
        },
    )

    @task
    def process(result):
        print(f"Response: {result['params_input']['response']}")

    process(entry.output)

entry_example()

Step 3: Optional features

Notifiers

python
from airflow.sdk import BaseNotifier, Context
from airflow.providers.standard.operators.hitl import HITLOperator

class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message=""):
        super().__init__()
        self.message = message

    def notify(self, context: Context):
        # Log the link only while the task instance is in the running state
        if context["ti"].state == "running":
            url = HITLOperator.generate_link_to_ui_from_context(
                context, base_url="https://airflow.example.com"
            )
            self.log.info("Action needed: %s", url)

hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")])

Restrict respondents

Format depends on your auth manager:
| Auth Manager | Format | Example |
|--------------|--------|---------|
| SimpleAuthManager | Username | `["admin", "manager"]` |
| FabAuthManager | Email | `["manager@example.com"]` |
| Astro | Astro ID | `["cl1a2b3cd456789ef1gh2ijkl3"]` |
Astro Users: Find Astro ID at Organization → Access Management.
python
hitl = HITLOperator(..., respondents=["manager@example.com"])  # FabAuthManager

Timeout behavior

  • With `defaults`: Task succeeds, default option(s) selected
  • Without `defaults`: Task fails on timeout
python
hitl = HITLOperator(
    ...,
    options=["Option A", "Option B"],
    defaults=["Option A"],  # Auto-selected on timeout
    execution_timeout=timedelta(hours=4),
)
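
The two timeout rules can be restated as a small pure function, which may help when reasoning about a DAG's behavior before deploying it. This is only a model of the rules listed above, not Airflow code; `outcome_on_timeout` is a hypothetical name.

```python
from __future__ import annotations


def outcome_on_timeout(defaults: list[str] | None) -> tuple[str, list[str]]:
    """Model what happens when no human responds before the timeout."""
    if defaults:
        # With defaults: the task succeeds and the defaults are selected.
        return ("success", list(defaults))
    # Without defaults: the task fails.
    return ("failed", [])
```

For the snippet above, `outcome_on_timeout(["Option A"])` models a successful task with "Option A" selected.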

Markdown in body

The `body` parameter supports markdown formatting and is Jinja templatable:
python
hitl = HITLOperator(
    ...,
    body="""**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
""",
)
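
When the table rows are known at DAG-authoring time, the markdown for `body` can be assembled programmatically rather than written by hand. The sketch below uses a hypothetical helper, `markdown_table`, with the column names taken from the example above. The Jinja expression is left as a plain string for Airflow to render later.

```python
def markdown_table(rows: dict, headers: tuple = ("Category", "Amount")) -> str:
    """Build a two-column markdown table from a mapping of label to value."""
    lines = [f"| {headers[0]} | {headers[1]} |", "|---|---|"]
    lines += [f"| {label} | {value} |" for label, value in rows.items()]
    return "\n".join(lines)


# The templated line stays unrendered here; Airflow would fill it in at runtime.
body = (
    "**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}\n\n"
    + markdown_table({"Marketing": "$1M"})
)
```

The resulting `body` string can be passed to any HITL operator in place of the hand-written literal.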

Callbacks

All HITL operators support standard Airflow callbacks:
python
def on_hitl_failure(context):
    print(f"HITL task failed: {context['task_instance'].task_id}")

def on_hitl_success(context):
    print(f"HITL task succeeded with: {context['task_instance'].xcom_pull()}")

hitl = HITLOperator(
    task_id="approval_required",
    subject="Review needed",
    options=["Approve", "Reject"],
    on_failure_callback=on_hitl_failure,
    on_success_callback=on_hitl_success,
)

Step 4: API integration

For external responders (Slack, custom app):
python
import os
import requests

HOST = os.getenv("AIRFLOW_HOST")
TOKEN = os.getenv("AIRFLOW_API_TOKEN")

# Get pending actions
r = requests.get(
    f"{HOST}/api/v2/hitlDetails/?state=pending",
    headers={"Authorization": f"Bearer {TOKEN}"},
)

# Respond (dag_id, run_id, and task_id identify the pending task instance)
requests.patch(
    f"{HOST}/api/v2/hitlDetails/{dag_id}/{run_id}/{task_id}",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"chosen_options": ["ACH"], "params_input": {"amount": 1500}},
)
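
For an external responder it can help to separate building the request from sending it, so the URL and payload can be inspected or tested without a live Airflow instance. The sketch below is one possible way to do that: `build_hitl_response` is a hypothetical helper, the endpoint path is copied from the call above, and percent-encoding the path segments is an assumption made here because run IDs often contain characters such as `:` and `+`.

```python
from __future__ import annotations

from typing import Any
from urllib.parse import quote


def build_hitl_response(
    host: str,
    token: str,
    dag_id: str,
    run_id: str,
    task_id: str,
    chosen_options: list[str],
    params_input: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Return keyword arguments for a PATCH call answering a HITL task."""
    # Encode each path segment so characters like ':' and '+' are URL-safe.
    path = "/".join(quote(part, safe="") for part in (dag_id, run_id, task_id))
    return {
        "url": f"{host.rstrip('/')}/api/v2/hitlDetails/{path}",
        "headers": {"Authorization": f"Bearer {token}"},
        "json": {
            "chosen_options": list(chosen_options),
            "params_input": dict(params_input or {}),
        },
    }


request_kwargs = build_hitl_response(
    "https://airflow.example.com/",
    "secret-token",
    "my_dag",
    "manual__2025-01-01T00:00:00+00:00",
    "select_option",
    ["ACH"],
    {"amount": 1500},
)
```

The returned dictionary is shaped so that it could be passed as `requests.patch(**request_kwargs)`.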

---

Step 5: Safety checks

Before finalizing, verify:
  • Airflow 3.1+ installed
  • For `HITLBranchOperator`: options map to downstream task IDs
  • `defaults` values are in the `options` list
  • API token configured if using external responders
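
Two of the checks above (defaults contained in options, and branch options resolving to downstream task IDs) are mechanical and can be sketched as a plain function run before deployment. `check_hitl_config` is a hypothetical helper, not an Airflow API, and it assumes the caller supplies the downstream task IDs as a list of strings.

```python
from __future__ import annotations


def check_hitl_config(
    options: list[str],
    defaults: list[str] | None = None,
    options_mapping: dict[str, str] | None = None,
    downstream_task_ids: list[str] | None = None,
) -> list[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    # Every default must be one of the offered options.
    for default in defaults or []:
        if default not in options:
            problems.append(f"default {default!r} is not in options")
    # Branch check: only applied when downstream task IDs are provided.
    if downstream_task_ids is not None:
        mapping = options_mapping or {}
        for option in options:
            target = mapping.get(option, option)
            if target not in downstream_task_ids:
                problems.append(f"option {option!r} resolves to unknown task {target!r}")
    return problems
```

An empty return value means both checks passed; the Airflow version and API token checks still need to be confirmed separately.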

Reference

Related Skills

  • airflow-ai: For AI/LLM task decorators and GenAI patterns
  • authoring-dags: For general DAG writing best practices
  • testing-dags: For testing DAGs with debugging cycles