worker-health-monitoring

Worker Health Monitoring

Heartbeat-based health monitoring for background workers.

When to Use This Skill

  • Monitoring background job workers
  • Detecting offline or stuck workers
  • Tracking worker performance degradation
  • Calculating failure rates and latency percentiles

Core Concepts

Workers can fail in subtle ways:
  • Offline - No heartbeat received
  • Degraded - Slow or occasionally failing
  • Unhealthy - High failure rate
  • Stuck - Started but never completed
The solution combines periodic heartbeats, a rolling window of recent job durations, and configurable thresholds for failure rate, latency, and queue depth.
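
As a rough illustration of the rolling-window idea, the sketch below keeps only the most recent job outcomes and derives a failure rate from them. `RollingOutcomeWindow` and `classifyFailureRate` are hypothetical names introduced here; the implementation in this document keeps a rolling window for durations only and computes failure rate from lifetime counters. The 0.05 and 0.15 defaults mirror its default thresholds.

```typescript
// Hypothetical helper (not part of HealthMonitor): remembers the outcome of
// the most recent `size` jobs and reports the failure rate over that window.
class RollingOutcomeWindow {
  private outcomes: boolean[] = [];
  private readonly size: number;

  constructor(size: number = 100) {
    this.size = size;
  }

  record(success: boolean): void {
    this.outcomes.push(success);
    // Drop the oldest outcome once the window is full.
    if (this.outcomes.length > this.size) this.outcomes.shift();
  }

  failureRate(): number {
    if (this.outcomes.length === 0) return 0;
    const failures = this.outcomes.filter((ok) => !ok).length;
    return failures / this.outcomes.length;
  }
}

// Map a failure rate onto a status using two configurable thresholds.
function classifyFailureRate(
  rate: number,
  degraded: number = 0.05,
  unhealthy: number = 0.15
): 'healthy' | 'degraded' | 'unhealthy' {
  if (rate >= unhealthy) return 'unhealthy';
  if (rate >= degraded) return 'degraded';
  return 'healthy';
}
```

With a window like this, a burst of old failures stops affecting the status once enough newer jobs have been recorded, which lifetime counters cannot do.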

Implementation

TypeScript

typescript
enum HealthStatus {
  HEALTHY = 'healthy',
  DEGRADED = 'degraded',
  UNHEALTHY = 'unhealthy',
  OFFLINE = 'offline',
  UNKNOWN = 'unknown',
}

interface HealthThresholds {
  heartbeatTimeoutSeconds: number;
  degradedFailureRate: number;
  unhealthyFailureRate: number;
  degradedLatencyMultiplier: number;
  unhealthyLatencyMultiplier: number;
  maxQueueDepth: number;
}

interface WorkerHealthState {
  workerName: string;
  status: HealthStatus;
  lastHeartbeat?: Date;
  heartbeatCount: number;
  jobsProcessed: number;
  jobsFailed: number;
  avgDurationMs: number;
  lastDurationMs: number;
  expectedDurationMs: number;
  queueDepth: number;
  memoryMb: number;
  cpuPercent: number;
}

interface HealthSummary {
  totalWorkers: number;
  byStatus: Record<HealthStatus, number>;
  healthyCount: number;
  unhealthyCount: number;
  totalJobsProcessed: number;
  totalJobsFailed: number;
  overallFailureRate: number;
  systemStatus: 'healthy' | 'degraded' | 'unhealthy';
}

const DEFAULT_THRESHOLDS: HealthThresholds = {
  heartbeatTimeoutSeconds: 60,
  degradedFailureRate: 0.05,
  unhealthyFailureRate: 0.15,
  degradedLatencyMultiplier: 1.5,
  unhealthyLatencyMultiplier: 3.0,
  maxQueueDepth: 100,
};

class HealthMonitor {
  private workers = new Map<string, WorkerHealthState>();
  private thresholds: HealthThresholds;
  private durations = new Map<string, number[]>();

  constructor(thresholds: Partial<HealthThresholds> = {}) {
    this.thresholds = { ...DEFAULT_THRESHOLDS, ...thresholds };
  }

  registerWorker(workerName: string, expectedDurationMs: number): void {
    if (!this.workers.has(workerName)) {
      this.workers.set(workerName, {
        workerName,
        status: HealthStatus.UNKNOWN,
        heartbeatCount: 0,
        jobsProcessed: 0,
        jobsFailed: 0,
        avgDurationMs: 0,
        lastDurationMs: 0,
        expectedDurationMs,
        queueDepth: 0,
        memoryMb: 0,
        cpuPercent: 0,
      });
      this.durations.set(workerName, []);
    }
  }

  recordHeartbeat(
    workerName: string,
    metrics: { memoryMb?: number; cpuPercent?: number; queueDepth?: number } = {}
  ): void {
    const state = this.workers.get(workerName);
    if (!state) return;

    state.lastHeartbeat = new Date();
    state.heartbeatCount++;
    state.memoryMb = metrics.memoryMb ?? state.memoryMb;
    state.cpuPercent = metrics.cpuPercent ?? state.cpuPercent;
    state.queueDepth = metrics.queueDepth ?? state.queueDepth;
    state.status = this.determineStatus(state);
  }

  recordExecutionComplete(
    workerName: string,
    success: boolean,
    durationMs: number
  ): void {
    const state = this.workers.get(workerName);
    if (!state) return;

    state.jobsProcessed++;
    if (!success) state.jobsFailed++;
    state.lastDurationMs = durationMs;
    state.lastHeartbeat = new Date();

    // Update rolling duration window (keep last 100)
    const durations = this.durations.get(workerName) || [];
    durations.push(durationMs);
    if (durations.length > 100) durations.shift();
    this.durations.set(workerName, durations);

    state.avgDurationMs = durations.reduce((a, b) => a + b, 0) / durations.length;
    state.status = this.determineStatus(state);
  }

  private determineStatus(state: WorkerHealthState): HealthStatus {
    const now = new Date();

    // Check heartbeat
    if (!state.lastHeartbeat) return HealthStatus.OFFLINE;
    
    const heartbeatAge = (now.getTime() - state.lastHeartbeat.getTime()) / 1000;
    if (heartbeatAge > this.thresholds.heartbeatTimeoutSeconds) {
      return HealthStatus.OFFLINE;
    }

    // Check failure rate
    const failureRate = state.jobsProcessed > 0
      ? state.jobsFailed / state.jobsProcessed
      : 0;

    if (failureRate >= this.thresholds.unhealthyFailureRate) {
      return HealthStatus.UNHEALTHY;
    }
    if (failureRate >= this.thresholds.degradedFailureRate) {
      return HealthStatus.DEGRADED;
    }

    // Check latency
    if (state.avgDurationMs > state.expectedDurationMs * this.thresholds.unhealthyLatencyMultiplier) {
      return HealthStatus.UNHEALTHY;
    }
    if (state.avgDurationMs > state.expectedDurationMs * this.thresholds.degradedLatencyMultiplier) {
      return HealthStatus.DEGRADED;
    }

    // Check queue depth
    if (state.queueDepth > this.thresholds.maxQueueDepth) {
      return HealthStatus.DEGRADED;
    }

    return HealthStatus.HEALTHY;
  }

  getHealthSummary(): HealthSummary {
    const byStatus: Record<HealthStatus, number> = {
      healthy: 0, degraded: 0, unhealthy: 0, offline: 0, unknown: 0,
    };

    let totalJobs = 0, totalFailed = 0;

    for (const state of this.workers.values()) {
      state.status = this.determineStatus(state);
      byStatus[state.status]++;
      totalJobs += state.jobsProcessed;
      totalFailed += state.jobsFailed;
    }

    const unhealthyCount = byStatus.unhealthy + byStatus.offline;
    let systemStatus: 'healthy' | 'degraded' | 'unhealthy' = 'healthy';
    if (unhealthyCount > 0) systemStatus = 'unhealthy';
    else if (byStatus.degraded > 0) systemStatus = 'degraded';

    return {
      totalWorkers: this.workers.size,
      byStatus,
      healthyCount: byStatus.healthy,
      unhealthyCount,
      totalJobsProcessed: totalJobs,
      totalJobsFailed: totalFailed,
      overallFailureRate: totalJobs > 0 ? totalFailed / totalJobs : 0,
      systemStatus,
    };
  }

  getPercentileDuration(workerName: string, percentile: number): number {
    const durations = this.durations.get(workerName);
    if (!durations || durations.length === 0) return 0;

    const sorted = [...durations].sort((a, b) => a - b);
    const index = Math.ceil((percentile / 100) * sorted.length) - 1;
    return sorted[Math.max(0, index)];
  }

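  // Heuristic: lists workers whose last heartbeat is older than maxAgeSeconds
  // but whose cached status has not yet been refreshed to OFFLINE. Job start
  // times are not tracked, so individual stuck jobs cannot be identified here.
  checkStuckJobs(maxAgeSeconds = 300): string[] {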
    const stuck: string[] = [];
    const now = new Date();

    for (const [name, state] of this.workers) {
      if (state.lastHeartbeat) {
        const age = (now.getTime() - state.lastHeartbeat.getTime()) / 1000;
        if (age > maxAgeSeconds && state.status !== HealthStatus.OFFLINE) {
          stuck.push(name);
        }
      }
    }
    return stuck;
  }
}

// Singleton
let monitor: HealthMonitor | null = null;
export function getHealthMonitor(): HealthMonitor {
  if (!monitor) monitor = new HealthMonitor();
  return monitor;
}
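
The `checkStuckJobs` method above works from heartbeat age alone, so it cannot tell which job was started and never completed. One possible way to cover the "Stuck" case is to record job start times separately. The sketch below is an assumption about how that could look; `InFlightJobTracker`, `recordStart`, `recordComplete`, and `findStuck` are hypothetical names that do not exist in the monitor above.

```typescript
// Hypothetical companion to HealthMonitor: remembers when each job started
// so that jobs which never complete can be reported by id.
class InFlightJobTracker {
  // jobId -> worker name and start time in epoch milliseconds
  private inFlight = new Map<string, { workerName: string; startedAtMs: number }>();

  recordStart(jobId: string, workerName: string, nowMs: number = Date.now()): void {
    this.inFlight.set(jobId, { workerName, startedAtMs: nowMs });
  }

  // Call on success and on failure; either way the job is no longer in flight.
  recordComplete(jobId: string): void {
    this.inFlight.delete(jobId);
  }

  // Returns the ids of jobs that have been running longer than maxAgeSeconds.
  findStuck(maxAgeSeconds: number = 300, nowMs: number = Date.now()): string[] {
    const stuck: string[] = [];
    this.inFlight.forEach((job, jobId) => {
      const ageSeconds = (nowMs - job.startedAtMs) / 1000;
      if (ageSeconds > maxAgeSeconds) stuck.push(jobId);
    });
    return stuck;
  }
}
```

The `nowMs` parameters exist only to make the sketch easy to test with fixed timestamps; in normal use the defaults apply.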

Python

python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional
from enum import Enum


class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    OFFLINE = "offline"
    UNKNOWN = "unknown"


@dataclass
class HealthThresholds:
    heartbeat_timeout_seconds: int = 60
    degraded_failure_rate: float = 0.05
    unhealthy_failure_rate: float = 0.15
    degraded_latency_multiplier: float = 1.5
    unhealthy_latency_multiplier: float = 3.0
    max_queue_depth: int = 100


@dataclass
class WorkerHealthState:
    worker_name: str
    expected_duration_ms: float
    status: HealthStatus = HealthStatus.UNKNOWN
    last_heartbeat: Optional[datetime] = None
    heartbeat_count: int = 0
    jobs_processed: int = 0
    jobs_failed: int = 0
    avg_duration_ms: float = 0
    last_duration_ms: float = 0
    queue_depth: int = 0
    memory_mb: float = 0
    cpu_percent: float = 0


class HealthMonitor:
    def __init__(self, thresholds: Optional[HealthThresholds] = None):
        self._thresholds = thresholds or HealthThresholds()
        self._workers: Dict[str, WorkerHealthState] = {}
        self._durations: Dict[str, List[float]] = {}
    
    def register_worker(self, worker_name: str, expected_duration_ms: float) -> None:
        if worker_name not in self._workers:
            self._workers[worker_name] = WorkerHealthState(
                worker_name=worker_name,
                expected_duration_ms=expected_duration_ms,
            )
            self._durations[worker_name] = []
    
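    def record_heartbeat(
        self,
        worker_name: str,
        memory_mb: Optional[float] = None,
        cpu_percent: Optional[float] = None,
        queue_depth: Optional[int] = None,
    ) -> None:
        state = self._workers.get(worker_name)
        if not state:
            return
        
        state.last_heartbeat = datetime.now(timezone.utc)
        state.heartbeat_count += 1
        # Keep the previous value when a metric is omitted (matches the TypeScript version)
        if memory_mb is not None:
            state.memory_mb = memory_mb
        if cpu_percent is not None:
            state.cpu_percent = cpu_percent
        if queue_depth is not None:
            state.queue_depth = queue_depth
        state.status = self._determine_status(state)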
    
    def record_execution_complete(
        self,
        worker_name: str,
        success: bool,
        duration_ms: float,
    ) -> None:
        state = self._workers.get(worker_name)
        if not state:
            return
        
        state.jobs_processed += 1
        if not success:
            state.jobs_failed += 1
        state.last_duration_ms = duration_ms
        state.last_heartbeat = datetime.now(timezone.utc)
        
        # Update rolling window
        durations = self._durations.get(worker_name, [])
        durations.append(duration_ms)
        if len(durations) > 100:
            durations.pop(0)
        self._durations[worker_name] = durations
        
        state.avg_duration_ms = sum(durations) / len(durations) if durations else 0
        state.status = self._determine_status(state)
    
    def _determine_status(self, state: WorkerHealthState) -> HealthStatus:
        now = datetime.now(timezone.utc)
        
        if not state.last_heartbeat:
            return HealthStatus.OFFLINE
        
        heartbeat_age = (now - state.last_heartbeat).total_seconds()
        if heartbeat_age > self._thresholds.heartbeat_timeout_seconds:
            return HealthStatus.OFFLINE
        
        failure_rate = state.jobs_failed / state.jobs_processed if state.jobs_processed > 0 else 0
        
        if failure_rate >= self._thresholds.unhealthy_failure_rate:
            return HealthStatus.UNHEALTHY
        if failure_rate >= self._thresholds.degraded_failure_rate:
            return HealthStatus.DEGRADED
        
        if state.avg_duration_ms > state.expected_duration_ms * self._thresholds.unhealthy_latency_multiplier:
            return HealthStatus.UNHEALTHY
        if state.avg_duration_ms > state.expected_duration_ms * self._thresholds.degraded_latency_multiplier:
            return HealthStatus.DEGRADED
        
        if state.queue_depth > self._thresholds.max_queue_depth:
            return HealthStatus.DEGRADED
        
        return HealthStatus.HEALTHY
    
    def get_health_summary(self) -> dict:
        by_status = {s.value: 0 for s in HealthStatus}
        total_jobs = 0
        total_failed = 0
        
        for state in self._workers.values():
            state.status = self._determine_status(state)
            by_status[state.status.value] += 1
            total_jobs += state.jobs_processed
            total_failed += state.jobs_failed
        
        unhealthy_count = by_status["unhealthy"] + by_status["offline"]
        
        if unhealthy_count > 0:
            system_status = "unhealthy"
        elif by_status["degraded"] > 0:
            system_status = "degraded"
        else:
            system_status = "healthy"
        
        return {
            "total_workers": len(self._workers),
            "by_status": by_status,
            "healthy_count": by_status["healthy"],
            "unhealthy_count": unhealthy_count,
            "total_jobs_processed": total_jobs,
            "total_jobs_failed": total_failed,
            "overall_failure_rate": total_failed / total_jobs if total_jobs > 0 else 0,
            "system_status": system_status,
        }
    
    def get_percentile_duration(self, worker_name: str, percentile: float) -> float:
        durations = self._durations.get(worker_name, [])
        if not durations:
            return 0
        
        sorted_durations = sorted(durations)
        # Nearest-rank index via ceiling division, matching Math.ceil in the TypeScript version
        index = int(-(-percentile * len(sorted_durations) // 100)) - 1
        return sorted_durations[max(0, index)]

# Singleton
_monitor: Optional[HealthMonitor] = None


def get_health_monitor() -> HealthMonitor:
    global _monitor
    if _monitor is None:
        _monitor = HealthMonitor()
    return _monitor

Usage Examples

Worker Registration

typescript
const monitor = getHealthMonitor();

// Register workers with expected durations
monitor.registerWorker('email-sender', 5000);     // 5s expected
monitor.registerWorker('data-processor', 30000);  // 30s expected
monitor.registerWorker('report-generator', 60000); // 60s expected

Job Execution Tracking

typescript
async function processJob(job: Job) {
  const startTime = Date.now();

  try {
    await doWork(job);
    monitor.recordExecutionComplete('data-processor', true, Date.now() - startTime);
  } catch (error) {
    monitor.recordExecutionComplete('data-processor', false, Date.now() - startTime);
    throw error;
  }
}

Heartbeat Loop

typescript
setInterval(() => {
  const memUsage = process.memoryUsage();
  
  monitor.recordHeartbeat('data-processor', {
    memoryMb: Math.round(memUsage.heapUsed / 1024 / 1024),
    cpuPercent: getCpuUsage(),
    queueDepth: getQueueDepth(),
  });
}, 30000);
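
The loop above calls `getCpuUsage()`, which this document does not define. One way it might be built, assuming a Node.js worker, is to compare two readings of `process.cpuUsage()` (CPU time in microseconds) taken a known interval apart. `CpuTimes` and `cpuPercentBetween` are hypothetical names used only for this sketch.

```typescript
// Same shape as the value returned by Node's process.cpuUsage():
// accumulated user and system CPU time, both in microseconds.
interface CpuTimes {
  user: number;
  system: number;
}

// Percentage of a single CPU core used between two samples taken
// `elapsedMs` milliseconds apart, rounded to one decimal place.
function cpuPercentBetween(prev: CpuTimes, curr: CpuTimes, elapsedMs: number): number {
  if (elapsedMs <= 0) return 0;
  const usedMicros = (curr.user - prev.user) + (curr.system - prev.system);
  const elapsedMicros = elapsedMs * 1000;
  return Math.round((usedMicros / elapsedMicros) * 1000) / 10;
}
```

In a heartbeat loop, the previous `process.cpuUsage()` result and `Date.now()` timestamp would be kept between ticks and passed in with the current readings. Because the result is relative to one core, it can exceed 100 on multi-core machines.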

Health API Endpoint

typescript
app.get('/health/workers', async (req, res) => {
  const summary = monitor.getHealthSummary();
  const statusCode = summary.systemStatus === 'unhealthy' ? 503 : 200;
  
  res.status(statusCode).json({
    status: summary.systemStatus,
    summary,
    percentiles: {
      'data-processor': {
        p50: monitor.getPercentileDuration('data-processor', 50),
        p95: monitor.getPercentileDuration('data-processor', 95),
        p99: monitor.getPercentileDuration('data-processor', 99),
      },
    },
  });
});

Best Practices

  1. Set expected durations based on actual baseline measurements
  2. Use rolling windows to smooth out outliers
  3. Configure thresholds based on your SLOs
  4. Send heartbeats even when idle
  5. Include resource metrics (memory, CPU) in heartbeats

Common Mistakes

  • Heartbeat timeout too short (workers falsely reported as offline)
  • Not tracking job durations (performance degradation goes unnoticed)
  • Failure rate thresholds too strict (alert fatigue)
  • No percentile tracking (tail latency issues go unnoticed)
  • No heartbeats sent during long jobs (a busy worker looks offline)
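
For the last item in the list above, one possible approach is to wrap long-running jobs so that heartbeats keep flowing until the job settles. The sketch below is illustrative only: `withHeartbeat` is a hypothetical helper, and `beat` stands in for a call such as `monitor.recordHeartbeat('data-processor')`.

```typescript
// Hypothetical wrapper: calls `beat` once immediately and then every
// `intervalMs` while `job` is running. The timer is cleared whether the
// job resolves or rejects, and the job's result or error is passed through.
async function withHeartbeat<T>(
  job: () => Promise<T>,
  beat: () => void,
  intervalMs: number = 30000
): Promise<T> {
  beat(); // signal liveness as soon as the job starts
  const timer = setInterval(beat, intervalMs);
  try {
    return await job();
  } finally {
    clearInterval(timer);
  }
}
```

The interval should stay comfortably below the configured heartbeat timeout (60 seconds by default in this document) so that a single delayed tick does not mark the worker offline. This only helps if the job yields to the event loop; a job that blocks it synchronously would also block the timer.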

Related Patterns

  • health-checks - HTTP health endpoints
  • anomaly-detection - Alert on health changes
  • graceful-shutdown - Drain workers cleanly