Apache Airflow Orchestration
A comprehensive skill for mastering Apache Airflow workflow orchestration. This skill covers DAG development, operators, sensors, task dependencies, dynamic workflows, XCom communication, scheduling patterns, and production deployment strategies.
When to Use This Skill
Use this skill when:
- Building and managing complex data pipelines with task dependencies
- Orchestrating ETL/ELT workflows across multiple systems
- Scheduling and monitoring batch processing jobs
- Coordinating multi-step data transformations
- Managing workflows with conditional execution and branching
- Implementing event-driven or asset-based workflows
- Deploying production-grade workflow automation
- Creating dynamic workflows that generate tasks programmatically
- Coordinating distributed task execution across clusters
- Building data engineering platforms with workflow orchestration
Core Concepts
What is Apache Airflow?
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) using Python code, making complex workflow orchestration maintainable and version-controlled.
Key Principles:
- Dynamic: Workflows are defined in Python, enabling dynamic generation
- Extensible: Rich ecosystem of operators, sensors, and hooks
- Scalable: Can scale from single machine to large clusters
- Observable: Comprehensive UI for monitoring and troubleshooting
DAGs (Directed Acyclic Graphs)
A DAG is a collection of tasks organized to reflect their relationships and dependencies.
DAG Properties:
- dag_id: Unique identifier for the DAG
- start_date: When the DAG should start being scheduled
- schedule: How often to run (cron, timedelta, or asset-based)
- catchup: Whether to run missed intervals on DAG activation
- tags: Labels for organization and filtering
- default_args: Default parameters for all tasks in the DAG
DAG Definition Example:
```python
from datetime import datetime
from airflow.sdk import DAG

with DAG(
    dag_id="example_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",  # Daily at midnight
    catchup=False,
    tags=["example", "tutorial"],
) as dag:
    # Tasks defined here
    pass
```
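Airflow triggers each scheduled run once its data interval has fully elapsed: the daily run covering 2022-01-01 fires at midnight on 2022-01-02. A plain-Python sketch of that interval arithmetic (`run_after` is a hypothetical helper for illustration, not an Airflow API):

```python
from datetime import datetime, timedelta

# Hypothetical helper: the run covering [interval_start, interval_start + interval)
# is triggered once the interval has fully elapsed.
def run_after(interval_start: datetime, interval: timedelta = timedelta(days=1)) -> datetime:
    return interval_start + interval

# The daily interval starting 2022-01-01 runs at midnight on 2022-01-02
next_run = run_after(datetime(2022, 1, 1))
```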
Tasks and Operators
Tasks are the basic units of execution in Airflow. Operators are templates for creating tasks.
Common Operator Types:
- BashOperator: Execute bash commands
- PythonOperator: Execute Python functions
- EmailOperator: Send emails
- EmptyOperator: Placeholder/dummy tasks
- Custom Operators: User-defined operators for specific needs
Operator vs. Task:
- Operator: Template/class definition
- Task: Instantiation of an operator with specific parameters
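The template/instance distinction can be sketched in plain Python (a hypothetical `EchoOperator`, not a real Airflow class): the operator is the reusable template, and each instantiation with its own `task_id` is a distinct task.

```python
# Hypothetical template class standing in for an Airflow operator
class EchoOperator:
    def __init__(self, task_id: str, message: str):
        self.task_id = task_id
        self.message = message

    def execute(self) -> str:
        # Real operators receive a context argument; omitted here
        return f"[{self.task_id}] {self.message}"

# One operator class, two distinct tasks
greet = EchoOperator(task_id="greet", message="hello")
farewell = EchoOperator(task_id="farewell", message="bye")
```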
任务是Airflow中的基本执行单元。操作符是用于创建任务的模板。
常见操作符类型:
- BashOperator:执行bash命令
- PythonOperator:执行Python函数
- EmailOperator:发送邮件
- EmptyOperator:占位符/虚拟任务
- 自定义操作符:针对特定需求的用户自定义操作符
操作符 vs 任务:
- 操作符:模板/类定义
- 任务:带有特定参数的操作符实例
Task Dependencies
Task dependencies define the execution order and workflow structure.
Dependency Operators:
- `>>`: Sets a downstream dependency (`task1 >> task2`)
- `<<`: Sets an upstream dependency (`task2 << task1`)
- `chain()`: Sequential dependencies for multiple tasks
- `cross_downstream()`: Many-to-many relationships
Dependency Examples:
```python
# Simple linear flow
task1 >> task2 >> task3

# Fan-out pattern
task1 >> [task2, task3, task4]

# Fan-in pattern
[task1, task2, task3] >> task4

# Complex dependencies
first_task >> [second_task, third_task]
third_task << fourth_task
```
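The resulting execution order is a topological sort of the dependency graph. Python's stdlib `graphlib` can illustrate the semantics for the fan-out/fan-in example above (a sketch of the ordering rules, not of how Airflow schedules internally):

```python
from graphlib import TopologicalSorter

# Model task1 >> [task2, task3] >> task4 as predecessor sets
graph = {
    "task2": {"task1"},
    "task3": {"task1"},
    "task4": {"task2", "task3"},
}

# task1 must run first and task4 last; task2/task3 may run in parallel
order = list(TopologicalSorter(graph).static_order())
```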
Executors
Executors determine how and where tasks run.
Executor Types:
- SequentialExecutor: Single-threaded, local (default, not for production)
- LocalExecutor: Multi-threaded, single machine
- CeleryExecutor: Distributed execution using Celery
- KubernetesExecutor: Each task runs in a separate Kubernetes pod
- DaskExecutor: Distributed execution using Dask
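The executor is chosen per deployment, typically in `airflow.cfg` (or via the equivalent `AIRFLOW__CORE__EXECUTOR` environment variable); a hedged sketch:

```ini
# airflow.cfg -- select the executor for your deployment
[core]
executor = LocalExecutor
```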
Scheduler
The Airflow scheduler:
- Monitors all DAGs and their tasks
- Triggers task instances based on dependencies and schedules
- Submits tasks to executors for execution
- Handles retries and task state management
Starting the Scheduler:
```bash
airflow scheduler
```
DAG Development Patterns
Basic DAG Structure
Every DAG follows this structure:
```python
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="basic_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",
    catchup=False,
) as dag:
    task1 = BashOperator(
        task_id="task1",
        bash_command="echo 'Task 1 executed'",
    )
    task2 = BashOperator(
        task_id="task2",
        bash_command="echo 'Task 2 executed'",
    )

    task1 >> task2
```
Task Dependencies and Chains
Linear Chain:
```python
from airflow.sdk import chain

# These are equivalent:
task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)
```
**Dynamic Chain:**
```python
from airflow.sdk import chain
from airflow.operators.empty import EmptyOperator

# Dynamically generate and chain tasks
chain(*[EmptyOperator(task_id=f"task_{i}") for i in range(1, 6)])
```
**Pairwise Chain:**
```python
from airflow.sdk import chain

# Creates paired dependencies:
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
```
**Cross Downstream:**
```python
from airflow.sdk import cross_downstream

# Both op1 and op2 feed into both op3 and op4
cross_downstream([op1, op2], [op3, op4])
```
Branching and Conditional Execution
BranchPythonOperator:
```python
from airflow.operators.python import BranchPythonOperator
from airflow.providers.standard.operators.bash import BashOperator

def choose_branch(**context):
    if context['data_interval_start'].day == 1:
        return 'monthly_task'
    return 'daily_task'

branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
)

daily_task = BashOperator(task_id='daily_task', bash_command='echo daily')
monthly_task = BashOperator(task_id='monthly_task', bash_command='echo monthly')

branch >> [daily_task, monthly_task]
```
Custom Branch Operator:
```python
from airflow.operators.branch import BaseBranchOperator

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """Run an extra branch on the first day of the month."""
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None  # Skip all downstream tasks
```
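The branch-choice rule itself is plain Python and can be checked without Airflow (a hedged sketch; in a real DAG the date comes from `context['data_interval_start']` and the return value is a task_id):

```python
from datetime import date

def choose_branch(data_interval_start: date) -> str:
    # Day 1 of the month routes to the monthly branch
    if data_interval_start.day == 1:
        return 'monthly_task'
    return 'daily_task'

monthly = choose_branch(date(2024, 5, 1))
daily = choose_branch(date(2024, 5, 15))
```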
TaskGroups for Organization
TaskGroups help organize related tasks hierarchically:
```python
from airflow.sdk import task_group
from airflow.operators.empty import EmptyOperator

@task_group()
def data_processing_group():
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")
    extract >> transform >> load

@task_group()
def validation_group():
    validate_schema = EmptyOperator(task_id="validate_schema")
    validate_data = EmptyOperator(task_id="validate_data")
    validate_schema >> validate_data

start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

start >> data_processing_group() >> validation_group() >> end
```
Edge Labeling
Add labels to dependency edges for clarity:
```python
from airflow.sdk import Label

# Inline labeling
my_task >> Label("When empty") >> other_task

# Method-based labeling
my_task.set_downstream(other_task, Label("When empty"))
```
LatestOnlyOperator
Skip tasks if not the latest DAG run:
```python
from airflow.sdk import DAG
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
import pendulum

with DAG(
    dag_id='latest_only_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=True,
    schedule="@daily",
) as dag:
    latest_only = LatestOnlyOperator(task_id='latest_only')
    task1 = EmptyOperator(task_id='task1')
    task2 = EmptyOperator(task_id='task2')
    task3 = EmptyOperator(task_id='task3')
    task4 = EmptyOperator(task_id='task4', trigger_rule='all_done')

    latest_only >> task1 >> task3
    latest_only >> task4
    task2 >> task3
    task2 >> task4
```
Operators Deep Dive
BashOperator
Execute bash commands:
```python
from airflow.providers.standard.operators.bash import BashOperator

bash_task = BashOperator(
    task_id="bash_example",
    bash_command="echo 'Hello from Bash'; date",
    env={'MY_VAR': 'value'},  # Environment variables
    append_env=True,  # Append to existing env vars
    output_encoding='utf-8',
)
```
Complex Bash Command:
```python
bash_complex = BashOperator(
    task_id="complex_bash",
    bash_command="""
    cd /path/to/dir
    python process_data.py --input {{ ds }} --output {{ tomorrow_ds }}
    if [ $? -eq 0 ]; then
        echo "Success"
    else
        echo "Failed" && exit 1
    fi
    """,
)
```
PythonOperator
Execute Python functions:
```python
from airflow.providers.standard.operators.python import PythonOperator

def my_python_function(name, **context):
    print(f"Hello {name}!")
    print(f"Execution date: {context['ds']}")
    return "Success"

python_task = PythonOperator(
    task_id="python_example",
    python_callable=my_python_function,
    op_kwargs={'name': 'Airflow'},
)
```
Traditional ETL with PythonOperator:
```python
import json
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator

def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

def transform(ti):
    # Pull from XCom
    order_data_dict = ti.xcom_pull(task_ids="extract")
    total_order_value = sum(order_data_dict.values())
    return {"total_order_value": total_order_value}

def load(ti):
    # Pull from XCom
    total = ti.xcom_pull(task_ids="transform")["total_order_value"]
    print(f"Total order value is: {total:.2f}")

with DAG(
    dag_id="legacy_etl_pipeline",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    extract_task >> transform_task >> load_task
```
EmailOperator
Send email notifications:
```python
from airflow.providers.smtp.operators.smtp import EmailOperator

email_task = EmailOperator(
    task_id='send_email',
    to='recipient@example.com',
    subject='Airflow Notification',
    html_content='<h3>Task completed successfully!</h3>',
    cc=['cc@example.com'],
    bcc=['bcc@example.com'],
)
```
EmptyOperator
Placeholder for workflow structure:
```python
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')

# Useful for organizing complex DAGs
start >> [task1, task2, task3] >> end
```
Custom Operators
Create reusable custom operators:
```python
from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, my_param, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_param = my_param

    def execute(self, context):
        self.log.info(f"Executing with param: {self.my_param}")
        # Custom logic here
        return "Result"

# Usage
custom_task = MyCustomOperator(
    task_id="custom",
    my_param="value",
)
```
Sensors Deep Dive
Sensors are a special type of operator that wait for a certain condition to be met before proceeding.
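The default "poke" cycle can be sketched in plain Python (`poke_until` is a hypothetical helper, not an Airflow API; real sensors also offer a `reschedule` mode that frees the worker slot between checks):

```python
import time

def poke_until(condition, poke_interval: float = 0.01, timeout: float = 1.0) -> bool:
    """Re-check `condition` every `poke_interval` seconds until it holds
    or `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poke_interval)
    return False

# A condition that becomes true on the third check
checks = iter([False, False, True])
result = poke_until(lambda: next(checks))
```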
ExternalTaskSensor
Wait for tasks in other DAGs:
```python
from airflow.sdk import DAG
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
import pendulum

with DAG(
    dag_id="example_external_task_sensor",
    start_date=pendulum.datetime(2021, 10, 20, tz="UTC"),
    catchup=False,
    schedule=None,
) as dag:
    wait_for_task = ExternalTaskSensor(
        task_id="wait_for_task",
        external_dag_id="upstream_dag",
        external_task_id="upstream_task",
        allowed_states=["success"],
        failed_states=["failed"],
        execution_delta=None,  # Same execution_date
        timeout=600,  # 10 minutes
        poke_interval=60,  # Check every 60 seconds
    )
```
Deferrable ExternalTaskSensor:
```python
# More efficient - releases the worker slot while waiting
wait_for_task_async = ExternalTaskSensor(
    task_id="wait_for_task_async",
    external_dag_id="upstream_dag",
    external_task_id="upstream_task",
    allowed_states=["success"],
    failed_states=["failed"],
    deferrable=True,  # Use async mode
)
```
FileSensor
Wait for files to appear:
```python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/path/to/file.csv",
    poke_interval=30,
    timeout=600,
    mode='poke',  # or 'reschedule' for long waits
)
```
TimeDeltaSensor
Wait for a specific time period:
```python
from datetime import timedelta
from airflow.sensors.time_delta import TimeDeltaSensor

wait_one_hour = TimeDeltaSensor(
    task_id="wait_one_hour",
    delta=timedelta(hours=1),
)
```
BigQuery Table Sensor
Wait for BigQuery table to exist:
```python
from airflow.sdk import DAG
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
import pendulum

with DAG(
    dag_id="bigquery_sensor_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
) as dag:
    wait_for_table = BigQueryTableExistenceSensor(
        task_id="wait_for_table",
        project_id="your-project-id",
        dataset_id="your_dataset",
        table_id="your_table",
        bigquery_conn_id="google_cloud_default",
        location="US",
        poke_interval=60,
        timeout=3600,
    )
```
Custom Sensors
Create custom sensors for specific conditions:
```python
from airflow.sensors.base import BaseSensorOperator

class MyCustomSensor(BaseSensorOperator):
    def __init__(self, my_condition, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_condition = my_condition

    def poke(self, context):
        # Return True when the condition is met
        self.log.info(f"Checking condition: {self.my_condition}")
        # check_condition is a placeholder for your own logic
        return check_condition(self.my_condition)
```
Deferrable Sensors
Deferrable sensors release worker slots while waiting:
```python
from datetime import timedelta
from airflow.sdk import BaseSensorOperator, StartTriggerArgs

class WaitHoursSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def __init__(self, *args, trigger_kwargs=None, start_from_trigger=True, **kwargs):
        super().__init__(*args, **kwargs)
        if trigger_kwargs:
            self.start_trigger_args.trigger_kwargs = trigger_kwargs
        self.start_from_trigger = start_from_trigger

    def execute_complete(self, context, event=None):
        return  # Task complete
```
XComs (Cross-Communication)
XComs enable task-to-task communication by storing and retrieving data.
Basic XCom Usage
Pushing to XCom:
```python
from airflow.providers.standard.operators.python import PythonOperator

def push_function(**context):
    value = "Important data"
    context['ti'].xcom_push(key='my_key', value=value)
    # Or simply return (uses the 'return_value' key)
    return value

push_task = PythonOperator(
    task_id='push',
    python_callable=push_function,
)
```
Pulling from XCom:
```python
def pull_function(**context):
    # Pull by task_id (uses the 'return_value' key)
    value = context['ti'].xcom_pull(task_ids='push')
    # Pull with a specific key
    value = context['ti'].xcom_pull(task_ids='push', key='my_key')
    print(f"Pulled value: {value}")

pull_task = PythonOperator(
    task_id='pull',
    python_callable=pull_function,
)
```
XCom with TaskFlow API
TaskFlow API automatically manages XComs:
```python
from airflow.decorators import task

@task
def extract():
    return {"data": [1, 2, 3, 4, 5]}

@task
def transform(data_dict):
    # Automatically receives the XCom from extract
    total = sum(data_dict['data'])
    return {"total": total}

@task
def load(summary):
    print(f"Total: {summary['total']}")

# Automatic XCom handling
data = extract()
summary = transform(data)
load(summary)
```
XCom Best Practices
Size Limitations:
- XComs are stored in the metadata database
- Keep XCom data small (< 1MB recommended)
- For large data, store in external systems and pass references
Example with External Storage:
```python
@task
def process_large_data():
    # Process data (compute_large_dataset / save_to_s3 are placeholder helpers)
    large_result = compute_large_dataset()
    # Store in S3/GCS
    file_path = save_to_s3(large_result, "s3://bucket/result.parquet")
    # Return only the path
    return {"result_path": file_path}

@task
def consume_large_data(metadata):
    # Load from S3/GCS (load_from_s3 / process are placeholder helpers)
    data = load_from_s3(metadata['result_path'])
    process(data)
```
XCom with Operators
Reading XCom in Templates:
```python
from airflow.providers.standard.operators.bash import BashOperator

process_file = BashOperator(
    task_id="process",
    bash_command="python process.py {{ ti.xcom_pull(task_ids='extract') }}",
)
```
XCom with EmailOperator:
```python
from airflow.sdk import task
from airflow.providers.smtp.operators.smtp import EmailOperator

@task
def get_ip():
    return "192.168.1.1"

@task(multiple_outputs=True)
def compose_email(external_ip):
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Your server is connected from {external_ip}<br>',
    }

email_info = compose_email(get_ip())

EmailOperator(
    task_id='send_email',
    to='example@example.com',
    subject=email_info['subject'],
    html_content=email_info['body'],
)
```
Dynamic Workflows
Create tasks dynamically based on runtime conditions or external data.
Dynamic Task Generation with Loops
```python
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator

with DAG("dynamic_loop_example", ...) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # Dynamically create tasks
    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
    for option in options:
        task = EmptyOperator(task_id=option)
        start >> task >> end
```
Dynamic Task Mapping
Map over task outputs to create dynamic parallel tasks:
```python
from airflow.decorators import task

@task
def extract():
    # Returns a list of items to process
    return [1, 2, 3, 4, 5]

@task
def transform(item):
    # Processes a single item
    return item * 2

@task
def load(items):
    # Receives all transformed items
    print(f"Loaded {len(items)} items: {items}")

# Dynamic mapping
data = extract()
transformed = transform.expand(item=data)  # Creates 5 parallel tasks
load(transformed)
```
**Mapping with Classic Operators:**
```python
from airflow.models import BaseOperator

class ExtractOperator(BaseOperator):
    def execute(self, context):
        return ["file1.csv", "file2.csv", "file3.csv"]

class TransformOperator(BaseOperator):
    def __init__(self, input, **kwargs):
        super().__init__(**kwargs)
        self.input = input

    def execute(self, context):
        # Process a single file
        return f"processed_{self.input}"

extract = ExtractOperator(task_id="extract")
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)
```
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)Task Group Mapping
任务组映射
Map over entire task groups:
```python
from airflow.decorators import task, task_group

@task
def add_one(value):
    return value + 1

@task
def double(value):
    return value * 2

@task_group
def process_group(value):
    incremented = add_one(value)
    return double(incremented)

@task
def aggregate(results):
    print(f"Results: {results}")

# Map the task group over values
results = process_group.expand(value=[1, 2, 3, 4, 5])
aggregate(results)
```

Partial Parameters with Mapping
Mix static and dynamic parameters:
```python
@task
def process(base_path, filename):
    full_path = f"{base_path}/{filename}"
    return f"Processed {full_path}"

# Static parameter 'base_path', dynamic 'filename'
results = process.partial(base_path="/data").expand(
    filename=["file1.csv", "file2.csv", "file3.csv"]
)
```
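The split between `partial` (static) and `expand` (dynamic) mirrors `functools.partial` plus a map; a plain-Python sketch of the pattern (illustration only, not the Airflow API):

```python
from functools import partial

def process(base_path, filename):
    return f"Processed {base_path}/{filename}"

# Static parameter fixed once, dynamic parameter mapped over
process_in_data = partial(process, "/data")
results = [process_in_data(f) for f in ["file1.csv", "file2.csv", "file3.csv"]]
print(results[0])  # → Processed /data/file1.csv
```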
TaskFlow API
The modern way to write Airflow DAGs with automatic XCom handling and cleaner syntax.
Basic TaskFlow Example
```python
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="taskflow_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    schedule=None,
    catchup=False,
)
def my_taskflow_dag():
    @task
    def extract():
        import json
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        return json.loads(data_string)

    @task
    def transform(order_data_dict):
        total = sum(order_data_dict.values())
        return {"total_order_value": total}

    @task
    def load(summary):
        print(f"Total order value: {summary['total_order_value']:.2f}")

    # Function calls create task dependencies automatically
    order_data = extract()
    summary = transform(order_data)
    load(summary)

# Instantiate the DAG
my_taskflow_dag()
```
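Stripped of the decorators, the pipeline above is plain function composition; running the same logic outside Airflow confirms the arithmetic:

```python
import json

def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

def transform(order_data_dict):
    return {"total_order_value": sum(order_data_dict.values())}

summary = transform(extract())
print(f"Total order value: {summary['total_order_value']:.2f}")  # → Total order value: 1236.70
```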
Multiple Outputs
Return multiple values from tasks:
```python
@task(multiple_outputs=True)
def extract_data():
    return {
        'orders': [1, 2, 3],
        'customers': ['A', 'B', 'C'],
        'revenue': 1000.50
    }

@task
def process_orders(orders):
    print(f"Processing {len(orders)} orders")

@task
def process_customers(customers):
    print(f"Processing {len(customers)} customers")

# Access individual outputs
data = extract_data()
process_orders(data['orders'])
process_customers(data['customers'])
```

Mixing TaskFlow with Traditional Operators
```python
from airflow.decorators import dag, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

@dag(
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
)
def mixed_dag():
    @task
    def get_date():
        from datetime import datetime
        return datetime.now().strftime("%Y%m%d")

    # Traditional operator
    bash_task = BashOperator(
        task_id="print_date",
        bash_command="echo Processing data for {{ ti.xcom_pull(task_ids='get_date') }}"
    )

    @task
    def process_results():
        print("Processing complete")

    # Mix task types
    date = get_date()
    date >> bash_task >> process_results()

mixed_dag()
```

Virtual Environment for Tasks
Isolate task dependencies:
```python
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="virtualenv_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
)
def virtualenv_dag():
    @task.virtualenv(
        requirements=["pandas==1.5.0", "numpy==1.23.0"],
        system_site_packages=False
    )
    def analyze_data():
        import pandas as pd
        import numpy as np
        df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]})
        result = np.mean(df['col1'])
        return float(result)

    @task
    def report_results(mean_value):
        print(f"Mean value: {mean_value}")

    result = analyze_data()
    report_results(result)

virtualenv_dag()
```

Asset-Based Scheduling
Schedule DAGs based on data assets (formerly datasets) rather than time.
Producer-Consumer Pattern
```python
from airflow.sdk import DAG, Asset
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define asset
customer_data = Asset("s3://my-bucket/customers.parquet")

# Producer DAG
with DAG(
    dag_id="producer_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
) as producer:
    BashOperator(
        task_id="generate_data",
        bash_command="python generate_customers.py",
        outlets=[customer_data]  # Marks the asset as updated
    )

# Consumer DAG - triggered when the asset updates
with DAG(
    dag_id="consumer_dag",
    schedule=[customer_data],  # Triggered by the asset
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as consumer:
    BashOperator(
        task_id="process_data",
        bash_command="python process_customers.py"
    )
```

Multiple Asset Dependencies
AND Logic (all assets must update):
```python
from airflow.datasets import Dataset

asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")

with DAG(
    dag_id="wait_for_both",
    schedule=(asset_1 & asset_2),  # Both must update
    start_date=datetime(2023, 1, 1),
):
    pass
```

OR Logic (any asset update triggers):

```python
with DAG(
    dag_id="triggered_by_either",
    schedule=(asset_1 | asset_2),  # Either can trigger
    start_date=datetime(2023, 1, 1),
):
    pass
```

Complex Logic:

```python
asset_3 = Dataset("s3://bucket/file3.csv")

with DAG(
    dag_id="complex_condition",
    schedule=(asset_1 | (asset_2 & asset_3)),  # asset_1 OR (asset_2 AND asset_3)
    start_date=datetime(2023, 1, 1),
):
    pass
```
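Conceptually, the scheduler treats each asset's "has an unconsumed update" state as a boolean and evaluates the expression; a toy illustration of the triggering logic for the complex condition above (not scheduler internals):

```python
def should_trigger(asset_1_updated, asset_2_updated, asset_3_updated):
    # asset_1 OR (asset_2 AND asset_3)
    return asset_1_updated or (asset_2_updated and asset_3_updated)

print(should_trigger(True, False, False))   # asset_1 alone suffices → True
print(should_trigger(False, True, True))    # both asset_2 and asset_3 → True
print(should_trigger(False, True, False))   # asset_2 alone is not enough → False
```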
Asset Aliases
Use aliases for flexible asset references:
```python
from airflow.datasets import Dataset, AssetAlias
from airflow.decorators import task

# Producer with alias
with DAG(dag_id="alias_producer", start_date=datetime(2023, 1, 1)):
    @task(outlets=[AssetAlias("my-alias")])
    def produce_data(*, outlet_events):
        # Dynamically attach the actual asset to the alias
        outlet_events[AssetAlias("my-alias")].add(
            Dataset("s3://bucket/my-file.csv")
        )

    produce_data()

# Consumer depending on the alias
with DAG(
    dag_id="alias_consumer",
    schedule=AssetAlias("my-alias"),
    start_date=datetime(2023, 1, 1),
):
    pass
```

Accessing Asset Event Information
```python
@task
def process_asset_data(*, triggering_asset_events):
    for event in triggering_asset_events:
        print(f"Asset: {event.asset.uri}")
        print(f"Timestamp: {event.timestamp}")
        print(f"Extra: {event.extra}")
```

Scheduling Patterns
Cron Expressions
```python
# Every day at midnight
schedule="0 0 * * *"

# Every Monday at 9 AM
schedule="0 9 * * 1"

# Every 15 minutes
schedule="*/15 * * * *"

# First day of the month at noon
schedule="0 12 1 * *"

# Weekdays at 6 PM
schedule="0 18 * * 1-5"
```

Timedelta Scheduling
```python
from datetime import timedelta

with DAG(
    dag_id="timedelta_schedule",
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(hours=6),  # Every 6 hours
):
    pass
```

Preset Schedules
```python
# Common presets
schedule="@once"     # Run once
schedule="@hourly"   # Every hour
schedule="@daily"    # Daily at midnight
schedule="@weekly"   # Every Sunday at midnight
schedule="@monthly"  # First day of the month at midnight
schedule="@yearly"   # January 1st at midnight
schedule=None        # Manual trigger only
```
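Each preset is shorthand for a cron expression; the correspondence can be written out as a lookup table:

```python
# Cron equivalents of the schedule presets
PRESET_CRON = {
    "@hourly":  "0 * * * *",
    "@daily":   "0 0 * * *",
    "@weekly":  "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly":  "0 0 1 1 *",
}
print(PRESET_CRON["@daily"])  # → 0 0 * * *
```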
Catchup and Backfilling
Catchup:
```python
with DAG(
    dag_id="catchup_example",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=True,  # Run all missed intervals
):
    pass

with DAG(
    dag_id="no_catchup",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,  # Only run the latest interval
):
    pass
```

Manual Backfilling:
```bash
# Backfill a specific date range
airflow dags backfill \
    --start-date 2023-01-01 \
    --end-date 2023-01-31 \
    my_dag_id

# Backfill, marking runs as successful without executing them
airflow dags backfill \
    --start-date 2023-01-01 \
    --end-date 2023-01-31 \
    --mark-success \
    my_dag_id
```
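What catchup or a backfill enumerates is one run per missed schedule interval; for a daily schedule over January 2023 the run set can be sketched as:

```python
from datetime import date, timedelta

def daily_intervals(start, end):
    # One logical date per day from start to end, inclusive -
    # the runs a daily catchup/backfill would create (illustration only)
    days = []
    current = start
    while current <= end:
        days.append(current)
        current += timedelta(days=1)
    return days

runs = daily_intervals(date(2023, 1, 1), date(2023, 1, 31))
print(len(runs))  # → 31
```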
Production Patterns
Error Handling and Retries
Task-Level Retries:
```python
from airflow.operators.bash import BashOperator
from datetime import timedelta

task_with_retry = BashOperator(
    task_id="retry_task",
    bash_command="python might_fail.py",
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)
```

DAG-Level Default Args:
```python
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id="production_dag",
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
):
    pass
```
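With `retry_exponential_backoff=True`, the wait roughly doubles per attempt and is capped by `max_retry_delay`; a sketch of the growth-and-cap behaviour (Airflow also adds jitter, so real delays differ):

```python
from datetime import timedelta

def backoff_delays(retries, retry_delay, max_retry_delay):
    # Base delay doubles per attempt, capped at max_retry_delay
    delays = []
    for attempt in range(retries):
        delays.append(min(retry_delay * (2 ** attempt), max_retry_delay))
    return delays

print(backoff_delays(4, timedelta(minutes=5), timedelta(minutes=30)))
# grows 5 → 10 → 20 minutes, then hits the 30-minute cap
```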
Task Concurrency Control
Per-Task Concurrency:
```python
from airflow.operators.bash import BashOperator

# Limit concurrent instances of this task
limited_task = BashOperator(
    task_id="limited_task",
    bash_command="echo 'Processing'",
    max_active_tis_per_dag=3  # Max 3 instances running
)
```

**DAG-Level Concurrency:**

```python
with DAG(
    dag_id="concurrent_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    max_active_runs=5,    # Max 5 DAG runs simultaneously
    max_active_tasks=10,  # Max 10 task instances across all runs
):
    pass
```

Idempotency
Make tasks idempotent for safe retries:
```python
@task
def idempotent_load(**context):
    execution_date = context['ds']

    # Delete existing data for this date first
    delete_query = f"""
        DELETE FROM target_table
        WHERE date = '{execution_date}'
    """
    execute_sql(delete_query)

    # Insert new data
    insert_query = f"""
        INSERT INTO target_table
        SELECT * FROM source
        WHERE date = '{execution_date}'
    """
    execute_sql(insert_query)
```
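The delete-then-insert pattern makes a re-run a no-op; a toy in-memory version shows why (a dict stands in for the table):

```python
target_table = {}  # date -> rows, standing in for a real table

def idempotent_load(execution_date, source_rows):
    target_table.pop(execution_date, None)            # DELETE rows for this date
    target_table[execution_date] = list(source_rows)  # INSERT fresh rows

idempotent_load("2023-01-01", [1, 2, 3])
snapshot = dict(target_table)
idempotent_load("2023-01-01", [1, 2, 3])  # safe re-run (e.g. a retry)
print(target_table == snapshot)  # → True: no duplicate rows
```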
SLAs and Alerts
```python
from datetime import timedelta

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"SLA missed for {task_list}")
    # Send alert to monitoring system

with DAG(
    dag_id="sla_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    default_args={
        'sla': timedelta(hours=2),  # Task should complete within 2 hours
    },
    sla_miss_callback=sla_miss_callback,
):
    pass
```

Task Callbacks
```python
def on_failure_callback(context):
    print(f"Task {context['task_instance'].task_id} failed")
    # Send to Slack, PagerDuty, etc.

def on_success_callback(context):
    print(f"Task {context['task_instance'].task_id} succeeded")

def on_retry_callback(context):
    print(f"Task {context['task_instance'].task_id} retrying")

task_with_callbacks = BashOperator(
    task_id="monitored_task",
    bash_command="python my_script.py",
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    on_retry_callback=on_retry_callback,
)
```

Docker Deployment
Docker Compose for Local Development:
```yaml
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
  webserver:
    image: apache/airflow:2.7.0
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    ports:
      - "8080:8080"
    command: webserver
  scheduler:
    image: apache/airflow:2.7.0
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    command: scheduler
```

Kubernetes Executor
KubernetesExecutor Configuration:
```ini
# In airflow.cfg
[kubernetes]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0
delete_worker_pods = True
delete_worker_pods_on_failure = False

[core]
executor = KubernetesExecutor
```
**Pod Override for Specific Task:**
```python
from kubernetes.client import models as k8s

task_with_gpu = BashOperator(
    task_id="gpu_task",
    bash_command="python train_model.py",
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            limits={"nvidia.com/gpu": "1"}
                        )
                    )
                ]
            )
        )
    }
)
```

Monitoring and Logging
Structured Logging:
```python
from airflow.decorators import task
import logging

@task
def monitored_task():
    logger = logging.getLogger(__name__)
    logger.info("Starting data processing", extra={
        'process_id': 'abc123',
        'record_count': 1000
    })
    try:
        process_data()
        logger.info("Processing complete")
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", extra={
            'error_type': type(e).__name__
        })
        raise
```

StatsD Metrics:
```python
import time
from airflow.stats import Stats

@task
def task_with_metrics():
    Stats.incr('my_dag.task_started')
    start_time = time.time()
    process_data()
    duration = time.time() - start_time
    Stats.timing('my_dag.task_duration', duration)
    Stats.incr('my_dag.task_completed')
```
Best Practices
DAG Design
- Keep DAGs Simple: Break complex workflows into multiple DAGs
- Use Descriptive Names: dag_id and task_id should be self-explanatory
- Idempotent Tasks: Tasks should produce same result when re-run
- Small XComs: Keep XCom data under 1MB
- External Storage: Use S3/GCS for large data, pass references
- Proper Dependencies: Model true dependencies, avoid unnecessary ones
- Error Handling: Use retries, callbacks, and proper error logging
- Resource Management: Set appropriate task concurrency limits
Code Organization
```
dags/
├── common/
│   ├── __init__.py
│   ├── operators.py      # Custom operators
│   ├── sensors.py        # Custom sensors
│   └── utils.py          # Utility functions
├── etl/
│   ├── customer_pipeline.py
│   ├── order_pipeline.py
│   └── product_pipeline.py
├── ml/
│   ├── training_dag.py
│   └── inference_dag.py
└── maintenance/
    ├── cleanup_dag.py
    └── backup_dag.py
```

Testing DAGs
Unit Testing:
```python
import pytest
from airflow.models import DagBag

def test_dag_loaded():
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert len(dagbag.import_errors) == 0

def test_task_count():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')
    assert len(dag.tasks) == 5

def test_task_dependencies():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')
    extract = dag.get_task('extract')
    transform = dag.get_task('transform')
    assert transform in extract.downstream_list
```

Integration Testing:

```python
from datetime import datetime
from airflow.models import DagBag
from airflow.utils.state import State

def test_dag_runs():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')

    # Create a test DAG run
    dag_run = dag.create_dagrun(
        state=State.RUNNING,
        execution_date=datetime(2023, 1, 1),
        run_type='manual'
    )

    # Run a specific task
    task_instance = dag_run.get_task_instance('extract')
    task_instance.run()
    assert task_instance.state == State.SUCCESS
```
Performance Optimization
- Use Deferrable Operators: For sensors and long-running waits
- Dynamic Task Mapping: For parallel processing
- Appropriate Executor: Choose based on scale (Local, Celery, Kubernetes)
- Connection Pooling: Reuse database connections
- Task Parallelism: Set max_active_runs and concurrency appropriately
- Lazy Loading: Don't execute heavy logic at DAG parse time
- External Storage: Keep metadata database light
Security
- Secrets Management: Use Airflow Secrets Backend (not hardcoded)
- Connection Encryption: Use encrypted connections for databases
- RBAC: Enable role-based access control
- Audit Logging: Enable audit logs for compliance
- Network Isolation: Restrict worker network access
- Credential Rotation: Regularly rotate credentials
Configuration Management
```python
# Use Variables for configuration
from airflow.models import Variable

config = Variable.get("my_config", deserialize_json=True)
api_key = Variable.get("api_key")

# Use Connections for external services
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection('my_postgres')
db_url = f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"
```

Common Patterns and Examples
See EXAMPLES.md for 18+ detailed real-world examples including:
- ETL pipelines
- Machine learning workflows
- Data quality checks
- Multi-cloud orchestration
- Event-driven architectures
- Complex branching logic
- Dynamic task generation
- Asset-based scheduling
- Sensor patterns
- Error handling strategies
Troubleshooting
DAG Not Appearing in UI
- Check for Python syntax errors in DAG file
- Verify DAG file is in correct directory
- Check dag_id is unique
- Ensure schedule is not None if you expect it to run
- Check scheduler logs for import errors
Tasks Not Running
- Check task dependencies are correct
- Verify upstream tasks succeeded
- Check task concurrency limits
- Ensure executor has available slots
- Review task logs for errors
Performance Issues
- Reduce DAG complexity (break into multiple DAGs)
- Optimize SQL queries in tasks
- Use appropriate executor for scale
- Enable task parallelism
- Check for slow sensors (use deferrable mode)
- Monitor metadata database performance
Common Errors
Import Errors:
```python
# Bad - imports at DAG level slow parsing
from heavy_library import process

with DAG(...):
    pass

# Good - imports inside tasks
with DAG(...):
    @task
    def my_task():
        from heavy_library import process
        process()
```
**Circular Dependencies:**
```python
# This will fail
task1 >> task2 >> task3 >> task1  # Circular!

# Must be acyclic
task1 >> task2 >> task3
```
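Airflow rejects the cyclic graph at parse time; a minimal cycle check over a task-to-downstream mapping (Kahn's algorithm, illustration only) shows the distinction:

```python
def has_cycle(graph):
    # graph: task_id -> list of downstream task_ids
    indegree = {node: 0 for node in graph}
    for downstream in graph.values():
        for node in downstream:
            indegree[node] += 1
    queue = [n for n, d in indegree.items() if d == 0]
    visited = 0
    while queue:
        node = queue.pop()
        visited += 1
        for child in graph[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    return visited != len(graph)  # nodes never reached sit on a cycle

acyclic = {"task1": ["task2"], "task2": ["task3"], "task3": []}
cyclic = {"task1": ["task2"], "task2": ["task3"], "task3": ["task1"]}
print(has_cycle(acyclic), has_cycle(cyclic))  # → False True
```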
**Large XComs:**
```python
# Bad - storing large data in XCom
@task
def process():
    large_df = pd.read_csv('big_file.csv')
    return large_df  # Too large!

# Good - store a reference instead
@task
def process():
    large_df = pd.read_csv('big_file.csv')
    path = save_to_s3(large_df)
    return path  # Just the path
```
Resources
- Official Documentation: https://airflow.apache.org/docs/
- Airflow GitHub: https://github.com/apache/airflow
- Astronomer Guides: https://docs.astronomer.io/learn
- Community Slack: https://apache-airflow.slack.com
- Stack Overflow: Tag apache-airflow
- Awesome Airflow: https://github.com/jghoman/awesome-apache-airflow
Skill Version: 1.0.0
Last Updated: January 2025
Apache Airflow Version: 2.7+
Skill Category: Data Engineering, Workflow Orchestration, Pipeline Management