A complete guide to Apache Airflow orchestration, covering DAGs, operators, sensors, XComs, task dependencies, dynamic workflows, and production deployment.
npx skill4agent add manutej/luxor-claude-marketplace apache-airflow-orchestration

from datetime import datetime
from airflow.sdk import DAG
with DAG(
dag_id="example_dag",
start_date=datetime(2022, 1, 1),
schedule="0 0 * * *", # Daily at midnight
catchup=False,
tags=["example", "tutorial"],
) as dag:
# Tasks defined here
pass

# Dependency helpers: >>, <<, chain(), cross_downstream()

# Simple linear flow
task1 >> task2 >> task3
# Fan-out pattern
task1 >> [task2, task3, task4]
# Fan-in pattern
[task1, task2, task3] >> task4
# Complex dependencies
first_task >> [second_task, third_task]
third_task << fourth_task

airflow scheduler

from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
with DAG(
dag_id="basic_dag",
start_date=datetime(2022, 1, 1),
schedule="0 0 * * *",
catchup=False,
) as dag:
task1 = BashOperator(
task_id="task1",
bash_command="echo 'Task 1 executed'"
)
task2 = BashOperator(
task_id="task2",
bash_command="echo 'Task 2 executed'"
)
task1 >> task2

from airflow.sdk import chain
# These are equivalent:
task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)

from airflow.sdk import chain
from airflow.operators.empty import EmptyOperator
# Dynamically generate and chain tasks
chain(*[EmptyOperator(task_id=f"task_{i}") for i in range(1, 6)])

from airflow.sdk import chain
# Creates paired dependencies:
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)

from airflow.sdk import cross_downstream
# Both op1 and op2 feed into both op3 and op4
cross_downstream([op1, op2], [op3, op4])

from airflow.operators.python import BranchPythonOperator
def choose_branch(**context):
if context['data_interval_start'].day == 1:
return 'monthly_task'
return 'daily_task'
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch
)
daily_task = BashOperator(task_id='daily_task', bash_command='echo daily')
monthly_task = BashOperator(task_id='monthly_task', bash_command='echo monthly')
branch >> [daily_task, monthly_task]

from airflow.operators.branch import BaseBranchOperator
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
"""
Run extra branch on first day of month
"""
if context['data_interval_start'].day == 1:
return ['daily_task_id', 'monthly_task_id']
elif context['data_interval_start'].day == 2:
return 'daily_task_id'
else:
return None # Skip all downstream tasks
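A usage sketch for the custom branch operator above; the downstream task IDs are illustrative and must match the strings returned by choose_branch:

branch = MyBranchOperator(task_id="branch_task")
daily = BashOperator(task_id="daily_task_id", bash_command="echo daily")
monthly = BashOperator(task_id="monthly_task_id", bash_command="echo monthly")
branch >> [daily, monthly]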
from airflow.sdk import task_group
from airflow.operators.empty import EmptyOperator
@task_group()
def data_processing_group():
extract = EmptyOperator(task_id="extract")
transform = EmptyOperator(task_id="transform")
load = EmptyOperator(task_id="load")
extract >> transform >> load
@task_group()
def validation_group():
validate_schema = EmptyOperator(task_id="validate_schema")
validate_data = EmptyOperator(task_id="validate_data")
validate_schema >> validate_data
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
start >> data_processing_group() >> validation_group() >> end

from airflow.sdk import Label
# Inline labeling
my_task >> Label("When empty") >> other_task
# Method-based labeling
my_task.set_downstream(other_task, Label("When empty"))

from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
import pendulum
with DAG(
dag_id='latest_only_example',
start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
catchup=True,
schedule="@daily",
) as dag:
latest_only = LatestOnlyOperator(task_id='latest_only')
task1 = EmptyOperator(task_id='task1')
task2 = EmptyOperator(task_id='task2')
task3 = EmptyOperator(task_id='task3')
task4 = EmptyOperator(task_id='task4', trigger_rule='all_done')
latest_only >> task1 >> task3
latest_only >> task4
task2 >> task3
task2 >> task4

from airflow.providers.standard.operators.bash import BashOperator
bash_task = BashOperator(
task_id="bash_example",
bash_command="echo 'Hello from Bash'; date",
env={'MY_VAR': 'value'}, # Environment variables
append_env=True, # Append to existing env vars
output_encoding='utf-8'
)

bash_complex = BashOperator(
task_id="complex_bash",
bash_command="""
cd /path/to/dir
python process_data.py --input {{ ds }} --output {{ tomorrow_ds }}
if [ $? -eq 0 ]; then
echo "Success"
else
echo "Failed" && exit 1
fi
""",
)

from airflow.providers.standard.operators.python import PythonOperator
def my_python_function(name, **context):
print(f"Hello {name}!")
print(f"Execution date: {context['ds']}")
return "Success"
python_task = PythonOperator(
task_id="python_example",
python_callable=my_python_function,
op_kwargs={'name': 'Airflow'},
)

import json
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
def transform(ti):
# Pull from XCom
order_data_dict = ti.xcom_pull(task_ids="extract")
total_order_value = sum(order_data_dict.values())
return {"total_order_value": total_order_value}
def load(ti):
# Pull from XCom
total = ti.xcom_pull(task_ids="transform")["total_order_value"]
print(f"Total order value is: {total:.2f}")
with DAG(
dag_id="legacy_etl_pipeline",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
extract_task = PythonOperator(task_id="extract", python_callable=extract)
transform_task = PythonOperator(task_id="transform", python_callable=transform)
load_task = PythonOperator(task_id="load", python_callable=load)
extract_task >> transform_task >> load_task

from airflow.providers.smtp.operators.smtp import EmailOperator
email_task = EmailOperator(
task_id='send_email',
to='recipient@example.com',
subject='Airflow Notification',
html_content='<h3>Task completed successfully!</h3>',
cc=['cc@example.com'],
bcc=['bcc@example.com']
)

from airflow.operators.empty import EmptyOperator
start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')
# Useful for organizing complex DAGs
start >> [task1, task2, task3] >> end

from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
def __init__(self, my_param, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_param = my_param
def execute(self, context):
self.log.info(f"Executing with param: {self.my_param}")
# Custom logic here
return "Result"
# Usage
custom_task = MyCustomOperator(
task_id="custom",
my_param="value"
)

from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
import pendulum
with DAG(
dag_id="example_external_task_sensor",
start_date=pendulum.datetime(2021, 10, 20, tz="UTC"),
catchup=False,
schedule=None,
) as dag:
wait_for_task = ExternalTaskSensor(
task_id="wait_for_task",
external_dag_id="upstream_dag",
external_task_id="upstream_task",
allowed_states=["success"],
failed_states=["failed"],
execution_delta=None, # Same execution_date
timeout=600, # 10 minutes
poke_interval=60, # Check every 60 seconds
)

# More efficient - releases worker slot while waiting
wait_for_task_async = ExternalTaskSensor(
task_id="wait_for_task_async",
external_dag_id="upstream_dag",
external_task_id="upstream_task",
allowed_states=["success"],
failed_states=["failed"],
deferrable=True, # Use async mode
)

from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/path/to/file.csv",
poke_interval=30,
timeout=600,
mode='poke' # or 'reschedule' for long waits
)

from datetime import timedelta
from airflow.sensors.time_delta import TimeDeltaSensor
wait_one_hour = TimeDeltaSensor(
task_id="wait_one_hour",
delta=timedelta(hours=1)
)

from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
import pendulum
with DAG(
dag_id="bigquery_sensor_example",
start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
) as dag:
wait_for_table = BigQueryTableExistenceSensor(
task_id="wait_for_table",
project_id="your-project-id",
dataset_id="your_dataset",
table_id="your_table",
bigquery_conn_id="google_cloud_default",
location="US",
poke_interval=60,
timeout=3600,
)

from airflow.sensors.base import BaseSensorOperator

class MyCustomSensor(BaseSensorOperator):
def __init__(self, my_condition, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_condition = my_condition
def poke(self, context):
# Return True when condition is met
self.log.info(f"Checking condition: {self.my_condition}")
# Custom logic to check condition
return check_condition(self.my_condition)
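A usage sketch for the custom sensor above; the my_condition value is illustrative:

# Hypothetical instantiation of MyCustomSensor defined above
wait_for_condition = MyCustomSensor(
    task_id="wait_for_condition",
    my_condition="table_loaded",
    poke_interval=60,      # Check every 60 seconds
    timeout=1800,          # Give up after 30 minutes
    mode="reschedule",     # Free the worker slot between pokes
)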
from datetime import timedelta
from airflow.sdk import BaseSensorOperator, StartTriggerArgs
class WaitHoursSensor(BaseSensorOperator):
start_trigger_args = StartTriggerArgs(
trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
trigger_kwargs={"moment": timedelta(hours=1)},
next_method="execute_complete",
next_kwargs=None,
timeout=None,
)
start_from_trigger = True
def __init__(self, *args, trigger_kwargs=None, start_from_trigger=True, **kwargs):
super().__init__(*args, **kwargs)
if trigger_kwargs:
self.start_trigger_args.trigger_kwargs = trigger_kwargs
self.start_from_trigger = start_from_trigger
def execute_complete(self, context, event=None):
return # Task complete

def push_function(**context):
value = "Important data"
context['ti'].xcom_push(key='my_key', value=value)
# Or simply return (uses 'return_value' key)
return value
push_task = PythonOperator(
task_id='push',
python_callable=push_function,
)

def pull_function(**context):
# Pull by task_id (uses 'return_value' key)
value = context['ti'].xcom_pull(task_ids='push')
# Pull with specific key
value = context['ti'].xcom_pull(task_ids='push', key='my_key')
print(f"Pulled value: {value}")
pull_task = PythonOperator(
task_id='pull',
python_callable=pull_function,
)

from airflow.decorators import task
@task
def extract():
return {"data": [1, 2, 3, 4, 5]}
@task
def transform(data_dict):
# Automatically receives XCom from extract
total = sum(data_dict['data'])
return {"total": total}
@task
def load(summary):
print(f"Total: {summary['total']}")
# Automatic XCom handling
data = extract()
summary = transform(data)
load(summary)

@task
def process_large_data():
# Process data
large_result = compute_large_dataset()
# Store in S3/GCS
file_path = save_to_s3(large_result, "s3://bucket/result.parquet")
# Return only the path
return {"result_path": file_path}
@task
def consume_large_data(metadata):
# Load from S3/GCS
data = load_from_s3(metadata['result_path'])
process(data)
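The compute_large_dataset, save_to_s3, and load_from_s3 helpers above are placeholders. A minimal sketch of the S3 helpers using the Amazon provider's S3Hook — the connection ID, local temp path, and parquet format are assumptions — might look like:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Sketch of the placeholder helpers; assumes an 'aws_default' connection
# and a pandas DataFrame payload written as parquet.
def save_to_s3(df, s3_uri, aws_conn_id="aws_default"):
    bucket, key = s3_uri.replace("s3://", "").split("/", 1)
    local_path = f"/tmp/{key.split('/')[-1]}"
    df.to_parquet(local_path)
    S3Hook(aws_conn_id=aws_conn_id).load_file(
        filename=local_path, key=key, bucket_name=bucket, replace=True
    )
    return s3_uri

def load_from_s3(s3_uri, aws_conn_id="aws_default"):
    import pandas as pd
    bucket, key = s3_uri.replace("s3://", "").split("/", 1)
    local_path = S3Hook(aws_conn_id=aws_conn_id).download_file(
        key=key, bucket_name=bucket, local_path="/tmp"
    )
    return pd.read_parquet(local_path)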
from airflow.providers.standard.operators.bash import BashOperator

process_file = BashOperator(
task_id="process",
bash_command="python process.py {{ ti.xcom_pull(task_ids='extract') }}",
)

from airflow.sdk import task
from airflow.providers.smtp.operators.smtp import EmailOperator
@task
def get_ip():
return "192.168.1.1"
@task(multiple_outputs=True)
def compose_email(external_ip):
return {
'subject': f'Server connected from {external_ip}',
'body': f'Your server is connected from {external_ip}<br>'
}
email_info = compose_email(get_ip())
EmailOperator(
task_id='send_email',
to='example@example.com',
subject=email_info['subject'],
html_content=email_info['body']
)

from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator
with DAG("dynamic_loop_example", ...) as dag:
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
# Dynamically create tasks
options = ["branch_a", "branch_b", "branch_c", "branch_d"]
for option in options:
task = EmptyOperator(task_id=option)
start >> task >> end

from airflow.decorators import task
@task
def extract():
# Returns list of items to process
return [1, 2, 3, 4, 5]
@task
def transform(item):
# Processes single item
return item * 2
@task
def load(items):
# Receives all transformed items
print(f"Loaded {len(items)} items: {items}")
# Dynamic mapping
data = extract()
transformed = transform.expand(item=data) # Creates 5 parallel tasks
load(transformed)

from airflow.models import BaseOperator
class ExtractOperator(BaseOperator):
def execute(self, context):
return ["file1.csv", "file2.csv", "file3.csv"]
class TransformOperator(BaseOperator):
def __init__(self, input, **kwargs):
super().__init__(**kwargs)
self.input = input
def execute(self, context):
# Process single file
return f"processed_{self.input}"
extract = ExtractOperator(task_id="extract")
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)

from airflow.decorators import task, task_group
@task
def add_one(value):
return value + 1
@task
def double(value):
return value * 2
@task_group
def process_group(value):
incremented = add_one(value)
return double(incremented)
@task
def aggregate(results):
print(f"Results: {results}")
# Map task group over values
results = process_group.expand(value=[1, 2, 3, 4, 5])
aggregate(results)

@task
def process(base_path, filename):
full_path = f"{base_path}/{filename}"
return f"Processed {full_path}"
# Static parameter 'base_path', dynamic 'filename'
results = process.partial(base_path="/data").expand(
filename=["file1.csv", "file2.csv", "file3.csv"]
)

from airflow.decorators import dag, task
import pendulum
@dag(
dag_id="taskflow_example",
start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
schedule=None,
catchup=False,
)
def my_taskflow_dag():
@task
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
import json
return json.loads(data_string)
@task
def transform(order_data_dict):
total = sum(order_data_dict.values())
return {"total_order_value": total}
@task
def load(summary):
print(f"Total order value: {summary['total_order_value']:.2f}")
# Function calls create task dependencies automatically
order_data = extract()
summary = transform(order_data)
load(summary)
# Instantiate the DAG
my_taskflow_dag()

@task(multiple_outputs=True)
def extract_data():
return {
'orders': [1, 2, 3],
'customers': ['A', 'B', 'C'],
'revenue': 1000.50
}
@task
def process_orders(orders):
print(f"Processing {len(orders)} orders")
@task
def process_customers(customers):
print(f"Processing {len(customers)} customers")
# Access individual outputs
data = extract_data()
process_orders(data['orders'])
process_customers(data['customers'])

from airflow.decorators import dag, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum
@dag(
start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
schedule=None,
)
def mixed_dag():
@task
def get_date():
from datetime import datetime
return datetime.now().strftime("%Y%m%d")
# Traditional operator
bash_task = BashOperator(
task_id="print_date",
bash_command="echo Processing data for {{ ti.xcom_pull(task_ids='get_date') }}"
)
@task
def process_results():
print("Processing complete")
# Mix task types
date = get_date()
date >> bash_task >> process_results()
mixed_dag()

from airflow.decorators import dag, task
import pendulum
@dag(
dag_id="virtualenv_example",
start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
)
def virtualenv_dag():
@task.virtualenv(
requirements=["pandas==1.5.0", "numpy==1.23.0"],
system_site_packages=False
)
def analyze_data():
import pandas as pd
import numpy as np
df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]})
result = np.mean(df['col1'])
return float(result)
@task
def report_results(mean_value):
print(f"Mean value: {mean_value}")
result = analyze_data()
report_results(result)
virtualenv_dag()

from airflow.sdk import DAG, Asset
from airflow.operators.bash import BashOperator
from datetime import datetime
# Define asset
customer_data = Asset("s3://my-bucket/customers.parquet")
# Producer DAG
with DAG(
dag_id="producer_dag",
start_date=datetime(2023, 1, 1),
schedule="@daily",
) as producer:
BashOperator(
task_id="generate_data",
bash_command="python generate_customers.py",
outlets=[customer_data] # Marks asset as updated
)
# Consumer DAG - triggered when asset updates
with DAG(
dag_id="consumer_dag",
schedule=[customer_data], # Triggered by asset
start_date=datetime(2023, 1, 1),
catchup=False,
) as consumer:
BashOperator(
task_id="process_data",
bash_command="python process_customers.py"
)

from airflow.sdk import Asset

asset_1 = Asset("s3://bucket/file1.csv")
asset_2 = Asset("s3://bucket/file2.csv")

with DAG(
dag_id="wait_for_both",
schedule=(asset_1 & asset_2),  # Both must update
start_date=datetime(2023, 1, 1),
):
pass

asset_1 = Asset("s3://bucket/file1.csv")
asset_2 = Asset("s3://bucket/file2.csv")

with DAG(
dag_id="triggered_by_either",
schedule=(asset_1 | asset_2),  # Either can trigger
start_date=datetime(2023, 1, 1),
):
pass

asset_1 = Asset("s3://bucket/file1.csv")
asset_2 = Asset("s3://bucket/file2.csv")
asset_3 = Asset("s3://bucket/file3.csv")

with DAG(
dag_id="complex_condition",
schedule=(asset_1 | (asset_2 & asset_3)),  # asset_1 OR (asset_2 AND asset_3)
start_date=datetime(2023, 1, 1),
):
pass

from airflow.sdk import Asset, AssetAlias
from airflow.decorators import task
# Producer with alias
with DAG(dag_id="alias_producer", start_date=datetime(2023, 1, 1)):
@task(outlets=[AssetAlias("my-alias")])
def produce_data(*, outlet_events):
# Dynamically add actual asset
outlet_events[AssetAlias("my-alias")].add(
Dataset("s3://bucket/my-file.csv")
)
# Consumer depending on alias
with DAG(
dag_id="alias_consumer",
schedule=AssetAlias("my-alias"),
start_date=datetime(2023, 1, 1),
):
pass

@task
def process_asset_data(*, triggering_asset_events):
for event in triggering_asset_events:
print(f"Asset: {event.asset.uri}")
print(f"Timestamp: {event.timestamp}")
print(f"Extra: {event.extra}")# Every day at midnight
schedule="0 0 * * *"
# Every Monday at 9 AM
schedule="0 9 * * 1"
# Every 15 minutes
schedule="*/15 * * * *"
# First day of month at noon
schedule="0 12 1 * *"
# Weekdays at 6 PM
schedule="0 18 * * 1-5"from datetime import timedelta
with DAG(
dag_id="timedelta_schedule",
start_date=datetime(2023, 1, 1),
schedule=timedelta(hours=6), # Every 6 hours
):
pass

# Common presets
schedule="@once" # Run once
schedule="@hourly" # Every hour
schedule="@daily" # Daily at midnight
schedule="@weekly" # Every Sunday at midnight
schedule="@monthly" # First day of month at midnight
schedule="@yearly" # January 1st at midnight
schedule=None # Manual trigger only

with DAG(
dag_id="catchup_example",
start_date=datetime(2023, 1, 1),
schedule="@daily",
catchup=True, # Run all missed intervals
):
pass
with DAG(
dag_id="no_catchup",
start_date=datetime(2023, 1, 1),
schedule="@daily",
catchup=False, # Only run latest interval
):
pass

# Backfill specific date range
airflow dags backfill \
--start-date 2023-01-01 \
--end-date 2023-01-31 \
my_dag_id
# Backfill with marking success (no execution)
airflow dags backfill \
--start-date 2023-01-01 \
--end-date 2023-01-31 \
--mark-success \
my_dag_id

from airflow.operators.bash import BashOperator
from datetime import timedelta
task_with_retry = BashOperator(
task_id="retry_task",
bash_command="python might_fail.py",
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
max_retry_delay=timedelta(minutes=30),
)

from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['alerts@company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id="production_dag",
default_args=default_args,
start_date=datetime(2023, 1, 1),
schedule="@daily",
):
pass

from airflow.operators.bash import BashOperator
from datetime import timedelta
# Limit concurrent instances of this task
limited_task = BashOperator(
task_id="limited_task",
bash_command="echo 'Processing'",
max_active_tis_per_dag=3 # Max 3 instances running
)

with DAG(
dag_id="concurrent_dag",
start_date=datetime(2023, 1, 1),
schedule="@daily",
max_active_runs=5, # Max 5 DAG runs simultaneously
max_active_tasks=10, # Max 10 task instances across all runs
):
pass

@task
def idempotent_load(**context):
execution_date = context['ds']
# Delete existing data for this date first
delete_query = f"""
DELETE FROM target_table
WHERE date = '{execution_date}'
"""
execute_sql(delete_query)
# Insert new data
insert_query = f"""
INSERT INTO target_table
SELECT * FROM source
WHERE date = '{execution_date}'
"""
execute_sql(insert_query)
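The execute_sql helper above is a placeholder; a minimal version built on the Postgres provider hook might look like this (the my_postgres connection ID is an assumption):

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Placeholder helper for the idempotent_load example above;
# assumes a 'my_postgres' connection is configured in Airflow.
def execute_sql(sql, conn_id="my_postgres"):
    PostgresHook(postgres_conn_id=conn_id).run(sql)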
from datetime import timedelta

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
print(f"SLA missed for {task_list}")
# Send alert to monitoring system
with DAG(
dag_id="sla_dag",
start_date=datetime(2023, 1, 1),
schedule="@daily",
default_args={
'sla': timedelta(hours=2), # Task should complete in 2 hours
},
sla_miss_callback=sla_miss_callback,
):
pass

def on_failure_callback(context):
print(f"Task {context['task_instance'].task_id} failed")
# Send to Slack, PagerDuty, etc.
def on_success_callback(context):
print(f"Task {context['task_instance'].task_id} succeeded")
def on_retry_callback(context):
print(f"Task {context['task_instance'].task_id} retrying")
task_with_callbacks = BashOperator(
task_id="monitored_task",
bash_command="python my_script.py",
on_failure_callback=on_failure_callback,
on_success_callback=on_success_callback,
on_retry_callback=on_retry_callback,
)
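The callbacks above only print; as a sketch, an on_failure_callback that posts to a Slack incoming webhook could look like the following (the slack_webhook_url Variable is an assumed configuration, not part of the original example):

import requests
from airflow.models import Variable

def notify_slack_on_failure(context):
    ti = context["task_instance"]
    message = (
        f":red_circle: Task {ti.dag_id}.{ti.task_id} failed "
        f"in run {context['run_id']}"
    )
    # Assumes an Airflow Variable 'slack_webhook_url' holding the webhook URL
    webhook_url = Variable.get("slack_webhook_url")
    requests.post(webhook_url, json={"text": message}, timeout=10)

alerted_task = BashOperator(
    task_id="alerted_task",
    bash_command="python my_script.py",
    on_failure_callback=notify_slack_on_failure,
)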
version: '3'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
webserver:
image: apache/airflow:2.7.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
ports:
- "8080:8080"
command: webserver
scheduler:
image: apache/airflow:2.7.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
command: scheduler

# In airflow.cfg
[kubernetes]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0
delete_worker_pods = True
delete_worker_pods_on_failure = False
[core]
executor = KubernetesExecutor

from kubernetes.client import models as k8s
task_with_gpu = BashOperator(
task_id="gpu_task",
bash_command="python train_model.py",
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
limits={"nvidia.com/gpu": "1"}
)
)
]
)
)
}
)

from airflow.decorators import task
import logging
@task
def monitored_task():
logger = logging.getLogger(__name__)
logger.info("Starting data processing", extra={
'process_id': 'abc123',
'record_count': 1000
})
try:
process_data()
logger.info("Processing complete")
except Exception as e:
logger.error(f"Processing failed: {str(e)}", extra={
'error_type': type(e).__name__
})
raise

import time
from airflow.stats import Stats
@task
def task_with_metrics():
Stats.incr('my_dag.task_started')
start_time = time.time()
process_data()
duration = time.time() - start_time
Stats.timing('my_dag.task_duration', duration)
Stats.incr('my_dag.task_completed')

dags/
├── common/
│   ├── __init__.py
│   ├── operators.py          # Custom operators
│   ├── sensors.py            # Custom sensors
│   └── utils.py              # Utility functions
├── etl/
│   ├── customer_pipeline.py
│   ├── order_pipeline.py
│   └── product_pipeline.py
├── ml/
│   ├── training_dag.py
│   └── inference_dag.py
└── maintenance/
    ├── cleanup_dag.py
    └── backup_dag.py
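With this layout, pipeline DAG files can import shared code from the common package (the dags folder is added to sys.path when Airflow parses DAGs). A minimal sketch of dags/etl/customer_pipeline.py reusing the custom operator from earlier — the default_dag_args helper is a hypothetical example:

# dags/etl/customer_pipeline.py (sketch)
import pendulum
from airflow.sdk import DAG

from common.operators import MyCustomOperator   # custom operator from common/operators.py
from common.utils import default_dag_args       # hypothetical shared helper

with DAG(
    dag_id="customer_pipeline",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
    default_args=default_dag_args(),
) as dag:
    load_customers = MyCustomOperator(
        task_id="load_customers",
        my_param="customers",
    )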
import pytest
from airflow.models import DagBag
def test_dag_loaded():
dagbag = DagBag(dag_folder='dags/', include_examples=False)
assert len(dagbag.import_errors) == 0
def test_task_count():
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
assert len(dag.tasks) == 5
def test_task_dependencies():
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
extract = dag.get_task('extract')
transform = dag.get_task('transform')
assert transform in extract.downstream_list

from datetime import datetime
from airflow.models import DagBag
from airflow.utils.state import State
def test_dag_runs():
dagbag = DagBag(dag_folder='dags/')
dag = dagbag.get_dag('my_dag')
# Test DAG run
dag_run = dag.create_dagrun(
state=State.RUNNING,
execution_date=datetime(2023, 1, 1),
run_type='manual'
)
# Run specific task
task_instance = dag_run.get_task_instance('extract')
task_instance.run()
assert task_instance.state == State.SUCCESS

# Use Variables for configuration
from airflow.models import Variable
config = Variable.get("my_config", deserialize_json=True)
api_key = Variable.get("api_key")
# Use Connections for external services
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection('my_postgres')
db_url = f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"

# Bad - imports at DAG level slow parsing
from heavy_library import process
with DAG(...):
pass
# Good - imports inside tasks
with DAG(...):
@task
def my_task():
from heavy_library import process
process()

# This will fail
task1 >> task2 >> task3 >> task1 # Circular!
# Must be acyclic
task1 >> task2 >> task3

# Bad - storing large data in XCom
@task
def process():
large_df = pd.read_csv('big_file.csv')
return large_df # Too large!
# Good - store reference
@task
def process():
large_df = pd.read_csv('big_file.csv')
path = save_to_s3(large_df)
return path # Just the path