background-jobs

Background Jobs

Production-ready background job processing with reliability guarantees.

When to Use This Skill

  • Processing that takes longer than a request timeout
  • Scheduled/recurring tasks (reports, cleanup, sync)
  • Async workflows (email, notifications, webhooks)
  • Work that can fail and needs retries

Architecture Overview

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   API       │────▶│   Queue     │────▶│   Worker    │
│   Server    │     │   (Redis)   │     │   Process   │
└─────────────┘     └─────────────┘     └─────────────┘
                           │                   │
                           │              ┌────┴────┐
                           │              │ Success │
                           │              └────┬────┘
                           │                   │
                           ▼                   ▼
                    ┌─────────────┐     ┌─────────────┐
                    │    DLQ      │     │  Complete   │
                    │ (failures)  │     │   State     │
                    └─────────────┘     └─────────────┘

Job State Machine

┌─────────┐
│ PENDING │──────────────────────────────┐
└────┬────┘                              │
     │ picked up                         │
     ▼                                   │
┌─────────┐                              │
│ RUNNING │──────────┐                   │
└────┬────┘          │                   │
     │               │ failure           │
     │ success       ▼                   │
     │         ┌──────────┐              │
     │         │ RETRYING │──────────────┤
     │         └────┬─────┘              │
     │              │ max retries        │
     ▼              ▼                    │
┌─────────┐  ┌──────────┐         ┌──────┴──────┐
│ SUCCESS │  │  FAILED  │         │  CANCELLED  │
└─────────┘  └──────────┘         └─────────────┘
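The diagram above can be encoded as a transition table so that illegal state changes are rejected early. This is only a minimal sketch: the names `TRANSITIONS`, `canTransition`, and `transition` are hypothetical, and the `retrying -> running` edge is an assumption (the diagram does not draw it, but a retried job has to be picked up again).

```typescript
type JobStatus = 'pending' | 'running' | 'success' | 'failed' | 'retrying' | 'cancelled';

// Allowed next states for each state, read off the diagram.
// Assumption: 'retrying' may go back to 'running' when the retry is picked up.
const TRANSITIONS: Record<JobStatus, readonly JobStatus[]> = {
  pending: ['running', 'cancelled'],
  running: ['success', 'retrying'],
  retrying: ['running', 'failed', 'cancelled'],
  success: [],   // terminal
  failed: [],    // terminal
  cancelled: [], // terminal
};

function canTransition(from: JobStatus, to: JobStatus): boolean {
  return TRANSITIONS[from].indexOf(to) !== -1;
}

// Returns the new status, or throws if the diagram does not allow the move.
function transition(current: JobStatus, next: JobStatus): JobStatus {
  if (!canTransition(current, next)) {
    throw new Error(`Illegal job transition: ${current} -> ${next}`);
  }
  return next;
}
```

Note that the queue code in this document moves a job from running straight to failed on its last attempt, so a real implementation would need to pick one of the two conventions.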

TypeScript Implementation

Job Types and Queue

typescript
// types.ts
// Exported so that queue.ts and the handlers can import these types
export type JobStatus = 'pending' | 'running' | 'success' | 'failed' | 'retrying' | 'cancelled';

export interface Job<T = unknown> {
  id: string;
  type: string;
  payload: T;
  status: JobStatus;
  attempts: number;
  maxAttempts: number;
  createdAt: Date;
  scheduledFor: Date;
  startedAt?: Date;
  completedAt?: Date;
  error?: string;
  result?: unknown;
}

export interface JobHandler<T = unknown> {
  (payload: T, job: Job<T>): Promise<unknown>;
}

Queue Implementation (Redis)

typescript
// queue.ts
import { Redis } from 'ioredis';
import { v4 as uuid } from 'uuid';
import type { Job, JobHandler } from './types';

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

class JobQueue {
  private redis: Redis;
  private handlers = new Map<string, JobHandler>();

  constructor(redis: Redis) {
    this.redis = redis;
  }

  register<T>(type: string, handler: JobHandler<T>): void {
    this.handlers.set(type, handler as JobHandler);
  }

  async enqueue<T>(
    type: string,
    payload: T,
    options: { delay?: number; maxAttempts?: number } = {} // delay is in milliseconds
  ): Promise<string> {
    const job: Job<T> = {
      id: uuid(),
      type,
      payload,
      status: 'pending',
      attempts: 0,
      maxAttempts: options.maxAttempts ?? 3,
      createdAt: new Date(),
      scheduledFor: new Date(Date.now() + (options.delay ?? 0)),
    };

    // Sorted set scored by the scheduled time (epoch ms), so the earliest job pops first
    await this.redis.zadd(
      'jobs:pending',
      job.scheduledFor.getTime(),
      JSON.stringify(job)
    );

    return job.id;
  }

  async process(): Promise<void> {
    while (true) {
      // Blocks for up to 1 second; resolves to [key, member, score] or null on timeout
      const result = await this.redis.bzpopmin('jobs:pending', 1);
      if (!result) continue;

      const raw = result[1];
      const job: Job = JSON.parse(raw);
      // JSON.parse leaves date fields as ISO strings, so revive the ones used below
      job.createdAt = new Date(job.createdAt);
      job.scheduledFor = new Date(job.scheduledFor);

      if (job.scheduledFor.getTime() > Date.now()) {
        // Not ready yet: put it back and pause briefly to avoid a busy loop
        await this.redis.zadd('jobs:pending', job.scheduledFor.getTime(), raw);
        await sleep(100);
        continue;
      }

      await this.executeJob(job);
    }
  }

  private async executeJob(job: Job): Promise<void> {
    const handler = this.handlers.get(job.type);
    if (!handler) {
      // The job was already popped; park it in the DLQ instead of losing it
      console.error(`No handler for job type: ${job.type}`);
      job.status = 'failed';
      job.error = `No handler for job type: ${job.type}`;
      job.completedAt = new Date();
      await this.redis.lpush('jobs:dlq', JSON.stringify(job));
      return;
    }

    job.status = 'running';
    job.attempts++;
    job.startedAt = new Date();

    try {
      job.result = await handler(job.payload, job);
      job.status = 'success';
      job.completedAt = new Date();

      await this.redis.hset('jobs:completed', job.id, JSON.stringify(job));
    } catch (error) {
      job.error = error instanceof Error ? error.message : String(error);

      if (job.attempts < job.maxAttempts) {
        job.status = 'retrying';
        const backoff = Math.pow(2, job.attempts) * 1000; // Exponential backoff: 2s, 4s, 8s, ...
        job.scheduledFor = new Date(Date.now() + backoff);

        await this.redis.zadd('jobs:pending', job.scheduledFor.getTime(), JSON.stringify(job));
      } else {
        job.status = 'failed';
        job.completedAt = new Date();

        // Move to dead letter queue
        await this.redis.lpush('jobs:dlq', JSON.stringify(job));
      }
    }
  }
}

export { JobQueue };
export type { Job, JobHandler };

Job Handlers

typescript
// handlers/email.ts
import { JobHandler } from '../queue';

interface SendEmailPayload {
  to: string;
  subject: string;
  template: string;
  data: Record<string, unknown>;
}

export const sendEmailHandler: JobHandler<SendEmailPayload> = async (payload) => {
  const { to, subject, template, data } = payload;
  
  // Render template (renderTemplate and emailProvider are assumed to be provided elsewhere in the app)
  const html = await renderTemplate(template, data);
  
  // Send via email provider
  await emailProvider.send({
    to,
    subject,
    html,
  });

  return { sent: true, to };
};

// handlers/webhook.ts
import { JobHandler } from '../queue';

interface WebhookPayload {
  url: string;
  event: string;
  data: unknown;
}

export const webhookHandler: JobHandler<WebhookPayload> = async (payload, job) => {
  const response = await fetch(payload.url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Webhook-Event': payload.event,
      'X-Webhook-Delivery': job.id,
    },
    body: JSON.stringify(payload.data),
  });

  if (!response.ok) {
    throw new Error(`Webhook failed: ${response.status}`);
  }

  return { status: response.status };
};

Worker Process

typescript
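// worker.ts
import { Redis } from 'ioredis';
import { JobQueue } from './queue';
import { sendEmailHandler } from './handlers/email';
import { webhookHandler } from './handlers/webhook';

// Fall back to a local instance when REDIS_URL is unset
const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');
const queue = new JobQueue(redis);

// Register handlers
queue.register('send-email', sendEmailHandler);
queue.register('webhook', webhookHandler);

// Graceful shutdown
// Note: JobQueue.process() as written never reads this flag. Its loop has to
// check it between jobs before SIGTERM actually stops the worker cleanly.
let isShuttingDown = false;

process.on('SIGTERM', () => {
  console.log('Received SIGTERM, shutting down gracefully...');
  isShuttingDown = true;
});

// Start processing
console.log('Worker started, waiting for jobs...');
queue.process().catch((err) => {
  // Surface a crashed loop instead of leaving an unhandled rejection
  console.error('Worker loop crashed:', err);
  process.exit(1);
});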
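For the shutdown flag to have any effect, the processing loop has to consult it between jobs. The following is a minimal sketch of that idea, not the queue's actual implementation: `StoppableWorker`, `fetchNext`, and `handle` are hypothetical names, and `fetchNext` is assumed to wrap a blocking pop with a short timeout so the loop regularly gets a chance to notice the flag.

```typescript
class StoppableWorker<J> {
  private stopping = false;
  private readonly fetchNext: () => Promise<J | null>;
  private readonly handle: (job: J) => Promise<void>;

  constructor(
    fetchNext: () => Promise<J | null>, // resolves to null when no job arrived in time
    handle: (job: J) => Promise<void>
  ) {
    this.fetchNext = fetchNext;
    this.handle = handle;
  }

  // Ask the loop to exit once the job currently in flight has finished
  stop(): void {
    this.stopping = true;
  }

  // Resolves only after the loop has exited, so callers can await a clean stop
  async run(): Promise<void> {
    while (!this.stopping) {
      const job = await this.fetchNext();
      if (job === null) continue;
      await this.handle(job); // never interrupted midway
    }
  }
}
```

In a worker process, the SIGTERM handler would call `stop()`, and the process would exit after the promise returned by `run()` resolves. The orchestrator's termination grace period should be longer than the slowest job.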

Python Implementation

python
# queue.py
import json
import time
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Callable, Dict, Optional

from redis import Redis

DATE_FIELDS = ("created_at", "scheduled_for", "started_at", "completed_at")


def _now() -> datetime:
    # Timezone-aware UTC, so .timestamp() gives a correct epoch score
    return datetime.now(timezone.utc)


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRYING = "retrying"
    CANCELLED = "cancelled"


@dataclass
class Job:
    id: str
    type: str
    payload: Dict[str, Any]
    status: JobStatus
    attempts: int
    max_attempts: int
    created_at: datetime
    scheduled_for: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error: Optional[str] = None
    result: Optional[Any] = None

    def to_json(self) -> str:
        # Store the enum value and ISO timestamps so from_json can restore them
        data = asdict(self)
        data["status"] = self.status.value
        for key in DATE_FIELDS:
            if data[key] is not None:
                data[key] = data[key].isoformat()
        return json.dumps(data, default=str)

    @classmethod
    def from_json(cls, raw: Any) -> "Job":
        data = json.loads(raw)
        data["status"] = JobStatus(data["status"])
        for key in DATE_FIELDS:
            if data[key] is not None:
                data[key] = datetime.fromisoformat(data[key])
        return cls(**data)


class JobQueue:
    def __init__(self, redis: Redis):
        self.redis = redis
        self.handlers: Dict[str, Callable] = {}

    def register(self, job_type: str, handler: Callable) -> None:
        self.handlers[job_type] = handler

    def enqueue(
        self,
        job_type: str,
        payload: Dict[str, Any],
        delay: int = 0,  # seconds
        max_attempts: int = 3,
    ) -> str:
        now = _now()
        job = Job(
            id=str(uuid.uuid4()),
            type=job_type,
            payload=payload,
            status=JobStatus.PENDING,
            attempts=0,
            max_attempts=max_attempts,
            created_at=now,
            scheduled_for=now + timedelta(seconds=delay),
        )

        self.redis.zadd("jobs:pending", {job.to_json(): job.scheduled_for.timestamp()})

        return job.id

    def process(self) -> None:
        while True:
            result = self.redis.bzpopmin("jobs:pending", timeout=1)
            if not result:
                continue

            # result is (key, member, score)
            job = Job.from_json(result[1])

            if job.scheduled_for > _now():
                # Not ready yet: put it back and pause briefly to avoid a busy loop
                self.redis.zadd("jobs:pending", {job.to_json(): job.scheduled_for.timestamp()})
                time.sleep(0.1)
                continue

            self._execute_job(job)

    def _execute_job(self, job: Job) -> None:
        handler = self.handlers.get(job.type)
        if not handler:
            # The job was already popped; park it in the DLQ instead of losing it
            job.status = JobStatus.FAILED
            job.error = f"No handler for job type: {job.type}"
            job.completed_at = _now()
            self.redis.lpush("jobs:dlq", job.to_json())
            return

        job.status = JobStatus.RUNNING
        job.attempts += 1
        job.started_at = _now()

        try:
            job.result = handler(job.payload, job)
            job.status = JobStatus.SUCCESS
            job.completed_at = _now()

            self.redis.hset("jobs:completed", job.id, job.to_json())
        except Exception as e:
            job.error = str(e)

            if job.attempts < job.max_attempts:
                job.status = JobStatus.RETRYING
                backoff = 2 ** job.attempts  # Exponential backoff, in seconds
                job.scheduled_for = _now() + timedelta(seconds=backoff)

                self.redis.zadd("jobs:pending", {job.to_json(): job.scheduled_for.timestamp()})
            else:
                job.status = JobStatus.FAILED
                job.completed_at = _now()

                # Move to dead letter queue
                self.redis.lpush("jobs:dlq", job.to_json())

Dead Letter Queue Management

typescript
// dlq.ts
import { Redis } from 'ioredis';
import type { Job } from './queue';

class DLQManager {
  constructor(private redis: Redis) {}

  async getFailedJobs(limit = 100): Promise<Job[]> {
    const jobs = await this.redis.lrange('jobs:dlq', 0, limit - 1);
    return jobs.map(j => JSON.parse(j));
  }

  async retryJob(jobId: string): Promise<boolean> {
    // Work with the raw strings: LREM only removes an exact match of the stored value
    const entries = await this.redis.lrange('jobs:dlq', 0, 999);
    const raw = entries.find(e => (JSON.parse(e) as Job).id === jobId);

    if (!raw) return false;

    // Reset and re-enqueue
    const job: Job = JSON.parse(raw);
    job.status = 'pending';
    job.attempts = 0;
    job.error = undefined;
    job.scheduledFor = new Date();

    await this.redis.zadd('jobs:pending', job.scheduledFor.getTime(), JSON.stringify(job));
    // Remove the original entry, not the modified copy
    await this.redis.lrem('jobs:dlq', 1, raw);

    return true;
  }

  async purgeOldJobs(olderThanDays = 7): Promise<number> {
    const cutoff = Date.now() - olderThanDays * 24 * 60 * 60 * 1000;
    const entries = await this.redis.lrange('jobs:dlq', 0, 9999);

    let purged = 0;
    for (const raw of entries) {
      const job: Job = JSON.parse(raw);
      // Entries without a completedAt timestamp are left in place
      if (job.completedAt && new Date(job.completedAt).getTime() < cutoff) {
        purged += await this.redis.lrem('jobs:dlq', 1, raw);
      }
    }

    return purged;
  }
}

Scheduling Recurring Jobs

typescript
// scheduler.ts
import { JobQueue } from './queue';

class JobScheduler {
  private intervals: NodeJS.Timeout[] = [];

  constructor(private queue: JobQueue) {}

  schedule(
    type: string,
    payload: unknown,
    cronExpression: string
  ): void {
    // Simple interval-based scheduling
    // For production, use node-cron or similar
    const interval = this.parseCron(cronExpression);

    const timer = setInterval(() => {
      this.queue.enqueue(type, payload).catch((err) => {
        console.error(`Failed to enqueue scheduled job ${type}:`, err);
      });
    }, interval);

    this.intervals.push(timer);
  }

  stop(): void {
    this.intervals.forEach((timer) => clearInterval(timer));
    this.intervals = [];
  }

  private parseCron(expr: string): number {
    // Simplified: only "*/N * * * *" (every N minutes) is supported
    const match = expr.match(/^\*\/(\d+) \* \* \* \*$/);
    if (!match) {
      // Reject anything else rather than silently running it every minute
      throw new Error(`Unsupported cron expression: ${expr}`);
    }
    return parseInt(match[1], 10) * 60 * 1000;
  }
}

// Usage
const scheduler = new JobScheduler(queue);
scheduler.schedule('cleanup-expired-sessions', {}, '*/15 * * * *');
// A fixed daily time such as '0 9 * * *' (09:00) is beyond this simplified
// parser and needs a real cron library:
// scheduler.schedule('send-daily-digest', {}, '0 9 * * *');
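A fixed daily time such as `0 9 * * *` cannot be expressed as a plain interval. One way to handle it without a cron library, sketched here under the assumption that server-local time is acceptable, is to compute the delay until the next occurrence and pass it as the `delay` option of `enqueue`. The helper name `msUntilNextDailyRun` is hypothetical.

```typescript
// Milliseconds from `now` until the next local-time occurrence of hour:minute.
// If that time has already passed today (or is exactly now), tomorrow is used.
function msUntilNextDailyRun(hour: number, minute: number, now: Date = new Date()): number {
  const next = new Date(now.getTime());
  next.setHours(hour, minute, 0, 0);
  if (next.getTime() <= now.getTime()) {
    next.setDate(next.getDate() + 1);
  }
  return next.getTime() - now.getTime();
}
```

A daily job would then re-enqueue itself with a fresh delay each time it runs. On days with a daylight-saving change, the gap between runs is 23 or 25 hours rather than 24.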

Best Practices

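  1. Always use exponential backoff: Spaces out retries so a failing dependency is not hammered; add random jitter so jobs that failed together do not all retry at the same instant (thundering herd)
  2. Set reasonable max attempts: 3-5 for most jobs
  3. Monitor DLQ size: Alert when it grows
  4. Make jobs idempotent: The same job may run more than once (retries, redelivery), so repeated runs must be safe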
  5. Include job ID in logs: Makes debugging easier
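As an illustration of the first practice, here is a minimal sketch of exponential backoff with a cap and "full jitter", where the delay is drawn uniformly between zero and the exponential bound. The function name `backoffDelayMs` and the default values are assumptions. The queue code in this document uses the plain `2^attempts * 1000` delay without jitter.

```typescript
// Delay before the next retry, in milliseconds.
// attempt: number of attempts already made (1 after the first failure)
// random:  injectable for testing; must return a value in [0, 1)
function backoffDelayMs(
  attempt: number,
  baseMs: number = 1000,
  capMs: number = 60000,
  random: () => number = Math.random
): number {
  // Exponential growth, bounded so late retries do not wait indefinitely
  const bound = Math.min(capMs, baseMs * Math.pow(2, attempt));
  // Full jitter: spread retries across the whole window
  return Math.floor(random() * bound);
}
```

Full jitter trades a predictable delay for better spreading. If a minimum wait matters, a fixed floor can be added to the result.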

Common Mistakes

  • Not handling worker crashes (jobs stuck in running state)
  • No visibility into job status
  • Forgetting to handle DLQ
  • Jobs that aren't idempotent
  • No graceful shutdown (jobs killed mid-execution)
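The first mistake deserves a concrete illustration. A worker that pops a job and then crashes leaves no trace of it unless in-flight jobs are recorded somewhere. One common remedy is to track running jobs with their start time and have a periodic "reaper" re-enqueue those that have exceeded a timeout. The sketch below shows only the detection step on plain data. `RunningJob` and `findStalledJobs` are hypothetical names, and where the in-flight records live (for example a Redis hash) is left open.

```typescript
interface RunningJob {
  id: string;
  startedAt: number; // epoch milliseconds
}

// Jobs that have been running longer than timeoutMs are presumed orphaned
// by a crashed worker and are candidates for re-enqueueing.
function findStalledJobs(running: RunningJob[], now: number, timeoutMs: number): RunningJob[] {
  return running.filter((job) => now - job.startedAt > timeoutMs);
}
```

The timeout has to exceed the longest legitimate job duration, otherwise healthy jobs get executed twice. This is another reason handlers should be idempotent.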

Observability

typescript
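// Add metrics (assuming the prom-client package, whose constructors match this shape)
import { Counter, Gauge, Histogram } from 'prom-client';
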
const jobsProcessed = new Counter({
  name: 'jobs_processed_total',
  help: 'Total jobs processed',
  labelNames: ['type', 'status'],
});

const jobDuration = new Histogram({
  name: 'job_duration_seconds',
  help: 'Job processing duration',
  labelNames: ['type'],
});

const dlqSize = new Gauge({
  name: 'dlq_size',
  help: 'Dead letter queue size',
});