
Celery: Distributed Task Queue


Summary


Celery is a distributed task queue system for Python that enables asynchronous execution of background jobs across multiple workers. It supports scheduling, retries, task workflows, and integrates seamlessly with Django, Flask, and FastAPI.

When to Use


  • Background Processing: Offload long-running operations (email, file processing, reports)
  • Scheduled Tasks: Cron-like periodic jobs (cleanup, backups, data sync)
  • Distributed Computing: Process tasks across multiple workers/servers
  • Async Workflows: Chain, group, and orchestrate complex task dependencies
  • Real-time Processing: Handle webhooks, notifications, data pipelines
  • Load Balancing: Distribute CPU-intensive work across workers
Don't Use When:
  • Simple async I/O (use asyncio instead; see the sketch after this list)
  • Real-time request/response (use async web frameworks)
  • Sub-second latency required (use in-memory queues)
  • Minimal infrastructure (use simpler alternatives like RQ or Huey)
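For the first bullet, a minimal asyncio contrast (sketch): the work stays in-process, with no broker, retries, or persistence, and it is lost if the process exits.

```python
import asyncio

async def send_welcome(to: str) -> None:
    await asyncio.sleep(2)  # stand-in for an async SMTP call
    print(f"Email sent to {to}")

async def main() -> None:
    # Fire-and-forget within the current process
    task = asyncio.create_task(send_welcome("user@example.com"))
    await task

asyncio.run(main())
```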

Quick Start


Installation

```bash
# Basic installation
pip install celery

# With Redis broker
pip install celery[redis]

# With RabbitMQ broker
pip install celery[amqp]

# Full batteries (recommended)
pip install celery[redis,msgpack,auth,cassandra,elasticsearch,s3,sqs]
```

Basic Setup

```python
# celery_app.py
from celery import Celery

# Create a Celery app with a Redis broker and result backend
app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

# Configuration
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

# Define tasks
@app.task
def add(x, y):
    return x + y

@app.task
def send_email(to, subject, body):
    # Simulate email sending
    import time
    time.sleep(2)
    print(f"Email sent to {to}: {subject}")
    return {"status": "sent", "to": to}
```

Running Workers

```bash
# Start a worker
celery -A celery_app worker --loglevel=info

# Worker with explicit concurrency (4 child processes)
celery -A celery_app worker --concurrency=4 --loglevel=info

# Worker bound to specific queues
celery -A celery_app worker -Q emails,reports --loglevel=info
```
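When `--concurrency` is omitted, the prefork pool defaults to the machine's CPU count; a quick way to see what a given box will use (sketch):

```python
import os

# Default --concurrency for the prefork pool when the flag is omitted
print(os.cpu_count())
```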

Executing Tasks

```python
# Call a task asynchronously
result = add.delay(4, 6)

# Wait for the result (blocks up to the timeout)
print(result.get(timeout=10))  # 10

# apply_async with options
result = send_email.apply_async(
    args=['user@example.com', 'Hello', 'Welcome!'],
    countdown=60,  # Execute after 60 seconds
)

# Check the task state
print(result.status)  # PENDING, STARTED, SUCCESS, FAILURE
```

---

Core Concepts


Architecture Components


Broker: Message queue that stores tasks
  • Redis (recommended for most use cases)
  • RabbitMQ (enterprise-grade, complex)
  • Amazon SQS (serverless, AWS-native)
Workers: Processes that execute tasks
  • Pull tasks from broker
  • Execute task code
  • Store results in backend
Result Backend: Storage for task results
  • Redis (fast, in-memory)
  • Database (PostgreSQL, MySQL)
  • S3 (large results)
  • Cassandra, Elasticsearch (specialized)
Beat Scheduler: Periodic task scheduler
  • Cron-like scheduling
  • Interval-based tasks
  • Stores schedule in database or file
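A minimal sketch of how these components fit together in code (URLs, names, and the task body are illustrative):

```python
from celery import Celery

app = Celery(
    'arch_demo',
    broker='redis://localhost:6379/0',   # Broker: queues task messages
    backend='redis://localhost:6379/1',  # Result backend: stores return values
)

# Beat scheduler: periodic entries live in configuration
app.conf.beat_schedule = {
    'heartbeat-every-minute': {'task': 'arch_demo.heartbeat', 'schedule': 60.0},
}

@app.task(name='arch_demo.heartbeat')
def heartbeat():
    return 'ok'

# Workers and beat run as separate processes, started from the CLI:
#   celery -A arch_demo worker --loglevel=info
#   celery -A arch_demo beat --loglevel=info
```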

Task States


PENDING → STARTED → SUCCESS
                 → RETRY → SUCCESS
                 → FAILURE
  • PENDING: Task waiting in queue
  • STARTED: Worker picked up task
  • SUCCESS: Task completed successfully
  • FAILURE: Task raised exception
  • RETRY: Task will retry after failure
  • REVOKED: Task cancelled before execution

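A small sketch that watches these transitions (assumes the `add` task from Quick Start; STARTED is only reported when `task_track_started` is enabled):

```python
import time
from celery_app import add

result = add.delay(2, 3)

# Poll until the task leaves the queue and finishes
while not result.ready():
    print(result.state)  # PENDING, then STARTED if task_track_started=True
    time.sleep(0.5)

print(result.state, result.result)  # SUCCESS 5
```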

Broker Setup


Redis Configuration

```python
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# With authentication
broker_url = 'redis://:password@localhost:6379/0'

# Redis Sentinel (high availability)
broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380'
broker_transport_options = {
    'master_name': 'mymaster',
    'sentinel_kwargs': {'password': 'password'},
}

# Redis connection pool settings
broker_pool_limit = 10
broker_connection_retry = True
broker_connection_retry_on_startup = True
broker_connection_max_retries = 10
```

RabbitMQ Configuration

```python
# Basic RabbitMQ
broker_url = 'amqp://guest:guest@localhost:5672//'

# With a virtual host
broker_url = 'amqp://user:password@localhost:5672/myvhost'

# High availability (broker failover list)
broker_url = [
    'amqp://user:password@host1:5672//',
    'amqp://user:password@host2:5672//',
]

# RabbitMQ-specific settings
broker_heartbeat = 30
broker_pool_limit = 10
```

Amazon SQS Configuration

```python
# AWS SQS (serverless)
broker_url = 'sqs://'
broker_transport_options = {
    'region': 'us-east-1',
    'queue_name_prefix': 'myapp-',
    'visibility_timeout': 3600,
    'polling_interval': 1,
}

# Predefined queues (static URLs; AWS credentials come from the environment)
broker_transport_options = {
    'region': 'us-east-1',
    'predefined_queues': {
        'default': {
            'url': 'https://sqs.us-east-1.amazonaws.com/123456789/myapp-default',
        }
    }
}
```

---

Task Basics


Task Definition

```python
from celery import Task, shared_task
from celery_app import app

# Method 1: Decorator
@app.task
def simple_task(x, y):
    return x + y

# Method 2: Shared task (framework-agnostic)
@shared_task
def framework_task(data):
    return process(data)

# Method 3: Task class (advanced)
class CustomTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print(f"Task {task_id} succeeded with {retval}")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(f"Task {task_id} failed: {exc}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        print(f"Task {task_id} retrying: {exc}")

@app.task(base=CustomTask)
def monitored_task(x):
    return x * 2
```

Task Options

```python
@app.task(
    name='custom.task.name',            # Custom task name
    bind=True,                          # Bind task instance as first arg
    ignore_result=True,                 # Don't store result (performance)
    max_retries=3,                      # Max retry attempts
    default_retry_delay=60,             # Retry delay in seconds
    rate_limit='100/h',                 # Rate limiting
    time_limit=300,                     # Hard time limit (kills task)
    soft_time_limit=240,                # Soft time limit (raises exception)
    serializer='json',                  # Task serializer
    compression='gzip',                 # Compress large messages
    priority=5,                         # Task priority (0-9)
    queue='high_priority',              # Target queue
    routing_key='priority.high',        # Routing key
    acks_late=True,                     # Acknowledge after execution
    reject_on_worker_lost=True,         # Reject if worker dies
)
def advanced_task(self, data):
    try:
        return process(data)
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```

Task Context (bind=True)

```python
@app.task(bind=True)
def context_aware_task(self, x, y):
    # Access task metadata
    print(f"Task ID: {self.request.id}")
    print(f"Task Name: {self.name}")
    print(f"Args: {self.request.args}")
    print(f"Kwargs: {self.request.kwargs}")
    print(f"Retries: {self.request.retries}")
    print(f"Delivery Info: {self.request.delivery_info}")

    # Manual retry
    try:
        result = risky_operation(x, y)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)

    return result
```

Task Execution


Delay vs Apply Async

```python
from datetime import datetime

# delay() - simple async execution
result = add.delay(4, 6)

# apply_async() - full control
result = add.apply_async(
    args=(4, 6),
    kwargs={'extra': 'data'},          # The target task must accept these

    # Timing options
    countdown=60,                      # Execute after N seconds
    eta=datetime(2025, 12, 1, 10, 0),  # Execute at a specific time
    expires=3600,                      # Task expires after N seconds

    # Routing options
    queue='math',                      # Target a specific queue
    routing_key='math.add',            # Custom routing key
    exchange='tasks',                  # Target exchange
    priority=9,                        # High priority

    # Execution options
    serializer='json',                 # Message serializer
    compression='gzip',                # Compress the payload
    retry=True,                        # Retry publishing on connection errors
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.2,
    },

    # Task linking
    link=log_result.s(),               # Success callback
    link_error=handle_error.s(),       # Error callback
)

# Check the result
if result.ready():
    print(result.get())   # Get result (blocks)
    print(result.result)  # Get result (non-blocking)
```
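A sketch of the two linked callbacks referenced above (the names come from the snippet; the error callback uses the `(request, exc, traceback)` form shown later under Error Callbacks):

```python
@app.task
def log_result(result):
    # link= callbacks receive the parent task's return value
    print(f"Parent task returned: {result}")

@app.task
def handle_error(request, exc, traceback):
    # link_error= callbacks can accept the failing task's request, exception, traceback
    print(f"Task {request.id} failed: {exc}")
```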

Task Signatures

```python
from celery import signature

# Create a signature (doesn't execute)
sig = add.signature((2, 2), countdown=10)
sig = add.s(2, 2)  # Shorthand

# Partial arguments (currying)
partial = add.s(2)                       # One arg fixed
result = partial.apply_async(args=(4,))  # Supply the second arg

# Immutable signature (args can't be replaced)
immutable = add.si(2, 2)

# Clone and modify
new_sig = sig.clone(countdown=60)

# Execute a signature
result = sig.delay()
result = sig.apply_async()
result = sig()  # Synchronous execution in the current process
```
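How a chain completes partial signatures (sketch; uses the `add` task from above): each link's return value is prepended to the next signature's arguments, and `.si()` opts out of that.

```python
from celery import chain

# add(2, 2) -> 4, then the 4 is prepended: add(4, 4) -> 8
res = chain(add.s(2, 2), add.s(4))()
print(res.get(timeout=10))  # 8

# An immutable signature ignores the parent's result
res = chain(add.s(2, 2), add.si(10, 20))()
print(res.get(timeout=10))  # 30
```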

Result Handling

```python
# Basic result retrieval
result = add.delay(4, 6)
value = result.get(timeout=10)  # Blocks until complete

# Non-blocking result check
if result.ready():
    print(result.result)

# Result states
print(result.status)        # PENDING, STARTED, SUCCESS, FAILURE
print(result.successful())  # True if SUCCESS
print(result.failed())      # True if FAILURE

# Result metadata
print(result.traceback)  # Exception traceback if the task failed
print(result.info)       # Task return value or exception

# Forget the result (drop it from the backend)
result.forget()

# Revoke a task (cancel)
result.revoke(terminate=True)      # Kill a running task
add.AsyncResult(task_id).revoke()  # Revoke by ID
```

---

Task Routing


Queue Configuration

```python
from kombu import Queue, Exchange

# Define queues
app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('priority'), routing_key='priority.high'),
    Queue('low_priority', Exchange('priority'), routing_key='priority.low'),
    Queue('emails', Exchange('tasks'), routing_key='tasks.email'),
    Queue('reports', Exchange('tasks'), routing_key='tasks.report'),
)

# Default queue
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_routing_key = 'default'
```

Task Routing Rules

```python
# Route specific tasks to queues
app.conf.task_routes = {
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'priority': 9},
    'myapp.tasks.*': {'queue': 'default'},
}

# Function-based routing
def route_task(name, args, kwargs, options, task=None, **kw):
    if 'email' in name:
        return {'queue': 'emails', 'routing_key': 'email.send'}
    elif 'report' in name:
        return {'queue': 'reports', 'priority': 5}
    return {'queue': 'default'}

app.conf.task_routes = (route_task,)
```

Worker Queue Binding

```bash
# Worker consuming specific queues
celery -A myapp worker -Q emails,reports --loglevel=info

# Multiple workers for different queues
celery -A myapp worker -Q high_priority -c 4 --loglevel=info
celery -A myapp worker -Q default -c 2 --loglevel=info
celery -A myapp worker -Q low_priority -c 1 --loglevel=info
```

Priority Queues

```python
# Configure priority support
app.conf.task_queue_max_priority = 10
app.conf.task_default_priority = 5

# Send tasks with an explicit priority
high_priority_task.apply_async(args=(), priority=9)
low_priority_task.apply_async(args=(), priority=1)

# Priority-based routing
app.conf.task_routes = {
    'critical_task': {'queue': 'default', 'priority': 10},
    'background_task': {'queue': 'default', 'priority': 1},
}
```
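Note that `task_queue_max_priority` maps to RabbitMQ's `x-max-priority`; the Redis transport emulates priorities with a small set of sub-queues instead. A sketch of widening Redis beyond its default four levels (kombu transport options):

```python
# Redis splits each queue into sub-queues, one per priority step
app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),   # 10 levels instead of the default [0, 3, 6, 9]
    'queue_order_strategy': 'priority',
}
```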

---

Periodic Tasks


Celery Beat Setup

```python
# celery_config.py
from celery.schedules import crontab, solar

beat_schedule = {
    # Run every 30 seconds
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
    },

    # Run every morning at 7:30 AM
    'send-daily-report': {
        'task': 'myapp.tasks.send_daily_report',
        'schedule': crontab(hour=7, minute=30),
    },

    # Run every Monday at midnight
    'weekly-cleanup': {
        'task': 'myapp.tasks.cleanup',
        'schedule': crontab(hour=0, minute=0, day_of_week=1),
    },

    # Run on the first day of each month
    'monthly-report': {
        'task': 'myapp.tasks.monthly_report',
        'schedule': crontab(hour=0, minute=0, day_of_month='1'),
        'kwargs': {'month_offset': 1},
    },

    # Solar schedule (sunrise/sunset)
    'wake-up-at-sunrise': {
        'task': 'myapp.tasks.morning_routine',
        'schedule': solar('sunrise', -37.81, 144.96),  # Melbourne
    },
}

app.conf.beat_schedule = beat_schedule
```

Crontab Patterns

```python
from celery.schedules import crontab

# Every minute
crontab()

# Every 15 minutes
crontab(minute='*/15')

# Every hour at :30
crontab(minute=30)

# Every day at midnight
crontab(hour=0, minute=0)

# Every weekday at 5 PM
crontab(hour=17, minute=0, day_of_week='1-5')

# Every Monday, Wednesday, Friday at noon
crontab(hour=12, minute=0, day_of_week='mon,wed,fri')

# First day of the month
crontab(hour=0, minute=0, day_of_month='1')

# Last day of the month (fires on days 28-31; add a check in the task)
crontab(hour=0, minute=0, day_of_month='28-31')

# Quarterly (every 3 months)
crontab(hour=0, minute=0, day_of_month='1', month_of_year='*/3')
```

Running Beat Scheduler

```bash
# Start the beat scheduler
celery -A myapp beat --loglevel=info

# Beat with a custom scheduler
celery -A myapp beat --scheduler django_celery_beat.schedulers:DatabaseScheduler

# Combine worker and beat (development only)
celery -A myapp worker --beat --loglevel=info
```
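Beat records last-run times in a local schedule file, and only one beat instance should run per cluster to avoid duplicate dispatches. A sketch of pointing that state file at a writable location:

```bash
# -s/--schedule sets where beat persists its state
celery -A myapp beat --schedule=/var/run/celery/celerybeat-schedule --loglevel=info
```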

Dynamic Schedules (django-celery-beat)

```bash
pip install django-celery-beat
```

```python
# settings.py (Django)
INSTALLED_APPS = [
    'django_celery_beat',
]
```

```bash
# Migrate the database
python manage.py migrate django_celery_beat

# Run beat with the database scheduler
celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
```

```python
# Create periodic tasks via the Django admin or the ORM
import json
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

# Interval schedule (every 10 seconds)
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

PeriodicTask.objects.create(
    interval=schedule,
    name='Import feed every 10 seconds',
    task='myapp.tasks.import_feed',
    args=json.dumps(['https://example.com/feed']),
)

# Crontab schedule (every 4 hours)
schedule, created = CrontabSchedule.objects.get_or_create(
    minute='0',
    hour='*/4',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
)

PeriodicTask.objects.create(
    crontab=schedule,
    name='Cleanup every 4 hours',
    task='myapp.tasks.cleanup',
)
```

---

Workflows (Canvas)


Chains

```python
from celery import chain

# Sequential execution
result = chain(add.s(2, 2), add.s(4), add.s(8))()
# Equivalent to add(add(add(2, 2), 4), 8); result: 16

# Shorthand syntax
result = (add.s(2, 2) | add.s(4) | add.s(8))()

# Chain with different tasks
workflow = (
    fetch_data.s(url)
    | process_data.s()
    | save_results.s()
)
result = workflow.apply_async()
```

Groups

```python
from celery import group

# Parallel execution
job = group([
    add.s(2, 2),
    add.s(4, 4),
    add.s(8, 8),
])
result = job.apply_async()

# Wait for all results
results = result.get(timeout=10)  # [4, 8, 16]

# Group followed by a callback (chaining a group into a task creates a chord)
job = group([
    process_item.s(item) for item in items
]) | summarize_results.s()
```

Chords

```python
from celery import chord

# Group with a callback: the body runs once every header task has finished;
# calling the chord dispatches it and returns an AsyncResult
result = chord([
    fetch_url.s(url) for url in urls
])(combine_results.s())

# Example: process multiple files, then merge
result = chord([
    process_file.s(file) for file in files
])(merge_results.s())
```
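A sketch of the callback contract (the bodies here are stand-ins): the chord body receives a list of the header tasks' results, in header order.

```python
@app.task
def fetch_url(url):
    return len(url)  # stand-in for real work

@app.task
def combine_results(results):
    # results is the list of header return values, in header order
    return sum(results)
```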

Map and Starmap

```python
from celery import group

# map passes each element as a single argument, so a two-argument task
# like add needs starmap, which unpacks each tuple into positional args
res = add.starmap([(2, 2), (4, 4), (8, 8)]).delay()
print(res.get(timeout=10))  # [4, 8, 16]

# Roughly equivalent (same values, but as separate tasks rather than one):
res = group([add.s(2, 2), add.s(4, 4), add.s(8, 8)])()
```

Complex Workflows

```python
from celery import chain, group, chord

# Parallel processing with sequential steps
workflow = chain(
    # Step 1: Fetch data
    fetch_data.s(source),

    # Step 2: Process in parallel
    group([
        process_chunk.s(chunk_id) for chunk_id in range(10)
    ]),

    # Step 3: Aggregate results
    aggregate.s(),

    # Step 4: Save to database
    save_results.s()
)

# Nested chords: use chord(header, body) so nothing runs until dispatch
workflow = chord([
    chord([subtask.s(item) for item in chunk], process_chunk.s())
    for chunk in chunks
], final_callback.s())

# Real-world example: report generation
generate_report = chain(
    fetch_user_data.s(user_id),
    chord([
        calculate_stats.s(),
        fetch_transactions.s(),
        fetch_activity.s(),
    ], combine_sections.s()),
    render_pdf.s(),
    send_email.s(user_email)
)
```

---

Error Handling


Automatic Retries

```python
import requests
from requests.exceptions import RequestException

@app.task(
    autoretry_for=(RequestException, IOError),  # Auto-retry these exceptions
    retry_kwargs={'max_retries': 5},            # Max 5 retries
    retry_backoff=True,                         # Exponential backoff
    retry_backoff_max=600,                      # At most 10 minutes between retries
    retry_jitter=True,                          # Add randomness to the backoff
)
def fetch_url(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
```

Manual Retries

```python
@app.task(bind=True, max_retries=3)
def process_data(self, data):
    try:
        result = external_api_call(data)
        return result
    except TemporaryError as exc:
        # Retry after 60 seconds
        raise self.retry(exc=exc, countdown=60)
    except PermanentError as exc:
        # Don't retry; log and fail
        logger.error(f"Permanent error: {exc}")
        raise
    except Exception as exc:
        # Exponential backoff
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries,
            max_retries=3
        )
```

Error Callbacks

```python
@app.task
def on_error(request, exc, traceback):
    """Called when the linked task fails"""
    logger.error(f"Task {request.id} failed: {exc}")
    send_alert(f"Task failure: {request.task}", str(exc))

@app.task
def risky_task(data):
    return process(data)

# Link the error callback
risky_task.apply_async(
    args=(data,),
    link_error=on_error.s()
)
```

Task Failure Handling

```python
from celery import Task

class CallbackTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure"""
        logger.error(f"Task {task_id} failed with {exc}")
        # Send notification
        send_notification('Task Failed', str(exc))

    def on_success(self, retval, task_id, args, kwargs):
        """Handle task success"""
        logger.info(f"Task {task_id} succeeded: {retval}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Handle task retry"""
        logger.warning(f"Task {task_id} retrying: {exc}")

@app.task(base=CallbackTask)
def monitored_task(x):
    if x < 0:
        raise ValueError("Negative value")
    return x * 2
```

Exception Handling Patterns

```python
@app.task(bind=True)
def robust_task(self, data):
    # Categorize exceptions
    try:
        return process(data)

    except NetworkError as exc:
        # Transient error - retry
        raise self.retry(exc=exc, countdown=60, max_retries=5)

    except ValidationError as exc:
        # Permanent error - don't retry
        logger.error(f"Invalid data: {exc}")
        return {'status': 'failed', 'error': str(exc)}

    except DatabaseError as exc:
        # Critical error - retry with exponential backoff
        backoff = min(2 ** self.request.retries * 60, 3600)
        raise self.retry(exc=exc, countdown=backoff, max_retries=10)

    except Exception as exc:
        # Unknown error - retry a limited number of times
        if self.request.retries < 3:
            raise self.retry(exc=exc, countdown=120)
        else:
            # Max retries exceeded - fail and alert
            logger.critical(f"Task failed after retries: {exc}")
            send_alert('Critical Task Failure', str(exc))
            raise
```

Monitoring and Management


Task Events

```python
from celery import signals

# Enable events
app.conf.worker_send_task_events = True
app.conf.task_send_sent_event = True

# Event listeners
@signals.task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None,
                        kwargs=None, **extra):
    print(f"Task {task.name}[{task_id}] starting")

@signals.task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, retval=None, **extra):
    print(f"Task {task.name}[{task_id}] completed: {retval}")

@signals.task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None,
                         traceback=None, **extra):
    print(f"Task {task_id} failed: {exception}")

@signals.task_retry.connect
def task_retry_handler(sender=None, task_id=None, reason=None, **extra):
    print(f"Task {task_id} retrying: {reason}")
```

Flower Monitoring

```bash
# Install Flower
pip install flower

# Start Flower
celery -A myapp flower --port=5555

# The dashboard is then served at http://localhost:5555
```

```python
# Flower configuration
flower_basic_auth = ['admin:password']
flower_persistent = True
flower_db = 'flower.db'
flower_max_tasks = 10000
```

Inspecting Workers

```python
from celery_app import app

i = app.control.inspect()

# Get active tasks
print(i.active())

# Get scheduled (eta/countdown) tasks
print(i.scheduled())

# Get reserved tasks (prefetched but not yet running)
print(i.reserved())

# Get worker stats
print(i.stats())

# Get registered tasks
print(i.registered())

# Revoke a task
app.control.revoke(task_id, terminate=True)

# Shut down workers
app.control.shutdown()

# Restart worker pools
app.control.pool_restart()

# Set a rate limit at runtime
app.control.rate_limit('myapp.tasks.slow_task', '10/m')
```

Command Line Inspection

```bash
# List active tasks
celery -A myapp inspect active

# List scheduled tasks
celery -A myapp inspect scheduled

# Worker stats
celery -A myapp inspect stats

# Registered tasks
celery -A myapp inspect registered

# Revoke a task
celery -A myapp control revoke <task_id>

# Shut down workers
celery -A myapp control shutdown

# Purge all queued tasks
celery -A myapp purge
```

Custom Metrics

```python
from prometheus_client import Counter, Histogram

# Define metrics once at import time; re-creating them inside the task
# would raise a duplicated-timeseries error on the second call
task_counter = Counter('celery_tasks_total', 'Total tasks')
task_duration = Histogram('celery_task_duration_seconds', 'Task duration')

@app.task
def tracked_task(data):
    with task_duration.time():
        result = process(data)
        task_counter.inc()
        return result
```

Framework Integration


Django Integration

```python
# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
```

```python
# myproject/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)
```

```python
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
```

```python
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_email_task(subject, message, recipient):
    send_mail(subject, message, 'from@example.com', [recipient])
    return f"Email sent to {recipient}"
```

```python
# Use in a view
from django.http import HttpResponse
from myapp.tasks import send_email_task

def my_view(request):
    send_email_task.delay('Hello', 'Welcome!', 'user@example.com')
    return HttpResponse('Email queued')
```

FastAPI Integration

```python
# celery_app.py
from celery import Celery

celery_app = Celery(
    'fastapi_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

@celery_app.task
def process_data(data: dict):
    # Long-running task
    import time
    time.sleep(10)
    return {"processed": data, "status": "complete"}
```

```python
# main.py
from fastapi import FastAPI
from celery.result import AsyncResult
from celery_app import celery_app, process_data

app = FastAPI()

@app.post("/process")
async def process_endpoint(data: dict):
    # Option 1: FastAPI BackgroundTasks (simple, in-process)
    # background_tasks.add_task(process_data, data)

    # Option 2: Celery (distributed, persistent)
    task = process_data.delay(data)
    return {"task_id": task.id, "status": "queued"}

@app.get("/status/{task_id}")
async def check_status(task_id: str):
    task = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.ready() else None
    }
```

**When to Use Celery vs FastAPI BackgroundTasks**:
- **FastAPI BackgroundTasks**: simple fire-and-forget work (logging, cleanup)
- **Celery**: distributed processing, retries, scheduling, task results

Flask Integration

```python
# celery_app.py
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
```

```python
# app.py
from flask import Flask
from celery_app import make_celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/1'

celery = make_celery(app)

@celery.task
def send_email(to, subject, body):
    # Use Flask-Mail or similar; ContextTask already provides the app context
    mail.send(Message(subject, recipients=[to], body=body))

@app.route('/send')
def send_route():
    send_email.delay('user@example.com', 'Hello', 'Welcome!')
    return 'Email queued'
```

---

Testing Strategies


Eager Mode (Synchronous Execution)

```python
# conftest.py (pytest; celery_config is picked up by Celery's pytest plugin)
import pytest

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'memory://',
        'result_backend': 'cache+memory://',
        'task_always_eager': True,      # Execute tasks synchronously
        'task_eager_propagates': True,  # Propagate task exceptions to the caller
    }

# Tests
def test_add_task():
    result = add.delay(4, 6)
    assert result.get() == 10

def test_task_failure():
    with pytest.raises(ValueError):
        failing_task.delay()
```

Testing with Real Broker

```python
# conftest.py
import pytest
from celery.contrib.testing.worker import start_worker
from celery_app import app

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'redis://localhost:6379/15',  # Dedicated test database
        'result_backend': 'redis://localhost:6379/15',
    }

@pytest.fixture
def celery_worker():
    """Start an embedded worker for tests"""
    with start_worker(app, perform_ping_check=False) as worker:
        yield worker

def test_async_task(celery_worker):
    result = async_task.delay(data)
    assert result.get(timeout=10) == expected
```

Mocking External Dependencies

```python
import requests
from unittest.mock import patch

@app.task
def fetch_and_process(url):
    response = requests.get(url)
    return process(response.json())

def test_fetch_and_process():
    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = {'data': 'test'}

        result = fetch_and_process.delay('http://example.com')
        assert result.get() == expected_result
        mock_get.assert_called_once_with('http://example.com')
```

Testing Periodic Tasks

```python
from celery.schedules import crontab

def test_periodic_task_schedule():
    from celery_app import app

    schedule = app.conf.beat_schedule['daily-report']
    assert schedule['task'] == 'myapp.tasks.daily_report'
    assert schedule['schedule'] == crontab(hour=0, minute=0)

def test_periodic_task_execution():
    # Test the task logic directly
    result = daily_report()
    assert result['status'] == 'complete'
```

Integration Testing

```python
import pytest
from celery.contrib.testing.worker import start_worker
from celery_app import app

@pytest.fixture(scope='module')
def celery_app():
    app.conf.update(
        broker_url='redis://localhost:6379/15',
        result_backend='redis://localhost:6379/15',
    )
    return app

@pytest.fixture(scope='module')
def celery_worker(celery_app):
    with start_worker(celery_app, perform_ping_check=False) as worker:
        yield worker

def test_workflow(celery_worker):
    from celery import chain

    workflow = chain(
        fetch_data.s(url),
        process_data.s(),
        save_results.s()
    )

    result = workflow.apply_async()
    output = result.get(timeout=30)

    assert output['status'] == 'saved'
```

Production Patterns


Worker Configuration

```bash
# Production worker with autoscaling
celery -A myapp worker \
    --autoscale=10,3 \
    --max-tasks-per-child=1000 \
    --time-limit=300 \
    --soft-time-limit=240 \
    --loglevel=info \
    --logfile=/var/log/celery/worker.log \
    --pidfile=/var/run/celery/worker.pid

# Multiple specialized workers
celery multi start \
    worker1 -A myapp -Q high_priority -c 4 --max-tasks-per-child=100 \
    worker2 -A myapp -Q default -c 2 --max-tasks-per-child=1000 \
    worker3 -A myapp -Q low_priority -c 1 --autoscale=3,1

# Graceful shutdown
celery multi stop worker1 worker2 worker3
celery multi stopwait worker1 worker2 worker3  # Wait for running tasks to finish
```

Configuration Best Practices

```python
# production_config.py
import os

# Broker settings
broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
broker_connection_retry_on_startup = True
broker_pool_limit = 50

# Result backend
result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
result_expires = 3600  # 1 hour

# Serialization
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'
enable_utc = True

# Performance
worker_prefetch_multiplier = 4     # Tasks to prefetch per worker process
worker_max_tasks_per_child = 1000  # Restart child after N tasks (prevents memory leaks)
task_acks_late = True              # Acknowledge after the task completes
task_reject_on_worker_lost = True  # Requeue if the worker dies mid-task

# Reliability
task_track_started = True   # Report the STARTED state
task_time_limit = 300       # 5-minute hard limit
task_soft_time_limit = 240  # 4-minute soft limit

# Logging
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
```

Systemd Service

/etc/systemd/system/celery.service

```ini
[Unit]
Description=Celery Service
After=network.target redis.service

[Service]
Type=forking
User=celery
Group=celery
WorkingDirectory=/opt/myapp
Environment="PATH=/opt/myapp/venv/bin"
ExecStart=/opt/myapp/venv/bin/celery multi start worker1 \
    -A myapp \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log \
    --loglevel=INFO
ExecStop=/opt/myapp/venv/bin/celery multi stopwait worker1 \
    --pidfile=/var/run/celery/%n.pid
ExecReload=/opt/myapp/venv/bin/celery multi restart worker1 \
    -A myapp \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/log/celery/%n%I.log \
    --loglevel=INFO
Restart=always

[Install]
WantedBy=multi-user.target
```

/etc/systemd/system/celerybeat.service

```ini
[Unit]
Description=Celery Beat Service
After=network.target redis.service

[Service]
Type=simple
User=celery
Group=celery
WorkingDirectory=/opt/myapp
Environment="PATH=/opt/myapp/venv/bin"
ExecStart=/opt/myapp/venv/bin/celery -A myapp beat \
    --loglevel=INFO \
    --pidfile=/var/run/celery/beat.pid
Restart=always

[Install]
WantedBy=multi-user.target
```
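After installing the unit files, reload systemd and enable both services (assuming the file names above): `systemctl daemon-reload && systemctl enable --now celery celerybeat`.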

Sentry Integration

Install

```bash
pip install sentry-sdk
```

Configuration

```python
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="https://your-sentry-dsn",
    integrations=[CeleryIntegration()],
    traces_sample_rate=0.1,  # sample 10% of transactions
)

# Tasks are automatically tracked
@app.task
def my_task(x):
    # Exceptions are automatically reported to Sentry
    return risky_operation(x)
```

Rate Limiting

```python
import requests

# Global rate limiting
app.conf.task_default_rate_limit = '100/m'  # 100 tasks per minute

# Per-task rate limiting
@app.task(rate_limit='10/m')
def rate_limited_task(x):
    return expensive_operation(x)

# Dynamic rate limiting at runtime
app.control.rate_limit('myapp.tasks.slow_task', '5/m')

# Token-bucket rate limiting at sub-minute granularity
@app.task(rate_limit='10/s')
def api_call(endpoint):
    return requests.get(endpoint)
```
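Note that Celery enforces rate limits per worker instance, not cluster-wide: with four workers running, a task declared with `rate_limit='10/m'` can still execute up to 40 times per minute overall.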

Health Checks

health.py

```python
from celery_app import app

def check_celery_health():
    """Health check endpoint"""
    try:
        # Ping workers
        i = app.control.inspect()
        stats = i.stats()

        if not stats:
            return {'status': 'unhealthy', 'reason': 'No workers available'}

        # Check the broker connection
        result = app.control.ping(timeout=1.0)
        if not result:
            return {'status': 'unhealthy', 'reason': 'Workers not responding'}

        return {'status': 'healthy', 'workers': len(stats)}
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}
```

FastAPI health endpoint

```python
# `app` here is the FastAPI instance, not the Celery app
@app.get("/health/celery")
async def celery_health():
    return check_celery_health()
```

---

Performance Optimization


Task Optimization

```python
# Use ignore_result for fire-and-forget tasks
@app.task(ignore_result=True)
def send_notification(user_id, message):
    # No result is needed, so skip the result-backend overhead
    notify(user_id, message)

# Compress large payloads
@app.task(compression='gzip')
def process_large_data(data):
    return analyze(data)

# Choose an efficient serializer
@app.task(serializer='msgpack')  # faster and more compact than JSON
def fast_task(data):
    return process(data)
```

Worker Tuning

```python
# Worker concurrency
worker_concurrency = 4     # CPU-bound: match the number of cores
# worker_concurrency = 20  # I/O-bound: use a higher value

# Prefetch multiplier (how many tasks each worker prefetches)
worker_prefetch_multiplier = 4  # a balanced default: 4x concurrency

# Task acknowledgment
task_acks_late = True     # acknowledge after completion (reliability)
# task_acks_late = False  # acknowledge on receipt (performance; the default)

# Memory management
worker_max_tasks_per_child = 1000     # restart a worker after N tasks
worker_max_memory_per_child = 200000  # restart after ~200 MB (value in KiB)
```

Database Result Backend Optimization

```python
# Prefer Redis over a relational database for results
result_backend = 'redis://localhost:6379/1'

# If you must use a database, tune the engine
result_backend = 'db+postgresql://user:pass@localhost/celery'
database_engine_options = {
    'pool_size': 20,
    'pool_recycle': 3600,
}

# Reduce the result expiry time
result_expires = 3600  # 1 hour instead of the default 24 hours
```

Task Chunking

```python
from celery import group

# Bad: one task per item (per-task overhead dominates)
for item in large_list:
    process_item.delay(item)

# Good: chunk items into batches
def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

@app.task
def process_batch(items):
    return [process_item(item) for item in items]

# Process in batches of 100
job = group(process_batch.s(chunk) for chunk in chunks(large_list, 100))
result = job.apply_async()
```
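Celery's canvas also ships a built-in `chunks` helper that does the same batching; a minimal sketch, assuming `process_item` takes a single positional argument:

```python
# Split an iterable of argument tuples into tasks of 100 items each
job = process_item.chunks(((item,) for item in large_list), 100)
result = job.apply_async()
```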

Connection Pooling

```python
# Redis / broker connection pool
broker_pool_limit = 50      # max connections to the broker
redis_max_connections = 50  # cap for the Redis result backend

# Database connection pool
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=QueuePool,
    pool_size=20,
    max_overflow=0,
)
```

---

Common Use Cases


Email Sending

```python
from smtplib import SMTPException

from django.core.mail import EmailMessage  # assumes Django's email backend

@app.task(bind=True, max_retries=3)
def send_email_task(self, to, subject, body, attachments=None):
    try:
        msg = EmailMessage(subject, body, 'from@example.com', [to])
        if attachments:
            for filename, content, mimetype in attachments:
                msg.attach(filename, content, mimetype)
        msg.send()
        return {'status': 'sent', 'to': to}
    except SMTPException as exc:
        # Retry with exponential backoff (60s, 120s, 240s, ...)
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
```

Bulk email with rate limiting

```python
@app.task(rate_limit='100/m')
def send_bulk_email(recipients, subject, template):
    for recipient in recipients:
        send_email_task.delay(recipient, subject, render_template(template, recipient))
```

Report Generation

```python
@app.task(bind=True, time_limit=600)
def generate_report(self, report_type, user_id, start_date, end_date):
    # Update progress
    self.update_state(state='PROGRESS', meta={'current': 0, 'total': 100})

    # Fetch data
    data = fetch_report_data(report_type, start_date, end_date)
    self.update_state(state='PROGRESS', meta={'current': 30, 'total': 100})

    # Generate PDF
    pdf = render_pdf(data)
    self.update_state(state='PROGRESS', meta={'current': 70, 'total': 100})

    # Upload to S3
    url = upload_to_s3(pdf, f'reports/{user_id}/{report_type}.pdf')
    self.update_state(state='PROGRESS', meta={'current': 90, 'total': 100})

    # Send notification
    send_email_task.delay(
        get_user_email(user_id),
        'Report Ready',
        f'Your report is ready: {url}'
    )

    return {'status': 'complete', 'url': url}
```

Check progress

```python
from celery.result import AsyncResult

task = AsyncResult(task_id)
if task.state == 'PROGRESS':
    print(task.info)  # {'current': 30, 'total': 100}
```

Data Processing Pipeline

```python
from celery import chain

@app.task
def fetch_data(source):
    return download(source)

@app.task
def clean_data(raw_data):
    return clean(raw_data)

@app.task
def transform_data(clean_data):
    return transform(clean_data)

@app.task
def load_data(transformed_data):
    save_to_database(transformed_data)
    return {'status': 'loaded', 'rows': len(transformed_data)}

# ETL pipeline: each task feeds its return value to the next
etl_pipeline = chain(
    fetch_data.s('https://api.example.com/data'),
    clean_data.s(),
    transform_data.s(),
    load_data.s()
)
result = etl_pipeline.apply_async()
```

Webhook Processing

```python
from requests.exceptions import RequestException

@app.task(bind=True, autoretry_for=(RequestException,), max_retries=5)
def process_webhook(self, webhook_data):
    # Validate signature
    if not verify_signature(webhook_data):
        raise ValueError("Invalid signature")

    # Process event
    event_type = webhook_data['type']

    if event_type == 'payment.success':
        update_order_status(webhook_data['order_id'], 'paid')
        send_confirmation_email.delay(webhook_data['customer_email'])

    elif event_type == 'payment.failed':
        notify_admin.delay('Payment Failed', webhook_data)

    return {'status': 'processed', 'event': event_type}
```

FastAPI webhook endpoint

```python
from fastapi import Request

@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
    data = await request.json()
    process_webhook.delay(data)  # queue and return immediately
    return {"status": "queued"}
```

Image Processing

```python
from celery import chord

@app.task
def resize_image(image_path, size):
    from PIL import Image
    img = Image.open(image_path)
    img.thumbnail(size)
    output_path = f"{image_path}_{size[0]}x{size[1]}.jpg"
    img.save(output_path)
    return output_path

@app.task
def upload_to_cdn(image_paths):
    urls = []
    for path in image_paths:
        url = cdn_upload(path)
        urls.append(url)
    return urls

# Generate multiple sizes, then upload them all in one callback
def process_uploaded_image(image_path):
    sizes = [(800, 600), (400, 300), (200, 150), (100, 100)]

    # Calling the chord dispatches it and returns an AsyncResult
    return chord([
        resize_image.s(image_path, size) for size in sizes
    ])(upload_to_cdn.s())
```

---

Alternatives Comparison


Celery vs RQ (Redis Queue)

RQ is a simpler, Redis-only task queue.

**When to use RQ**:
- Simple use cases (no routing, basic retries)
- Redis-only infrastructure
- Python 3 only
- Smaller scale (<1,000 tasks/min)

**When to use Celery**:
- Complex workflows (chains, chords)
- Multiple broker options
- Advanced routing and priorities
- Large scale (>1,000 tasks/min)
- Periodic tasks

RQ Example

```python
from redis import Redis
from rq import Queue

redis_conn = Redis()
q = Queue(connection=redis_conn)

job = q.enqueue(my_function, arg1, arg2)
result = job.result
```

Celery vs Huey

Huey is a lightweight task queue with minimal dependencies.

**When to use Huey**:
- Small to medium projects
- Minimal configuration
- Redis or in-memory storage only
- Simple periodic tasks

**When to use Celery**:
- Enterprise-scale applications
- Complex task dependencies
- Multiple broker/backend options
- Advanced monitoring needs

Huey Example

```python
from huey import RedisHuey

huey = RedisHuey('myapp')

@huey.task()
def add(a, b):
    return a + b

result = add(1, 2)  # enqueues the task and returns a result handle
```

Celery vs Dramatiq

Dramatiq is a modern alternative that prioritizes reliability.

**When to use Dramatiq**:
- Reliability over features
- Simpler API
- Better type hints
- RabbitMQ or Redis infrastructure

**When to use Celery**:
- Mature ecosystem
- More broker options
- Canvas workflows
- Larger community

Dramatiq Example

```python
import dramatiq

@dramatiq.actor
def add(x, y):
    return x + y

add.send(1, 2)  # enqueue the actor for background execution
```

Celery vs Cloud Services

AWS Lambda, Google Cloud Functions, Azure Functions

**When to use cloud functions**:
- Serverless infrastructure
- Event-driven workflows
- Pay-per-execution pricing
- Automatic scaling

**When to use Celery**:
- Self-hosted infrastructure
- Complex task workflows
- Cost predictability
- Full control over execution

Best Practices


Task Design

1. **Idempotency**: Tasks should be safe to run multiple times.
   ```python
   @app.task
   def process_order(order_id):
       order = Order.objects.get(id=order_id)
       if order.status == 'processed':
           return  # Already processed, skip

       order.process()
       order.status = 'processed'
       order.save()
   ```
2. **Small, Focused Tasks**: One responsibility per task.
   ```python
   # Bad: monolithic task
   @app.task
   def process_user(user_id):
       send_welcome_email(user_id)
       create_profile(user_id)
       setup_notifications(user_id)

   # Good: separate tasks composed with a group
   @app.task
   def send_welcome_email(user_id):
       ...

   @app.task
   def create_profile(user_id):
       ...

   workflow = group([
       send_welcome_email.s(user_id),
       create_profile.s(user_id),
       setup_notifications.s(user_id)
   ])
   ```
3. **Avoid Database Objects in Arguments**: Pass IDs instead; a serialized object can be stale by the time the task runs, and it bloats the message payload.
   ```python
   # Bad
   @app.task
   def process_user(user):  # a full User object
       ...

   # Good
   @app.task
   def process_user(user_id):
       user = User.objects.get(id=user_id)
       ...
   ```
4. **Set Time Limits**: Prevent runaway tasks.
   ```python
   @app.task(time_limit=300, soft_time_limit=240)
   def bounded_task():
       ...
   ```

Error Handling

1. **Categorize Exceptions**: Handle transient and permanent errors differently
2. **Use Exponential Backoff**: Avoid overwhelming failing services
3. **Set Max Retries**: Don't retry forever
4. **Log Failures**: Always log why tasks fail (see the sketch below)
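These points combine naturally in a single task. A minimal sketch, assuming hypothetical `TransientAPIError` and `PermanentError` exception types and a `partner_client` helper (Celery can also automate backoff with `autoretry_for=...` plus `retry_backoff=True`):

```python
@app.task(bind=True, max_retries=5)
def call_partner_api(self, payload):
    try:
        return partner_client.send(payload)
    except TransientAPIError as exc:
        # Transient failure: retry with exponential backoff (60s, 120s, 240s, ...)
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
    except PermanentError as exc:
        # Permanent failure: log why and give up instead of retrying forever
        logger.error("call_partner_api failed permanently: %s", exc)
        raise
```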

Performance

1. **Use `ignore_result=True`**: For tasks that don't need results
2. **Batch Operations**: Process multiple items per task
3. **Optimize Serialization**: Use msgpack for speed
4. **Connection Pooling**: Reuse database/broker connections
5. **Task Chunking**: Avoid creating millions of tiny tasks

Monitoring

1. **Enable Events**: Track the task lifecycle (see the event-consumer sketch below)
2. **Use Flower**: Web-based monitoring
3. **Health Checks**: Monitor worker availability
4. **Sentry Integration**: Track errors
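For custom monitoring, workers started with `-E` (or with `worker_send_task_events = True`) publish lifecycle events you can consume in Python; a minimal sketch using Celery's event receiver:

```python
from celery_app import app

def on_task_failed(event):
    print(f"Task {event['uuid']} failed: {event.get('exception')}")

with app.connection() as connection:
    recv = app.events.Receiver(connection, handlers={
        'task-failed': on_task_failed,
        '*': lambda event: None,  # ignore all other event types
    })
    recv.capture(limit=None, timeout=None, wakeup=True)
```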

Security

1. **Validate Input**: Always validate task arguments
2. **Secure the Broker**: Use authentication and encryption (see the sketch below)
3. **Limit Task Execution Time**: Prevent resource exhaustion
4. **Rate Limiting**: Protect against task flooding
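A minimal sketch of broker authentication plus TLS for a Redis broker; the host, port, and password are placeholders:

```python
import ssl

broker_url = 'rediss://:strong-password@redis.example.com:6380/0'  # TLS + password auth
broker_use_ssl = {'ssl_cert_reqs': ssl.CERT_REQUIRED}  # verify the server certificate
```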

Troubleshooting


Tasks Not Executing

**Symptoms**: Tasks queued but not processing

**Diagnosis**:

```bash
# Check if workers are running
celery -A myapp inspect active

# Check worker stats
celery -A myapp inspect stats

# Check registered tasks
celery -A myapp inspect registered
```

**Solutions**:
- Start workers: `celery -A myapp worker`
- Check the worker is consuming the correct queues
- Verify the task routing configuration
- Check broker connectivity

Tasks Failing Silently

**Symptoms**: Tasks report SUCCESS but don't actually do their work

**Diagnosis**:

```python
# Enable task tracking
app.conf.task_track_started = True

# Check the task result and traceback
result = task.delay()
if result.failed():
    print(result.traceback)
```

**Solutions**:
- Check the logs: `celery -A myapp worker --loglevel=debug`
- Enable eager mode in tests to surface exceptions (see the sketch below)
- Use `task_eager_propagates = True` in tests
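Both test-time settings fit in a couple of lines; a minimal sketch:

```python
# Test configuration only; never enable in production
app.conf.update(
    task_always_eager=True,      # run tasks synchronously in the caller
    task_eager_propagates=True,  # re-raise task exceptions immediately
)
```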

Memory Leaks

**Symptoms**: Worker memory grows over time

**Solutions**:

```python
# Restart workers after N tasks
worker_max_tasks_per_child = 1000

# Restart when a memory limit is reached
worker_max_memory_per_child = 200000  # ~200 MB (value in KiB)
```

Slow Task Execution

**Symptoms**: Tasks taking longer than expected

**Diagnosis**:

```python
import time

@app.task(bind=True)
def timed_task(self):
    start = time.time()
    result = slow_operation()
    duration = time.time() - start
    logger.info(f"Task {self.request.id} took {duration}s")
    return result
```

**Solutions**:
- Increase worker concurrency
- Optimize the task code
- Use task chunking
- Add more workers

Broker Connection Issues

**Symptoms**: Tasks not reaching workers

**Diagnosis**:

```bash
# Test the broker connection
python -c "from celery_app import app; print(app.connection().connect())"
```

**Solutions**:
- Check the broker is running: `redis-cli ping` or `rabbitmqctl status`
- Verify the broker URL in the configuration
- Check network connectivity
- Enable connection retry: `broker_connection_retry_on_startup = True`

Task Results Not Persisting

**Symptoms**: `result.get()` returns None

**Solutions**:
- Verify a result backend is configured
- Check the task doesn't set `ignore_result=True`
- Verify the result hasn't expired (`result_expires`)
- Test the backend connection (see the sketch below)
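A quick way to exercise the full round trip; here `some_task` is a placeholder for any task in your app that returns a value and doesn't set `ignore_result`:

```python
from celery_app import app

print(app.conf.result_backend)  # confirm a backend URL is actually set

res = some_task.delay()
print(res.get(timeout=10))      # raises or times out if results never persist
```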

Beat Not Scheduling Tasks

**Symptoms**: Periodic tasks not running

**Diagnosis**:

```bash
# Check beat is running
ps aux | grep celery | grep beat

# Check scheduled tasks
celery -A myapp inspect scheduled
```

**Solutions**:
- Ensure the beat process is running
- Verify the `beat_schedule` configuration (a minimal example follows below)
- Check the beat log for errors
- Use a database scheduler for dynamic schedules
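A minimal `beat_schedule` entry for sanity-checking the configuration; the entry name and the `myapp.tasks.cleanup` task path are placeholders:

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'myapp.tasks.cleanup',
        'schedule': crontab(minute=0),  # at the top of every hour
    },
}
```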

Worker Crashes

**Symptoms**: Workers die unexpectedly

**Solutions**:
- Check the logs for errors
- Set `worker_max_tasks_per_child` to contain memory leaks
- Add task time limits
- Use systemd for automatic restarts
- Monitor with Flower

Task Queue Buildup

**Symptoms**: Tasks accumulating in the queue

**Solutions**:
- Add more workers
- Increase worker concurrency
- Optimize slow tasks
- Add task routing to distribute load
- Check for blocked workers (see the queue-depth sketch below)
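To confirm a backlog is actually growing, you can measure queue depth directly on a Redis broker; a minimal sketch assuming the default queue name `celery`:

```python
import redis

r = redis.Redis.from_url('redis://localhost:6379/0')
depth = r.llen('celery')  # Celery stores each queue as a Redis list
print(f"celery queue depth: {depth}")
```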

Advanced Configuration


Custom Task Classes

```python
from celery import Task

class DatabaseTask(Task):
    """Task that lazily opens a database connection per worker process"""
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = create_db_connection()
        return self._db

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Close the connection after each task run"""
        if self._db is not None:
            self._db.close()
            self._db = None  # reset so the next run opens a fresh connection

@app.task(base=DatabaseTask)
def db_task(query):
    return db_task.db.execute(query)
```

Custom Serializers

```python
import json

from kombu.serialization import register

def my_encoder(obj):
    # Custom encoding logic
    return json.dumps(obj)

def my_decoder(data):
    # Custom decoding logic
    return json.loads(data)

register('myjson', my_encoder, my_decoder,
         content_type='application/x-myjson',
         content_encoding='utf-8')

app.conf.task_serializer = 'myjson'
app.conf.accept_content = ['myjson']  # workers must also accept the new format
```

Task Inheritance

```python
from celery import Task

class BaseTask(Task):
    # send_alert and logger are application-provided helpers
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        send_alert(f"Task {self.name} failed", str(exc))

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f"Task {self.name} retrying")

@app.task(base=BaseTask)
def monitored_task():
    return perform_work()
```

End of Celery Skill Documentation

For more information, see the related skills below.

Related Skills


When using Celery, these skills enhance your workflow:
  • django: Django + Celery integration for background tasks
  • fastapi-local-dev: FastAPI + Celery patterns for async API operations
  • test-driven-development: Testing async tasks and task chains
  • systematic-debugging: Debugging distributed task failures and race conditions
[Full documentation available in these skills if deployed in your bundle]