# Apache Airflow Orchestration
Expert knowledge of Apache Airflow for building, scheduling, and monitoring data pipelines and workflows.
Install the skill with: `npx skill4agent add aradotso/data-skills apache-airflow-orchestration`
Skill by ara.so — Data Skills collection.
# Install Airflow with constraints for your Python version
AIRFLOW_VERSION=3.2.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# --- Alternative: run Airflow with Docker Compose ---
# Download docker-compose.yaml
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# Create required directories
mkdir -p ./dags ./logs ./plugins ./config
# Set the Airflow user (files written to the mounted folders get your UID)
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Initialize the database
docker compose up airflow-init
# Start Airflow
docker compose up
# Web UI: http://localhost:8080 (default login: airflow / airflow)

# Initialize database and create admin user
# Create (or upgrade) the metadata database schema.
# `airflow db init` was removed in Airflow 3; `db migrate` replaces it.
airflow db migrate
# Create admin user (prompts for a password unless --password is given).
# In Airflow 3 this command comes from the FAB auth manager
# (apache-airflow-providers-fab); the default SimpleAuthManager does not use it.
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com
# Start the API server / web UI (default port 8080).
# Replaces `airflow webserver`, which was removed in Airflow 3.
airflow api-server --port 8080
# Start the scheduler (in another terminal)
airflow scheduler
# Start the DAG processor (in another terminal); a separate component in Airflow 3
airflow dag-processor
# Tip: `airflow standalone` starts all components at once for local development.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Settings every task inherits unless the task overrides them
default_args = dict(
    owner='data-team',
    depends_on_past=False,
    email=['alerts@example.com'],
    email_on_failure=True,
    email_on_retry=False,
    retries=3,
    retry_delay=timedelta(minutes=5),
)

# Classic (non-TaskFlow) DAG object; tasks attach to it via dag=dag below
dag = DAG(
    dag_id='example_data_pipeline',
    description='A simple data pipeline',
    default_args=default_args,
    schedule='0 0 * * *',  # cron: every day at 00:00
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not create runs for intervals before "now"
    tags=['example', 'data-engineering'],
)


def extract_data(**context):
    """Pull records from the source system (stubbed); the return value becomes an XCom."""
    print("Extracting data...")
    # Replace with real extraction logic
    return {'records': 1000}


def transform_data(**context):
    """Read the `extract` XCom and return the reshaped result."""
    upstream = context['ti'].xcom_pull(task_ids='extract')
    count = upstream['records']
    print(f"Transforming {count} records...")
    return {'transformed_records': count}


def load_data(**context):
    """Read the `transform` XCom and write it to the destination (stubbed)."""
    payload = context['ti'].xcom_pull(task_ids='transform')
    print(f"Loading {payload['transformed_records']} records...")


# One PythonOperator per pipeline step
extract = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

# Run order: extract, then transform, then load
extract >> transform >> load

from datetime import datetime
from airflow.decorators import dag, task


@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow', 'etl'],
)
def taskflow_etl_pipeline():
    """
    ETL pipeline using the TaskFlow API.

    Values returned by one @task are passed to the next as XComs automatically.
    """

    @task()
    def extract():
        """Extract data from the source API (simulated here)."""
        import os

        # Read the API key from the environment instead of hardcoding it
        api_key = os.getenv('DATA_API_KEY')
        if not api_key:
            print("DATA_API_KEY is not set; returning simulated data")
        # Simulated extraction; a real implementation would call the API with api_key
        return {'records': [1, 2, 3, 4, 5]}

    @task()
    def transform(data: dict):
        """Double every extracted record."""
        return {'transformed': [r * 2 for r in data['records']]}

    @task()
    def load(data: dict):
        """Load data to the warehouse (stubbed)."""
        print(f"Loading {len(data['transformed'])} records")
        # Your loading logic here
        return True

    # Passing return values wires both the data flow and the task dependencies
    extracted_data = extract()
    transformed_data = transform(extracted_data)
    load(transformed_data)


# Calling the decorated function registers the DAG
taskflow_etl_pipeline()

from airflow.operators.bash import BashOperator
# Run an external script, passing the run's date stamp ({{ ds }}, YYYY-MM-DD)
run_script = BashOperator(
    dag=dag,
    task_id='run_data_script',
    bash_command='python /opt/scripts/process_data.py --date {{ ds }}',
)

from airflow.operators.python import PythonOperator
def my_function(param1, param2, **context):
    """Example PythonOperator callable.

    ``param1``/``param2`` come from ``op_kwargs``; ``**context`` receives the
    Airflow task context. ``execution_date`` was deprecated in Airflow 2.2 and
    removed from the context in Airflow 3, so ``logical_date`` is used instead.
    """
    logical_date = context['logical_date']
    print(f"Processing for {logical_date}")
    # Your logic here


python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'param1': 'value1', 'param2': 'value2'},
    dag=dag,
)

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator


def choose_branch(**context):
    """Decide which branch to execute.

    Returns the task_id to follow: ``even_day_task`` on even days of the
    month, ``odd_day_task`` otherwise. The branch not returned is skipped.
    """
    # `execution_date` is not in the Airflow 3 task context; `logical_date`
    # (available since Airflow 2.2) replaces it.
    # NOTE(review): in Airflow 3 a run triggered without a logical date may
    # carry None here - confirm how this DAG is triggered.
    logical_date = context['logical_date']
    return 'even_day_task' if logical_date.day % 2 == 0 else 'odd_day_task'


branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
    dag=dag,
)
even_task = EmptyOperator(task_id='even_day_task', dag=dag)
odd_task = EmptyOperator(task_id='odd_day_task', dag=dag)
branch >> [even_task, odd_task]

from airflow.operators.email import EmailOperator
# Notify the team when the run completes; subject and body are Jinja templates
send_email = EmailOperator(
    dag=dag,
    task_id='send_notification',
    to='team@example.com',
    subject='Pipeline {{ ds }} completed',
    html_content='<p>The pipeline for {{ ds }} has completed successfully.</p>',
)

from airflow.sensors.filesystem import FileSensor
# Wait for the day's input file to appear.
# mode is 'poke' or 'reschedule'; for long waits, 'reschedule' releases the
# worker slot between checks instead of holding it for the whole wait.
wait_for_file = FileSensor(
    dag=dag,
    task_id='wait_for_data_file',
    filepath='/data/input/file_{{ ds }}.csv',
    mode='poke',
    poke_interval=30,  # seconds between checks
    timeout=3600,  # give up after one hour
)

from airflow.sensors.time_delta import TimeDeltaSensor
from datetime import timedelta

# Pause the pipeline for a fixed ten-minute delta
ten_minutes = timedelta(minutes=10)
wait_sensor = TimeDeltaSensor(
    dag=dag,
    task_id='wait_10_minutes',
    delta=ten_minutes,
)

from airflow.sensors.base import BaseSensorOperator
class CustomDataSensor(BaseSensorOperator):
    """Sensor that succeeds once an HTTP status endpoint reports ``ready``.

    :param endpoint: URL requested on every poke. The sensor succeeds when the
        response is HTTP 200 and its JSON object has a truthy ``ready`` key.
    :param request_timeout: seconds to wait for each HTTP request (default 30).
        Without it, ``requests`` can block forever on an unresponsive server
        and the poke would never return.
    """

    def __init__(self, endpoint, request_timeout=30, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.request_timeout = request_timeout

    def poke(self, context):
        """Check if data is available; True completes the sensor."""
        import os

        import requests

        # Token comes from the environment, not from source code
        api_key = os.getenv('API_KEY')
        try:
            response = requests.get(
                self.endpoint,
                headers={'Authorization': f'Bearer {api_key}'},
                timeout=self.request_timeout,
            )
        except requests.RequestException as err:
            # Treat network errors as "not ready yet": the next poke tries again,
            # and the sensor's own `timeout` still bounds the total wait.
            self.log.warning("Status check for %s failed: %s", self.endpoint, err)
            return False
        if response.status_code != 200:
            return False
        try:
            payload = response.json()
        except ValueError:
            # HTTP 200 with a non-JSON body (e.g. an HTML error page) is not "ready"
            return False
        return isinstance(payload, dict) and bool(payload.get('ready', False))


check_data = CustomDataSensor(
    task_id='check_data_ready',
    endpoint='https://api.example.com/status',
    poke_interval=60,
    timeout=3600,
    dag=dag,
)
# Add a Postgres connection
airflow connections add 'postgres_default' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-schema 'mydb' \
    --conn-login 'user' \
    --conn-password 'password' \
    --conn-port 5432
# Add an HTTP connection.
# NOTE: --conn-extra stores exactly the JSON given here; supply the real key
# through a secrets backend or environment variable, not in this file.
airflow connections add 'http_api' \
    --conn-type 'http' \
    --conn-host 'https://api.example.com' \
    --conn-extra '{"api_key": "from_env_var"}'

# Alternatively, define connections as AIRFLOW_CONN_<CONN_ID> environment variables
export AIRFLOW_CONN_POSTGRES_DEFAULT='postgresql://user:password@localhost:5432/mydb'
# JSON form (Airflow 2.3+) keeps the same https host as the CLI example above
export AIRFLOW_CONN_HTTP_API='{"conn_type": "http", "host": "https://api.example.com"}'

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task


@task()
def query_database():
    """Query PostgreSQL through the ``postgres_default`` connection.

    Returns the number of rows matched by the parameterized query.
    """
    hook = PostgresHook(postgres_conn_id='postgres_default')
    # %s placeholders let the driver escape values; never build SQL by string formatting
    results = hook.get_records(
        sql="SELECT * FROM users WHERE created_date = %s",
        parameters=['2024-01-01'],
    )
    # The hook can also return a pandas DataFrame, e.g.:
    #     df = hook.get_pandas_df(sql="SELECT * FROM transactions")
    return len(results)


@task()
def insert_data():
    """Insert data into database: one row into ``logs``."""
    from datetime import datetime, timezone

    hook = PostgresHook(postgres_conn_id='postgres_default')
    # Supply one value per %s placeholder. A list containing one tuple would bind
    # that whole tuple to the first placeholder and the statement would fail.
    hook.run(
        sql="INSERT INTO logs (message, timestamp) VALUES (%s, %s)",
        parameters=('Pipeline completed', datetime.now(timezone.utc)),
    )

from airflow.providers.http.hooks.http import HttpHook
from airflow.decorators import task


@task()
def call_api():
    """GET /v1/data through the ``http_api`` connection and return the parsed JSON."""
    hook = HttpHook(method='GET', http_conn_id='http_api')
    request_headers = {'Content-Type': 'application/json'}
    # A request timeout keeps the call from hanging indefinitely
    return hook.run(
        endpoint='/v1/data',
        headers=request_headers,
        extra_options={'timeout': 30},
    ).json()

from airflow.decorators import dag, task
from datetime import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def xcom_example():
    """Pass data between TaskFlow tasks implicitly, and also pull it explicitly."""

    @task()
    def push_data():
        """Return a summary dict, which TaskFlow stores as this task's XCom."""
        summary = {
            'total_records': 1000,
            'processing_time': 45.2,
            'status': 'success',
        }
        return summary

    @task()
    def pull_data(data: dict):
        """Use the upstream XCom as an argument, then fetch it again via the task instance."""
        print(f"Received {data['total_records']} records")
        print(f"Processing took {data['processing_time']} seconds")
        # Explicit alternative: ask the running task instance for a specific task's XCom
        from airflow.operators.python import get_current_context

        ti = get_current_context()['ti']
        return ti.xcom_pull(task_ids='push_data')['status']

    pull_data(push_data())
xcom_example()


@task(multiple_outputs=True)
def process_multiple():
    """Return multiple values.

    ``multiple_outputs=True`` pushes each dict key (``key1``, ``key2``) as its
    own XCom in addition to the full ``return_value``. Downstream tasks can
    therefore depend on a single key, e.g. ``process_multiple()['key1']``.
    """
    return {'key1': 'value1', 'key2': 'value2'}


@task()
def use_multiple(data: dict):
    """Use multiple values from the whole returned dict."""
    print(data['key1'], data['key2'])


data = process_multiple()
use_multiple(data)

task1 >> task2 >> task3
# Or with explicit method calls instead of the bitshift operators
task2.set_upstream(task1)
task3.set_upstream(task2)

# Fan-out: one upstream task feeds several downstream tasks
task1 >> [task2, task3, task4]
# Fan-in: several upstream tasks feed one downstream task
[task2, task3, task4] >> task5

# Multiple dependencies (diamond shape)
task1 >> [task2, task3]
[task2, task3] >> task4
# The same diamond declared in one call
from airflow.models.baseoperator import chain

chain(task1, [task2, task3], task4)

from airflow.sensors.external_task import ExternalTaskSensor
# Block until `final_task` in `upstream_dag` has completed.
# The sensor matches the upstream run by logical date; use execution_delta or
# execution_date_fn when the two DAGs run on different schedules.
wait_for_other_dag = ExternalTaskSensor(
    dag=dag,
    task_id='wait_for_upstream_dag',
    external_dag_id='upstream_dag',
    external_task_id='final_task',
    timeout=3600,
)

from airflow.decorators import dag, task
from datetime import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def dynamic_tasks_example():
    """Fan out one mapped task instance per source, then combine the results."""

    @task()
    def get_sources():
        """List the sources; each becomes one mapped process_source instance."""
        return [f'source_{n}' for n in range(1, 5)]

    @task()
    def process_source(source: str):
        """Handle one source and return a marker string."""
        print(f"Processing {source}")
        # Real processing goes here
        return f"{source}_processed"

    @task()
    def combine_results(results: list):
        """Receive every mapped result in a single task."""
        print(f"Combining {len(results)} results")
        return results

    # .expand() creates one process_source instance per element at run time
    combine_results(process_source.expand(source=get_sources()))
dynamic_tasks_example()


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def task_mapping_example():
    """Map a task over a list of dicts, one mapped instance per file."""

    @task()
    def extract_files():
        """Return list of files to process."""
        return [
            {'file': 'data1.csv', 'format': 'csv'},
            {'file': 'data2.json', 'format': 'json'},
            {'file': 'data3.parquet', 'format': 'parquet'},
        ]

    @task()
    def process_file(file_info: dict):
        """Process a single file described by {'file': ..., 'format': ...}."""
        filename = file_info['file']
        # Named file_format rather than `format` to avoid shadowing the built-in
        file_format = file_info['format']
        print(f"Processing {filename} as {file_format}")
        return f"Processed {filename}"

    files = extract_files()
    process_file.expand(file_info=files)


task_mapping_example()

[core]
# DAGs folder
dags_folder = /opt/airflow/dags
# Executor (LocalExecutor, CeleryExecutor, KubernetesExecutor)
executor = LocalExecutor
# Parallelism
parallelism = 32
# Max running task instances per DAG (replaces dag_concurrency,
# renamed in Airflow 2.2 and removed in Airflow 3)
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16
[database]
# Database connection
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost/airflow
[dag_processor]
# How often (seconds) to scan the DAGs folder for new files
# (was [scheduler] dag_dir_list_interval before Airflow 3)
refresh_interval = 300
[scheduler]
# Seconds without a heartbeat before a running task instance is considered lost
# (was scheduler_zombie_task_threshold before Airflow 3)
task_instance_heartbeat_timeout = 300
[api]
# API server / web UI host and port (were [webserver] web_server_host/port)
host = 0.0.0.0
port = 8080
# Secret key for signing sessions (was [webserver] secret_key); use a random value
secret_key = your_secret_key_here
[email]
# Email backend
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = smtp.gmail.com
smtp_port = 587
smtp_user = your_email@gmail.com
smtp_password = your_app_password
smtp_mail_from = airflow@example.com

# Override any config
# Pattern: AIRFLOW__<SECTION>__<KEY> overrides that option in airflow.cfg
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@localhost/airflow
export AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
# Airflow 3 reads the session secret from [api] secret_key
export AIRFLOW__API__SECRET_KEY=your_secret_key
# Set Airflow home
export AIRFLOW_HOME=~/airflow

# Via CLI
# Create or overwrite variables: key first, then value
airflow variables set my_key my_value
airflow variables set api_endpoint "https://api.example.com/v1"
# Bulk-load variables from a JSON file
airflow variables import variables.json

from airflow.models import Variable
# Read Variables inside tasks or templates, not at module level: top-level
# Variable.get runs on every parse of this DAG file, adding a lookup each time.
@task()
def use_variable():
    """Read Airflow Variables when the task runs."""
    # Get variable (raises KeyError if it does not exist)
    endpoint = Variable.get("api_endpoint")
    print(f"Using endpoint: {endpoint}")
    # Other forms:
    #   Variable.get("timeout", default_var=30)             # default when unset
    #   Variable.get("config_json", deserialize_json=True)  # parse stored JSON


# Templated fields also resolve Variables at run time
bash_task = BashOperator(
    task_id='use_variable',
    bash_command='echo "API: {{ var.value.api_endpoint }}"',
    dag=dag,
)

from airflow.operators.bash import BashOperator
templated_command = """
# Execution date
echo "Execution date: {{ ds }}" # YYYY-MM-DD
echo "Execution date no dash: {{ ds_nodash }}" # YYYYMMDD
# Date components
echo "Year: {{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}"
echo "Previous day: {{ macros.ds_add(ds, -1) }}"
# Task instance
echo "Task ID: {{ task.task_id }}"
echo "DAG ID: {{ dag.dag_id }}"
echo "Run ID: {{ run_id }}"
# Parameters
echo "Param: {{ params.my_param }}"
"""

# Named templated_task (not `task`) so the imported @task decorator is not clobbered
templated_task = BashOperator(
    task_id='templated_task',
    bash_command=templated_command,
    params={'my_param': 'value'},
    dag=dag,
)


def custom_macro(value):
    """Custom Jinja macro: return ``value`` upper-cased."""
    return value.upper()


# user_defined_macros exposes custom_macro to this DAG's templates as `custom_upper`
dag = DAG(
    'dag_with_macros',
    user_defined_macros={
        'custom_upper': custom_macro
    },
    start_date=datetime(2024, 1, 1),
)

macro_task = BashOperator(
    task_id='use_macro',
    bash_command='echo "{{ custom_upper(params.name) }}"',
    params={'name': 'airflow'},
    dag=dag,
)

# List all DAGs
airflow dags list
# Show the tasks that make up a DAG
airflow tasks list my_dag
# Print the DAG's task graph
airflow dags show my_dag
# Queue a manual run of the DAG
airflow dags trigger my_dag
# Same, passing a JSON conf payload to the run
airflow dags trigger my_dag --conf '{"key": "value"}'
# Stop / resume scheduling of the DAG
airflow dags pause my_dag
airflow dags unpause my_dag
# Remove the DAG's records from the metadata database (the DAG file is kept)
airflow dags delete my_dag

# Run one task locally without recording its state
airflow tasks test my_dag my_task 2024-01-01
# Run one task and record its state
airflow tasks run my_dag my_task 2024-01-01
# Reset matching task instances so they run again
airflow tasks clear my_dag --task-regex my_task
# Reset every task instance in a date range
airflow tasks clear my_dag --start-date 2024-01-01 --end-date 2024-01-31

# Initialize database
# Creates the schema on first run (`airflow db init` was removed in Airflow 3)
airflow db migrate
# Upgrade database after installing a newer Airflow
# (`airflow db upgrade` was also removed in Airflow 3)
airflow db migrate
# Reset database (WARNING: deletes all data)
airflow db reset
# Check database
airflow db check

# Create user
# (In Airflow 3 the `users` commands require the FAB auth manager provider)
airflow users create \
    --username john \
    --firstname John \
    --lastname Doe \
    --role Admin \
    --email john@example.com
# Show all users
airflow users list
# Remove a user by username
airflow users delete --username john

# Show every connection
airflow connections list
# Show one connection's details
airflow connections get postgres_default
# Dump connections to, then load them from, a JSON file
airflow connections export connections.json
airflow connections import connections.json

# Variables: set, read, delete, list
airflow variables set my_var my_value
airflow variables get my_var
airflow variables delete my_var
airflow variables list
# Dump variables to, then load them from, a JSON file
airflow variables export variables.json
airflow variables import variables.json

# Install specific providers (add --constraint "${CONSTRAINT_URL}" to keep pinned versions)
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-http
pip install apache-airflow-providers-docker

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
from airflow.decorators import dag, task
from datetime import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def s3_example():
    """List, read and write objects in one S3 bucket via the aws_default connection."""
    bucket = 'my-bucket'
    conn_id = 'aws_default'

    # Operator form: list the keys under data/
    list_files = S3ListOperator(
        task_id='list_s3_files',
        bucket=bucket,
        prefix='data/',
        aws_conn_id=conn_id,
    )

    @task()
    def download_from_s3():
        """Read data/file.csv and return the length of its contents."""
        s3 = S3Hook(aws_conn_id=conn_id)
        return len(s3.read_key(key='data/file.csv', bucket_name=bucket))

    @task()
    def upload_to_s3():
        """Write a small text object to output/result.txt."""
        s3 = S3Hook(aws_conn_id=conn_id)
        s3.load_string(
            string_data='Hello, S3!',
            key='output/result.txt',
            bucket_name=bucket,
        )

    list_files >> download_from_s3() >> upload_to_s3()


s3_example()

from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Load every CSV under data/ into one BigQuery table, skipping each header row;
# WRITE_TRUNCATE replaces the table's existing contents.
load_to_bigquery = GCSToBigQueryOperator(
    dag=dag,
    task_id='load_to_bq',
    gcp_conn_id='google_cloud_default',
    bucket='my-gcs-bucket',
    source_objects=['data/*.csv'],
    source_format='CSV',
    skip_leading_rows=1,
    destination_project_dataset_table='project.dataset.table',
    write_disposition='WRITE_TRUNCATE',
)

from airflow.providers.docker.operators.docker import DockerOperator
# Run a one-off command in a container through the local Docker socket
run_container = DockerOperator(
    dag=dag,
    task_id='run_docker_container',
    image='python:3.10',
    command='python -c "print(\'Hello from Docker\')"',
    docker_url='unix://var/run/docker.sock',
    network_mode='bridge',
)


@task(
    retries=3,
    retry_delay=timedelta(minutes=5),
    # Retry waits grow exponentially, capped at max_retry_delay
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
)
def task_with_retries():
    """Up to three retries, backing off from 5 minutes to at most 1 hour."""
    # Your code
    pass


def on_failure_callback(context):
    """Called by Airflow when a task fails; receives the task context."""
    failed_id = context['task_instance'].task_id
    print(f"Task {failed_id} failed!")
    # Send alert, create ticket, etc.


def on_success_callback(context):
    """Called by Airflow when a task succeeds."""
    print("Task succeeded!")


@task(
    on_success_callback=on_success_callback,
    on_failure_callback=on_failure_callback,
)
def monitored_task():
    """Task with both callbacks attached."""
    # Your code
    pass

from airflow.exceptions import AirflowException
@task()
def safe_task():
    """Task with error handling.

    An expected ``SpecificException`` is logged and turned into a ``None``
    result; any other exception fails the task, chained to the original error.
    """
    try:
        # Your code that might fail (keep the try body to just this call)
        result = risky_operation()
    except SpecificException as e:
        # Log error but don't fail task
        print(f"Warning: {e}")
        return None
    except Exception as e:
        # Fail task with custom message; `from e` preserves the original traceback
        raise AirflowException(f"Critical error: {e}") from e
    return result

# test_dag.py
import pytest
from datetime import datetime
from airflow.models import DagBag


def test_dag_loaded():
    """DAG files import without errors and my_dag is registered."""
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    # Check import errors first; on failure the message shows the tracebacks
    assert len(dagbag.import_errors) == 0, dagbag.import_errors
    assert 'my_dag' in dagbag.dags


def test_dag_structure():
    """my_dag has the expected tasks and wiring."""
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    dag = dagbag.get_dag('my_dag')
    assert dag is not None, "my_dag failed to load"
    # Check task count
    assert len(dag.tasks) == 5
    # Check specific task exists
    assert 'extract' in dag.task_ids
    # Check dependencies
    assert 'transform' in dag.get_task('extract').downstream_task_ids


def test_task_execution():
    """Run my_dag in-process and check that the extract task succeeded.

    ``TaskInstance(task=..., execution_date=...)`` no longer works because
    ``execution_date`` was removed in Airflow 3. ``dag.test()`` creates the DAG
    run and executes its tasks in-process.
    """
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    dag = dagbag.get_dag('my_dag')
    assert dag is not None, "my_dag failed to load"
    dag_run = dag.test(logical_date=datetime(2024, 1, 1))
    ti = dag_run.get_task_instance('extract')
    assert ti is not None
    assert ti.state == 'success'
# test_integration.py
from airflow.models import DagBag
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType


def test_dag_run():
    """Test complete DAG run: execute in-process and assert the run succeeded."""
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')
    assert dag is not None, "my_dag failed to load"
    # dag.test() runs every task in-process and returns the resulting DagRun
    dag_run = dag.test()
    assert dag_run.state == DagRunState.SUCCESS


@task()
def idempotent_load(execution_date):
    """
    Task that can be run multiple times safely.

    Deletes the rows for ``execution_date``, then reloads them. Callers pass the
    date explicitly, e.g. ``idempotent_load("{{ ds }}")``, because Airflow 3
    does not put ``execution_date`` in the task context.
    """
    hook = PostgresHook(postgres_conn_id='postgres_default')
    # Both statements go through a single hook.run() call: one connection and
    # one commit at the end, so a failed insert cannot leave the date deleted
    # but not reloaded. (`table` is reserved in PostgreSQL, so a real name is used.)
    hook.run(
        [
            "DELETE FROM target_table WHERE date = %(run_date)s",
            "INSERT INTO target_table ...",  # reload rows for %(run_date)s
        ],
        parameters={'run_date': execution_date},
    )
# Good: Use connections
@task()
def good_practice():
    """Credentials live in an Airflow connection, referenced only by its id."""
    hook = PostgresHook(postgres_conn_id='postgres_default')
    # ...use the hook here


# Bad: Hardcode credentials
@task()
def bad_practice():
    """Anti-pattern, shown only as a warning: credentials embedded in code."""
    import psycopg2

    conn = psycopg2.connect(
        host='localhost',
        user='user',  # Don't do this!
        password='password',  # Never do this!
    )

# Good: Pass references
@task()
def process_data():
    """Store the large payload externally and hand downstream only a reference."""
    # Process and save to database/storage
    record_id = save_to_database(large_data)
    # The XCom carries just this small id, not the data itself
    return record_id
@task()