django-celery-expert

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Django Celery Expert

Django Celery 专家指南

Overview

概述

This skill provides expert guidance for Django applications using Celery for asynchronous task processing. It covers task design patterns, worker configuration, error handling, monitoring, and production deployment strategies.
Key Capabilities:
  • Task design and implementation patterns
  • Worker configuration and scaling
  • Error handling and retry strategies
  • Periodic/scheduled task management
  • Monitoring and observability
  • Production deployment best practices
本技能为使用Celery进行异步任务处理的Django应用提供专业指导,涵盖任务设计模式、Worker配置、错误处理、监控及生产环境部署策略。
核心能力:
  • 任务设计与实现模式
  • Worker配置与扩容
  • 错误处理与重试策略
  • 定时/计划任务管理
  • 监控与可观测性
  • 生产环境部署最佳实践

When to Use

适用场景

Invoke this skill when you encounter these triggers:
Task Design:
  • "Create a Celery task for..."
  • "Move this to a background job"
  • "Process this asynchronously"
  • "Handle this outside the request"
Configuration & Setup:
  • "Configure Celery for Django"
  • "Set up task queues"
  • "Configure Celery workers"
  • "Set up Celery Beat for scheduling"
Error Handling:
  • "Handle task failures"
  • "Implement retry logic"
  • "Task keeps failing"
  • "Set up dead letter queue"
Performance & Scaling:
  • "Scale Celery workers"
  • "Optimize task throughput"
  • "Tasks are too slow"
  • "Handle high task volume"
Monitoring:
  • "Monitor Celery tasks"
  • "Set up Flower"
  • "Track task progress"
  • "Debug stuck tasks"
当遇到以下触发场景时,可调用本技能:
任务设计:
  • "为...创建Celery任务"
  • "将此迁移至后台作业"
  • "异步处理此任务"
  • "在请求外部处理此逻辑"
配置与搭建:
  • "为Django配置Celery"
  • "设置任务队列"
  • "配置Celery Worker"
  • "设置Celery Beat实现任务调度"
错误处理:
  • "处理任务失败"
  • "实现重试逻辑"
  • "任务持续失败"
  • "搭建死信队列"
性能与扩容:
  • "扩容Celery Worker"
  • "优化任务吞吐量"
  • "任务执行过慢"
  • "处理高任务量"
监控:
  • "监控Celery任务"
  • "搭建Flower"
  • "跟踪任务进度"
  • "调试停滞任务"

Instructions

操作流程

Follow this workflow when handling Django Celery requests:
处理Django Celery相关请求时,请遵循以下工作流:

1. Analyze the Request

1. 分析请求

Identify the task type:
  • Simple background task (fire-and-forget)
  • Task with result tracking (need to poll for completion)
  • Chained/grouped tasks (workflow orchestration)
  • Periodic/scheduled tasks (cron-like behavior)
  • Long-running tasks (need progress tracking)
Key questions:
  • Does the caller need the result?
  • Should failures be retried?
  • Is idempotency required?
  • What's the expected execution time?
  • How critical is guaranteed execution?
识别任务类型:
  • 简单后台任务(即发即弃)
  • 需要跟踪结果的任务(需轮询完成状态)
  • 链式/分组任务(工作流编排)
  • 定时/计划任务(类Cron行为)
  • 长时运行任务(需进度跟踪)
关键问题:
  • 调用方是否需要任务结果?
  • 失败时是否需要重试?
  • 是否需要幂等性?
  • 预期执行时长是多少?
  • 任务执行的可靠性要求有多高?

2. Load Relevant Reference Documentation

2. 加载相关参考文档

Based on the task type, reference the appropriate bundled documentation:
  • Django-specific patterns ->
    references/django-integration.md
  • Task implementation ->
    references/task-design-patterns.md
  • Configuration & setup ->
    references/configuration-guide.md
  • Error handling & retries ->
    references/error-handling.md
  • Periodic tasks ->
    references/periodic-tasks.md
  • Monitoring & debugging ->
    references/monitoring-observability.md
  • Production deployment ->
    references/production-deployment.md
根据任务类型,加载对应的内置文档:
  • Django专属模式 ->
    references/django-integration.md
  • 任务实现 ->
    references/task-design-patterns.md
  • 配置与搭建 ->
    references/configuration-guide.md
  • 错误处理与重试 ->
    references/error-handling.md
  • 定时任务 ->
    references/periodic-tasks.md
  • 监控与调试 ->
    references/monitoring-observability.md
  • 生产环境部署 ->
    references/production-deployment.md

3. Implement Following Best Practices

3. 遵循最佳实践实现

Task design principles:
  • Keep tasks small and focused
  • Design for idempotency when possible
  • Use explicit task names
  • Bind tasks for access to self
  • Pass serializable arguments only (IDs, not objects)
Error handling:
  • Configure appropriate retry behavior
  • Use exponential backoff
  • Set max retry limits
  • Handle specific exceptions appropriately
  • Log failures with context
Performance:
  • Use appropriate serializers (JSON for safety, pickle for Python objects)
  • Configure prefetch limits
  • Use task routing for prioritization
  • Batch operations when appropriate
  • Monitor memory usage
任务设计原则:
  • 保持任务小巧且聚焦单一职责
  • 尽可能设计为幂等任务
  • 使用明确的任务名称
  • 绑定任务以访问自身属性
  • 仅传递可序列化参数(如ID,而非对象)
错误处理:
  • 配置合适的重试行为
  • 使用指数退避策略
  • 设置最大重试次数限制
  • 针对性处理特定异常
  • 记录包含上下文信息的失败日志
性能优化:
  • 使用合适的序列化器(JSON保证安全,pickle用于Python对象)
  • 配置预取限制
  • 使用任务路由实现优先级管理
  • 合适时批量处理操作
  • 监控内存使用情况

4. Validate Implementation

4. 验证实现方案

Before presenting the solution:
  • Verify task is idempotent if retries enabled
  • Check serialization of arguments
  • Ensure proper error handling
  • Verify monitoring/logging is in place
  • Consider failure scenarios
在提供解决方案前:
  • 若启用重试,验证任务是否具备幂等性
  • 检查参数的可序列化性
  • 确保错误处理逻辑正确
  • 确认已配置监控/日志
  • 考虑各种失败场景

Bundled Resources

内置资源

references/ - Comprehensive Celery documentation loaded into context as needed
  • references/django-integration.md
    • transaction.on_commit() for safe task queuing
    • Database as source of truth with recovery tasks
    • Request-task correlation with django-guid
    • Testing Django Celery tasks
  • references/task-design-patterns.md
    • Task signatures and calling patterns
    • Binding and accessing task properties
    • Task inheritance and base classes
    • Workflow patterns (chains, groups, chords)
    • Idempotency and exactly-once delivery
  • references/configuration-guide.md
    • Django-Celery integration setup
    • Broker configuration (Redis, RabbitMQ)
    • Result backend options
    • Worker settings and concurrency
    • Task routing and queues
  • references/error-handling.md
    • Retry strategies and backoff
    • Exception handling patterns
    • Dead letter queues
    • Task rejection and requeue
    • Timeout handling
  • references/periodic-tasks.md
    • Celery Beat configuration
    • Crontab and interval schedules
    • Django database scheduler
    • Dynamic schedule management
    • Timezone considerations
  • references/monitoring-observability.md
    • Flower setup and usage
    • Prometheus/Grafana integration
    • Task event monitoring
    • Logging best practices
    • Debugging stuck tasks
  • references/production-deployment.md
    • Worker deployment patterns
    • Process supervision (systemd, supervisor)
    • Containerized deployments
    • Scaling strategies
    • Health checks and graceful shutdown
references/ - 全面的Celery文档,可根据需要加载至上下文
  • references/django-integration.md
    • 使用transaction.on_commit()安全队列化任务
    • 以数据库为可信源并实现恢复任务
    • 使用django-guid实现请求-任务关联
    • 测试Django Celery任务
  • references/task-design-patterns.md
    • 任务签名与调用模式
    • 绑定与访问任务属性
    • 任务继承与基类
    • 工作流模式(链式、分组、和弦)
    • 幂等性与恰好一次交付
  • references/configuration-guide.md
    • Django-Celery集成搭建
    • Broker配置(Redis、RabbitMQ)
    • 结果后端选项
    • Worker设置与并发配置
    • 任务路由与队列
  • references/error-handling.md
    • 重试策略与退避机制
    • 异常处理模式
    • 死信队列
    • 任务拒绝与重新入队
    • 超时处理
  • references/periodic-tasks.md
    • Celery Beat配置
    • Crontab与间隔调度
    • Django数据库调度器
    • 动态调度管理
    • 时区考量
  • references/monitoring-observability.md
    • Flower搭建与使用
    • Prometheus/Grafana集成
    • 任务事件监控
    • 日志最佳实践
    • 调试停滞任务
  • references/production-deployment.md
    • Worker部署模式
    • 进程管理(systemd、supervisor)
    • 容器化部署
    • 扩容策略
    • 健康检查与优雅关闭

Examples

示例

Example 1: Basic Background Task

示例1:基础后台任务

User Request:
"Send welcome emails in the background after user registration"
Implementation:
python
undefined
用户请求:
"用户注册后在后台发送欢迎邮件"
实现代码:
python
undefined

tasks.py

tasks.py

from celery import shared_task from django.core.mail import send_mail
@shared_task(bind=True, max_retries=3) def send_welcome_email(self, user_id): from users.models import User
try:
    user = User.objects.get(id=user_id)
    send_mail(
        subject="Welcome!",
        message=f"Hi {user.name}, welcome to our platform!",
        from_email="noreply@example.com",
        recipient_list=[user.email],
    )
except User.DoesNotExist:
    pass  # User deleted, skip
except Exception as exc:
    raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
from celery import shared_task from django.core.mail import send_mail
@shared_task(bind=True, max_retries=3) def send_welcome_email(self, user_id): from users.models import User
try:
    user = User.objects.get(id=user_id)
    send_mail(
        subject="Welcome!",
        message=f"Hi {user.name}, welcome to our platform!",
        from_email="noreply@example.com",
        recipient_list=[user.email],
    )
except User.DoesNotExist:
    pass  # 用户已删除,跳过
except Exception as exc:
    raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

views.py

views.py

def register(request): user = User.objects.create(...) send_welcome_email.delay(user.id) # Fire and forget return redirect('dashboard')
undefined
def register(request): user = User.objects.create(...) send_welcome_email.delay(user.id) # 即发即弃 return redirect('dashboard')
undefined

Example 2: Task with Progress Tracking

示例2:带进度跟踪的任务

User Request:
"Process a large CSV import with progress updates"
Implementation:
python
@shared_task(bind=True)
def import_csv(self, file_path, total_rows):
    from myapp.models import Record

    with open(file_path) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            Record.objects.create(**row)
            if i % 100 == 0:
                self.update_state(
                    state='PROGRESS',
                    meta={'current': i, 'total': total_rows}
                )

    return {'status': 'complete', 'processed': total_rows}
用户请求:
"处理大型CSV导入并提供进度更新"
实现代码:
python
@shared_task(bind=True)
def import_csv(self, file_path, total_rows):
    from myapp.models import Record

    with open(file_path) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            Record.objects.create(**row)
            if i % 100 == 0:
                self.update_state(
                    state='PROGRESS',
                    meta={'current': i, 'total': total_rows}
                )

    return {'status': 'complete', 'processed': total_rows}

Check progress

检查进度

result = import_csv.AsyncResult(task_id) if result.state == 'PROGRESS': progress = result.info.get('current', 0) / result.info.get('total', 1)
undefined
result = import_csv.AsyncResult(task_id) if result.state == 'PROGRESS': progress = result.info.get('current', 0) / result.info.get('total', 1)
undefined

Example 3: Workflow with Chains

示例3:链式工作流

User Request:
"Process an order: validate inventory, charge payment, then send confirmation"
Implementation:
python
from celery import chain

@shared_task
def validate_inventory(order_id):
    # Returns order_id if valid, raises if not
    order = Order.objects.get(id=order_id)
    if not order.items_in_stock():
        raise ValueError("Items out of stock")
    return order_id

@shared_task
def charge_payment(order_id):
    order = Order.objects.get(id=order_id)
    order.charge()
    return order_id

@shared_task
def send_confirmation(order_id):
    order = Order.objects.get(id=order_id)
    order.send_confirmation_email()

def process_order(order_id):
    workflow = chain(
        validate_inventory.s(order_id),
        charge_payment.s(),
        send_confirmation.s()
    )
    workflow.delay()
用户请求:
"处理订单:验证库存、收取付款、然后发送确认邮件"
实现代码:
python
from celery import chain

@shared_task
def validate_inventory(order_id):
    # 验证通过返回order_id,失败则抛出异常
    order = Order.objects.get(id=order_id)
    if not order.items_in_stock():
        raise ValueError("Items out of stock")
    return order_id

@shared_task
def charge_payment(order_id):
    order = Order.objects.get(id=order_id)
    order.charge()
    return order_id

@shared_task
def send_confirmation(order_id):
    order = Order.objects.get(id=order_id)
    order.send_confirmation_email()

def process_order(order_id):
    workflow = chain(
        validate_inventory.s(order_id),
        charge_payment.s(),
        send_confirmation.s()
    )
    workflow.delay()

Additional Notes

补充说明

Common Pitfalls:
  • Passing Django model instances instead of IDs
  • Not handling task idempotency with retries
  • Missing timeout configuration for long tasks
  • Not monitoring task queue depth
  • Ignoring result backend cleanup
Django Integration:
  • Use
    django-celery-beat
    for database-backed schedules
  • Use
    django-celery-results
    for storing task results in Django
  • Configure
    CELERY_
    settings in Django settings.py
  • Use
    @shared_task
    for reusable apps
Security:
  • Never pass sensitive data in task arguments
  • Use signed serializers if pickle is required
  • Restrict Flower access in production
  • Validate task arguments
常见陷阱:
  • 传递Django模型实例而非ID
  • 启用重试时未处理任务幂等性
  • 未为长时任务配置超时
  • 未监控任务队列深度
  • 忽略结果后端清理
Django集成要点:
  • 使用
    django-celery-beat
    实现基于数据库的调度
  • 使用
    django-celery-results
    将任务结果存储至Django
  • 在Django的settings.py中配置
    CELERY_
    相关设置
  • 为可复用应用使用
    @shared_task
安全注意事项:
  • 切勿在任务参数中传递敏感数据
  • 若需使用pickle,需使用签名序列化器
  • 生产环境中限制Flower的访问权限
  • 验证任务参数