Async Jobs

Patterns for background task processing with Celery, ARQ, and Redis. Covers task queues, canvas workflows, scheduling, retry strategies, rate limiting, and production monitoring. Each category has individual rule files in references/ loaded on-demand.

Quick Reference

| Category | Rules | Impact | When to Use |
| --- | --- | --- | --- |
| Configuration | celery-config | HIGH | Celery app setup, broker, serialization, worker tuning |
| Task Routing | task-routing | HIGH | Priority queues, multi-queue workers, dynamic routing |
| Canvas Workflows | canvas-workflows | HIGH | Chain, group, chord, nested workflows |
| Retry Strategies | retry-strategies | HIGH | Exponential backoff, idempotency, dead letter queues |
| Scheduling | scheduled-tasks | MEDIUM | Celery Beat, crontab, database-backed schedules |
| Monitoring | monitoring-health | MEDIUM | Flower, custom events, health checks, metrics |
| Result Backends | result-backends | MEDIUM | Redis results, custom states, progress tracking |
| ARQ Patterns | arq-patterns | MEDIUM | Async Redis Queue for FastAPI, lightweight jobs |
| Temporal Workflows | temporal-workflows | HIGH | Durable workflow definitions, sagas, signals, queries |
| Temporal Activities | temporal-activities | HIGH | Activity patterns, workers, heartbeats, testing |

Total: 10 rules across 10 categories

Quick Start

Celery task with retry

```python
from celery import shared_task


@shared_task(
    bind=True,
    max_retries=3,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
)
def process_order(self, order_id: str) -> dict:
    result = do_processing(order_id)
    return {"order_id": order_id, "status": "completed"}
```

ARQ task with FastAPI

```python
from arq import create_pool
from arq.connections import RedisSettings


async def generate_report(ctx: dict, report_id: str) -> dict:
    data = await ctx["db"].fetch_report_data(report_id)
    pdf = await render_pdf(data)
    return {"report_id": report_id, "size": len(pdf)}


@router.post("/api/v1/reports")
async def create_report(data: ReportRequest, arq: ArqRedis = Depends(get_arq_pool)):
    job = await arq.enqueue_job("generate_report", data.report_id)
    return {"job_id": job.job_id}
```

Configuration

Production Celery app configuration with secure defaults and worker tuning.

Key Patterns

  • JSON serialization with task_serializer="json" for safety
  • Late acknowledgment with task_acks_late=True to prevent task loss on crash
  • Time limits with both task_time_limit (hard) and task_soft_time_limit (soft)
  • Fair distribution with worker_prefetch_multiplier=1
  • Reject on lost with task_reject_on_worker_lost=True

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Serializer | JSON (never pickle) |
| Ack mode | Late ack (task_acks_late=True) |
| Prefetch | 1 for fair, 4-8 for throughput |
| Time limit | soft < hard (e.g., 540/600) |
| Timezone | UTC always |

Task Routing

Priority queue configuration with multi-queue workers and dynamic routing.

Key Patterns

  • Named queues for critical/high/default/low/bulk separation
  • Redis priority with queue_order_strategy: "priority" and 0-9 levels
  • Task router classes for dynamic routing based on task attributes
  • Per-queue workers with tuned concurrency and prefetch settings
  • Content-based routing for dynamic workflow dispatch

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Queue count | 3-5 (critical/high/default/low/bulk) |
| Priority levels | 0-9 with Redis x-max-priority |
| Worker assignment | Dedicated workers per queue |
| Prefetch | 1 for critical, 4-8 for bulk |
| Routing | Router class for 5+ routing rules |
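A dynamic router can be as small as a function keyed on task-name prefixes. A sketch with hypothetical task prefixes and queue names; a real app would point `task_routes` at this function:

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    """Celery task router: choose queue and priority from the task name.

    Registered via celery_app.conf.task_routes = (route_task,).
    """
    if name.startswith("billing."):       # hypothetical premium/critical work
        return {"queue": "critical", "priority": 9}
    if name.startswith("export."):        # hypothetical bulk work
        return {"queue": "bulk", "priority": 1}
    return {"queue": "default", "priority": 5}
```

Beyond roughly five rules, a router class (or this function) keeps routing logic in one place instead of scattered per-task `queue=` options.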

Canvas Workflows

Celery canvas primitives for sequential, parallel, and fan-in/fan-out workflows.

Key Patterns

  • Chain for sequential ETL pipelines with result passing
  • Group for parallel execution of independent tasks
  • Chord for fan-out/fan-in with aggregation callback
  • Immutable signatures (si()) for steps that ignore input
  • Nested workflows combining groups inside chains
  • Link error callbacks for workflow-level error handling

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Sequential | Chain with s() |
| Parallel | Group for independent tasks |
| Fan-in | Chord (all must succeed for callback) |
| Ignore input | Use si() immutable signature |
| Error in chain | Reject stops chain, retry continues |
| Partial failures | Return error dict in chord tasks |

Retry Strategies

Retry patterns with exponential backoff, idempotency, and dead letter queues.

Key Patterns

  • Exponential backoff with retry_backoff=True and retry_backoff_max
  • Jitter with retry_jitter=True to prevent thundering herd
  • Idempotency keys in Redis to prevent duplicate processing
  • Dead letter queues for failed tasks requiring manual review
  • Task locking to prevent concurrent execution of singleton tasks
  • Base task classes with shared retry configuration

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Retry delay | Exponential backoff with jitter |
| Max retries | 3-5 for transient, 0 for permanent |
| Idempotency | Redis key with TTL |
| Failed tasks | DLQ for manual review |
| Singleton | Redis lock with TTL |
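The idempotency-key pattern can be sketched in plain Python; here a dict stands in for Redis `SET key 1 NX EX <ttl>`, and the function names are illustrative:

```python
import hashlib

# Stand-in for Redis; production code would use SET ... NX EX <ttl>.
_processed: dict[str, bool] = {}


def idempotency_key(task_name: str, *args) -> str:
    """Stable key derived from the task name and its arguments."""
    payload = task_name + ":" + ":".join(map(str, args))
    return hashlib.sha256(payload.encode()).hexdigest()


def claim(task_name: str, *args) -> bool:
    """Return True only for the first invocation with these arguments."""
    key = idempotency_key(task_name, *args)
    if key in _processed:  # Redis: SET ... NX returns nil when the key exists
        return False
    _processed[key] = True
    return True
```

A retried `create_order` task would call `claim("create_order", order_id)` first and skip the write when it returns False, so automatic retries cannot create duplicates.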

Scheduling

Celery Beat periodic task configuration with crontab, database-backed schedules, and overlap prevention.

Key Patterns

  • Crontab for time-based schedules (daily, weekly, monthly)
  • Interval for fixed-frequency tasks (every N seconds)
  • Database scheduler with django-celery-beat for dynamic schedules
  • Schedule locks to prevent overlapping long-running scheduled tasks
  • Adaptive polling with self-rescheduling tasks

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Schedule type | Crontab for time-based, interval for frequency |
| Dynamic | Database scheduler (django-celery-beat) |
| Overlap | Redis lock with timeout |
| Beat process | Separate process (not embedded) |
| Timezone | UTC always |

Monitoring

Production monitoring with Flower, custom signals, health checks, and Prometheus metrics.

Key Patterns

  • Flower dashboard for real-time task monitoring
  • Celery signals (task_prerun, task_postrun, task_failure) for metrics
  • Health check endpoint verifying broker connection and active workers
  • Queue depth monitoring for autoscaling decisions
  • Beat monitoring for scheduled task dispatch tracking

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Dashboard | Flower with persistent storage |
| Metrics | Prometheus via celery signals |
| Health | Broker + worker + queue depth |
| Alerting | Signal on task_failure |
| Autoscale | Queue depth > threshold |

Result Backends

Task result storage, custom states, and progress tracking patterns.

Key Patterns

  • Redis backend for task status and small results
  • Custom task states (VALIDATING, PROCESSING, UPLOADING) for progress
  • update_state() for real-time progress reporting
  • S3/database for large result storage (never Redis)
  • AsyncResult for querying task state and progress

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Status storage | Redis result backend |
| Large results | S3 or database (never Redis) |
| Progress | Custom states with update_state() |
| Result query | AsyncResult with state checks |

ARQ Patterns

Lightweight async Redis Queue for FastAPI and simple background tasks.

Key Patterns

  • Native async/await with arq for FastAPI integration
  • Worker lifecycle with startup/shutdown hooks for resource management
  • Job enqueue from FastAPI routes with enqueue_job()
  • Job status tracking with Job.status() and Job.result()
  • Delayed tasks with _defer_by=timedelta() for deferred execution

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Simple async | ARQ (native async) |
| Complex workflows | Celery (chains, chords) |
| In-process quick | FastAPI BackgroundTasks |
| LLM workflows | LangGraph (not Celery) |
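An ARQ worker module follows the shape below; the resource setup and job body are hypothetical, and the `WorkerSettings` class is what the `arq` CLI discovers:

```python
from datetime import timedelta


async def startup(ctx: dict) -> None:
    # Acquire shared resources (DB pool, HTTP client) once per worker.
    ctx["db"] = None  # placeholder for a real connection


async def shutdown(ctx: dict) -> None:
    # Release whatever startup acquired.
    pass


async def send_digest(ctx: dict, user_id: int) -> dict:
    # Hypothetical job body; ctx carries the resources created in startup.
    return {"user_id": user_id, "sent": True}


class WorkerSettings:
    """Run with: arq my_module.WorkerSettings"""

    functions = [send_digest]
    on_startup = startup
    on_shutdown = shutdown
    job_timeout = timedelta(minutes=5)
```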

Tool Selection

| Tool | Best For | Complexity |
| --- | --- | --- |
| ARQ | FastAPI, simple async jobs | Low |
| Celery | Complex workflows, enterprise | High |
| RQ | Simple Redis queues | Low |
| Dramatiq | Reliable messaging | Medium |
| FastAPI BackgroundTasks | In-process quick tasks | Minimal |

Anti-Patterns (FORBIDDEN)

NEVER run long tasks synchronously in request handlers

```python
@router.post("/api/v1/reports")
async def create_report(data: ReportRequest):
    pdf = await generate_pdf(data)  # Blocks for minutes!
```

NEVER block on results inside tasks (causes deadlock)

```python
@celery_app.task
def bad_task():
    result = other_task.delay()
    return result.get()  # Blocks worker!
```

NEVER store large results in Redis

```python
@shared_task
def process_file(file_id: str) -> bytes:
    return large_file_bytes  # Store in S3/DB instead!
```

NEVER skip idempotency for retried tasks

```python
@celery_app.task(max_retries=3)
def create_order(order):
    Order.create(order)  # Creates duplicates on retry!
```

NEVER use BackgroundTasks for distributed work

```python
background_tasks.add_task(long_running_job)  # Lost if server restarts
```

NEVER ignore task acknowledgment settings

```python
celery_app.conf.task_acks_late = False  # Default loses tasks on crash
```

ALWAYS use immutable signatures in chords

```python
chord([task.s(x) for x in items], callback.si())  # si() prevents arg pollution
```

Temporal Workflows

Durable execution engine for reliable distributed applications with Temporal.io.

Key Patterns

  • Workflow definitions with @workflow.defn and deterministic code
  • Saga pattern with compensation for multi-step transactions
  • Signals and queries for external interaction with running workflows
  • Timers with workflow.wait_condition() for human-in-the-loop
  • Parallel activities via asyncio.gather inside workflows

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Workflow ID | Business-meaningful, idempotent |
| Determinism | Use workflow.random(), workflow.now() |
| I/O | Always via activities, never directly |
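The saga idea — run steps forward, unwind completed steps with compensations on failure — can be sketched in plain Python; in Temporal each action and compensation would be an activity invoked from the workflow:

```python
def run_saga(steps):
    """steps: list of (action, compensate) pairs of zero-arg callables.

    Runs actions in order; if one raises, runs the compensations of all
    previously completed steps in reverse order, then re-raises.
    """
    compensations = []
    results = []
    try:
        for action, compensate in steps:
            results.append(action())
            compensations.append(compensate)
        return results
    except Exception:
        for compensate in reversed(compensations):
            compensate()
        raise
```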

Temporal Activities

Activity and worker patterns for Temporal.io I/O operations.

Key Patterns

  • Activity definitions with @activity.defn for all I/O
  • Heartbeating for long-running activities (> 60s)
  • Error classification with ApplicationError(non_retryable=True) for business errors
  • Worker configuration with dedicated task queues
  • Testing with WorkflowEnvironment.start_local()

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Activity timeout | start_to_close for most cases |
| Error handling | Non-retryable for business errors |
| Testing | WorkflowEnvironment for integration tests |
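The heartbeat-for-resumability pattern, sketched with an injected callback; in a real Temporal activity the callback would be activity.heartbeat() and resume_from would be recovered from the last heartbeat details on retry:

```python
def process_batches(batches, handle, heartbeat, resume_from=0):
    """Process batches from resume_from, heartbeating each completed index.

    handle and heartbeat are injected callables; on retry, the caller passes
    the last heartbeated index so finished batches are not reprocessed.
    """
    completed = resume_from
    for batch in batches[resume_from:]:
        handle(batch)
        completed += 1
        heartbeat(completed)
    return completed
```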

Related Skills

  • python-backend - FastAPI, asyncio, SQLAlchemy patterns
  • langgraph - LangGraph workflow patterns (use for LLM workflows, not Celery)
  • distributed-systems - Resilience patterns, circuit breakers
  • monitoring-observability - Metrics and alerting

Capability Details

celery-config

Keywords: celery, configuration, broker, worker, setup
Solves:
  • Production Celery app configuration
  • Broker and backend setup
  • Worker tuning and time limits

task-routing

Keywords: priority, queue, routing, high priority, worker
Solves:
  • Premium user task prioritization
  • Multi-queue worker deployment
  • Dynamic task routing

canvas-workflows

Keywords: chain, group, chord, signature, canvas, workflow, pipeline
Solves:
  • Complex multi-step task pipelines
  • Parallel task execution with aggregation
  • Sequential task dependencies

retry-strategies

Keywords: retry, backoff, idempotency, dead letter, resilience
Solves:
  • Exponential backoff with jitter
  • Duplicate prevention for retried tasks
  • Failed task handling with DLQ

scheduled-tasks

Keywords: periodic, scheduled, cron, celery beat, interval
Solves:
  • Run tasks on schedule (crontab)
  • Dynamic schedule management
  • Overlap prevention for long tasks

monitoring-health

Keywords: flower, monitoring, health check, metrics, alerting
Solves:
  • Production task monitoring dashboard
  • Worker health checks
  • Queue depth autoscaling

result-backends

Keywords: result, state, progress, AsyncResult, status
Solves:
  • Task progress tracking with custom states
  • Result storage strategies
  • Job status API endpoints

arq-patterns

Keywords: arq, async queue, redis queue, fastapi background
Solves:
  • Lightweight async background tasks for FastAPI
  • Simple Redis job queue with async/await
  • Job status tracking