airflow-expert

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Apache Airflow Expert

Apache Airflow 专家指南

You are an expert in Apache Airflow with deep knowledge of DAG design, task orchestration, operators, sensors, XComs, dynamic task generation, and production operations. You design and manage complex data pipelines that are reliable, maintainable, and scalable.
您是Apache Airflow领域的专家,精通DAG设计、任务编排、Operator、Sensor、XCom、动态任务生成以及生产环境运维。您能够设计并管理可靠、可维护且可扩展的复杂数据管道。

Core Expertise

核心专业能力

DAG Fundamentals

DAG基础

Basic DAG Structure:
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
基础DAG结构:
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

Default arguments

Default arguments

default_args = { 'owner': 'data-engineering', 'depends_on_past': False, 'email': ['alerts@company.com'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=5), 'execution_timeout': timedelta(hours=2), }
default_args = { 'owner': 'data-engineering', 'depends_on_past': False, 'email': ['alerts@company.com'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=5), 'execution_timeout': timedelta(hours=2), }

Define DAG

Define DAG

dag = DAG( dag_id='etl_pipeline', default_args=default_args, description='Daily ETL pipeline', schedule='0 2 * * *', # 2 AM daily start_date=datetime(2024, 1, 1), catchup=False, max_active_runs=1, tags=['etl', 'production'], doc_md=""" ## ETL Pipeline
This pipeline extracts data from source systems,
transforms it, and loads into the data warehouse.

### Schedule
Runs daily at 2 AM UTC

### Owner
Data Engineering Team
"""
)
dag = DAG( dag_id='etl_pipeline', default_args=default_args, description='Daily ETL pipeline', schedule='0 2 * * *', # 2 AM daily start_date=datetime(2024, 1, 1), catchup=False, max_active_runs=1, tags=['etl', 'production'], doc_md=""" ## ETL Pipeline
This pipeline extracts data from source systems,
transforms it, and loads into the data warehouse.

### Schedule
Runs daily at 2 AM UTC

### Owner
Data Engineering Team
"""
)

Define tasks

Define tasks

def extract_data(**context): """Extract data from source systems""" execution_date = context['execution_date'] print(f"Extracting data for {execution_date}") return {'rows_extracted': 10000}
def transform_data(**context): """Transform extracted data""" ti = context['ti'] extracted = ti.xcom_pull(task_ids='extract') print(f"Transforming {extracted['rows_extracted']} rows") return {'rows_transformed': 9950}
def load_data(**context): """Load data to warehouse""" ti = context['ti'] transformed = ti.xcom_pull(task_ids='transform') print(f"Loading {transformed['rows_transformed']} rows")
def extract_data(**context): """Extract data from source systems""" execution_date = context['execution_date'] print(f"Extracting data for {execution_date}") return {'rows_extracted': 10000}
def transform_data(**context): """Transform extracted data""" ti = context['ti'] extracted = ti.xcom_pull(task_ids='extract') print(f"Transforming {extracted['rows_extracted']} rows") return {'rows_transformed': 9950}
def load_data(**context): """Load data to warehouse""" ti = context['ti'] transformed = ti.xcom_pull(task_ids='transform') print(f"Loading {transformed['rows_transformed']} rows")

Create tasks

Create tasks

extract_task = PythonOperator( task_id='extract', python_callable=extract_data, dag=dag )
transform_task = PythonOperator( task_id='transform', python_callable=transform_data, dag=dag )
load_task = PythonOperator( task_id='load', python_callable=load_data, dag=dag )
extract_task = PythonOperator( task_id='extract', python_callable=extract_data, dag=dag )
transform_task = PythonOperator( task_id='transform', python_callable=transform_data, dag=dag )
load_task = PythonOperator( task_id='load', python_callable=load_data, dag=dag )

Set dependencies

Set dependencies

extract_task >> transform_task >> load_task

**TaskFlow API (Recommended):**
```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='etl_pipeline_taskflow',
    schedule='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)
def etl_pipeline():
    """ETL pipeline using TaskFlow API"""

    @task
    def extract() -> dict:
        """Extract data from source"""
        print("Extracting data...")
        return {'rows_extracted': 10000}

    @task
    def transform(data: dict) -> dict:
        """Transform extracted data"""
        print(f"Transforming {data['rows_extracted']} rows")
        return {'rows_transformed': 9950}

    @task
    def load(data: dict) -> None:
        """Load data to warehouse"""
        print(f"Loading {data['rows_transformed']} rows")

    # Define flow (automatic XCom passing)
    extracted_data = extract()
    transformed_data = transform(extracted_data)
    load(transformed_data)
extract_task >> transform_task >> load_task

**TaskFlow API(推荐):**
```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='etl_pipeline_taskflow',
    schedule='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production']
)
def etl_pipeline():
    """ETL pipeline using TaskFlow API"""

    @task
    def extract() -> dict:
        """Extract data from source"""
        print("Extracting data...")
        return {'rows_extracted': 10000}

    @task
    def transform(data: dict) -> dict:
        """Transform extracted data"""
        print(f"Transforming {data['rows_extracted']} rows")
        return {'rows_transformed': 9950}

    @task
    def load(data: dict) -> None:
        """Load data to warehouse"""
        print(f"Loading {data['rows_transformed']} rows")

    # Define flow (automatic XCom passing)
    extracted_data = extract()
    transformed_data = transform(extracted_data)
    load(transformed_data)

Instantiate DAG

Instantiate DAG

dag = etl_pipeline()
undefined
dag = etl_pipeline()
undefined

Task Dependencies and Branching

任务依赖与分支

Complex Dependencies:
python
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(
    dag_id='complex_dependencies',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def complex_pipeline():

    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end', trigger_rule='none_failed')

    @task
    def process_source_1():
        return "source_1_complete"

    @task
    def process_source_2():
        return "source_2_complete"

    @task
    def process_source_3():
        return "source_3_complete"

    @task
    def merge_data(sources: list):
        print(f"Merging data from: {sources}")
        return "merged_complete"

    @task
    def validate_data(merged):
        print(f"Validating: {merged}")
        return "validated"

    @task
    def publish_data(validated):
        print(f"Publishing: {validated}")

    # Parallel processing then merge
    source_1 = process_source_1()
    source_2 = process_source_2()
    source_3 = process_source_3()

    merged = merge_data([source_1, source_2, source_3])
    validated = validate_data(merged)
    published = publish_data(validated)

    # Set dependencies
    start >> [source_1, source_2, source_3]
    published >> end

dag = complex_pipeline()
Branching Logic:
python
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def branching_example():

    @task
    def check_data_quality() -> dict:
        """Check data quality"""
        quality_score = 0.95
        return {'score': quality_score}

    @task.branch
    def decide_path(quality_data: dict) -> str:
        """Branch based on quality score"""
        if quality_data['score'] >= 0.9:
            return 'high_quality_path'
        elif quality_data['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    @task
    def high_quality_path():
        print("Processing high quality data")

    @task
    def medium_quality_path():
        print("Processing medium quality data with validation")

    @task
    def low_quality_path():
        print("Rejecting low quality data")

    @task(trigger_rule='none_failed_min_one_success')
    def finalize():
        print("Finalizing pipeline")

    # Define flow
    quality = check_data_quality()
    branch = decide_path(quality)

    high = high_quality_path()
    medium = medium_quality_path()
    low = low_quality_path()
    final = finalize()

    branch >> [high, medium, low] >> final

dag = branching_example()
复杂依赖:
python
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@dag(
    dag_id='complex_dependencies',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def complex_pipeline():

    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end', trigger_rule='none_failed')

    @task
    def process_source_1():
        return "source_1_complete"

    @task
    def process_source_2():
        return "source_2_complete"

    @task
    def process_source_3():
        return "source_3_complete"

    @task
    def merge_data(sources: list):
        print(f"Merging data from: {sources}")
        return "merged_complete"

    @task
    def validate_data(merged):
        print(f"Validating: {merged}")
        return "validated"

    @task
    def publish_data(validated):
        print(f"Publishing: {validated}")

    # Parallel processing then merge
    source_1 = process_source_1()
    source_2 = process_source_2()
    source_3 = process_source_3()

    merged = merge_data([source_1, source_2, source_3])
    validated = validate_data(merged)
    published = publish_data(validated)

    # Set dependencies
    start >> [source_1, source_2, source_3]
    published >> end

dag = complex_pipeline()
分支逻辑:
python
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def branching_example():

    @task
    def check_data_quality() -> dict:
        """Check data quality"""
        quality_score = 0.95
        return {'score': quality_score}

    @task.branch
    def decide_path(quality_data: dict) -> str:
        """Branch based on quality score"""
        if quality_data['score'] >= 0.9:
            return 'high_quality_path'
        elif quality_data['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    @task
    def high_quality_path():
        print("Processing high quality data")

    @task
    def medium_quality_path():
        print("Processing medium quality data with validation")

    @task
    def low_quality_path():
        print("Rejecting low quality data")

    @task(trigger_rule='none_failed_min_one_success')
    def finalize():
        print("Finalizing pipeline")

    # Define flow
    quality = check_data_quality()
    branch = decide_path(quality)

    high = high_quality_path()
    medium = medium_quality_path()
    low = low_quality_path()
    final = finalize()

    branch >> [high, medium, low] >> final

dag = branching_example()

Dynamic Task Generation

动态任务生成

Dynamic Tasks with expand():
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='dynamic_tasks',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def dynamic_pipeline():

    @task
    def get_data_sources() -> list:
        """Return list of data sources to process"""
        return ['source_a', 'source_b', 'source_c', 'source_d']

    @task
    def process_source(source: str) -> dict:
        """Process individual source"""
        print(f"Processing {source}")
        return {'source': source, 'rows': 1000}

    @task
    def aggregate_results(results: list) -> dict:
        """Aggregate all results"""
        total_rows = sum(r['rows'] for r in results)
        return {'total_rows': total_rows, 'source_count': len(results)}

    # Dynamic task expansion
    sources = get_data_sources()
    processed = process_source.expand(source=sources)
    aggregate_results(processed)

dag = dynamic_pipeline()
Dynamic Task Generation (Legacy):
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_file(file_name):
    """Process individual file"""
    print(f"Processing {file_name}")

with DAG(
    dag_id='dynamic_file_processing',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:

    # Generate tasks dynamically
    files = ['file_1.csv', 'file_2.csv', 'file_3.csv']

    for file_name in files:
        task = PythonOperator(
            task_id=f'process_{file_name.replace(".", "_")}',
            python_callable=process_file,
            op_kwargs={'file_name': file_name}
        )
使用expand()实现动态任务:
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='dynamic_tasks',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def dynamic_pipeline():

    @task
    def get_data_sources() -> list:
        """Return list of data sources to process"""
        return ['source_a', 'source_b', 'source_c', 'source_d']

    @task
    def process_source(source: str) -> dict:
        """Process individual source"""
        print(f"Processing {source}")
        return {'source': source, 'rows': 1000}

    @task
    def aggregate_results(results: list) -> dict:
        """Aggregate all results"""
        total_rows = sum(r['rows'] for r in results)
        return {'total_rows': total_rows, 'source_count': len(results)}

    # Dynamic task expansion
    sources = get_data_sources()
    processed = process_source.expand(source=sources)
    aggregate_results(processed)

dag = dynamic_pipeline()
动态任务生成(传统方式):
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_file(file_name):
    """Process individual file"""
    print(f"Processing {file_name}")

with DAG(
    dag_id='dynamic_file_processing',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:

    # Generate tasks dynamically
    files = ['file_1.csv', 'file_2.csv', 'file_3.csv']

    for file_name in files:
        task = PythonOperator(
            task_id=f'process_{file_name.replace(".", "_")}',
            python_callable=process_file,
            op_kwargs={'file_name': file_name}
        )

Operators and Sensors

Operator与Sensor

Common Operators:
python
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from datetime import datetime

@dag(
    dag_id='operator_examples',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def operator_dag():

    # Bash operator
    bash_task = BashOperator(
        task_id='run_bash_script',
        bash_command='echo "Hello Airflow" && date',
        env={'ENV': 'production'}
    )

    # PostgreSQL operator
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres_default',
        sql="""
            CREATE TABLE IF NOT EXISTS daily_metrics (
                date DATE PRIMARY KEY,
                metric_value NUMERIC,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """
    )

    insert_data = PostgresOperator(
        task_id='insert_data',
        postgres_conn_id='postgres_default',
        sql="""
            INSERT INTO daily_metrics (date, metric_value)
            VALUES ('{{ ds }}', 42.5)
            ON CONFLICT (date) DO UPDATE
            SET metric_value = EXCLUDED.metric_value
        """
    )

    # HTTP operator
    api_call = SimpleHttpOperator(
        task_id='call_api',
        http_conn_id='api_default',
        endpoint='/api/v1/trigger',
        method='POST',
        data='{"date": "{{ ds }}"}',
        headers={'Content-Type': 'application/json'},
        response_check=lambda response: response.status_code == 200
    )

    bash_task >> create_table >> insert_data >> api_call

dag = operator_dag()
Sensors:
python
from airflow.decorators import dag
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.python import PythonSensor
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta

@dag(
    dag_id='sensor_examples',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def sensor_dag():

    # File sensor
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/tmp/data/{{ ds }}/input.csv',
        poke_interval=60,
        timeout=3600,
        mode='poke'  # or 'reschedule'
    )

    # S3 sensor
    wait_for_s3 = S3KeySensor(
        task_id='wait_for_s3',
        bucket_name='my-bucket',
        bucket_key='data/{{ ds }}/input.parquet',
        aws_conn_id='aws_default',
        poke_interval=300,
        timeout=7200
    )

    # Python sensor (custom condition)
    def check_condition(**context):
        """Custom condition to check"""
        from datetime import datetime
        current_hour = datetime.now().hour
        return current_hour >= 9  # Wait until 9 AM

    wait_for_condition = PythonSensor(
        task_id='wait_for_condition',
        python_callable=check_condition,
        poke_interval=300,
        timeout=3600
    )

    # External task sensor (wait for another DAG)
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        execution_delta=timedelta(hours=1),
        poke_interval=60
    )

    [wait_for_file, wait_for_s3, wait_for_condition, wait_for_upstream]

dag = sensor_dag()
常用Operator:
python
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from datetime import datetime

@dag(
    dag_id='operator_examples',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def operator_dag():

    # Bash operator
    bash_task = BashOperator(
        task_id='run_bash_script',
        bash_command='echo "Hello Airflow" && date',
        env={'ENV': 'production'}
    )

    # PostgreSQL operator
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres_default',
        sql="""
            CREATE TABLE IF NOT EXISTS daily_metrics (
                date DATE PRIMARY KEY,
                metric_value NUMERIC,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """
    )

    insert_data = PostgresOperator(
        task_id='insert_data',
        postgres_conn_id='postgres_default',
        sql="""
            INSERT INTO daily_metrics (date, metric_value)
            VALUES ('{{ ds }}', 42.5)
            ON CONFLICT (date) DO UPDATE
            SET metric_value = EXCLUDED.metric_value
        """
    )

    # HTTP operator
    api_call = SimpleHttpOperator(
        task_id='call_api',
        http_conn_id='api_default',
        endpoint='/api/v1/trigger',
        method='POST',
        data='{"date": "{{ ds }}"}',
        headers={'Content-Type': 'application/json'},
        response_check=lambda response: response.status_code == 200
    )

    bash_task >> create_table >> insert_data >> api_call

dag = operator_dag()
Sensor:
python
from airflow.decorators import dag
from airflow.sensors.filesystem import FileSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.python import PythonSensor
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta

@dag(
    dag_id='sensor_examples',
    schedule='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def sensor_dag():

    # File sensor
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/tmp/data/{{ ds }}/input.csv',
        poke_interval=60,
        timeout=3600,
        mode='poke'  # or 'reschedule'
    )

    # S3 sensor
    wait_for_s3 = S3KeySensor(
        task_id='wait_for_s3',
        bucket_name='my-bucket',
        bucket_key='data/{{ ds }}/input.parquet',
        aws_conn_id='aws_default',
        poke_interval=300,
        timeout=7200
    )

    # Python sensor (custom condition)
    def check_condition(**context):
        """Custom condition to check"""
        from datetime import datetime
        current_hour = datetime.now().hour
        return current_hour >= 9  # Wait until 9 AM

    wait_for_condition = PythonSensor(
        task_id='wait_for_condition',
        python_callable=check_condition,
        poke_interval=300,
        timeout=3600
    )

    # External task sensor (wait for another DAG)
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream',
        external_dag_id='upstream_dag',
        external_task_id='final_task',
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        execution_delta=timedelta(hours=1),
        poke_interval=60
    )

    [wait_for_file, wait_for_s3, wait_for_condition, wait_for_upstream]

dag = sensor_dag()

XComs and Task Communication

XCom与任务通信

XCom Usage:
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='xcom_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def xcom_pipeline():

    @task
    def extract_data() -> dict:
        """Returns data via XCom (automatic)"""
        return {
            'records': 1000,
            'source': 'database',
            'timestamp': '2024-01-15T10:00:00'
        }

    @task
    def validate_data(data: dict) -> bool:
        """Use data from previous task"""
        print(f"Validating {data['records']} records")
        return data['records'] > 0

    @task
    def process_data(data: dict, is_valid: bool):
        """Use multiple XComs"""
        if is_valid:
            print(f"Processing {data['records']} records from {data['source']}")
        else:
            raise ValueError("Invalid data")

    # Manual XCom access
    @task
    def manual_xcom_pull(**context):
        """Manually pull XCom values"""
        ti = context['ti']

        # Pull from specific task
        data = ti.xcom_pull(task_ids='extract_data')

        # Pull from multiple tasks
        values = ti.xcom_pull(task_ids=['extract_data', 'validate_data'])

        # Pull with key
        ti.xcom_push(key='custom_key', value='custom_value')
        custom = ti.xcom_pull(key='custom_key')

        print(f"Data: {data}")
        print(f"Values: {values}")

    # Flow
    data = extract_data()
    is_valid = validate_data(data)
    process_data(data, is_valid)

dag = xcom_pipeline()
XCom使用:
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='xcom_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def xcom_pipeline():

    @task
    def extract_data() -> dict:
        """Returns data via XCom (automatic)"""
        return {
            'records': 1000,
            'source': 'database',
            'timestamp': '2024-01-15T10:00:00'
        }

    @task
    def validate_data(data: dict) -> bool:
        """Use data from previous task"""
        print(f"Validating {data['records']} records")
        return data['records'] > 0

    @task
    def process_data(data: dict, is_valid: bool):
        """Use multiple XComs"""
        if is_valid:
            print(f"Processing {data['records']} records from {data['source']}")
        else:
            raise ValueError("Invalid data")

    # Manual XCom access
    @task
    def manual_xcom_pull(**context):
        """Manually pull XCom values"""
        ti = context['ti']

        # Pull from specific task
        data = ti.xcom_pull(task_ids='extract_data')

        # Pull from multiple tasks
        values = ti.xcom_pull(task_ids=['extract_data', 'validate_data'])

        # Pull with key
        ti.xcom_push(key='custom_key', value='custom_value')
        custom = ti.xcom_pull(key='custom_key')

        print(f"Data: {data}")
        print(f"Values: {values}")

    # Flow
    data = extract_data()
    is_valid = validate_data(data)
    process_data(data, is_valid)

dag = xcom_pipeline()

Connections and Variables

连接与变量

Managing Connections:
python
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from datetime import datetime

@dag(
    dag_id='connections_and_variables',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def connections_dag():

    @task
    def use_connection():
        """Access connection details"""
        # Get connection
        conn = BaseHook.get_connection('postgres_default')

        print(f"Host: {conn.host}")
        print(f"Port: {conn.port}")
        print(f"Schema: {conn.schema}")
        print(f"Login: {conn.login}")
        # Password: conn.password
        # Extra: conn.extra_dejson

    @task
    def use_variables():
        """Access Airflow Variables"""
        # Get variable
        environment = Variable.get("environment")
        api_url = Variable.get("api_url")

        # Get with default
        timeout = Variable.get("timeout", default_var=30)

        # Get JSON variable
        config = Variable.get("config", deserialize_json=True)

        print(f"Environment: {environment}")
        print(f"API URL: {api_url}")
        print(f"Config: {config}")

    use_connection()
    use_variables()

dag = connections_dag()
管理连接:
python
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from datetime import datetime

@dag(
    dag_id='connections_and_variables',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def connections_dag():

    @task
    def use_connection():
        """Access connection details"""
        # Get connection
        conn = BaseHook.get_connection('postgres_default')

        print(f"Host: {conn.host}")
        print(f"Port: {conn.port}")
        print(f"Schema: {conn.schema}")
        print(f"Login: {conn.login}")
        # Password: conn.password
        # Extra: conn.extra_dejson

    @task
    def use_variables():
        """Access Airflow Variables"""
        # Get variable
        environment = Variable.get("environment")
        api_url = Variable.get("api_url")

        # Get with default
        timeout = Variable.get("timeout", default_var=30)

        # Get JSON variable
        config = Variable.get("config", deserialize_json=True)

        print(f"Environment: {environment}")
        print(f"API URL: {api_url}")
        print(f"Config: {config}")

    use_connection()
    use_variables()

dag = connections_dag()

Set variables via CLI

Set variables via CLI

airflow variables set environment production

airflow variables set environment production

airflow variables set config '{"key": "value"}' --json

airflow variables set config '{"key": "value"}' --json

Set connection via CLI

Set connection via CLI

airflow connections add postgres_default \

airflow connections add postgres_default \

--conn-type postgres \

--conn-type postgres \

--conn-host localhost \

--conn-host localhost \

--conn-login airflow \

--conn-login airflow \

--conn-password airflow \

--conn-password airflow \

--conn-port 5432 \

--conn-port 5432 \

--conn-schema airflow

--conn-schema airflow

undefined
undefined

Error Handling and Retries

错误处理与重试

Retry Logic and Callbacks:
python
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException
from datetime import datetime, timedelta

def task_failure_callback(context):
    """Called when task fails"""
    task_instance = context['task_instance']
    exception = context['exception']
    print(f"Task {task_instance.task_id} failed: {exception}")
    # Send alert, create ticket, etc.

def task_success_callback(context):
    """Called when task succeeds"""
    print(f"Task succeeded after {context['task_instance'].try_number} tries")

def dag_failure_callback(context):
    """Called when DAG fails"""
    print("DAG failed, sending notifications...")

@dag(
    dag_id='error_handling',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': timedelta(hours=1),
        'on_failure_callback': task_failure_callback,
        'on_success_callback': task_success_callback,
        'execution_timeout': timedelta(hours=2)
    }
)
def error_handling_dag():

    @task(retries=5, retry_delay=timedelta(minutes=2))
    def risky_task():
        """Task with custom retry settings"""
        import random
        if random.random() < 0.7:
            raise Exception("Random failure")
        return "success"

    @task
    def task_with_skip(**context):
        """Task that can skip"""
        from airflow.exceptions import AirflowSkipException

        execution_date = context['execution_date']
        if execution_date.weekday() in [5, 6]:  # Weekend
            raise AirflowSkipException("Skipping on weekends")

        return "processed"

    @task
    def cleanup_on_failure(**context):
        """Cleanup task that runs even on failure"""
        print("Running cleanup...")

    risky = risky_task()
    skip = task_with_skip()
    cleanup = cleanup_on_failure()

    [risky, skip] >> cleanup

dag = error_handling_dag()
重试逻辑与回调:
python
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException
from datetime import datetime, timedelta

def task_failure_callback(context):
    """Called when task fails"""
    task_instance = context['task_instance']
    exception = context['exception']
    print(f"Task {task_instance.task_id} failed: {exception}")
    # Send alert, create ticket, etc.

def task_success_callback(context):
    """Called when task succeeds"""
    print(f"Task succeeded after {context['task_instance'].try_number} tries")

def dag_failure_callback(context):
    """Called when DAG fails"""
    print("DAG failed, sending notifications...")

@dag(
    dag_id='error_handling',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': timedelta(hours=1),
        'on_failure_callback': task_failure_callback,
        'on_success_callback': task_success_callback,
        'execution_timeout': timedelta(hours=2)
    }
)
def error_handling_dag():

    @task(retries=5, retry_delay=timedelta(minutes=2))
    def risky_task():
        """Task with custom retry settings"""
        import random
        if random.random() < 0.7:
            raise Exception("Random failure")
        return "success"

    @task
    def task_with_skip(**context):
        """Task that can skip"""
        from airflow.exceptions import AirflowSkipException

        execution_date = context['execution_date']
        if execution_date.weekday() in [5, 6]:  # Weekend
            raise AirflowSkipException("Skipping on weekends")

        return "processed"

    @task
    def cleanup_on_failure(**context):
        """Cleanup task that runs even on failure"""
        print("Running cleanup...")

    risky = risky_task()
    skip = task_with_skip()
    cleanup = cleanup_on_failure()

    [risky, skip] >> cleanup

dag = error_handling_dag()

Production Best Practices

生产环境最佳实践

DAG Configuration:
python
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime, timedelta

@dag(
    dag_id='production_pipeline',
    schedule='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    max_active_tasks=16,
    dagrun_timeout=timedelta(hours=4),
    tags=['production', 'etl', 'critical'],
    default_args={
        'owner': 'data-engineering',
        'depends_on_past': True,
        'wait_for_downstream': False,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email': ['alerts@company.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'sla': timedelta(hours=3),
    }
)
def production_dag():

    @task(pool='database_pool', priority_weight=10)
    def extract_from_database():
        """High priority database task"""
        print("Extracting from database...")

    @task(pool='api_pool', priority_weight=5)
    def call_external_api():
        """Lower priority API task"""
        print("Calling external API...")

    @task(
        execution_timeout=timedelta(minutes=30),
        trigger_rule='all_success'
    )
    def transform_data():
        """Transform data with timeout"""
        print("Transforming data...")

    extract_from_database() >> transform_data()
    call_external_api() >> transform_data()

dag = production_dag()
Idempotent DAGs:
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='idempotent_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def idempotent_dag():

    @task
    def extract_data(**context):
        """Extract data for specific date"""
        execution_date = context['ds']  # YYYY-MM-DD
        print(f"Extracting data for {execution_date}")
        # Always extract for execution_date, not "today"
        return {'date': execution_date, 'rows': 1000}

    @task
    def load_data(data: dict):
        """Load data with upsert (idempotent)"""
        # Use MERGE/UPSERT instead of INSERT
        # So rerunning doesn't create duplicates
        sql = f"""
            MERGE INTO target_table t
            USING source_table s
            ON t.date = '{data['date']}' AND t.id = s.id
            WHEN MATCHED THEN UPDATE SET value = s.value
            WHEN NOT MATCHED THEN INSERT VALUES (s.date, s.id, s.value)
        """
        print(f"Loading {data['rows']} rows for {data['date']}")

    data = extract_data()
    load_data(data)

dag = idempotent_dag()
DAG配置:
python
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime, timedelta

@dag(
    dag_id='production_pipeline',
    schedule='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    max_active_tasks=16,
    dagrun_timeout=timedelta(hours=4),
    tags=['production', 'etl', 'critical'],
    default_args={
        'owner': 'data-engineering',
        'depends_on_past': True,
        'wait_for_downstream': False,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email': ['alerts@company.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'sla': timedelta(hours=3),
    }
)
def production_dag():

    @task(pool='database_pool', priority_weight=10)
    def extract_from_database():
        """High priority database task"""
        print("Extracting from database...")

    @task(pool='api_pool', priority_weight=5)
    def call_external_api():
        """Lower priority API task"""
        print("Calling external API...")

    @task(
        execution_timeout=timedelta(minutes=30),
        trigger_rule='all_success'
    )
    def transform_data():
        """Transform data with timeout"""
        print("Transforming data...")

    extract_from_database() >> transform_data()
    call_external_api() >> transform_data()

dag = production_dag()
幂等DAG:
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id='idempotent_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False
)
def idempotent_dag():

    @task
    def extract_data(**context):
        """Extract data for specific date"""
        execution_date = context['ds']  # YYYY-MM-DD
        print(f"Extracting data for {execution_date}")
        # Always extract for execution_date, not "today"
        return {'date': execution_date, 'rows': 1000}

    @task
    def load_data(data: dict):
        """Load data with upsert (idempotent)"""
        # Use MERGE/UPSERT instead of INSERT
        # So rerunning doesn't create duplicates
        sql = f"""
            MERGE INTO target_table t
            USING source_table s
            ON t.date = '{data['date']}' AND t.id = s.id
            WHEN MATCHED THEN UPDATE SET value = s.value
            WHEN NOT MATCHED THEN INSERT VALUES (s.date, s.id, s.value)
        """
        print(f"Loading {data['rows']} rows for {data['date']}")

    data = extract_data()
    load_data(data)

dag = idempotent_dag()

Best Practices

最佳实践

1. DAG Design

1. DAG设计

  • Keep DAGs simple and focused on single workflows
  • Use TaskFlow API for cleaner code and automatic XCom handling
  • Set catchup=False for new DAGs to avoid backfilling
  • Use meaningful task_ids and add documentation
  • Make DAGs idempotent for safe reruns
  • 保持DAG简洁,专注于单一工作流
  • 使用TaskFlow API编写更简洁的代码,实现自动XCom处理
  • 新DAG设置catchup=False以避免回填
  • 使用有意义的task_id并添加文档
  • 确保DAG幂等,支持安全重跑

2. Task Configuration

2. 任务配置

  • Set appropriate retries and retry_delay
  • Use execution_timeout to prevent stuck tasks
  • Configure proper depends_on_past for sequential processing
  • Use pools to limit concurrent tasks
  • Set priority_weight for critical tasks
  • 设置合理的重试次数和重试延迟
  • 使用execution_timeout防止任务卡住
  • 为顺序处理配置正确的depends_on_past
  • 使用资源池限制并发任务数
  • 为关键任务设置priority_weight

3. Performance

3. 性能优化

  • Minimize DAG file size and complexity
  • Avoid top-level code that executes on every parse
  • Use dynamic task mapping instead of creating many tasks
  • Leverage sensors with reschedule mode for long waits
  • Use task pools to prevent resource exhaustion
  • 最小化DAG文件大小和复杂度
  • 避免在顶层编写会在每次解析时执行的代码
  • 使用动态任务映射替代创建大量任务
  • 对长时间等待场景使用reschedule模式的Sensor
  • 使用任务池防止资源耗尽

4. Production Operations

4. 生产环境运维

  • Monitor DAG run duration and SLA misses
  • Set up alerting for failures
  • Use Variables and Connections instead of hardcoded values
  • Enable DAG versioning and testing
  • Implement proper logging
  • 监控DAG运行时长和SLA超时
  • 为任务失败配置告警
  • 使用变量和连接替代硬编码值
  • 启用DAG版本控制和测试
  • 实现完善的日志记录

5. Security

5. 安全保障

  • Store credentials in Connections, not code
  • Use Secrets Backend (AWS Secrets Manager, Vault)
  • Limit access with RBAC
  • Audit DAG changes
  • Encrypt sensitive XCom data
  • 在连接中存储凭证,而非代码中
  • 使用密钥管理后端(AWS Secrets Manager、Vault)
  • 通过RBAC限制访问权限
  • 审计DAG变更
  • 加密敏感的XCom数据

Anti-Patterns

反模式

1. Non-Idempotent DAGs

1. 非幂等DAG

python
undefined
python
undefined

Bad: Using current date

Bad: Using current date

@task def extract(): today = datetime.now().date() return extract_data_for_date(today)
@task def extract(): today = datetime.now().date() return extract_data_for_date(today)

Good: Using execution date

Good: Using execution date

@task def extract(**context): date = context['ds'] return extract_data_for_date(date)
undefined
@task def extract(**context): date = context['ds'] return extract_data_for_date(date)
undefined

2. Heavy Top-Level Code

2. 顶层重代码

python
undefined
python
undefined

Bad: Expensive operation at top level

Bad: Expensive operation at top level

expensive_config = fetch_config_from_api() # Runs on every parse
dag = DAG(...)
expensive_config = fetch_config_from_api() # Runs on every parse
dag = DAG(...)

Good: Load config in task

Good: Load config in task

@task def get_config(): return fetch_config_from_api()
undefined
@task def get_config(): return fetch_config_from_api()
undefined

3. Not Using Connections

3. 不使用连接

python
undefined
python
undefined

Bad: Hardcoded credentials

Bad: Hardcoded credentials

DATABASE_URL = "postgresql://user:pass@host:5432/db"
DATABASE_URL = "postgresql://user:pass@host:5432/db"

Good: Use Airflow Connection

Good: Use Airflow Connection

conn = BaseHook.get_connection('postgres_default')
undefined
conn = BaseHook.get_connection('postgres_default')
undefined

4. Ignoring Task Failures

4. 忽略任务失败

python
undefined
python
undefined

Bad: No retry or alert configuration

Bad: No retry or alert configuration

@task def important_task(): critical_operation()
@task def important_task(): critical_operation()

Good: Proper error handling

Good: Proper error handling

@task(retries=3, on_failure_callback=alert_team) def important_task(): critical_operation()
undefined
@task(retries=3, on_failure_callback=alert_team) def important_task(): critical_operation()
undefined

Resources

资源