worker-orchestration

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Worker Orchestration

工作进程编排

Coordinate multiple background workers.
协调多个后台工作进程。

When to Use This Skill

何时使用该技能

  • Multiple background jobs need coordination
  • Jobs have dependencies on each other
  • Need to prevent conflicting concurrent execution
  • Want automatic disabling of failing workers
  • 需要协调多个后台任务
  • 任务之间存在依赖关系
  • 需要防止冲突的并发执行
  • 希望自动禁用故障工作进程

Core Concepts

核心概念

  1. Scheduling - Run workers on intervals
  2. Dependencies - Worker A must complete before B starts
  3. Blocking - Workers that can't run concurrently
  4. Auto-disable - Stop workers after consecutive failures
  1. 调度 - 按时间间隔运行工作进程
  2. 依赖管理 - 工作进程A必须在B启动前完成
  3. 阻塞 - 无法并发运行的工作进程
  4. 自动禁用 - 连续失败后停止工作进程

TypeScript Implementation

TypeScript 实现

Types

类型定义

typescript
// types.ts
export enum WorkerExecutionMode {
  SCHEDULED = 'scheduled',
  TRIGGERED = 'triggered',
  CONTINUOUS = 'continuous',
}

export enum JobPriority {
  LOW = 0,
  NORMAL = 1,
  HIGH = 2,
  CRITICAL = 3,
}

export interface WorkerConfig {
  name: string;
  executionMode: WorkerExecutionMode;
  intervalSeconds: number;
  timeoutSeconds: number;
  maxRetries: number;
  priority: JobPriority;
  maxConsecutiveFailures: number;
  dependsOn: string[];
  blocks: string[];
  
  // Runtime state
  isEnabled: boolean;
  isRunning: boolean;
  consecutiveFailures: number;
  lastRun?: Date;
  lastSuccess?: Date;
  lastError?: string;
}

export type WorkerFn = (config: WorkerConfig) => Promise<void>;
typescript
// types.ts
export enum WorkerExecutionMode {
  SCHEDULED = 'scheduled',
  TRIGGERED = 'triggered',
  CONTINUOUS = 'continuous',
}

export enum JobPriority {
  LOW = 0,
  NORMAL = 1,
  HIGH = 2,
  CRITICAL = 3,
}

export interface WorkerConfig {
  name: string;
  executionMode: WorkerExecutionMode;
  intervalSeconds: number;
  timeoutSeconds: number;
  maxRetries: number;
  priority: JobPriority;
  maxConsecutiveFailures: number;
  dependsOn: string[];
  blocks: string[];
  
  // Runtime state
  isEnabled: boolean;
  isRunning: boolean;
  consecutiveFailures: number;
  lastRun?: Date;
  lastSuccess?: Date;
  lastError?: string;
}

export type WorkerFn = (config: WorkerConfig) => Promise<void>;

Orchestrator

编排器

typescript
// orchestrator.ts
import { WorkerConfig, WorkerFn, WorkerExecutionMode, JobPriority } from './types';

interface OrchestratorConfig {
  tickIntervalMs: number;
  maxConcurrentWorkers: number;
}

export class WorkerOrchestrator {
  private workers = new Map<string, WorkerConfig>();
  private workerFns = new Map<string, WorkerFn>();
  private running = new Set<string>();
  private tickInterval: NodeJS.Timeout | null = null;
  private state: 'stopped' | 'running' | 'stopping' = 'stopped';

  constructor(private config: OrchestratorConfig) {}

  registerWorker(
    name: string,
    fn: WorkerFn,
    options: Partial<WorkerConfig> = {}
  ): void {
    this.workers.set(name, {
      name,
      executionMode: options.executionMode || WorkerExecutionMode.SCHEDULED,
      intervalSeconds: options.intervalSeconds || 300,
      timeoutSeconds: options.timeoutSeconds || 60,
      maxRetries: options.maxRetries || 3,
      priority: options.priority || JobPriority.NORMAL,
      maxConsecutiveFailures: options.maxConsecutiveFailures || 5,
      dependsOn: options.dependsOn || [],
      blocks: options.blocks || [],
      isEnabled: true,
      isRunning: false,
      consecutiveFailures: 0,
    });
    this.workerFns.set(name, fn);
  }

  async start(): Promise<void> {
    if (this.state !== 'stopped') return;
    
    this.state = 'running';
    this.tickInterval = setInterval(
      () => this.tick(),
      this.config.tickIntervalMs
    );
    console.log(`[Orchestrator] Started with ${this.workers.size} workers`);
  }

  async stop(): Promise<void> {
    if (this.state === 'stopped') return;
    
    this.state = 'stopping';
    if (this.tickInterval) clearInterval(this.tickInterval);

    // Wait for running workers
    const maxWait = 30000;
    const start = Date.now();
    while (this.running.size > 0 && Date.now() - start < maxWait) {
      await this.sleep(100);
    }

    this.state = 'stopped';
    console.log('[Orchestrator] Stopped');
  }

  private async tick(): Promise<void> {
    if (this.state !== 'running') return;

    // Sort by priority (higher first)
    const sortedWorkers = Array.from(this.workers.entries())
      .sort((a, b) => b[1].priority - a[1].priority);

    for (const [name, config] of sortedWorkers) {
      if (!config.isEnabled || config.isRunning) continue;
      if (this.running.size >= this.config.maxConcurrentWorkers) break;

      if (this.shouldRun(config)) {
        this.executeWorker(name, config);
      }
    }
  }

  private shouldRun(config: WorkerConfig): boolean {
    // Check dependencies - must not be running
    for (const dep of config.dependsOn) {
      if (this.workers.get(dep)?.isRunning) return false;
    }

    // Check blockers - must not be running
    for (const blocker of config.blocks) {
      if (this.workers.get(blocker)?.isRunning) return false;
    }

    // Check schedule
    if (!config.lastRun) return true;
    const elapsed = (Date.now() - config.lastRun.getTime()) / 1000;
    return elapsed >= config.intervalSeconds;
  }

  private async executeWorker(name: string, config: WorkerConfig): Promise<void> {
    const fn = this.workerFns.get(name);
    if (!fn) return;

    config.isRunning = true;
    this.running.add(name);
    console.log(`[Orchestrator] Starting ${name}`);

    try {
      // Execute with timeout
      await Promise.race([
        fn(config),
        this.sleep(config.timeoutSeconds * 1000).then(() => {
          throw new Error('Worker timeout');
        }),
      ]);
      
      config.lastRun = new Date();
      config.lastSuccess = new Date();
      config.consecutiveFailures = 0;
      console.log(`[Orchestrator] Completed ${name}`);
      
    } catch (error) {
      config.lastRun = new Date();
      config.lastError = error instanceof Error ? error.message : String(error);
      config.consecutiveFailures++;
      console.error(`[Orchestrator] Failed ${name}:`, config.lastError);

      // Auto-disable after too many failures
      if (config.consecutiveFailures >= config.maxConsecutiveFailures) {
        config.isEnabled = false;
        console.log(`[Orchestrator] Disabled ${name} after ${config.consecutiveFailures} failures`);
      }
    } finally {
      config.isRunning = false;
      this.running.delete(name);
    }
  }

  async triggerWorker(name: string): Promise<boolean> {
    const config = this.workers.get(name);
    if (!config || config.isRunning) return false;
    
    await this.executeWorker(name, config);
    return true;
  }

  enableWorker(name: string): void {
    const config = this.workers.get(name);
    if (config) {
      config.isEnabled = true;
      config.consecutiveFailures = 0;
    }
  }

  disableWorker(name: string): void {
    const config = this.workers.get(name);
    if (config) {
      config.isEnabled = false;
    }
  }

  getStatus() {
    return {
      state: this.state,
      workers: this.workers.size,
      running: this.running.size,
      workerStates: Object.fromEntries(
        Array.from(this.workers.entries()).map(([name, config]) => [
          name,
          {
            enabled: config.isEnabled,
            running: config.isRunning,
            failures: config.consecutiveFailures,
            lastRun: config.lastRun,
            lastError: config.lastError,
          },
        ])
      ),
    };
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
typescript
// orchestrator.ts
import { WorkerConfig, WorkerFn, WorkerExecutionMode, JobPriority } from './types';

interface OrchestratorConfig {
  tickIntervalMs: number;
  maxConcurrentWorkers: number;
}

export class WorkerOrchestrator {
  private workers = new Map<string, WorkerConfig>();
  private workerFns = new Map<string, WorkerFn>();
  private running = new Set<string>();
  private tickInterval: NodeJS.Timeout | null = null;
  private state: 'stopped' | 'running' | 'stopping' = 'stopped';

  constructor(private config: OrchestratorConfig) {}

  registerWorker(
    name: string,
    fn: WorkerFn,
    options: Partial<WorkerConfig> = {}
  ): void {
    this.workers.set(name, {
      name,
      executionMode: options.executionMode || WorkerExecutionMode.SCHEDULED,
      intervalSeconds: options.intervalSeconds || 300,
      timeoutSeconds: options.timeoutSeconds || 60,
      maxRetries: options.maxRetries || 3,
      priority: options.priority || JobPriority.NORMAL,
      maxConsecutiveFailures: options.maxConsecutiveFailures || 5,
      dependsOn: options.dependsOn || [],
      blocks: options.blocks || [],
      isEnabled: true,
      isRunning: false,
      consecutiveFailures: 0,
    });
    this.workerFns.set(name, fn);
  }

  async start(): Promise<void> {
    if (this.state !== 'stopped') return;
    
    this.state = 'running';
    this.tickInterval = setInterval(
      () => this.tick(),
      this.config.tickIntervalMs
    );
    console.log(`[Orchestrator] Started with ${this.workers.size} workers`);
  }

  async stop(): Promise<void> {
    if (this.state === 'stopped') return;
    
    this.state = 'stopping';
    if (this.tickInterval) clearInterval(this.tickInterval);

    // Wait for running workers
    const maxWait = 30000;
    const start = Date.now();
    while (this.running.size > 0 && Date.now() - start < maxWait) {
      await this.sleep(100);
    }

    this.state = 'stopped';
    console.log('[Orchestrator] Stopped');
  }

  private async tick(): Promise<void> {
    if (this.state !== 'running') return;

    // Sort by priority (higher first)
    const sortedWorkers = Array.from(this.workers.entries())
      .sort((a, b) => b[1].priority - a[1].priority);

    for (const [name, config] of sortedWorkers) {
      if (!config.isEnabled || config.isRunning) continue;
      if (this.running.size >= this.config.maxConcurrentWorkers) break;

      if (this.shouldRun(config)) {
        this.executeWorker(name, config);
      }
    }
  }

  private shouldRun(config: WorkerConfig): boolean {
    // Check dependencies - must not be running
    for (const dep of config.dependsOn) {
      if (this.workers.get(dep)?.isRunning) return false;
    }

    // Check blockers - must not be running
    for (const blocker of config.blocks) {
      if (this.workers.get(blocker)?.isRunning) return false;
    }

    // Check schedule
    if (!config.lastRun) return true;
    const elapsed = (Date.now() - config.lastRun.getTime()) / 1000;
    return elapsed >= config.intervalSeconds;
  }

  private async executeWorker(name: string, config: WorkerConfig): Promise<void> {
    const fn = this.workerFns.get(name);
    if (!fn) return;

    config.isRunning = true;
    this.running.add(name);
    console.log(`[Orchestrator] Starting ${name}`);

    try {
      // Execute with timeout
      await Promise.race([
        fn(config),
        this.sleep(config.timeoutSeconds * 1000).then(() => {
          throw new Error('Worker timeout');
        }),
      ]);
      
      config.lastRun = new Date();
      config.lastSuccess = new Date();
      config.consecutiveFailures = 0;
      console.log(`[Orchestrator] Completed ${name}`);
      
    } catch (error) {
      config.lastRun = new Date();
      config.lastError = error instanceof Error ? error.message : String(error);
      config.consecutiveFailures++;
      console.error(`[Orchestrator] Failed ${name}:`, config.lastError);

      // Auto-disable after too many failures
      if (config.consecutiveFailures >= config.maxConsecutiveFailures) {
        config.isEnabled = false;
        console.log(`[Orchestrator] Disabled ${name} after ${config.consecutiveFailures} failures`);
      }
    } finally {
      config.isRunning = false;
      this.running.delete(name);
    }
  }

  async triggerWorker(name: string): Promise<boolean> {
    const config = this.workers.get(name);
    if (!config || config.isRunning) return false;
    
    await this.executeWorker(name, config);
    return true;
  }

  enableWorker(name: string): void {
    const config = this.workers.get(name);
    if (config) {
      config.isEnabled = true;
      config.consecutiveFailures = 0;
    }
  }

  disableWorker(name: string): void {
    const config = this.workers.get(name);
    if (config) {
      config.isEnabled = false;
    }
  }

  getStatus() {
    return {
      state: this.state,
      workers: this.workers.size,
      running: this.running.size,
      workerStates: Object.fromEntries(
        Array.from(this.workers.entries()).map(([name, config]) => [
          name,
          {
            enabled: config.isEnabled,
            running: config.isRunning,
            failures: config.consecutiveFailures,
            lastRun: config.lastRun,
            lastError: config.lastError,
          },
        ])
      ),
    };
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Python Implementation

Python 实现

python
undefined
python
undefined

orchestrator.py

orchestrator.py

import asyncio from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Dict, Set, Callable, Awaitable, Optional, List
class JobPriority(int, Enum): LOW = 0 NORMAL = 1 HIGH = 2 CRITICAL = 3
@dataclass class WorkerConfig: name: str interval_seconds: int = 300 timeout_seconds: int = 60 max_consecutive_failures: int = 5 priority: JobPriority = JobPriority.NORMAL depends_on: List[str] = field(default_factory=list) blocks: List[str] = field(default_factory=list)
# Runtime state
is_enabled: bool = True
is_running: bool = False
consecutive_failures: int = 0
last_run: Optional[datetime] = None
last_error: Optional[str] = None
WorkerFn = Callable[[WorkerConfig], Awaitable[None]]
class WorkerOrchestrator: def init(self, tick_interval: float = 5.0, max_concurrent: int = 5): self._tick_interval = tick_interval self._max_concurrent = max_concurrent self._workers: Dict[str, WorkerConfig] = {} self._worker_fns: Dict[str, WorkerFn] = {} self._running: Set[str] = set() self._state = "stopped" self._task: Optional[asyncio.Task] = None
def register_worker(
    self,
    name: str,
    fn: WorkerFn,
    **options
):
    self._workers[name] = WorkerConfig(name=name, **options)
    self._worker_fns[name] = fn

async def start(self):
    if self._state != "stopped":
        return
    
    self._state = "running"
    self._task = asyncio.create_task(self._tick_loop())
    print(f"[Orchestrator] Started with {len(self._workers)} workers")

async def stop(self):
    if self._state == "stopped":
        return
    
    self._state = "stopping"
    if self._task:
        self._task.cancel()
    
    # Wait for running workers
    timeout = 30.0
    start = datetime.now()
    while self._running and (datetime.now() - start).total_seconds() < timeout:
        await asyncio.sleep(0.1)
    
    self._state = "stopped"
    print("[Orchestrator] Stopped")

async def _tick_loop(self):
    while self._state == "running":
        await self._tick()
        await asyncio.sleep(self._tick_interval)

async def _tick(self):
    # Sort by priority
    sorted_workers = sorted(
        self._workers.items(),
        key=lambda x: x[1].priority,
        reverse=True
    )

    for name, config in sorted_workers:
        if not config.is_enabled or config.is_running:
            continue
        if len(self._running) >= self._max_concurrent:
            break

        if self._should_run(config):
            asyncio.create_task(self._execute_worker(name, config))

def _should_run(self, config: WorkerConfig) -> bool:
    # Check dependencies
    for dep in config.depends_on:
        if self._workers.get(dep, WorkerConfig(name="")).is_running:
            return False

    # Check blockers
    for blocker in config.blocks:
        if self._workers.get(blocker, WorkerConfig(name="")).is_running:
            return False

    # Check schedule
    if not config.last_run:
        return True
    elapsed = (datetime.now() - config.last_run).total_seconds()
    return elapsed >= config.interval_seconds

async def _execute_worker(self, name: str, config: WorkerConfig):
    fn = self._worker_fns.get(name)
    if not fn:
        return

    config.is_running = True
    self._running.add(name)

    try:
        await asyncio.wait_for(
            fn(config),
            timeout=config.timeout_seconds
        )
        config.last_run = datetime.now()
        config.consecutive_failures = 0
    except Exception as e:
        config.last_run = datetime.now()
        config.last_error = str(e)
        config.consecutive_failures += 1

        if config.consecutive_failures >= config.max_consecutive_failures:
            config.is_enabled = False
            print(f"[Orchestrator] Disabled {name}")
    finally:
        config.is_running = False
        self._running.discard(name)

def get_status(self) -> Dict:
    return {
        "state": self._state,
        "workers": len(self._workers),
        "running": len(self._running),
        "worker_states": {
            name: {
                "enabled": c.is_enabled,
                "running": c.is_running,
                "failures": c.consecutive_failures,
            }
            for name, c in self._workers.items()
        },
    }
undefined
import asyncio from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Dict, Set, Callable, Awaitable, Optional, List
class JobPriority(int, Enum): LOW = 0 NORMAL = 1 HIGH = 2 CRITICAL = 3
@dataclass class WorkerConfig: name: str interval_seconds: int = 300 timeout_seconds: int = 60 max_consecutive_failures: int = 5 priority: JobPriority = JobPriority.NORMAL depends_on: List[str] = field(default_factory=list) blocks: List[str] = field(default_factory=list)
# Runtime state
is_enabled: bool = True
is_running: bool = False
consecutive_failures: int = 0
last_run: Optional[datetime] = None
last_error: Optional[str] = None
WorkerFn = Callable[[WorkerConfig], Awaitable[None]]
class WorkerOrchestrator: def init(self, tick_interval: float = 5.0, max_concurrent: int = 5): self._tick_interval = tick_interval self._max_concurrent = max_concurrent self._workers: Dict[str, WorkerConfig] = {} self._worker_fns: Dict[str, WorkerFn] = {} self._running: Set[str] = set() self._state = "stopped" self._task: Optional[asyncio.Task] = None
def register_worker(
    self,
    name: str,
    fn: WorkerFn,
    **options
):
    self._workers[name] = WorkerConfig(name=name, **options)
    self._worker_fns[name] = fn

async def start(self):
    if self._state != "stopped":
        return
    
    self._state = "running"
    self._task = asyncio.create_task(self._tick_loop())
    print(f"[Orchestrator] Started with {len(self._workers)} workers")

async def stop(self):
    if self._state == "stopped":
        return
    
    self._state = "stopping"
    if self._task:
        self._task.cancel()
    
    # Wait for running workers
    timeout = 30.0
    start = datetime.now()
    while self._running and (datetime.now() - start).total_seconds() < timeout:
        await asyncio.sleep(0.1)
    
    self._state = "stopped"
    print("[Orchestrator] Stopped")

async def _tick_loop(self):
    while self._state == "running":
        await self._tick()
        await asyncio.sleep(self._tick_interval)

async def _tick(self):
    # Sort by priority
    sorted_workers = sorted(
        self._workers.items(),
        key=lambda x: x[1].priority,
        reverse=True
    )

    for name, config in sorted_workers:
        if not config.is_enabled or config.is_running:
            continue
        if len(self._running) >= self._max_concurrent:
            break

        if self._should_run(config):
            asyncio.create_task(self._execute_worker(name, config))

def _should_run(self, config: WorkerConfig) -> bool:
    # Check dependencies
    for dep in config.depends_on:
        if self._workers.get(dep, WorkerConfig(name="")).is_running:
            return False

    # Check blockers
    for blocker in config.blocks:
        if self._workers.get(blocker, WorkerConfig(name="")).is_running:
            return False

    # Check schedule
    if not config.last_run:
        return True
    elapsed = (datetime.now() - config.last_run).total_seconds()
    return elapsed >= config.interval_seconds

async def _execute_worker(self, name: str, config: WorkerConfig):
    fn = self._worker_fns.get(name)
    if not fn:
        return

    config.is_running = True
    self._running.add(name)

    try:
        await asyncio.wait_for(
            fn(config),
            timeout=config.timeout_seconds
        )
        config.last_run = datetime.now()
        config.consecutive_failures = 0
    except Exception as e:
        config.last_run = datetime.now()
        config.last_error = str(e)
        config.consecutive_failures += 1

        if config.consecutive_failures >= config.max_consecutive_failures:
            config.is_enabled = False
            print(f"[Orchestrator] Disabled {name}")
    finally:
        config.is_running = False
        self._running.discard(name)

def get_status(self) -> Dict:
    return {
        "state": self._state,
        "workers": len(self._workers),
        "running": len(self._running),
        "worker_states": {
            name: {
                "enabled": c.is_enabled,
                "running": c.is_running,
                "failures": c.consecutive_failures,
            }
            for name, c in self._workers.items()
        },
    }
undefined

Usage Examples

使用示例

typescript
const orchestrator = new WorkerOrchestrator({
  tickIntervalMs: 5000,
  maxConcurrentWorkers: 5,
});

// High-priority data fetcher
orchestrator.registerWorker('fetch-data', fetchDataWorker, {
  intervalSeconds: 60,
  timeoutSeconds: 30,
  priority: JobPriority.HIGH,
});

// Depends on fetch-data completing first
orchestrator.registerWorker('process-data', processDataWorker, {
  intervalSeconds: 120,
  dependsOn: ['fetch-data'],
});

// Can't run while process-data is running
orchestrator.registerWorker('cleanup', cleanupWorker, {
  intervalSeconds: 3600,
  priority: JobPriority.LOW,
  blocks: ['process-data'],
});

// Start
await orchestrator.start();

// Graceful shutdown
process.on('SIGTERM', () => orchestrator.stop());
typescript
const orchestrator = new WorkerOrchestrator({
  tickIntervalMs: 5000,
  maxConcurrentWorkers: 5,
});

// High-priority data fetcher
orchestrator.registerWorker('fetch-data', fetchDataWorker, {
  intervalSeconds: 60,
  timeoutSeconds: 30,
  priority: JobPriority.HIGH,
});

// Depends on fetch-data completing first
orchestrator.registerWorker('process-data', processDataWorker, {
  intervalSeconds: 120,
  dependsOn: ['fetch-data'],
});

// Can't run while process-data is running
orchestrator.registerWorker('cleanup', cleanupWorker, {
  intervalSeconds: 3600,
  priority: JobPriority.LOW,
  blocks: ['process-data'],
});

// Start
await orchestrator.start();

// Graceful shutdown
process.on('SIGTERM', () => orchestrator.stop());

Best Practices

最佳实践

  1. Use dependencies - For sequential workflows
  2. Use blocks - For mutually exclusive jobs
  3. Set timeouts - Prevent hung workers
  4. Auto-disable - Stop failing workers automatically
  5. Monitor status - Expose metrics endpoint
  1. 使用依赖管理 - 适用于顺序工作流
  2. 使用阻塞规则 - 适用于互斥任务
  3. 设置超时时间 - 防止工作进程挂起
  4. 启用自动禁用 - 自动停止故障工作进程
  5. 监控状态 - 暴露指标端点

Common Mistakes

常见错误

  • No timeout on workers (hung forever)
  • Missing dependencies (race conditions)
  • Too many concurrent workers (resource exhaustion)
  • Not handling graceful shutdown
  • No visibility into worker state
  • 未给工作进程设置超时(永久挂起)
  • 遗漏依赖关系(竞态条件)
  • 并发工作进程过多(资源耗尽)
  • 未处理优雅关闭
  • 无法查看工作进程状态

Related Skills

相关技能

  • Background Jobs
  • Graceful Shutdown
  • Leader Election
  • 后台任务
  • 优雅关闭
  • 主节点选举