Expert in background job processing with Bull/BullMQ (Redis), Celery, and cloud queues. Implements retries, scheduling, priority queues, and worker management. Use for async task processing, email campaigns, report generation, batch operations. Activate on "background job", "async task", "queue", "worker", "BullMQ", "Celery". NOT for real-time WebSocket communication, synchronous API calls, or simple setTimeout operations.
npx skill4agent add erichowens/some_claude_skills background-job-orchestrator

Does this task:
├── Take >5 seconds? → Background job
├── Need to retry on failure? → Background job
├── Run on a schedule? → Background job (cron pattern)
├── Block user interaction? → Background job
├── Process in batches? → Background job
└── Return immediately? → Keep synchronous

// BullMQ with dead letter queue
import { Queue, Worker } from 'bullmq';

// `redis` is assumed to hold the Redis connection options defined elsewhere
const queue = new Queue('email-queue', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    },
    removeOnComplete: 100, // Keep last 100 successful
    removeOnFail: false // Keep all failed for inspection
  }
});
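
To make the retry settings above concrete, the sketch below lists the wait before each retry under exponential backoff. It assumes the formula BullMQ documents for its built-in strategy, delay * 2^(retry - 1); `backoffDelays` is a hypothetical helper for illustration, not part of BullMQ.

```typescript
// Hypothetical helper, not a BullMQ API: lists the wait before each retry
// for an exponential backoff, assuming delay * 2^(retry - 1).
function backoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // `attempts` counts the first try, so there are attempts - 1 retries.
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

console.log(backoffDelays(3, 2000)); // [ 2000, 4000 ]
```

With `attempts: 3` and `delay: 2000`, a job that keeps failing would be retried after roughly 2 seconds and then 4 seconds before it lands in the failed set.
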
// Monitor failed jobs
const failedJobs = await queue.getFailed();

// ❌ WRONG - Blocks API response
app.post('/send-email', async (req, res) => {
  await sendEmail(req.body.to, req.body.subject);
  res.json({ success: true });
});

// ✅ RIGHT - Queue and return immediately
app.post('/send-email', async (req, res) => {
  const job = await emailQueue.add('send', {
    to: req.body.to,
    subject: req.body.subject
  });
  res.json({
    success: true,
    jobId: job.id,
    status: 'queued'
  });
});

// Separate worker processes the job. The processor receives every job on
// the queue, so branch on job.name if the queue carries several job types.
const emailWorker = new Worker(emailQueue.name, async (job) => {
  await sendEmail(job.data.to, job.data.subject);
}, { connection: redis });

// ✅ Idempotent job with deduplication key
await queue.add('charge-payment', {
  userId: 123,
  amount: 50.00
}, {
  jobId: `payment-${orderId}`, // Prevents duplicates
  attempts: 3
});

// In worker: check whether the job was already processed
const paymentWorker = new Worker(queue.name, async (job) => {
  const { userId, amount } = job.data;

  // Check idempotency
  const existing = await db.payments.findOne({
    jobId: job.id
  });
  if (existing) {
    return existing; // Already processed
  }

  // Process payment
  const result = await stripe.charges.create({...});

  // Store idempotency record
  await db.payments.create({
    jobId: job.id,
    result
  });
  return result;
}, { connection: redis });

// BullMQ rate limiting
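// In BullMQ the limiter is set on the Worker, not on the Queue
const apiWorker = new Worker('api-calls', async (job) => {
  // Call the third-party API here
}, {
  connection: redis,
  limiter: {
    max: 100, // Max 100 jobs
    duration: 60000 // Per 60 seconds
  }
});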
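
The `max` and `duration` pair above reads as "at most `max` jobs started per `duration` milliseconds". The sketch below illustrates that rule with a simple fixed-window counter. It is an explanatory model only: `FixedWindowLimiter` is a hypothetical class, and BullMQ enforces its limit in Redis rather than with in-process code like this.

```typescript
// Illustrative fixed-window limiter (an assumption for explanation only;
// this is not how BullMQ implements its limiter internally).
class FixedWindowLimiter {
  private readonly max: number;
  private readonly durationMs: number;
  private windowStart = 0;
  private count = 0;

  constructor(max: number, durationMs: number) {
    this.max = max;
    this.durationMs = durationMs;
  }

  // Returns true if a job may start at time `nowMs`, false if it must wait.
  tryAcquire(nowMs: number): boolean {
    if (nowMs - this.windowStart >= this.durationMs) {
      this.windowStart = nowMs; // The previous window expired; start a new one
      this.count = 0;
    }
    if (this.count < this.max) {
      this.count++;
      return true;
    }
    return false;
  }
}

const limiter = new FixedWindowLimiter(100, 60000);
let started = 0;
for (let i = 0; i < 150; i++) {
  if (limiter.tryAcquire(1000)) started++;
}
console.log(started); // 100
console.log(limiter.tryAcquire(61000)); // true
```

Of the 150 jobs offered inside one window, only 100 start; the rest wait until the next window opens.
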
// Or: prioritize premium users (lower number = higher priority)
await queue.add('send-email', data, {
  priority: user.isPremium ? 1 : 10
});
// BullMQ job options do not include a per-job rate limit. For per-tier
// limits, use one queue per tier, each with its own worker `limiter`.

// Horizontal scaling with multiple workers
const worker = new Worker('email-queue', async (job) => {
  await processEmail(job.data);
}, {
  connection: redis,
  concurrency: 5 // Process 5 jobs concurrently per worker
});
// Run multiple worker processes (PM2, Kubernetes, etc.)
// Total jobs in flight = concurrency * number of worker processes

// Set up alerts for queue depth
setInterval(async () => {
  const waiting = await queue.getWaitingCount();
  if (waiting > 1000) {
    // Swap in your alerting integration (pager, Slack, etc.)
    console.warn('Queue depth exceeds 1000, scale workers!');
  }
}, 60000);

// Queue setup
const emailQueue = new Queue('email-campaign', { connection: redis });

// Enqueue batch
async function sendCampaign(userIds: number[], template: string) {
  const jobs = userIds.map(userId => ({
    name: 'send',
    data: { userId, template },
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 5000 }
    }
  }));
  await emailQueue.addBulk(jobs);
}

// Worker with retry logic
const worker = new Worker('email-campaign', async (job) => {
  const { userId, template } = job.data;
  const user = await db.users.findById(userId);
  const email = renderTemplate(template, user);
  try {
    await sendgrid.send({
      to: user.email,
      subject: email.subject,
      html: email.body
    });
  } catch (error) {
    if (error.code === 'ECONNREFUSED') {
      throw error; // Transient failure: rethrow so BullMQ retries
    }
    // Anything else is treated as permanent (e.g. invalid email): log, don't retry
    console.error(`Invalid email for user ${userId}`);
  }
}, {
  connection: redis,
  concurrency: 10
});

// Daily report at 9 AM
await queue.add('daily-report', {
  type: 'sales',
  recipients: ['admin@company.com']
}, {
  repeat: {
    pattern: '0 9 * * *', // Cron syntax
    tz: 'America/New_York'
  }
});

// Worker generates and emails report
const reportWorker = new Worker(queue.name, async (job) => {
  const { type, recipients } = job.data;
  const data = await generateReport(type);
  const pdf = await createPDF(data);
  await emailQueue.add('send', {
    to: recipients,
    subject: `Daily ${type} Report`,
    attachments: [{ filename: 'report.pdf', content: pdf }]
  });
}, { connection: redis });

// Multi-stage job with progress tracking
await videoQueue.add('transcode', {
  videoId: 123,
  formats: ['720p', '1080p', '4k']
}, {
  attempts: 2
  // BullMQ has no `timeout` job option; enforce the 1 hour limit inside the processor
});

const transcodeWorker = new Worker(videoQueue.name, async (job) => {
  const { videoId, formats } = job.data;
  for (let i = 0; i < formats.length; i++) {
    const format = formats[i];
    // Update progress
    await job.updateProgress((i / formats.length) * 100);
    // Transcode
    await ffmpeg.transcode(videoId, format);
  }
  await job.updateProgress(100);
}, { connection: redis });

// Client polls for progress using the job ID returned at enqueue time
app.get('/videos/:jobId/status', async (req, res) => {
  const job = await videoQueue.getJob(req.params.jobId);
  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }
  res.json({
    state: await job.getState(),
    progress: job.progress
  });
});

// Queue health dashboard
async function getQueueMetrics() {
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
    queue.getDelayedCount()
  ]);
  return {
    waiting, // Jobs waiting to be processed
    active, // Jobs currently processing
    completed, // Successfully completed
    failed, // Failed after retries
    delayed, // Scheduled for future
    health: waiting < 1000 && failed < 100 ? 'healthy' : 'degraded'
  };
}

// Development: Monitor jobs visually
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues'); // Must match the mount path below

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(videoQueue)
  ],
  serverAdapter
});

app.use('/admin/queues', serverAdapter.getRouter());
// Visit http://localhost:3000/admin/queues

□ Dead letter queue configured
□ Retry strategy with exponential backoff
□ Job timeout limits set
□ Rate limiting for third-party APIs
□ Idempotency keys for critical operations
□ Worker concurrency tuned (CPU cores * 2)
□ Horizontal scaling configured (multiple workers)
□ Queue depth monitoring with alerts
□ Failed job inspection workflow
□ Job data doesn't contain PII in logs
□ Redis persistence enabled (AOF or RDB)
□ Graceful shutdown handling (SIGTERM)

| Scenario | Use Background Jobs? |
|---|---|
| Send welcome email on signup | ✅ Yes - can take 2-5 seconds |
| Charge credit card | ⚠️ Maybe - depends on payment provider latency |
| Generate PDF report (30 seconds) | ✅ Yes - definitely background |
| Fetch user profile from DB | ❌ No - milliseconds, keep synchronous |
| Process video upload (5 minutes) | ✅ Yes - always background |
| Validate form input | ❌ No - synchronous validation |
| Daily cron job | ✅ Yes - use repeatable jobs |
| Real-time chat message | ❌ No - use WebSockets |
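
The decision tree and the scenario table can be folded into a small predicate. This is a minimal sketch: the `TaskProfile` fields and the `shouldRunInBackground` name are assumptions that mirror the questions in the tree, not part of any library.

```typescript
// Hypothetical task description; field names mirror the decision-tree questions.
interface TaskProfile {
  expectedSeconds: number; // Rough expected duration
  needsRetry: boolean;     // Must be retried on failure
  scheduled: boolean;      // Runs on a schedule (cron pattern)
  blocksUser: boolean;     // Would block user interaction if run inline
  batch: boolean;          // Processes items in batches
}

// Any "yes" in the tree sends the task to a background job;
// otherwise it stays synchronous.
function shouldRunInBackground(task: TaskProfile): boolean {
  return (
    task.expectedSeconds > 5 ||
    task.needsRetry ||
    task.scheduled ||
    task.blocksUser ||
    task.batch
  );
}

const pdfReport: TaskProfile = {
  expectedSeconds: 30, needsRetry: false, scheduled: false, blocksUser: true, batch: false
};
const profileLookup: TaskProfile = {
  expectedSeconds: 0.01, needsRetry: false, scheduled: false, blocksUser: false, batch: false
};

console.log(shouldRunInBackground(pdfReport));     // true
console.log(shouldRunInBackground(profileLookup)); // false
```
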

| Feature | BullMQ | Celery | AWS SQS |
|---|---|---|---|
| Language | Node.js | Python | Any (HTTP API) |
| Backend | Redis | Redis/RabbitMQ/SQS | Managed |
| Priorities | ✅ | ✅ | ❌ (use separate queues) |
| Rate Limiting | ✅ | ✅ (per-task `rate_limit`) | ❌ (throttle in the consumer) |
| Repeat/Cron | ✅ | ✅ (celery-beat) | ❌ (use EventBridge) |
| UI Dashboard | Bull Board | Flower | CloudWatch |
| Workflows | ✅ (flows via `FlowProducer`) | ✅ (chains, groups) | ❌ |
| Learning Curve | Medium | Medium | Low |
| Cost | Redis hosting | Broker hosting | $0.40/million requests |
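
To put the per-request price in the cost row into perspective, here is a rough estimate of a monthly bill. The assumptions are mine rather than the table's: each message costs three requests (send, receive, delete), with no batching, no free tier, and a 30-day month. `estimateMonthlyCostUsd` is a hypothetical helper.

```typescript
// Rough monthly cost estimate for a request-priced queue.
// Assumptions (not from the table): three requests per message
// (send, receive, delete), no batching, no free tier, 30-day month.
function estimateMonthlyCostUsd(
  messagesPerDay: number,
  pricePerMillionRequests: number = 0.4,
  requestsPerMessage: number = 3,
  daysPerMonth: number = 30
): number {
  const requests = messagesPerDay * requestsPerMessage * daysPerMonth;
  return (requests / 1_000_000) * pricePerMillionRequests;
}

// One million messages per day is about 90 million requests per month.
console.log(estimateMonthlyCostUsd(1_000_000).toFixed(2)); // 36.00
```

Batching sends and receives would lower the request count, so treat the figure as an upper-range estimate under these assumptions.
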
/references/bullmq-patterns.md
/references/celery-workflows.md
/references/job-observability.md
scripts/setup_bullmq.sh
scripts/queue_health_check.ts
scripts/retry_failed_jobs.ts