django-celery

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Django + Celery Async Task Patterns

Django + Celery 异步任务实践模式

Production-grade patterns for background task processing in Django using Celery with Redis or RabbitMQ.
基于Redis或RabbitMQ,在Django中使用Celery实现生产级后台任务处理的实践模式。

When to Activate

适用场景

  • Adding background jobs or async processing to a Django app
  • Implementing periodic/scheduled tasks
  • Offloading slow operations (email, PDF generation, API calls) from request cycle
  • Setting up Celery Beat for cron-like scheduling
  • Debugging task failures, retries, or queue backlogs
  • Writing tests for Celery tasks
  • 为Django应用添加后台任务或异步处理逻辑
  • 实现周期性/定时任务
  • 将耗时操作(邮件发送、PDF生成、API调用)从请求周期中剥离
  • 配置Celery Beat实现类Cron的调度功能
  • 调试任务失败、重试或队列积压问题
  • 为Celery任务编写测试用例

Project Setup

项目配置

Installation

安装

bash
pip install celery[redis] django-celery-results django-celery-beat
bash
pip install celery[redis] django-celery-results django-celery-beat

celery.py
— App Entrypoint

celery.py
— 应用入口

python
undefined
python
undefined

config/celery.py

config/celery.py

import os from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP
@app.task(bind=True, ignore_result=True) def debug_task(self): print(f'Request: {self.request!r}')

```python
import os from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject') app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP
@app.task(bind=True, ignore_result=True) def debug_task(self): print(f'Request: {self.request!r}')

```python

config/init.py

config/init.py

from .celery import app as celery_app
all = ('celery_app',)
undefined
from .celery import app as celery_app
all = ('celery_app',)
undefined

Django Settings

Django 配置

python
undefined
python
undefined

config/settings/base.py

config/settings/base.py

Broker (Redis recommended for production)

Broker(生产环境推荐使用Redis)

CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0') CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0') CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')

Serialization

序列化配置

CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'

Task behavior

任务行为配置

CELERY_TASK_TRACK_STARTED = True CELERY_TASK_TIME_LIMIT = 30 * 60 # Hard limit: 30 min CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # Soft limit: sends SoftTimeLimitExceeded CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Prevent worker hoarding long tasks CELERY_TASK_ACKS_LATE = True # Re-queue on worker crash
CELERY_TASK_TRACK_STARTED = True CELERY_TASK_TIME_LIMIT = 30 * 60 # 硬限制:30分钟 CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 软限制:触发SoftTimeLimitExceeded异常 CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # 避免Worker占用过长任务 CELERY_TASK_ACKS_LATE = True # Worker崩溃时重新入队

Result persistence

结果持久化配置

CELERY_RESULT_EXPIRES = 60 * 60 * 24 # Keep results 24 hours
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # 结果保留24小时

Beat scheduler (for periodic tasks)

Beat调度器(用于周期性任务)

CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

Installed apps

已安装应用

INSTALLED_APPS += [ 'django_celery_results', 'django_celery_beat', ]
undefined
INSTALLED_APPS += [ 'django_celery_results', 'django_celery_beat', ]
undefined

Running Workers

启动Worker

bash
undefined
bash
undefined

Start worker (development)

启动Worker(开发环境)

celery -A config worker --loglevel=info
celery -A config worker --loglevel=info

Start beat scheduler (periodic tasks)

启动Beat调度器(周期性任务)

celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Combined worker + beat (dev only, never production)

同时启动Worker + Beat(仅开发环境使用,禁止生产环境)

celery -A config worker --beat --loglevel=info
celery -A config worker --beat --loglevel=info

Production: multiple workers with concurrency

生产环境:启动多个Worker并设置并发数

celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
undefined
celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
undefined

Task Design Patterns

任务设计模式

Basic Task

基础任务

python
undefined
python
undefined

apps/notifications/tasks.py

apps/notifications/tasks.py

from celery import shared_task import logging
logger = logging.getLogger(name)
@shared_task(name='notifications.send_welcome_email') def send_welcome_email(user_id: int) -> None: """Send welcome email to newly registered user.""" from apps.users.models import User from apps.notifications.services import EmailService
try:
    user = User.objects.get(pk=user_id)
except User.DoesNotExist:
    logger.warning('send_welcome_email: user %s not found', user_id)
    return  # Idempotent — do not raise, task already impossible to complete

EmailService.send_welcome(user)
logger.info('Welcome email sent to user %s', user_id)
undefined
from celery import shared_task import logging
logger = logging.getLogger(name)
@shared_task(name='notifications.send_welcome_email') def send_welcome_email(user_id: int) -> None: """Send welcome email to newly registered user.""" from apps.users.models import User from apps.notifications.services import EmailService
try:
    user = User.objects.get(pk=user_id)
except User.DoesNotExist:
    logger.warning('send_welcome_email: user %s not found', user_id)
    return  # Idempotent — do not raise, task already impossible to complete

EmailService.send_welcome(user)
logger.info('Welcome email sent to user %s', user_id)
undefined

Retryable Task

可重试任务

python
@shared_task(
    bind=True,
    name='integrations.sync_to_crm',
    max_retries=5,
    default_retry_delay=60,       # seconds before first retry
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,           # exponential backoff
    retry_backoff_max=600,        # cap at 10 minutes
    retry_jitter=True,            # randomise to avoid thundering herd
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
    """Sync contact to external CRM with retry on transient failures."""
    from apps.crm.services import CRMClient

    try:
        result = CRMClient().sync(contact_id)
        return result
    except CRMClient.RateLimitError as exc:
        # Specific retry delay from response header
        raise self.retry(exc=exc, countdown=int(exc.retry_after))
python
@shared_task(
    bind=True,
    name='integrations.sync_to_crm',
    max_retries=5,
    default_retry_delay=60,       # 首次重试间隔(秒)
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,           # 指数退避
    retry_backoff_max=600,        # 最大间隔10分钟
    retry_jitter=True,            # 随机化避免惊群效应
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
    """Sync contact to external CRM with retry on transient failures."""
    from apps.crm.services import CRMClient

    try:
        result = CRMClient().sync(contact_id)
        return result
    except CRMClient.RateLimitError as exc:
        # 根据响应头设置特定重试延迟
        raise self.retry(exc=exc, countdown=int(exc.retry_after))

Idempotent Task Pattern

幂等任务模式

Design tasks so they can safely run multiple times with the same inputs:
python
@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
    """Mark order as shipped — safe to run multiple times."""
    from apps.orders.models import Order

    updated = Order.objects.filter(
        pk=order_id,
        status=Order.Status.PROCESSING,    # Guard: only update if not already shipped
    ).update(
        status=Order.Status.SHIPPED,
        tracking_number=tracking_number,
    )

    if not updated:
        logger.info('mark_order_shipped: order %s already shipped or not found', order_id)
设计可安全重复执行的任务,相同输入多次运行无副作用:
python
@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
    """Mark order as shipped — safe to run multiple times."""
    from apps.orders.models import Order

    updated = Order.objects.filter(
        pk=order_id,
        status=Order.Status.PROCESSING,    # 防护:仅更新未发货订单
    ).update(
        status=Order.Status.SHIPPED,
        tracking_number=tracking_number,
    )

    if not updated:
        logger.info('mark_order_shipped: order %s already shipped or not found', order_id)

Task with Soft Time Limit

带软超时限制的任务

python
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(
    bind=True,
    name='reports.generate_pdf',
    soft_time_limit=120,
    time_limit=150,
)
def generate_pdf_report(self, report_id: int) -> str:
    """Generate PDF report with graceful timeout handling."""
    from apps.reports.services import PDFGenerator

    try:
        path = PDFGenerator.build(report_id)
        return path
    except SoftTimeLimitExceeded:
        # Clean up partial files before hard kill
        PDFGenerator.cleanup(report_id)
        raise
python
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(
    bind=True,
    name='reports.generate_pdf',
    soft_time_limit=120,
    time_limit=150,
)
def generate_pdf_report(self, report_id: int) -> str:
    """Generate PDF report with graceful timeout handling."""
    from apps.reports.services import PDFGenerator

    try:
        path = PDFGenerator.build(report_id)
        return path
    except SoftTimeLimitExceeded:
        # 硬终止前清理临时文件
        PDFGenerator.cleanup(report_id)
        raise

Calling Tasks

调用任务

python
from datetime import timedelta
from django.utils import timezone
python
from datetime import timedelta
from django.utils import timezone

Fire and forget (async)

异步触发(无需等待结果)

send_welcome_email.delay(user.pk)
send_welcome_email.delay(user.pk)

Schedule in the future

延迟触发

send_reminder.apply_async(args=[user.pk], countdown=3600) # 1 hour from now send_reminder.apply_async(args=[user.pk], eta=timezone.now() + timedelta(days=1))
send_reminder.apply_async(args=[user.pk], countdown=3600) # 1小时后执行 send_reminder.apply_async(args=[user.pk], eta=timezone.now() + timedelta(days=1))

Apply with queue routing

指定队列路由

sync_contact_to_crm.apply_async(args=[contact.pk], queue='high_priority')
sync_contact_to_crm.apply_async(args=[contact.pk], queue='high_priority')

Run synchronously (tests / debugging only)

同步执行(仅测试/调试使用)

result = generate_pdf_report.apply(args=[report.pk])
undefined
result = generate_pdf_report.apply(args=[report.pk])
undefined

Beat Scheduling (Periodic Tasks)

Beat调度(周期性任务)

Code-Defined Schedule

代码定义调度

python
undefined
python
undefined

config/settings/base.py

config/settings/base.py

from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = { 'cleanup-expired-sessions': { 'task': 'users.cleanup_expired_sessions', 'schedule': crontab(hour=2, minute=0), # 2am daily }, 'sync-inventory': { 'task': 'products.sync_inventory', 'schedule': 60.0, # every 60 seconds }, 'weekly-digest': { 'task': 'notifications.send_weekly_digest', 'schedule': crontab(day_of_week='monday', hour=8, minute=0), }, }
undefined
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = { 'cleanup-expired-sessions': { 'task': 'users.cleanup_expired_sessions', 'schedule': crontab(hour=2, minute=0), # 每日凌晨2点执行 }, 'sync-inventory': { 'task': 'products.sync_inventory', 'schedule': 60.0, # 每60秒执行一次 }, 'weekly-digest': { 'task': 'notifications.send_weekly_digest', 'schedule': crontab(day_of_week='monday', hour=8, minute=0), }, }
undefined

Database-Defined Schedule (via django-celery-beat)

数据库定义调度(基于django-celery-beat)

python
undefined
python
undefined

Manage periodic tasks from Django admin or code

通过Django后台或代码管理周期性任务

from django_celery_beat.models import PeriodicTask, CrontabSchedule import json
schedule, _ = CrontabSchedule.objects.get_or_create( hour='*/6', minute='0', timezone='UTC', )
PeriodicTask.objects.update_or_create( name='Sync inventory every 6 hours', defaults={ 'crontab': schedule, 'task': 'products.sync_inventory', 'args': json.dumps([]), 'enabled': True, } )
undefined
from django_celery_beat.models import PeriodicTask, CrontabSchedule import json
schedule, _ = CrontabSchedule.objects.get_or_create( hour='*/6', minute='0', timezone='UTC', )
PeriodicTask.objects.update_or_create( name='Sync inventory every 6 hours', defaults={ 'crontab': schedule, 'task': 'products.sync_inventory', 'args': json.dumps([]), 'enabled': True, } )
undefined

Canvas: Chaining and Grouping Tasks

Canvas:任务链式与分组执行

python
from celery import chain, group, chord
python
from celery import chain, group, chord

Chain: run tasks sequentially, passing results

Chain:按顺序执行任务,传递结果

pipeline = chain( fetch_data.s(source_id), transform_data.s(), # receives fetch_data result as first arg load_to_warehouse.s(), ) pipeline.delay()
pipeline = chain( fetch_data.s(source_id), transform_data.s(), # 接收fetch_data的结果作为第一个参数 load_to_warehouse.s(), ) pipeline.delay()

Group: run tasks in parallel

Group:并行执行任务

parallel = group( send_welcome_email.s(user_id) for user_id in new_user_ids ) parallel.delay()
parallel = group( send_welcome_email.s(user_id) for user_id in new_user_ids ) parallel.delay()

Chord: parallel tasks + callback when all complete

Chord:并行任务执行完成后触发回调

result = chord( group(process_chunk.s(chunk) for chunk in data_chunks), aggregate_results.s(), # called with list of chunk results ) result.delay()
undefined
result = chord( group(process_chunk.s(chunk) for chunk in data_chunks), aggregate_results.s(), # 接收所有chunk任务的结果列表 ) result.delay()
undefined

Error Handling and Dead Letter Queue

错误处理与死信队列

python
undefined
python
undefined

apps/core/tasks.py

apps/core/tasks.py

from celery.signals import task_failure
@task_failure.connect def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw): """Log all task failures to Sentry / alerting.""" import sentry_sdk with sentry_sdk.new_scope() as scope: scope.set_context('celery', { 'task': sender.name, 'task_id': task_id, 'args': args, 'kwargs': kwargs, }) sentry_sdk.capture_exception(exception)

```python
from celery.signals import task_failure
@task_failure.connect def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw): """Log all task failures to Sentry / alerting.""" import sentry_sdk with sentry_sdk.new_scope() as scope: scope.set_context('celery', { 'task': sender.name, 'task_id': task_id, 'args': args, 'kwargs': kwargs, }) sentry_sdk.capture_exception(exception)

```python

Route failed tasks to dead-letter queue after max retries

达到最大重试次数后将失败任务路由到死信队列

@shared_task( bind=True, max_retries=3, name='payments.charge_card', ) def charge_card(self, order_id: int) -> None: from apps.payments.models import Order, FailedCharge
try:
    _do_charge(order_id)
except Exception as exc:
    if self.request.retries >= self.max_retries:
        # Persist to dead-letter table for manual review
        FailedCharge.objects.create(
            order_id=order_id,
            error=str(exc),
            task_id=self.request.id,
        )
        return  # Don't raise — task is permanently failed
    raise self.retry(exc=exc)
undefined
@shared_task( bind=True, max_retries=3, name='payments.charge_card', ) def charge_card(self, order_id: int) -> None: from apps.payments.models import Order, FailedCharge
try:
    _do_charge(order_id)
except Exception as exc:
    if self.request.retries >= self.max_retries:
        # 持久化到死信表以便人工审核
        FailedCharge.objects.create(
            order_id=order_id,
            error=str(exc),
            task_id=self.request.id,
        )
        return  # 不再抛出异常——任务已永久失败
    raise self.retry(exc=exc)
undefined

Testing Celery Tasks

测试Celery任务

Unit Testing (No Broker)

单元测试(无需Broker)

python
undefined
python
undefined

tests/test_tasks.py

tests/test_tasks.py

import pytest from unittest.mock import patch, MagicMock from apps.notifications.tasks import send_welcome_email
class TestSendWelcomeEmail:
@pytest.mark.django_db
def test_sends_email_to_existing_user(self, user):
    with patch('apps.notifications.services.EmailService') as mock_email:
        send_welcome_email(user.pk)
        mock_email.send_welcome.assert_called_once_with(user)

@pytest.mark.django_db
def test_skips_missing_user_gracefully(self):
    """Should not raise when user is deleted between enqueue and execute."""
    send_welcome_email(99999)  # Non-existent user — must not raise
undefined
import pytest from unittest.mock import patch, MagicMock from apps.notifications.tasks import send_welcome_email
class TestSendWelcomeEmail:
@pytest.mark.django_db
def test_sends_email_to_existing_user(self, user):
    with patch('apps.notifications.services.EmailService') as mock_email:
        send_welcome_email(user.pk)
        mock_email.send_welcome.assert_called_once_with(user)

@pytest.mark.django_db
def test_skips_missing_user_gracefully(self):
    """Should not raise when user is deleted between enqueue and execute."""
    send_welcome_email(99999)  # 不存在的用户——不能抛出异常
undefined

Integration Testing with CELERY_TASK_ALWAYS_EAGER

集成测试(使用CELERY_TASK_ALWAYS_EAGER)

python
undefined
python
undefined

config/settings/test.py

config/settings/test.py

CELERY_TASK_ALWAYS_EAGER = True # Run tasks synchronously in tests CELERY_TASK_EAGER_PROPAGATES = True # Re-raise exceptions from tasks
CELERY_TASK_ALWAYS_EAGER = True # 测试中同步执行任务 CELERY_TASK_EAGER_PROPAGATES = True # 抛出任务中的异常

tests/test_integration.py

tests/test_integration.py

@pytest.mark.django_db def test_registration_triggers_welcome_email(client): with patch('apps.notifications.services.EmailService') as mock_email: response = client.post('/api/users/', { 'email': 'new@example.com', 'password': 'strongpass123', })
assert response.status_code == 201
mock_email.send_welcome.assert_called_once()
undefined
@pytest.mark.django_db def test_registration_triggers_welcome_email(client): with patch('apps.notifications.services.EmailService') as mock_email: response = client.post('/api/users/', { 'email': 'new@example.com', 'password': 'strongpass123', })
assert response.status_code == 201
mock_email.send_welcome.assert_called_once()
undefined

Testing Retries

测试重试机制

python
@pytest.mark.django_db
def test_task_retries_on_connection_error():
    with patch('apps.crm.services.CRMClient.sync') as mock_sync:
        mock_sync.side_effect = ConnectionError('timeout')

        with pytest.raises(ConnectionError):
            sync_contact_to_crm.apply(args=[1], throw=True)

        assert mock_sync.call_count == 1  # First attempt only when eager
python
@pytest.mark.django_db
def test_task_retries_on_connection_error():
    with patch('apps.crm.services.CRMClient.sync') as mock_sync:
        mock_sync.side_effect = ConnectionError('timeout')

        with pytest.raises(ConnectionError):
            sync_contact_to_crm.apply(args=[1], throw=True)

        assert mock_sync.call_count == 1  # 同步模式下仅执行首次尝试

Monitoring

监控

bash
undefined
bash
undefined

Inspect active workers and queues

查看活跃Worker和队列

celery -A config inspect active celery -A config inspect stats celery -A config inspect reserved
celery -A config inspect active celery -A config inspect stats celery -A config inspect reserved

Check queue lengths (Redis)

查看队列长度(Redis)

redis-cli llen celery
redis-cli llen celery

Flower: web-based real-time monitor

Flower:基于Web的实时监控工具

pip install flower celery -A config flower --port=5555
undefined
pip install flower celery -A config flower --port=5555
undefined

Anti-Patterns

反模式

python
undefined
python
undefined

BAD: Passing model instances — they may be stale by execution time

错误:传递模型实例——执行时实例可能已过期

send_welcome_email.delay(user) # Never pass ORM objects send_welcome_email.delay(user.pk) # Always pass PKs
send_welcome_email.delay(user) # 绝不要传递ORM对象 send_welcome_email.delay(user.pk) # 始终传递主键

BAD: Calling tasks synchronously in production views

错误:生产环境视图中同步调用任务

result = generate_report.apply() # Blocks the request thread
result = generate_report.apply() # 会阻塞请求线程

BAD: Non-idempotent task without guards

错误:无防护的非幂等任务

@shared_task def charge_and_fulfill(order_id): order.charge() # May charge twice if task retries! order.fulfill()
@shared_task def charge_and_fulfill(order_id): order.charge() # 任务重试时可能重复扣费! order.fulfill()

GOOD: Idempotent with status guard

正确:带状态防护的幂等任务

@shared_task def charge_and_fulfill(order_id): order = Order.objects.select_for_update().get(pk=order_id) if order.status != Order.Status.PENDING: return # Already processed order.charge() order.fulfill()
undefined
@shared_task def charge_and_fulfill(order_id): order = Order.objects.select_for_update().get(pk=order_id) if order.status != Order.Status.PENDING: return # 已处理过则直接返回 order.charge() order.fulfill()
undefined

Production Checklist

生产环境检查清单

CheckSetting
Worker restarts on crash
supervisord
or
systemd
unit
CELERY_TASK_ACKS_LATE = True
Re-queue tasks on worker crash
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
Fair distribution of long tasks
Separate queues per priority
-Q default,high_priority,low_priority
CELERY_TASK_SOFT_TIME_LIMIT
set
Graceful timeout before hard kill
Sentry integrationCapture all
task_failure
signals
Flower or other monitorVisibility into queue depths
Beat runs on single node onlyPrevents duplicate scheduled task execution
检查项配置
Worker崩溃后自动重启使用
supervisord
systemd
单元
CELERY_TASK_ACKS_LATE = True
Worker崩溃时重新入队任务
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
公平分配长耗时任务
按优先级拆分队列
-Q default,high_priority,low_priority
设置
CELERY_TASK_SOFT_TIME_LIMIT
硬终止前优雅处理超时
Sentry集成捕获所有
task_failure
信号
Flower或其他监控工具实时查看队列深度
Beat仅在单节点运行避免重复执行定时任务

Related Skills

相关技能

  • django-patterns
    — ORM, service layer, and project structure
  • django-tdd
    — Testing Django models, views, and services
  • python-testing
    — pytest configuration and fixtures
  • django-patterns
    — ORM、服务层与项目结构
  • django-tdd
    — Django模型、视图与服务测试
  • python-testing
    — pytest配置与夹具