graceful-shutdown
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseGraceful Shutdown
优雅关闭
Clean shutdown without data loss.
无数据丢失的优雅关闭。
When to Use This Skill
适用场景
- Running background workers
- Processing queues or streams
- Buffering data before persistence
- Any long-running process that handles state
- 运行后台工作进程
- 处理队列或流数据
- 持久化前缓冲数据
- 任何处理状态的长期运行进程
Core Concepts
核心概念
- Signal handlers - Catch SIGTERM/SIGINT
- In-flight tracking - Know what's still running
- Buffer draining - Flush before exit
- Cleanup callbacks - Close connections properly
- 信号处理器 - 捕获SIGTERM/SIGINT信号
- 运行中任务跟踪 - 掌握当前仍在运行的任务
- 缓冲区排空 - 退出前刷新缓冲区
- 清理回调 - 正确关闭连接
Shutdown Flow
关闭流程
SIGTERM received
│
▼
Stop accepting new work
│
▼
Wait for in-flight jobs
│
▼
Drain buffers
│
▼
Run cleanup callbacks
│
▼
Process exits收到SIGTERM信号
│
▼
停止接收新任务
│
▼
等待运行中任务完成
│
▼
排空缓冲区
│
▼
执行清理回调
│
▼
进程退出TypeScript Implementation
TypeScript 实现
typescript
// graceful-shutdown.ts
type ShutdownCallback = () => Promise<void>;
type DrainCallback = () => Promise<{ flushed: number; dropped: number }>;
interface InFlightJob {
id: string;
workerName: string;
startedAt: Date;
timeoutMs: number;
}
class GracefulShutdown {
private isShuttingDown = false;
private shutdownPromise: Promise<void> | null = null;
private callbacks: ShutdownCallback[] = [];
private drainCallbacks: DrainCallback[] = [];
private inFlightJobs = new Map<string, InFlightJob>();
private shutdownTimeoutMs = 30000;
registerSignals(): void {
const handler = (signal: string) => {
console.log(`[Shutdown] Received ${signal}`);
this.shutdown(`Signal: ${signal}`);
};
process.on('SIGTERM', () => handler('SIGTERM'));
process.on('SIGINT', () => handler('SIGINT'));
}
onShutdown(callback: ShutdownCallback): void {
this.callbacks.push(callback);
}
onDrain(callback: DrainCallback): void {
this.drainCallbacks.push(callback);
}
trackJob(id: string, workerName: string, timeoutMs = 60000): void {
if (this.isShuttingDown) return;
this.inFlightJobs.set(id, { id, workerName, startedAt: new Date(), timeoutMs });
}
completeJob(id: string): void {
this.inFlightJobs.delete(id);
}
isShutdownInProgress(): boolean {
return this.isShuttingDown;
}
async shutdown(reason: string): Promise<void> {
if (this.shutdownPromise) return this.shutdownPromise;
this.isShuttingDown = true;
console.log(`[Shutdown] Starting: ${reason}`);
this.shutdownPromise = this.performShutdown();
return this.shutdownPromise;
}
private async performShutdown(): Promise<void> {
const startTime = Date.now();
// 1. Wait for in-flight jobs
console.log(`[Shutdown] Waiting for ${this.inFlightJobs.size} jobs...`);
while (this.inFlightJobs.size > 0) {
if (Date.now() - startTime > this.shutdownTimeoutMs) {
console.log(`[Shutdown] Timeout! ${this.inFlightJobs.size} jobs still running`);
break;
}
// Force-complete stuck jobs
const now = Date.now();
for (const [id, job] of this.inFlightJobs) {
if (now - job.startedAt.getTime() > job.timeoutMs) {
console.log(`[Shutdown] Force-completing stuck job: ${id}`);
this.inFlightJobs.delete(id);
}
}
await this.sleep(100);
}
// 2. Drain buffers
if (this.drainCallbacks.length > 0) {
console.log(`[Shutdown] Draining ${this.drainCallbacks.length} buffers...`);
let totalFlushed = 0, totalDropped = 0;
for (const drain of this.drainCallbacks) {
try {
const result = await Promise.race([
drain(),
this.sleep(10000).then(() => ({ flushed: 0, dropped: 0 })),
]);
totalFlushed += result.flushed;
totalDropped += result.dropped;
} catch (err) {
console.error('[Shutdown] Drain error:', err);
}
}
console.log(`[Shutdown] Drained: ${totalFlushed} flushed, ${totalDropped} dropped`);
}
// 3. Run cleanup callbacks
console.log(`[Shutdown] Running ${this.callbacks.length} cleanup callbacks...`);
for (const callback of this.callbacks) {
try {
await Promise.race([
callback(),
this.sleep(5000).then(() => { throw new Error('Callback timeout'); }),
]);
} catch (err) {
console.error('[Shutdown] Callback error:', err);
}
}
console.log(`[Shutdown] Complete in ${Date.now() - startTime}ms`);
process.exit(0);
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Singleton
let instance: GracefulShutdown | null = null;
export function getShutdownHandler(): GracefulShutdown {
if (!instance) instance = new GracefulShutdown();
return instance;
}typescript
// graceful-shutdown.ts
type ShutdownCallback = () => Promise<void>;
type DrainCallback = () => Promise<{ flushed: number; dropped: number }>;
interface InFlightJob {
id: string;
workerName: string;
startedAt: Date;
timeoutMs: number;
}
class GracefulShutdown {
private isShuttingDown = false;
private shutdownPromise: Promise<void> | null = null;
private callbacks: ShutdownCallback[] = [];
private drainCallbacks: DrainCallback[] = [];
private inFlightJobs = new Map<string, InFlightJob>();
private shutdownTimeoutMs = 30000;
registerSignals(): void {
const handler = (signal: string) => {
console.log(`[Shutdown] Received ${signal}`);
this.shutdown(`Signal: ${signal}`);
};
process.on('SIGTERM', () => handler('SIGTERM'));
process.on('SIGINT', () => handler('SIGINT'));
}
onShutdown(callback: ShutdownCallback): void {
this.callbacks.push(callback);
}
onDrain(callback: DrainCallback): void {
this.drainCallbacks.push(callback);
}
trackJob(id: string, workerName: string, timeoutMs = 60000): void {
if (this.isShuttingDown) return;
this.inFlightJobs.set(id, { id, workerName, startedAt: new Date(), timeoutMs });
}
completeJob(id: string): void {
this.inFlightJobs.delete(id);
}
isShutdownInProgress(): boolean {
return this.isShuttingDown;
}
async shutdown(reason: string): Promise<void> {
if (this.shutdownPromise) return this.shutdownPromise;
this.isShuttingDown = true;
console.log(`[Shutdown] Starting: ${reason}`);
this.shutdownPromise = this.performShutdown();
return this.shutdownPromise;
}
private async performShutdown(): Promise<void> {
const startTime = Date.now();
// 1. 等待运行中任务完成
console.log(`[Shutdown] Waiting for ${this.inFlightJobs.size} jobs...`);
while (this.inFlightJobs.size > 0) {
if (Date.now() - startTime > this.shutdownTimeoutMs) {
console.log(`[Shutdown] Timeout! ${this.inFlightJobs.size} jobs still running`);
break;
}
// 强制完成卡住的任务
const now = Date.now();
for (const [id, job] of this.inFlightJobs) {
if (now - job.startedAt.getTime() > job.timeoutMs) {
console.log(`[Shutdown] Force-completing stuck job: ${id}`);
this.inFlightJobs.delete(id);
}
}
await this.sleep(100);
}
// 2. 排空缓冲区
if (this.drainCallbacks.length > 0) {
console.log(`[Shutdown] Draining ${this.drainCallbacks.length} buffers...`);
let totalFlushed = 0, totalDropped = 0;
for (const drain of this.drainCallbacks) {
try {
const result = await Promise.race([
drain(),
this.sleep(10000).then(() => ({ flushed: 0, dropped: 0 })),
]);
totalFlushed += result.flushed;
totalDropped += result.dropped;
} catch (err) {
console.error('[Shutdown] Drain error:', err);
}
}
console.log(`[Shutdown] Drained: ${totalFlushed} flushed, ${totalDropped} dropped`);
}
// 3. 执行清理回调
console.log(`[Shutdown] Running ${this.callbacks.length} cleanup callbacks...`);
for (const callback of this.callbacks) {
try {
await Promise.race([
callback(),
this.sleep(5000).then(() => { throw new Error('Callback timeout'); }),
]);
} catch (err) {
console.error('[Shutdown] Callback error:', err);
}
}
console.log(`[Shutdown] Complete in ${Date.now() - startTime}ms`);
process.exit(0);
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 单例模式
let instance: GracefulShutdown | null = null;
export function getShutdownHandler(): GracefulShutdown {
if (!instance) instance = new GracefulShutdown();
return instance;
}Python Implementation
Python 实现
python
undefinedpython
undefinedgraceful_shutdown.py
graceful_shutdown.py
import asyncio
import signal
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Awaitable, Dict, List
@dataclass
class InFlightJob:
id: str
worker_name: str
started_at: datetime
timeout_seconds: float
ShutdownCallback = Callable[[], Awaitable[None]]
DrainCallback = Callable[[], Awaitable[Dict[str, int]]]
class GracefulShutdown:
def init(self, timeout_seconds: float = 30.0):
self._is_shutting_down = False
self._shutdown_task: asyncio.Task | None = None
self._callbacks: List[ShutdownCallback] = []
self._drain_callbacks: List[DrainCallback] = []
self._in_flight: Dict[str, InFlightJob] = {}
self._timeout = timeout_seconds
def register_signals(self):
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda s=sig: asyncio.create_task(self.shutdown(f"Signal: {s.name}"))
)
def on_shutdown(self, callback: ShutdownCallback):
self._callbacks.append(callback)
def on_drain(self, callback: DrainCallback):
self._drain_callbacks.append(callback)
def track_job(self, job_id: str, worker_name: str, timeout_seconds: float = 60.0):
if self._is_shutting_down:
return
self._in_flight[job_id] = InFlightJob(
id=job_id,
worker_name=worker_name,
started_at=datetime.now(),
timeout_seconds=timeout_seconds,
)
def complete_job(self, job_id: str):
self._in_flight.pop(job_id, None)
@property
def is_shutting_down(self) -> bool:
return self._is_shutting_down
async def shutdown(self, reason: str):
if self._shutdown_task:
return await self._shutdown_task
self._is_shutting_down = True
print(f"[Shutdown] Starting: {reason}")
self._shutdown_task = asyncio.create_task(self._perform_shutdown())
return await self._shutdown_task
async def _perform_shutdown(self):
start_time = datetime.now()
# Wait for in-flight jobs
print(f"[Shutdown] Waiting for {len(self._in_flight)} jobs...")
while self._in_flight:
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > self._timeout:
print(f"[Shutdown] Timeout! {len(self._in_flight)} jobs still running")
break
# Force-complete stuck jobs
now = datetime.now()
stuck = [
job_id for job_id, job in self._in_flight.items()
if (now - job.started_at).total_seconds() > job.timeout_seconds
]
for job_id in stuck:
print(f"[Shutdown] Force-completing stuck job: {job_id}")
self._in_flight.pop(job_id)
await asyncio.sleep(0.1)
# Drain buffers
for drain in self._drain_callbacks:
try:
result = await asyncio.wait_for(drain(), timeout=10.0)
print(f"[Shutdown] Drained: {result}")
except Exception as e:
print(f"[Shutdown] Drain error: {e}")
# Run cleanup callbacks
for callback in self._callbacks:
try:
await asyncio.wait_for(callback(), timeout=5.0)
except Exception as e:
print(f"[Shutdown] Callback error: {e}")
elapsed = (datetime.now() - start_time).total_seconds()
print(f"[Shutdown] Complete in {elapsed:.1f}s")import asyncio
import signal
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Awaitable, Dict, List
@dataclass
class InFlightJob:
id: str
worker_name: str
started_at: datetime
timeout_seconds: float
ShutdownCallback = Callable[[], Awaitable[None]]
DrainCallback = Callable[[], Awaitable[Dict[str, int]]]
class GracefulShutdown:
def init(self, timeout_seconds: float = 30.0):
self._is_shutting_down = False
self._shutdown_task: asyncio.Task | None = None
self._callbacks: List[ShutdownCallback] = []
self._drain_callbacks: List[DrainCallback] = []
self._in_flight: Dict[str, InFlightJob] = {}
self._timeout = timeout_seconds
def register_signals(self):
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda s=sig: asyncio.create_task(self.shutdown(f"Signal: {s.name}"))
)
def on_shutdown(self, callback: ShutdownCallback):
self._callbacks.append(callback)
def on_drain(self, callback: DrainCallback):
self._drain_callbacks.append(callback)
def track_job(self, job_id: str, worker_name: str, timeout_seconds: float = 60.0):
if self._is_shutting_down:
return
self._in_flight[job_id] = InFlightJob(
id=job_id,
worker_name=worker_name,
started_at=datetime.now(),
timeout_seconds=timeout_seconds,
)
def complete_job(self, job_id: str):
self._in_flight.pop(job_id, None)
@property
def is_shutting_down(self) -> bool:
return self._is_shutting_down
async def shutdown(self, reason: str):
if self._shutdown_task:
return await self._shutdown_task
self._is_shutting_down = True
print(f"[Shutdown] Starting: {reason}")
self._shutdown_task = asyncio.create_task(self._perform_shutdown())
return await self._shutdown_task
async def _perform_shutdown(self):
start_time = datetime.now()
# 等待运行中任务完成
print(f"[Shutdown] Waiting for {len(self._in_flight)} jobs...")
while self._in_flight:
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > self._timeout:
print(f"[Shutdown] Timeout! {len(self._in_flight)} jobs still running")
break
# 强制完成卡住的任务
now = datetime.now()
stuck = [
job_id for job_id, job in self._in_flight.items()
if (now - job.started_at).total_seconds() > job.timeout_seconds
]
for job_id in stuck:
print(f"[Shutdown] Force-completing stuck job: {job_id}")
self._in_flight.pop(job_id)
await asyncio.sleep(0.1)
# 排空缓冲区
for drain in self._drain_callbacks:
try:
result = await asyncio.wait_for(drain(), timeout=10.0)
print(f"[Shutdown] Drained: {result}")
except Exception as e:
print(f"[Shutdown] Drain error: {e}")
# 执行清理回调
for callback in self._callbacks:
try:
await asyncio.wait_for(callback(), timeout=5.0)
except Exception as e:
print(f"[Shutdown] Callback error: {e}")
elapsed = (datetime.now() - start_time).total_seconds()
print(f"[Shutdown] Complete in {elapsed:.1f}s")Singleton
单例模式
_instance: GracefulShutdown | None = None
def get_shutdown_handler() -> GracefulShutdown:
global _instance
if _instance is None:
_instance = GracefulShutdown()
return _instance
undefined_instance: GracefulShutdown | None = None
def get_shutdown_handler() -> GracefulShutdown:
global _instance
if _instance is None:
_instance = GracefulShutdown()
return _instance
undefinedUsage Examples
使用示例
Basic Setup
基础配置
typescript
const shutdown = getShutdownHandler();
shutdown.registerSignals();
// Register cleanup
shutdown.onShutdown(async () => {
await database.close();
await redis.quit();
});
// Register buffer drain
shutdown.onDrain(async () => {
return backpressureBuffer.flush();
});typescript
const shutdown = getShutdownHandler();
shutdown.registerSignals();
// 注册清理操作
shutdown.onShutdown(async () => {
await database.close();
await redis.quit();
});
// 注册缓冲区排空操作
shutdown.onDrain(async () => {
return backpressureBuffer.flush();
});Job Tracking
任务跟踪
typescript
async function processJob(jobId: string) {
const shutdown = getShutdownHandler();
// Don't start new work during shutdown
if (shutdown.isShutdownInProgress()) {
return;
}
shutdown.trackJob(jobId, 'my-worker', 30000);
try {
await doWork(jobId);
} finally {
shutdown.completeJob(jobId);
}
}typescript
async function processJob(jobId: string) {
const shutdown = getShutdownHandler();
// 关闭过程中不启动新任务
if (shutdown.isShutdownInProgress()) {
return;
}
shutdown.trackJob(jobId, 'my-worker', 30000);
try {
await doWork(jobId);
} finally {
shutdown.completeJob(jobId);
}
}With Express/Fastify
结合Express/Fastify使用
typescript
const shutdown = getShutdownHandler();
shutdown.registerSignals();
// Stop accepting new requests
shutdown.onShutdown(async () => {
await new Promise<void>((resolve) => {
server.close(() => resolve());
});
});
// Close database connections
shutdown.onShutdown(async () => {
await prisma.$disconnect();
});typescript
const shutdown = getShutdownHandler();
shutdown.registerSignals();
// 停止接收新请求
shutdown.onShutdown(async () => {
await new Promise<void>((resolve) => {
server.close(() => resolve());
});
});
// 关闭数据库连接
shutdown.onShutdown(async () => {
await prisma.$disconnect();
});Best Practices
最佳实践
- Register signals early - First thing in app startup
- Track all in-flight work - With appropriate timeouts
- Drain before cleanup - Buffers first, connections last
- Set reasonable timeouts - Don't hang forever
- Check before new work - Don't start during shutdown
- 尽早注册信号处理器 - 在应用启动时首先完成
- 跟踪所有运行中任务 - 设置合理的超时时间
- 先排空再清理 - 先处理缓冲区,再关闭连接
- 设置合理超时时间 - 避免无限挂起
- 启动新任务前检查状态 - 关闭过程中不启动新任务
Common Mistakes
常见错误
- Not registering signal handlers
- Starting new work during shutdown
- No timeout on cleanup callbacks
- Forgetting to track in-flight jobs
- Closing connections before draining buffers
- 未注册信号处理器
- 关闭过程中启动新任务
- 清理回调未设置超时
- 忘记跟踪运行中任务
- 未排空缓冲区就关闭连接
Related Skills
相关技能
- Background Jobs
- Backpressure
- Health Checks
- 后台任务
- 背压处理
- 健康检查