Celery: Distributed Task Queue
Summary
Celery is a distributed task queue system for Python that enables asynchronous execution of background jobs across multiple workers. It supports scheduling, retries, and task workflows, and integrates seamlessly with Django, Flask, and FastAPI.
When to Use
- Background Processing: Offload long-running operations (email, file processing, reports)
- Scheduled Tasks: Cron-like periodic jobs (cleanup, backups, data sync)
- Distributed Computing: Process tasks across multiple workers/servers
- Async Workflows: Chain, group, and orchestrate complex task dependencies
- Real-time Processing: Handle webhooks, notifications, data pipelines
- Load Balancing: Distribute CPU-intensive work across workers
Don't Use When:
- Simple async I/O (use asyncio instead)
- Real-time request/response (use async web frameworks)
- Sub-second latency required (use in-memory queues)
- Minimal infrastructure (use simpler alternatives like RQ or Huey)
Quick Start
Installation
```bash
# Basic installation
pip install celery

# With Redis broker
pip install celery[redis]

# With RabbitMQ broker
pip install celery[amqp]

# Full batteries (recommended)
pip install celery[redis,msgpack,auth,cassandra,elasticsearch,s3,sqs]
```
Basic Setup
```python
# celery_app.py
from celery import Celery

# Create Celery app with Redis broker
app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configuration
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

# Define tasks
@app.task
def add(x, y):
    return x + y

@app.task
def send_email(to, subject, body):
    # Simulate email sending
    import time
    time.sleep(2)
    print(f"Email sent to {to}: {subject}")
    return {"status": "sent", "to": to}
```
Running Workers
```bash
# Start worker
celery -A celery_app worker --loglevel=info

# Multiple workers with concurrency
celery -A celery_app worker --concurrency=4 --loglevel=info

# Named worker for specific queues
celery -A celery_app worker -Q emails,reports --loglevel=info
```
Executing Tasks
```python
# Call task asynchronously
result = add.delay(4, 6)

# Wait for result
print(result.get(timeout=10))  # 10

# Apply async with options
result = send_email.apply_async(
    args=['user@example.com', 'Hello', 'Welcome!'],
    countdown=60  # Execute after 60 seconds
)

# Check task state
print(result.status)  # PENDING, STARTED, SUCCESS, FAILURE
```
---
Core Concepts
Architecture Components
Broker: Message queue that stores tasks
- Redis (recommended for most use cases)
- RabbitMQ (enterprise-grade, complex)
- Amazon SQS (serverless, AWS-native)
Workers: Processes that execute tasks
- Pull tasks from broker
- Execute task code
- Store results in backend
Result Backend: Storage for task results
- Redis (fast, in-memory)
- Database (PostgreSQL, MySQL)
- S3 (large results)
- Cassandra, Elasticsearch (specialized)
Beat Scheduler: Periodic task scheduler
- Cron-like scheduling
- Interval-based tasks
- Stores schedule in database or file
Task States
```
PENDING → STARTED → SUCCESS
                  → RETRY → SUCCESS
                  → FAILURE
```
- PENDING: Task waiting in queue
- STARTED: Worker picked up task
- SUCCESS: Task completed successfully
- FAILURE: Task raised exception
- RETRY: Task will retry after failure
- REVOKED: Task cancelled before execution
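The states above can be watched from the calling side by polling the task's AsyncResult; a minimal sketch (the polling interval is an arbitrary choice, and STARTED is only reported when `task_track_started` is enabled):
```python
import time

result = add.delay(4, 6)
while not result.ready():
    # state moves through PENDING → STARTED → SUCCESS / FAILURE / RETRY
    print(result.state)
    time.sleep(0.5)
print(result.state, result.result)
```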
Broker Setup
Redis Configuration
```python
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# With authentication
broker_url = 'redis://:password@localhost:6379/0'

# Redis Sentinel (high availability)
broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380'
broker_transport_options = {
    'master_name': 'mymaster',
    'sentinel_kwargs': {'password': 'password'},
}

# Redis connection pool settings
broker_pool_limit = 10
broker_connection_retry = True
broker_connection_retry_on_startup = True
broker_connection_max_retries = 10
```
RabbitMQ Configuration
```python
# Basic RabbitMQ
broker_url = 'amqp://guest:guest@localhost:5672//'

# With virtual host
broker_url = 'amqp://user:password@localhost:5672/myvhost'

# High availability (multiple brokers)
broker_url = [
    'amqp://user:password@host1:5672//',
    'amqp://user:password@host2:5672//',
]

# RabbitMQ-specific settings
broker_heartbeat = 30
broker_pool_limit = 10
```
Amazon SQS Configuration
```python
# AWS SQS (serverless)
broker_url = 'sqs://'
broker_transport_options = {
    'region': 'us-east-1',
    'queue_name_prefix': 'myapp-',
    'visibility_timeout': 3600,
    'polling_interval': 1,
}

# With predefined queues
broker_transport_options = {
    'region': 'us-east-1',
    'predefined_queues': {
        'default': {
            'url': 'https://sqs.us-east-1.amazonaws.com/123456789/myapp-default',
        }
    }
}
```
---
Task Basics
Task Definition
```python
from celery import Task, shared_task
from celery_app import app

# Method 1: Decorator
@app.task
def simple_task(x, y):
    return x + y

# Method 2: Shared task (framework-agnostic)
@shared_task
def framework_task(data):
    return process(data)

# Method 3: Task class (advanced)
class CustomTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print(f"Task {task_id} succeeded with {retval}")
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(f"Task {task_id} failed: {exc}")
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        print(f"Task {task_id} retrying: {exc}")

@app.task(base=CustomTask)
def monitored_task(x):
    return x * 2
```
Task Options
```python
@app.task(
    name='custom.task.name',      # Custom task name
    bind=True,                    # Bind task instance as first arg
    ignore_result=True,           # Don't store result (performance)
    max_retries=3,                # Max retry attempts
    default_retry_delay=60,       # Retry delay in seconds
    rate_limit='100/h',           # Rate limiting
    time_limit=300,               # Hard time limit (kills task)
    soft_time_limit=240,          # Soft time limit (raises exception)
    serializer='json',            # Task serializer
    compression='gzip',           # Compress large messages
    priority=5,                   # Task priority (0-9)
    queue='high_priority',        # Target queue
    routing_key='priority.high',  # Routing key
    acks_late=True,               # Acknowledge after execution
    reject_on_worker_lost=True,   # Reject if worker dies
)
def advanced_task(self, data):
    try:
        return process(data)
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```
Task Context (bind=True)
```python
@app.task(bind=True)
def context_aware_task(self, x, y):
    # Access task metadata
    print(f"Task ID: {self.request.id}")
    print(f"Task Name: {self.name}")
    print(f"Args: {self.request.args}")
    print(f"Kwargs: {self.request.kwargs}")
    print(f"Retries: {self.request.retries}")
    print(f"Delivery Info: {self.request.delivery_info}")
    # Manual retry
    try:
        result = risky_operation(x, y)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)
    return result
```
Task Execution
Delay vs Apply Async
```python
from datetime import datetime

# delay() - Simple async execution
result = add.delay(4, 6)

# apply_async() - Full control
result = add.apply_async(
    args=(4, 6),
    kwargs={'extra': 'data'},
    # Timing options
    countdown=60,                      # Execute after N seconds
    eta=datetime(2025, 12, 1, 10, 0),  # Execute at specific time
    expires=3600,                      # Task expires after N seconds
    # Routing options
    queue='math',                      # Target specific queue
    routing_key='math.add',            # Custom routing key
    exchange='tasks',                  # Target exchange
    priority=9,                        # High priority
    # Execution options
    serializer='json',                 # Message serializer
    compression='gzip',                # Compress payload
    retry=True,                        # Auto-retry on failure
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.2,
    },
    # Task linking
    link=log_result.s(),               # Success callback
    link_error=handle_error.s(),       # Error callback
)

# Check result
if result.ready():
    print(result.get())     # Get result (blocks)
    print(result.result)    # Get result (non-blocking)
```
Task Signatures
```python
from celery import signature

# Create signature (doesn't execute)
sig = add.signature((2, 2), countdown=10)
sig = add.s(2, 2)  # Shorthand

# Partial arguments (currying)
partial = add.s(2)                       # One arg fixed
result = partial.apply_async(args=(4,))  # Add second arg

# Immutable signature (args can't be replaced)
immutable = add.si(2, 2)

# Clone and modify
new_sig = sig.clone(countdown=60)

# Execute signature
result = sig.delay()
result = sig.apply_async()
result = sig()  # Synchronous execution
```
Result Handling
```python
# Basic result retrieval
result = add.delay(4, 6)
value = result.get(timeout=10)  # Blocks until complete

# Non-blocking result check
if result.ready():
    print(result.result)

# Result states
print(result.status)        # PENDING, STARTED, SUCCESS, FAILURE
print(result.successful())  # True if SUCCESS
print(result.failed())      # True if FAILURE

# Result metadata
print(result.traceback)     # Exception traceback if failed
print(result.info)          # Task return value or exception

# Forget result (free memory)
result.forget()

# Revoke task (cancel)
result.revoke(terminate=True)      # Kill running task
add.AsyncResult(task_id).revoke()  # Revoke by ID
```
---
Task Routing
Queue Configuration
```python
from kombu import Queue, Exchange

# Define queues
app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('priority'), routing_key='priority.high'),
    Queue('low_priority', Exchange('priority'), routing_key='priority.low'),
    Queue('emails', Exchange('tasks'), routing_key='tasks.email'),
    Queue('reports', Exchange('tasks'), routing_key='tasks.report'),
)

# Default queue
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_routing_key = 'default'
```
Task Routing Rules
```python
# Route specific tasks to queues
app.conf.task_routes = {
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'priority': 9},
    'myapp.tasks.*': {'queue': 'default'},
}

# Function-based routing
def route_task(name, args, kwargs, options, task=None, **kw):
    if 'email' in name:
        return {'queue': 'emails', 'routing_key': 'email.send'}
    elif 'report' in name:
        return {'queue': 'reports', 'priority': 5}
    return {'queue': 'default'}

app.conf.task_routes = (route_task,)
```
Worker Queue Binding
```bash
# Worker consuming specific queues
celery -A myapp worker -Q emails,reports --loglevel=info

# Multiple workers for different queues
celery -A myapp worker -Q high_priority -c 4 --loglevel=info
celery -A myapp worker -Q default -c 2 --loglevel=info
celery -A myapp worker -Q low_priority -c 1 --loglevel=info
```
Priority Queues
```python
# Configure priority support
app.conf.task_queue_max_priority = 10
app.conf.task_default_priority = 5

# Send task with priority
high_priority_task.apply_async(args=(), priority=9)
low_priority_task.apply_async(args=(), priority=1)

# Priority-based routing
app.conf.task_routes = {
    'critical_task': {'queue': 'default', 'priority': 10},
    'background_task': {'queue': 'default', 'priority': 1},
}
```
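Note that with the Redis broker, priorities are emulated by splitting each queue into sub-queues rather than using native broker priorities; a sketch of the transport options involved (the values shown are assumptions, not required settings):
```python
# Redis emulates priority by consuming from several sub-queues per queue.
app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),   # number of distinct priority levels
    'sep': ':',                          # separator used in sub-queue names
    'queue_order_strategy': 'priority',  # drain higher-priority sub-queues first
}
```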
---
Periodic Tasks
Celery Beat Setup
```python
# celery_config.py
from celery.schedules import crontab, solar

# Periodic task schedule
beat_schedule = {
    # Run every 30 seconds
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
    # Run every morning at 7:30 AM
    'send-daily-report': {
        'task': 'myapp.tasks.send_daily_report',
        'schedule': crontab(hour=7, minute=30),
    },
    # Run every Monday morning
    'weekly-cleanup': {
        'task': 'myapp.tasks.cleanup',
        'schedule': crontab(hour=0, minute=0, day_of_week=1),
    },
    # Run on the first day of each month
    'monthly-report': {
        'task': 'myapp.tasks.monthly_report',
        'schedule': crontab(hour=0, minute=0, day_of_month='1'),
        'kwargs': {'month_offset': 1}
    },
    # Solar schedule (sunrise/sunset)
    'wake-up-at-sunrise': {
        'task': 'myapp.tasks.morning_routine',
        'schedule': solar('sunrise', -37.81, 144.96),  # Melbourne
    },
}

app.conf.beat_schedule = beat_schedule
```
Crontab Patterns
```python
from celery.schedules import crontab

# Every minute
crontab()

# Every 15 minutes
crontab(minute='*/15')

# Every hour at :30
crontab(minute=30)

# Every day at midnight
crontab(hour=0, minute=0)

# Every weekday at 5 PM
crontab(hour=17, minute=0, day_of_week='1-5')

# Every Monday, Wednesday, Friday at noon
crontab(hour=12, minute=0, day_of_week='mon,wed,fri')

# First day of month
crontab(hour=0, minute=0, day_of_month='1')

# Last day of month (use day_of_month='28-31' with logic in task)
crontab(hour=0, minute=0, day_of_month='28-31')

# Quarterly (every 3 months)
crontab(hour=0, minute=0, day_of_month='1', month_of_year='*/3')
```
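For the last-day-of-month pattern above, the schedule fires on days 28-31 and the task itself has to skip the runs that are not the month's final day; a minimal sketch of that guard (the task and the do_monthly_rollup helper are illustrative):
```python
import calendar
from datetime import date

@app.task
def end_of_month_job():
    today = date.today()
    last_day = calendar.monthrange(today.year, today.month)[1]
    if today.day != last_day:
        return 'skipped: not the last day of the month'
    return do_monthly_rollup()  # placeholder for the real monthly work
```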
Running Beat Scheduler
```bash
# Start beat scheduler
celery -A myapp beat --loglevel=info

# Beat with custom scheduler
celery -A myapp beat --scheduler django_celery_beat.schedulers:DatabaseScheduler

# Combine worker and beat (development only)
celery -A myapp worker --beat --loglevel=info
```
Dynamic Schedules (django-celery-beat)
```bash
pip install django-celery-beat
```
```python
# settings.py (Django)
INSTALLED_APPS = [
    'django_celery_beat',
]
```
```bash
# Migrate database
python manage.py migrate django_celery_beat

# Run beat with database scheduler
celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
```
```python
# Create periodic tasks via Django admin or the ORM
import json
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

# Create interval schedule (every 10 seconds)
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
PeriodicTask.objects.create(
    interval=schedule,
    name='Import feed every 10 seconds',
    task='myapp.tasks.import_feed',
    args=json.dumps(['https://example.com/feed']),
)

# Create crontab schedule
schedule, created = CrontabSchedule.objects.get_or_create(
    minute='0',
    hour='*/4',  # Every 4 hours
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
)
PeriodicTask.objects.create(
    crontab=schedule,
    name='Cleanup every 4 hours',
    task='myapp.tasks.cleanup',
)
```
---
Workflows (Canvas)
Chains
```python
from celery import chain

# Sequential execution
result = chain(add.s(2, 2), add.s(4), add.s(8))()
# Equivalent to: add(add(add(2, 2), 4), 8)
# Result: 16

# Shorthand syntax
result = (add.s(2, 2) | add.s(4) | add.s(8))()

# Chain with different tasks
workflow = (
    fetch_data.s(url) |
    process_data.s() |
    save_results.s()
)
result = workflow.apply_async()
```
Groups
```python
from celery import group

# Parallel execution
job = group([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8),
])
result = job.apply_async()

# Wait for all results
results = result.get(timeout=10)  # [4, 8, 16]

# Group with callback
job = group([
    process_item.s(item) for item in items
]) | summarize_results.s()
```
Chords
```python
from celery import chord

# Group with callback
job = chord([
    fetch_url.s(url) for url in urls
])(combine_results.s())

# Example: Process multiple files, then merge
result = chord([
    process_file.s(file) for file in files
])(merge_results.s())
```
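The chord callback receives the collected results of the header group as its first argument, in the order the signatures were listed; a minimal sketch of what combine_results could look like (its body is an assumption):
```python
@app.task
def combine_results(results):
    # `results` is a list with one entry per task in the chord header
    return {'count': len(results), 'items': results}
```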
Map and Starmap
```python
from celery import group

# Starmap: unpack each tuple into the task's arguments
result = add.starmap([(2, 2), (4, 4), (8, 8)]).apply_async()

# Equivalent to:
result = group([add.s(2, 2), add.s(4, 4), add.s(8, 8)])()

# Map: pass each element as a single argument
# (use starmap, as above, when the task takes multiple arguments)
result = process_item.map(items).apply_async()
```
Complex Workflows
```python
from celery import chain, group, chord

# Parallel processing with sequential steps
workflow = chain(
    # Step 1: Fetch data
    fetch_data.s(source),
    # Step 2: Process in parallel
    group([
        process_chunk.s(chunk_id) for chunk_id in range(10)
    ]),
    # Step 3: Aggregate results
    aggregate.s(),
    # Step 4: Save to database
    save_results.s(),
)

# Nested chords
workflow = chord([
    chord([
        subtask.s(item) for item in chunk
    ])(process_chunk.s())
    for chunk in chunks
])(final_callback.s())

# Real-world example: Report generation
generate_report = chain(
    fetch_user_data.s(user_id),
    chord([
        calculate_stats.s(),
        fetch_transactions.s(),
        fetch_activity.s(),
    ])(combine_sections.s()),
    render_pdf.s(),
    send_email.s(user_email)
)
```
---
Error Handling
Automatic Retries
```python
import requests
from requests.exceptions import RequestException

@app.task(
    autoretry_for=(RequestException, IOError),  # Auto-retry these exceptions
    retry_kwargs={'max_retries': 5},            # Max 5 retries
    retry_backoff=True,                         # Exponential backoff
    retry_backoff_max=600,                      # Max 10 minutes backoff
    retry_jitter=True,                          # Add randomness to backoff
)
def fetch_url(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
```
Manual Retries
```python
@app.task(bind=True, max_retries=3)
def process_data(self, data):
    try:
        result = external_api_call(data)
        return result
    except TemporaryError as exc:
        # Retry after 60 seconds
        raise self.retry(exc=exc, countdown=60)
    except PermanentError as exc:
        # Don't retry, log and fail
        logger.error(f"Permanent error: {exc}")
        raise
    except Exception as exc:
        # Exponential backoff
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries,
            max_retries=3
        )
```
Error Callbacks
```python
@app.task
def on_error(request, exc, traceback):
    """Called when a task fails"""
    logger.error(f"Task {request.id} failed: {exc}")
    send_alert(f"Task failure: {request.task}", str(exc))

@app.task
def risky_task(data):
    return process(data)

# Link error callback
risky_task.apply_async(
    args=(data,),
    link_error=on_error.s()
)
```
Task Failure Handling
```python
from celery import Task

class CallbackTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure"""
        logger.error(f"Task {task_id} failed with {exc}")
        # Send notification
        send_notification('Task Failed', str(exc))
    def on_success(self, retval, task_id, args, kwargs):
        """Handle task success"""
        logger.info(f"Task {task_id} succeeded: {retval}")
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Handle task retry"""
        logger.warning(f"Task {task_id} retrying: {exc}")

@app.task(base=CallbackTask)
def monitored_task(x):
    if x < 0:
        raise ValueError("Negative value")
    return x * 2
```
Exception Handling Patterns
```python
@app.task(bind=True)
def robust_task(self, data):
    # Categorize exceptions
    try:
        return process(data)
    except NetworkError as exc:
        # Transient error - retry
        raise self.retry(exc=exc, countdown=60, max_retries=5)
    except ValidationError as exc:
        # Permanent error - don't retry
        logger.error(f"Invalid data: {exc}")
        return {'status': 'failed', 'error': str(exc)}
    except DatabaseError as exc:
        # Critical error - retry with exponential backoff
        backoff = min(2 ** self.request.retries * 60, 3600)
        raise self.retry(exc=exc, countdown=backoff, max_retries=10)
    except Exception as exc:
        # Unknown error - retry a limited number of times
        if self.request.retries < 3:
            raise self.retry(exc=exc, countdown=120)
        else:
            # Max retries exceeded - fail and alert
            logger.critical(f"Task failed after retries: {exc}")
            send_alert('Critical Task Failure', str(exc))
            raise
```
Monitoring and Management
Task Events
```python
# Enable events
app.conf.worker_send_task_events = True
app.conf.task_send_sent_event = True

# Event listeners
from celery import signals

@signals.task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **extra):
    print(f"Task {task.name}[{task_id}] starting")

@signals.task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, retval=None, **extra):
    print(f"Task {task.name}[{task_id}] completed: {retval}")

@signals.task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, **extra):
    print(f"Task {task_id} failed: {exception}")

@signals.task_retry.connect
def task_retry_handler(sender=None, task_id=None, reason=None, **extra):
    print(f"Task {task_id} retrying: {reason}")
```
Flower Monitoring
```bash
# Install Flower
pip install flower

# Start Flower
celery -A myapp flower --port=5555

# Access dashboard at http://localhost:5555
```
```python
# Flower configuration
flower_basic_auth = ['admin:password']
flower_persistent = True
flower_db = 'flower.db'
flower_max_tasks = 10000
```
Inspecting Workers
```python
from celery_app import app

# Get active tasks
i = app.control.inspect()
print(i.active())

# Get scheduled tasks
print(i.scheduled())

# Get reserved tasks
print(i.reserved())

# Get worker stats
print(i.stats())

# Get registered tasks
print(i.registered())

# Revoke task
app.control.revoke(task_id, terminate=True)

# Shutdown worker
app.control.shutdown()

# Pool restart
app.control.pool_restart()

# Rate limit
app.control.rate_limit('myapp.tasks.slow_task', '10/m')
```
Command Line Inspection
```bash
# List active tasks
celery -A myapp inspect active

# List scheduled tasks
celery -A myapp inspect scheduled

# Worker stats
celery -A myapp inspect stats

# Registered tasks
celery -A myapp inspect registered

# Revoke task
celery -A myapp control revoke <task_id>

# Shutdown workers
celery -A myapp control shutdown

# Purge all tasks
celery -A myapp purge
```
Custom Metrics
```python
from prometheus_client import Counter, Histogram

# Define metrics once at module level (re-registering them inside the task
# would raise a duplicate-timeseries error on the second call)
task_counter = Counter('celery_tasks_total', 'Total tasks')
task_duration = Histogram('celery_task_duration_seconds', 'Task duration')

@app.task(bind=True)
def tracked_task(self, data):
    with task_duration.time():
        result = process(data)
    task_counter.inc()
    return result
```
Framework Integration
Django Integration
```python
# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True

# Task in a Django app: myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_email_task(subject, message, recipient):
    send_mail(subject, message, 'from@example.com', [recipient])
    return f"Email sent to {recipient}"

# Use in views
from django.http import HttpResponse
from myapp.tasks import send_email_task

def my_view(request):
    send_email_task.delay('Hello', 'Welcome!', 'user@example.com')
    return HttpResponse('Email queued')
```
FastAPI Integration
```python
# celery_app.py
from celery import Celery

celery_app = Celery(
    'fastapi_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

@celery_app.task
def process_data(data: dict):
    # Long-running task
    import time
    time.sleep(10)
    return {"processed": data, "status": "complete"}

# main.py
from fastapi import FastAPI, BackgroundTasks
from celery_app import process_data

app = FastAPI()

@app.post("/process")
async def process_endpoint(data: dict):
    # Option 1: FastAPI BackgroundTasks (simple, in-process)
    # background_tasks.add_task(process_data, data)
    # Option 2: Celery (distributed, persistent)
    task = process_data.delay(data)
    return {"task_id": task.id, "status": "queued"}

@app.get("/status/{task_id}")
async def check_status(task_id: str):
    from celery.result import AsyncResult
    task = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.ready() else None
    }
```
**When to Use Celery vs FastAPI BackgroundTasks**:
- **FastAPI BackgroundTasks**: Simple, fire-and-forget tasks (logging, cleanup)
- **Celery**: Distributed processing, retries, scheduling, task results
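For contrast, the in-process alternative mentioned above looks roughly like this with FastAPI's own BackgroundTasks (a minimal sketch; write_log is an assumed example helper):
```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_log(message: str):
    # Runs in-process after the response is sent; no broker, retries, or result backend.
    with open("app.log", "a") as f:
        f.write(message + "\n")

@app.post("/signup")
async def signup(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"new signup: {email}")
    return {"status": "ok"}
```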
Flask Integration
```python
# celery_app.py
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

# app.py
from flask import Flask
from celery_app import make_celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1'
celery = make_celery(app)

@celery.task
def send_email(to, subject, body):
    with app.app_context():
        # Use Flask-Mail or similar
        mail.send(Message(subject, recipients=[to], body=body))

@app.route('/send')
def send_route():
    send_email.delay('user@example.com', 'Hello', 'Welcome!')
    return 'Email queued'
```
---
Testing Strategies
Eager Mode (Synchronous Execution)
```python
# conftest.py (pytest)
import pytest
from celery_app import app

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'memory://',
        'result_backend': 'cache+memory://',
        'task_always_eager': True,      # Execute tasks synchronously
        'task_eager_propagates': True,  # Propagate exceptions
    }

# Test tasks
def test_add_task():
    result = add.delay(4, 6)
    assert result.get() == 10

def test_task_failure():
    with pytest.raises(ValueError):
        failing_task.delay()
```
Testing with Real Broker
```python
# conftest.py
import pytest
from celery_app import app

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'redis://localhost:6379/15',  # Test database
        'result_backend': 'redis://localhost:6379/15',
    }

@pytest.fixture
def celery_worker(celery_app):
    """Start worker for tests"""
    with celery_app.Worker() as worker:
        yield worker

def test_async_task(celery_worker):
    result = async_task.delay(data)
    assert result.get(timeout=10) == expected
```
Mocking External Dependencies
```python
from unittest.mock import patch, MagicMock
import requests

@app.task
def fetch_and_process(url):
    response = requests.get(url)
    return process(response.json())

def test_fetch_and_process():
    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = {'data': 'test'}
        result = fetch_and_process.delay('http://example.com')
        assert result.get() == expected_result
        mock_get.assert_called_once_with('http://example.com')
```
Testing Periodic Tasks
```python
from celery.schedules import crontab

def test_periodic_task_schedule():
    from celery_app import app
    schedule = app.conf.beat_schedule['daily-report']
    assert schedule['task'] == 'myapp.tasks.daily_report'
    assert schedule['schedule'] == crontab(hour=0, minute=0)

def test_periodic_task_execution():
    # Test task logic directly
    result = daily_report()
    assert result['status'] == 'complete'
```
Integration Testing
```python
import pytest
from celery_app import app

@pytest.fixture(scope='module')
def celery_app():
    app.conf.update(
        broker_url='redis://localhost:6379/15',
        result_backend='redis://localhost:6379/15',
    )
    return app

@pytest.fixture(scope='module')
def celery_worker(celery_app):
    with celery_app.Worker() as worker:
        yield worker

def test_workflow(celery_worker):
    from celery import chain
    workflow = chain(
        fetch_data.s(url),
        process_data.s(),
        save_results.s()
    )
    result = workflow.apply_async()
    output = result.get(timeout=30)
    assert output['status'] == 'saved'
```
Production Patterns
Worker Configuration
```bash
# Production worker with autoscaling
celery -A myapp worker \
    --autoscale=10,3 \
    --max-tasks-per-child=1000 \
    --time-limit=300 \
    --soft-time-limit=240 \
    --loglevel=info \
    --logfile=/var/log/celery/worker.log \
    --pidfile=/var/run/celery/worker.pid

# Multiple specialized workers
celery multi start \
    worker1 -A myapp -Q high_priority -c 4 --max-tasks-per-child=100 \
    worker2 -A myapp -Q default -c 2 --max-tasks-per-child=1000 \
    worker3 -A myapp -Q low_priority -c 1 --autoscale=3,1

# Graceful shutdown
celery multi stop worker1 worker2 worker3
celery multi stopwait worker1 worker2 worker3  # Wait for tasks to finish
```
Configuration Best Practices
```python
# production_config.py
import os

# Broker settings
broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
broker_connection_retry_on_startup = True
broker_pool_limit = 50

# Result backend
result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
result_expires = 3600  # 1 hour

# Serialization
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

# Performance
worker_prefetch_multiplier = 4     # Tasks to prefetch per worker
worker_max_tasks_per_child = 1000  # Restart worker after N tasks (prevent memory leaks)
task_acks_late = True              # Acknowledge after task completes
task_reject_on_worker_lost = True  # Requeue if worker dies

# Reliability
task_track_started = True   # Track when task starts
task_time_limit = 300       # 5 minutes hard limit
task_soft_time_limit = 240  # 4 minutes soft limit

# Logging
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
```
Systemd Service
```ini
# /etc/systemd/system/celery.service
[Unit]
Description=Celery Service
After=network.target redis.target

[Service]
Type=forking
User=celery
Group=celery
WorkingDirectory=/opt/myapp
Environment="PATH=/opt/myapp/venv/bin"
ExecStart=/opt/myapp/venv/bin/celery multi start worker1 \
    -A myapp \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log \
    --loglevel=INFO
ExecStop=/opt/myapp/venv/bin/celery multi stopwait worker1 \
    --pidfile=/var/run/celery/%n.pid
ExecReload=/opt/myapp/venv/bin/celery multi restart worker1 \
    -A myapp \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log \
    --loglevel=INFO
Restart=always

[Install]
WantedBy=multi-user.target
```
```ini
# /etc/systemd/system/celerybeat.service
[Unit]
Description=Celery Beat Service
After=network.target redis.target

[Service]
Type=simple
User=celery
Group=celery
WorkingDirectory=/opt/myapp
Environment="PATH=/opt/myapp/venv/bin"
ExecStart=/opt/myapp/venv/bin/celery -A myapp beat \
    --loglevel=INFO \
    --pidfile=/var/run/celery/beat.pid
Restart=always

[Install]
WantedBy=multi-user.target
```
Sentry Integration
```bash
# Install
pip install sentry-sdk
```
```python
# Configuration
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="https://your-sentry-dsn",
    integrations=[CeleryIntegration()],
    traces_sample_rate=0.1,  # 10% of transactions
)

# Tasks are automatically tracked
@app.task
def my_task(x):
    # Exceptions automatically sent to Sentry
    return risky_operation(x)
```
undefinedRate Limiting
速率限制
python
undefinedpython
undefinedGlobal rate limiting
全局速率限制
app.conf.task_default_rate_limit = '100/m' # 100 tasks per minute
app.conf.task_default_rate_limit = '100/m' # 每分钟100个任务
Per-task rate limiting
单任务速率限制
@app.task(rate_limit='10/m')
def rate_limited_task(x):
return expensive_operation(x)
@app.task(rate_limit='10/m')
def rate_limited_task(x):
return expensive_operation(x)
Dynamic rate limiting
动态速率限制
app.control.rate_limit('myapp.tasks.slow_task', '5/m')
app.control.rate_limit('myapp.tasks.slow_task', '5/m')
Token bucket rate limiting
令牌桶速率限制
@app.task(rate_limit='10/s')
def api_call(endpoint):
return requests.get(endpoint)
@app.task(rate_limit='10/s')
def api_call(endpoint):
return requests.get(endpoint)
Health Checks
健康检查
python
python
health.py
health.py
from celery_app import app
def check_celery_health():
"""Health check endpoint"""
try:
# Ping workers
i = app.control.inspect()
stats = i.stats()
if not stats:
return {'status': 'unhealthy', 'reason': 'No workers available'}
# Check broker connection
result = app.control.ping(timeout=1.0)
if not result:
return {'status': 'unhealthy', 'reason': 'Workers not responding'}
return {'status': 'healthy', 'workers': len(stats)}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
from celery_app import app
def check_celery_health():
"""健康检查端点"""
try:
# Ping Worker
i = app.control.inspect()
stats = i.stats()
if not stats:
return {'status': 'unhealthy', 'reason': '无可用Worker'}
# 检查代理连接
result = app.control.ping(timeout=1.0)
if not result:
return {'status': 'unhealthy', 'reason': 'Worker无响应'}
return {'status': 'healthy', 'workers': len(stats)}
except Exception as e:
return {'status': 'unhealthy', 'error': str(e)}
FastAPI health endpoint
FastAPI健康检查端点
@app.get("/health/celery")
async def celery_health():
return check_celery_health()
---
@app.get("/health/celery")
async def celery_health():
return check_celery_health()
---
Performance Optimization
性能优化
Task Optimization
任务优化
python
python
Use ignore_result for fire-and-forget tasks
即发即弃任务使用ignore_result
@app.task(ignore_result=True)
def send_notification(user_id, message):
# Don't need result, save backend overhead
notify(user_id, message)
@app.task(ignore_result=True)
def send_notification(user_id, message):
# 无需存储结果,节省后端开销
notify(user_id, message)
Compression for large payloads
大负载使用压缩
@app.task(compression='gzip')
def process_large_data(data):
return analyze(data)
@app.task(compression='gzip')
def process_large_data(data):
return analyze(data)
Serialization choice
选择高效序列化器
@app.task(serializer='msgpack') # Faster than JSON
def fast_task(data):
return process(data)
@app.task(serializer='msgpack')  # 比JSON更快
def fast_task(data):
return process(data)
Worker Tuning
Worker调优
python
python
Worker concurrency
Worker并发数
worker_concurrency = 4 # CPU-bound: num_cores
worker_concurrency = 20 # I/O-bound: higher value
worker_concurrency = 4 # CPU密集型:等于核心数
worker_concurrency = 20 # I/O密集型:更高的值
Prefetch multiplier (how many tasks to prefetch)
预取乘数(每个Worker预取的任务数)
worker_prefetch_multiplier = 4 # Balance: 4x concurrency
worker_prefetch_multiplier = 4 # 平衡值:并发数的4倍
Task acknowledgment
任务确认时机
task_acks_late = True # Acknowledge after completion (reliability)
task_acks_late = False # Acknowledge on receipt (performance)
task_acks_late = True # 执行完成后确认(可靠性优先)
task_acks_late = False # 接收后确认(性能优先)
Memory management
内存管理
worker_max_tasks_per_child = 1000 # Restart worker after N tasks
worker_max_memory_per_child = 200000 # Restart after 200MB
worker_max_tasks_per_child = 1000  # 执行N个任务后重启Worker
worker_max_memory_per_child = 200000 # 内存达到200MB后重启
Database Result Backend Optimization
数据库结果后端优化
python
python
Use Redis instead of database for results
使用Redis替代数据库存储结果
result_backend = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/1'
If using database, optimize
如果使用数据库,进行优化
result_backend = 'db+postgresql://user:pass@localhost/celery'
database_engine_options = {
'pool_size': 20,
'pool_recycle': 3600,
}
result_backend = 'db+postgresql://user:pass@localhost/celery'
database_engine_options = {
'pool_size': 20,
'pool_recycle': 3600,
}
Reduce result expiry time
缩短结果过期时间
result_expires = 3600 # 1 hour instead of default 24 hours
result_expires = 3600  # 1小时,替代默认的24小时
Task Chunking
任务分块
python
from celery import group
python
from celery import group
Bad: One task per item (overhead)
不好的实践:每个任务处理一个条目(开销大)
for item in large_list:
process_item.delay(item)
for item in large_list:
process_item.delay(item)
Good: Chunk items
好的实践:分块处理
def chunks(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i + n]
@app.task
def process_batch(items):
return [process_item(item) for item in items]
def chunks(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i + n]
@app.task
def process_batch(items):
return [process_item(item) for item in items]
Process in batches of 100
按100个条目为一批处理
job = group(process_batch.s(chunk) for chunk in chunks(large_list, 100))
result = job.apply_async()
job = group(process_batch.s(chunk) for chunk in chunks(large_list, 100))
result = job.apply_async()
Connection Pooling
连接池
python
python
Redis connection pool
Redis连接池
broker_pool_limit = 50 # Max connections to broker
redis_max_connections = 50
broker_pool_limit = 50 # 与代理的最大连接数
redis_max_connections = 50
Database connection pool
数据库连接池
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://user:pass@localhost/db',
poolclass=QueuePool,
pool_size=20,
max_overflow=0,
)
---
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://user:pass@localhost/db',
poolclass=QueuePool,
pool_size=20,
max_overflow=0,
)
---
Common Use Cases
常见使用场景
Email Sending
邮件发送
python
@app.task(bind=True, max_retries=3)
def send_email_task(self, to, subject, body, attachments=None):
try:
msg = EmailMessage(subject, body, 'from@example.com', [to])
if attachments:
for filename, content, mimetype in attachments:
msg.attach(filename, content, mimetype)
msg.send()
return {'status': 'sent', 'to': to}
except SMTPException as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
python
@app.task(bind=True, max_retries=3)
def send_email_task(self, to, subject, body, attachments=None):
try:
msg = EmailMessage(subject, body, 'from@example.com', [to])
if attachments:
for filename, content, mimetype in attachments:
msg.attach(filename, content, mimetype)
msg.send()
return {'status': 'sent', 'to': to}
except SMTPException as exc:
# 指数退避重试
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
Bulk email with rate limiting
批量邮件带速率限制
@app.task(rate_limit='100/m')
def send_bulk_email(recipients, subject, template):
for recipient in recipients:
send_email_task.delay(recipient, subject, render_template(template, recipient))
@app.task(rate_limit='100/m')
def send_bulk_email(recipients, subject, template):
for recipient in recipients:
send_email_task.delay(recipient, subject, render_template(template, recipient))
Report Generation
报表生成
python
@app.task(bind=True, time_limit=600)
def generate_report(self, report_type, user_id, start_date, end_date):
# Update progress
self.update_state(state='PROGRESS', meta={'current': 0, 'total': 100})
# Fetch data
data = fetch_report_data(report_type, start_date, end_date)
self.update_state(state='PROGRESS', meta={'current': 30, 'total': 100})
# Generate PDF
pdf = render_pdf(data)
self.update_state(state='PROGRESS', meta={'current': 70, 'total': 100})
# Upload to S3
url = upload_to_s3(pdf, f'reports/{user_id}/{report_type}.pdf')
self.update_state(state='PROGRESS', meta={'current': 90, 'total': 100})
# Send notification
send_email_task.delay(
get_user_email(user_id),
'Report Ready',
f'Your report is ready: {url}'
)
return {'status': 'complete', 'url': url}
python
@app.task(bind=True, time_limit=600)
def generate_report(self, report_type, user_id, start_date, end_date):
# 更新进度
self.update_state(state='PROGRESS', meta={'current': 0, 'total': 100})
# 获取数据
data = fetch_report_data(report_type, start_date, end_date)
self.update_state(state='PROGRESS', meta={'current': 30, 'total': 100})
# 生成PDF
pdf = render_pdf(data)
self.update_state(state='PROGRESS', meta={'current': 70, 'total': 100})
# 上传到S3
url = upload_to_s3(pdf, f'reports/{user_id}/{report_type}.pdf')
self.update_state(state='PROGRESS', meta={'current': 90, 'total': 100})
# 发送通知
send_email_task.delay(
get_user_email(user_id),
'报表已就绪',
f'您的报表已就绪:{url}'
)
return {'status': 'complete', 'url': url}
Check progress
检查进度
from celery.result import AsyncResult
task = AsyncResult(task_id)
if task.state == 'PROGRESS':
print(task.info) # {'current': 30, 'total': 100}
from celery.result import AsyncResult
task = AsyncResult(task_id)
if task.state == 'PROGRESS':
print(task.info) # {'current': 30, 'total': 100}
Data Processing Pipeline
数据处理管道
python
from celery import chain, group
@app.task
def fetch_data(source):
return download(source)
@app.task
def clean_data(raw_data):
return clean(raw_data)
@app.task
def transform_data(clean_data):
return transform(clean_data)
@app.task
def load_data(transformed_data):
save_to_database(transformed_data)
return {'status': 'loaded', 'rows': len(transformed_data)}
python
from celery import chain, group
@app.task
def fetch_data(source):
return download(source)
@app.task
def clean_data(raw_data):
return clean(raw_data)
@app.task
def transform_data(clean_data):
return transform(clean_data)
@app.task
def load_data(transformed_data):
save_to_database(transformed_data)
return {'status': 'loaded', 'rows': len(transformed_data)}
ETL pipeline
ETL管道
etl_pipeline = chain(
fetch_data.s('https://api.example.com/data'),
clean_data.s(),
transform_data.s(),
load_data.s()
)
result = etl_pipeline.apply_async()
etl_pipeline = chain(
fetch_data.s('https://api.example.com/data'),
clean_data.s(),
transform_data.s(),
load_data.s()
)
result = etl_pipeline.apply_async()
Webhook Processing
Webhook处理
python
@app.task(bind=True, autoretry_for=(RequestException,), max_retries=5)
def process_webhook(self, webhook_data):
# Validate signature
if not verify_signature(webhook_data):
raise ValueError("Invalid signature")
# Process event
event_type = webhook_data['type']
if event_type == 'payment.success':
update_order_status(webhook_data['order_id'], 'paid')
send_confirmation_email.delay(webhook_data['customer_email'])
elif event_type == 'payment.failed':
notify_admin.delay('Payment Failed', webhook_data)
return {'status': 'processed', 'event': event_type}
python
@app.task(bind=True, autoretry_for=(RequestException,), max_retries=5)
def process_webhook(self, webhook_data):
# 验证签名
if not verify_signature(webhook_data):
raise ValueError("无效签名")
# 处理事件
event_type = webhook_data['type']
if event_type == 'payment.success':
update_order_status(webhook_data['order_id'], 'paid')
send_confirmation_email.delay(webhook_data['customer_email'])
elif event_type == 'payment.failed':
notify_admin.delay('支付失败', webhook_data)
return {'status': 'processed', 'event': event_type}
FastAPI webhook endpoint
FastAPI Webhook端点
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
data = await request.json()
process_webhook.delay(data)
return {"status": "queued"}
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
data = await request.json()
process_webhook.delay(data)
return {"status": "已加入队列"}
Image Processing
图片处理
python
from celery import group, chord
@app.task
def resize_image(image_path, size):
from PIL import Image
img = Image.open(image_path)
img.thumbnail(size)
output_path = f"{image_path}_{size[0]}x{size[1]}.jpg"
img.save(output_path)
return output_path
@app.task
def upload_to_cdn(image_paths):
urls = []
for path in image_paths:
url = cdn_upload(path)
urls.append(url)
return urls
python
from celery import group, chord
@app.task
def resize_image(image_path, size):
from PIL import Image
img = Image.open(image_path)
img.thumbnail(size)
output_path = f"{image_path}_{size[0]}x{size[1]}.jpg"
img.save(output_path)
return output_path
@app.task
def upload_to_cdn(image_paths):
urls = []
for path in image_paths:
url = cdn_upload(path)
urls.append(url)
return urls
Generate multiple sizes and upload
生成多尺寸图片并上传
def process_uploaded_image(image_path):
sizes = [(800, 600), (400, 300), (200, 150), (100, 100)]
workflow = chord([
resize_image.s(image_path, size) for size in sizes
])
return workflow(upload_to_cdn.s())  # applying the chord returns an AsyncResult
---
def process_uploaded_image(image_path):
sizes = [(800, 600), (400, 300), (200, 150), (100, 100)]
workflow = chord([
resize_image.s(image_path, size) for size in sizes
])
return workflow(upload_to_cdn.s())  # 调用chord即触发执行并返回AsyncResult
---
Alternatives Comparison
替代方案对比
Celery vs RQ (Redis Queue)
Celery vs RQ(Redis Queue)
RQ: Simpler Redis-only task queue
When to use RQ:
- Simple use case (no routing, basic retries)
- Redis-only infrastructure
- Python 3 only
- Smaller scale (<1000 tasks/min)
When to use Celery:
- Complex workflows (chains, chords)
- Multiple broker options
- Advanced routing and priorities
- Large scale (>1000 tasks/min)
- Periodic tasks
python
RQ:简单的仅支持Redis的任务队列
选择RQ的场景:
- 简单使用场景(无路由、基础重试)
- 仅使用Redis的基础设施
- 仅支持Python 3
- 小规模(<1000任务/分钟)
选择Celery的场景:
- 复杂工作流(链式、和弦任务)
- 多代理选项
- 高级路由与优先级
- 大规模(>1000任务/分钟)
- 周期性任务
python
RQ Example
RQ示例
from redis import Redis
from rq import Queue
redis_conn = Redis()
q = Queue(connection=redis_conn)
job = q.enqueue(my_function, arg1, arg2)
result = job.result
from redis import Redis
from rq import Queue
redis_conn = Redis()
q = Queue(connection=redis_conn)
job = q.enqueue(my_function, arg1, arg2)
result = job.result
Celery vs Huey
Celery vs Huey
Huey: Lightweight task queue with minimal dependencies
When to use Huey:
- Small to medium projects
- Minimal configuration
- Redis or in-memory only
- Simple periodic tasks
When to use Celery:
- Enterprise-scale applications
- Complex task dependencies
- Multiple broker/backend options
- Advanced monitoring needs
python
Huey:轻量级任务队列,依赖极少
选择Huey的场景:
- 中小型项目
- 极简配置
- 仅支持Redis或内存队列
- 简单周期性任务
选择Celery的场景:
- 企业级应用
- 复杂任务依赖
- 多代理/后端选项
- 高级监控需求
python
Huey Example
Huey示例
from huey import RedisHuey
huey = RedisHuey('myapp')
@huey.task()
def add(a, b):
return a + b
result = add(1, 2)
from huey import RedisHuey
huey = RedisHuey('myapp')
@huey.task()
def add(a, b):
return a + b
result = add(1, 2)
Celery vs Dramatiq
Celery vs Dramatiq
Dramatiq: Modern alternative focusing on reliability
When to use Dramatiq:
- Reliability over features
- Simpler API
- Better type hints
- RabbitMQ or Redis
When to use Celery:
- Mature ecosystem
- More broker options
- Canvas workflows
- Larger community
python
Dramatiq:现代替代方案,专注可靠性
选择Dramatiq的场景:
- 可靠性优先于功能
- 更简洁的API
- 更好的类型提示
- 仅支持RabbitMQ或Redis
选择Celery的场景:
- 成熟的生态系统
- 更多代理选项
- Canvas工作流
- 更大的社区
python
Dramatiq Example
Dramatiq示例
import dramatiq
@dramatiq.actor
def add(x, y):
return x + y
add.send(1, 2)
import dramatiq
@dramatiq.actor
def add(x, y):
return x + y
add.send(1, 2)
Celery vs Cloud Services
Celery vs 云服务
AWS Lambda, Google Cloud Functions, Azure Functions
When to use Cloud Functions:
- Serverless infrastructure
- Event-driven workflows
- Pay-per-execution model
- Auto-scaling
When to use Celery:
- Self-hosted infrastructure
- Complex task workflows
- Cost predictability
- Full control over execution
AWS Lambda、Google Cloud Functions、Azure Functions
选择云函数的场景:
- 无服务器基础设施
- 事件驱动工作流
- 按执行次数付费
- 自动扩缩容
选择Celery的场景:
- 自托管基础设施
- 复杂任务工作流
- 可预测的成本
- 对执行过程完全控制
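The comparisons above each include a short example; for parity, a minimal serverless handler (AWS Lambda style, illustrative only; deployment and packaging not shown) would look roughly like this:
python
def handler(event, context):
    # Do the work a Celery task would otherwise perform
    order_id = event.get("order_id")
    return {"status": "processed", "order_id": order_id}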
Best Practices
最佳实践
Task Design
任务设计
- Idempotency: Tasks should be safe to run multiple times
python
@app.task
def process_order(order_id):
    order = Order.objects.get(id=order_id)
    if order.status == 'processed':
        return  # Already processed, skip
    order.process()
    order.status = 'processed'
    order.save()
- Small, Focused Tasks: One responsibility per task
python
# Bad: Monolithic task
@app.task
def process_user(user_id):
    send_welcome_email(user_id)
    create_profile(user_id)
    setup_notifications(user_id)
# Good: Separate tasks
@app.task
def send_welcome_email(user_id): ...
@app.task
def create_profile(user_id): ...
workflow = group([
    send_welcome_email.s(user_id),
    create_profile.s(user_id),
    setup_notifications.s(user_id)
])
- Avoid Database Objects in Arguments: Use IDs instead
python
# Bad
@app.task
def process_user(user):  # User object
    ...
# Good
@app.task
def process_user(user_id):
    user = User.objects.get(id=user_id)
    ...
- Set Time Limits: Prevent runaway tasks
python
@app.task(time_limit=300, soft_time_limit=240)
def bounded_task():
    ...
- 幂等性:任务多次执行应安全无副作用
python
@app.task
def process_order(order_id):
    order = Order.objects.get(id=order_id)
    if order.status == 'processed':
        return  # 已处理,跳过
    order.process()
    order.status = 'processed'
    order.save()
- 单一职责:每个任务只负责一件事
python
# 不好:单体任务
@app.task
def process_user(user_id):
    send_welcome_email(user_id)
    create_profile(user_id)
    setup_notifications(user_id)
# 好:拆分任务
@app.task
def send_welcome_email(user_id): ...
@app.task
def create_profile(user_id): ...
workflow = group([
    send_welcome_email.s(user_id),
    create_profile.s(user_id),
    setup_notifications.s(user_id)
])
- 参数使用ID而非对象:避免传递数据库对象
python
# 不好
@app.task
def process_user(user):  # User对象
    ...
# 好
@app.task
def process_user(user_id):
    user = User.objects.get(id=user_id)
    ...
- 设置时间限制:防止任务失控
python
@app.task(time_limit=300, soft_time_limit=240)
def bounded_task():
    ...
Error Handling
错误处理
- Categorize Exceptions: Different handling for different errors
- Use Exponential Backoff: Avoid overwhelming failing services
- Set Max Retries: Don't retry forever
- Log Failures: Always log why tasks fail (see the sketch below)
- 异常分类:不同错误采用不同处理方式
- 指数退避:避免压垮故障服务
- 设置最大重试次数:不要无限重试
- 记录失败日志:始终记录任务失败原因
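A minimal sketch combining these practices (exponential backoff, a retry cap, and failure logging) using Celery's built-in retry options; `sync_remote_resource` and `flaky_api_call` are illustrative names, and `app` is the Celery app from the basic setup:
python
import logging
from requests.exceptions import RequestException

logger = logging.getLogger(__name__)

@app.task(
    bind=True,
    autoretry_for=(RequestException,),  # retry only transient network errors
    retry_backoff=True,                 # exponential backoff between attempts
    retry_backoff_max=600,              # cap the backoff at 10 minutes
    retry_jitter=True,                  # add jitter to avoid retry storms
    max_retries=5,                      # don't retry forever
)
def sync_remote_resource(self, resource_id):
    try:
        return flaky_api_call(resource_id)  # illustrative helper
    except ValueError as exc:
        # Permanent error: log the reason and fail instead of retrying
        logger.error("sync_remote_resource(%s) failed permanently: %s", resource_id, exc)
        raise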
Performance
性能优化
- Use ignore_result=True: For tasks that don't need results
- Batch Operations: Process multiple items per task
- Optimize Serialization: Use msgpack for speed
- Connection Pooling: Reuse database/broker connections
- Task Chunking: Avoid creating millions of tiny tasks
- 使用ignore_result=True:无需结果的任务启用此选项
- 批量操作:每个任务处理多个条目
- 优化序列化:使用msgpack提升速度
- 连接池:复用数据库/代理连接
- 任务分块:避免创建大量微小任务
Monitoring
监控
- Enable Events: Track task lifecycle (see the config sketch below)
- Use Flower: Web-based monitoring
- Health Checks: Monitor worker availability
- Sentry Integration: Track errors
- 启用事件:跟踪任务生命周期
- 使用Flower:基于Web的监控工具
- 健康检查:监控Worker可用性
- Sentry集成:跟踪错误
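Flower and other event-based monitors only work if the workers emit events. A small config sketch, applied to the `app` from the basic setup:
python
# Emit task lifecycle events so monitoring tools can track them
app.conf.worker_send_task_events = True  # started/succeeded/failed events from workers
app.conf.task_send_sent_event = True     # also emit an event when a task is published

# With the flower package installed, the dashboard runs alongside the workers:
#   celery -A myapp flower --port=5555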
Security
安全
- Validate Input: Always validate task arguments (see the sketch below)
- Secure Broker: Use authentication and encryption
- Limit Task Execution Time: Prevent resource exhaustion
- Rate Limiting: Protect against task flooding
- 验证输入:始终验证任务参数
- 安全代理:使用认证与加密
- 限制任务执行时间:防止资源耗尽
- 速率限制:防止任务洪水攻击
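A hedged sketch tying these points together on a single task: validate arguments up front, bound execution time, and rate-limit the task. `import_user_record` and `do_import` are illustrative names:
python
@app.task(
    time_limit=120,       # hard cap on execution time
    soft_time_limit=100,  # SoftTimeLimitExceeded is raised before the hard kill
    rate_limit='30/m',    # protect downstream services from task flooding
)
def import_user_record(payload):
    # Never trust queued arguments; validate before doing any work
    if not isinstance(payload, dict) or 'user_id' not in payload:
        raise ValueError("payload must be a dict containing 'user_id'")
    user_id = int(payload['user_id'])   # coerce/validate types explicitly
    return do_import(user_id, payload)  # illustrative helper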
Troubleshooting
故障排查
Tasks Not Executing
任务未执行
Symptoms: Tasks queued but not processing
Diagnosis:
bash
症状:任务已加入队列但未处理
诊断:
bash
Check if workers are running
检查Worker是否运行
celery -A myapp inspect active
celery -A myapp inspect active
Check worker stats
检查Worker统计信息
celery -A myapp inspect stats
celery -A myapp inspect stats
Check registered tasks
检查已注册任务
celery -A myapp inspect registered
**Solutions**:
- Start workers: `celery -A myapp worker`
- Check worker is consuming correct queues
- Verify task routing configuration
- Check broker connectivity
celery -A myapp inspect registered
**解决方案**:
- 启动Worker:`celery -A myapp worker`
- 检查Worker是否消费正确队列
- 验证任务路由配置
- 检查代理连接
Tasks Failing Silently
任务静默失败
Symptoms: Tasks show SUCCESS but don't work
Diagnosis:
python
症状:任务显示SUCCESS但未实际执行
诊断:
python
Enable task tracking
启用任务跟踪
app.conf.task_track_started = True
app.conf.task_track_started = True
Check task result and traceback
检查任务结果与异常栈
result = task.delay()
if result.failed():
print(result.traceback)
**Solutions**:
- Check logs: `celery -A myapp worker --loglevel=debug`
- Enable eager mode in tests to see exceptions
- Use `task_eager_propagates = True` in tests
result = task.delay()
if result.failed():
print(result.traceback)
**解决方案**:
- 查看日志:`celery -A myapp worker --loglevel=debug`
- 测试环境启用急切模式查看异常
- 测试环境设置`task_eager_propagates = True`
Memory Leaks
内存泄漏
Symptoms: Worker memory grows over time
Solutions:
python
症状:Worker内存持续增长
解决方案:
python
Restart workers after N tasks
执行N个任务后重启Worker
worker_max_tasks_per_child = 1000
worker_max_tasks_per_child = 1000
Restart on memory limit
达到内存限制后重启
worker_max_memory_per_child = 200000 # 200MB
worker_max_memory_per_child = 200000  # 200MB
Slow Task Execution
任务执行缓慢
Symptoms: Tasks taking longer than expected
Diagnosis:
python
症状:任务执行时间远超预期
诊断:
python
Add timing
添加计时
import time
@app.task(bind=True)
def timed_task(self):
start = time.time()
result = slow_operation()
duration = time.time() - start
logger.info(f"Task {self.request.id} took {duration}s")
return result
**Solutions**:
- Increase worker concurrency
- Optimize task code
- Use task chunking
- Add more workers
import time
@app.task(bind=True)
def timed_task(self):
start = time.time()
result = slow_operation()
duration = time.time() - start
logger.info(f"任务{self.request.id}耗时{duration}秒")
return result
**解决方案**:
- 增加Worker并发数
- 优化任务代码
- 使用任务分块
- 添加更多Worker
Broker Connection Issues
代理连接问题
Symptoms: Tasks not reaching workers
Diagnosis:
bash
症状:任务无法到达Worker
诊断:
bash
Test broker connection
测试代理连接
python -c "from celery_app import app; print(app.connection().connect())"
**Solutions**:
- Check broker is running: `redis-cli ping` or `rabbitmqctl status`
- Verify broker URL in configuration
- Check network connectivity
- Enable connection retry: `broker_connection_retry_on_startup = True`
python -c "from celery_app import app; print(app.connection().connect())"
**解决方案**:
- 检查代理是否运行:`redis-cli ping` 或 `rabbitmqctl status`
- 验证配置中的代理URL
- 检查网络连接
- 启用连接重试:`broker_connection_retry_on_startup = True`
Task Results Not Persisting
任务结果未持久化
Symptoms: result.get() returns None
Solutions:
- Verify result backend configured
- Check task doesn't have ignore_result=True
- Verify result hasn't expired (result_expires)
- Test backend connection (see the sketch below)
症状:result.get()返回None
解决方案:
- 验证结果后端配置
- 检查任务是否设置ignore_result=True
- 验证结果是否已过期(result_expires)
- 测试后端连接(见下方示例)
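A quick diagnostic sketch for this case, assuming the `app` and `add` task from the basic setup:
python
from celery_app import app, add

print(app.conf.result_backend)  # must be set; None means results are discarded
print(add.ignore_result)        # True means this task never stores a result
print(app.conf.result_expires)  # results older than this are deleted

result = add.delay(2, 3)
print(result.get(timeout=10))   # should print 5 if the backend round-trip works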
Beat Not Scheduling Tasks
Beat未调度任务
Symptoms: Periodic tasks not running
Diagnosis:
bash
症状:周期性任务未运行
诊断:
bash
Check beat is running
检查Beat是否运行
ps aux | grep celery | grep beat
ps aux | grep celery | grep beat
Check beat schedule
检查Beat调度
celery -A myapp inspect scheduled
**Solutions**:
- Ensure beat process is running
- Verify `beat_schedule` configuration
- Check beat log for errors
- Use database scheduler for dynamic schedules
celery -A myapp inspect scheduled
**解决方案**:
- 确保Beat进程运行
- 验证`beat_schedule`配置
- 查看Beat日志中的错误
- 动态调度使用数据库调度器
Worker Crashes
Worker崩溃
Symptoms: Workers die unexpectedly
Solutions:
- Check logs for errors
- Set worker_max_tasks_per_child to prevent memory leaks
- Add task time limits
- Use systemd for automatic restart
- Monitor with Flower
症状:Worker意外终止
解决方案:
- 查看日志中的错误
- 设置worker_max_tasks_per_child防止内存泄漏
- 添加任务时间限制
- 使用systemd自动重启
- 使用Flower监控
Task Queue Buildup
任务队列堆积
Symptoms: Tasks accumulating in queue
Solutions:
- Add more workers
- Increase worker concurrency
- Optimize slow tasks
- Add task routing to distribute load
- Check for blocked workers (see the queue-depth sketch below)
症状:任务在队列中累积
解决方案:
- 添加更多Worker
- 增加Worker并发数
- 优化缓慢任务
- 任务路由分发负载
- 检查是否有阻塞的Worker
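To confirm a backlog, compare queue depth against what the workers are actually executing. This sketch assumes a Redis broker and the default 'celery' queue name:
python
import redis
from celery_app import app

r = redis.Redis.from_url('redis://localhost:6379/0')  # same URL as the broker
print(r.llen('celery'), "messages waiting")  # adjust the key if you use custom queues

active = app.control.inspect().active() or {}
print(sum(len(tasks) for tasks in active.values()), "tasks currently executing")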
Advanced Configuration
高级配置
Custom Task Classes
自定义任务类
python
from celery import Task
class DatabaseTask(Task):
"""Task that manages database connections"""
_db = None
@property
def db(self):
if self._db is None:
self._db = create_db_connection()
return self._db
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""Close connection after task"""
if self._db is not None:
self._db.close()
@app.task(base=DatabaseTask)
def db_task(query):
return db_task.db.execute(query)
python
from celery import Task
class DatabaseTask(Task):
"""管理数据库连接的任务类"""
_db = None
@property
def db(self):
if self._db is None:
self._db = create_db_connection()
return self._db
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""任务结束后关闭连接"""
if self._db is not None:
self._db.close()
@app.task(base=DatabaseTask)
def db_task(query):
return db_task.db.execute(query)
Custom Serializers
自定义序列化器
python
import json
from kombu.serialization import register
def my_encoder(obj):
# Custom encoding logic
return json.dumps(obj)
def my_decoder(data):
# Custom decoding logic
return json.loads(data)
register('myjson', my_encoder, my_decoder,
content_type='application/x-myjson',
content_encoding='utf-8')
app.conf.task_serializer = 'myjson'
python
import json
from kombu.serialization import register
def my_encoder(obj):
# 自定义编码逻辑
return json.dumps(obj)
def my_decoder(data):
# 自定义解码逻辑
return json.loads(data)
register('myjson', my_encoder, my_decoder,
content_type='application/x-myjson',
content_encoding='utf-8')
app.conf.task_serializer = 'myjson'
Task Inheritance
任务继承
python
class BaseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
send_alert(f"Task {self.name} failed", str(exc))
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.warning(f"Task {self.name} retrying")
@app.task(base=BaseTask)
def monitored_task():
return perform_work()
End of Celery Skill Documentation
For more information:
- Official Documentation: https://docs.celeryq.dev/
- GitHub: https://github.com/celery/celery
- Community: https://groups.google.com/forum/#!forum/celery-users
python
class BaseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
send_alert(f"任务{self.name}失败", str(exc))
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.warning(f"任务{self.name}将重试")
@app.task(base=BaseTask)
def monitored_task():
return perform_work()
Celery技能文档结束
更多信息:
- 官方文档:https://docs.celeryq.dev/
- GitHub:https://github.com/celery/celery
- 社区:https://groups.google.com/forum/#!forum/celery-users
Related Skills
相关技能
When using Celery, these skills enhance your workflow:
- django: Django + Celery integration for background tasks
- fastapi-local-dev: FastAPI + Celery patterns for async API operations
- test-driven-development: Testing async tasks and task chains
- systematic-debugging: Debugging distributed task failures and race conditions
[Full documentation available in these skills if deployed in your bundle]
使用Celery时,以下技能可提升工作流:
- django: Django + Celery集成实现后台任务
- fastapi-local-dev: FastAPI + Celery异步API操作模式
- test-driven-development: 异步任务与任务链的测试
- systematic-debugging: 分布式任务失败与竞态条件调试
[若在技能包中部署,可查看这些技能的完整文档]