async-jobs
Async Jobs
Patterns for background task processing with Celery, ARQ, and Redis. Covers task queues, canvas workflows, scheduling, retry strategies, rate limiting, and production monitoring. Each category has an individual rule file in `references/`, loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Configuration | celery-config | HIGH | Celery app setup, broker, serialization, worker tuning |
| Task Routing | task-routing | HIGH | Priority queues, multi-queue workers, dynamic routing |
| Canvas Workflows | canvas-workflows | HIGH | Chain, group, chord, nested workflows |
| Retry Strategies | retry-strategies | HIGH | Exponential backoff, idempotency, dead letter queues |
| Scheduling | scheduled-tasks | MEDIUM | Celery Beat, crontab, database-backed schedules |
| Monitoring | monitoring-health | MEDIUM | Flower, custom events, health checks, metrics |
| Result Backends | result-backends | MEDIUM | Redis results, custom states, progress tracking |
| ARQ Patterns | arq-patterns | MEDIUM | Async Redis Queue for FastAPI, lightweight jobs |
| Temporal Workflows | temporal-workflows | HIGH | Durable workflow definitions, sagas, signals, queries |
| Temporal Activities | temporal-activities | HIGH | Activity patterns, workers, heartbeats, testing |
Total: 10 rules across 9 categories
Quick Start
```python
# Celery task with retry
from celery import shared_task

@shared_task(
    bind=True,
    max_retries=3,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
)
def process_order(self, order_id: str) -> dict:
    result = do_processing(order_id)
    return {"order_id": order_id, "status": "completed"}
```
```python
# ARQ task with FastAPI
from arq import create_pool
from arq.connections import RedisSettings

async def generate_report(ctx: dict, report_id: str) -> dict:
    data = await ctx["db"].fetch_report_data(report_id)
    pdf = await render_pdf(data)
    return {"report_id": report_id, "size": len(pdf)}

@router.post("/api/v1/reports")
async def create_report(data: ReportRequest, arq: ArqRedis = Depends(get_arq_pool)):
    job = await arq.enqueue_job("generate_report", data.report_id)
    return {"job_id": job.job_id}
```
Configuration
Production Celery app configuration with secure defaults and worker tuning.
Key Patterns
- JSON serialization with `task_serializer="json"` for safety
- Late acknowledgment with `task_acks_late=True` to prevent task loss on crash
- Time limits with both `task_time_limit` (hard) and `task_soft_time_limit` (soft)
- Fair distribution with `worker_prefetch_multiplier=1`
- Reject on lost with `task_reject_on_worker_lost=True`
Key Decisions
| Decision | Recommendation |
|---|---|
| Serializer | JSON (never pickle) |
| Ack mode | Late ack (`task_acks_late=True`) |
| Prefetch | 1 for fair, 4-8 for throughput |
| Time limit | soft < hard (e.g., 540/600) |
| Timezone | UTC always |
Task Routing
Priority queue configuration with multi-queue workers and dynamic routing.
Key Patterns
- Named queues for critical/high/default/low/bulk separation
- Redis priority with `queue_order_strategy: "priority"` and 0-9 levels
- Task router classes for dynamic routing based on task attributes
- Per-queue workers with tuned concurrency and prefetch settings
- Content-based routing for dynamic workflow dispatch
Key Decisions
| Decision | Recommendation |
|---|---|
| Queue count | 3-5 (critical/high/default/low/bulk) |
| Priority levels | 0-9 with Redis |
| Worker assignment | Dedicated workers per queue |
| Prefetch | 1 for critical, 4-8 for bulk |
| Routing | Router class for 5+ routing rules |
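As a sketch of the static-routing side of the table, the `app.tasks.*` task names below are hypothetical, and the broker options show the `queue_order_strategy: "priority"` setting with full 0-9 priority steps (Redis transport defaults to fewer steps):

```python
# Hypothetical per-task routing: queue name plus 0-9 priority.
TASK_ROUTES = {
    "app.tasks.send_payment_alert": {"queue": "critical", "priority": 9},
    "app.tasks.send_email":         {"queue": "high",     "priority": 7},
    "app.tasks.export_csv":         {"queue": "bulk",     "priority": 1},
}

# Redis broker transport options enabling priority ordering.
BROKER_TRANSPORT_OPTIONS = {
    "queue_order_strategy": "priority",
    "priority_steps": list(range(10)),  # 0-9 instead of the default coarse steps
}
```

In a real app these would be assigned to `celery_app.conf.task_routes` and `celery_app.conf.broker_transport_options`; beyond roughly five rules, the document recommends a router class instead of a static dict.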
Canvas Workflows
Celery canvas primitives for sequential, parallel, and fan-in/fan-out workflows.
Key Patterns
- Chain for sequential ETL pipelines with result passing
- Group for parallel execution of independent tasks
- Chord for fan-out/fan-in with aggregation callback
- Immutable signatures (`si()`) for steps that ignore input
- Nested workflows combining groups inside chains
- Link error callbacks for workflow-level error handling
Key Decisions
| Decision | Recommendation |
|---|---|
| Sequential | Chain with result passing |
| Parallel | Group for independent tasks |
| Fan-in | Chord (all must succeed for callback) |
| Ignore input | Use `si()` |
| Error in chain | Reject stops chain, retry continues |
| Partial failures | Return error dict in chord tasks |
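A broker-free sketch of the canvas semantics, with plain functions standing in for Celery signatures: chain pipes each result into the next step, group fans out to independent tasks, chord aggregates the group's results through a callback.

```python
# Plain-function model of Celery canvas semantics (illustration only, no broker).
def chain(*steps):
    """Sequential: each step receives the previous step's result."""
    def run(x):
        for step in steps:
            x = step(x)
        return x
    return run

def group(*tasks):
    """Parallel: independent tasks on the same input, results kept in order."""
    def run(x):
        return [task(x) for task in tasks]
    return run

def chord(header, callback):
    """Fan-out/fan-in: run the group, then aggregate via the callback."""
    def run(x):
        return callback(header(x))
    return run

# chord(group(...), sum) mirrors chord([task.s(x) ...], aggregate.s())
pipeline = chord(group(lambda x: x + 1, lambda x: x * 2), sum)
```

`pipeline(10)` fans out to `[11, 20]` and the callback sums them to `31`; in real Celery the callback only fires once every header task has succeeded.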
Retry Strategies
Retry patterns with exponential backoff, idempotency, and dead letter queues.
Key Patterns
- Exponential backoff with `retry_backoff=True` and `retry_backoff_max`
- Jitter with `retry_jitter=True` to prevent thundering herd
- Idempotency keys in Redis to prevent duplicate processing
- Dead letter queues for failed tasks requiring manual review
- Task locking to prevent concurrent execution of singleton tasks
- Base task classes with shared retry configuration
Key Decisions
| Decision | Recommendation |
|---|---|
| Retry delay | Exponential backoff with jitter |
| Max retries | 3-5 for transient, 0 for permanent |
| Idempotency | Redis key with TTL |
| Failed tasks | DLQ for manual review |
| Singleton | Redis lock with TTL |
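The delay curve these settings produce can be sketched in a few lines, a full-jitter approximation of `retry_backoff`/`retry_jitter`/`retry_backoff_max` semantics, not Celery's exact formula:

```python
import random

def backoff_delay(retries: int, base: float = 2.0, cap: float = 600.0) -> float:
    """Exponential backoff with full jitter: the window grows as base * 2**retries,
    capped at `cap` (the retry_backoff_max analogue), then a uniform-random delay
    is drawn from it so simultaneous failures don't retry in lockstep."""
    return random.uniform(0.0, min(cap, base * 2 ** retries))
```

Jitter matters because without it, every task that failed on the same outage retries at the same instant and re-creates the spike (the thundering herd the bullet list warns about).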
Scheduling
Celery Beat periodic task configuration with crontab, database-backed schedules, and overlap prevention.
Key Patterns
- Crontab for time-based schedules (daily, weekly, monthly)
- Interval for fixed-frequency tasks (every N seconds)
- Database scheduler with `django-celery-beat` for dynamic schedules
- Schedule locks to prevent overlapping long-running scheduled tasks
- Adaptive polling with self-rescheduling tasks
Key Decisions
| Decision | Recommendation |
|---|---|
| Schedule type | Crontab for time-based, interval for frequency |
| Dynamic | Database scheduler (`django-celery-beat`) |
| Overlap | Redis lock with timeout |
| Beat process | Separate process (not embedded) |
| Timezone | UTC always |
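The "Redis lock with timeout" overlap rule can be sketched without a broker; `InMemoryLockStore` is a stand-in for Redis `SET key value NX EX ttl`, and the names here are illustrative:

```python
import time

class InMemoryLockStore:
    """Stand-in for Redis SET NX EX; real code would call redis.Redis.set(...)."""
    def __init__(self):
        self._expiry = {}

    def set_nx_ex(self, key: str, ttl: float) -> bool:
        now = time.monotonic()
        if self._expiry.get(key, 0.0) > now:
            return False  # lock still held and not expired
        self._expiry[key] = now + ttl
        return True

def run_exclusive(store, name: str, fn, ttl: float = 300.0):
    """Skip this scheduled run if the previous run still holds the lock."""
    if not store.set_nx_ex(f"beat-lock:{name}", ttl):
        return "skipped"
    return fn()
```

The TTL doubles as a failsafe: if a worker crashes while holding the lock, the next run proceeds once the TTL lapses instead of being blocked forever.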
Monitoring
Production monitoring with Flower, custom signals, health checks, and Prometheus metrics.
Key Patterns
- Flower dashboard for real-time task monitoring
- Celery signals (`task_prerun`, `task_postrun`, `task_failure`) for metrics
- Health check endpoint verifying broker connection and active workers
- Queue depth monitoring for autoscaling decisions
- Beat monitoring for scheduled task dispatch tracking
Key Decisions
| Decision | Recommendation |
|---|---|
| Dashboard | Flower with persistent storage |
| Metrics | Prometheus via celery signals |
| Health | Broker + worker + queue depth |
| Alerting | Signal on task_failure |
| Autoscale | Queue depth > threshold |
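The queue-depth health/autoscale decision reduces to a small pure function; a sketch where `depths` would come from per-queue `LLEN` calls against the Redis broker, and the threshold is an assumed example value:

```python
def queue_health(depths: dict, threshold: int = 1000) -> dict:
    """Summarize per-queue depths (e.g. redis LLEN per queue) for a /health
    endpoint: healthy when no queue exceeds the backlog threshold, and flag
    whether an autoscaler should add workers."""
    backlogged = sorted(q for q, depth in depths.items() if depth > threshold)
    return {
        "healthy": not backlogged,
        "backlogged": backlogged,
        "scale_up": bool(backlogged),
    }
```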
Result Backends
Task result storage, custom states, and progress tracking patterns.
Key Patterns
- Redis backend for task status and small results
- Custom task states (VALIDATING, PROCESSING, UPLOADING) for progress
- `update_state()` for real-time progress reporting
- S3/database for large result storage (never Redis)
- AsyncResult for querying task state and progress
Key Decisions
| Decision | Recommendation |
|---|---|
| Status storage | Redis result backend |
| Large results | S3 or database (never Redis) |
| Progress | Custom states with `update_state()` |
| Result query | AsyncResult with state checks |
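The `update_state()` progress pattern can be sketched with a stub standing in for a bound Celery task (`self` in a `bind=True` task); `RecorderTask` and `process_items` are illustrative names:

```python
class RecorderTask:
    """Test stub for a bound Celery task: records update_state() calls."""
    def __init__(self):
        self.states = []

    def update_state(self, state: str, meta: dict) -> None:
        self.states.append((state, meta))

def process_items(task, items):
    """Report a custom PROCESSING state with progress metadata per item,
    which a status endpoint can read back via AsyncResult(task_id).info."""
    total = len(items)
    for i, item in enumerate(items, 1):
        # ... do the actual work on `item` here ...
        task.update_state(
            state="PROCESSING",
            meta={"current": i, "total": total, "percent": round(100 * i / total)},
        )
    return {"status": "completed", "total": total}
```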
ARQ Patterns
Lightweight async Redis Queue for FastAPI and simple background tasks.
Key Patterns
- Native async/await with `arq` for FastAPI integration
- Worker lifecycle with `startup`/`shutdown` hooks for resource management
- Job enqueue from FastAPI routes with `enqueue_job()`
- Job status tracking with `Job.status()` and `Job.result()`
- Delayed tasks with `_defer_by=timedelta(...)` for deferred execution
Key Decisions
| Decision | Recommendation |
|---|---|
| Simple async | ARQ (native async) |
| Complex workflows | Celery (chains, chords) |
| In-process quick | FastAPI BackgroundTasks |
| LLM workflows | LangGraph (not Celery) |
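The worker side of the lifecycle bullets can be sketched as a worker module; arq discovers the `WorkerSettings` class by convention (run with `arq yourmodule.WorkerSettings`), and the `db` resource here is a placeholder:

```python
# Sketch of an arq worker module; names and the `db` resource are illustrative.
async def startup(ctx: dict) -> None:
    # Acquire shared resources once per worker, e.g. a DB pool.
    ctx["db"] = object()  # placeholder for: await create_db_pool()

async def shutdown(ctx: dict) -> None:
    # Release resources when the worker stops.
    ctx.pop("db", None)   # placeholder for: await ctx["db"].close()

async def generate_report(ctx: dict, report_id: str) -> dict:
    # Task functions receive the shared ctx populated by startup().
    return {"report_id": report_id, "status": "done"}

class WorkerSettings:
    functions = [generate_report]
    on_startup = startup
    on_shutdown = shutdown
```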
Tool Selection
| Tool | Best For | Complexity |
|---|---|---|
| ARQ | FastAPI, simple async jobs | Low |
| Celery | Complex workflows, enterprise | High |
| RQ | Simple Redis queues | Low |
| Dramatiq | Reliable messaging | Medium |
| FastAPI BackgroundTasks | In-process quick tasks | Minimal |
Anti-Patterns (FORBIDDEN)
```python
# NEVER run long tasks synchronously in request handlers
@router.post("/api/v1/reports")
async def create_report(data: ReportRequest):
    pdf = await generate_pdf(data)  # Blocks for minutes!

# NEVER block on results inside tasks (causes deadlock)
@celery_app.task
def bad_task():
    result = other_task.delay()
    return result.get()  # Blocks worker!

# NEVER store large results in Redis
@shared_task
def process_file(file_id: str) -> bytes:
    return large_file_bytes  # Store in S3/DB instead!

# NEVER skip idempotency for retried tasks
@celery_app.task(max_retries=3)
def create_order(order):
    Order.create(order)  # Creates duplicates on retry!

# NEVER use BackgroundTasks for distributed work
background_tasks.add_task(long_running_job)  # Lost if server restarts

# NEVER ignore task acknowledgment settings
celery_app.conf.task_acks_late = False  # Default loses tasks on crash

# ALWAYS use immutable signatures in chords
chord([task.s(x) for x in items], callback.si())  # si() prevents arg pollution
```
Temporal Workflows
Durable execution engine for reliable distributed applications with Temporal.io.
Key Patterns
- Workflow definitions with `@workflow.defn` and deterministic code
- Saga pattern with compensation for multi-step transactions
- Signals and queries for external interaction with running workflows
- Timers with `workflow.wait_condition()` for human-in-the-loop
- Parallel activities via `asyncio.gather` inside workflows
Key Decisions
| Decision | Recommendation |
|---|---|
| Workflow ID | Business-meaningful, idempotent |
| Determinism | Use workflow-safe APIs (e.g. `workflow.now()`, no direct randomness or I/O) |
| I/O | Always via activities, never directly |
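The saga bullet above reduces to "run steps forward, compensate in reverse on failure." A broker-free sketch of just that control flow, which a Temporal workflow would encode durably with activities as the actions:

```python
# Minimal saga sketch: each step pairs an action with its compensation.
def run_saga(steps):
    """steps: list of (action, compensate) pairs of zero-argument callables.
    On any failure, compensations for completed steps run in reverse order,
    then the original error is re-raised."""
    compensations = []
    results = []
    try:
        for action, compensate in steps:
            results.append(action())
            compensations.append(compensate)
        return results
    except Exception:
        for compensate in reversed(compensations):
            compensate()
        raise
```

In real Temporal code each action and compensation would be an activity invocation, so the compensation sequence survives worker crashes; this sketch only shows the ordering logic.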
Temporal Activities
Activity and worker patterns for Temporal.io I/O operations.
Key Patterns
- Activity definitions with `@activity.defn` for all I/O
- Heartbeating for long-running activities (> 60s)
- Error classification with `ApplicationError(non_retryable=True)` for business errors
- Worker configuration with dedicated task queues
- Testing with `WorkflowEnvironment.start_local()`
Key Decisions
| Decision | Recommendation |
|---|---|
| Activity timeout | `start_to_close_timeout` for most cases |
| Error handling | Non-retryable for business errors |
| Testing | WorkflowEnvironment for integration tests |
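The heartbeat pattern for long-running activities can be sketched with the heartbeat callable injected; in real code it would be temporalio's `activity.heartbeat`, and the chunked shape here is an illustrative assumption:

```python
# Heartbeat sketch: report liveness after each unit of work so the server
# can distinguish a slow activity from a stalled worker, and so a retry
# can resume from the last reported position.
def process_chunks(chunks, handle, heartbeat):
    results = []
    for i, chunk in enumerate(chunks):
        results.append(handle(chunk))
        heartbeat({"completed": i + 1, "total": len(chunks)})
    return results
```

Injecting `heartbeat` also makes the activity body trivially unit-testable, complementing the `WorkflowEnvironment.start_local()` integration tests recommended above.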
Related Skills
- python-backend - FastAPI, asyncio, SQLAlchemy patterns
- langgraph - LangGraph workflow patterns (use for LLM workflows, not Celery)
- distributed-systems - Resilience patterns, circuit breakers
- monitoring-observability - Metrics and alerting
Capability Details
celery-config
Keywords: celery, configuration, broker, worker, setup
Solves:
- Production Celery app configuration
- Broker and backend setup
- Worker tuning and time limits
task-routing
Keywords: priority, queue, routing, high priority, worker
Solves:
- Premium user task prioritization
- Multi-queue worker deployment
- Dynamic task routing
canvas-workflows
Keywords: chain, group, chord, signature, canvas, workflow, pipeline
Solves:
- Complex multi-step task pipelines
- Parallel task execution with aggregation
- Sequential task dependencies
retry-strategies
Keywords: retry, backoff, idempotency, dead letter, resilience
Solves:
- Exponential backoff with jitter
- Duplicate prevention for retried tasks
- Failed task handling with DLQ
scheduled-tasks
Keywords: periodic, scheduled, cron, celery beat, interval
Solves:
- Run tasks on schedule (crontab)
- Dynamic schedule management
- Overlap prevention for long tasks
monitoring-health
Keywords: flower, monitoring, health check, metrics, alerting
Solves:
- Production task monitoring dashboard
- Worker health checks
- Queue depth autoscaling
result-backends
Keywords: result, state, progress, AsyncResult, status
Solves:
- Task progress tracking with custom states
- Result storage strategies
- Job status API endpoints
arq-patterns
Keywords: arq, async queue, redis queue, fastapi background
Solves:
- Lightweight async background tasks for FastAPI
- Simple Redis job queue with async/await
- Job status tracking