apache-airflow-orchestration

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Apache Airflow Orchestration

Apache Airflow 编排

Skill by ara.so — Data Skills collection.
Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) in Python code, making them maintainable, versionable, testable, and collaborative.
ara.so提供的技能 — 数据技能合集。
Apache Airflow是一个以编程方式创作、调度和监控工作流的平台。它允许你用Python代码将工作流定义为有向无环图(DAGs),使其易于维护、版本控制、测试和协作。

Installation

安装

Using pip

使用pip

bash
undefined
bash
undefined

Install Airflow with constraints for your Python version

Install Airflow with constraints for your Python version

AIRFLOW_VERSION=3.2.0 PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)" CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
undefined
AIRFLOW_VERSION=3.2.0 PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)" CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
undefined

Using Docker (Recommended for Development)

使用Docker(开发推荐)

bash
undefined
bash
undefined

Download docker-compose.yaml

Download docker-compose.yaml

Create required directories

Create required directories

mkdir -p ./dags ./logs ./plugins ./config
mkdir -p ./dags ./logs ./plugins ./config

Set the Airflow user

Set the Airflow user

echo -e "AIRFLOW_UID=$(id -u)" > .env
echo -e "AIRFLOW_UID=$(id -u)" > .env

Initialize the database

Initialize the database

docker compose up airflow-init
docker compose up airflow-init

Start Airflow

Start Airflow

docker compose up

Access the web UI at `http://localhost:8080` (default credentials: `airflow`/`airflow`).
docker compose up

访问Web UI地址为`http://localhost:8080`(默认凭据:`airflow`/`airflow`)。

Standalone Quick Start

独立快速启动

bash
undefined
bash
undefined

Initialize database and create admin user

Initialize database and create admin user

airflow db init
airflow db init

Create admin user

Create admin user

airflow users create
--username admin
--firstname Admin
--lastname User
--role Admin
--email admin@example.com
airflow users create
--username admin
--firstname Admin
--lastname User
--role Admin
--email admin@example.com

Start the web server (default port 8080)

Start the web server (default port 8080)

airflow webserver --port 8080
airflow webserver --port 8080

Start the scheduler (in another terminal)

Start the scheduler (in another terminal)

airflow scheduler
undefined
airflow scheduler
undefined

Core Concepts

核心概念

DAG (Directed Acyclic Graph)

DAG(有向无环图)

A DAG defines a workflow with tasks and their dependencies. Tasks must not create cycles.
DAG定义了包含任务及其依赖关系的工作流。任务不能形成循环。

Basic DAG Structure

基础DAG结构

python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

Default arguments applied to all tasks

Default arguments applied to all tasks

default_args = { 'owner': 'data-team', 'depends_on_past': False, 'email': ['alerts@example.com'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=5), }
default_args = { 'owner': 'data-team', 'depends_on_past': False, 'email': ['alerts@example.com'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=5), }

Define the DAG

Define the DAG

dag = DAG( 'example_data_pipeline', default_args=default_args, description='A simple data pipeline', schedule='0 0 * * *', # Run daily at midnight (cron expression) start_date=datetime(2024, 1, 1), catchup=False, # Don't run for past dates tags=['example', 'data-engineering'], )
def extract_data(**context): """Extract data from source""" print("Extracting data...") # Your extraction logic here return {'records': 1000}
def transform_data(**context): """Transform extracted data""" # Access data from previous task via XCom ti = context['ti'] extracted = ti.xcom_pull(task_ids='extract') print(f"Transforming {extracted['records']} records...") return {'transformed_records': extracted['records']}
def load_data(**context): """Load data to destination""" ti = context['ti'] transformed = ti.xcom_pull(task_ids='transform') print(f"Loading {transformed['transformed_records']} records...")
dag = DAG( 'example_data_pipeline', default_args=default_args, description='A simple data pipeline', schedule='0 0 * * *', # Run daily at midnight (cron expression) start_date=datetime(2024, 1, 1), catchup=False, # Don't run for past dates tags=['example', 'data-engineering'], )
def extract_data(**context): """Extract data from source""" print("Extracting data...") # Your extraction logic here return {'records': 1000}
def transform_data(**context): """Transform extracted data""" # Access data from previous task via XCom ti = context['ti'] extracted = ti.xcom_pull(task_ids='extract') print(f"Transforming {extracted['records']} records...") return {'transformed_records': extracted['records']}
def load_data(**context): """Load data to destination""" ti = context['ti'] transformed = ti.xcom_pull(task_ids='transform') print(f"Loading {transformed['transformed_records']} records...")

Define tasks

Define tasks

extract = PythonOperator( task_id='extract', python_callable=extract_data, dag=dag, )
transform = PythonOperator( task_id='transform', python_callable=transform_data, dag=dag, )
load = PythonOperator( task_id='load', python_callable=load_data, dag=dag, )
extract = PythonOperator( task_id='extract', python_callable=extract_data, dag=dag, )
transform = PythonOperator( task_id='transform', python_callable=transform_data, dag=dag, )
load = PythonOperator( task_id='load', python_callable=load_data, dag=dag, )

Set task dependencies

Set task dependencies

extract >> transform >> load
undefined
extract >> transform >> load
undefined

TaskFlow API (Recommended for Airflow 2.0+)

TaskFlow API(Airflow 2.0+推荐)

Modern, cleaner syntax using decorators:
python
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow', 'etl'],
)
def taskflow_etl_pipeline():
    """
    ETL pipeline using TaskFlow API
    """
    
    @task()
    def extract():
        """Extract data from API"""
        import requests
        # Using environment variable for API key
        import os
        api_key = os.getenv('DATA_API_KEY')
        
        # Simulated extraction
        data = {'records': [1, 2, 3, 4, 5]}
        return data
    
    @task()
    def transform(data: dict):
        """Transform the data"""
        records = data['records']
        transformed = [r * 2 for r in records]
        return {'transformed': transformed}
    
    @task()
    def load(data: dict):
        """Load data to warehouse"""
        print(f"Loading {len(data['transformed'])} records")
        # Your loading logic here
        return True
    
    # Define pipeline
    extracted_data = extract()
    transformed_data = transform(extracted_data)
    load(transformed_data)
使用装饰器的现代、简洁语法:
python
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow', 'etl'],
)
def taskflow_etl_pipeline():
    """
    ETL pipeline using TaskFlow API
    """
    
    @task()
    def extract():
        """Extract data from API"""
        import requests
        # Using environment variable for API key
        import os
        api_key = os.getenv('DATA_API_KEY')
        
        # Simulated extraction
        data = {'records': [1, 2, 3, 4, 5]}
        return data
    
    @task()
    def transform(data: dict):
        """Transform the data"""
        records = data['records']
        transformed = [r * 2 for r in records]
        return {'transformed': transformed}
    
    @task()
    def load(data: dict):
        """Load data to warehouse"""
        print(f"Loading {len(data['transformed'])} records")
        # Your loading logic here
        return True
    
    # Define pipeline
    extracted_data = extract()
    transformed_data = transform(extracted_data)
    load(transformed_data)

Instantiate the DAG

Instantiate the DAG

taskflow_etl_pipeline()
undefined
taskflow_etl_pipeline()
undefined

Common Operators

常用Operator

BashOperator

BashOperator

python
from airflow.operators.bash import BashOperator

run_script = BashOperator(
    task_id='run_data_script',
    bash_command='python /opt/scripts/process_data.py --date {{ ds }}',
    dag=dag,
)
python
from airflow.operators.bash import BashOperator

run_script = BashOperator(
    task_id='run_data_script',
    bash_command='python /opt/scripts/process_data.py --date {{ ds }}',
    dag=dag,
)

PythonOperator

PythonOperator

python
from airflow.operators.python import PythonOperator

def my_function(param1, param2, **context):
    execution_date = context['execution_date']
    print(f"Processing for {execution_date}")
    # Your logic here

python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)
python
from airflow.operators.python import PythonOperator

def my_function(param1, param2, **context):
    execution_date = context['execution_date']
    print(f"Processing for {execution_date}")
    # Your logic here

python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)

BranchPythonOperator

BranchPythonOperator

python
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch(**context):
    """Decide which branch to execute"""
    execution_date = context['execution_date']
    if execution_date.day % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'

branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
    dag=dag,
)

even_task = EmptyOperator(task_id='even_day_task', dag=dag)
odd_task = EmptyOperator(task_id='odd_day_task', dag=dag)

branch >> [even_task, odd_task]
python
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch(**context):
    """Decide which branch to execute"""
    execution_date = context['execution_date']
    if execution_date.day % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'

branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
    dag=dag,
)

even_task = EmptyOperator(task_id='even_day_task', dag=dag)
odd_task = EmptyOperator(task_id='odd_day_task', dag=dag)

branch >> [even_task, odd_task]

EmailOperator

EmailOperator

python
from airflow.operators.email import EmailOperator

send_email = EmailOperator(
    task_id='send_notification',
    to='team@example.com',
    subject='Pipeline {{ ds }} completed',
    html_content='<p>The pipeline for {{ ds }} has completed successfully.</p>',
    dag=dag,
)
python
from airflow.operators.email import EmailOperator

send_email = EmailOperator(
    task_id='send_notification',
    to='team@example.com',
    subject='Pipeline {{ ds }} completed',
    html_content='<p>The pipeline for {{ ds }} has completed successfully.</p>',
    dag=dag,
)

Sensors

Sensors

Sensors wait for a condition to be met before proceeding.
Sensors会等待条件满足后再继续执行。

FileSensor

FileSensor

python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_data_file',
    filepath='/data/input/file_{{ ds }}.csv',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,  # Timeout after 1 hour
    mode='poke',  # 'poke' or 'reschedule'
    dag=dag,
)
python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_data_file',
    filepath='/data/input/file_{{ ds }}.csv',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,  # Timeout after 1 hour
    mode='poke',  # 'poke' or 'reschedule'
    dag=dag,
)

TimeDeltaSensor

TimeDeltaSensor

python
from airflow.sensors.time_delta import TimeDeltaSensor
from datetime import timedelta

wait_sensor = TimeDeltaSensor(
    task_id='wait_10_minutes',
    delta=timedelta(minutes=10),
    dag=dag,
)
python
from airflow.sensors.time_delta import TimeDeltaSensor
from datetime import timedelta

wait_sensor = TimeDeltaSensor(
    task_id='wait_10_minutes',
    delta=timedelta(minutes=10),
    dag=dag,
)

Custom Sensor

自定义Sensor

python
from airflow.sensors.base import BaseSensorOperator

class CustomDataSensor(BaseSensorOperator):
    def __init__(self, endpoint, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint
    
    def poke(self, context):
        """Check if data is available"""
        import requests
        import os
        
        api_key = os.getenv('API_KEY')
        response = requests.get(
            self.endpoint,
            headers={'Authorization': f'Bearer {api_key}'}
        )
        return response.status_code == 200 and response.json().get('ready', False)

check_data = CustomDataSensor(
    task_id='check_data_ready',
    endpoint='https://api.example.com/status',
    poke_interval=60,
    timeout=3600,
    dag=dag,
)
python
from airflow.sensors.base import BaseSensorOperator

class CustomDataSensor(BaseSensorOperator):
    def __init__(self, endpoint, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint
    
    def poke(self, context):
        """Check if data is available"""
        import requests
        import os
        
        api_key = os.getenv('API_KEY')
        response = requests.get(
            self.endpoint,
            headers={'Authorization': f'Bearer {api_key}'}
        )
        return response.status_code == 200 and response.json().get('ready', False)

check_data = CustomDataSensor(
    task_id='check_data_ready',
    endpoint='https://api.example.com/status',
    poke_interval=60,
    timeout=3600,
    dag=dag,
)

Connections and Hooks

连接与钩子(Connections and Hooks)

Setting Up Connections

设置连接

Connections store credentials and connection details.
连接用于存储凭据和连接详情。

Via CLI

通过CLI

bash
undefined
bash
undefined

Add a Postgres connection

Add a Postgres connection

airflow connections add 'postgres_default'
--conn-type 'postgres'
--conn-host 'localhost'
--conn-schema 'mydb'
--conn-login 'user'
--conn-password 'password'
--conn-port 5432
airflow connections add 'postgres_default'
--conn-type 'postgres'
--conn-host 'localhost'
--conn-schema 'mydb'
--conn-login 'user'
--conn-password 'password'
--conn-port 5432

Add an HTTP connection

Add an HTTP connection

airflow connections add 'http_api'
--conn-type 'http'
--conn-host 'https://api.example.com'
--conn-extra '{"api_key": "from_env_var"}'
undefined
airflow connections add 'http_api'
--conn-type 'http'
--conn-host 'https://api.example.com'
--conn-extra '{"api_key": "from_env_var"}'
undefined

Via Environment Variables

通过环境变量

bash
export AIRFLOW_CONN_POSTGRES_DEFAULT='postgresql://user:password@localhost:5432/mydb'
export AIRFLOW_CONN_HTTP_API='http://api.example.com'
bash
export AIRFLOW_CONN_POSTGRES_DEFAULT='postgresql://user:password@localhost:5432/mydb'
export AIRFLOW_CONN_HTTP_API='http://api.example.com'

Using Hooks

使用钩子

python
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

@task()
def query_database():
    """Query PostgreSQL database"""
    hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Execute query and fetch results
    results = hook.get_records(
        sql="SELECT * FROM users WHERE created_date = %s",
        parameters=['2024-01-01']
    )
    
    # Or use pandas
    df = hook.get_pandas_df(sql="SELECT * FROM transactions")
    
    return len(results)

@task()
def insert_data():
    """Insert data into database"""
    hook = PostgresHook(postgres_conn_id='postgres_default')
    
    hook.run(
        sql="INSERT INTO logs (message, timestamp) VALUES (%s, %s)",
        parameters=[('Pipeline completed', datetime.now())]
    )
python
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

@task()
def query_database():
    """Query PostgreSQL database"""
    hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Execute query and fetch results
    results = hook.get_records(
        sql="SELECT * FROM users WHERE created_date = %s",
        parameters=['2024-01-01']
    )
    
    # Or use pandas
    df = hook.get_pandas_df(sql="SELECT * FROM transactions")
    
    return len(results)

@task()
def insert_data():
    """Insert data into database"""
    hook = PostgresHook(postgres_conn_id='postgres_default')
    
    hook.run(
        sql="INSERT INTO logs (message, timestamp) VALUES (%s, %s)",
        parameters=[('Pipeline completed', datetime.now())]
    )

HTTP Hook Example

HTTP Hook示例

python
from airflow.providers.http.hooks.http import HttpHook
from airflow.decorators import task

@task()
def call_api():
    """Make HTTP API call"""
    hook = HttpHook(http_conn_id='http_api', method='GET')
    
    response = hook.run(
        endpoint='/v1/data',
        headers={'Content-Type': 'application/json'},
        extra_options={'timeout': 30}
    )
    
    data = response.json()
    return data
python
from airflow.providers.http.hooks.http import HttpHook
from airflow.decorators import task

@task()
def call_api():
    """Make HTTP API call"""
    hook = HttpHook(http_conn_id='http_api', method='GET')
    
    response = hook.run(
        endpoint='/v1/data',
        headers={'Content-Type': 'application/json'},
        extra_options={'timeout': 30}
    )
    
    data = response.json()
    return data

XCom (Cross-Communication)

XCom(跨任务通信)

XCom allows tasks to exchange small amounts of data.
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def xcom_example():
    
    @task()
    def push_data():
        """Push data to XCom"""
        return {
            'total_records': 1000,
            'processing_time': 45.2,
            'status': 'success'
        }
    
    @task()
    def pull_data(data: dict):
        """Receive data from previous task"""
        print(f"Received {data['total_records']} records")
        print(f"Processing took {data['processing_time']} seconds")
        
        # Can also use task instance to pull from specific task
        from airflow.operators.python import get_current_context
        context = get_current_context()
        ti = context['ti']
        
        # Pull from specific task
        specific_data = ti.xcom_pull(task_ids='push_data')
        return specific_data['status']
    
    result = push_data()
    pull_data(result)

xcom_example()
XCom允许任务之间交换少量数据。
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def xcom_example():
    
    @task()
    def push_data():
        """Push data to XCom"""
        return {
            'total_records': 1000,
            'processing_time': 45.2,
            'status': 'success'
        }
    
    @task()
    def pull_data(data: dict):
        """Receive data from previous task"""
        print(f"Received {data['total_records']} records")
        print(f"Processing took {data['processing_time']} seconds")
        
        # Can also use task instance to pull from specific task
        from airflow.operators.python import get_current_context
        context = get_current_context()
        ti = context['ti']
        
        # Pull from specific task
        specific_data = ti.xcom_pull(task_ids='push_data')
        return specific_data['status']
    
    result = push_data()
    pull_data(result)

xcom_example()

XCom with Multiple Return Values

多返回值的XCom

python
@task()
def process_multiple():
    """Return multiple values"""
    return {'key1': 'value1', 'key2': 'value2'}

@task()
def use_multiple(data: dict):
    """Use multiple values"""
    print(data['key1'], data['key2'])

data = process_multiple()
use_multiple(data)
python
@task()
def process_multiple():
    """Return multiple values"""
    return {'key1': 'value1', 'key2': 'value2'}

@task()
def use_multiple(data: dict):
    """Use multiple values"""
    print(data['key1'], data['key2'])

data = process_multiple()
use_multiple(data)

Task Dependencies

任务依赖

Linear Dependencies

线性依赖

python
task1 >> task2 >> task3
python
task1 >> task2 >> task3

Or

Or

task1.set_downstream(task2) task2.set_downstream(task3)
undefined
task1.set_downstream(task2) task2.set_downstream(task3)
undefined

Parallel Dependencies

并行依赖

python
undefined
python
undefined

Fan-out

Fan-out

task1 >> [task2, task3, task4]
task1 >> [task2, task3, task4]

Fan-in

Fan-in

[task2, task3, task4] >> task5
undefined
[task2, task3, task4] >> task5
undefined

Complex Dependencies

复杂依赖

python
undefined
python
undefined

Multiple dependencies

Multiple dependencies

task1 >> task2 task1 >> task3 [task2, task3] >> task4
task1 >> task2 task1 >> task3 [task2, task3] >> task4

Or using chain

Or using chain

from airflow.models.baseoperator import chain
chain(task1, [task2, task3], task4)
undefined
from airflow.models.baseoperator import chain
chain(task1, [task2, task3], task4)
undefined

Cross-DAG Dependencies

跨DAG依赖

python
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_other_dag = ExternalTaskSensor(
    task_id='wait_for_upstream_dag',
    external_dag_id='upstream_dag',
    external_task_id='final_task',
    timeout=3600,
    dag=dag,
)
python
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_other_dag = ExternalTaskSensor(
    task_id='wait_for_upstream_dag',
    external_dag_id='upstream_dag',
    external_task_id='final_task',
    timeout=3600,
    dag=dag,
)

Dynamic Task Generation

动态任务生成

python
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def dynamic_tasks_example():
    
    @task()
    def get_sources():
        """Get list of data sources to process"""
        return ['source_1', 'source_2', 'source_3', 'source_4']
    
    @task()
    def process_source(source: str):
        """Process a single source"""
        print(f"Processing {source}")
        # Your processing logic
        return f"{source}_processed"
    
    @task()
    def combine_results(results: list):
        """Combine all processed results"""
        print(f"Combining {len(results)} results")
        return results
    
    sources = get_sources()
    processed = process_source.expand(source=sources)
    combine_results(processed)

dynamic_tasks_example()
python
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def dynamic_tasks_example():
    
    @task()
    def get_sources():
        """Get list of data sources to process"""
        return ['source_1', 'source_2', 'source_3', 'source_4']
    
    @task()
    def process_source(source: str):
        """Process a single source"""
        print(f"Processing {source}")
        # Your processing logic
        return f"{source}_processed"
    
    @task()
    def combine_results(results: list):
        """Combine all processed results"""
        print(f"Combining {len(results)} results")
        return results
    
    sources = get_sources()
    processed = process_source.expand(source=sources)
    combine_results(processed)

dynamic_tasks_example()

Dynamic Task Mapping (Airflow 2.3+)

动态任务映射(Airflow 2.3+)

python
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def task_mapping_example():
    
    @task()
    def extract_files():
        """Return list of files to process"""
        return [
            {'file': 'data1.csv', 'format': 'csv'},
            {'file': 'data2.json', 'format': 'json'},
            {'file': 'data3.parquet', 'format': 'parquet'},
        ]
    
    @task()
    def process_file(file_info: dict):
        """Process a single file"""
        filename = file_info['file']
        format = file_info['format']
        print(f"Processing {filename} as {format}")
        return f"Processed {filename}"
    
    files = extract_files()
    process_file.expand(file_info=files)

task_mapping_example()
python
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def task_mapping_example():
    
    @task()
    def extract_files():
        """Return list of files to process"""
        return [
            {'file': 'data1.csv', 'format': 'csv'},
            {'file': 'data2.json', 'format': 'json'},
            {'file': 'data3.parquet', 'format': 'parquet'},
        ]
    
    @task()
    def process_file(file_info: dict):
        """Process a single file"""
        filename = file_info['file']
        format = file_info['format']
        print(f"Processing {filename} as {format}")
        return f"Processed {filename}"
    
    files = extract_files()
    process_file.expand(file_info=files)

task_mapping_example()

Configuration

配置

airflow.cfg

airflow.cfg

Key configuration options:
ini
[core]
关键配置选项:
ini
[core]

DAGs folder

DAGs folder

dags_folder = /opt/airflow/dags
dags_folder = /opt/airflow/dags

Executor (LocalExecutor, CeleryExecutor, KubernetesExecutor)

Executor (LocalExecutor, CeleryExecutor, KubernetesExecutor)

executor = LocalExecutor
executor = LocalExecutor

Parallelism

Parallelism

parallelism = 32 dag_concurrency = 16 max_active_runs_per_dag = 16
[database]
parallelism = 32 dag_concurrency = 16 max_active_runs_per_dag = 16
[database]

Database connection

Database connection

sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost/airflow
[scheduler]
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost/airflow
[scheduler]

How often to scan for new DAGs

How often to scan for new DAGs

dag_dir_list_interval = 300
dag_dir_list_interval = 300

Number of scheduler processes

Number of scheduler processes

scheduler_zombie_task_threshold = 300
[webserver]
scheduler_zombie_task_threshold = 300
[webserver]

Web server host and port

Web server host and port

web_server_host = 0.0.0.0 web_server_port = 8080
web_server_host = 0.0.0.0 web_server_port = 8080

Secret key for session

Secret key for session

secret_key = your_secret_key_here
[email]
secret_key = your_secret_key_here
[email]

Email backend

Email backend

email_backend = airflow.utils.email.send_email_smtp
[smtp] smtp_host = smtp.gmail.com smtp_port = 587 smtp_user = your_email@gmail.com smtp_password = your_app_password smtp_mail_from = airflow@example.com
undefined
email_backend = airflow.utils.email.send_email_smtp
[smtp] smtp_host = smtp.gmail.com smtp_port = 587 smtp_user = your_email@gmail.com smtp_password = your_app_password smtp_mail_from = airflow@example.com
undefined

Environment Variables

环境变量

bash
undefined
bash
undefined

Override any config

Override any config

export AIRFLOW__CORE__EXECUTOR=LocalExecutor export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@localhost/airflow export AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags export AIRFLOW__WEBSERVER__SECRET_KEY=your_secret_key
export AIRFLOW__CORE__EXECUTOR=LocalExecutor export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@localhost/airflow export AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags export AIRFLOW__WEBSERVER__SECRET_KEY=your_secret_key

Set Airflow home

Set Airflow home

export AIRFLOW_HOME=~/airflow
undefined
export AIRFLOW_HOME=~/airflow
undefined

Variables

变量

Store global configuration values.
存储全局配置值。

Set Variables

设置变量

bash
undefined
bash
undefined

Via CLI

Via CLI

airflow variables set my_key my_value airflow variables set api_endpoint "https://api.example.com/v1"
airflow variables set my_key my_value airflow variables set api_endpoint "https://api.example.com/v1"

Import from JSON file

Import from JSON file

airflow variables import variables.json
undefined
airflow variables import variables.json
undefined

Use Variables in DAGs

在DAG中使用变量

python
from airflow.models import Variable
python
from airflow.models import Variable

Get variable

Get variable

api_endpoint = Variable.get("api_endpoint")
api_endpoint = Variable.get("api_endpoint")

Get with default value

Get with default value

timeout = Variable.get("timeout", default_var=30)
timeout = Variable.get("timeout", default_var=30)

Get as JSON

Get as JSON

config = Variable.get("config_json", deserialize_json=True)
config = Variable.get("config_json", deserialize_json=True)

In a task

In a task

@task() def use_variable(): endpoint = Variable.get("api_endpoint") print(f"Using endpoint: {endpoint}")
undefined
@task() def use_variable(): endpoint = Variable.get("api_endpoint") print(f"Using endpoint: {endpoint}")
undefined

Variables in Templates

在模板中使用变量

python
bash_task = BashOperator(
    task_id='use_variable',
    bash_command='echo "API: {{ var.value.api_endpoint }}"',
    dag=dag,
)
python
bash_task = BashOperator(
    task_id='use_variable',
    bash_command='echo "API: {{ var.value.api_endpoint }}"',
    dag=dag,
)

Templating with Jinja

使用Jinja模板

Airflow uses Jinja templating for many fields.
Airflow在许多字段中使用Jinja模板。

Common Template Variables

常用模板变量

python
from airflow.operators.bash import BashOperator

templated_command = """
    # Execution date
    echo "Execution date: {{ ds }}"  # YYYY-MM-DD
    echo "Execution date no dash: {{ ds_nodash }}"  # YYYYMMDD
    
    # Date components
    echo "Year: {{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}"
    echo "Previous day: {{ macros.ds_add(ds, -1) }}"
    
    # Task instance
    echo "Task ID: {{ task.task_id }}"
    echo "DAG ID: {{ dag.dag_id }}"
    echo "Run ID: {{ run_id }}"
    
    # Parameters
    echo "Param: {{ params.my_param }}"
"""

task = BashOperator(
    task_id='templated_task',
    bash_command=templated_command,
    params={'my_param': 'value'},
    dag=dag,
)
python
from airflow.operators.bash import BashOperator

templated_command = """
    # Execution date
    echo "Execution date: {{ ds }}"  # YYYY-MM-DD
    echo "Execution date no dash: {{ ds_nodash }}"  # YYYYMMDD
    
    # Date components
    echo "Year: {{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}"
    echo "Previous day: {{ macros.ds_add(ds, -1) }}"
    
    # Task instance
    echo "Task ID: {{ task.task_id }}"
    echo "DAG ID: {{ dag.dag_id }}"
    echo "Run ID: {{ run_id }}"
    
    # Parameters
    echo "Param: {{ params.my_param }}"
"""

task = BashOperator(
    task_id='templated_task',
    bash_command=templated_command,
    params={'my_param': 'value'},
    dag=dag,
)

Custom Macros

自定义宏

python
def custom_macro(value):
    """Custom Jinja macro"""
    return value.upper()

dag = DAG(
    'dag_with_macros',
    user_defined_macros={
        'custom_upper': custom_macro
    },
    start_date=datetime(2024, 1, 1),
)

task = BashOperator(
    task_id='use_macro',
    bash_command='echo "{{ custom_upper(params.name) }}"',
    params={'name': 'airflow'},
    dag=dag,
)
python
def custom_macro(value):
    """Custom Jinja macro"""
    return value.upper()

dag = DAG(
    'dag_with_macros',
    user_defined_macros={
        'custom_upper': custom_macro
    },
    start_date=datetime(2024, 1, 1),
)

task = BashOperator(
    task_id='use_macro',
    bash_command='echo "{{ custom_upper(params.name) }}"',
    params={'name': 'airflow'},
    dag=dag,
)

CLI Commands

CLI命令

DAG Management

DAG管理

bash
undefined
bash
undefined

List all DAGs

List all DAGs

airflow dags list
airflow dags list

List tasks in a DAG

List tasks in a DAG

airflow tasks list my_dag
airflow tasks list my_dag

Show DAG structure

Show DAG structure

airflow dags show my_dag
airflow dags show my_dag

Trigger a DAG run

Trigger a DAG run

airflow dags trigger my_dag
airflow dags trigger my_dag

Trigger with config

Trigger with config

airflow dags trigger my_dag --conf '{"key": "value"}'
airflow dags trigger my_dag --conf '{"key": "value"}'

Pause/Unpause DAG

Pause/Unpause DAG

airflow dags pause my_dag airflow dags unpause my_dag
airflow dags pause my_dag airflow dags unpause my_dag

Delete DAG (from metadata, not file)

Delete DAG (from metadata, not file)

airflow dags delete my_dag
undefined
airflow dags delete my_dag
undefined

Task Management

任务管理

bash
undefined
bash
undefined

Test a task (doesn't save state)

Test a task (doesn't save state)

airflow tasks test my_dag my_task 2024-01-01
airflow tasks test my_dag my_task 2024-01-01

Run a task (saves state)

Run a task (saves state)

airflow tasks run my_dag my_task 2024-01-01
airflow tasks run my_dag my_task 2024-01-01

Clear task state

Clear task state

airflow tasks clear my_dag --task-regex my_task
airflow tasks clear my_dag --task-regex my_task

Clear all tasks in DAG

Clear all tasks in DAG

airflow tasks clear my_dag --start-date 2024-01-01 --end-date 2024-01-31
undefined
airflow tasks clear my_dag --start-date 2024-01-01 --end-date 2024-01-31
undefined

Database

数据库

bash
undefined
bash
undefined

Initialize database

Initialize database

airflow db init
airflow db init

Upgrade database

Upgrade database

airflow db upgrade
airflow db upgrade

Reset database (WARNING: deletes all data)

Reset database (WARNING: deletes all data)

airflow db reset
airflow db reset

Check database

Check database

airflow db check
undefined
airflow db check
undefined

Users

用户

bash
undefined
bash
undefined

Create user

Create user

airflow users create
--username john
--firstname John
--lastname Doe
--role Admin
--email john@example.com
airflow users create
--username john
--firstname John
--lastname Doe
--role Admin
--email john@example.com

List users

List users

airflow users list
airflow users list

Delete user

Delete user

airflow users delete --username john
undefined
airflow users delete --username john
undefined

Connections

连接

bash
undefined
bash
undefined

List connections

List connections

airflow connections list
airflow connections list

Get connection details

Get connection details

airflow connections get postgres_default
airflow connections get postgres_default

Export connections

Export connections

airflow connections export connections.json
airflow connections export connections.json

Import connections

Import connections

airflow connections import connections.json
undefined
airflow connections import connections.json
undefined

Variables

变量

bash
undefined
bash
undefined

Set variable

Set variable

airflow variables set my_var my_value
airflow variables set my_var my_value

Get variable

Get variable

airflow variables get my_var
airflow variables get my_var

Delete variable

Delete variable

airflow variables delete my_var
airflow variables delete my_var

List all variables

List all variables

airflow variables list
airflow variables list

Export variables to JSON

Export variables to JSON

airflow variables export variables.json
airflow variables export variables.json

Import variables from JSON

Import variables from JSON

airflow variables import variables.json
undefined
airflow variables import variables.json
undefined

Working with Providers

与Providers协作

Providers extend Airflow with additional operators, hooks, and sensors.
Providers为Airflow扩展了额外的Operator、Hook和Sensor。

Install Providers

安装Providers

bash
undefined
bash
undefined

Install specific providers

Install specific providers

pip install apache-airflow-providers-amazon pip install apache-airflow-providers-google pip install apache-airflow-providers-postgres pip install apache-airflow-providers-http pip install apache-airflow-providers-docker
undefined
pip install apache-airflow-providers-amazon pip install apache-airflow-providers-google pip install apache-airflow-providers-postgres pip install apache-airflow-providers-http pip install apache-airflow-providers-docker
undefined

AWS S3 Example

AWS S3示例

python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def s3_example():
    
    list_files = S3ListOperator(
        task_id='list_s3_files',
        bucket='my-bucket',
        prefix='data/',
        aws_conn_id='aws_default',
    )
    
    @task()
    def download_from_s3():
        """Download file from S3"""
        hook = S3Hook(aws_conn_id='aws_default')
        
        # Download file
        content = hook.read_key(
            key='data/file.csv',
            bucket_name='my-bucket'
        )
        
        return len(content)
    
    @task()
    def upload_to_s3():
        """Upload file to S3"""
        hook = S3Hook(aws_conn_id='aws_default')
        
        # Upload file
        hook.load_string(
            string_data='Hello, S3!',
            key='output/result.txt',
            bucket_name='my-bucket'
        )
    
    list_files >> download_from_s3() >> upload_to_s3()

s3_example()
python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def s3_example():
    
    list_files = S3ListOperator(
        task_id='list_s3_files',
        bucket='my-bucket',
        prefix='data/',
        aws_conn_id='aws_default',
    )
    
    @task()
    def download_from_s3():
        """Download file from S3"""
        hook = S3Hook(aws_conn_id='aws_default')
        
        # Download file
        content = hook.read_key(
            key='data/file.csv',
            bucket_name='my-bucket'
        )
        
        return len(content)
    
    @task()
    def upload_to_s3():
        """Upload file to S3"""
        hook = S3Hook(aws_conn_id='aws_default')
        
        # Upload file
        hook.load_string(
            string_data='Hello, S3!',
            key='output/result.txt',
            bucket_name='my-bucket'
        )
    
    list_files >> download_from_s3() >> upload_to_s3()

s3_example()

Google Cloud Storage Example

Google Cloud Storage示例

python
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_to_bigquery = GCSToBigQueryOperator(
    task_id='load_to_bq',
    bucket='my-gcs-bucket',
    source_objects=['data/*.csv'],
    destination_project_dataset_table='project.dataset.table',
    source_format='CSV',
    skip_leading_rows=1,
    write_disposition='WRITE_TRUNCATE',
    gcp_conn_id='google_cloud_default',
    dag=dag,
)
python
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_to_bigquery = GCSToBigQueryOperator(
    task_id='load_to_bq',
    bucket='my-gcs-bucket',
    source_objects=['data/*.csv'],
    destination_project_dataset_table='project.dataset.table',
    source_format='CSV',
    skip_leading_rows=1,
    write_disposition='WRITE_TRUNCATE',
    gcp_conn_id='google_cloud_default',
    dag=dag,
)

Docker Operator Example

Docker Operator示例

python
from airflow.providers.docker.operators.docker import DockerOperator

run_container = DockerOperator(
    task_id='run_docker_container',
    image='python:3.10',
    command='python -c "print(\'Hello from Docker\')"',
    docker_url='unix://var/run/docker.sock',
    network_mode='bridge',
    dag=dag,
)
python
from airflow.providers.docker.operators.docker import DockerOperator

run_container = DockerOperator(
    task_id='run_docker_container',
    image='python:3.10',
    command='python -c "print(\'Hello from Docker\')"',
    docker_url='unix://var/run/docker.sock',
    network_mode='bridge',
    dag=dag,
)

Error Handling and Retries

错误处理与重试

Task-level Configuration

任务级配置

python
@task(
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
)
def task_with_retries():
    """Task with custom retry logic"""
    # Your code
    pass
python
@task(
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
)
def task_with_retries():
    """Task with custom retry logic"""
    # Your code
    pass

Failure Callbacks

失败回调

python
def on_failure_callback(context):
    """Called when task fails"""
    ti = context['task_instance']
    print(f"Task {ti.task_id} failed!")
    # Send alert, create ticket, etc.

def on_success_callback(context):
    """Called when task succeeds"""
    print("Task succeeded!")

@task(
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
)
def monitored_task():
    """Task with callbacks"""
    # Your code
    pass
python
def on_failure_callback(context):
    """Called when task fails"""
    ti = context['task_instance']
    print(f"Task {ti.task_id} failed!")
    # Send alert, create ticket, etc.

def on_success_callback(context):
    """Called when task succeeds"""
    print("Task succeeded!")

@task(
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
)
def monitored_task():
    """Task with callbacks"""
    # Your code
    pass

Try/Except in Tasks

任务中的Try/Except

python
from airflow.exceptions import AirflowException

@task()
def safe_task():
    """Task with error handling"""
    try:
        # Your code that might fail
        result = risky_operation()
        return result
    except SpecificException as e:
        # Log error but don't fail task
        print(f"Warning: {e}")
        return None
    except Exception as e:
        # Fail task with custom message
        raise AirflowException(f"Critical error: {e}")
python
from airflow.exceptions import AirflowException

@task()
def safe_task():
    """Task with error handling"""
    try:
        # Your code that might fail
        result = risky_operation()
        return result
    except SpecificException as e:
        # Log error but don't fail task
        print(f"Warning: {e}")
        return None
    except Exception as e:
        # Fail task with custom message
        raise AirflowException(f"Critical error: {e}")

Testing DAGs

测试DAG

Unit Testing

单元测试

python
undefined
python
undefined

test_dag.py

test_dag.py

import pytest from datetime import datetime from airflow.models import DagBag
def test_dag_loaded(): """Test that DAG is loaded correctly""" dagbag = DagBag(dag_folder='dags/', include_examples=False) assert 'my_dag' in dagbag.dags assert len(dagbag.import_errors) == 0
def test_dag_structure(): """Test DAG structure""" dagbag = DagBag(dag_folder='dags/', include_examples=False) dag = dagbag.get_dag('my_dag')
# Check task count
assert len(dag.tasks) == 5

# Check specific task exists
assert 'extract' in dag.task_ids

# Check dependencies
extract_task = dag.get_task('extract')
downstream = extract_task.downstream_task_ids
assert 'transform' in downstream
def test_task_execution(): """Test task execution""" from airflow.models import TaskInstance from airflow import settings
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
task = dag.get_task('extract')

# Create task instance
ti = TaskInstance(task=task, execution_date=datetime(2024, 1, 1))

# Test execution
ti.run(ignore_ti_state=True)

# Check result
assert ti.state == 'success'
undefined
import pytest from datetime import datetime from airflow.models import DagBag
def test_dag_loaded(): """Test that DAG is loaded correctly""" dagbag = DagBag(dag_folder='dags/', include_examples=False) assert 'my_dag' in dagbag.dags assert len(dagbag.import_errors) == 0
def test_dag_structure(): """Test DAG structure""" dagbag = DagBag(dag_folder='dags/', include_examples=False) dag = dagbag.get_dag('my_dag')
# Check task count
assert len(dag.tasks) == 5

# Check specific task exists
assert 'extract' in dag.task_ids

# Check dependencies
extract_task = dag.get_task('extract')
downstream = extract_task.downstream_task_ids
assert 'transform' in downstream
def test_task_execution(): """Test task execution""" from airflow.models import TaskInstance from airflow import settings
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
task = dag.get_task('extract')

# Create task instance
ti = TaskInstance(task=task, execution_date=datetime(2024, 1, 1))

# Test execution
ti.run(ignore_ti_state=True)

# Check result
assert ti.state == 'success'
undefined

Integration Testing

集成测试

python
undefined
python
undefined

test_integration.py

test_integration.py

from airflow.models import DagBag from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType
def test_dag_run(): """Test complete DAG run""" dagbag = DagBag(dag_folder='dags/') dag = dagbag.get_dag('my_dag')
# Trigger DAG
dag.test()
undefined
from airflow.models import DagBag from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType
def test_dag_run(): """Test complete DAG run""" dagbag = DagBag(dag_folder='dags/') dag = dagbag.get_dag('my_dag')
# Trigger DAG
dag.test()
undefined

Best Practices

最佳实践

1. Idempotent Tasks

1. 幂等任务

python
@task()
def idempotent_load(execution_date):
    """
    Task that can be run multiple times safely
    """
    # Delete existing data for this date first
    delete_query = "DELETE FROM table WHERE date = %s"
    hook.run(delete_query, parameters=[execution_date])
    
    # Then insert new data
    insert_query = "INSERT INTO table ..."
    hook.run(insert_query)
python
@task()
def idempotent_load(execution_date):
    """
    Task that can be run multiple times safely
    """
    # Delete existing data for this date first
    delete_query = "DELETE FROM table WHERE date = %s"
    hook.run(delete_query, parameters=[execution_date])
    
    # Then insert new data
    insert_query = "INSERT INTO table ..."
    hook.run(insert_query)

2. Use Connections for Credentials

2. 使用连接存储凭据

python
undefined
python
undefined

Good: Use connections

Good: Use connections

@task() def good_practice(): hook = PostgresHook(postgres_conn_id='postgres_default') # Use hook
@task() def good_practice(): hook = PostgresHook(postgres_conn_id='postgres_default') # Use hook

Bad: Hardcode credentials

Bad: Hardcode credentials

@task() def bad_practice(): import psycopg2 conn = psycopg2.connect( host='localhost', user='user', # Don't do this! password='password' # Never do this! )
undefined
@task() def bad_practice(): import psycopg2 conn = psycopg2.connect( host='localhost', user='user', # Don't do this! password='password' # Never do this! )
undefined

3. Don't Pass Large Data Between Tasks

3. 不要在任务间传递大数据

python
undefined
python
undefined

Good: Pass references

Good: Pass references

@task() def process_data(): # Process and save to database/storage data_id = save_to_database(large_data) return data_id # Return only ID
@task()
@task() def process_data(): # Process and save to database/storage data_id = save_to_database(large_data) return data_id # Return only ID
@task() def use_data(data_id): # Retrieve data from database/storage data = get_from_database(data_id) # Process data
undefined

4. 使用TaskFlow API

python
undefined

Good: TaskFlow API

@dag() def good_dag(): @task() def extract(): return data
@task()
def transform(data):
    return transformed_data

extract() >> transform()

Bad: Traditional operators

def bad_dag(): extract = PythonOperator( task_id='extract', python_callable=extract_func, dag=dag, )
transform = PythonOperator(
    task_id='transform',
    python_callable=transform_func,
    op_kwargs={'data': '{{ ti.xcom_pull(task_ids="extract") }}'},
    dag=dag,
)

extract >> transform
undefined

5. 保持DAG简洁

python
undefined

Good: Split into smaller tasks

@task() def extract_source1(): pass
@task() def extract_source2(): pass
@task() def combine_extracts(): pass

Bad: Monolithic task

@task() def extract_all_sources(): # Extract from 10 different sources in one task pass
undefined

6. 使用版本控制

  • 将DAG代码存储在Git仓库中
  • 使用分支管理不同环境(开发、测试、生产)
  • 提交前运行单元测试

7. 监控与告警

python
undefined

Configure email alerts

default_args = { 'email': ['alerts@example.com'], 'email_on_failure': True, 'email_on_retry': False, }

Or use Slack/Teams alerts

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
send_slack_alert = SlackWebhookOperator( task_id='send_slack_alert', slack_webhook_conn_id='slack_default', message='Task {{ task.task_id }} failed!', dag=dag, )
undefined

8. 清理旧数据

python
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def clean_old_logs(**context):
    """Clean logs older than 30 days"""
    import os
    import shutil
    from datetime import datetime, timedelta
    
    log_dir = '/opt/airflow/logs'
    cutoff_date = datetime.now() - timedelta(days=30)
    
    for root, dirs, files in os.walk(log_dir):
        for file in files:
            file_path = os.path.join(root, file)
            file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_mtime < cutoff_date:
                os.remove(file_path)

clean_task = PythonOperator(
    task_id='clean_old_logs',
    python_callable=clean_old_logs,
    schedule='0 0 * * 0',  # Run weekly on Sunday
    dag=dag,
)