dead-letter-queue

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Dead Letter Queue

死信队列

Store failed jobs for replay and debugging.
存储失败的任务以供重放和调试。

When to Use This Skill

何时使用该方案

  • Jobs fail after max retries
  • Need visibility into failure patterns
  • Want to replay failed jobs manually
  • Can't afford to lose failed work
  • 任务达到最大重试次数后仍失败
  • 需要掌握失败模式的可见性
  • 希望手动重放失败的任务
  • 无法承受失败任务的丢失

Core Concepts

核心概念

  1. Capture context - Store enough info to replay
  2. Track attempts - Record all error messages
  3. Enable replay - Allow manual re-processing
  4. Enforce limits - Prevent unbounded growth
  1. 捕获上下文 - 存储足够的信息以支持任务重放
  2. 跟踪尝试次数 - 记录所有错误信息
  3. 支持重放 - 允许手动重新处理任务
  4. 限制队列大小 - 防止队列无限制增长

TypeScript Implementation

TypeScript 实现

typescript
// dead-letter-queue.ts
interface DeadLetterJob {
  id: string;
  workerName: string;
  payload: Record<string, unknown>;
  errorMessage: string;
  errorType: string;
  stackTrace?: string;
  attempts: number;
  attemptErrors: string[];
  firstAttemptAt: Date;
  lastAttemptAt: Date;
  createdAt: Date;
  resolvedAt?: Date;
  resolution?: string;
}

class DeadLetterQueue {
  private jobs = new Map<string, DeadLetterJob>();
  private maxSize = 1000;
  private counter = 0;

  add(
    workerName: string,
    payload: Record<string, unknown>,
    errorMessage: string,
    errorType: string,
    attempts: number,
    stackTrace?: string
  ): DeadLetterJob {
    const id = `dlq_${++this.counter}_${Date.now()}`;
    const now = new Date();

    const job: DeadLetterJob = {
      id,
      workerName,
      payload,
      errorMessage,
      errorType,
      stackTrace,
      attempts,
      attemptErrors: [errorMessage],
      firstAttemptAt: now,
      lastAttemptAt: now,
      createdAt: now,
    };

    this.jobs.set(id, job);
    this.enforceMaxSize();
    
    console.log(`[DLQ] Added: ${id} (${workerName})`);
    return job;
  }

  recordAttempt(jobId: string, errorMessage: string): boolean {
    const job = this.jobs.get(jobId);
    if (!job) return false;

    job.attempts++;
    job.lastAttemptAt = new Date();
    job.attemptErrors.push(errorMessage);
    job.errorMessage = errorMessage;
    return true;
  }

  resolve(jobId: string, resolution: string): boolean {
    const job = this.jobs.get(jobId);
    if (!job) return false;

    job.resolvedAt = new Date();
    job.resolution = resolution;
    return true;
  }

  discard(jobId: string): boolean {
    return this.jobs.delete(jobId);
  }

  getUnresolved(): DeadLetterJob[] {
    return Array.from(this.jobs.values())
      .filter(j => !j.resolvedAt)
      .sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime());
  }

  getReplayable(maxAttempts = 5): DeadLetterJob[] {
    return this.getUnresolved().filter(j => j.attempts < maxAttempts);
  }

  getByWorker(workerName: string): DeadLetterJob[] {
    return this.getUnresolved().filter(j => j.workerName === workerName);
  }

  getStats() {
    const jobs = Array.from(this.jobs.values());
    const unresolved = jobs.filter(j => !j.resolvedAt);
    
    const byWorker: Record<string, number> = {};
    const byErrorType: Record<string, number> = {};
    
    for (const job of unresolved) {
      byWorker[job.workerName] = (byWorker[job.workerName] || 0) + 1;
      byErrorType[job.errorType] = (byErrorType[job.errorType] || 0) + 1;
    }

    return { total: jobs.length, unresolved: unresolved.length, byWorker, byErrorType };
  }

  cleanupResolved(olderThanHours = 24): number {
    const cutoff = Date.now() - olderThanHours * 60 * 60 * 1000;
    let deleted = 0;

    for (const [id, job] of this.jobs) {
      if (job.resolvedAt && job.resolvedAt.getTime() < cutoff) {
        this.jobs.delete(id);
        deleted++;
      }
    }
    return deleted;
  }

  private enforceMaxSize(): void {
    if (this.jobs.size <= this.maxSize) return;

    // Remove oldest resolved first, then oldest unresolved
    const sorted = Array.from(this.jobs.entries())
      .sort((a, b) => {
        if (a[1].resolvedAt && !b[1].resolvedAt) return -1;
        return a[1].createdAt.getTime() - b[1].createdAt.getTime();
      });

    while (sorted.length > this.maxSize) {
      const [id] = sorted.shift()!;
      this.jobs.delete(id);
    }
  }
}

// Singleton
let dlq: DeadLetterQueue | null = null;
export function getDeadLetterQueue(): DeadLetterQueue {
  if (!dlq) dlq = new DeadLetterQueue();
  return dlq;
}
typescript
// dead-letter-queue.ts
interface DeadLetterJob {
  id: string;
  workerName: string;
  payload: Record<string, unknown>;
  errorMessage: string;
  errorType: string;
  stackTrace?: string;
  attempts: number;
  attemptErrors: string[];
  firstAttemptAt: Date;
  lastAttemptAt: Date;
  createdAt: Date;
  resolvedAt?: Date;
  resolution?: string;
}

class DeadLetterQueue {
  private jobs = new Map<string, DeadLetterJob>();
  private maxSize = 1000;
  private counter = 0;

  add(
    workerName: string,
    payload: Record<string, unknown>,
    errorMessage: string,
    errorType: string,
    attempts: number,
    stackTrace?: string
  ): DeadLetterJob {
    const id = `dlq_${++this.counter}_${Date.now()}`;
    const now = new Date();

    const job: DeadLetterJob = {
      id,
      workerName,
      payload,
      errorMessage,
      errorType,
      stackTrace,
      attempts,
      attemptErrors: [errorMessage],
      firstAttemptAt: now,
      lastAttemptAt: now,
      createdAt: now,
    };

    this.jobs.set(id, job);
    this.enforceMaxSize();
    
    console.log(`[DLQ] Added: ${id} (${workerName})`);
    return job;
  }

  recordAttempt(jobId: string, errorMessage: string): boolean {
    const job = this.jobs.get(jobId);
    if (!job) return false;

    job.attempts++;
    job.lastAttemptAt = new Date();
    job.attemptErrors.push(errorMessage);
    job.errorMessage = errorMessage;
    return true;
  }

  resolve(jobId: string, resolution: string): boolean {
    const job = this.jobs.get(jobId);
    if (!job) return false;

    job.resolvedAt = new Date();
    job.resolution = resolution;
    return true;
  }

  discard(jobId: string): boolean {
    return this.jobs.delete(jobId);
  }

  getUnresolved(): DeadLetterJob[] {
    return Array.from(this.jobs.values())
      .filter(j => !j.resolvedAt)
      .sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime());
  }

  getReplayable(maxAttempts = 5): DeadLetterJob[] {
    return this.getUnresolved().filter(j => j.attempts < maxAttempts);
  }

  getByWorker(workerName: string): DeadLetterJob[] {
    return this.getUnresolved().filter(j => j.workerName === workerName);
  }

  getStats() {
    const jobs = Array.from(this.jobs.values());
    const unresolved = jobs.filter(j => !j.resolvedAt);
    
    const byWorker: Record<string, number> = {};
    const byErrorType: Record<string, number> = {};
    
    for (const job of unresolved) {
      byWorker[job.workerName] = (byWorker[job.workerName] || 0) + 1;
      byErrorType[job.errorType] = (byErrorType[job.errorType] || 0) + 1;
    }

    return { total: jobs.length, unresolved: unresolved.length, byWorker, byErrorType };
  }

  cleanupResolved(olderThanHours = 24): number {
    const cutoff = Date.now() - olderThanHours * 60 * 60 * 1000;
    let deleted = 0;

    for (const [id, job] of this.jobs) {
      if (job.resolvedAt && job.resolvedAt.getTime() < cutoff) {
        this.jobs.delete(id);
        deleted++;
      }
    }
    return deleted;
  }

  private enforceMaxSize(): void {
    if (this.jobs.size <= this.maxSize) return;

    // Remove oldest resolved first, then oldest unresolved
    const sorted = Array.from(this.jobs.entries())
      .sort((a, b) => {
        if (a[1].resolvedAt && !b[1].resolvedAt) return -1;
        return a[1].createdAt.getTime() - b[1].createdAt.getTime();
      });

    while (sorted.length > this.maxSize) {
      const [id] = sorted.shift()!;
      this.jobs.delete(id);
    }
  }
}

// Singleton
let dlq: DeadLetterQueue | null = null;
export function getDeadLetterQueue(): DeadLetterQueue {
  if (!dlq) dlq = new DeadLetterQueue();
  return dlq;
}

Python Implementation

Python 实现

python
undefined
python
undefined

dead_letter_queue.py

dead_letter_queue.py

from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Optional, Any
@dataclass class DeadLetterJob: id: str worker_name: str payload: Dict[str, Any] error_message: str error_type: str attempts: int attempt_errors: List[str] first_attempt_at: datetime last_attempt_at: datetime created_at: datetime stack_trace: Optional[str] = None resolved_at: Optional[datetime] = None resolution: Optional[str] = None
class DeadLetterQueue: def init(self, max_size: int = 1000): self._jobs: Dict[str, DeadLetterJob] = {} self._max_size = max_size self._counter = 0
def add(
    self,
    worker_name: str,
    payload: Dict[str, Any],
    error_message: str,
    error_type: str,
    attempts: int,
    stack_trace: Optional[str] = None,
) -> DeadLetterJob:
    self._counter += 1
    job_id = f"dlq_{self._counter}_{int(datetime.now().timestamp())}"
    now = datetime.now()

    job = DeadLetterJob(
        id=job_id,
        worker_name=worker_name,
        payload=payload,
        error_message=error_message,
        error_type=error_type,
        stack_trace=stack_trace,
        attempts=attempts,
        attempt_errors=[error_message],
        first_attempt_at=now,
        last_attempt_at=now,
        created_at=now,
    )

    self._jobs[job_id] = job
    self._enforce_max_size()
    return job

def record_attempt(self, job_id: str, error_message: str) -> bool:
    job = self._jobs.get(job_id)
    if not job:
        return False

    job.attempts += 1
    job.last_attempt_at = datetime.now()
    job.attempt_errors.append(error_message)
    job.error_message = error_message
    return True

def resolve(self, job_id: str, resolution: str) -> bool:
    job = self._jobs.get(job_id)
    if not job:
        return False

    job.resolved_at = datetime.now()
    job.resolution = resolution
    return True

def get_unresolved(self) -> List[DeadLetterJob]:
    return sorted(
        [j for j in self._jobs.values() if not j.resolved_at],
        key=lambda j: j.created_at,
        reverse=True,
    )

def get_replayable(self, max_attempts: int = 5) -> List[DeadLetterJob]:
    return [j for j in self.get_unresolved() if j.attempts < max_attempts]

def get_stats(self) -> Dict[str, Any]:
    unresolved = self.get_unresolved()
    by_worker: Dict[str, int] = {}
    by_error: Dict[str, int] = {}

    for job in unresolved:
        by_worker[job.worker_name] = by_worker.get(job.worker_name, 0) + 1
        by_error[job.error_type] = by_error.get(job.error_type, 0) + 1

    return {
        "total": len(self._jobs),
        "unresolved": len(unresolved),
        "by_worker": by_worker,
        "by_error_type": by_error,
    }

def _enforce_max_size(self):
    if len(self._jobs) <= self._max_size:
        return

    # Sort: resolved first, then by age
    sorted_jobs = sorted(
        self._jobs.items(),
        key=lambda x: (x[1].resolved_at is None, x[1].created_at),
    )

    while len(sorted_jobs) > self._max_size:
        job_id, _ = sorted_jobs.pop(0)
        del self._jobs[job_id]
from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Optional, Any
@dataclass class DeadLetterJob: id: str worker_name: str payload: Dict[str, Any] error_message: str error_type: str attempts: int attempt_errors: List[str] first_attempt_at: datetime last_attempt_at: datetime created_at: datetime stack_trace: Optional[str] = None resolved_at: Optional[datetime] = None resolution: Optional[str] = None
class DeadLetterQueue: def init(self, max_size: int = 1000): self._jobs: Dict[str, DeadLetterJob] = {} self._max_size = max_size self._counter = 0
def add(
    self,
    worker_name: str,
    payload: Dict[str, Any],
    error_message: str,
    error_type: str,
    attempts: int,
    stack_trace: Optional[str] = None,
) -> DeadLetterJob:
    self._counter += 1
    job_id = f"dlq_{self._counter}_{int(datetime.now().timestamp())}"
    now = datetime.now()

    job = DeadLetterJob(
        id=job_id,
        worker_name=worker_name,
        payload=payload,
        error_message=error_message,
        error_type=error_type,
        stack_trace=stack_trace,
        attempts=attempts,
        attempt_errors=[error_message],
        first_attempt_at=now,
        last_attempt_at=now,
        created_at=now,
    )

    self._jobs[job_id] = job
    self._enforce_max_size()
    return job

def record_attempt(self, job_id: str, error_message: str) -> bool:
    job = self._jobs.get(job_id)
    if not job:
        return False

    job.attempts += 1
    job.last_attempt_at = datetime.now()
    job.attempt_errors.append(error_message)
    job.error_message = error_message
    return True

def resolve(self, job_id: str, resolution: str) -> bool:
    job = self._jobs.get(job_id)
    if not job:
        return False

    job.resolved_at = datetime.now()
    job.resolution = resolution
    return True

def get_unresolved(self) -> List[DeadLetterJob]:
    return sorted(
        [j for j in self._jobs.values() if not j.resolved_at],
        key=lambda j: j.created_at,
        reverse=True,
    )

def get_replayable(self, max_attempts: int = 5) -> List[DeadLetterJob]:
    return [j for j in self.get_unresolved() if j.attempts < max_attempts]

def get_stats(self) -> Dict[str, Any]:
    unresolved = self.get_unresolved()
    by_worker: Dict[str, int] = {}
    by_error: Dict[str, int] = {}

    for job in unresolved:
        by_worker[job.worker_name] = by_worker.get(job.worker_name, 0) + 1
        by_error[job.error_type] = by_error.get(job.error_type, 0) + 1

    return {
        "total": len(self._jobs),
        "unresolved": len(unresolved),
        "by_worker": by_worker,
        "by_error_type": by_error,
    }

def _enforce_max_size(self):
    if len(self._jobs) <= self._max_size:
        return

    # Sort: resolved first, then by age
    sorted_jobs = sorted(
        self._jobs.items(),
        key=lambda x: (x[1].resolved_at is None, x[1].created_at),
    )

    while len(sorted_jobs) > self._max_size:
        job_id, _ = sorted_jobs.pop(0)
        del self._jobs[job_id]

Singleton

Singleton

_dlq: Optional[DeadLetterQueue] = None
def get_dead_letter_queue() -> DeadLetterQueue: global _dlq if _dlq is None: _dlq = DeadLetterQueue() return _dlq
undefined
_dlq: Optional[DeadLetterQueue] = None
def get_dead_letter_queue() -> DeadLetterQueue: global _dlq if _dlq is None: _dlq = DeadLetterQueue() return _dlq
undefined

Usage Examples

使用示例

Worker Integration

工作进程集成

typescript
const dlq = getDeadLetterQueue();
const MAX_RETRIES = 3;

async function processJob(job: Job) {
  try {
    await doWork(job.payload);
  } catch (error) {
    if (job.attempts >= MAX_RETRIES) {
      dlq.add(
        'my-worker',
        job.payload,
        error.message,
        error.name,
        job.attempts,
        error.stack
      );
    } else {
      throw error; // Let retry mechanism handle it
    }
  }
}
typescript
const dlq = getDeadLetterQueue();
const MAX_RETRIES = 3;

async function processJob(job: Job) {
  try {
    await doWork(job.payload);
  } catch (error) {
    if (job.attempts >= MAX_RETRIES) {
      dlq.add(
        'my-worker',
        job.payload,
        error.message,
        error.name,
        job.attempts,
        error.stack
      );
    } else {
      throw error; // Let retry mechanism handle it
    }
  }
}

Admin Replay

管理员重放

typescript
async function replayFailedJobs() {
  const dlq = getDeadLetterQueue();
  const replayable = dlq.getReplayable();
  
  for (const job of replayable) {
    try {
      await processJob({ payload: job.payload, attempts: 0 });
      dlq.resolve(job.id, 'Replayed successfully');
    } catch (e) {
      dlq.recordAttempt(job.id, e.message);
    }
  }
}
typescript
async function replayFailedJobs() {
  const dlq = getDeadLetterQueue();
  const replayable = dlq.getReplayable();
  
  for (const job of replayable) {
    try {
      await processJob({ payload: job.payload, attempts: 0 });
      dlq.resolve(job.id, 'Replayed successfully');
    } catch (e) {
      dlq.recordAttempt(job.id, e.message);
    }
  }
}

Monitoring Endpoint

监控端点

typescript
app.get('/admin/dlq/stats', (req, res) => {
  const dlq = getDeadLetterQueue();
  res.json(dlq.getStats());
});

app.get('/admin/dlq/jobs', (req, res) => {
  const dlq = getDeadLetterQueue();
  res.json(dlq.getUnresolved());
});

app.post('/admin/dlq/jobs/:id/replay', async (req, res) => {
  const dlq = getDeadLetterQueue();
  const job = dlq.getUnresolved().find(j => j.id === req.params.id);
  
  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }
  
  // Replay logic...
});
typescript
app.get('/admin/dlq/stats', (req, res) => {
  const dlq = getDeadLetterQueue();
  res.json(dlq.getStats());
});

app.get('/admin/dlq/jobs', (req, res) => {
  const dlq = getDeadLetterQueue();
  res.json(dlq.getUnresolved());
});

app.post('/admin/dlq/jobs/:id/replay', async (req, res) => {
  const dlq = getDeadLetterQueue();
  const job = dlq.getUnresolved().find(j => j.id === req.params.id);
  
  if (!job) {
    return res.status(404).json({ error: 'Job not found' });
  }
  
  // Replay logic...
});

Best Practices

最佳实践

  1. Store full context - Include everything needed to replay
  2. Track all errors - Keep history of attempt failures
  3. Enforce size limits - Prevent memory exhaustion
  4. Expose stats - Monitor failure patterns
  5. Cleanup resolved - Don't keep forever
  1. 存储完整上下文 - 包含任务重放所需的所有信息
  2. 跟踪所有错误 - 保留每次尝试的失败历史
  3. 限制队列大小 - 防止内存耗尽
  4. 暴露统计数据 - 监控失败模式
  5. 清理已解决任务 - 不要永久保留

Common Mistakes

常见错误

  • Not storing enough context to replay
  • Unbounded queue growth
  • No visibility into failure patterns
  • Forgetting to cleanup old resolved jobs
  • Not tracking attempt history
  • 未存储足够的上下文信息以支持任务重放
  • 队列无限制增长
  • 对失败模式缺乏可见性
  • 忘记清理旧的已解决任务
  • 未跟踪尝试历史

Related Skills

相关技术方案

  • Background Jobs
  • Error Handling
  • Graceful Shutdown
  • 后台任务
  • 错误处理
  • 优雅停机