apache-airflow-orchestration
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseApache Airflow Orchestration
Apache Airflow 编排
Skill by ara.so — Data Skills collection.
Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) in Python code, making them maintainable, versionable, testable, and collaborative.
由ara.so提供的技能 — 数据技能合集。
Apache Airflow是一个以编程方式创作、调度和监控工作流的平台。它允许你用Python代码将工作流定义为有向无环图(DAGs),使其易于维护、版本控制、测试和协作。
Installation
安装
Using pip
使用pip
bash
undefinedbash
undefinedInstall Airflow with constraints for your Python version
Install Airflow with constraints for your Python version
AIRFLOW_VERSION=3.2.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
undefinedAIRFLOW_VERSION=3.2.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
undefinedUsing Docker (Recommended for Development)
使用Docker(开发推荐)
bash
undefinedbash
undefinedDownload docker-compose.yaml
Download docker-compose.yaml
Create required directories
Create required directories
mkdir -p ./dags ./logs ./plugins ./config
mkdir -p ./dags ./logs ./plugins ./config
Set the Airflow user
Set the Airflow user
echo -e "AIRFLOW_UID=$(id -u)" > .env
echo -e "AIRFLOW_UID=$(id -u)" > .env
Initialize the database
Initialize the database
docker compose up airflow-init
docker compose up airflow-init
Start Airflow
Start Airflow
docker compose up
Access the web UI at `http://localhost:8080` (default credentials: `airflow`/`airflow`).docker compose up
访问Web UI地址为`http://localhost:8080`(默认凭据:`airflow`/`airflow`)。Standalone Quick Start
独立快速启动
bash
undefinedbash
undefinedInitialize database and create admin user
Initialize database and create admin user
airflow db init
airflow db init
Create admin user
Create admin user
airflow users create
--username admin
--firstname Admin
--lastname User
--role Admin
--email admin@example.com
--username admin
--firstname Admin
--lastname User
--role Admin
--email admin@example.com
airflow users create
--username admin
--firstname Admin
--lastname User
--role Admin
--email admin@example.com
--username admin
--firstname Admin
--lastname User
--role Admin
--email admin@example.com
Start the web server (default port 8080)
Start the web server (default port 8080)
airflow webserver --port 8080
airflow webserver --port 8080
Start the scheduler (in another terminal)
Start the scheduler (in another terminal)
airflow scheduler
undefinedairflow scheduler
undefinedCore Concepts
核心概念
DAG (Directed Acyclic Graph)
DAG(有向无环图)
A DAG defines a workflow with tasks and their dependencies. Tasks must not create cycles.
DAG定义了包含任务及其依赖关系的工作流。任务不能形成循环。
Basic DAG Structure
基础DAG结构
python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperatorpython
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperatorDefault arguments applied to all tasks
Default arguments applied to all tasks
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
Define the DAG
Define the DAG
dag = DAG(
'example_data_pipeline',
default_args=default_args,
description='A simple data pipeline',
schedule='0 0 * * *', # Run daily at midnight (cron expression)
start_date=datetime(2024, 1, 1),
catchup=False, # Don't run for past dates
tags=['example', 'data-engineering'],
)
def extract_data(**context):
"""Extract data from source"""
print("Extracting data...")
# Your extraction logic here
return {'records': 1000}
def transform_data(**context):
"""Transform extracted data"""
# Access data from previous task via XCom
ti = context['ti']
extracted = ti.xcom_pull(task_ids='extract')
print(f"Transforming {extracted['records']} records...")
return {'transformed_records': extracted['records']}
def load_data(**context):
"""Load data to destination"""
ti = context['ti']
transformed = ti.xcom_pull(task_ids='transform')
print(f"Loading {transformed['transformed_records']} records...")
dag = DAG(
'example_data_pipeline',
default_args=default_args,
description='A simple data pipeline',
schedule='0 0 * * *', # Run daily at midnight (cron expression)
start_date=datetime(2024, 1, 1),
catchup=False, # Don't run for past dates
tags=['example', 'data-engineering'],
)
def extract_data(**context):
"""Extract data from source"""
print("Extracting data...")
# Your extraction logic here
return {'records': 1000}
def transform_data(**context):
"""Transform extracted data"""
# Access data from previous task via XCom
ti = context['ti']
extracted = ti.xcom_pull(task_ids='extract')
print(f"Transforming {extracted['records']} records...")
return {'transformed_records': extracted['records']}
def load_data(**context):
"""Load data to destination"""
ti = context['ti']
transformed = ti.xcom_pull(task_ids='transform')
print(f"Loading {transformed['transformed_records']} records...")
Define tasks
Define tasks
extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
dag=dag,
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_data,
dag=dag,
)
load = PythonOperator(
task_id='load',
python_callable=load_data,
dag=dag,
)
extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
dag=dag,
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_data,
dag=dag,
)
load = PythonOperator(
task_id='load',
python_callable=load_data,
dag=dag,
)
Set task dependencies
Set task dependencies
extract >> transform >> load
undefinedextract >> transform >> load
undefinedTaskFlow API (Recommended for Airflow 2.0+)
TaskFlow API(Airflow 2.0+推荐)
Modern, cleaner syntax using decorators:
python
from datetime import datetime
from airflow.decorators import dag, task
@dag(
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['taskflow', 'etl'],
)
def taskflow_etl_pipeline():
"""
ETL pipeline using TaskFlow API
"""
@task()
def extract():
"""Extract data from API"""
import requests
# Using environment variable for API key
import os
api_key = os.getenv('DATA_API_KEY')
# Simulated extraction
data = {'records': [1, 2, 3, 4, 5]}
return data
@task()
def transform(data: dict):
"""Transform the data"""
records = data['records']
transformed = [r * 2 for r in records]
return {'transformed': transformed}
@task()
def load(data: dict):
"""Load data to warehouse"""
print(f"Loading {len(data['transformed'])} records")
# Your loading logic here
return True
# Define pipeline
extracted_data = extract()
transformed_data = transform(extracted_data)
load(transformed_data)使用装饰器的现代、简洁语法:
python
from datetime import datetime
from airflow.decorators import dag, task
@dag(
schedule='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['taskflow', 'etl'],
)
def taskflow_etl_pipeline():
"""
ETL pipeline using TaskFlow API
"""
@task()
def extract():
"""Extract data from API"""
import requests
# Using environment variable for API key
import os
api_key = os.getenv('DATA_API_KEY')
# Simulated extraction
data = {'records': [1, 2, 3, 4, 5]}
return data
@task()
def transform(data: dict):
"""Transform the data"""
records = data['records']
transformed = [r * 2 for r in records]
return {'transformed': transformed}
@task()
def load(data: dict):
"""Load data to warehouse"""
print(f"Loading {len(data['transformed'])} records")
# Your loading logic here
return True
# Define pipeline
extracted_data = extract()
transformed_data = transform(extracted_data)
load(transformed_data)Instantiate the DAG
Instantiate the DAG
taskflow_etl_pipeline()
undefinedtaskflow_etl_pipeline()
undefinedCommon Operators
常用Operator
BashOperator
BashOperator
python
from airflow.operators.bash import BashOperator
run_script = BashOperator(
task_id='run_data_script',
bash_command='python /opt/scripts/process_data.py --date {{ ds }}',
dag=dag,
)python
from airflow.operators.bash import BashOperator
run_script = BashOperator(
task_id='run_data_script',
bash_command='python /opt/scripts/process_data.py --date {{ ds }}',
dag=dag,
)PythonOperator
PythonOperator
python
from airflow.operators.python import PythonOperator
def my_function(param1, param2, **context):
execution_date = context['execution_date']
print(f"Processing for {execution_date}")
# Your logic here
python_task = PythonOperator(
task_id='python_task',
python_callable=my_function,
op_kwargs={'param1': 'value1', 'param2': 'value2'},
dag=dag,
)python
from airflow.operators.python import PythonOperator
def my_function(param1, param2, **context):
execution_date = context['execution_date']
print(f"Processing for {execution_date}")
# Your logic here
python_task = PythonOperator(
task_id='python_task',
python_callable=my_function,
op_kwargs={'param1': 'value1', 'param2': 'value2'},
dag=dag,
)BranchPythonOperator
BranchPythonOperator
python
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
def choose_branch(**context):
"""Decide which branch to execute"""
execution_date = context['execution_date']
if execution_date.day % 2 == 0:
return 'even_day_task'
else:
return 'odd_day_task'
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch,
dag=dag,
)
even_task = EmptyOperator(task_id='even_day_task', dag=dag)
odd_task = EmptyOperator(task_id='odd_day_task', dag=dag)
branch >> [even_task, odd_task]python
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
def choose_branch(**context):
"""Decide which branch to execute"""
execution_date = context['execution_date']
if execution_date.day % 2 == 0:
return 'even_day_task'
else:
return 'odd_day_task'
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch,
dag=dag,
)
even_task = EmptyOperator(task_id='even_day_task', dag=dag)
odd_task = EmptyOperator(task_id='odd_day_task', dag=dag)
branch >> [even_task, odd_task]EmailOperator
EmailOperator
python
from airflow.operators.email import EmailOperator
send_email = EmailOperator(
task_id='send_notification',
to='team@example.com',
subject='Pipeline {{ ds }} completed',
html_content='<p>The pipeline for {{ ds }} has completed successfully.</p>',
dag=dag,
)python
from airflow.operators.email import EmailOperator
send_email = EmailOperator(
task_id='send_notification',
to='team@example.com',
subject='Pipeline {{ ds }} completed',
html_content='<p>The pipeline for {{ ds }} has completed successfully.</p>',
dag=dag,
)Sensors
Sensors
Sensors wait for a condition to be met before proceeding.
Sensors会等待条件满足后再继续执行。
FileSensor
FileSensor
python
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id='wait_for_data_file',
filepath='/data/input/file_{{ ds }}.csv',
poke_interval=30, # Check every 30 seconds
timeout=3600, # Timeout after 1 hour
mode='poke', # 'poke' or 'reschedule'
dag=dag,
)python
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id='wait_for_data_file',
filepath='/data/input/file_{{ ds }}.csv',
poke_interval=30, # Check every 30 seconds
timeout=3600, # Timeout after 1 hour
mode='poke', # 'poke' or 'reschedule'
dag=dag,
)TimeDeltaSensor
TimeDeltaSensor
python
from airflow.sensors.time_delta import TimeDeltaSensor
from datetime import timedelta
wait_sensor = TimeDeltaSensor(
task_id='wait_10_minutes',
delta=timedelta(minutes=10),
dag=dag,
)python
from airflow.sensors.time_delta import TimeDeltaSensor
from datetime import timedelta
wait_sensor = TimeDeltaSensor(
task_id='wait_10_minutes',
delta=timedelta(minutes=10),
dag=dag,
)Custom Sensor
自定义Sensor
python
from airflow.sensors.base import BaseSensorOperator
class CustomDataSensor(BaseSensorOperator):
def __init__(self, endpoint, **kwargs):
super().__init__(**kwargs)
self.endpoint = endpoint
def poke(self, context):
"""Check if data is available"""
import requests
import os
api_key = os.getenv('API_KEY')
response = requests.get(
self.endpoint,
headers={'Authorization': f'Bearer {api_key}'}
)
return response.status_code == 200 and response.json().get('ready', False)
check_data = CustomDataSensor(
task_id='check_data_ready',
endpoint='https://api.example.com/status',
poke_interval=60,
timeout=3600,
dag=dag,
)python
from airflow.sensors.base import BaseSensorOperator
class CustomDataSensor(BaseSensorOperator):
def __init__(self, endpoint, **kwargs):
super().__init__(**kwargs)
self.endpoint = endpoint
def poke(self, context):
"""Check if data is available"""
import requests
import os
api_key = os.getenv('API_KEY')
response = requests.get(
self.endpoint,
headers={'Authorization': f'Bearer {api_key}'}
)
return response.status_code == 200 and response.json().get('ready', False)
check_data = CustomDataSensor(
task_id='check_data_ready',
endpoint='https://api.example.com/status',
poke_interval=60,
timeout=3600,
dag=dag,
)Connections and Hooks
连接与钩子(Connections and Hooks)
Setting Up Connections
设置连接
Connections store credentials and connection details.
连接用于存储凭据和连接详情。
Via CLI
通过CLI
bash
undefinedbash
undefinedAdd a Postgres connection
Add a Postgres connection
airflow connections add 'postgres_default'
--conn-type 'postgres'
--conn-host 'localhost'
--conn-schema 'mydb'
--conn-login 'user'
--conn-password 'password'
--conn-port 5432
--conn-type 'postgres'
--conn-host 'localhost'
--conn-schema 'mydb'
--conn-login 'user'
--conn-password 'password'
--conn-port 5432
airflow connections add 'postgres_default'
--conn-type 'postgres'
--conn-host 'localhost'
--conn-schema 'mydb'
--conn-login 'user'
--conn-password 'password'
--conn-port 5432
--conn-type 'postgres'
--conn-host 'localhost'
--conn-schema 'mydb'
--conn-login 'user'
--conn-password 'password'
--conn-port 5432
Add an HTTP connection
Add an HTTP connection
airflow connections add 'http_api'
--conn-type 'http'
--conn-host 'https://api.example.com'
--conn-extra '{"api_key": "from_env_var"}'
--conn-type 'http'
--conn-host 'https://api.example.com'
--conn-extra '{"api_key": "from_env_var"}'
undefinedairflow connections add 'http_api'
--conn-type 'http'
--conn-host 'https://api.example.com'
--conn-extra '{"api_key": "from_env_var"}'
--conn-type 'http'
--conn-host 'https://api.example.com'
--conn-extra '{"api_key": "from_env_var"}'
undefinedVia Environment Variables
通过环境变量
bash
export AIRFLOW_CONN_POSTGRES_DEFAULT='postgresql://user:password@localhost:5432/mydb'
export AIRFLOW_CONN_HTTP_API='http://api.example.com'bash
export AIRFLOW_CONN_POSTGRES_DEFAULT='postgresql://user:password@localhost:5432/mydb'
export AIRFLOW_CONN_HTTP_API='http://api.example.com'Using Hooks
使用钩子
python
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
@task()
def query_database():
"""Query PostgreSQL database"""
hook = PostgresHook(postgres_conn_id='postgres_default')
# Execute query and fetch results
results = hook.get_records(
sql="SELECT * FROM users WHERE created_date = %s",
parameters=['2024-01-01']
)
# Or use pandas
df = hook.get_pandas_df(sql="SELECT * FROM transactions")
return len(results)
@task()
def insert_data():
"""Insert data into database"""
hook = PostgresHook(postgres_conn_id='postgres_default')
hook.run(
sql="INSERT INTO logs (message, timestamp) VALUES (%s, %s)",
parameters=[('Pipeline completed', datetime.now())]
)python
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
@task()
def query_database():
"""Query PostgreSQL database"""
hook = PostgresHook(postgres_conn_id='postgres_default')
# Execute query and fetch results
results = hook.get_records(
sql="SELECT * FROM users WHERE created_date = %s",
parameters=['2024-01-01']
)
# Or use pandas
df = hook.get_pandas_df(sql="SELECT * FROM transactions")
return len(results)
@task()
def insert_data():
"""Insert data into database"""
hook = PostgresHook(postgres_conn_id='postgres_default')
hook.run(
sql="INSERT INTO logs (message, timestamp) VALUES (%s, %s)",
parameters=[('Pipeline completed', datetime.now())]
)HTTP Hook Example
HTTP Hook示例
python
from airflow.providers.http.hooks.http import HttpHook
from airflow.decorators import task
@task()
def call_api():
"""Make HTTP API call"""
hook = HttpHook(http_conn_id='http_api', method='GET')
response = hook.run(
endpoint='/v1/data',
headers={'Content-Type': 'application/json'},
extra_options={'timeout': 30}
)
data = response.json()
return datapython
from airflow.providers.http.hooks.http import HttpHook
from airflow.decorators import task
@task()
def call_api():
"""Make HTTP API call"""
hook = HttpHook(http_conn_id='http_api', method='GET')
response = hook.run(
endpoint='/v1/data',
headers={'Content-Type': 'application/json'},
extra_options={'timeout': 30}
)
data = response.json()
return dataXCom (Cross-Communication)
XCom(跨任务通信)
XCom allows tasks to exchange small amounts of data.
python
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def xcom_example():
@task()
def push_data():
"""Push data to XCom"""
return {
'total_records': 1000,
'processing_time': 45.2,
'status': 'success'
}
@task()
def pull_data(data: dict):
"""Receive data from previous task"""
print(f"Received {data['total_records']} records")
print(f"Processing took {data['processing_time']} seconds")
# Can also use task instance to pull from specific task
from airflow.operators.python import get_current_context
context = get_current_context()
ti = context['ti']
# Pull from specific task
specific_data = ti.xcom_pull(task_ids='push_data')
return specific_data['status']
result = push_data()
pull_data(result)
xcom_example()XCom允许任务之间交换少量数据。
python
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def xcom_example():
@task()
def push_data():
"""Push data to XCom"""
return {
'total_records': 1000,
'processing_time': 45.2,
'status': 'success'
}
@task()
def pull_data(data: dict):
"""Receive data from previous task"""
print(f"Received {data['total_records']} records")
print(f"Processing took {data['processing_time']} seconds")
# Can also use task instance to pull from specific task
from airflow.operators.python import get_current_context
context = get_current_context()
ti = context['ti']
# Pull from specific task
specific_data = ti.xcom_pull(task_ids='push_data')
return specific_data['status']
result = push_data()
pull_data(result)
xcom_example()XCom with Multiple Return Values
多返回值的XCom
python
@task()
def process_multiple():
"""Return multiple values"""
return {'key1': 'value1', 'key2': 'value2'}
@task()
def use_multiple(data: dict):
"""Use multiple values"""
print(data['key1'], data['key2'])
data = process_multiple()
use_multiple(data)python
@task()
def process_multiple():
"""Return multiple values"""
return {'key1': 'value1', 'key2': 'value2'}
@task()
def use_multiple(data: dict):
"""Use multiple values"""
print(data['key1'], data['key2'])
data = process_multiple()
use_multiple(data)Task Dependencies
任务依赖
Linear Dependencies
线性依赖
python
task1 >> task2 >> task3python
task1 >> task2 >> task3Or
Or
task1.set_downstream(task2)
task2.set_downstream(task3)
undefinedtask1.set_downstream(task2)
task2.set_downstream(task3)
undefinedParallel Dependencies
并行依赖
python
undefinedpython
undefinedFan-out
Fan-out
task1 >> [task2, task3, task4]
task1 >> [task2, task3, task4]
Fan-in
Fan-in
[task2, task3, task4] >> task5
undefined[task2, task3, task4] >> task5
undefinedComplex Dependencies
复杂依赖
python
undefinedpython
undefinedMultiple dependencies
Multiple dependencies
task1 >> task2
task1 >> task3
[task2, task3] >> task4
task1 >> task2
task1 >> task3
[task2, task3] >> task4
Or using chain
Or using chain
from airflow.models.baseoperator import chain
chain(task1, [task2, task3], task4)
undefinedfrom airflow.models.baseoperator import chain
chain(task1, [task2, task3], task4)
undefinedCross-DAG Dependencies
跨DAG依赖
python
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_other_dag = ExternalTaskSensor(
task_id='wait_for_upstream_dag',
external_dag_id='upstream_dag',
external_task_id='final_task',
timeout=3600,
dag=dag,
)python
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_other_dag = ExternalTaskSensor(
task_id='wait_for_upstream_dag',
external_dag_id='upstream_dag',
external_task_id='final_task',
timeout=3600,
dag=dag,
)Dynamic Task Generation
动态任务生成
python
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def dynamic_tasks_example():
@task()
def get_sources():
"""Get list of data sources to process"""
return ['source_1', 'source_2', 'source_3', 'source_4']
@task()
def process_source(source: str):
"""Process a single source"""
print(f"Processing {source}")
# Your processing logic
return f"{source}_processed"
@task()
def combine_results(results: list):
"""Combine all processed results"""
print(f"Combining {len(results)} results")
return results
sources = get_sources()
processed = process_source.expand(source=sources)
combine_results(processed)
dynamic_tasks_example()python
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def dynamic_tasks_example():
@task()
def get_sources():
"""Get list of data sources to process"""
return ['source_1', 'source_2', 'source_3', 'source_4']
@task()
def process_source(source: str):
"""Process a single source"""
print(f"Processing {source}")
# Your processing logic
return f"{source}_processed"
@task()
def combine_results(results: list):
"""Combine all processed results"""
print(f"Combining {len(results)} results")
return results
sources = get_sources()
processed = process_source.expand(source=sources)
combine_results(processed)
dynamic_tasks_example()Dynamic Task Mapping (Airflow 2.3+)
动态任务映射(Airflow 2.3+)
python
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def task_mapping_example():
@task()
def extract_files():
"""Return list of files to process"""
return [
{'file': 'data1.csv', 'format': 'csv'},
{'file': 'data2.json', 'format': 'json'},
{'file': 'data3.parquet', 'format': 'parquet'},
]
@task()
def process_file(file_info: dict):
"""Process a single file"""
filename = file_info['file']
format = file_info['format']
print(f"Processing {filename} as {format}")
return f"Processed {filename}"
files = extract_files()
process_file.expand(file_info=files)
task_mapping_example()python
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def task_mapping_example():
@task()
def extract_files():
"""Return list of files to process"""
return [
{'file': 'data1.csv', 'format': 'csv'},
{'file': 'data2.json', 'format': 'json'},
{'file': 'data3.parquet', 'format': 'parquet'},
]
@task()
def process_file(file_info: dict):
"""Process a single file"""
filename = file_info['file']
format = file_info['format']
print(f"Processing {filename} as {format}")
return f"Processed {filename}"
files = extract_files()
process_file.expand(file_info=files)
task_mapping_example()Configuration
配置
airflow.cfg
airflow.cfg
Key configuration options:
ini
[core]关键配置选项:
ini
[core]DAGs folder
DAGs folder
dags_folder = /opt/airflow/dags
dags_folder = /opt/airflow/dags
Executor (LocalExecutor, CeleryExecutor, KubernetesExecutor)
Executor (LocalExecutor, CeleryExecutor, KubernetesExecutor)
executor = LocalExecutor
executor = LocalExecutor
Parallelism
Parallelism
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 16
[database]
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 16
[database]
Database connection
Database connection
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost/airflow
[scheduler]
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost/airflow
[scheduler]
How often to scan for new DAGs
How often to scan for new DAGs
dag_dir_list_interval = 300
dag_dir_list_interval = 300
Number of scheduler processes
Number of scheduler processes
scheduler_zombie_task_threshold = 300
[webserver]
scheduler_zombie_task_threshold = 300
[webserver]
Web server host and port
Web server host and port
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_host = 0.0.0.0
web_server_port = 8080
Secret key for session
Secret key for session
secret_key = your_secret_key_here
[email]
secret_key = your_secret_key_here
[email]
Email backend
Email backend
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = smtp.gmail.com
smtp_port = 587
smtp_user = your_email@gmail.com
smtp_password = your_app_password
smtp_mail_from = airflow@example.com
undefinedemail_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = smtp.gmail.com
smtp_port = 587
smtp_user = your_email@gmail.com
smtp_password = your_app_password
smtp_mail_from = airflow@example.com
undefinedEnvironment Variables
环境变量
bash
undefinedbash
undefinedOverride any config
Override any config
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@localhost/airflow
export AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
export AIRFLOW__WEBSERVER__SECRET_KEY=your_secret_key
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@localhost/airflow
export AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
export AIRFLOW__WEBSERVER__SECRET_KEY=your_secret_key
Set Airflow home
Set Airflow home
export AIRFLOW_HOME=~/airflow
undefinedexport AIRFLOW_HOME=~/airflow
undefinedVariables
变量
Store global configuration values.
存储全局配置值。
Set Variables
设置变量
bash
undefinedbash
undefinedVia CLI
Via CLI
airflow variables set my_key my_value
airflow variables set api_endpoint "https://api.example.com/v1"
airflow variables set my_key my_value
airflow variables set api_endpoint "https://api.example.com/v1"
Import from JSON file
Import from JSON file
airflow variables import variables.json
undefinedairflow variables import variables.json
undefinedUse Variables in DAGs
在DAG中使用变量
python
from airflow.models import Variablepython
from airflow.models import VariableGet variable
Get variable
api_endpoint = Variable.get("api_endpoint")
api_endpoint = Variable.get("api_endpoint")
Get with default value
Get with default value
timeout = Variable.get("timeout", default_var=30)
timeout = Variable.get("timeout", default_var=30)
Get as JSON
Get as JSON
config = Variable.get("config_json", deserialize_json=True)
config = Variable.get("config_json", deserialize_json=True)
In a task
In a task
@task()
def use_variable():
endpoint = Variable.get("api_endpoint")
print(f"Using endpoint: {endpoint}")
undefined@task()
def use_variable():
endpoint = Variable.get("api_endpoint")
print(f"Using endpoint: {endpoint}")
undefinedVariables in Templates
在模板中使用变量
python
bash_task = BashOperator(
task_id='use_variable',
bash_command='echo "API: {{ var.value.api_endpoint }}"',
dag=dag,
)python
bash_task = BashOperator(
task_id='use_variable',
bash_command='echo "API: {{ var.value.api_endpoint }}"',
dag=dag,
)Templating with Jinja
使用Jinja模板
Airflow uses Jinja templating for many fields.
Airflow在许多字段中使用Jinja模板。
Common Template Variables
常用模板变量
python
from airflow.operators.bash import BashOperator
templated_command = """
# Execution date
echo "Execution date: {{ ds }}" # YYYY-MM-DD
echo "Execution date no dash: {{ ds_nodash }}" # YYYYMMDD
# Date components
echo "Year: {{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}"
echo "Previous day: {{ macros.ds_add(ds, -1) }}"
# Task instance
echo "Task ID: {{ task.task_id }}"
echo "DAG ID: {{ dag.dag_id }}"
echo "Run ID: {{ run_id }}"
# Parameters
echo "Param: {{ params.my_param }}"
"""
task = BashOperator(
task_id='templated_task',
bash_command=templated_command,
params={'my_param': 'value'},
dag=dag,
)python
from airflow.operators.bash import BashOperator
templated_command = """
# Execution date
echo "Execution date: {{ ds }}" # YYYY-MM-DD
echo "Execution date no dash: {{ ds_nodash }}" # YYYYMMDD
# Date components
echo "Year: {{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}"
echo "Previous day: {{ macros.ds_add(ds, -1) }}"
# Task instance
echo "Task ID: {{ task.task_id }}"
echo "DAG ID: {{ dag.dag_id }}"
echo "Run ID: {{ run_id }}"
# Parameters
echo "Param: {{ params.my_param }}"
"""
task = BashOperator(
task_id='templated_task',
bash_command=templated_command,
params={'my_param': 'value'},
dag=dag,
)Custom Macros
自定义宏
python
def custom_macro(value):
"""Custom Jinja macro"""
return value.upper()
dag = DAG(
'dag_with_macros',
user_defined_macros={
'custom_upper': custom_macro
},
start_date=datetime(2024, 1, 1),
)
task = BashOperator(
task_id='use_macro',
bash_command='echo "{{ custom_upper(params.name) }}"',
params={'name': 'airflow'},
dag=dag,
)python
def custom_macro(value):
"""Custom Jinja macro"""
return value.upper()
dag = DAG(
'dag_with_macros',
user_defined_macros={
'custom_upper': custom_macro
},
start_date=datetime(2024, 1, 1),
)
task = BashOperator(
task_id='use_macro',
bash_command='echo "{{ custom_upper(params.name) }}"',
params={'name': 'airflow'},
dag=dag,
)CLI Commands
CLI命令
DAG Management
DAG管理
bash
undefinedbash
undefinedList all DAGs
List all DAGs
airflow dags list
airflow dags list
List tasks in a DAG
List tasks in a DAG
airflow tasks list my_dag
airflow tasks list my_dag
Show DAG structure
Show DAG structure
airflow dags show my_dag
airflow dags show my_dag
Trigger a DAG run
Trigger a DAG run
airflow dags trigger my_dag
airflow dags trigger my_dag
Trigger with config
Trigger with config
airflow dags trigger my_dag --conf '{"key": "value"}'
airflow dags trigger my_dag --conf '{"key": "value"}'
Pause/Unpause DAG
Pause/Unpause DAG
airflow dags pause my_dag
airflow dags unpause my_dag
airflow dags pause my_dag
airflow dags unpause my_dag
Delete DAG (from metadata, not file)
Delete DAG (from metadata, not file)
airflow dags delete my_dag
undefinedairflow dags delete my_dag
undefinedTask Management
任务管理
bash
undefinedbash
undefinedTest a task (doesn't save state)
Test a task (doesn't save state)
airflow tasks test my_dag my_task 2024-01-01
airflow tasks test my_dag my_task 2024-01-01
Run a task (saves state)
Run a task (saves state)
airflow tasks run my_dag my_task 2024-01-01
airflow tasks run my_dag my_task 2024-01-01
Clear task state
Clear task state
airflow tasks clear my_dag --task-regex my_task
airflow tasks clear my_dag --task-regex my_task
Clear all tasks in DAG
Clear all tasks in DAG
airflow tasks clear my_dag --start-date 2024-01-01 --end-date 2024-01-31
undefinedairflow tasks clear my_dag --start-date 2024-01-01 --end-date 2024-01-31
undefinedDatabase
数据库
bash
undefinedbash
undefinedInitialize database
Initialize database
airflow db init
airflow db init
Upgrade database
Upgrade database
airflow db upgrade
airflow db upgrade
Reset database (WARNING: deletes all data)
Reset database (WARNING: deletes all data)
airflow db reset
airflow db reset
Check database
Check database
airflow db check
undefinedairflow db check
undefinedUsers
用户
bash
undefinedbash
undefinedCreate user
Create user
airflow users create
--username john
--firstname John
--lastname Doe
--role Admin
--email john@example.com
--username john
--firstname John
--lastname Doe
--role Admin
--email john@example.com
airflow users create
--username john
--firstname John
--lastname Doe
--role Admin
--email john@example.com
--username john
--firstname John
--lastname Doe
--role Admin
--email john@example.com
List users
List users
airflow users list
airflow users list
Delete user
Delete user
airflow users delete --username john
undefinedairflow users delete --username john
undefinedConnections
连接
bash
undefinedbash
undefinedList connections
List connections
airflow connections list
airflow connections list
Get connection details
Get connection details
airflow connections get postgres_default
airflow connections get postgres_default
Export connections
Export connections
airflow connections export connections.json
airflow connections export connections.json
Import connections
Import connections
airflow connections import connections.json
undefinedairflow connections import connections.json
undefinedVariables
变量
bash
undefinedbash
undefinedSet variable
Set variable
airflow variables set my_var my_value
airflow variables set my_var my_value
Get variable
Get variable
airflow variables get my_var
airflow variables get my_var
Delete variable
Delete variable
airflow variables delete my_var
airflow variables delete my_var
List all variables
List all variables
airflow variables list
airflow variables list
Export variables to JSON
Export variables to JSON
airflow variables export variables.json
airflow variables export variables.json
Import variables from JSON
Import variables from JSON
airflow variables import variables.json
undefinedairflow variables import variables.json
undefinedWorking with Providers
与Providers协作
Providers extend Airflow with additional operators, hooks, and sensors.
Providers为Airflow扩展了额外的Operator、Hook和Sensor。
Install Providers
安装Providers
bash
undefinedbash
undefinedInstall specific providers
Install specific providers
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-http
pip install apache-airflow-providers-docker
undefinedpip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-http
pip install apache-airflow-providers-docker
undefinedAWS S3 Example
AWS S3示例
python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def s3_example():
list_files = S3ListOperator(
task_id='list_s3_files',
bucket='my-bucket',
prefix='data/',
aws_conn_id='aws_default',
)
@task()
def download_from_s3():
"""Download file from S3"""
hook = S3Hook(aws_conn_id='aws_default')
# Download file
content = hook.read_key(
key='data/file.csv',
bucket_name='my-bucket'
)
return len(content)
@task()
def upload_to_s3():
"""Upload file to S3"""
hook = S3Hook(aws_conn_id='aws_default')
# Upload file
hook.load_string(
string_data='Hello, S3!',
key='output/result.txt',
bucket_name='my-bucket'
)
list_files >> download_from_s3() >> upload_to_s3()
s3_example()python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def s3_example():
list_files = S3ListOperator(
task_id='list_s3_files',
bucket='my-bucket',
prefix='data/',
aws_conn_id='aws_default',
)
@task()
def download_from_s3():
"""Download file from S3"""
hook = S3Hook(aws_conn_id='aws_default')
# Download file
content = hook.read_key(
key='data/file.csv',
bucket_name='my-bucket'
)
return len(content)
@task()
def upload_to_s3():
"""Upload file to S3"""
hook = S3Hook(aws_conn_id='aws_default')
# Upload file
hook.load_string(
string_data='Hello, S3!',
key='output/result.txt',
bucket_name='my-bucket'
)
list_files >> download_from_s3() >> upload_to_s3()
s3_example()Google Cloud Storage Example
Google Cloud Storage示例
python
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_to_bigquery = GCSToBigQueryOperator(
task_id='load_to_bq',
bucket='my-gcs-bucket',
source_objects=['data/*.csv'],
destination_project_dataset_table='project.dataset.table',
source_format='CSV',
skip_leading_rows=1,
write_disposition='WRITE_TRUNCATE',
gcp_conn_id='google_cloud_default',
dag=dag,
)python
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_to_bigquery = GCSToBigQueryOperator(
task_id='load_to_bq',
bucket='my-gcs-bucket',
source_objects=['data/*.csv'],
destination_project_dataset_table='project.dataset.table',
source_format='CSV',
skip_leading_rows=1,
write_disposition='WRITE_TRUNCATE',
gcp_conn_id='google_cloud_default',
dag=dag,
)Docker Operator Example
Docker Operator示例
python
from airflow.providers.docker.operators.docker import DockerOperator
run_container = DockerOperator(
task_id='run_docker_container',
image='python:3.10',
command='python -c "print(\'Hello from Docker\')"',
docker_url='unix://var/run/docker.sock',
network_mode='bridge',
dag=dag,
)python
from airflow.providers.docker.operators.docker import DockerOperator
run_container = DockerOperator(
task_id='run_docker_container',
image='python:3.10',
command='python -c "print(\'Hello from Docker\')"',
docker_url='unix://var/run/docker.sock',
network_mode='bridge',
dag=dag,
)Error Handling and Retries
错误处理与重试
Task-level Configuration
任务级配置
python
@task(
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(hours=1),
)
def task_with_retries():
"""Task with custom retry logic"""
# Your code
passpython
@task(
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(hours=1),
)
def task_with_retries():
"""Task with custom retry logic"""
# Your code
passFailure Callbacks
失败回调
python
def on_failure_callback(context):
"""Called when task fails"""
ti = context['task_instance']
print(f"Task {ti.task_id} failed!")
# Send alert, create ticket, etc.
def on_success_callback(context):
"""Called when task succeeds"""
print("Task succeeded!")
@task(
on_failure_callback=on_failure_callback,
on_success_callback=on_success_callback,
)
def monitored_task():
"""Task with callbacks"""
# Your code
passpython
def on_failure_callback(context):
"""Called when task fails"""
ti = context['task_instance']
print(f"Task {ti.task_id} failed!")
# Send alert, create ticket, etc.
def on_success_callback(context):
"""Called when task succeeds"""
print("Task succeeded!")
@task(
on_failure_callback=on_failure_callback,
on_success_callback=on_success_callback,
)
def monitored_task():
"""Task with callbacks"""
# Your code
passTry/Except in Tasks
任务中的Try/Except
python
from airflow.exceptions import AirflowException
@task()
def safe_task():
"""Task with error handling"""
try:
# Your code that might fail
result = risky_operation()
return result
except SpecificException as e:
# Log error but don't fail task
print(f"Warning: {e}")
return None
except Exception as e:
# Fail task with custom message
raise AirflowException(f"Critical error: {e}")python
from airflow.exceptions import AirflowException
@task()
def safe_task():
"""Task with error handling"""
try:
# Your code that might fail
result = risky_operation()
return result
except SpecificException as e:
# Log error but don't fail task
print(f"Warning: {e}")
return None
except Exception as e:
# Fail task with custom message
raise AirflowException(f"Critical error: {e}")Testing DAGs
测试DAG
Unit Testing
单元测试
python
undefinedpython
undefinedtest_dag.py
test_dag.py
import pytest
from datetime import datetime
from airflow.models import DagBag
def test_dag_loaded():
"""Test that DAG is loaded correctly"""
dagbag = DagBag(dag_folder='dags/', include_examples=False)
assert 'my_dag' in dagbag.dags
assert len(dagbag.import_errors) == 0
def test_dag_structure():
"""Test DAG structure"""
dagbag = DagBag(dag_folder='dags/', include_examples=False)
dag = dagbag.get_dag('my_dag')
# Check task count
assert len(dag.tasks) == 5
# Check specific task exists
assert 'extract' in dag.task_ids
# Check dependencies
extract_task = dag.get_task('extract')
downstream = extract_task.downstream_task_ids
assert 'transform' in downstreamdef test_task_execution():
"""Test task execution"""
from airflow.models import TaskInstance
from airflow import settings
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
task = dag.get_task('extract')
# Create task instance
ti = TaskInstance(task=task, execution_date=datetime(2024, 1, 1))
# Test execution
ti.run(ignore_ti_state=True)
# Check result
assert ti.state == 'success'undefinedimport pytest
from datetime import datetime
from airflow.models import DagBag
def test_dag_loaded():
"""Test that DAG is loaded correctly"""
dagbag = DagBag(dag_folder='dags/', include_examples=False)
assert 'my_dag' in dagbag.dags
assert len(dagbag.import_errors) == 0
def test_dag_structure():
"""Test DAG structure"""
dagbag = DagBag(dag_folder='dags/', include_examples=False)
dag = dagbag.get_dag('my_dag')
# Check task count
assert len(dag.tasks) == 5
# Check specific task exists
assert 'extract' in dag.task_ids
# Check dependencies
extract_task = dag.get_task('extract')
downstream = extract_task.downstream_task_ids
assert 'transform' in downstreamdef test_task_execution():
"""Test task execution"""
from airflow.models import TaskInstance
from airflow import settings
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
task = dag.get_task('extract')
# Create task instance
ti = TaskInstance(task=task, execution_date=datetime(2024, 1, 1))
# Test execution
ti.run(ignore_ti_state=True)
# Check result
assert ti.state == 'success'undefinedIntegration Testing
集成测试
python
undefinedpython
undefinedtest_integration.py
test_integration.py
from airflow.models import DagBag
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
def test_dag_run():
"""Test complete DAG run"""
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
# Trigger DAG
dag.test()undefinedfrom airflow.models import DagBag
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
def test_dag_run():
"""Test complete DAG run"""
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
# Trigger DAG
dag.test()undefinedBest Practices
最佳实践
1. Idempotent Tasks
1. 幂等任务
python
@task()
def idempotent_load(execution_date):
"""
Task that can be run multiple times safely
"""
# Delete existing data for this date first
delete_query = "DELETE FROM table WHERE date = %s"
hook.run(delete_query, parameters=[execution_date])
# Then insert new data
insert_query = "INSERT INTO table ..."
hook.run(insert_query)python
@task()
def idempotent_load(execution_date):
"""
Task that can be run multiple times safely
"""
# Delete existing data for this date first
delete_query = "DELETE FROM table WHERE date = %s"
hook.run(delete_query, parameters=[execution_date])
# Then insert new data
insert_query = "INSERT INTO table ..."
hook.run(insert_query)2. Use Connections for Credentials
2. 使用连接存储凭据
python
undefinedpython
undefinedGood: Use connections
Good: Use connections
@task()
def good_practice():
hook = PostgresHook(postgres_conn_id='postgres_default')
# Use hook
@task()
def good_practice():
hook = PostgresHook(postgres_conn_id='postgres_default')
# Use hook
Bad: Hardcode credentials
Bad: Hardcode credentials
@task()
def bad_practice():
import psycopg2
conn = psycopg2.connect(
host='localhost',
user='user', # Don't do this!
password='password' # Never do this!
)
undefined@task()
def bad_practice():
import psycopg2
conn = psycopg2.connect(
host='localhost',
user='user', # Don't do this!
password='password' # Never do this!
)
undefined3. Don't Pass Large Data Between Tasks
3. 不要在任务间传递大数据
python
undefinedpython
undefinedGood: Pass references
Good: Pass references
@task()
def process_data():
# Process and save to database/storage
data_id = save_to_database(large_data)
return data_id # Return only ID
@task()
@task()
def process_data():
# Process and save to database/storage
data_id = save_to_database(large_data)
return data_id # Return only ID
@task()
def use_data(data_id):
# Retrieve data from database/storage
data = get_from_database(data_id)
# Process data
undefined—
4. 使用TaskFlow API
—
python
undefined—
Good: TaskFlow API
—
@dag()
def good_dag():
@task()
def extract():
return data
@task()
def transform(data):
return transformed_data
extract() >> transform()—
Bad: Traditional operators
—
def bad_dag():
extract = PythonOperator(
task_id='extract',
python_callable=extract_func,
dag=dag,
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_func,
op_kwargs={'data': '{{ ti.xcom_pull(task_ids="extract") }}'},
dag=dag,
)
extract >> transformundefined—
5. 保持DAG简洁
—
python
undefined—
Good: Split into smaller tasks
—
@task()
def extract_source1():
pass
@task()
def extract_source2():
pass
@task()
def combine_extracts():
pass
—
Bad: Monolithic task
—
@task()
def extract_all_sources():
# Extract from 10 different sources in one task
pass
undefined—
6. 使用版本控制
—
- 将DAG代码存储在Git仓库中
- 使用分支管理不同环境(开发、测试、生产)
- 提交前运行单元测试
—
7. 监控与告警
—
python
undefined—
Configure email alerts
—
default_args = {
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
}
—
Or use Slack/Teams alerts
—
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
send_slack_alert = SlackWebhookOperator(
task_id='send_slack_alert',
slack_webhook_conn_id='slack_default',
message='Task {{ task.task_id }} failed!',
dag=dag,
)
undefined—
8. 清理旧数据
—
python
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def clean_old_logs(**context):
"""Clean logs older than 30 days"""
import os
import shutil
from datetime import datetime, timedelta
log_dir = '/opt/airflow/logs'
cutoff_date = datetime.now() - timedelta(days=30)
for root, dirs, files in os.walk(log_dir):
for file in files:
file_path = os.path.join(root, file)
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_mtime < cutoff_date:
os.remove(file_path)
clean_task = PythonOperator(
task_id='clean_old_logs',
python_callable=clean_old_logs,
schedule='0 0 * * 0', # Run weekly on Sunday
dag=dag,
)