graceful-shutdown

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Graceful Shutdown

优雅关闭

Clean shutdown without data loss.
无数据丢失的优雅关闭。

When to Use This Skill

适用场景

  • Running background workers
  • Processing queues or streams
  • Buffering data before persistence
  • Any long-running process that handles state
  • 运行后台工作进程
  • 处理队列或流数据
  • 持久化前缓冲数据
  • 任何处理状态的长期运行进程

Core Concepts

核心概念

  1. Signal handlers - Catch SIGTERM/SIGINT
  2. In-flight tracking - Know what's still running
  3. Buffer draining - Flush before exit
  4. Cleanup callbacks - Close connections properly
  1. 信号处理器 - 捕获SIGTERM/SIGINT信号
  2. 运行中任务跟踪 - 掌握当前仍在运行的任务
  3. 缓冲区排空 - 退出前刷新缓冲区
  4. 清理回调 - 正确关闭连接

Shutdown Flow

关闭流程

SIGTERM received
Stop accepting new work
Wait for in-flight jobs
Drain buffers
Run cleanup callbacks
Process exits
收到SIGTERM信号
停止接收新任务
等待运行中任务完成
排空缓冲区
执行清理回调
进程退出

TypeScript Implementation

TypeScript 实现

typescript
// graceful-shutdown.ts
type ShutdownCallback = () => Promise<void>;
type DrainCallback = () => Promise<{ flushed: number; dropped: number }>;

interface InFlightJob {
  id: string;
  workerName: string;
  startedAt: Date;
  timeoutMs: number;
}

class GracefulShutdown {
  private isShuttingDown = false;
  private shutdownPromise: Promise<void> | null = null;
  private callbacks: ShutdownCallback[] = [];
  private drainCallbacks: DrainCallback[] = [];
  private inFlightJobs = new Map<string, InFlightJob>();
  private shutdownTimeoutMs = 30000;

  registerSignals(): void {
    const handler = (signal: string) => {
      console.log(`[Shutdown] Received ${signal}`);
      this.shutdown(`Signal: ${signal}`);
    };

    process.on('SIGTERM', () => handler('SIGTERM'));
    process.on('SIGINT', () => handler('SIGINT'));
  }

  onShutdown(callback: ShutdownCallback): void {
    this.callbacks.push(callback);
  }

  onDrain(callback: DrainCallback): void {
    this.drainCallbacks.push(callback);
  }

  trackJob(id: string, workerName: string, timeoutMs = 60000): void {
    if (this.isShuttingDown) return;
    this.inFlightJobs.set(id, { id, workerName, startedAt: new Date(), timeoutMs });
  }

  completeJob(id: string): void {
    this.inFlightJobs.delete(id);
  }

  isShutdownInProgress(): boolean {
    return this.isShuttingDown;
  }

  async shutdown(reason: string): Promise<void> {
    if (this.shutdownPromise) return this.shutdownPromise;

    this.isShuttingDown = true;
    console.log(`[Shutdown] Starting: ${reason}`);

    this.shutdownPromise = this.performShutdown();
    return this.shutdownPromise;
  }

  private async performShutdown(): Promise<void> {
    const startTime = Date.now();

    // 1. Wait for in-flight jobs
    console.log(`[Shutdown] Waiting for ${this.inFlightJobs.size} jobs...`);
    
    while (this.inFlightJobs.size > 0) {
      if (Date.now() - startTime > this.shutdownTimeoutMs) {
        console.log(`[Shutdown] Timeout! ${this.inFlightJobs.size} jobs still running`);
        break;
      }

      // Force-complete stuck jobs
      const now = Date.now();
      for (const [id, job] of this.inFlightJobs) {
        if (now - job.startedAt.getTime() > job.timeoutMs) {
          console.log(`[Shutdown] Force-completing stuck job: ${id}`);
          this.inFlightJobs.delete(id);
        }
      }

      await this.sleep(100);
    }

    // 2. Drain buffers
    if (this.drainCallbacks.length > 0) {
      console.log(`[Shutdown] Draining ${this.drainCallbacks.length} buffers...`);
      
      let totalFlushed = 0, totalDropped = 0;
      
      for (const drain of this.drainCallbacks) {
        try {
          const result = await Promise.race([
            drain(),
            this.sleep(10000).then(() => ({ flushed: 0, dropped: 0 })),
          ]);
          totalFlushed += result.flushed;
          totalDropped += result.dropped;
        } catch (err) {
          console.error('[Shutdown] Drain error:', err);
        }
      }
      
      console.log(`[Shutdown] Drained: ${totalFlushed} flushed, ${totalDropped} dropped`);
    }

    // 3. Run cleanup callbacks
    console.log(`[Shutdown] Running ${this.callbacks.length} cleanup callbacks...`);
    
    for (const callback of this.callbacks) {
      try {
        await Promise.race([
          callback(),
          this.sleep(5000).then(() => { throw new Error('Callback timeout'); }),
        ]);
      } catch (err) {
        console.error('[Shutdown] Callback error:', err);
      }
    }

    console.log(`[Shutdown] Complete in ${Date.now() - startTime}ms`);
    process.exit(0);
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Singleton
let instance: GracefulShutdown | null = null;

export function getShutdownHandler(): GracefulShutdown {
  if (!instance) instance = new GracefulShutdown();
  return instance;
}
typescript
// graceful-shutdown.ts
type ShutdownCallback = () => Promise<void>;
type DrainCallback = () => Promise<{ flushed: number; dropped: number }>;

interface InFlightJob {
  id: string;
  workerName: string;
  startedAt: Date;
  timeoutMs: number;
}

class GracefulShutdown {
  private isShuttingDown = false;
  private shutdownPromise: Promise<void> | null = null;
  private callbacks: ShutdownCallback[] = [];
  private drainCallbacks: DrainCallback[] = [];
  private inFlightJobs = new Map<string, InFlightJob>();
  private shutdownTimeoutMs = 30000;

  registerSignals(): void {
    const handler = (signal: string) => {
      console.log(`[Shutdown] Received ${signal}`);
      this.shutdown(`Signal: ${signal}`);
    };

    process.on('SIGTERM', () => handler('SIGTERM'));
    process.on('SIGINT', () => handler('SIGINT'));
  }

  onShutdown(callback: ShutdownCallback): void {
    this.callbacks.push(callback);
  }

  onDrain(callback: DrainCallback): void {
    this.drainCallbacks.push(callback);
  }

  trackJob(id: string, workerName: string, timeoutMs = 60000): void {
    if (this.isShuttingDown) return;
    this.inFlightJobs.set(id, { id, workerName, startedAt: new Date(), timeoutMs });
  }

  completeJob(id: string): void {
    this.inFlightJobs.delete(id);
  }

  isShutdownInProgress(): boolean {
    return this.isShuttingDown;
  }

  async shutdown(reason: string): Promise<void> {
    if (this.shutdownPromise) return this.shutdownPromise;

    this.isShuttingDown = true;
    console.log(`[Shutdown] Starting: ${reason}`);

    this.shutdownPromise = this.performShutdown();
    return this.shutdownPromise;
  }

  private async performShutdown(): Promise<void> {
    const startTime = Date.now();

    // 1. 等待运行中任务完成
    console.log(`[Shutdown] Waiting for ${this.inFlightJobs.size} jobs...`);
    
    while (this.inFlightJobs.size > 0) {
      if (Date.now() - startTime > this.shutdownTimeoutMs) {
        console.log(`[Shutdown] Timeout! ${this.inFlightJobs.size} jobs still running`);
        break;
      }

      // 强制完成卡住的任务
      const now = Date.now();
      for (const [id, job] of this.inFlightJobs) {
        if (now - job.startedAt.getTime() > job.timeoutMs) {
          console.log(`[Shutdown] Force-completing stuck job: ${id}`);
          this.inFlightJobs.delete(id);
        }
      }

      await this.sleep(100);
    }

    // 2. 排空缓冲区
    if (this.drainCallbacks.length > 0) {
      console.log(`[Shutdown] Draining ${this.drainCallbacks.length} buffers...`);
      
      let totalFlushed = 0, totalDropped = 0;
      
      for (const drain of this.drainCallbacks) {
        try {
          const result = await Promise.race([
            drain(),
            this.sleep(10000).then(() => ({ flushed: 0, dropped: 0 })),
          ]);
          totalFlushed += result.flushed;
          totalDropped += result.dropped;
        } catch (err) {
          console.error('[Shutdown] Drain error:', err);
        }
      }
      
      console.log(`[Shutdown] Drained: ${totalFlushed} flushed, ${totalDropped} dropped`);
    }

    // 3. 执行清理回调
    console.log(`[Shutdown] Running ${this.callbacks.length} cleanup callbacks...`);
    
    for (const callback of this.callbacks) {
      try {
        await Promise.race([
          callback(),
          this.sleep(5000).then(() => { throw new Error('Callback timeout'); }),
        ]);
      } catch (err) {
        console.error('[Shutdown] Callback error:', err);
      }
    }

    console.log(`[Shutdown] Complete in ${Date.now() - startTime}ms`);
    process.exit(0);
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 单例模式
let instance: GracefulShutdown | null = null;

export function getShutdownHandler(): GracefulShutdown {
  if (!instance) instance = new GracefulShutdown();
  return instance;
}

Python Implementation

Python 实现

python
undefined
python
undefined

graceful_shutdown.py

graceful_shutdown.py

import asyncio import signal from dataclasses import dataclass, field from datetime import datetime from typing import Callable, Awaitable, Dict, List
@dataclass class InFlightJob: id: str worker_name: str started_at: datetime timeout_seconds: float
ShutdownCallback = Callable[[], Awaitable[None]] DrainCallback = Callable[[], Awaitable[Dict[str, int]]]
class GracefulShutdown: def init(self, timeout_seconds: float = 30.0): self._is_shutting_down = False self._shutdown_task: asyncio.Task | None = None self._callbacks: List[ShutdownCallback] = [] self._drain_callbacks: List[DrainCallback] = [] self._in_flight: Dict[str, InFlightJob] = {} self._timeout = timeout_seconds
def register_signals(self):
    loop = asyncio.get_event_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(self.shutdown(f"Signal: {s.name}"))
        )

def on_shutdown(self, callback: ShutdownCallback):
    self._callbacks.append(callback)

def on_drain(self, callback: DrainCallback):
    self._drain_callbacks.append(callback)

def track_job(self, job_id: str, worker_name: str, timeout_seconds: float = 60.0):
    if self._is_shutting_down:
        return
    self._in_flight[job_id] = InFlightJob(
        id=job_id,
        worker_name=worker_name,
        started_at=datetime.now(),
        timeout_seconds=timeout_seconds,
    )

def complete_job(self, job_id: str):
    self._in_flight.pop(job_id, None)

@property
def is_shutting_down(self) -> bool:
    return self._is_shutting_down

async def shutdown(self, reason: str):
    if self._shutdown_task:
        return await self._shutdown_task

    self._is_shutting_down = True
    print(f"[Shutdown] Starting: {reason}")
    
    self._shutdown_task = asyncio.create_task(self._perform_shutdown())
    return await self._shutdown_task

async def _perform_shutdown(self):
    start_time = datetime.now()

    # Wait for in-flight jobs
    print(f"[Shutdown] Waiting for {len(self._in_flight)} jobs...")
    
    while self._in_flight:
        elapsed = (datetime.now() - start_time).total_seconds()
        if elapsed > self._timeout:
            print(f"[Shutdown] Timeout! {len(self._in_flight)} jobs still running")
            break

        # Force-complete stuck jobs
        now = datetime.now()
        stuck = [
            job_id for job_id, job in self._in_flight.items()
            if (now - job.started_at).total_seconds() > job.timeout_seconds
        ]
        for job_id in stuck:
            print(f"[Shutdown] Force-completing stuck job: {job_id}")
            self._in_flight.pop(job_id)

        await asyncio.sleep(0.1)

    # Drain buffers
    for drain in self._drain_callbacks:
        try:
            result = await asyncio.wait_for(drain(), timeout=10.0)
            print(f"[Shutdown] Drained: {result}")
        except Exception as e:
            print(f"[Shutdown] Drain error: {e}")

    # Run cleanup callbacks
    for callback in self._callbacks:
        try:
            await asyncio.wait_for(callback(), timeout=5.0)
        except Exception as e:
            print(f"[Shutdown] Callback error: {e}")

    elapsed = (datetime.now() - start_time).total_seconds()
    print(f"[Shutdown] Complete in {elapsed:.1f}s")
import asyncio import signal from dataclasses import dataclass, field from datetime import datetime from typing import Callable, Awaitable, Dict, List
@dataclass class InFlightJob: id: str worker_name: str started_at: datetime timeout_seconds: float
ShutdownCallback = Callable[[], Awaitable[None]] DrainCallback = Callable[[], Awaitable[Dict[str, int]]]
class GracefulShutdown: def init(self, timeout_seconds: float = 30.0): self._is_shutting_down = False self._shutdown_task: asyncio.Task | None = None self._callbacks: List[ShutdownCallback] = [] self._drain_callbacks: List[DrainCallback] = [] self._in_flight: Dict[str, InFlightJob] = {} self._timeout = timeout_seconds
def register_signals(self):
    loop = asyncio.get_event_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(self.shutdown(f"Signal: {s.name}"))
        )

def on_shutdown(self, callback: ShutdownCallback):
    self._callbacks.append(callback)

def on_drain(self, callback: DrainCallback):
    self._drain_callbacks.append(callback)

def track_job(self, job_id: str, worker_name: str, timeout_seconds: float = 60.0):
    if self._is_shutting_down:
        return
    self._in_flight[job_id] = InFlightJob(
        id=job_id,
        worker_name=worker_name,
        started_at=datetime.now(),
        timeout_seconds=timeout_seconds,
    )

def complete_job(self, job_id: str):
    self._in_flight.pop(job_id, None)

@property
def is_shutting_down(self) -> bool:
    return self._is_shutting_down

async def shutdown(self, reason: str):
    if self._shutdown_task:
        return await self._shutdown_task

    self._is_shutting_down = True
    print(f"[Shutdown] Starting: {reason}")
    
    self._shutdown_task = asyncio.create_task(self._perform_shutdown())
    return await self._shutdown_task

async def _perform_shutdown(self):
    start_time = datetime.now()

    # 等待运行中任务完成
    print(f"[Shutdown] Waiting for {len(self._in_flight)} jobs...")
    
    while self._in_flight:
        elapsed = (datetime.now() - start_time).total_seconds()
        if elapsed > self._timeout:
            print(f"[Shutdown] Timeout! {len(self._in_flight)} jobs still running")
            break

        # 强制完成卡住的任务
        now = datetime.now()
        stuck = [
            job_id for job_id, job in self._in_flight.items()
            if (now - job.started_at).total_seconds() > job.timeout_seconds
        ]
        for job_id in stuck:
            print(f"[Shutdown] Force-completing stuck job: {job_id}")
            self._in_flight.pop(job_id)

        await asyncio.sleep(0.1)

    # 排空缓冲区
    for drain in self._drain_callbacks:
        try:
            result = await asyncio.wait_for(drain(), timeout=10.0)
            print(f"[Shutdown] Drained: {result}")
        except Exception as e:
            print(f"[Shutdown] Drain error: {e}")

    # 执行清理回调
    for callback in self._callbacks:
        try:
            await asyncio.wait_for(callback(), timeout=5.0)
        except Exception as e:
            print(f"[Shutdown] Callback error: {e}")

    elapsed = (datetime.now() - start_time).total_seconds()
    print(f"[Shutdown] Complete in {elapsed:.1f}s")

Singleton

单例模式

_instance: GracefulShutdown | None = None
def get_shutdown_handler() -> GracefulShutdown: global _instance if _instance is None: _instance = GracefulShutdown() return _instance
undefined
_instance: GracefulShutdown | None = None
def get_shutdown_handler() -> GracefulShutdown: global _instance if _instance is None: _instance = GracefulShutdown() return _instance
undefined

Usage Examples

使用示例

Basic Setup

基础配置

typescript
const shutdown = getShutdownHandler();
shutdown.registerSignals();

// Register cleanup
shutdown.onShutdown(async () => {
  await database.close();
  await redis.quit();
});

// Register buffer drain
shutdown.onDrain(async () => {
  return backpressureBuffer.flush();
});
typescript
const shutdown = getShutdownHandler();
shutdown.registerSignals();

// 注册清理操作
shutdown.onShutdown(async () => {
  await database.close();
  await redis.quit();
});

// 注册缓冲区排空操作
shutdown.onDrain(async () => {
  return backpressureBuffer.flush();
});

Job Tracking

任务跟踪

typescript
async function processJob(jobId: string) {
  const shutdown = getShutdownHandler();
  
  // Don't start new work during shutdown
  if (shutdown.isShutdownInProgress()) {
    return;
  }
  
  shutdown.trackJob(jobId, 'my-worker', 30000);
  
  try {
    await doWork(jobId);
  } finally {
    shutdown.completeJob(jobId);
  }
}
typescript
async function processJob(jobId: string) {
  const shutdown = getShutdownHandler();
  
  // 关闭过程中不启动新任务
  if (shutdown.isShutdownInProgress()) {
    return;
  }
  
  shutdown.trackJob(jobId, 'my-worker', 30000);
  
  try {
    await doWork(jobId);
  } finally {
    shutdown.completeJob(jobId);
  }
}

With Express/Fastify

结合Express/Fastify使用

typescript
const shutdown = getShutdownHandler();
shutdown.registerSignals();

// Stop accepting new requests
shutdown.onShutdown(async () => {
  await new Promise<void>((resolve) => {
    server.close(() => resolve());
  });
});

// Close database connections
shutdown.onShutdown(async () => {
  await prisma.$disconnect();
});
typescript
const shutdown = getShutdownHandler();
shutdown.registerSignals();

// 停止接收新请求
shutdown.onShutdown(async () => {
  await new Promise<void>((resolve) => {
    server.close(() => resolve());
  });
});

// 关闭数据库连接
shutdown.onShutdown(async () => {
  await prisma.$disconnect();
});

Best Practices

最佳实践

  1. Register signals early - First thing in app startup
  2. Track all in-flight work - With appropriate timeouts
  3. Drain before cleanup - Buffers first, connections last
  4. Set reasonable timeouts - Don't hang forever
  5. Check before new work - Don't start during shutdown
  1. 尽早注册信号处理器 - 在应用启动时首先完成
  2. 跟踪所有运行中任务 - 设置合理的超时时间
  3. 先排空再清理 - 先处理缓冲区,再关闭连接
  4. 设置合理超时时间 - 避免无限挂起
  5. 启动新任务前检查状态 - 关闭过程中不启动新任务

Common Mistakes

常见错误

  • Not registering signal handlers
  • Starting new work during shutdown
  • No timeout on cleanup callbacks
  • Forgetting to track in-flight jobs
  • Closing connections before draining buffers
  • 未注册信号处理器
  • 关闭过程中启动新任务
  • 清理回调未设置超时
  • 忘记跟踪运行中任务
  • 未排空缓冲区就关闭连接

Related Skills

相关技能

  • Background Jobs
  • Backpressure
  • Health Checks
  • 后台任务
  • 背压处理
  • 健康检查