apache-airflow-orchestration


Apache Airflow Orchestration


A comprehensive skill for mastering Apache Airflow workflow orchestration. This skill covers DAG development, operators, sensors, task dependencies, dynamic workflows, XCom communication, scheduling patterns, and production deployment strategies.

When to Use This Skill


Use this skill when:
  • Building and managing complex data pipelines with task dependencies
  • Orchestrating ETL/ELT workflows across multiple systems
  • Scheduling and monitoring batch processing jobs
  • Coordinating multi-step data transformations
  • Managing workflows with conditional execution and branching
  • Implementing event-driven or asset-based workflows
  • Deploying production-grade workflow automation
  • Creating dynamic workflows that generate tasks programmatically
  • Coordinating distributed task execution across clusters
  • Building data engineering platforms with workflow orchestration

Core Concepts


What is Apache Airflow?


Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) using Python code, making complex workflow orchestration maintainable and version-controlled.
Key Principles:
  • Dynamic: Workflows are defined in Python, enabling dynamic generation
  • Extensible: Rich ecosystem of operators, sensors, and hooks
  • Scalable: Can scale from single machine to large clusters
  • Observable: Comprehensive UI for monitoring and troubleshooting

DAGs (Directed Acyclic Graphs)


A DAG is a collection of tasks organized to reflect their relationships and dependencies.
DAG Properties:
  • dag_id: Unique identifier for the DAG
  • start_date: When the DAG should start being scheduled
  • schedule: How often to run (cron, timedelta, or asset-based)
  • catchup: Whether to run missed intervals on DAG activation
  • tags: Labels for organization and filtering
  • default_args: Default parameters for all tasks in the DAG
DAG Definition Example:
```python
from datetime import datetime
from airflow.sdk import DAG

with DAG(
    dag_id="example_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",  # Daily at midnight
    catchup=False,
    tags=["example", "tutorial"],
) as dag:
    # Tasks defined here
    pass
```
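The `default_args` property listed above is not shown in the example. As a minimal sketch (the owner name and retry values here are illustrative assumptions, not required settings), it is a plain dict passed as `DAG(..., default_args=default_args)` that every task in the DAG inherits:

```python
from datetime import timedelta

# Illustrative default_args dict; keys and values are assumptions.
# Every task in the DAG inherits these parameters unless it overrides them.
default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": False,
}
```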

Tasks and Operators


Tasks are the basic units of execution in Airflow. Operators are templates for creating tasks.
Common Operator Types:
  1. BashOperator: Execute bash commands
  2. PythonOperator: Execute Python functions
  3. EmailOperator: Send emails
  4. EmptyOperator: Placeholder/dummy tasks
  5. Custom Operators: User-defined operators for specific needs
Operator vs. Task:
  • Operator: Template/class definition
  • Task: Instantiation of an operator with specific parameters

Task Dependencies


Task dependencies define the execution order and workflow structure.
Dependency Operators:
  • >>: Sets downstream dependency (task1 >> task2)
  • <<: Sets upstream dependency (task2 << task1)
  • chain(): Sequential dependencies for multiple tasks
  • cross_downstream(): Many-to-many relationships
Dependency Examples:
```python
# Simple linear flow
task1 >> task2 >> task3

# Fan-out pattern
task1 >> [task2, task3, task4]

# Fan-in pattern
[task1, task2, task3] >> task4

# Complex dependencies
first_task >> [second_task, third_task]
third_task << fourth_task
```
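Why "acyclic" matters: the scheduler must be able to place tasks in a valid execution order, which is impossible if dependencies form a cycle. A pure-Python sketch of that ordering using the standard library's `graphlib` (task names match the fan-out/fan-in examples above; no Airflow required):

```python
from graphlib import TopologicalSorter

# Predecessor map: each key runs after every task in its set,
# mirroring task1 >> [task2, task3] >> task4
graph = {
    "task2": {"task1"},
    "task3": {"task1"},
    "task4": {"task2", "task3"},
}
order = list(TopologicalSorter(graph).static_order())
# task1 always comes first and task4 last; task2/task3 may appear in either order
```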

Executors


Executors determine how and where tasks run.
Executor Types:
  • SequentialExecutor: Single-threaded, local (default, not for production)
  • LocalExecutor: Multi-threaded, single machine
  • CeleryExecutor: Distributed execution using Celery
  • KubernetesExecutor: Each task runs in a separate Kubernetes pod
  • DaskExecutor: Distributed execution using Dask
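The executor is selected via configuration. A sketch using Airflow's standard `AIRFLOW__SECTION__KEY` environment-variable convention, equivalent to setting `executor` under `[core]` in airflow.cfg (LocalExecutor is shown only as an example choice):

```shell
# Select the executor for this deployment
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
```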

Scheduler


The Airflow scheduler:
  • Monitors all DAGs and their tasks
  • Triggers task instances based on dependencies and schedules
  • Submits tasks to executors for execution
  • Handles retries and task state management
Starting the Scheduler:
```bash
airflow scheduler
```

DAG Development Patterns


Basic DAG Structure


Every DAG follows this structure:
```python
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="basic_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",
    catchup=False,
) as dag:

    task1 = BashOperator(
        task_id="task1",
        bash_command="echo 'Task 1 executed'"
    )

    task2 = BashOperator(
        task_id="task2",
        bash_command="echo 'Task 2 executed'"
    )

    task1 >> task2
```

Task Dependencies and Chains


Linear Chain:
```python
from airflow.sdk import chain

# These are equivalent:
task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)
```
Dynamic Chain:
```python
from airflow.sdk import chain
from airflow.operators.empty import EmptyOperator

# Dynamically generate and chain tasks
chain(*[EmptyOperator(task_id=f"task_{i}") for i in range(1, 6)])
```
Pairwise Chain:
```python
from airflow.sdk import chain

# Creates paired dependencies:
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
```
Cross Downstream:
```python
from airflow.sdk import cross_downstream

# Both op1 and op2 feed into both op3 and op4
cross_downstream([op1, op2], [op3, op4])
```

Branching and Conditional Execution


BranchPythonOperator:
```python
from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    if context['data_interval_start'].day == 1:
        return 'monthly_task'
    return 'daily_task'

branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch
)

daily_task = BashOperator(task_id='daily_task', bash_command='echo daily')
monthly_task = BashOperator(task_id='monthly_task', bash_command='echo monthly')

branch >> [daily_task, monthly_task]
```
Custom Branch Operator:
```python
from airflow.operators.branch import BaseBranchOperator

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run extra branch on first day of month
        """
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None  # Skip all downstream tasks
```
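The branch decision itself is ordinary Python and can be unit-tested without running Airflow. A sketch that extracts the date logic from `choose_branch` above into a standalone function (the function name `branch_for` is illustrative):

```python
from datetime import date

def branch_for(run_date: date) -> str:
    # Mirrors choose_branch: monthly task on the 1st of the month, daily otherwise
    if run_date.day == 1:
        return "monthly_task"
    return "daily_task"

print(branch_for(date(2024, 1, 1)))   # monthly_task
print(branch_for(date(2024, 6, 15)))  # daily_task
```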

TaskGroups for Organization


TaskGroups help organize related tasks hierarchically:
```python
from airflow.sdk import task_group
from airflow.operators.empty import EmptyOperator

@task_group()
def data_processing_group():
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    extract >> transform >> load

@task_group()
def validation_group():
    validate_schema = EmptyOperator(task_id="validate_schema")
    validate_data = EmptyOperator(task_id="validate_data")

    validate_schema >> validate_data

start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

start >> data_processing_group() >> validation_group() >> end
```
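One consequence of TaskGroups worth noting: child task_ids are prefixed with the group id, so `extract` inside `data_processing_group` is addressed as `data_processing_group.extract` (for example in `xcom_pull`). A one-line sketch of that naming rule:

```python
def full_task_id(group_id: str, task_id: str) -> str:
    # TaskGroup children are addressed as "<group_id>.<task_id>"
    return f"{group_id}.{task_id}"

print(full_task_id("data_processing_group", "extract"))
```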

Edge Labeling


Add labels to dependency edges for clarity:
```python
from airflow.sdk import Label

# Inline labeling
my_task >> Label("When empty") >> other_task

# Method-based labeling
my_task.set_downstream(other_task, Label("When empty"))
```

LatestOnlyOperator


Skip tasks if the current run is not the latest DAG run:
```python
from airflow.sdk import DAG
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
import pendulum

with DAG(
    dag_id='latest_only_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=True,
    schedule="@daily",
) as dag:
    latest_only = LatestOnlyOperator(task_id='latest_only')
    task1 = EmptyOperator(task_id='task1')
    task2 = EmptyOperator(task_id='task2')
    task3 = EmptyOperator(task_id='task3')
    task4 = EmptyOperator(task_id='task4', trigger_rule='all_done')

    latest_only >> task1 >> task3
    latest_only >> task4
    task2 >> task3
    task2 >> task4
```

Operators Deep Dive


BashOperator


Execute bash commands:
```python
from airflow.providers.standard.operators.bash import BashOperator

bash_task = BashOperator(
    task_id="bash_example",
    bash_command="echo 'Hello from Bash'; date",
    env={'MY_VAR': 'value'},  # Environment variables
    append_env=True,  # Append to existing env vars
    output_encoding='utf-8'
)
```
Complex Bash Command:
```python
bash_complex = BashOperator(
    task_id="complex_bash",
    bash_command="""
        cd /path/to/dir
        python process_data.py --input {{ ds }} --output {{ tomorrow_ds }}
        if [ $? -eq 0 ]; then
            echo "Success"
        else
            echo "Failed" && exit 1
        fi
    """,
)
```

PythonOperator


Execute Python functions:
```python
from airflow.providers.standard.operators.python import PythonOperator

def my_python_function(name, **context):
    print(f"Hello {name}!")
    print(f"Execution date: {context['ds']}")
    return "Success"

python_task = PythonOperator(
    task_id="python_example",
    python_callable=my_python_function,
    op_kwargs={'name': 'Airflow'},
    provide_context=True
)
```
Traditional ETL with PythonOperator:
```python
import json
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator

def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

def transform(ti):
    # Pull from XCom
    order_data_dict = ti.xcom_pull(task_ids="extract")
    total_order_value = sum(order_data_dict.values())
    return {"total_order_value": total_order_value}

def load(ti):
    # Pull from XCom
    total = ti.xcom_pull(task_ids="transform")["total_order_value"]
    print(f"Total order value is: {total:.2f}")

with DAG(
    dag_id="legacy_etl_pipeline",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    extract_task >> transform_task >> load_task
```
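The extract/transform logic above is plain Python and can be checked outside Airflow. Running the same steps directly on the sample payload:

```python
import json

# Same sample payload as the extract() task above
data = json.loads('{"1001": 301.27, "1002": 433.21, "1003": 502.22}')
total_order_value = sum(data.values())
print(f"Total order value is: {total_order_value:.2f}")  # 1236.70
```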

EmailOperator


Send email notifications:
```python
from airflow.providers.smtp.operators.smtp import EmailOperator

email_task = EmailOperator(
    task_id='send_email',
    to='recipient@example.com',
    subject='Airflow Notification',
    html_content='<h3>Task completed successfully!</h3>',
    cc=['cc@example.com'],
    bcc=['bcc@example.com']
)
```

EmptyOperator


Placeholder for workflow structure:
```python
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')

# Useful for organizing complex DAGs
start >> [task1, task2, task3] >> end
```

Custom Operators


Create reusable custom operators:
```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyCustomOperator(BaseOperator):
    @apply_defaults
    def __init__(self, my_param, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_param = my_param

    def execute(self, context):
        self.log.info(f"Executing with param: {self.my_param}")
        # Custom logic here
        return "Result"
```

Usage:
```python
custom_task = MyCustomOperator(
    task_id="custom",
    my_param="value"
)
```

Sensors Deep Dive


Sensors are a special type of operator that wait for a certain condition to be met before proceeding.

ExternalTaskSensor


Wait for tasks in other DAGs:
```python
from airflow.sdk import DAG
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
import pendulum

with DAG(
    dag_id="example_external_task_sensor",
    start_date=pendulum.datetime(2021, 10, 20, tz="UTC"),
    catchup=False,
    schedule=None,
) as dag:

    wait_for_task = ExternalTaskSensor(
        task_id="wait_for_task",
        external_dag_id="upstream_dag",
        external_task_id="upstream_task",
        allowed_states=["success"],
        failed_states=["failed"],
        execution_delta=None,  # Same execution_date
        timeout=600,  # 10 minutes
        poke_interval=60,  # Check every 60 seconds
    )
```
Deferrable ExternalTaskSensor:
```python
# More efficient - releases worker slot while waiting
wait_for_task_async = ExternalTaskSensor(
    task_id="wait_for_task_async",
    external_dag_id="upstream_dag",
    external_task_id="upstream_task",
    allowed_states=["success"],
    failed_states=["failed"],
    deferrable=True,  # Use async mode
)
```

FileSensor


Wait for files to appear:
```python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/path/to/file.csv",
    poke_interval=30,
    timeout=600,
    mode='poke'  # or 'reschedule' for long waits
)
```
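Conceptually, `mode='poke'` means the sensor holds its worker slot and re-checks a condition every `poke_interval` seconds until it passes or `timeout` expires. A simplified pure-Python sketch of that loop (not Airflow's actual implementation):

```python
import time

def run_sensor(poke, poke_interval: float, timeout: float) -> bool:
    # Re-check the condition until it passes or the deadline expires
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if poke():
            return True
        time.sleep(poke_interval)
    raise TimeoutError("sensor timed out")

# Example: the condition becomes true on the second check
state = {"checks": 0}
def condition():
    state["checks"] += 1
    return state["checks"] >= 2

print(run_sensor(condition, poke_interval=0.01, timeout=5))  # True
```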

TimeDeltaSensor


Wait for a specific time period:
```python
from datetime import timedelta
from airflow.sensors.time_delta import TimeDeltaSensor

wait_one_hour = TimeDeltaSensor(
    task_id="wait_one_hour",
    delta=timedelta(hours=1)
)
```

BigQuery Table Sensor


Wait for a BigQuery table to exist:
```python
from airflow.sdk import DAG
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
import pendulum

with DAG(
    dag_id="bigquery_sensor_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
) as dag:

    wait_for_table = BigQueryTableExistenceSensor(
        task_id="wait_for_table",
        project_id="your-project-id",
        dataset_id="your_dataset",
        table_id="your_table",
        bigquery_conn_id="google_cloud_default",
        location="US",
        poke_interval=60,
        timeout=3600,
    )
```

Custom Sensors


Create custom sensors for specific conditions:
```python
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class MyCustomSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, my_condition, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_condition = my_condition

    def poke(self, context):
        # Return True when condition is met
        self.log.info(f"Checking condition: {self.my_condition}")
        # Custom logic to check condition
        return check_condition(self.my_condition)
```

Deferrable Sensors


Deferrable sensors release worker slots while waiting:
```python
from datetime import timedelta
from airflow.sdk import BaseSensorOperator, StartTriggerArgs

class WaitHoursSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def __init__(self, *args, trigger_kwargs=None, start_from_trigger=True, **kwargs):
        super().__init__(*args, **kwargs)
        if trigger_kwargs:
            self.start_trigger_args.trigger_kwargs = trigger_kwargs
        self.start_from_trigger = start_from_trigger

    def execute_complete(self, context, event=None):
        return  # Task complete
```

XComs (Cross-Communication)


XComs enable task-to-task communication by storing and retrieving data.

Basic XCom Usage


Pushing to XCom:
```python
def push_function(**context):
    value = "Important data"
    context['ti'].xcom_push(key='my_key', value=value)
    # Or simply return (uses 'return_value' key)
    return value

push_task = PythonOperator(
    task_id='push',
    python_callable=push_function,
    provide_context=True
)
```
Pulling from XCom:
```python
def pull_function(**context):
    # Pull by task_id (uses 'return_value' key)
    value = context['ti'].xcom_pull(task_ids='push')

    # Pull with specific key
    value = context['ti'].xcom_pull(task_ids='push', key='my_key')

    print(f"Pulled value: {value}")

pull_task = PythonOperator(
    task_id='pull',
    python_callable=pull_function,
    provide_context=True
)
```

XCom with TaskFlow API


TaskFlow API automatically manages XComs:
```python
from airflow.decorators import task

@task
def extract():
    return {"data": [1, 2, 3, 4, 5]}

@task
def transform(data_dict):
    # Automatically receives XCom from extract
    total = sum(data_dict['data'])
    return {"total": total}

@task
def load(summary):
    print(f"Total: {summary['total']}")

# Automatic XCom handling
data = extract()
summary = transform(data)
load(summary)
```

XCom Best Practices


Size Limitations:
  • XComs are stored in the metadata database
  • Keep XCom data small (< 1MB recommended)
  • For large data, store in external systems and pass references
Example with External Storage:
```python
@task
def process_large_data():
    # Process data
    large_result = compute_large_dataset()

    # Store in S3/GCS
    file_path = save_to_s3(large_result, "s3://bucket/result.parquet")

    # Return only the path
    return {"result_path": file_path}

@task
def consume_large_data(metadata):
    # Load from S3/GCS
    data = load_from_s3(metadata['result_path'])
    process(data)
```
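The "< 1MB" guideline can be enforced defensively before a task returns a value. A hypothetical helper (not an Airflow API) that estimates a value's JSON-serialized size:

```python
import json

MAX_XCOM_BYTES = 1_000_000  # ~1 MB guideline from the text

def safe_for_xcom(value) -> bool:
    # XCom values are serialized into the metadata database; keep them small
    return len(json.dumps(value).encode("utf-8")) < MAX_XCOM_BYTES

print(safe_for_xcom({"result_path": "s3://bucket/result.parquet"}))  # True
print(safe_for_xcom(["x" * 2_000_000]))  # False
```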

XCom with Operators


Reading XCom in Templates:
```python
from airflow.providers.standard.operators.bash import BashOperator

process_file = BashOperator(
    task_id="process",
    bash_command="python process.py {{ ti.xcom_pull(task_ids='extract') }}",
)
```
XCom with EmailOperator:
```python
from airflow.sdk import task
from airflow.providers.smtp.operators.smtp import EmailOperator

@task
def get_ip():
    return "192.168.1.1"

@task(multiple_outputs=True)
def compose_email(external_ip):
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Your server is connected from {external_ip}<br>'
    }

email_info = compose_email(get_ip())

EmailOperator(
    task_id='send_email',
    to='example@example.com',
    subject=email_info['subject'],
    html_content=email_info['body']
)
```

Dynamic Workflows

动态工作流

Create tasks dynamically based on runtime conditions or external data.
基于运行时条件或外部数据动态创建任务。

Dynamic Task Generation with Loops

使用循环动态生成任务

python
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator

with DAG("dynamic_loop_example", ...) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # Dynamically create tasks
    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
    for option in options:
        task = EmptyOperator(task_id=option)
        start >> task >> end
python
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator

with DAG("dynamic_loop_example", ...) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # 动态创建任务
    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
    for option in options:
        task = EmptyOperator(task_id=option)
        start >> task >> end

Dynamic Task Mapping

动态任务映射

Map over task outputs to create dynamic parallel tasks:
python
from airflow.decorators import task

@task
def extract():
    # Returns list of items to process
    return [1, 2, 3, 4, 5]

@task
def transform(item):
    # Processes single item
    return item * 2

@task
def load(items):
    # Receives all transformed items
    print(f"Loaded {len(items)} items: {items}")
映射任务输出以创建动态并行任务:
python
from airflow.decorators import task

@task
def extract():
    # 返回待处理的项目列表
    return [1, 2, 3, 4, 5]

@task
def transform(item):
    # 处理单个项目
    return item * 2

@task
def load(items):
    # 接收所有转换后的项目
    print(f"加载了{len(items)}个项目: {items}")

Dynamic mapping

动态映射

data = extract()
transformed = transform.expand(item=data)  # Creates 5 parallel tasks
load(transformed)

Mapping with Classic Operators:
python
from airflow.models.baseoperator import BaseOperator

class ExtractOperator(BaseOperator):
    def execute(self, context):
        return ["file1.csv", "file2.csv", "file3.csv"]

class TransformOperator(BaseOperator):
    def __init__(self, input, **kwargs):
        super().__init__(**kwargs)
        self.input = input

    def execute(self, context):
        # Process single file
        return f"processed_{self.input}"

extract = ExtractOperator(task_id="extract")
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)
data = extract()
transformed = transform.expand(item=data)  # 创建5个并行任务
load(transformed)

结合传统操作符使用映射:
python
from airflow.models.baseoperator import BaseOperator

class ExtractOperator(BaseOperator):
    def execute(self, context):
        return ["file1.csv", "file2.csv", "file3.csv"]

class TransformOperator(BaseOperator):
    def __init__(self, input, **kwargs):
        super().__init__(**kwargs)
        self.input = input

    def execute(self, context):
        # 处理单个文件
        return f"processed_{self.input}"

extract = ExtractOperator(task_id="extract")
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)

Task Group Mapping

任务组映射

Map over entire task groups:
python
from airflow.decorators import task, task_group

@task
def add_one(value):
    return value + 1

@task
def double(value):
    return value * 2

@task_group
def process_group(value):
    incremented = add_one(value)
    return double(incremented)

@task
def aggregate(results):
    print(f"Results: {results}")
对整个任务组进行映射:
python
from airflow.decorators import task, task_group

@task
def add_one(value):
    return value + 1

@task
def double(value):
    return value * 2

@task_group
def process_group(value):
    incremented = add_one(value)
    return double(incremented)

@task
def aggregate(results):
    print(f"结果: {results}")

Map task group over values

对任务组进行映射

results = process_group.expand(value=[1, 2, 3, 4, 5])
aggregate(results)
results = process_group.expand(value=[1, 2, 3, 4, 5])
aggregate(results)

Partial Parameters with Mapping

映射结合部分参数

Mix static and dynamic parameters:
python
@task
def process(base_path, filename):
    full_path = f"{base_path}/{filename}"
    return f"Processed {full_path}"
混合静态和动态参数:
python
@task
def process(base_path, filename):
    full_path = f"{base_path}/{filename}"
    return f"已处理 {full_path}"

Static parameter 'base_path', dynamic 'filename'

静态参数'base_path',动态参数'filename'

results = process.partial(base_path="/data").expand(
    filename=["file1.csv", "file2.csv", "file3.csv"]
)
results = process.partial(base_path="/data").expand(
    filename=["file1.csv", "file2.csv", "file3.csv"]
)
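Note that when `expand` receives more than one keyword list, Airflow creates one mapped task instance per combination (a cross product). A pure-Python sketch of that fan-out, with hypothetical parameter names:

```python
from itertools import product

# Hypothetical lists that would be passed as .expand(path=paths, mode=modes)
paths = ["file1.csv", "file2.csv"]
modes = ["fast", "safe"]

# One mapped task instance per (path, mode) combination
instances = list(product(paths, modes))
print(len(instances))  # 4
```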

TaskFlow API

TaskFlow API

The modern way to write Airflow DAGs with automatic XCom handling and cleaner syntax.
这是编写Airflow DAG的现代方式,支持自动XCom处理和更简洁的语法。

Basic TaskFlow Example

TaskFlow基础示例

python
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="taskflow_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    schedule=None,
    catchup=False,
)
def my_taskflow_dag():

    @task
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        import json
        return json.loads(data_string)

    @task
    def transform(order_data_dict):
        total = sum(order_data_dict.values())
        return {"total_order_value": total}

    @task
    def load(summary):
        print(f"Total order value: {summary['total_order_value']:.2f}")

    # Function calls create task dependencies automatically
    order_data = extract()
    summary = transform(order_data)
    load(summary)
python
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="taskflow_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    schedule=None,
    catchup=False,
)
def my_taskflow_dag():

    @task
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        import json
        return json.loads(data_string)

    @task
    def transform(order_data_dict):
        total = sum(order_data_dict.values())
        return {"total_order_value": total}

    @task
    def load(summary):
        print(f"订单总金额: {summary['total_order_value']:.2f}")

    # 函数调用自动创建任务依赖
    order_data = extract()
    summary = transform(order_data)
    load(summary)

Instantiate the DAG

实例化DAG

my_taskflow_dag()
my_taskflow_dag()

Multiple Outputs

多输出

Return multiple values from tasks:
python
@task(multiple_outputs=True)
def extract_data():
    return {
        'orders': [1, 2, 3],
        'customers': ['A', 'B', 'C'],
        'revenue': 1000.50
    }

@task
def process_orders(orders):
    print(f"Processing {len(orders)} orders")

@task
def process_customers(customers):
    print(f"Processing {len(customers)} customers")
从任务返回多个值:
python
@task(multiple_outputs=True)
def extract_data():
    return {
        'orders': [1, 2, 3],
        'customers': ['A', 'B', 'C'],
        'revenue': 1000.50
    }

@task
def process_orders(orders):
    print(f"处理{len(orders)}个订单")

@task
def process_customers(customers):
    print(f"处理{len(customers)}个客户")

Access individual outputs

访问单独的输出

data = extract_data()
process_orders(data['orders'])
process_customers(data['customers'])
data = extract_data()
process_orders(data['orders'])
process_customers(data['customers'])

Mixing TaskFlow with Traditional Operators

混合TaskFlow与传统操作符

python
from airflow.decorators import dag, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

@dag(
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
)
def mixed_dag():

    @task
    def get_date():
        from datetime import datetime
        return datetime.now().strftime("%Y%m%d")

    # Traditional operator
    bash_task = BashOperator(
        task_id="print_date",
        bash_command="echo Processing data for {{ ti.xcom_pull(task_ids='get_date') }}"
    )

    @task
    def process_results():
        print("Processing complete")

    # Mix task types
    date = get_date()
    date >> bash_task >> process_results()

mixed_dag()
python
from airflow.decorators import dag, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

@dag(
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
)
def mixed_dag():

    @task
    def get_date():
        from datetime import datetime
        return datetime.now().strftime("%Y%m%d")

    # 传统操作符
    bash_task = BashOperator(
        task_id="print_date",
        bash_command="echo 处理{{ ti.xcom_pull(task_ids='get_date') }}的数据"
    )

    @task
    def process_results():
        print("处理完成")

    # 混合任务类型
    date = get_date()
    date >> bash_task >> process_results()

mixed_dag()

Virtual Environment for Tasks

任务的虚拟环境

Isolate task dependencies:
python
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="virtualenv_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
)
def virtualenv_dag():

    @task.virtualenv(
        requirements=["pandas==1.5.0", "numpy==1.23.0"],
        system_site_packages=False
    )
    def analyze_data():
        import pandas as pd
        import numpy as np

        df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]})
        result = np.mean(df['col1'])
        return float(result)

    @task
    def report_results(mean_value):
        print(f"Mean value: {mean_value}")

    result = analyze_data()
    report_results(result)

virtualenv_dag()
隔离任务依赖:
python
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="virtualenv_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
)
def virtualenv_dag():

    @task.virtualenv(
        requirements=["pandas==1.5.0", "numpy==1.23.0"],
        system_site_packages=False
    )
    def analyze_data():
        import pandas as pd
        import numpy as np

        df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]})
        result = np.mean(df['col1'])
        return float(result)

    @task
    def report_results(mean_value):
        print(f"平均值: {mean_value}")

    result = analyze_data()
    report_results(result)

virtualenv_dag()

Asset-Based Scheduling

基于资产的调度

Schedule DAGs based on data assets (formerly datasets) rather than time.
基于数据资产(原数据集)而非时间来调度DAG。

Producer-Consumer Pattern

生产者-消费者模式

python
from airflow.sdk import DAG, Asset
from airflow.operators.bash import BashOperator
from datetime import datetime
python
from airflow.sdk import DAG, Asset
from airflow.operators.bash import BashOperator
from datetime import datetime

Define asset

定义资产

customer_data = Asset("s3://my-bucket/customers.parquet")
customer_data = Asset("s3://my-bucket/customers.parquet")

Producer DAG

生产者DAG

with DAG(
    dag_id="producer_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
) as producer:
    BashOperator(
        task_id="generate_data",
        bash_command="python generate_customers.py",
        outlets=[customer_data]  # Marks asset as updated
    )
with DAG(
    dag_id="producer_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
) as producer:
    BashOperator(
        task_id="generate_data",
        bash_command="python generate_customers.py",
        outlets=[customer_data]  # 标记资产已更新
    )

Consumer DAG - triggered when asset updates

消费者DAG - 资产更新时触发

with DAG(
    dag_id="consumer_dag",
    schedule=[customer_data],  # Triggered by asset
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as consumer:
    BashOperator(
        task_id="process_data",
        bash_command="python process_customers.py"
    )
with DAG(
    dag_id="consumer_dag",
    schedule=[customer_data],  # 由资产触发
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as consumer:
    BashOperator(
        task_id="process_data",
        bash_command="python process_customers.py"
    )

Multiple Asset Dependencies

多资产依赖

AND Logic (all assets must update):
python
from airflow.datasets import Dataset

asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")

with DAG(
    dag_id="wait_for_both",
    schedule=(asset_1 & asset_2),  # Both must update
    start_date=datetime(2023, 1, 1),
):
    pass
OR Logic (any asset update triggers):
python
asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")

with DAG(
    dag_id="triggered_by_either",
    schedule=(asset_1 | asset_2),  # Either can trigger
    start_date=datetime(2023, 1, 1),
):
    pass
Complex Logic:
python
asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")
asset_3 = Dataset("s3://bucket/file3.csv")

with DAG(
    dag_id="complex_condition",
    schedule=(asset_1 | (asset_2 & asset_3)),  # asset_1 OR (asset_2 AND asset_3)
    start_date=datetime(2023, 1, 1),
):
    pass
AND逻辑(所有资产必须更新):
python
from airflow.datasets import Dataset

asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")

with DAG(
    dag_id="wait_for_both",
    schedule=(asset_1 & asset_2),  # 两者都必须更新
    start_date=datetime(2023, 1, 1),
):
    pass
OR逻辑(任意资产更新即可触发):
python
asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")

with DAG(
    dag_id="triggered_by_either",
    schedule=(asset_1 | asset_2),  # 任意一个都可触发
    start_date=datetime(2023, 1, 1),
):
    pass
复杂逻辑:
python
asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")
asset_3 = Dataset("s3://bucket/file3.csv")

with DAG(
    dag_id="complex_condition",
    schedule=(asset_1 | (asset_2 & asset_3)),  # asset_1 或者 (asset_2 且 asset_3)
    start_date=datetime(2023, 1, 1),
):
    pass
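The trigger condition above reads as ordinary boolean logic over the set of assets that have updated since the DAG's last run. A plain-Python model of the same condition makes the evaluation explicit:

```python
# Models schedule=(asset_1 | (asset_2 & asset_3)) over a set of updated assets
def should_trigger(updated):
    return "asset_1" in updated or (
        "asset_2" in updated and "asset_3" in updated
    )

print(should_trigger({"asset_2"}))             # False: asset_3 has not updated
print(should_trigger({"asset_2", "asset_3"}))  # True: the AND branch is satisfied
print(should_trigger({"asset_1"}))             # True: asset_1 alone suffices
```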

Asset Aliases

资产别名

Use aliases for flexible asset references:
python
from airflow.datasets import Dataset, AssetAlias
from airflow.decorators import task
使用别名实现灵活的资产引用:
python
from airflow.datasets import Dataset, AssetAlias
from airflow.decorators import task

Producer with alias

带别名的生产者

with DAG(dag_id="alias_producer", start_date=datetime(2023, 1, 1)):
    @task(outlets=[AssetAlias("my-alias")])
    def produce_data(*, outlet_events):
        # Dynamically add actual asset
        outlet_events[AssetAlias("my-alias")].add(
            Dataset("s3://bucket/my-file.csv")
        )
with DAG(dag_id="alias_producer", start_date=datetime(2023, 1, 1)):
    @task(outlets=[AssetAlias("my-alias")])
    def produce_data(*, outlet_events):
        # 动态添加实际资产
        outlet_events[AssetAlias("my-alias")].add(
            Dataset("s3://bucket/my-file.csv")
        )

Consumer depending on alias

依赖别名的消费者

with DAG(
    dag_id="alias_consumer",
    schedule=AssetAlias("my-alias"),
    start_date=datetime(2023, 1, 1),
):
    pass
with DAG(
    dag_id="alias_consumer",
    schedule=AssetAlias("my-alias"),
    start_date=datetime(2023, 1, 1),
):
    pass

Accessing Asset Event Information

访问资产事件信息

python
@task
def process_asset_data(*, triggering_asset_events):
    for event in triggering_asset_events:
        print(f"Asset: {event.asset.uri}")
        print(f"Timestamp: {event.timestamp}")
        print(f"Extra: {event.extra}")
python
@task
def process_asset_data(*, triggering_asset_events):
    for event in triggering_asset_events:
        print(f"资产: {event.asset.uri}")
        print(f"时间戳: {event.timestamp}")
        print(f"额外信息: {event.extra}")

Scheduling Patterns

调度模式

Cron Expressions

Cron表达式


Every day at midnight

每日午夜

schedule="0 0 * * *"
schedule="0 0 * * *"

Every Monday at 9 AM

每周一上午9点

schedule="0 9 * * 1"
schedule="0 9 * * 1"

Every 15 minutes

每15分钟

schedule="*/15 * * * *"
schedule="*/15 * * * *"

First day of month at noon

每月1日中午

schedule="0 12 1 * *"
schedule="0 12 1 * *"

Weekdays at 6 PM

工作日下午6点

schedule="0 18 * * 1-5"
schedule="0 18 * * 1-5"
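The expressions above all follow the standard five-field cron layout. A small stdlib-only helper that labels the fields makes them easier to read:

```python
# Standard cron field order: minute, hour, day of month, month, day of week
FIELDS = ["minute", "hour", "day_of_month", "month", "day_of_week"]

def explain_cron(expr):
    """Map each field of a five-field cron expression to its name."""
    return dict(zip(FIELDS, expr.split()))

print(explain_cron("0 9 * * 1"))
# {'minute': '0', 'hour': '9', 'day_of_month': '*', 'month': '*', 'day_of_week': '1'}
```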

Timedelta Scheduling

Timedelta调度

python
from datetime import timedelta

with DAG(
    dag_id="timedelta_schedule",
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(hours=6),  # Every 6 hours
):
    pass
python
from datetime import timedelta

with DAG(
    dag_id="timedelta_schedule",
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(hours=6),  # 每6小时
):
    pass

Preset Schedules

预设调度


Common presets

常见预设

schedule="@once"     # Run once
schedule="@hourly"   # Every hour
schedule="@daily"    # Daily at midnight
schedule="@weekly"   # Every Sunday at midnight
schedule="@monthly"  # First day of month at midnight
schedule="@yearly"   # January 1st at midnight
schedule=None        # Manual trigger only
schedule="@once"     # 仅运行一次
schedule="@hourly"   # 每小时运行一次
schedule="@daily"    # 每日午夜运行
schedule="@weekly"   # 每周日午夜运行
schedule="@monthly"  # 每月1日午夜运行
schedule="@yearly"   # 每年1月1日午夜运行
schedule=None        # 仅手动触发

Catchup and Backfilling

补跑与回填

Catchup:
python
with DAG(
    dag_id="catchup_example",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=True,  # Run all missed intervals
):
    pass

with DAG(
    dag_id="no_catchup",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,  # Only run latest interval
):
    pass
Manual Backfilling:
bash
补跑:
python
with DAG(
    dag_id="catchup_example",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=True,  # 运行所有错过的时间间隔
):
    pass

with DAG(
    dag_id="no_catchup",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,  # 仅运行最新的时间间隔
):
    pass
手动回填:
bash

Backfill specific date range

回填特定日期范围

airflow dags backfill \
    --start-date 2023-01-01 \
    --end-date 2023-01-31 \
    my_dag_id
airflow dags backfill \
    --start-date 2023-01-01 \
    --end-date 2023-01-31 \
    my_dag_id

Backfill with marking success (no execution)

标记成功的回填(不实际执行)

airflow dags backfill \
    --start-date 2023-01-01 \
    --end-date 2023-01-31 \
    --mark-success \
    my_dag_id
airflow dags backfill \
    --start-date 2023-01-01 \
    --end-date 2023-01-31 \
    --mark-success \
    my_dag_id
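To make the catchup behaviour concrete: with a daily schedule, each run covers a one-day data interval, and catchup=True schedules every interval that has fully elapsed since start_date. A simplified stdlib model of that calculation:

```python
from datetime import date, timedelta

def missed_daily_runs(start_date, today):
    """Logical dates catchup=True would schedule: every completed [d, d+1) interval."""
    runs, d = [], start_date
    while d + timedelta(days=1) <= today:
        runs.append(d)
        d += timedelta(days=1)
    return runs

runs = missed_daily_runs(date(2023, 1, 1), date(2023, 1, 5))
print(len(runs))  # 4 runs, for Jan 1-4; the Jan 5 interval has not completed yet
```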

Production Patterns

生产环境模式

Error Handling and Retries

错误处理与重试

Task-Level Retries:
python
from airflow.operators.bash import BashOperator
from datetime import timedelta

task_with_retry = BashOperator(
    task_id="retry_task",
    bash_command="python might_fail.py",
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)
DAG-Level Default Args:
python
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id="production_dag",
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
):
    pass
任务级重试:
python
from airflow.operators.bash import BashOperator
from datetime import timedelta

task_with_retry = BashOperator(
    task_id="retry_task",
    bash_command="python might_fail.py",
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)
DAG级默认参数:
python
from datetime import datetime, timedelta

default_args = {
    'owner': '数据团队',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id="production_dag",
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
):
    pass
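With retry_exponential_backoff=True, the delay roughly doubles on each attempt until it reaches max_retry_delay (Airflow also applies jitter in practice). A simplified model of the schedule the settings above produce:

```python
from datetime import timedelta

def backoff_schedule(retries, retry_delay, max_retry_delay):
    # Delay doubles per attempt, capped at max_retry_delay (jitter omitted)
    return [min(retry_delay * (2 ** i), max_retry_delay) for i in range(retries)]

delays = backoff_schedule(3, timedelta(minutes=5), timedelta(minutes=30))
print([d.total_seconds() / 60 for d in delays])  # [5.0, 10.0, 20.0]
```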

Task Concurrency Control

任务并发控制

Per-Task Concurrency:
python
from airflow.operators.bash import BashOperator
from datetime import timedelta
单任务并发限制:
python
from airflow.operators.bash import BashOperator
from datetime import timedelta

Limit concurrent instances of this task

限制此任务的并发实例数

limited_task = BashOperator(
    task_id="limited_task",
    bash_command="echo 'Processing'",
    max_active_tis_per_dag=3  # Max 3 instances running
)

DAG-Level Concurrency:
python
with DAG(
    dag_id="concurrent_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    max_active_runs=5,  # Max 5 DAG runs simultaneously
    max_active_tasks=10,  # Max 10 task instances across all runs
):
    pass
limited_task = BashOperator(
    task_id="limited_task",
    bash_command="echo '处理中'",
    max_active_tis_per_dag=3  # 最多3个实例同时运行
)

DAG级并发限制:
python
with DAG(
    dag_id="concurrent_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    max_active_runs=5,  # 最多5个DAG运行实例同时运行
    max_active_tasks=10,  # 所有运行实例中最多10个任务实例
):
    pass

Idempotency

幂等性

Make tasks idempotent for safe retries:
python
@task
def idempotent_load(**context):
    execution_date = context['ds']

    # Delete existing data for this date first
    delete_query = f"""
        DELETE FROM target_table
        WHERE date = '{execution_date}'
    """
    execute_sql(delete_query)

    # Insert new data
    insert_query = f"""
        INSERT INTO target_table
        SELECT * FROM source
        WHERE date = '{execution_date}'
    """
    execute_sql(insert_query)
让任务具备幂等性以支持安全重试:
python
@task
def idempotent_load(**context):
    execution_date = context['ds']

    # 先删除该日期的现有数据
    delete_query = f"""
        DELETE FROM target_table
        WHERE date = '{execution_date}'
    """
    execute_sql(delete_query)

    # 插入新数据
    insert_query = f"""
        INSERT INTO target_table
        SELECT * FROM source
        WHERE date = '{execution_date}'
    """
    execute_sql(insert_query)
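The delete-then-insert pattern can be verified end to end with an in-memory SQLite table standing in for the warehouse: running the load twice for the same date leaves exactly one copy of that date's rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (date TEXT, value INTEGER)")

def idempotent_load(ds):
    # Delete existing rows for this date, then insert fresh ones
    conn.execute("DELETE FROM target_table WHERE date = ?", (ds,))
    conn.execute("INSERT INTO target_table VALUES (?, ?)", (ds, 42))
    conn.commit()

idempotent_load("2023-01-01")
idempotent_load("2023-01-01")  # simulated retry: safe, no duplicates

count = conn.execute("SELECT COUNT(*) FROM target_table").fetchone()[0]
print(count)  # 1
```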

SLAs and Alerts

SLA与告警

python
from datetime import timedelta

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"SLA missed for {task_list}")
    # Send alert to monitoring system

with DAG(
    dag_id="sla_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    default_args={
        'sla': timedelta(hours=2),  # Task should complete in 2 hours
    },
    sla_miss_callback=sla_miss_callback,
):
    pass
python
from datetime import timedelta

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"以下任务未满足SLA: {task_list}")
    # 发送告警到监控系统

with DAG(
    dag_id="sla_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    default_args={
        'sla': timedelta(hours=2),  # 任务应在2小时内完成
    },
    sla_miss_callback=sla_miss_callback,
):
    pass

Task Callbacks

任务回调

python
def on_failure_callback(context):
    print(f"Task {context['task_instance'].task_id} failed")
    # Send to Slack, PagerDuty, etc.

def on_success_callback(context):
    print(f"Task {context['task_instance'].task_id} succeeded")

def on_retry_callback(context):
    print(f"Task {context['task_instance'].task_id} retrying")

task_with_callbacks = BashOperator(
    task_id="monitored_task",
    bash_command="python my_script.py",
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    on_retry_callback=on_retry_callback,
)
python
def on_failure_callback(context):
    print(f"任务{context['task_instance'].task_id}执行失败")
    # 发送到Slack、PagerDuty等

def on_success_callback(context):
    print(f"任务{context['task_instance'].task_id}执行成功")

def on_retry_callback(context):
    print(f"任务{context['task_instance'].task_id}正在重试")

task_with_callbacks = BashOperator(
    task_id="monitored_task",
    bash_command="python my_script.py",
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    on_retry_callback=on_retry_callback,
)

Docker Deployment

Docker部署

Docker Compose for Local Development:
yaml
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  webserver:
    image: apache/airflow:2.7.0
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.7.0
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    command: scheduler
用于本地开发的Docker Compose:
yaml
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  webserver:
    image: apache/airflow:2.7.0
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.7.0
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    command: scheduler

Kubernetes Executor

Kubernetes执行器

KubernetesExecutor Configuration:
ini
KubernetesExecutor配置:
ini

In airflow.cfg

在airflow.cfg中

[kubernetes]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0
delete_worker_pods = True
delete_worker_pods_on_failure = False

[core]
executor = KubernetesExecutor

Pod Override for Specific Task:
python
from kubernetes.client import models as k8s

task_with_gpu = BashOperator(
    task_id="gpu_task",
    bash_command="python train_model.py",
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            limits={"nvidia.com/gpu": "1"}
                        )
                    )
                ]
            )
        )
    }
)
[kubernetes]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0
delete_worker_pods = True
delete_worker_pods_on_failure = False

[core]
executor = KubernetesExecutor

特定任务的Pod覆盖配置:
python
from kubernetes.client import models as k8s

task_with_gpu = BashOperator(
    task_id="gpu_task",
    bash_command="python train_model.py",
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            limits={"nvidia.com/gpu": "1"}
                        )
                    )
                ]
            )
        )
    }
)

Monitoring and Logging

监控与日志

Structured Logging:
python
from airflow.decorators import task
import logging

@task
def monitored_task():
    logger = logging.getLogger(__name__)

    logger.info("Starting data processing", extra={
        'process_id': 'abc123',
        'record_count': 1000
    })

    try:
        process_data()
        logger.info("Processing complete")
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", extra={
            'error_type': type(e).__name__
        })
        raise
StatsD Metrics:
python
import time

from airflow.stats import Stats

@task
def task_with_metrics():
    Stats.incr('my_dag.task_started')

    start_time = time.time()
    process_data()
    duration = time.time() - start_time

    Stats.timing('my_dag.task_duration', duration)
    Stats.incr('my_dag.task_completed')
结构化日志:
python
from airflow.decorators import task
import logging

@task
def monitored_task():
    logger = logging.getLogger(__name__)

    logger.info("开始数据处理", extra={
        'process_id': 'abc123',
        'record_count': 1000
    })

    try:
        process_data()
        logger.info("处理完成")
    except Exception as e:
        logger.error(f"处理失败: {str(e)}", extra={
            'error_type': type(e).__name__
        })
        raise
StatsD指标:
python
import time

from airflow.stats import Stats

@task
def task_with_metrics():
    Stats.incr('my_dag.task_started')

    start_time = time.time()
    process_data()
    duration = time.time() - start_time

    Stats.timing('my_dag.task_duration', duration)
    Stats.incr('my_dag.task_completed')

Best Practices

最佳实践

DAG Design

DAG设计

  1. Keep DAGs Simple: Break complex workflows into multiple DAGs
  2. Use Descriptive Names: dag_id and task_id should be self-explanatory
  3. Idempotent Tasks: Tasks should produce same result when re-run
  4. Small XComs: Keep XCom data under 1MB
  5. External Storage: Use S3/GCS for large data, pass references
  6. Proper Dependencies: Model true dependencies, avoid unnecessary ones
  7. Error Handling: Use retries, callbacks, and proper error logging
  8. Resource Management: Set appropriate task concurrency limits
  1. 保持DAG简洁:将复杂工作流拆分为多个DAG
  2. 使用描述性名称:dag_id和task_id应具备自解释性
  3. 幂等任务:任务重新运行时应产生相同结果
  4. 小型XCom:保持XCom数据小于1MB
  5. 外部存储:使用S3/GCS存储大数据,传递引用
  6. 合理的依赖:建模真实依赖,避免不必要的依赖
  7. 错误处理:使用重试、回调和适当的错误日志
  8. 资源管理:设置合适的任务并发限制

Code Organization

代码组织

dags/
├── common/
│   ├── __init__.py
│   ├── operators.py      # Custom operators
│   ├── sensors.py        # Custom sensors
│   └── utils.py          # Utility functions
├── etl/
│   ├── customer_pipeline.py
│   ├── order_pipeline.py
│   └── product_pipeline.py
├── ml/
│   ├── training_dag.py
│   └── inference_dag.py
└── maintenance/
    ├── cleanup_dag.py
    └── backup_dag.py
dags/
├── common/
│   ├── __init__.py
│   ├── operators.py      # 自定义操作符
│   ├── sensors.py        # 自定义传感器
│   └── utils.py          # 工具函数
├── etl/
│   ├── customer_pipeline.py
│   ├── order_pipeline.py
│   └── product_pipeline.py
├── ml/
│   ├── training_dag.py
│   └── inference_dag.py
└── maintenance/
    ├── cleanup_dag.py
    └── backup_dag.py

Testing DAGs

测试DAG

Unit Testing:
python
import pytest
from airflow.models import DagBag

def test_dag_loaded():
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert len(dagbag.import_errors) == 0

def test_task_count():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')
    assert len(dag.tasks) == 5

def test_task_dependencies():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')

    extract = dag.get_task('extract')
    transform = dag.get_task('transform')

    assert transform in extract.downstream_list
Integration Testing:
python
from datetime import datetime

from airflow.models import DagBag
from airflow.utils.state import State

def test_dag_runs():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')

    # Test DAG run
    dag_run = dag.create_dagrun(
        state=State.RUNNING,
        execution_date=datetime(2023, 1, 1),
        run_type='manual'
    )

    # Run specific task
    task_instance = dag_run.get_task_instance('extract')
    task_instance.run()

    assert task_instance.state == State.SUCCESS
单元测试:
python
import pytest
from airflow.models import DagBag

def test_dag_loaded():
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert len(dagbag.import_errors) == 0

def test_task_count():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')
    assert len(dag.tasks) == 5

def test_task_dependencies():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')

    extract = dag.get_task('extract')
    transform = dag.get_task('transform')

    assert transform in extract.downstream_list
集成测试:
python
from datetime import datetime

from airflow.models import DagBag
from airflow.utils.state import State

def test_dag_runs():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')

    # 测试DAG运行
    dag_run = dag.create_dagrun(
        state=State.RUNNING,
        execution_date=datetime(2023, 1, 1),
        run_type='manual'
    )

    # 运行特定任务
    task_instance = dag_run.get_task_instance('extract')
    task_instance.run()

    assert task_instance.state == State.SUCCESS

Performance Optimization

  1. Use Deferrable Operators: For sensors and long-running waits
  2. Dynamic Task Mapping: For parallel processing
  3. Appropriate Executor: Choose based on scale (Local, Celery, Kubernetes)
  4. Connection Pooling: Reuse database connections
  5. Task Parallelism: Set max_active_runs and concurrency appropriately
  6. Lazy Loading: Don't execute heavy logic at DAG parse time
  7. External Storage: Keep metadata database light

Security

  1. Secrets Management: Use Airflow Secrets Backend (not hardcoded)
  2. Connection Encryption: Use encrypted connections for databases
  3. RBAC: Enable role-based access control
  4. Audit Logging: Enable audit logs for compliance
  5. Network Isolation: Restrict worker network access
  6. Credential Rotation: Regularly rotate credentials
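
Point 1 in practice: Airflow reads a secrets backend from configuration, for example via environment variables. A sketch using the AWS Secrets Manager backend from the Amazon provider (the secret prefixes shown are illustrative, not required values):

```shell
# Illustrative: point Airflow at AWS Secrets Manager as its secrets backend.
# Requires the apache-airflow-providers-amazon package to be installed.
export AIRFLOW__SECRETS__BACKEND="airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend"
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'
```

With a backend configured, `Variable.get` and `BaseHook.get_connection` consult it before falling back to the metadata database, so no credentials need to live in DAG code.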

Configuration Management

```python
# Use Variables for configuration
from airflow.models import Variable

config = Variable.get("my_config", deserialize_json=True)
api_key = Variable.get("api_key")

# Use Connections for external services
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection('my_postgres')
db_url = f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"
```

Common Patterns and Examples

See EXAMPLES.md for 18+ detailed real-world examples including:
  • ETL pipelines
  • Machine learning workflows
  • Data quality checks
  • Multi-cloud orchestration
  • Event-driven architectures
  • Complex branching logic
  • Dynamic task generation
  • Asset-based scheduling
  • Sensor patterns
  • Error handling strategies

Troubleshooting

DAG Not Appearing in UI

  1. Check for Python syntax errors in DAG file
  2. Verify DAG file is in correct directory
  3. Check dag_id is unique
  4. Ensure schedule is not None if you expect it to run
  5. Check scheduler logs for import errors

Tasks Not Running

  1. Check task dependencies are correct
  2. Verify upstream tasks succeeded
  3. Check task concurrency limits
  4. Ensure executor has available slots
  5. Review task logs for errors

Performance Issues

  1. Reduce DAG complexity (break into multiple DAGs)
  2. Optimize SQL queries in tasks
  3. Use appropriate executor for scale
  4. Enable task parallelism
  5. Check for slow sensors (use deferrable mode)
  6. Monitor metadata database performance

Common Errors

Import Errors:
```python
# Bad - heavy imports at module (DAG-parse) level slow down the scheduler
from heavy_library import process

with DAG(...):
    ...
```

```python
# Good - import inside the task, so the cost is paid only at run time
with DAG(...):
    @task
    def my_task():
        from heavy_library import process
        process()
```

Circular Dependencies:
```python
# This will fail - the graph must be acyclic
task1 >> task2 >> task3 >> task1  # Circular!

# Correct
task1 >> task2 >> task3
```

Large XComs:
```python
# Bad - storing large data in XCom
@task
def process():
    large_df = pd.read_csv('big_file.csv')
    return large_df  # Too large!

# Good - store a reference instead
@task
def process():
    large_df = pd.read_csv('big_file.csv')
    path = save_to_s3(large_df)
    return path  # Just the path
```

Resources


Skill Version: 1.0.0
Last Updated: January 2025
Apache Airflow Version: 2.7+
Skill Category: Data Engineering, Workflow Orchestration, Pipeline Management