django-celery
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseDjango + Celery Async Task Patterns
Django + Celery 异步任务实践模式
Production-grade patterns for background task processing in Django using Celery with Redis or RabbitMQ.
基于Redis或RabbitMQ,在Django中使用Celery实现生产级后台任务处理的实践模式。
When to Activate
适用场景
- Adding background jobs or async processing to a Django app
- Implementing periodic/scheduled tasks
- Offloading slow operations (email, PDF generation, API calls) from request cycle
- Setting up Celery Beat for cron-like scheduling
- Debugging task failures, retries, or queue backlogs
- Writing tests for Celery tasks
- 为Django应用添加后台任务或异步处理逻辑
- 实现周期性/定时任务
- 将耗时操作(邮件发送、PDF生成、API调用)从请求周期中剥离
- 配置Celery Beat实现类Cron的调度功能
- 调试任务失败、重试或队列积压问题
- 为Celery任务编写测试用例
Project Setup
项目配置
Installation
安装
bash
pip install celery[redis] django-celery-results django-celery-beatbash
pip install celery[redis] django-celery-results django-celery-beatcelery.py
— App Entrypoint
celery.pycelery.py
— 应用入口
celery.pypython
undefinedpython
undefinedconfig/celery.py
config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
```pythonimport os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
```pythonconfig/init.py
config/init.py
from .celery import app as celery_app
all = ('celery_app',)
undefinedfrom .celery import app as celery_app
all = ('celery_app',)
undefinedDjango Settings
Django 配置
python
undefinedpython
undefinedconfig/settings/base.py
config/settings/base.py
Broker (Redis recommended for production)
Broker(生产环境推荐使用Redis)
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')
Serialization
序列化配置
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
Task behavior
任务行为配置
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # Hard limit: 30 min
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # Soft limit: sends SoftTimeLimitExceeded
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Prevent worker hoarding long tasks
CELERY_TASK_ACKS_LATE = True # Re-queue on worker crash
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 硬限制:30分钟
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 软限制:触发SoftTimeLimitExceeded异常
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # 避免Worker占用过长任务
CELERY_TASK_ACKS_LATE = True # Worker崩溃时重新入队
Result persistence
结果持久化配置
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # Keep results 24 hours
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # 结果保留24小时
Beat scheduler (for periodic tasks)
Beat调度器(用于周期性任务)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
Installed apps
已安装应用
INSTALLED_APPS += [
'django_celery_results',
'django_celery_beat',
]
undefinedINSTALLED_APPS += [
'django_celery_results',
'django_celery_beat',
]
undefinedRunning Workers
启动Worker
bash
undefinedbash
undefinedStart worker (development)
启动Worker(开发环境)
celery -A config worker --loglevel=info
celery -A config worker --loglevel=info
Start beat scheduler (periodic tasks)
启动Beat调度器(周期性任务)
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Combined worker + beat (dev only, never production)
同时启动Worker + Beat(仅开发环境使用,禁止生产环境)
celery -A config worker --beat --loglevel=info
celery -A config worker --beat --loglevel=info
Production: multiple workers with concurrency
生产环境:启动多个Worker并设置并发数
celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
undefinedcelery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
undefinedTask Design Patterns
任务设计模式
Basic Task
基础任务
python
undefinedpython
undefinedapps/notifications/tasks.py
apps/notifications/tasks.py
from celery import shared_task
import logging
logger = logging.getLogger(name)
@shared_task(name='notifications.send_welcome_email')
def send_welcome_email(user_id: int) -> None:
"""Send welcome email to newly registered user."""
from apps.users.models import User
from apps.notifications.services import EmailService
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
logger.warning('send_welcome_email: user %s not found', user_id)
return # Idempotent — do not raise, task already impossible to complete
EmailService.send_welcome(user)
logger.info('Welcome email sent to user %s', user_id)undefinedfrom celery import shared_task
import logging
logger = logging.getLogger(name)
@shared_task(name='notifications.send_welcome_email')
def send_welcome_email(user_id: int) -> None:
"""Send welcome email to newly registered user."""
from apps.users.models import User
from apps.notifications.services import EmailService
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
logger.warning('send_welcome_email: user %s not found', user_id)
return # Idempotent — do not raise, task already impossible to complete
EmailService.send_welcome(user)
logger.info('Welcome email sent to user %s', user_id)undefinedRetryable Task
可重试任务
python
@shared_task(
bind=True,
name='integrations.sync_to_crm',
max_retries=5,
default_retry_delay=60, # seconds before first retry
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True, # exponential backoff
retry_backoff_max=600, # cap at 10 minutes
retry_jitter=True, # randomise to avoid thundering herd
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
"""Sync contact to external CRM with retry on transient failures."""
from apps.crm.services import CRMClient
try:
result = CRMClient().sync(contact_id)
return result
except CRMClient.RateLimitError as exc:
# Specific retry delay from response header
raise self.retry(exc=exc, countdown=int(exc.retry_after))python
@shared_task(
bind=True,
name='integrations.sync_to_crm',
max_retries=5,
default_retry_delay=60, # 首次重试间隔(秒)
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True, # 指数退避
retry_backoff_max=600, # 最大间隔10分钟
retry_jitter=True, # 随机化避免惊群效应
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
"""Sync contact to external CRM with retry on transient failures."""
from apps.crm.services import CRMClient
try:
result = CRMClient().sync(contact_id)
return result
except CRMClient.RateLimitError as exc:
# 根据响应头设置特定重试延迟
raise self.retry(exc=exc, countdown=int(exc.retry_after))Idempotent Task Pattern
幂等任务模式
Design tasks so they can safely run multiple times with the same inputs:
python
@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
"""Mark order as shipped — safe to run multiple times."""
from apps.orders.models import Order
updated = Order.objects.filter(
pk=order_id,
status=Order.Status.PROCESSING, # Guard: only update if not already shipped
).update(
status=Order.Status.SHIPPED,
tracking_number=tracking_number,
)
if not updated:
logger.info('mark_order_shipped: order %s already shipped or not found', order_id)设计可安全重复执行的任务,相同输入多次运行无副作用:
python
@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
"""Mark order as shipped — safe to run multiple times."""
from apps.orders.models import Order
updated = Order.objects.filter(
pk=order_id,
status=Order.Status.PROCESSING, # 防护:仅更新未发货订单
).update(
status=Order.Status.SHIPPED,
tracking_number=tracking_number,
)
if not updated:
logger.info('mark_order_shipped: order %s already shipped or not found', order_id)Task with Soft Time Limit
带软超时限制的任务
python
from celery.exceptions import SoftTimeLimitExceeded
@shared_task(
bind=True,
name='reports.generate_pdf',
soft_time_limit=120,
time_limit=150,
)
def generate_pdf_report(self, report_id: int) -> str:
"""Generate PDF report with graceful timeout handling."""
from apps.reports.services import PDFGenerator
try:
path = PDFGenerator.build(report_id)
return path
except SoftTimeLimitExceeded:
# Clean up partial files before hard kill
PDFGenerator.cleanup(report_id)
raisepython
from celery.exceptions import SoftTimeLimitExceeded
@shared_task(
bind=True,
name='reports.generate_pdf',
soft_time_limit=120,
time_limit=150,
)
def generate_pdf_report(self, report_id: int) -> str:
"""Generate PDF report with graceful timeout handling."""
from apps.reports.services import PDFGenerator
try:
path = PDFGenerator.build(report_id)
return path
except SoftTimeLimitExceeded:
# 硬终止前清理临时文件
PDFGenerator.cleanup(report_id)
raiseCalling Tasks
调用任务
python
from datetime import timedelta
from django.utils import timezonepython
from datetime import timedelta
from django.utils import timezoneFire and forget (async)
异步触发(无需等待结果)
send_welcome_email.delay(user.pk)
send_welcome_email.delay(user.pk)
Schedule in the future
延迟触发
send_reminder.apply_async(args=[user.pk], countdown=3600) # 1 hour from now
send_reminder.apply_async(args=[user.pk], eta=timezone.now() + timedelta(days=1))
send_reminder.apply_async(args=[user.pk], countdown=3600) # 1小时后执行
send_reminder.apply_async(args=[user.pk], eta=timezone.now() + timedelta(days=1))
Apply with queue routing
指定队列路由
sync_contact_to_crm.apply_async(args=[contact.pk], queue='high_priority')
sync_contact_to_crm.apply_async(args=[contact.pk], queue='high_priority')
Run synchronously (tests / debugging only)
同步执行(仅测试/调试使用)
result = generate_pdf_report.apply(args=[report.pk])
undefinedresult = generate_pdf_report.apply(args=[report.pk])
undefinedBeat Scheduling (Periodic Tasks)
Beat调度(周期性任务)
Code-Defined Schedule
代码定义调度
python
undefinedpython
undefinedconfig/settings/base.py
config/settings/base.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-expired-sessions': {
'task': 'users.cleanup_expired_sessions',
'schedule': crontab(hour=2, minute=0), # 2am daily
},
'sync-inventory': {
'task': 'products.sync_inventory',
'schedule': 60.0, # every 60 seconds
},
'weekly-digest': {
'task': 'notifications.send_weekly_digest',
'schedule': crontab(day_of_week='monday', hour=8, minute=0),
},
}
undefinedfrom celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-expired-sessions': {
'task': 'users.cleanup_expired_sessions',
'schedule': crontab(hour=2, minute=0), # 每日凌晨2点执行
},
'sync-inventory': {
'task': 'products.sync_inventory',
'schedule': 60.0, # 每60秒执行一次
},
'weekly-digest': {
'task': 'notifications.send_weekly_digest',
'schedule': crontab(day_of_week='monday', hour=8, minute=0),
},
}
undefinedDatabase-Defined Schedule (via django-celery-beat)
数据库定义调度(基于django-celery-beat)
python
undefinedpython
undefinedManage periodic tasks from Django admin or code
通过Django后台或代码管理周期性任务
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
schedule, _ = CrontabSchedule.objects.get_or_create(
hour='*/6', minute='0',
timezone='UTC',
)
PeriodicTask.objects.update_or_create(
name='Sync inventory every 6 hours',
defaults={
'crontab': schedule,
'task': 'products.sync_inventory',
'args': json.dumps([]),
'enabled': True,
}
)
undefinedfrom django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
schedule, _ = CrontabSchedule.objects.get_or_create(
hour='*/6', minute='0',
timezone='UTC',
)
PeriodicTask.objects.update_or_create(
name='Sync inventory every 6 hours',
defaults={
'crontab': schedule,
'task': 'products.sync_inventory',
'args': json.dumps([]),
'enabled': True,
}
)
undefinedCanvas: Chaining and Grouping Tasks
Canvas:任务链式与分组执行
python
from celery import chain, group, chordpython
from celery import chain, group, chordChain: run tasks sequentially, passing results
Chain:按顺序执行任务,传递结果
pipeline = chain(
fetch_data.s(source_id),
transform_data.s(), # receives fetch_data result as first arg
load_to_warehouse.s(),
)
pipeline.delay()
pipeline = chain(
fetch_data.s(source_id),
transform_data.s(), # 接收fetch_data的结果作为第一个参数
load_to_warehouse.s(),
)
pipeline.delay()
Group: run tasks in parallel
Group:并行执行任务
parallel = group(
send_welcome_email.s(user_id)
for user_id in new_user_ids
)
parallel.delay()
parallel = group(
send_welcome_email.s(user_id)
for user_id in new_user_ids
)
parallel.delay()
Chord: parallel tasks + callback when all complete
Chord:并行任务执行完成后触发回调
result = chord(
group(process_chunk.s(chunk) for chunk in data_chunks),
aggregate_results.s(), # called with list of chunk results
)
result.delay()
undefinedresult = chord(
group(process_chunk.s(chunk) for chunk in data_chunks),
aggregate_results.s(), # 接收所有chunk任务的结果列表
)
result.delay()
undefinedError Handling and Dead Letter Queue
错误处理与死信队列
python
undefinedpython
undefinedapps/core/tasks.py
apps/core/tasks.py
from celery.signals import task_failure
@task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
"""Log all task failures to Sentry / alerting."""
import sentry_sdk
with sentry_sdk.new_scope() as scope:
scope.set_context('celery', {
'task': sender.name,
'task_id': task_id,
'args': args,
'kwargs': kwargs,
})
sentry_sdk.capture_exception(exception)
```pythonfrom celery.signals import task_failure
@task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
"""Log all task failures to Sentry / alerting."""
import sentry_sdk
with sentry_sdk.new_scope() as scope:
scope.set_context('celery', {
'task': sender.name,
'task_id': task_id,
'args': args,
'kwargs': kwargs,
})
sentry_sdk.capture_exception(exception)
```pythonRoute failed tasks to dead-letter queue after max retries
达到最大重试次数后将失败任务路由到死信队列
@shared_task(
bind=True,
max_retries=3,
name='payments.charge_card',
)
def charge_card(self, order_id: int) -> None:
from apps.payments.models import Order, FailedCharge
try:
_do_charge(order_id)
except Exception as exc:
if self.request.retries >= self.max_retries:
# Persist to dead-letter table for manual review
FailedCharge.objects.create(
order_id=order_id,
error=str(exc),
task_id=self.request.id,
)
return # Don't raise — task is permanently failed
raise self.retry(exc=exc)undefined@shared_task(
bind=True,
max_retries=3,
name='payments.charge_card',
)
def charge_card(self, order_id: int) -> None:
from apps.payments.models import Order, FailedCharge
try:
_do_charge(order_id)
except Exception as exc:
if self.request.retries >= self.max_retries:
# 持久化到死信表以便人工审核
FailedCharge.objects.create(
order_id=order_id,
error=str(exc),
task_id=self.request.id,
)
return # 不再抛出异常——任务已永久失败
raise self.retry(exc=exc)undefinedTesting Celery Tasks
测试Celery任务
Unit Testing (No Broker)
单元测试(无需Broker)
python
undefinedpython
undefinedtests/test_tasks.py
tests/test_tasks.py
import pytest
from unittest.mock import patch, MagicMock
from apps.notifications.tasks import send_welcome_email
class TestSendWelcomeEmail:
@pytest.mark.django_db
def test_sends_email_to_existing_user(self, user):
with patch('apps.notifications.services.EmailService') as mock_email:
send_welcome_email(user.pk)
mock_email.send_welcome.assert_called_once_with(user)
@pytest.mark.django_db
def test_skips_missing_user_gracefully(self):
"""Should not raise when user is deleted between enqueue and execute."""
send_welcome_email(99999) # Non-existent user — must not raiseundefinedimport pytest
from unittest.mock import patch, MagicMock
from apps.notifications.tasks import send_welcome_email
class TestSendWelcomeEmail:
@pytest.mark.django_db
def test_sends_email_to_existing_user(self, user):
with patch('apps.notifications.services.EmailService') as mock_email:
send_welcome_email(user.pk)
mock_email.send_welcome.assert_called_once_with(user)
@pytest.mark.django_db
def test_skips_missing_user_gracefully(self):
"""Should not raise when user is deleted between enqueue and execute."""
send_welcome_email(99999) # 不存在的用户——不能抛出异常undefinedIntegration Testing with CELERY_TASK_ALWAYS_EAGER
集成测试(使用CELERY_TASK_ALWAYS_EAGER)
python
undefinedpython
undefinedconfig/settings/test.py
config/settings/test.py
CELERY_TASK_ALWAYS_EAGER = True # Run tasks synchronously in tests
CELERY_TASK_EAGER_PROPAGATES = True # Re-raise exceptions from tasks
CELERY_TASK_ALWAYS_EAGER = True # 测试中同步执行任务
CELERY_TASK_EAGER_PROPAGATES = True # 抛出任务中的异常
tests/test_integration.py
tests/test_integration.py
@pytest.mark.django_db
def test_registration_triggers_welcome_email(client):
with patch('apps.notifications.services.EmailService') as mock_email:
response = client.post('/api/users/', {
'email': 'new@example.com',
'password': 'strongpass123',
})
assert response.status_code == 201
mock_email.send_welcome.assert_called_once()undefined@pytest.mark.django_db
def test_registration_triggers_welcome_email(client):
with patch('apps.notifications.services.EmailService') as mock_email:
response = client.post('/api/users/', {
'email': 'new@example.com',
'password': 'strongpass123',
})
assert response.status_code == 201
mock_email.send_welcome.assert_called_once()undefinedTesting Retries
测试重试机制
python
@pytest.mark.django_db
def test_task_retries_on_connection_error():
with patch('apps.crm.services.CRMClient.sync') as mock_sync:
mock_sync.side_effect = ConnectionError('timeout')
with pytest.raises(ConnectionError):
sync_contact_to_crm.apply(args=[1], throw=True)
assert mock_sync.call_count == 1 # First attempt only when eagerpython
@pytest.mark.django_db
def test_task_retries_on_connection_error():
with patch('apps.crm.services.CRMClient.sync') as mock_sync:
mock_sync.side_effect = ConnectionError('timeout')
with pytest.raises(ConnectionError):
sync_contact_to_crm.apply(args=[1], throw=True)
assert mock_sync.call_count == 1 # 同步模式下仅执行首次尝试Monitoring
监控
bash
undefinedbash
undefinedInspect active workers and queues
查看活跃Worker和队列
celery -A config inspect active
celery -A config inspect stats
celery -A config inspect reserved
celery -A config inspect active
celery -A config inspect stats
celery -A config inspect reserved
Check queue lengths (Redis)
查看队列长度(Redis)
redis-cli llen celery
redis-cli llen celery
Flower: web-based real-time monitor
Flower:基于Web的实时监控工具
pip install flower
celery -A config flower --port=5555
undefinedpip install flower
celery -A config flower --port=5555
undefinedAnti-Patterns
反模式
python
undefinedpython
undefinedBAD: Passing model instances — they may be stale by execution time
错误:传递模型实例——执行时实例可能已过期
send_welcome_email.delay(user) # Never pass ORM objects
send_welcome_email.delay(user.pk) # Always pass PKs
send_welcome_email.delay(user) # 绝不要传递ORM对象
send_welcome_email.delay(user.pk) # 始终传递主键
BAD: Calling tasks synchronously in production views
错误:生产环境视图中同步调用任务
result = generate_report.apply() # Blocks the request thread
result = generate_report.apply() # 会阻塞请求线程
BAD: Non-idempotent task without guards
错误:无防护的非幂等任务
@shared_task
def charge_and_fulfill(order_id):
order.charge() # May charge twice if task retries!
order.fulfill()
@shared_task
def charge_and_fulfill(order_id):
order.charge() # 任务重试时可能重复扣费!
order.fulfill()
GOOD: Idempotent with status guard
正确:带状态防护的幂等任务
@shared_task
def charge_and_fulfill(order_id):
order = Order.objects.select_for_update().get(pk=order_id)
if order.status != Order.Status.PENDING:
return # Already processed
order.charge()
order.fulfill()
undefined@shared_task
def charge_and_fulfill(order_id):
order = Order.objects.select_for_update().get(pk=order_id)
if order.status != Order.Status.PENDING:
return # 已处理过则直接返回
order.charge()
order.fulfill()
undefinedProduction Checklist
生产环境检查清单
| Check | Setting |
|---|---|
| Worker restarts on crash | |
| Re-queue tasks on worker crash |
| Fair distribution of long tasks |
| Separate queues per priority | |
| Graceful timeout before hard kill |
| Sentry integration | Capture all |
| Flower or other monitor | Visibility into queue depths |
| Beat runs on single node only | Prevents duplicate scheduled task execution |
| 检查项 | 配置 |
|---|---|
| Worker崩溃后自动重启 | 使用 |
| Worker崩溃时重新入队任务 |
| 公平分配长耗时任务 |
| 按优先级拆分队列 | |
设置 | 硬终止前优雅处理超时 |
| Sentry集成 | 捕获所有 |
| Flower或其他监控工具 | 实时查看队列深度 |
| Beat仅在单节点运行 | 避免重复执行定时任务 |
Related Skills
相关技能
- — ORM, service layer, and project structure
django-patterns - — Testing Django models, views, and services
django-tdd - — pytest configuration and fixtures
python-testing
- — ORM、服务层与项目结构
django-patterns - — Django模型、视图与服务测试
django-tdd - — pytest配置与夹具
python-testing