# Advanced Celery Patterns

Enterprise-grade task orchestration beyond basic background jobs.

## Overview
- Complex multi-step task workflows (ETL pipelines, order processing)
- Priority-based task processing (premium vs standard users)
- Rate-limited external API calls (API quotas, throttling)
- Multi-queue routing (dedicated workers per task type)
- Production monitoring and observability
- Task result aggregation and fan-out patterns
## Canvas Workflows

### Signatures (Task Invocation)
```python
from celery import signature, chain, group, chord

# Create a reusable task signature
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})

# Immutable signature (won't receive results from the previous task)
sig = process_order.si(order_id)

# Partial signature (curry arguments)
partial_sig = send_email.s(subject="Order Update")
# Later: partial_sig.delay(to="user@example.com", body="...")
```
### Chains (Sequential Execution)
```python
from celery import chain

# Tasks execute sequentially, passing each result to the next
workflow = chain(
    extract_data.s(source_id),    # Returns raw_data
    transform_data.s(),           # Receives raw_data, returns clean_data
    load_data.s(destination_id),  # Receives clean_data
)
result = workflow.apply_async()

# Access intermediate results
chain_result = result.get()          # Final result
parent_result = result.parent.get()  # Previous task's result

# Error handling in chains
@celery_app.task(bind=True)
def transform_data(self, raw_data):
    try:
        return do_transform(raw_data)
    except TransformError as exc:
        # Chain stops here; no subsequent tasks run
        raise self.retry(exc=exc, countdown=60)
```
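Conceptually, a chain is sequential application where each task consumes the previous task's result. A broker-free plain-Python analogue (the function names here are illustrative stand-ins, not real tasks):

```python
from functools import reduce

# Stand-ins for the chained tasks; each receives the previous result
def extract_data(source_id):
    return [3, 1, 2]           # raw_data

def transform_data(raw_data):
    return sorted(raw_data)    # clean_data

def load_data(clean_data):
    return {"loaded": len(clean_data), "data": clean_data}

# A chain is a left fold: each step consumes the last step's result
steps = [transform_data, load_data]
result = reduce(lambda acc, step: step(acc), steps, extract_data("src-1"))
print(result)  # {'loaded': 3, 'data': [1, 2, 3]}
```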
### Groups (Parallel Execution)
```python
from celery import group

# Execute tasks in parallel
parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
group_result = parallel.apply_async()

# Wait for all to complete
results = group_result.get()  # List of results

# Check completion status
group_result.ready()       # All completed?
group_result.successful()  # All succeeded?
group_result.failed()      # Any failed?

# Iterate as they complete
for result in group_result:
    if result.ready():
        print(f"Completed: {result.get()}")
```
### Chords (Parallel + Callback)
```python
from celery import chord, group

# Parallel execution with a callback once all tasks complete
workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s()  # Receives the list of all results
)
result = workflow.apply_async()

# Chord with explicit header and body
header = group(fetch_data.s(url) for url in urls)
body = combine_data.s()
workflow = chord(header, body)

# Error handling: if any header task fails, the body won't run
@celery_app.task(bind=True)
def aggregate_results(self, results):
    # results = [result1, result2, ...]
    return sum(results)
```
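The fan-out/fan-in shape of a chord can be approximated in plain Python with `concurrent.futures`. This is a sketch of the pattern only; a real chord runs the header across worker processes and dispatches the body as a separate task:

```python
from concurrent.futures import ThreadPoolExecutor

def process_chunk(chunk):
    # Header task: stand-in for real per-chunk work
    return sum(chunk)

def aggregate_results(results):
    # Body task: receives the list of all header results
    return sum(results)

chunks = [[1, 2], [3, 4], [5, 6]]

# "Header": run all tasks in parallel; "body": callback on collected results
with ThreadPoolExecutor() as pool:
    header_results = list(pool.map(process_chunk, chunks))
total = aggregate_results(header_results)
print(total)  # 21
```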
### Map and Starmap
```python
# Map: apply the same task to each item
workflow = process_item.map([item1, item2, item3])

# Starmap: unpack args for each call
workflow = send_email.starmap([
    ("user1@example.com", "Subject 1"),
    ("user2@example.com", "Subject 2"),
])

# Chunks: split a large list into batches of 100
workflow = process_item.chunks(items, 100)
```
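`chunks` slices the iterable into fixed-size batches before dispatch; the splitting itself is equivalent to this plain-Python helper:

```python
def make_chunks(items, n):
    # Split items into consecutive batches of at most n elements
    return [items[i:i + n] for i in range(0, len(items), n)]

batches = make_chunks(list(range(10)), 4)
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```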
## Priority Queues

### Queue Configuration
```python
# celery_config.py
from kombu import Queue

celery_app.conf.task_queues = (
    Queue("high", routing_key="high"),
    Queue("default", routing_key="default"),
    Queue("low", routing_key="low"),
)
celery_app.conf.task_default_queue = "default"
celery_app.conf.task_default_routing_key = "default"

# Priority within a queue (requires Redis 5+)
celery_app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),  # 0-9 priority levels
    "sep": ":",
    "queue_order_strategy": "priority",
}
```
### Task Routing
```python
# Route by task name (glob patterns supported)
celery_app.conf.task_routes = {
    "tasks.critical_task": {"queue": "high"},
    "tasks.bulk_*": {"queue": "low"},
    "tasks.default_*": {"queue": "default"},
}

# Route dynamically at call time
critical_task.apply_async(args=[data], queue="high", priority=9)
bulk_task.apply_async(args=[data], queue="low", priority=1)

# Route by task attribute
@celery_app.task(queue="high", priority=8)
def premium_user_task(user_id):
    pass
```
### Worker Configuration
```bash
# Start workers for specific queues
celery -A app worker -Q high -c 4 --prefetch-multiplier=1
celery -A app worker -Q default -c 8
celery -A app worker -Q low -c 2 --prefetch-multiplier=4
```
## Rate Limiting

### Per-Task Rate Limits
```python
@celery_app.task(rate_limit="100/m")  # 100 per minute
def call_external_api(endpoint):
    return requests.get(endpoint)

@celery_app.task(rate_limit="10/s")  # 10 per second
def send_notification(user_id):
    pass

@celery_app.task(rate_limit="1000/h")  # 1000 per hour
def bulk_email(batch):
    pass
```

Note: `rate_limit` is enforced per worker instance, not cluster-wide.

### Dynamic Rate Limiting
```python
from celery import current_app

# Change a rate limit at runtime
current_app.control.rate_limit(
    "tasks.call_external_api",
    "50/m",  # Reduce during high load
    destination=["worker1@hostname"],
)
```
```python
# Custom rate limiter via a Redis lock (assumes a configured redis_client)
class RateLimitedTask(celery_app.Task):
    _rate_limit_key = "api:rate_limit"

    def __call__(self, *args, **kwargs):
        if not self._acquire_token():
            raise self.retry(countdown=self._get_backoff())
        return super().__call__(*args, **kwargs)

    def _acquire_token(self):
        # SET NX EX: succeeds at most once per 1-second window
        return redis_client.set(
            self._rate_limit_key,
            "1",
            nx=True,
            ex=1,  # 1 second window
        )

    def _get_backoff(self):
        return 1  # Seconds to wait before retrying
```
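The snippet above is a fixed-window limiter. A genuine token bucket (burst capacity plus steady refill) is easy to express in plain Python; this broker-free sketch is illustrative only:

```python
import time

class TokenBucket:
    """Allow up to `capacity` burst calls, refilled at `rate` tokens/second."""

    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def acquire(self):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Burst of 2 allowed; the third immediate call is throttled
bucket = TokenBucket(rate=1, capacity=2)
results = [bucket.acquire() for _ in range(3)]
print(results)  # [True, True, False]
```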
## Multi-Queue Routing

### Router Classes
```python
class TaskRouter:
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith("tasks.premium"):
            return {"queue": "premium", "priority": 8}
        elif task.startswith("tasks.analytics"):
            return {"queue": "analytics"}
        elif kwargs and kwargs.get("urgent"):
            return {"queue": "high"}
        return {"queue": "default"}

celery_app.conf.task_routes = (TaskRouter(),)
```

### Content-Based Routing
```python
@celery_app.task(bind=True)
def process_order(self, order):
    # Route based on order value
    if order["total"] > 1000:
        self.update_state(state="ROUTING", meta={"queue": "premium"})
        return chain(
            verify_inventory.s(order).set(queue="high"),
            process_payment.s().set(queue="high"),
            notify_customer.s().set(queue="notifications"),
        ).apply_async()
    else:
        return standard_workflow(order)
```
## Production Monitoring

### Flower Dashboard
```bash
# Install and run Flower
pip install flower
celery -A app flower --port=5555 --basic_auth=admin:password

# With persistent storage
celery -A app flower --persistent=True --db=flower.db
```
### Custom Events
```python
from celery import signals

@signals.task_prerun.connect
def on_task_start(sender, task_id, task, args, kwargs, **_):
    metrics.counter("task_started", tags={"task": task.name})

@signals.task_postrun.connect
def on_task_complete(sender, task_id, task, args, kwargs, retval, state, **_):
    metrics.counter("task_completed", tags={"task": task.name, "state": state})

@signals.task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **_):
    alerting.send_alert(
        f"Task {sender.name} failed: {exception}",
        severity="error",
    )
```
### Health Checks
```python
from celery import current_app

def celery_health_check():
    try:
        # Check broker connection
        conn = current_app.connection()
        conn.ensure_connection(max_retries=3)

        # Check that workers are responding
        inspector = current_app.control.inspect()
        active_workers = inspector.active()
        if not active_workers:
            return {"status": "unhealthy", "reason": "No active workers"}

        return {
            "status": "healthy",
            "workers": list(active_workers.keys()),
            "active_tasks": sum(len(tasks) for tasks in active_workers.values()),
        }
    except Exception as e:
        return {"status": "unhealthy", "reason": str(e)}
```
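`inspect().active()` returns a mapping of worker name to its in-flight tasks, so the aggregation step can be checked against a stubbed response (the worker names and task ids below are made up):

```python
# Stubbed shape of inspect().active(): {worker_name: [task_info, ...]}
active_workers = {
    "worker1@host": [{"id": "a1"}, {"id": "a2"}],
    "worker2@host": [{"id": "b1"}],
}

# Same aggregation as the health check above
summary = {
    "status": "healthy",
    "workers": list(active_workers.keys()),
    "active_tasks": sum(len(tasks) for tasks in active_workers.values()),
}
print(summary["active_tasks"])  # 3
```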
return {"status": "unhealthy", "reason": str(e)}Custom Task States
自定义任务状态
```python
from celery import states

# Define custom states
VALIDATING = "VALIDATING"
PROCESSING = "PROCESSING"
UPLOADING = "UPLOADING"

@celery_app.task(bind=True)
def long_running_task(self, data):
    self.update_state(state=VALIDATING, meta={"step": 1, "total": 3})
    validate(data)
    self.update_state(state=PROCESSING, meta={"step": 2, "total": 3})
    result = process(data)
    self.update_state(state=UPLOADING, meta={"step": 3, "total": 3})
    upload(result)
    return {"status": "complete", "url": result.url}

# Query task progress
from celery.result import AsyncResult

result = AsyncResult(task_id)
if result.state == PROCESSING:
    print(f"Step {result.info['step']}/{result.info['total']}")
```
## Base Tasks and Inheritance
```python
from celery import Task

class DatabaseTask(Task):
    """Base task with database session management."""
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = create_session()
        return self._db

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self._db:
            self._db.close()
            self._db = None

class RetryableTask(Task):
    """Base task with exponential backoff retry."""
    autoretry_for = (ConnectionError, TimeoutError)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

@celery_app.task(base=DatabaseTask)
def query_database(query):
    return query_database.db.execute(query)

@celery_app.task(base=RetryableTask)
def call_flaky_api(endpoint):
    return requests.get(endpoint, timeout=30)
```
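With `retry_backoff = True`, retry delays grow roughly as `factor * 2**retry_number`, capped by `retry_backoff_max` (jitter then randomizes below that value). A plain-Python sketch of the capped schedule, ignoring jitter:

```python
def backoff_schedule(max_retries, factor=1, cap=600):
    # Exponential delays: factor * 2**n seconds, capped at `cap`
    return [min(cap, factor * 2 ** n) for n in range(max_retries)]

print(backoff_schedule(5))   # [1, 2, 4, 8, 16]
print(backoff_schedule(12))  # caps at 600 once 2**n exceeds it
```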
## Key Decisions
| Decision | Recommendation |
|---|---|
| Workflow type | Chain for sequential, Group for parallel, Chord for fan-in |
| Priority queues | 3 queues (high/default/low) for most use cases |
| Rate limiting | Per-task `rate_limit` for simple cases; custom Redis limiter for global limits |
| Monitoring | Flower + custom signals for production |
| Task routing | Content-based router for dynamic routing needs |
| Worker scaling | Separate workers per queue, autoscale with HPA |
| Error handling | Base task with retry + dead letter queue |
## Anti-Patterns (FORBIDDEN)
```python
# NEVER block on results inside tasks
@celery_app.task
def bad_task():
    result = other_task.delay()
    return result.get()  # Blocks the worker, causes deadlock!

# NEVER use synchronous I/O without a timeout
requests.get(url)  # Can hang forever

# NEVER ignore task acknowledgment settings
celery_app.conf.task_acks_late = False  # Default loses tasks on crash

# NEVER skip idempotency for retried tasks
@celery_app.task(max_retries=3)
def create_order(order):
    Order.create(order)  # Creates duplicates on retry!

# ALWAYS use immutable signatures in chords
chord([task.s(x) for x in items], callback.si())  # si() prevents arg pollution
```
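One way to make a retried task idempotent is an idempotency key checked before the side effect. A broker-free sketch with an in-memory store (production would back this with a unique database constraint or Redis `SET NX`; all names are illustrative):

```python
processed = set()  # Stand-in for a unique DB constraint or Redis SET NX
orders = []

def create_order(order):
    key = order["idempotency_key"]
    if key in processed:
        return "skipped"     # Retry arrived after a successful run
    processed.add(key)
    orders.append(order)     # The side effect happens at most once
    return "created"

order = {"idempotency_key": "order-42", "total": 99}
first = create_order(order)
second = create_order(order)  # Simulated retry
print(first, second, len(orders))  # created skipped 1
```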
## References

For detailed implementation patterns, see:

- `references/canvas-workflows.md` - Deep dive on chain/group/chord with error handling
- `references/priority-queue-setup.md` - Redis priority queue configuration
- `references/rate-limiting-patterns.md` - Per-task and dynamic rate limiting
- `references/celery-beat-scheduling.md` - Periodic task configuration
## Templates

Production-ready code templates:

- `scripts/celery-config-template.py` - Complete production Celery configuration
- `scripts/canvas-workflow-template.py` - ETL pipeline using canvas patterns
- `scripts/priority-worker-template.py` - Multi-queue worker with per-user rate limiting
## Checklists

- `checklists/celery-production-checklist.md` - Production deployment verification
## Examples

- `examples/order-processing-pipeline.md` - Real-world e-commerce order processing
## Related Skills

- `background-jobs` - Basic Celery and ARQ patterns
- `message-queues` - RabbitMQ/Kafka integration
- `resilience-patterns` - Circuit breakers, retries
- `observability-monitoring` - Metrics and alerting
## Capability Details

### canvas-workflows

Keywords: chain, group, chord, signature, canvas, workflow

Solves:
- Complex multi-step task pipelines
- Parallel task execution with aggregation
- Sequential task dependencies

### priority-queues

Keywords: priority, queue, routing, high priority, low priority

Solves:
- Premium user task prioritization
- Urgent vs batch task handling
- Multi-queue worker deployment

### rate-limiting

Keywords: rate limit, throttle, quota, api limit

Solves:
- External API rate limiting
- Per-task execution limits
- Dynamic rate adjustment

### task-monitoring

Keywords: flower, monitoring, health check, task state

Solves: Production task monitoring, worker health checks, custom task state tracking