background-job-orchestrator

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Background Job Orchestrator

后台任务编排器

Expert in designing and implementing production-grade background job systems that handle long-running tasks without blocking API responses.
专注于设计并实现生产级后台任务系统,处理长时间运行的任务且不会阻塞API响应。

When to Use

适用场景

Use for:
  • Long-running tasks (email sends, report generation, image processing)
  • Batch operations (bulk imports, exports, data migrations)
  • Scheduled tasks (daily digests, cleanup jobs, recurring reports)
  • Tasks requiring retry logic (external API calls, flaky operations)
  • Priority-based processing (premium users first, critical alerts)
  • Rate-limited operations (API quotas, third-party service limits)
NOT for:
  • Real-time bidirectional communication (use WebSockets)
  • Sub-second latency requirements (use in-memory caching)
  • Simple delays (setTimeout is fine for <5 seconds)
  • Synchronous API responses (keep logic in request handler)
适用场景
  • 长时间运行的任务(邮件发送、报表生成、图片处理)
  • 批量操作(批量导入、导出、数据迁移)
  • 定时任务(每日摘要、清理任务、周期性报表)
  • 需要重试逻辑的任务(外部API调用、不稳定操作)
  • 基于优先级的处理(优先处理高级用户、关键告警)
  • 速率受限的操作(API配额、第三方服务限制)
不适用场景
  • 实时双向通信(请使用WebSockets)
  • 亚秒级延迟要求(请使用内存缓存)
  • 简单延迟(延迟<5秒时使用setTimeout即可)
  • 同步API响应(逻辑保留在请求处理程序中)

Quick Decision Tree

快速决策树

Does this task:
├── Take &gt;5 seconds? → Background job
├── Need to retry on failure? → Background job
├── Run on a schedule? → Background job (cron pattern)
├── Block user interaction? → Background job
├── Process in batches? → Background job
└── Return immediately? → Keep synchronous

Does this task:
├── Take &gt;5 seconds? → Background job
├── Need to retry on failure? → Background job
├── Run on a schedule? → Background job (cron pattern)
├── Block user interaction? → Background job
├── Process in batches? → Background job
└── Return immediately? → Keep synchronous

Technology Selection

技术选型

Node.js: BullMQ (Recommended 2024+)

Node.js:BullMQ(2024+推荐)

When to use:
  • TypeScript project
  • Redis already in stack
  • Need advanced features (rate limiting, priorities, repeatable jobs)
Why BullMQ over Bull:
  • Bull (v3) → BullMQ (v4+): Complete rewrite in TypeScript
  • Better Redis connection handling
  • Improved concurrency and performance
  • Active maintenance (Bull is in maintenance mode)
适用场景
  • TypeScript项目
  • 技术栈中已使用Redis
  • 需要高级功能(速率限制、优先级、可重复任务)
为什么选择BullMQ而非Bull
  • Bull(v3)→ BullMQ(v4+):基于TypeScript完全重写
  • 更优的Redis连接处理
  • 改进的并发性能
  • 持续维护中(Bull已进入维护模式)

Python: Celery

Python:Celery

When to use:
  • Python/Django project
  • Need distributed task execution
  • Complex workflows (chains, groups, chords)
Alternatives:
  • RQ (Redis Queue): Simpler, fewer features
  • Dramatiq: Modern, less ecosystem
  • Huey: Lightweight, good for small projects
适用场景
  • Python/Django项目
  • 需要分布式任务执行
  • 复杂工作流(链式、分组、和弦任务)
替代方案
  • RQ(Redis Queue):更简单,功能较少
  • Dramatiq:现代化,生态系统较小
  • Huey:轻量级,适合小型项目

Cloud-Native: AWS SQS, Google Cloud Tasks

云原生:AWS SQS、Google Cloud Tasks

When to use:
  • Serverless architecture
  • Don't want to manage Redis/RabbitMQ
  • Need guaranteed delivery and dead-letter queues

适用场景
  • 无服务器架构
  • 不想自行管理Redis/RabbitMQ
  • 需要消息可靠投递和死信队列

Common Anti-Patterns

常见反模式

Anti-Pattern 1: No Dead Letter Queue

反模式1:未配置死信队列

Novice thinking: "Retry 3 times, then fail silently"
Problem: Failed jobs disappear with no visibility or recovery path.
Correct approach:
typescript
// BullMQ with dead letter queue
const queue = new Queue('email-queue', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    },
    removeOnComplete: 100, // Keep last 100 successful
    removeOnFail: false     // Keep all failed for inspection
  }
});

// Monitor failed jobs
const failedJobs = await queue.getFailed();
Timeline:
  • Pre-2020: Retry and forget
  • 2020+: Dead letter queues standard
  • 2024+: Observability for job failures required

新手误区:“重试3次后静默失败”
问题:失败的任务消失,没有可见性或恢复路径。
正确做法
typescript
// BullMQ with dead letter queue
const queue = new Queue('email-queue', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    },
    removeOnComplete: 100, // Keep last 100 successful
    removeOnFail: false     // Keep all failed for inspection
  }
});

// Monitor failed jobs
const failedJobs = await queue.getFailed();
时间线
  • 2020年前:重试后忽略
  • 2020年起:死信队列成为标准
  • 2024年起:任务失败可观测性为必备要求

Anti-Pattern 2: Synchronous Job Processing

反模式2:同步任务处理

Symptom: API endpoint waits for job completion
Problem:
typescript
// ❌ WRONG - Blocks API response
app.post('/send-email', async (req, res) => {
  await sendEmail(req.body.to, req.body.subject);
  res.json({ success: true });
});
Why wrong: Timeout, poor UX, wastes server resources
Correct approach:
typescript
// ✅ RIGHT - Queue and return immediately
app.post('/send-email', async (req, res) => {
  const job = await emailQueue.add('send', {
    to: req.body.to,
    subject: req.body.subject
  });

  res.json({
    success: true,
    jobId: job.id,
    status: 'queued'
  });
});

// Separate worker processes the job
worker.process('send', async (job) => {
  await sendEmail(job.data.to, job.data.subject);
});

症状:API端点等待任务完成
问题示例
typescript
// ❌ WRONG - Blocks API response
app.post('/send-email', async (req, res) => {
  await sendEmail(req.body.to, req.body.subject);
  res.json({ success: true });
});
错误原因:超时、用户体验差、浪费服务器资源
正确做法
typescript
// ✅ RIGHT - Queue and return immediately
app.post('/send-email', async (req, res) => {
  const job = await emailQueue.add('send', {
    to: req.body.to,
    subject: req.body.subject
  });

  res.json({
    success: true,
    jobId: job.id,
    status: 'queued'
  });
});

// Separate worker processes the job
worker.process('send', async (job) => {
  await sendEmail(job.data.to, job.data.subject);
});

Anti-Pattern 3: No Idempotency

反模式3:不保证幂等性

Problem: Job runs twice → duplicate charges, double emails
Why it happens:
  • Redis connection drops mid-processing
  • Worker crashes before job completion
  • Job timeout triggers retry while still running
Correct approach:
typescript
// ✅ Idempotent job with deduplication key
await queue.add('charge-payment', {
  userId: 123,
  amount: 50.00
}, {
  jobId: `payment-${orderId}`, // Prevents duplicates
  attempts: 3
});

// In worker: Check if already processed
worker.process('charge-payment', async (job) => {
  const { userId, amount } = job.data;

  // Check idempotency
  const existing = await db.payments.findOne({
    jobId: job.id
  });
  if (existing) {
    return existing; // Already processed
  }

  // Process payment
  const result = await stripe.charges.create({...});

  // Store idempotency record
  await db.payments.create({
    jobId: job.id,
    result
  });

  return result;
});

问题:任务运行两次→重复收费、重复发送邮件
发生原因
  • Redis连接在处理中途断开
  • Worker在任务完成前崩溃
  • 任务超时触发重试,但原任务仍在运行
正确做法
typescript
// ✅ Idempotent job with deduplication key
await queue.add('charge-payment', {
  userId: 123,
  amount: 50.00
}, {
  jobId: `payment-${orderId}`, // Prevents duplicates
  attempts: 3
});

// In worker: Check if already processed
worker.process('charge-payment', async (job) => {
  const { userId, amount } = job.data;

  // Check idempotency
  const existing = await db.payments.findOne({
    jobId: job.id
  });
  if (existing) {
    return existing; // Already processed
  }

  // Process payment
  const result = await stripe.charges.create({...});

  // Store idempotency record
  await db.payments.create({
    jobId: job.id,
    result
  });

  return result;
});

Anti-Pattern 4: No Rate Limiting

反模式4:未配置速率限制

Problem: Overwhelm third-party APIs or exhaust quotas
Symptom: "Rate limit exceeded" errors from Sendgrid, Stripe, etc.
Correct approach:
typescript
// BullMQ rate limiting
const queue = new Queue('api-calls', {
  limiter: {
    max: 100,        // Max 100 jobs
    duration: 60000  // Per 60 seconds
  }
});

// Or: Priority-based rate limits
await queue.add('send-email', data, {
  priority: user.isPremium ? 1 : 10,
  rateLimiter: {
    max: user.isPremium ? 1000 : 100,
    duration: 3600000 // Per hour
  }
});

问题:压垮第三方API或耗尽配额
症状:收到Sendgrid、Stripe等平台的“速率限制超出”错误
正确做法
typescript
// BullMQ rate limiting
const queue = new Queue('api-calls', {
  limiter: {
    max: 100,        // Max 100 jobs
    duration: 60000  // Per 60 seconds
  }
});

// Or: Priority-based rate limits
await queue.add('send-email', data, {
  priority: user.isPremium ? 1 : 10,
  rateLimiter: {
    max: user.isPremium ? 1000 : 100,
    duration: 3600000 // Per hour
  }
});

Anti-Pattern 5: Forgetting Worker Scaling

反模式5:忽略Worker扩容

Problem: Single worker can't keep up with queue depth
Symptom: Queue backs up, jobs delayed hours/days
Correct approach:
typescript
// Horizontal scaling with multiple workers
const worker = new Worker('email-queue', async (job) => {
  await processEmail(job.data);
}, {
  connection: redis,
  concurrency: 5  // Process 5 jobs concurrently per worker
});

// Run multiple worker processes (PM2, Kubernetes, etc.)
// Each worker processes concurrency * num_workers jobs
Monitoring:
typescript
// Set up alerts for queue depth
setInterval(async () => {
  const waiting = await queue.getWaitingCount();
  if (waiting > 1000) {
    alert('Queue depth exceeds 1000, scale workers!');
  }
}, 60000);

问题:单个Worker无法处理队列积压
症状:队列任务堆积,任务延迟数小时/数天
正确做法
typescript
// Horizontal scaling with multiple workers
const worker = new Worker('email-queue', async (job) => {
  await processEmail(job.data);
}, {
  connection: redis,
  concurrency: 5  // Process 5 jobs concurrently per worker
});

// Run multiple worker processes (PM2, Kubernetes, etc.)
// Each worker processes concurrency * num_workers jobs
监控
typescript
// Set up alerts for queue depth
setInterval(async () => {
  const waiting = await queue.getWaitingCount();
  if (waiting > 1000) {
    alert('Queue depth exceeds 1000, scale workers!');
  }
}, 60000);

Implementation Patterns

实现模式

Pattern 1: Email Campaigns

模式1:邮件营销活动

typescript
// Queue setup
const emailQueue = new Queue('email-campaign', { connection: redis });

// Enqueue batch
async function sendCampaign(userIds: number[], template: string) {
  const jobs = userIds.map(userId => ({
    name: 'send',
    data: { userId, template },
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 5000 }
    }
  }));

  await emailQueue.addBulk(jobs);
}

// Worker with retry logic
const worker = new Worker('email-campaign', async (job) => {
  const { userId, template } = job.data;

  const user = await db.users.findById(userId);
  const email = renderTemplate(template, user);

  try {
    await sendgrid.send({
      to: user.email,
      subject: email.subject,
      html: email.body
    });
  } catch (error) {
    if (error.code === 'ECONNREFUSED') {
      throw error; // Retry
    }
    // Invalid email, don't retry
    console.error(`Invalid email for user ${userId}`);
  }
}, {
  connection: redis,
  concurrency: 10
});
typescript
// Queue setup
const emailQueue = new Queue('email-campaign', { connection: redis });

// Enqueue batch
async function sendCampaign(userIds: number[], template: string) {
  const jobs = userIds.map(userId => ({
    name: 'send',
    data: { userId, template },
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 5000 }
    }
  }));

  await emailQueue.addBulk(jobs);
}

// Worker with retry logic
const worker = new Worker('email-campaign', async (job) => {
  const { userId, template } = job.data;

  const user = await db.users.findById(userId);
  const email = renderTemplate(template, user);

  try {
    await sendgrid.send({
      to: user.email,
      subject: email.subject,
      html: email.body
    });
  } catch (error) {
    if (error.code === 'ECONNREFUSED') {
      throw error; // Retry
    }
    // Invalid email, don't retry
    console.error(`Invalid email for user ${userId}`);
  }
}, {
  connection: redis,
  concurrency: 10
});

Pattern 2: Scheduled Reports

模式2:定时报表

typescript
// Daily report at 9 AM
await queue.add('daily-report', {
  type: 'sales',
  recipients: ['admin@company.com']
}, {
  repeat: {
    pattern: '0 9 * * *', // Cron syntax
    tz: 'America/New_York'
  }
});

// Worker generates and emails report
worker.process('daily-report', async (job) => {
  const { type, recipients } = job.data;

  const data = await generateReport(type);
  const pdf = await createPDF(data);

  await emailQueue.add('send', {
    to: recipients,
    subject: `Daily ${type} Report`,
    attachments: [{ filename: 'report.pdf', content: pdf }]
  });
});
typescript
// Daily report at 9 AM
await queue.add('daily-report', {
  type: 'sales',
  recipients: ['admin@company.com']
}, {
  repeat: {
    pattern: '0 9 * * *', // Cron syntax
    tz: 'America/New_York'
  }
});

// Worker generates and emails report
worker.process('daily-report', async (job) => {
  const { type, recipients } = job.data;

  const data = await generateReport(type);
  const pdf = await createPDF(data);

  await emailQueue.add('send', {
    to: recipients,
    subject: `Daily ${type} Report`,
    attachments: [{ filename: 'report.pdf', content: pdf }]
  });
});

Pattern 3: Video Transcoding Pipeline

模式3:视频转码流水线

typescript
// Multi-stage job with progress tracking
await videoQueue.add('transcode', {
  videoId: 123,
  formats: ['720p', '1080p', '4k']
}, {
  attempts: 2,
  timeout: 3600000 // 1 hour timeout
});

worker.process('transcode', async (job) => {
  const { videoId, formats } = job.data;

  for (let i = 0; i < formats.length; i++) {
    const format = formats[i];

    // Update progress
    await job.updateProgress((i / formats.length) * 100);

    // Transcode
    await ffmpeg.transcode(videoId, format);
  }

  await job.updateProgress(100);
});

// Client polls for progress
app.get('/videos/:id/status', async (req, res) => {
  const job = await queue.getJob(req.params.jobId);
  res.json({
    state: await job.getState(),
    progress: job.progress
  });
});

typescript
// Multi-stage job with progress tracking
await videoQueue.add('transcode', {
  videoId: 123,
  formats: ['720p', '1080p', '4k']
}, {
  attempts: 2,
  timeout: 3600000 // 1 hour timeout
});

worker.process('transcode', async (job) => {
  const { videoId, formats } = job.data;

  for (let i = 0; i < formats.length; i++) {
    const format = formats[i];

    // Update progress
    await job.updateProgress((i / formats.length) * 100);

    // Transcode
    await ffmpeg.transcode(videoId, format);
  }

  await job.updateProgress(100);
});

// Client polls for progress
app.get('/videos/:id/status', async (req, res) => {
  const job = await queue.getJob(req.params.jobId);
  res.json({
    state: await job.getState(),
    progress: job.progress
  });
});

Monitoring & Observability

监控与可观测性

Essential Metrics

核心指标

typescript
// Queue health dashboard
async function getQueueMetrics() {
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
    queue.getDelayedCount()
  ]);

  return {
    waiting,    // Jobs waiting to be processed
    active,     // Jobs currently processing
    completed,  // Successfully completed
    failed,     // Failed after retries
    delayed,    // Scheduled for future
    health: waiting < 1000 && failed < 100 ? 'healthy' : 'degraded'
  };
}
typescript
// Queue health dashboard
async function getQueueMetrics() {
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
    queue.getDelayedCount()
  ]);

  return {
    waiting,    // Jobs waiting to be processed
    active,     // Jobs currently processing
    completed,  // Successfully completed
    failed,     // Failed after retries
    delayed,    // Scheduled for future
    health: waiting < 1000 && failed < 100 ? 'healthy' : 'degraded'
  };
}

BullMQ Board (UI)

BullMQ Board(可视化界面)

typescript
// Development: Monitor jobs visually
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const serverAdapter = new ExpressAdapter();

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(videoQueue)
  ],
  serverAdapter
});

app.use('/admin/queues', serverAdapter.getRouter());
// Visit http://localhost:3000/admin/queues

typescript
// Development: Monitor jobs visually
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const serverAdapter = new ExpressAdapter();

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(videoQueue)
  ],
  serverAdapter
});

app.use('/admin/queues', serverAdapter.getRouter());
// Visit http://localhost:3000/admin/queues

Production Checklist

生产环境检查清单

□ Dead letter queue configured
□ Retry strategy with exponential backoff
□ Job timeout limits set
□ Rate limiting for third-party APIs
□ Idempotency keys for critical operations
□ Worker concurrency tuned (CPU cores * 2)
□ Horizontal scaling configured (multiple workers)
□ Queue depth monitoring with alerts
□ Failed job inspection workflow
□ Job data doesn't contain PII in logs
□ Redis persistence enabled (AOF or RDB)
□ Graceful shutdown handling (SIGTERM)

□ Dead letter queue configured
□ Retry strategy with exponential backoff
□ Job timeout limits set
□ Rate limiting for third-party APIs
□ Idempotency keys for critical operations
□ Worker concurrency tuned (CPU cores * 2)
□ Horizontal scaling configured (multiple workers)
□ Queue depth monitoring with alerts
□ Failed job inspection workflow
□ Job data doesn't contain PII in logs
□ Redis persistence enabled (AOF or RDB)
□ Graceful shutdown handling (SIGTERM)

When to Use vs Avoid

适用与不适用场景对比

ScenarioUse Background Jobs?
Send welcome email on signup✅ Yes - can take 2-5 seconds
Charge credit card⚠️ Maybe - depends on payment provider latency
Generate PDF report (30 seconds)✅ Yes - definitely background
Fetch user profile from DB❌ No - milliseconds, keep synchronous
Process video upload (5 minutes)✅ Yes - always background
Validate form input❌ No - synchronous validation
Daily cron job✅ Yes - use repeatable jobs
Real-time chat message❌ No - use WebSockets

场景是否使用后台任务?
注册时发送欢迎邮件✅ 是 - 可能耗时2-5秒
信用卡扣费⚠️ 视情况而定 - 取决于支付提供商延迟
生成PDF报表(30秒)✅ 是 - 必须使用后台任务
从数据库获取用户资料❌ 否 - 仅需毫秒级,保持同步
处理视频上传(5分钟)✅ 是 - 务必使用后台任务
表单输入验证❌ 否 - 同步验证即可
每日定时任务✅ 是 - 使用可重复任务
实时聊天消息❌ 否 - 使用WebSockets

Technology Comparison

技术对比

FeatureBullMQCeleryAWS SQS
LanguageNode.jsPythonAny (HTTP API)
BackendRedisRedis/RabbitMQ/SQSManaged
Priorities
Rate Limiting✅ (via attributes)
Repeat/Cron✅ (celery-beat)❌ (use EventBridge)
UI DashboardBull BoardFlowerCloudWatch
Workflows✅ (chains, groups)
Learning CurveMediumMediumLow
CostRedis hostingRedis hosting$0.40/million requests

特性BullMQCeleryAWS SQS
支持语言Node.jsPython任意(HTTP API)
后端存储RedisRedis/RabbitMQ/SQS托管服务
优先级队列
速率限制✅(通过属性配置)
重复/定时任务✅(celery-beat)❌(需搭配EventBridge)
UI控制台Bull BoardFlowerCloudWatch
工作流✅(链式、分组任务)
学习曲线中等中等
成本Redis托管费用Redis托管费用每百万请求0.40美元

References

参考资料

  • /references/bullmq-patterns.md
    - Advanced BullMQ patterns and examples
  • /references/celery-workflows.md
    - Celery chains, groups, and chords
  • /references/job-observability.md
    - Monitoring, alerting, and debugging
  • /references/bullmq-patterns.md
    - 高级BullMQ模式与示例
  • /references/celery-workflows.md
    - Celery链式、分组及和弦任务
  • /references/job-observability.md
    - 监控、告警与调试

Scripts

脚本

  • scripts/setup_bullmq.sh
    - Initialize BullMQ with Redis
  • scripts/queue_health_check.ts
    - Queue metrics dashboard
  • scripts/retry_failed_jobs.ts
    - Bulk retry failed jobs

This skill guides: Background job implementation | Queue architecture | Retry strategies | Worker scaling | Job observability
  • scripts/setup_bullmq.sh
    - 初始化BullMQ与Redis
  • scripts/queue_health_check.ts
    - 队列指标仪表盘
  • scripts/retry_failed_jobs.ts
    - 批量重试失败任务

本技能涵盖:后台任务实现 | 队列架构 | 重试策略 | Worker扩容 | 任务可观测性