Loading...
Loading...
Task routing and queue management patterns for Celery including priority queues, topic exchanges, worker-specific routing, and advanced queue configurations. Use when configuring task routing, managing queues, setting up priority queues, implementing worker routing, configuring topic exchanges, or when user mentions task routing, queue management, Celery routing, worker assignments, or message broker routing.
npx skill4agent add vanman2024/ai-dev-marketplace routing-strategies
scripts/test-routing.sh <config-file>
# Test routing configuration
./scripts/test-routing.sh ./celery_config.py
# Test with custom broker URL
BROKER_URL=amqp://user:password@localhost:5672// ./scripts/test-routing.sh ./celery_config.py
# Verbose output
VERBOSE=1 ./scripts/test-routing.sh ./celery_config.py
012
scripts/validate-queues.sh <project-dir>
# Validate current project
./scripts/validate-queues.sh .
# Validate specific directory
./scripts/validate-queues.sh /path/to/celery-app
# Generate detailed report
REPORT=1 ./scripts/validate-queues.sh . > queue-validation-report.md
01
templates/queue-config.py
from celery import Celery
from templates.queue_config import CELERY_ROUTES, CELERY_QUEUES
app = Celery('myapp')
app.conf.task_routes = CELERY_ROUTES
app.conf.task_queues = CELERY_QUEUES

# Queue declarations (the CELERY_QUEUES object imported by the usage snippet).
#
# The 'emails' and 'reports' queues bind with wildcard keys ('email.*',
# 'report.*'). Wildcards are only interpreted by topic exchanges; an Exchange
# created without an explicit type is a direct exchange, which compares the
# binding key literally, so a message published with routing key 'email.send'
# would never be delivered. Those two exchanges are therefore declared as
# topic exchanges.
# NOTE: a broker that already holds 'emails'/'reports' as direct exchanges
# will reject the redeclaration; delete the old exchanges first.
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('default'), routing_key='high'),
    Queue('low_priority', Exchange('default'), routing_key='low'),
    Queue('emails', Exchange('emails', type='topic'), routing_key='email.*'),
    Queue('reports', Exchange('reports', type='topic'), routing_key='report.*'),
)
# Static routes: task name -> delivery options.
CELERY_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails', 'routing_key': 'email.send'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'routing_key': 'report.generate'},
    'myapp.tasks.urgent_task': {'queue': 'high_priority', 'priority': 9},
}

# templates/routing-rules.py
def route_task(name, args, kwargs, options, task=None, **kw):
    """Pick routing options dynamically from the task name or its kwargs.

    Rules are evaluated in order and the first match wins:

    1. names starting with ``urgent.``  -> ``high_priority`` queue, priority 9
    2. a ``priority='high'`` task kwarg -> ``high_priority`` queue
    3. names starting with ``email.``   -> ``emails`` queue on the ``emails`` exchange
    4. anything else                    -> ``default`` queue

    Args:
        name: Name of the task being routed.
        args: Positional arguments of the task call (unused).
        kwargs: Keyword arguments of the task call; may be ``None``.
        options: Options passed with the call (unused).
        task: Task object, when supplied by the caller (unused).
        **kw: Any further keyword arguments are accepted and ignored.

    Returns:
        A dict of routing options.
    """
    if name.startswith('urgent.'):
        return {'queue': 'high_priority', 'priority': 9}
    # The router can be invoked with kwargs=None (a task sent without keyword
    # arguments), so normalise before the lookup instead of using `in`.
    if (kwargs or {}).get('priority') == 'high':
        return {'queue': 'high_priority'}
    if name.startswith('email.'):
        return {'queue': 'emails', 'exchange': 'emails'}
    return {'queue': 'default'}


app.conf.task_routes = (route_task,)

# templates/priority-queues.py
# Priority queue configuration
# One queue per priority tier. Every queue lives on the 'tasks' exchange, uses
# its own name as routing key, and is declared with the 'x-max-priority'
# queue argument set to 10.
CELERY_QUEUES = tuple(
    Queue(
        tier,
        Exchange('tasks'),
        routing_key=tier,
        queue_arguments={'x-max-priority': 10},
    )
    for tier in ('critical', 'high', 'normal', 'low')
)
# Symbolic priority names mapped to the numeric priority attached to a task.
PRIORITY_LEVELS = dict(
    critical=10,  # Highest priority
    high=7,
    normal=5,
    low=2,
)
# Apply priority to task
@app.task(priority=PRIORITY_LEVELS['high'])
def urgent_processing():
    """Placeholder task registered with the 'high' entry of PRIORITY_LEVELS."""
    pass


# templates/topic-exchange.py
from kombu import Exchange, Queue
# Topic exchange setup: a single durable topic exchange shared by all queues.
task_exchange = Exchange(name='tasks', type='topic', durable=True)

# In a topic binding key, '*' stands for exactly one dot-separated word and
# '#' stands for zero or more words.
CELERY_QUEUES = (
    # Notifications one level below 'user.notification'
    Queue('user.notifications', task_exchange, routing_key='user.notification.*'),
    # Every key under 'email', at any depth
    Queue('emails', task_exchange, routing_key='email.#'),
    # Three-word keys whose middle word is 'processing'
    Queue('processing', task_exchange, routing_key='*.processing.*'),
    # Reports one level deep only (e.g. 'report.daily'); deeper keys such as
    # 'report.daily.pdf' are not matched by 'report.*'
    Queue('reports', task_exchange, routing_key='report.*'),
)
# Routing configuration: each task is published to the 'tasks' exchange with a
# routing key that one of the topic bindings declared above picks up.
CELERY_ROUTES = {
    # 'email.welcome.send' is matched by the 'email.#' binding
    'myapp.tasks.send_welcome_email': {
        'exchange': 'tasks',
        'routing_key': 'email.welcome.send',
    },
    # 'user.notification.send' is matched by the 'user.notification.*' binding
    'myapp.tasks.notify_user': {
        'exchange': 'tasks',
        'routing_key': 'user.notification.send',
    },
}

# templates/worker-routing.py
# Worker pool definitions
# Each pool names the queues it consumes together with the concurrency and
# prefetch multiplier intended for the workers serving those queues.
WORKER_POOLS = {
    'cpu_intensive': dict(
        queues=['ml_training', 'video_processing', 'data_analysis'],
        concurrency=4,
        prefetch_multiplier=1,
    ),
    'io_intensive': dict(
        queues=['api_calls', 'file_uploads', 'emails'],
        concurrency=50,
        prefetch_multiplier=10,
    ),
    'general': dict(
        queues=['default', 'background'],
        concurrency=10,
        prefetch_multiplier=4,
    ),
}
# Start workers
# celery -A myapp worker --queues=ml_training,video_processing,data_analysis -c 4 -n cpu_worker@%h
# celery -A myapp worker --queues=api_calls,file_uploads,emails -c 50 -n io_worker@%h
examples/priority-queue-setup.md
x-max-priority
examples/topic-routing.md
*
#
email.*.send
email.welcome.send
email.notification.send
user.#
user.create
user.update.profile
examples/worker-queue-assignment.md
# CPU-intensive workers (low concurrency)
celery -A myapp worker -Q ml_training,video_processing -c 4 -n cpu@%h
# I/O-intensive workers (high concurrency)
celery -A myapp worker -Q api_calls,emails -c 50 -n io@%h
# General purpose workers
celery -A myapp worker -Q default,background -c 10 -n general@%h
email.*.send
priority=high
region=us-east
x-max-length
celery worker --prefetch-multiplier=4
.gitignore
amqp://user:password@localhost:5672//
# ❌ WRONG
BROKER_URL = 'amqp://myuser:secretpass123@rabbitmq.example.com:5672//'
# ✅ CORRECT
import os
BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@localhost:5672//')
CELERY_BROKER_URL
CELERY_RESULT_BACKEND
examples/priority-queue-setup.md
examples/topic-routing.md
examples/worker-queue-assignment.md
x-max-priority