workflow-orchestrator
Compare original and translation side by side
🇺🇸
Original
English
🇨🇳
Translation
Chinese

Workflow Orchestrator
工作流编排器
You design and implement reliable, scheduled, and event-driven workflows.
您可以设计并实现可靠的、定时触发的以及事件驱动的工作流。
When to use
适用场景
- "Schedule this script to run every night."
- "Create a pipeline for data processing."
- "Chain these tasks together."
- "安排此脚本每晚运行。"
- "创建数据处理管道。"
- "将这些任务串联起来。"
Instructions
操作指南
- DAG Design:
- Structure tasks as a Directed Acyclic Graph (DAG).
- Clearly define dependencies between tasks.
- Tooling:
- Use Airflow/Prefect for complex, data-heavy pipelines.
- Use Celery/Bull queues for async background tasks in web apps.
- Use Cron for simple system-level schedules.
- Reliability:
- Implement retries with exponential backoff.
- Set alerts for failed tasks.
- Ensure idempotency (running the workflow twice doesn't break things).
- DAG设计:
- 将任务构建为有向无环图(DAG)。
- 明确定义任务之间的依赖关系。
- 工具选择:
- 对于复杂、数据密集型管道,使用Airflow/Prefect。
- 对于Web应用中的异步后台任务,使用Celery/Bull队列。
- 对于简单的系统级定时任务,使用Cron。
- 可靠性保障:
- 实现带指数退避的重试机制。
- 为失败任务设置告警。
- 确保幂等性(重复运行工作流不会导致问题)。
Examples
示例
1. Airflow DAG for Data Pipeline
1. 用于数据管道的Airflow DAG
python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import pandas as pd
# Task-level defaults applied to every task in the DAG.
# (Indentation restored — the scraped source had lost all leading whitespace.)
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,           # each run is independent of prior runs
    'start_date': datetime(2024, 1, 1),
    'email': ['alerts@company.com'],
    'email_on_failure': True,           # alert on failed tasks
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,  # 5m, 10m, 20m between attempts
}

dag = DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    description='Process daily sales data',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False,                  # do not backfill missed intervals
    tags=['sales', 'etl'],
)
def extract_data(**context):
    """Extract source data for the run's logical date.

    Returns the name of the CSV file produced, which downstream tasks
    receive via XCom (Airflow pushes the return value automatically).
    """
    # Idempotent: check if already processed
    execution_date = context['ds']  # 'ds' = logical date as YYYY-MM-DD
    print(f"Extracting data for {execution_date}")
    # Your extraction logic here
    return f"data_{execution_date}.csv"
def transform_data(**context):
    """Transform the extracted data.

    Pulls the input file name pushed by the 'extract' task via XCom,
    adds a computed 'total' column (quantity * price), writes the
    result to a new CSV, and returns its file name for the 'load' task.
    """
    ti = context['ti']
    filename = ti.xcom_pull(task_ids='extract')
    df = pd.read_csv(filename)
    # Transformation logic
    df['total'] = df['quantity'] * df['price']
    output_file = f"transformed_{context['ds']}.csv"
    df.to_csv(output_file, index=False)
    return output_file
def load_data(**context):
    """Load the transformed file into the warehouse."""
    ti = context['ti']
    filename = ti.xcom_pull(task_ids='transform')
    # NOTE(review): the scraped original printed "(unknown)" — the lost
    # f-string interpolation is almost certainly the pulled XCom value.
    print(f"Loading {filename} to warehouse")
    # Load logic here
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import pandas as pd
# Task-level defaults applied to every task in the DAG.
# (Indentation restored — the scraped source had lost all leading whitespace.)
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,           # each run is independent of prior runs
    'start_date': datetime(2024, 1, 1),
    'email': ['alerts@company.com'],
    'email_on_failure': True,           # alert on failed tasks
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,  # 5m, 10m, 20m between attempts
}

dag = DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    description='Process daily sales data',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False,                  # do not backfill missed intervals
    tags=['sales', 'etl'],
)
def extract_data(**context):
    """Extract source data for the run's logical date.

    Returns the produced CSV's file name; Airflow pushes the return
    value to XCom for downstream tasks.
    """
    # Idempotent: check if already processed
    execution_date = context['ds']  # 'ds' = logical date as YYYY-MM-DD
    print(f"Extracting data for {execution_date}")
    # Your extraction logic here
    return f"data_{execution_date}.csv"
def transform_data(**context):
    """Transform the extracted data.

    Reads the file named by the 'extract' task's XCom, adds a
    'total' = quantity * price column, and writes/returns a new CSV.
    """
    ti = context['ti']
    filename = ti.xcom_pull(task_ids='extract')
    df = pd.read_csv(filename)
    # Transformation logic
    df['total'] = df['quantity'] * df['price']
    output_file = f"transformed_{context['ds']}.csv"
    df.to_csv(output_file, index=False)
    return output_file
def load_data(**context):
    """Load the transformed file into the warehouse."""
    ti = context['ti']
    filename = ti.xcom_pull(task_ids='transform')
    # NOTE(review): the scraped original printed "(unknown)" — the lost
    # f-string interpolation is almost certainly the pulled XCom value.
    print(f"Loading {filename} to warehouse")
    # Load logic here
Define tasks
# Wire the Python callables into Airflow tasks.
extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

load = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag,
)

# Remove the run's intermediate CSVs so reruns start clean (idempotency).
cleanup = BashOperator(
    task_id='cleanup',
    bash_command='rm -f /tmp/data_{{ ds }}*.csv',
    dag=dag,
)
# Wire the Python callables into Airflow tasks.
extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

load = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag,
)

# Remove the run's intermediate CSVs so reruns start clean (idempotency).
cleanup = BashOperator(
    task_id='cleanup',
    bash_command='rm -f /tmp/data_{{ ds }}*.csv',
    dag=dag,
)
Set dependencies
Set dependencies
# Linear pipeline: extract -> transform -> load -> cleanup
extract >> transform >> load >> cleanup
extract >> transform >> load >> cleanup
2. Celery Task Queue for Background Jobs
2. 用于后台任务的Celery任务队列
python
python
tasks.py
tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
import time
app = Celery('tasks', broker='redis://localhost:6379/0')
# Bug fix: the original passed the undefined name `name`; Celery's
# get_task_logger expects the module's __name__.
logger = get_task_logger(__name__)
from celery import Celery
from celery.utils.log import get_task_logger
import time
app = Celery('tasks', broker='redis://localhost:6379/0')
# Bug fix: the original passed the undefined name `name`; Celery's
# get_task_logger expects the module's __name__.
logger = get_task_logger(__name__)
Configure retries
Configure retries
# Ack only after the task finishes so work lost to a dead worker is redelivered.
app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True
@app.task(bind=True, max_retries=3)
def send_email(self, user_id, email_type):
    """Send an email, retrying on failure with exponential backoff.

    Backoff countdown is 2**retries seconds (1s, 2s, 4s).
    """
    try:
        logger.info(f"Sending {email_type} email to user {user_id}")
        # Email sending logic
        # Simulate potential failure
        if email_type == 'verification':
            # Your email logic here
            pass
        return f"Email sent to user {user_id}"
    except Exception as exc:
        # Retry with exponential backoff; self.retry raises Retry,
        # which re-raising propagates correctly to the worker.
        logger.error(f"Failed to send email: {exc}")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task
def process_image(image_path):
    """Process an uploaded image in the background."""
    logger.info(f"Processing image: {image_path}")
    # Image processing logic
    time.sleep(2)  # Simulate processing
    return f"Processed: {image_path}"
@app.task
def generate_report(report_id):
    """Generate a monthly report and return its completion status."""
    logger.info(f"Generating report {report_id}")
    # Report generation logic
    return {"report_id": report_id, "status": "completed"}
# Ack only after the task finishes so work lost to a dead worker is redelivered.
app.conf.task_acks_late = True
app.conf.task_reject_on_worker_lost = True
@app.task(bind=True, max_retries=3)
def send_email(self, user_id, email_type):
    """Send an email, retrying on failure with exponential backoff.

    Backoff countdown is 2**retries seconds (1s, 2s, 4s).
    """
    try:
        logger.info(f"Sending {email_type} email to user {user_id}")
        # Email sending logic
        # Simulate potential failure
        if email_type == 'verification':
            # Your email logic here
            pass
        return f"Email sent to user {user_id}"
    except Exception as exc:
        # Retry with exponential backoff.
        logger.error(f"Failed to send email: {exc}")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task
def process_image(image_path):
    """Process an uploaded image in the background."""
    logger.info(f"Processing image: {image_path}")
    # Image processing logic
    time.sleep(2)  # Simulate processing
    return f"Processed: {image_path}"
@app.task
def generate_report(report_id):
    """Generate a monthly report and return its completion status."""
    logger.info(f"Generating report {report_id}")
    # Report generation logic
    return {"report_id": report_id, "status": "completed"}
Usage in your application:
Usage in your application:
from tasks import send_email
from tasks import send_email
send_email.delay(user_id=123, email_type='welcome')
send_email.delay(user_id=123, email_type='welcome')
3. Cron Job with Retry Logic (Python Script)
3. 带重试逻辑的Cron任务(Python脚本)
python
#!/usr/bin/env python3
"""
Scheduled backup script with retry logic
Add to crontab: 0 3 * * * /usr/bin/python3 /path/to/backup.py
"""
import subprocess
import time
import logging
from datetime import datetime
import smtplib
from email.message import EmailMessage
# Log to a fixed file so cron runs leave an audit trail.
logging.basicConfig(
filename='/var/log/backup.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def send_alert(subject, message):
    """Send an email alert on failure.

    Best effort: a failure to deliver the alert is logged and
    swallowed so it never masks the original backup failure.
    """
    msg = EmailMessage()
    msg.set_content(message)
    msg['Subject'] = subject
    msg['From'] = 'backup@company.com'
    msg['To'] = 'admin@company.com'
    try:
        with smtplib.SMTP('localhost') as s:
            s.send_message(msg)
    except Exception as e:
        logging.error(f"Failed to send alert: {e}")
def backup_database(max_retries=3):
    """Backup the database with retry logic.

    Runs pg_dump up to max_retries times with exponential backoff
    (2s, 4s, 8s). Returns True on a verified non-empty backup file,
    False after the final failure (an alert email is sent first).
    """
    import os  # hoisted from the success branch so it is imported once

    # Timestamped name; computed once so all attempts target one file.
    backup_file = f"/backups/db_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql"

    for attempt in range(1, max_retries + 1):
        try:
            logging.info(f"Starting backup attempt {attempt}/{max_retries}")
            # Run backup command
            result = subprocess.run(
                ['pg_dump', '-U', 'postgres', '-d', 'mydb', '-f', backup_file],
                capture_output=True,
                text=True,
                timeout=300  # don't hang forever on a stuck dump
            )
            if result.returncode == 0:
                logging.info(f"Backup successful: {backup_file}")
                # Verify backup file exists and has content
                if os.path.exists(backup_file) and os.path.getsize(backup_file) > 0:
                    return True
                else:
                    raise Exception("Backup file is empty or missing")
            else:
                raise Exception(f"Backup command failed: {result.stderr}")
        except Exception as e:
            logging.error(f"Backup attempt {attempt} failed: {e}")
            if attempt < max_retries:
                # Exponential backoff
                wait_time = 2 ** attempt
                logging.info(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                # Final failure: log, alert, and report False to the caller.
                error_msg = f"Backup failed after {max_retries} attempts"
                logging.critical(error_msg)
                send_alert("CRITICAL: Database Backup Failed", error_msg)
                return False
    return False  # unreachable in practice, kept as a safe default
if __name__ == '__main__':
    logging.info("=== Backup job started ===")
    success = backup_database()
    if success:
        logging.info("=== Backup job completed successfully ===")
        raise SystemExit(0)
    else:
        logging.error("=== Backup job failed ===")
        # Fix: the scraped source read "exit(1)python" (page text fused onto
        # the line). SystemExit avoids relying on the site-injected exit().
        raise SystemExit(1)
#!/usr/bin/env python3
"""
Scheduled backup script with retry logic
Add to crontab: 0 3 * * * /usr/bin/python3 /path/to/backup.py
"""
import subprocess
import time
import logging
from datetime import datetime
import smtplib
from email.message import EmailMessage
# Log to a fixed file so cron runs leave an audit trail.
logging.basicConfig(
filename='/var/log/backup.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def send_alert(subject, message):
    """Send an email alert on failure.

    Best effort: delivery errors are logged and swallowed so they
    never mask the original backup failure.
    """
    msg = EmailMessage()
    msg.set_content(message)
    msg['Subject'] = subject
    msg['From'] = 'backup@company.com'
    msg['To'] = 'admin@company.com'
    try:
        with smtplib.SMTP('localhost') as s:
            s.send_message(msg)
    except Exception as e:
        logging.error(f"Failed to send alert: {e}")
def backup_database(max_retries=3):
    """Backup the database with retry logic.

    Runs pg_dump up to max_retries times with exponential backoff
    (2s, 4s, 8s). Returns True on a verified non-empty backup file,
    False after the final failure (an alert email is sent first).
    """
    import os  # hoisted from the success branch so it is imported once

    # Timestamped name; computed once so all attempts target one file.
    backup_file = f"/backups/db_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql"

    for attempt in range(1, max_retries + 1):
        try:
            logging.info(f"Starting backup attempt {attempt}/{max_retries}")
            # Run backup command
            result = subprocess.run(
                ['pg_dump', '-U', 'postgres', '-d', 'mydb', '-f', backup_file],
                capture_output=True,
                text=True,
                timeout=300  # don't hang forever on a stuck dump
            )
            if result.returncode == 0:
                logging.info(f"Backup successful: {backup_file}")
                # Verify backup file exists and has content
                if os.path.exists(backup_file) and os.path.getsize(backup_file) > 0:
                    return True
                else:
                    raise Exception("Backup file is empty or missing")
            else:
                raise Exception(f"Backup command failed: {result.stderr}")
        except Exception as e:
            logging.error(f"Backup attempt {attempt} failed: {e}")
            if attempt < max_retries:
                # Exponential backoff
                wait_time = 2 ** attempt
                logging.info(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                # Final failure: log, alert, and report False to the caller.
                error_msg = f"Backup failed after {max_retries} attempts"
                logging.critical(error_msg)
                send_alert("CRITICAL: Database Backup Failed", error_msg)
                return False
    return False  # unreachable in practice, kept as a safe default
if __name__ == '__main__':
    logging.info("=== Backup job started ===")
    success = backup_database()
    if success:
        logging.info("=== Backup job completed successfully ===")
        raise SystemExit(0)  # SystemExit avoids relying on site-injected exit()
    else:
        logging.error("=== Backup job failed ===")
        raise SystemExit(1)