Python Background Jobs & Task Queues
Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.
When to Use This Skill
- Processing tasks that take longer than a few seconds
- Sending emails, notifications, or webhooks
- Generating reports or exporting data
- Processing uploads or media transformations
- Integrating with unreliable external services
- Building event-driven architectures
Core Concepts
1. Task Queue Pattern
API accepts request, enqueues a job, returns immediately with a job ID. Workers process jobs asynchronously.
2. Idempotency
Tasks may be retried on failure. Design for safe re-execution.
3. Job State Machine
Jobs transition through states: pending → running → succeeded/failed.
4. At-Least-Once Delivery
Most queues guarantee at-least-once delivery. Your code must handle duplicates.
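Because duplicates are expected under at-least-once delivery, a common guard is to record processed message IDs and skip re-runs. A minimal in-memory sketch (`handle_once` and the ID scheme are illustrative; a real system would use Redis `SET NX EX` or a database constraint instead of a process-local set):

```python
processed: set[str] = set()

def handle_once(message_id: str, handler, payload) -> bool:
    """Run handler only for message IDs not seen before; returns False on duplicates."""
    if message_id in processed:
        return False  # duplicate delivery from an at-least-once queue: skip
    handler(payload)
    processed.add(message_id)  # mark only after the handler succeeds
    return True
```

Marking the ID only after the handler succeeds means a crash mid-task leads to a retry rather than a lost message, which is exactly the trade-off at-least-once delivery makes.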
Quick Start
The examples in this skill use Celery, a widely adopted task queue. Alternatives such as RQ, Dramatiq, and cloud-native solutions (AWS SQS, GCP Tasks) are equally valid choices.
```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker
    email_client.send(to, subject, body)

# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
```
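With the task defined, a separate worker process must be running to consume the queue. Assuming the snippet above is saved as tasks.py:

```shell
# Start a worker that consumes tasks from the Redis broker
celery -A tasks worker --loglevel=info
```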
Fundamental Patterns
Pattern 1: Return Job ID Immediately
For operations exceeding a few seconds, return a job ID and process asynchronously.
```python
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None

# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist job record
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.utcnow(),
    ))

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status="pending",
        poll_url=f"/jobs/{job_id}",
    )
```
Pattern 2: Celery Task Configuration
Configure Celery tasks with proper retry and timeout settings.
```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration
app.conf.update(
    task_time_limit=3600,          # Hard limit: 1 hour
    task_soft_time_limit=3000,     # Soft limit: 50 minutes
    task_acks_late=True,           # Acknowledge after completion
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,  # Don't prefetch too many tasks
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
Pattern 3: Make Tasks Idempotent
Workers may retry on crash or timeout. Design for safe re-execution.
```python
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already in progress? Check if we should continue
    if order.status == OrderStatus.PROCESSING:
        # Use idempotency key to avoid double-charging
        pass

    # Process with idempotency key
    result = payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical!
    )
    orders_repo.update(order_id, status=OrderStatus.COMPLETED)
```

Idempotency Strategies:
- Check-before-write: Verify state before action
- Idempotency keys: Use unique tokens with external services
- Upsert patterns: INSERT ... ON CONFLICT UPDATE
- Deduplication window: Track processed IDs for N hours
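The upsert strategy can be sketched with SQLite's INSERT ... ON CONFLICT (the `task_results` table and columns here are illustrative): re-running the same task overwrites its row instead of creating a duplicate.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_results (task_id TEXT PRIMARY KEY, result TEXT)")

def record_result(task_id: str, result: str) -> None:
    """Idempotent write: a retried task updates its existing row, never duplicates it."""
    conn.execute(
        """INSERT INTO task_results (task_id, result) VALUES (?, ?)
           ON CONFLICT (task_id) DO UPDATE SET result = excluded.result""",
        (task_id, result),
    )

record_result("job-1", "ok")
record_result("job-1", "ok")  # safe re-execution: still one row
```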
Pattern 4: Job State Management
Persist job state transitions for visibility and debugging.
```python
class JobRepository:
    """Repository for managing job state."""

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}
        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.utcnow()
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.utcnow()
        await self._db.execute(
            "UPDATE jobs SET status = $1, ... WHERE id = $2",
            updates, job_id,
        )
        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )
```
Advanced Patterns
Pattern 5: Dead Letter Queue
Handle permanently failed tasks for manual inspection.
```python
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
    """Process webhook with DLQ for failures."""
    try:
        result = send_webhook(payload)
        if not result.success:
            raise WebhookFailedError(result.error)
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.send({
                "task": "process_webhook",
                "webhook_id": webhook_id,
                "payload": payload,
                "error": str(e),
                "attempts": self.request.retries + 1,
                "failed_at": datetime.utcnow().isoformat(),
            })
            logger.error(
                "Webhook moved to DLQ after max retries",
                webhook_id=webhook_id,
                error=str(e),
            )
            return
        # Exponential backoff retry
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)处理永久失败的任务,以便人工检查。
python
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
"""Process webhook with DLQ for failures."""
try:
result = send_webhook(payload)
if not result.success:
raise WebhookFailedError(result.error)
except Exception as e:
if self.request.retries >= self.max_retries:
# Move to dead letter queue for manual inspection
dead_letter_queue.send({
"task": "process_webhook",
"webhook_id": webhook_id,
"payload": payload,
"error": str(e),
"attempts": self.request.retries + 1,
"failed_at": datetime.utcnow().isoformat(),
})
logger.error(
"Webhook moved to DLQ after max retries",
webhook_id=webhook_id,
error=str(e),
)
return
# Exponential backoff retry
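The `dead_letter_queue` object is an assumed interface rather than a Celery built-in. A sketch of what draining it for replay might look like, with a plain list standing in for the real store (a broker queue or database table):

```python
# A plain list stands in for a DLQ backed by a broker queue or database table
dlq: list[dict] = [
    {"task": "process_webhook", "webhook_id": "wh-1", "error": "timeout"},
]

def drain_dlq(retry_fn) -> int:
    """Replay every dead-lettered entry and return how many were retried."""
    retried = 0
    while dlq:
        entry = dlq.pop(0)
        retry_fn(entry)  # e.g. re-enqueue the original task with its payload
        retried += 1
    return retried
```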
Pattern 6: Status Polling Endpoint
Provide an endpoint for clients to check job status.
```python
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
    """Get current status of a background job."""
    job = await jobs_repo.get(job_id)
    if job is None:
        raise HTTPException(404, f"Job {job_id} not found")
    return JobStatusResponse(
        job_id=job.id,
        status=job.status.value,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
        result=job.result if job.status == JobStatus.SUCCEEDED else None,
        error=job.error if job.status == JobStatus.FAILED else None,
        # Helpful for clients
        is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
    )
```
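On the client side, a status endpoint like this is typically consumed with a polling loop that backs off until the job reaches a terminal state. A sketch, where `fetch_status` stands in for an HTTP GET against /jobs/{job_id}:

```python
import time

def poll_until_done(job_id: str, fetch_status, interval: float = 1.0,
                    timeout: float = 60.0) -> dict:
    """Poll fetch_status until the job is terminal or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status(job_id)  # e.g. GET /jobs/{job_id}
        if status["is_terminal"]:
            return status
        time.sleep(interval)
        interval = min(interval * 2, 10.0)  # exponential backoff, capped
    raise TimeoutError(f"Job {job_id} still running after {timeout}s")
```

The backoff keeps short jobs responsive while avoiding hammering the API for long-running ones.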
Pattern 7: Task Chaining and Workflows
Compose complex workflows from simple tasks.
```python
from celery import chain, group, chord

# Simple chain: A → B → C
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)

# Parallel execution: A, B, C all at once
parallel = group(
    send_email.s(user_email),
    send_sms.s(user_phone),
    update_analytics.s(event_data),
)

# Chord: Run tasks in parallel, then a callback
# Process all items, then send completion notification
workflow = chord(
    [process_item.s(item_id) for item_id in item_ids],
    send_completion_notification.s(batch_id),
)
workflow.apply_async()
```
Pattern 8: Alternative Task Queues
Choose the right tool for your needs.
RQ (Redis Queue): Simple, Redis-based

```python
from rq import Queue
from redis import Redis

queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")
```

Dramatiq: Modern Celery alternative

```python
import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(RedisBroker())

@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
    email_client.send(to, subject, body)
```

Cloud-native options:
- AWS SQS + Lambda
- Google Cloud Tasks
- Azure Functions
Best Practices Summary
- Return immediately - Don't block requests for long operations
- Persist job state - Enable status polling and debugging
- Make tasks idempotent - Safe to retry on any failure
- Use idempotency keys - For external service calls
- Set timeouts - Both soft and hard limits
- Implement DLQ - Capture permanently failed tasks
- Log transitions - Track job state changes
- Retry appropriately - Exponential backoff for transient errors
- Don't retry permanent failures - Validation errors, invalid credentials
- Monitor queue depth - Alert on backlog growth
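Queue-depth monitoring from the last bullet can be as simple as sampling the broker's backlog and alerting past a threshold. With a Redis broker, Celery's default queue is a Redis list, so LLEN gives its depth; the alert hook here is illustrative:

```python
def check_backlog(depth: int, threshold: int, alert) -> bool:
    """Call alert when queue depth exceeds the threshold; returns True if alerted."""
    if depth > threshold:
        alert(f"Queue backlog at {depth} tasks (threshold {threshold})")
        return True
    return False

# With a real Redis broker, the sampled depth would come from:
# depth = redis_client.llen("celery")
```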