background-job-orchestrator
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseBackground Job Orchestrator
后台任务编排器
Expert in designing and implementing production-grade background job systems that handle long-running tasks without blocking API responses.
专注于设计并实现生产级后台任务系统,处理长时间运行的任务且不会阻塞API响应。
When to Use
适用场景
✅ Use for:
- Long-running tasks (email sends, report generation, image processing)
- Batch operations (bulk imports, exports, data migrations)
- Scheduled tasks (daily digests, cleanup jobs, recurring reports)
- Tasks requiring retry logic (external API calls, flaky operations)
- Priority-based processing (premium users first, critical alerts)
- Rate-limited operations (API quotas, third-party service limits)
❌ NOT for:
- Real-time bidirectional communication (use WebSockets)
- Sub-second latency requirements (use in-memory caching)
- Simple delays (setTimeout is fine for <5 seconds)
- Synchronous API responses (keep logic in request handler)
✅ 适用场景:
- 长时间运行的任务(邮件发送、报表生成、图片处理)
- 批量操作(批量导入、导出、数据迁移)
- 定时任务(每日摘要、清理任务、周期性报表)
- 需要重试逻辑的任务(外部API调用、不稳定操作)
- 基于优先级的处理(优先处理高级用户、关键告警)
- 速率受限的操作(API配额、第三方服务限制)
❌ 不适用场景:
- 实时双向通信(请使用WebSockets)
- 亚秒级延迟要求(请使用内存缓存)
- 简单延迟(延迟<5秒时使用setTimeout即可)
- 同步API响应(逻辑保留在请求处理程序中)
Quick Decision Tree
快速决策树
Does this task:
├── Take >5 seconds? → Background job
├── Need to retry on failure? → Background job
├── Run on a schedule? → Background job (cron pattern)
├── Block user interaction? → Background job
├── Process in batches? → Background job
└── Return immediately? → Keep synchronousDoes this task:
├── Take >5 seconds? → Background job
├── Need to retry on failure? → Background job
├── Run on a schedule? → Background job (cron pattern)
├── Block user interaction? → Background job
├── Process in batches? → Background job
└── Return immediately? → Keep synchronousTechnology Selection
技术选型
Node.js: BullMQ (Recommended 2024+)
Node.js:BullMQ(2024+推荐)
When to use:
- TypeScript project
- Redis already in stack
- Need advanced features (rate limiting, priorities, repeatable jobs)
Why BullMQ over Bull:
- Bull (v3) → BullMQ (v4+): Complete rewrite in TypeScript
- Better Redis connection handling
- Improved concurrency and performance
- Active maintenance (Bull is in maintenance mode)
适用场景:
- TypeScript项目
- 技术栈中已使用Redis
- 需要高级功能(速率限制、优先级、可重复任务)
为什么选择BullMQ而非Bull:
- Bull(v3)→ BullMQ(v4+):基于TypeScript完全重写
- 更优的Redis连接处理
- 改进的并发性能
- 持续维护中(Bull已进入维护模式)
Python: Celery
Python:Celery
When to use:
- Python/Django project
- Need distributed task execution
- Complex workflows (chains, groups, chords)
Alternatives:
- RQ (Redis Queue): Simpler, fewer features
- Dramatiq: Modern, less ecosystem
- Huey: Lightweight, good for small projects
适用场景:
- Python/Django项目
- 需要分布式任务执行
- 复杂工作流(链式、分组、和弦任务)
替代方案:
- RQ(Redis Queue):更简单,功能较少
- Dramatiq:现代化,生态系统较小
- Huey:轻量级,适合小型项目
Cloud-Native: AWS SQS, Google Cloud Tasks
云原生:AWS SQS、Google Cloud Tasks
When to use:
- Serverless architecture
- Don't want to manage Redis/RabbitMQ
- Need guaranteed delivery and dead-letter queues
适用场景:
- 无服务器架构
- 不想自行管理Redis/RabbitMQ
- 需要消息可靠投递和死信队列
Common Anti-Patterns
常见反模式
Anti-Pattern 1: No Dead Letter Queue
反模式1:未配置死信队列
Novice thinking: "Retry 3 times, then fail silently"
Problem: Failed jobs disappear with no visibility or recovery path.
Correct approach:
typescript
// BullMQ with dead letter queue
const queue = new Queue('email-queue', {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: 100, // Keep last 100 successful
removeOnFail: false // Keep all failed for inspection
}
});
// Monitor failed jobs
const failedJobs = await queue.getFailed();Timeline:
- Pre-2020: Retry and forget
- 2020+: Dead letter queues standard
- 2024+: Observability for job failures required
新手误区:“重试3次后静默失败”
问题:失败的任务消失,没有可见性或恢复路径。
正确做法:
typescript
// BullMQ with dead letter queue
const queue = new Queue('email-queue', {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: 100, // Keep last 100 successful
removeOnFail: false // Keep all failed for inspection
}
});
// Monitor failed jobs
const failedJobs = await queue.getFailed();时间线:
- 2020年前:重试后忽略
- 2020年起:死信队列成为标准
- 2024年起:任务失败可观测性为必备要求
Anti-Pattern 2: Synchronous Job Processing
反模式2:同步任务处理
Symptom: API endpoint waits for job completion
Problem:
typescript
// ❌ WRONG - Blocks API response
app.post('/send-email', async (req, res) => {
await sendEmail(req.body.to, req.body.subject);
res.json({ success: true });
});Why wrong: Timeout, poor UX, wastes server resources
Correct approach:
typescript
// ✅ RIGHT - Queue and return immediately
app.post('/send-email', async (req, res) => {
const job = await emailQueue.add('send', {
to: req.body.to,
subject: req.body.subject
});
res.json({
success: true,
jobId: job.id,
status: 'queued'
});
});
// Separate worker processes the job
worker.process('send', async (job) => {
await sendEmail(job.data.to, job.data.subject);
});症状:API端点等待任务完成
问题示例:
typescript
// ❌ WRONG - Blocks API response
app.post('/send-email', async (req, res) => {
await sendEmail(req.body.to, req.body.subject);
res.json({ success: true });
});错误原因:超时、用户体验差、浪费服务器资源
正确做法:
typescript
// ✅ RIGHT - Queue and return immediately
app.post('/send-email', async (req, res) => {
const job = await emailQueue.add('send', {
to: req.body.to,
subject: req.body.subject
});
res.json({
success: true,
jobId: job.id,
status: 'queued'
});
});
// Separate worker processes the job
worker.process('send', async (job) => {
await sendEmail(job.data.to, job.data.subject);
});Anti-Pattern 3: No Idempotency
反模式3:不保证幂等性
Problem: Job runs twice → duplicate charges, double emails
Why it happens:
- Redis connection drops mid-processing
- Worker crashes before job completion
- Job timeout triggers retry while still running
Correct approach:
typescript
// ✅ Idempotent job with deduplication key
await queue.add('charge-payment', {
userId: 123,
amount: 50.00
}, {
jobId: `payment-${orderId}`, // Prevents duplicates
attempts: 3
});
// In worker: Check if already processed
worker.process('charge-payment', async (job) => {
const { userId, amount } = job.data;
// Check idempotency
const existing = await db.payments.findOne({
jobId: job.id
});
if (existing) {
return existing; // Already processed
}
// Process payment
const result = await stripe.charges.create({...});
// Store idempotency record
await db.payments.create({
jobId: job.id,
result
});
return result;
});问题:任务运行两次→重复收费、重复发送邮件
发生原因:
- Redis连接在处理中途断开
- Worker在任务完成前崩溃
- 任务超时触发重试,但原任务仍在运行
正确做法:
typescript
// ✅ Idempotent job with deduplication key
await queue.add('charge-payment', {
userId: 123,
amount: 50.00
}, {
jobId: `payment-${orderId}`, // Prevents duplicates
attempts: 3
});
// In worker: Check if already processed
worker.process('charge-payment', async (job) => {
const { userId, amount } = job.data;
// Check idempotency
const existing = await db.payments.findOne({
jobId: job.id
});
if (existing) {
return existing; // Already processed
}
// Process payment
const result = await stripe.charges.create({...});
// Store idempotency record
await db.payments.create({
jobId: job.id,
result
});
return result;
});Anti-Pattern 4: No Rate Limiting
反模式4:未配置速率限制
Problem: Overwhelm third-party APIs or exhaust quotas
Symptom: "Rate limit exceeded" errors from Sendgrid, Stripe, etc.
Correct approach:
typescript
// BullMQ rate limiting
const queue = new Queue('api-calls', {
limiter: {
max: 100, // Max 100 jobs
duration: 60000 // Per 60 seconds
}
});
// Or: Priority-based rate limits
await queue.add('send-email', data, {
priority: user.isPremium ? 1 : 10,
rateLimiter: {
max: user.isPremium ? 1000 : 100,
duration: 3600000 // Per hour
}
});问题:压垮第三方API或耗尽配额
症状:收到Sendgrid、Stripe等平台的“速率限制超出”错误
正确做法:
typescript
// BullMQ rate limiting
const queue = new Queue('api-calls', {
limiter: {
max: 100, // Max 100 jobs
duration: 60000 // Per 60 seconds
}
});
// Or: Priority-based rate limits
await queue.add('send-email', data, {
priority: user.isPremium ? 1 : 10,
rateLimiter: {
max: user.isPremium ? 1000 : 100,
duration: 3600000 // Per hour
}
});Anti-Pattern 5: Forgetting Worker Scaling
反模式5:忽略Worker扩容
Problem: Single worker can't keep up with queue depth
Symptom: Queue backs up, jobs delayed hours/days
Correct approach:
typescript
// Horizontal scaling with multiple workers
const worker = new Worker('email-queue', async (job) => {
await processEmail(job.data);
}, {
connection: redis,
concurrency: 5 // Process 5 jobs concurrently per worker
});
// Run multiple worker processes (PM2, Kubernetes, etc.)
// Each worker processes concurrency * num_workers jobsMonitoring:
typescript
// Set up alerts for queue depth
setInterval(async () => {
const waiting = await queue.getWaitingCount();
if (waiting > 1000) {
alert('Queue depth exceeds 1000, scale workers!');
}
}, 60000);问题:单个Worker无法处理队列积压
症状:队列任务堆积,任务延迟数小时/数天
正确做法:
typescript
// Horizontal scaling with multiple workers
const worker = new Worker('email-queue', async (job) => {
await processEmail(job.data);
}, {
connection: redis,
concurrency: 5 // Process 5 jobs concurrently per worker
});
// Run multiple worker processes (PM2, Kubernetes, etc.)
// Each worker processes concurrency * num_workers jobs监控:
typescript
// Set up alerts for queue depth
setInterval(async () => {
const waiting = await queue.getWaitingCount();
if (waiting > 1000) {
alert('Queue depth exceeds 1000, scale workers!');
}
}, 60000);Implementation Patterns
实现模式
Pattern 1: Email Campaigns
模式1:邮件营销活动
typescript
// Queue setup
const emailQueue = new Queue('email-campaign', { connection: redis });
// Enqueue batch
async function sendCampaign(userIds: number[], template: string) {
const jobs = userIds.map(userId => ({
name: 'send',
data: { userId, template },
opts: {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 }
}
}));
await emailQueue.addBulk(jobs);
}
// Worker with retry logic
const worker = new Worker('email-campaign', async (job) => {
const { userId, template } = job.data;
const user = await db.users.findById(userId);
const email = renderTemplate(template, user);
try {
await sendgrid.send({
to: user.email,
subject: email.subject,
html: email.body
});
} catch (error) {
if (error.code === 'ECONNREFUSED') {
throw error; // Retry
}
// Invalid email, don't retry
console.error(`Invalid email for user ${userId}`);
}
}, {
connection: redis,
concurrency: 10
});typescript
// Queue setup
const emailQueue = new Queue('email-campaign', { connection: redis });
// Enqueue batch
async function sendCampaign(userIds: number[], template: string) {
const jobs = userIds.map(userId => ({
name: 'send',
data: { userId, template },
opts: {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 }
}
}));
await emailQueue.addBulk(jobs);
}
// Worker with retry logic
const worker = new Worker('email-campaign', async (job) => {
const { userId, template } = job.data;
const user = await db.users.findById(userId);
const email = renderTemplate(template, user);
try {
await sendgrid.send({
to: user.email,
subject: email.subject,
html: email.body
});
} catch (error) {
if (error.code === 'ECONNREFUSED') {
throw error; // Retry
}
// Invalid email, don't retry
console.error(`Invalid email for user ${userId}`);
}
}, {
connection: redis,
concurrency: 10
});Pattern 2: Scheduled Reports
模式2:定时报表
typescript
// Daily report at 9 AM
await queue.add('daily-report', {
type: 'sales',
recipients: ['admin@company.com']
}, {
repeat: {
pattern: '0 9 * * *', // Cron syntax
tz: 'America/New_York'
}
});
// Worker generates and emails report
worker.process('daily-report', async (job) => {
const { type, recipients } = job.data;
const data = await generateReport(type);
const pdf = await createPDF(data);
await emailQueue.add('send', {
to: recipients,
subject: `Daily ${type} Report`,
attachments: [{ filename: 'report.pdf', content: pdf }]
});
});typescript
// Daily report at 9 AM
await queue.add('daily-report', {
type: 'sales',
recipients: ['admin@company.com']
}, {
repeat: {
pattern: '0 9 * * *', // Cron syntax
tz: 'America/New_York'
}
});
// Worker generates and emails report
worker.process('daily-report', async (job) => {
const { type, recipients } = job.data;
const data = await generateReport(type);
const pdf = await createPDF(data);
await emailQueue.add('send', {
to: recipients,
subject: `Daily ${type} Report`,
attachments: [{ filename: 'report.pdf', content: pdf }]
});
});Pattern 3: Video Transcoding Pipeline
模式3:视频转码流水线
typescript
// Multi-stage job with progress tracking
await videoQueue.add('transcode', {
videoId: 123,
formats: ['720p', '1080p', '4k']
}, {
attempts: 2,
timeout: 3600000 // 1 hour timeout
});
worker.process('transcode', async (job) => {
const { videoId, formats } = job.data;
for (let i = 0; i < formats.length; i++) {
const format = formats[i];
// Update progress
await job.updateProgress((i / formats.length) * 100);
// Transcode
await ffmpeg.transcode(videoId, format);
}
await job.updateProgress(100);
});
// Client polls for progress
app.get('/videos/:id/status', async (req, res) => {
const job = await queue.getJob(req.params.jobId);
res.json({
state: await job.getState(),
progress: job.progress
});
});typescript
// Multi-stage job with progress tracking
await videoQueue.add('transcode', {
videoId: 123,
formats: ['720p', '1080p', '4k']
}, {
attempts: 2,
timeout: 3600000 // 1 hour timeout
});
worker.process('transcode', async (job) => {
const { videoId, formats } = job.data;
for (let i = 0; i < formats.length; i++) {
const format = formats[i];
// Update progress
await job.updateProgress((i / formats.length) * 100);
// Transcode
await ffmpeg.transcode(videoId, format);
}
await job.updateProgress(100);
});
// Client polls for progress
app.get('/videos/:id/status', async (req, res) => {
const job = await queue.getJob(req.params.jobId);
res.json({
state: await job.getState(),
progress: job.progress
});
});Monitoring & Observability
监控与可观测性
Essential Metrics
核心指标
typescript
// Queue health dashboard
async function getQueueMetrics() {
const [waiting, active, completed, failed, delayed] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getCompletedCount(),
queue.getFailedCount(),
queue.getDelayedCount()
]);
return {
waiting, // Jobs waiting to be processed
active, // Jobs currently processing
completed, // Successfully completed
failed, // Failed after retries
delayed, // Scheduled for future
health: waiting < 1000 && failed < 100 ? 'healthy' : 'degraded'
};
}typescript
// Queue health dashboard
async function getQueueMetrics() {
const [waiting, active, completed, failed, delayed] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getCompletedCount(),
queue.getFailedCount(),
queue.getDelayedCount()
]);
return {
waiting, // Jobs waiting to be processed
active, // Jobs currently processing
completed, // Successfully completed
failed, // Failed after retries
delayed, // Scheduled for future
health: waiting < 1000 && failed < 100 ? 'healthy' : 'degraded'
};
}BullMQ Board (UI)
BullMQ Board(可视化界面)
typescript
// Development: Monitor jobs visually
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(videoQueue)
],
serverAdapter
});
app.use('/admin/queues', serverAdapter.getRouter());
// Visit http://localhost:3000/admin/queuestypescript
// Development: Monitor jobs visually
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
const serverAdapter = new ExpressAdapter();
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(videoQueue)
],
serverAdapter
});
app.use('/admin/queues', serverAdapter.getRouter());
// Visit http://localhost:3000/admin/queuesProduction Checklist
生产环境检查清单
□ Dead letter queue configured
□ Retry strategy with exponential backoff
□ Job timeout limits set
□ Rate limiting for third-party APIs
□ Idempotency keys for critical operations
□ Worker concurrency tuned (CPU cores * 2)
□ Horizontal scaling configured (multiple workers)
□ Queue depth monitoring with alerts
□ Failed job inspection workflow
□ Job data doesn't contain PII in logs
□ Redis persistence enabled (AOF or RDB)
□ Graceful shutdown handling (SIGTERM)□ Dead letter queue configured
□ Retry strategy with exponential backoff
□ Job timeout limits set
□ Rate limiting for third-party APIs
□ Idempotency keys for critical operations
□ Worker concurrency tuned (CPU cores * 2)
□ Horizontal scaling configured (multiple workers)
□ Queue depth monitoring with alerts
□ Failed job inspection workflow
□ Job data doesn't contain PII in logs
□ Redis persistence enabled (AOF or RDB)
□ Graceful shutdown handling (SIGTERM)When to Use vs Avoid
适用与不适用场景对比
| Scenario | Use Background Jobs? |
|---|---|
| Send welcome email on signup | ✅ Yes - can take 2-5 seconds |
| Charge credit card | ⚠️ Maybe - depends on payment provider latency |
| Generate PDF report (30 seconds) | ✅ Yes - definitely background |
| Fetch user profile from DB | ❌ No - milliseconds, keep synchronous |
| Process video upload (5 minutes) | ✅ Yes - always background |
| Validate form input | ❌ No - synchronous validation |
| Daily cron job | ✅ Yes - use repeatable jobs |
| Real-time chat message | ❌ No - use WebSockets |
| 场景 | 是否使用后台任务? |
|---|---|
| 注册时发送欢迎邮件 | ✅ 是 - 可能耗时2-5秒 |
| 信用卡扣费 | ⚠️ 视情况而定 - 取决于支付提供商延迟 |
| 生成PDF报表(30秒) | ✅ 是 - 必须使用后台任务 |
| 从数据库获取用户资料 | ❌ 否 - 仅需毫秒级,保持同步 |
| 处理视频上传(5分钟) | ✅ 是 - 务必使用后台任务 |
| 表单输入验证 | ❌ 否 - 同步验证即可 |
| 每日定时任务 | ✅ 是 - 使用可重复任务 |
| 实时聊天消息 | ❌ 否 - 使用WebSockets |
Technology Comparison
技术对比
| Feature | BullMQ | Celery | AWS SQS |
|---|---|---|---|
| Language | Node.js | Python | Any (HTTP API) |
| Backend | Redis | Redis/RabbitMQ/SQS | Managed |
| Priorities | ✅ | ✅ | ✅ |
| Rate Limiting | ✅ | ❌ | ✅ (via attributes) |
| Repeat/Cron | ✅ | ✅ (celery-beat) | ❌ (use EventBridge) |
| UI Dashboard | Bull Board | Flower | CloudWatch |
| Workflows | ❌ | ✅ (chains, groups) | ❌ |
| Learning Curve | Medium | Medium | Low |
| Cost | Redis hosting | Redis hosting | $0.40/million requests |
| 特性 | BullMQ | Celery | AWS SQS |
|---|---|---|---|
| 支持语言 | Node.js | Python | 任意(HTTP API) |
| 后端存储 | Redis | Redis/RabbitMQ/SQS | 托管服务 |
| 优先级队列 | ✅ | ✅ | ✅ |
| 速率限制 | ✅ | ❌ | ✅(通过属性配置) |
| 重复/定时任务 | ✅ | ✅(celery-beat) | ❌(需搭配EventBridge) |
| UI控制台 | Bull Board | Flower | CloudWatch |
| 工作流 | ❌ | ✅(链式、分组任务) | ❌ |
| 学习曲线 | 中等 | 中等 | 低 |
| 成本 | Redis托管费用 | Redis托管费用 | 每百万请求0.40美元 |
References
参考资料
- - Advanced BullMQ patterns and examples
/references/bullmq-patterns.md - - Celery chains, groups, and chords
/references/celery-workflows.md - - Monitoring, alerting, and debugging
/references/job-observability.md
- - 高级BullMQ模式与示例
/references/bullmq-patterns.md - - Celery链式、分组及和弦任务
/references/celery-workflows.md - - 监控、告警与调试
/references/job-observability.md
Scripts
脚本
- - Initialize BullMQ with Redis
scripts/setup_bullmq.sh - - Queue metrics dashboard
scripts/queue_health_check.ts - - Bulk retry failed jobs
scripts/retry_failed_jobs.ts
This skill guides: Background job implementation | Queue architecture | Retry strategies | Worker scaling | Job observability
- - 初始化BullMQ与Redis
scripts/setup_bullmq.sh - - 队列指标仪表盘
scripts/queue_health_check.ts - - 批量重试失败任务
scripts/retry_failed_jobs.ts
本技能涵盖:后台任务实现 | 队列架构 | 重试策略 | Worker扩容 | 任务可观测性