Backpressure Management
Prevent OOM crashes when producers outpace consumers.
When to Use This Skill
- Database writes slower than event ingestion
- Memory filling up with queued items
- Need to handle traffic spikes gracefully
- Want to drop low-priority data under load
Core Concepts
- Bounded buffer - Fixed-size queue prevents unbounded growth
- Watermarks - Thresholds trigger state changes
- Strategies - Block, drop oldest, drop newest, or sample
- Adaptive flushing - Adjust rate based on downstream health
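The watermark concept can be sketched as a small pure function (a minimal illustration, separate from the full implementation below; the 0.5/0.8 defaults mirror the state machine in the next section):

```typescript
// Minimal sketch: map buffer utilization (0-1) to a backpressure state
// using low/high watermark thresholds.
type State = 'normal' | 'elevated' | 'critical' | 'blocked';

function stateFor(utilization: number, low = 0.5, high = 0.8): State {
  if (utilization >= 1.0) return 'blocked';   // buffer completely full
  if (utilization >= high) return 'critical'; // above high watermark
  if (utilization >= low) return 'elevated';  // above low watermark
  return 'normal';
}
```

Crossing a watermark only changes the state; what happens next (block, drop, or sample) is decided by the configured strategy.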
State Machine
NORMAL (< 50%) → ELEVATED (50-80%) → CRITICAL (80-100%) → BLOCKED (100%)
   ↑                                                          │
   └──────────────────────────────────────────────────────────┘
                        (buffer drains)

TypeScript Implementation
Types
```typescript
// types.ts
export enum BackpressureState {
  NORMAL = 'normal',
  ELEVATED = 'elevated',
  CRITICAL = 'critical',
  BLOCKED = 'blocked',
  DRAINING = 'draining',
}

export enum BackpressureStrategy {
  BLOCK = 'block',
  DROP_OLDEST = 'drop_oldest',
  DROP_NEWEST = 'drop_newest',
  SAMPLE = 'sample',
}

export interface BackpressureConfig {
  maxBufferSize: number;
  highWatermark: number; // 0-1
  lowWatermark: number;  // 0-1
  strategy: BackpressureStrategy;
  sampleRate?: number;
  maxBlockTimeMs?: number;
  batchSize: number;
  minFlushIntervalMs: number;
  maxFlushIntervalMs: number;
  targetLatencyMs: number;
}

export interface FlushResult {
  success: number;
  failed: number;
  errors: Error[];
}

export type FlushFunction<T> = (items: T[]) => Promise<FlushResult>;
```

Bounded Buffer
```typescript
// buffer.ts
export class BoundedBuffer<T> {
  private items: T[] = [];

  constructor(private readonly maxSize: number) {}

  get size(): number { return this.items.length; }
  get capacity(): number { return this.maxSize; }
  get utilization(): number { return this.items.length / this.maxSize; }

  isFull(): boolean { return this.items.length >= this.maxSize; }
  isEmpty(): boolean { return this.items.length === 0; }

  // Rejects the item when full (DROP_NEWEST / BLOCK semantics).
  push(item: T): boolean {
    if (this.isFull()) return false;
    this.items.push(item);
    return true;
  }

  // Evicts and returns the oldest item when full (DROP_OLDEST semantics).
  pushWithEviction(item: T): T | null {
    const evicted = this.isFull() ? this.items.shift() ?? null : null;
    this.items.push(item);
    return evicted;
  }

  take(count: number): T[] {
    return this.items.splice(0, Math.min(count, this.items.length));
  }

  clear(): T[] {
    const all = this.items;
    this.items = [];
    return all;
  }
}
```

Backpressure Controller
```typescript
// controller.ts
import { BoundedBuffer } from './buffer';
import {
  BackpressureState,
  BackpressureStrategy,
  BackpressureConfig,
  FlushFunction,
} from './types';

const DEFAULT_CONFIG: BackpressureConfig = {
  maxBufferSize: 10000,
  highWatermark: 0.8,
  lowWatermark: 0.5,
  strategy: BackpressureStrategy.DROP_OLDEST,
  batchSize: 100,
  minFlushIntervalMs: 100,
  maxFlushIntervalMs: 30000,
  targetLatencyMs: 500,
};

export class BackpressureController<T> {
  private buffer: BoundedBuffer<T>;
  private state: BackpressureState = BackpressureState.NORMAL;
  private config: BackpressureConfig;
  private flushFn: FlushFunction<T>;
  private flushInterval: NodeJS.Timeout | null = null;
  private currentFlushIntervalMs: number;
  private running = false;

  // Metrics
  private eventsAccepted = 0;
  private eventsDropped = 0;
  private eventsFlushed = 0;
  private lastFlushLatencyMs = 0;

  constructor(flushFn: FlushFunction<T>, config: Partial<BackpressureConfig> = {}) {
    this.config = { ...DEFAULT_CONFIG, ...config };
    this.buffer = new BoundedBuffer(this.config.maxBufferSize);
    this.flushFn = flushFn;
    this.currentFlushIntervalMs = this.config.minFlushIntervalMs;
  }

  start(): void {
    if (this.running) return;
    this.running = true;
    this.scheduleFlush();
  }

  stop(): void {
    this.running = false;
    if (this.flushInterval) {
      clearTimeout(this.flushInterval);
      this.flushInterval = null;
    }
  }

  async push(item: T): Promise<boolean> {
    switch (this.config.strategy) {
      case BackpressureStrategy.BLOCK:
        if (this.state === BackpressureState.BLOCKED) {
          const waited = await this.waitForSpace();
          if (!waited) {
            this.eventsDropped++;
            return false;
          }
        }
        break;
      case BackpressureStrategy.DROP_NEWEST:
        if (this.buffer.isFull()) {
          this.eventsDropped++;
          return false;
        }
        break;
      case BackpressureStrategy.DROP_OLDEST:
        if (this.buffer.isFull()) {
          this.buffer.pushWithEviction(item);
          this.eventsDropped++;  // the evicted (oldest) item
          this.eventsAccepted++; // the new item
          this.updateState();
          return true;
        }
        break;
      case BackpressureStrategy.SAMPLE:
        // Under pressure, keep roughly 1 in every sampleRate events.
        if (this.state !== BackpressureState.NORMAL) {
          const sampleRate = this.config.sampleRate || 10;
          if (this.eventsAccepted % sampleRate !== 0) {
            this.eventsDropped++;
            return false;
          }
        }
        break;
    }
    const accepted = this.buffer.push(item);
    if (accepted) {
      this.eventsAccepted++;
    } else {
      this.eventsDropped++;
    }
    this.updateState();
    return accepted;
  }

  async drain(): Promise<void> {
    this.state = BackpressureState.DRAINING;
    while (!this.buffer.isEmpty()) {
      await this.flush();
    }
    // Leave DRAINING once empty so updateState() can run again.
    this.state = BackpressureState.NORMAL;
  }

  getMetrics() {
    return {
      state: this.state,
      bufferSize: this.buffer.size,
      bufferUtilization: this.buffer.utilization,
      eventsAccepted: this.eventsAccepted,
      eventsDropped: this.eventsDropped,
      eventsFlushed: this.eventsFlushed,
      lastFlushLatencyMs: this.lastFlushLatencyMs,
    };
  }

  private async flush(): Promise<void> {
    if (this.buffer.isEmpty()) return;
    const batch = this.buffer.take(this.config.batchSize);
    const startTime = Date.now();
    try {
      const result = await this.flushFn(batch);
      this.eventsFlushed += result.success;
      this.eventsDropped += result.failed; // failed items are not retried
      this.lastFlushLatencyMs = Date.now() - startTime;
      this.adaptFlushInterval();
    } catch (error) {
      // The batch was already taken from the buffer, so on error it is
      // lost; count it as dropped so the loss is visible in metrics.
      this.eventsDropped += batch.length;
      console.error('[Backpressure] Flush error:', error);
    }
    this.updateState();
  }

  private scheduleFlush(): void {
    if (!this.running) return;
    this.flushInterval = setTimeout(async () => {
      await this.flush();
      this.scheduleFlush();
    }, this.currentFlushIntervalMs);
  }

  // Back off when flushes run slow; speed up when downstream is healthy.
  private adaptFlushInterval(): void {
    const { targetLatencyMs, minFlushIntervalMs, maxFlushIntervalMs } = this.config;
    if (this.lastFlushLatencyMs > targetLatencyMs * 1.5) {
      this.currentFlushIntervalMs = Math.min(
        this.currentFlushIntervalMs * 1.5,
        maxFlushIntervalMs
      );
    } else if (this.lastFlushLatencyMs < targetLatencyMs * 0.5) {
      this.currentFlushIntervalMs = Math.max(
        this.currentFlushIntervalMs * 0.8,
        minFlushIntervalMs
      );
    }
  }

  private updateState(): void {
    const util = this.buffer.utilization;
    if (this.state === BackpressureState.DRAINING) return;
    if (util >= 1.0) {
      this.state = BackpressureState.BLOCKED;
    } else if (util >= this.config.highWatermark) {
      this.state = BackpressureState.CRITICAL;
    } else if (util >= this.config.lowWatermark) {
      this.state = BackpressureState.ELEVATED;
    } else {
      this.state = BackpressureState.NORMAL;
    }
  }

  private async waitForSpace(): Promise<boolean> {
    const maxWait = this.config.maxBlockTimeMs || 5000;
    const startTime = Date.now();
    while (Date.now() - startTime < maxWait) {
      if (!this.buffer.isFull()) return true;
      await new Promise(r => setTimeout(r, 50));
    }
    return false;
  }
}
```

Usage Examples
```typescript
// Create a controller that batch-inserts events into the database
const controller = new BackpressureController(
  async (items) => {
    const result = await db.batchInsert('events', items);
    return { success: result.inserted, failed: 0, errors: [] };
  },
  {
    strategy: BackpressureStrategy.DROP_OLDEST,
    maxBufferSize: 10000,
    batchSize: 100,
  }
);

// Start processing
controller.start();

// Push events
await controller.push(event);

// On shutdown: flush remaining items, then stop the timer
await controller.drain();
controller.stop();
```

Strategy Selection
| Strategy | Use Case | Trade-off |
|---|---|---|
| BLOCK | Critical data | Producers slow down |
| DROP_OLDEST | Time-series | Lose historical data |
| DROP_NEWEST | Batch jobs | Reject new work |
| SAMPLE | Telemetry | Statistical accuracy |
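The DROP_OLDEST trade-off can be made concrete with a condensed, self-contained version of the bounded buffer above (names here are local to this sketch): once the buffer is full, admitting a new item means losing the oldest one.

```typescript
// Condensed copy of BoundedBuffer, enough to show DROP_OLDEST eviction.
class MiniBuffer<T> {
  private items: T[] = [];
  constructor(private readonly maxSize: number) {}
  isFull(): boolean { return this.items.length >= this.maxSize; }
  // Evicts and returns the oldest item when full, else returns null.
  pushWithEviction(item: T): T | null {
    const evicted = this.isFull() ? this.items.shift() ?? null : null;
    this.items.push(item);
    return evicted;
  }
  snapshot(): T[] { return [...this.items]; }
}

const buf = new MiniBuffer<number>(3);
[1, 2, 3].forEach(n => buf.pushWithEviction(n));
const evicted = buf.pushWithEviction(4); // buffer full: oldest item (1) evicted
// evicted === 1, buffer now holds [2, 3, 4]
```

This is why DROP_OLDEST suits time-series data: the most recent readings are usually the most valuable, and the evicted history is the acceptable loss.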
Best Practices
- Size buffers for memory - Don't exceed available RAM
- Match strategy to data - Critical data = BLOCK
- Monitor drop rates - Alert on high drops
- Drain on shutdown - Don't lose buffered data
- Combine with circuit breaker - Protect flush function
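The last practice, pairing backpressure with a circuit breaker, can be sketched as a wrapper around the flush function. This is a hypothetical minimal breaker (the threshold and cool-down values are illustrative, not part of this skill): after repeated flush failures it rejects calls outright for a cool-down period, so the buffer backs up and backpressure kicks in instead of hammering a failing store.

```typescript
// Hypothetical minimal circuit breaker guarding a flush function.
// After `threshold` consecutive failures it rejects calls ("open")
// until `coolDownMs` elapses, protecting the downstream store.
function withBreaker<T>(
  flush: (items: T[]) => Promise<number>,
  threshold = 3,
  coolDownMs = 5000,
): (items: T[]) => Promise<number> {
  let failures = 0;
  let openedAt = 0;
  return async (items) => {
    if (failures >= threshold && Date.now() - openedAt < coolDownMs) {
      throw new Error('circuit open: flush rejected');
    }
    try {
      const n = await flush(items);
      failures = 0; // success closes the circuit
      return n;
    } catch (err) {
      failures++;
      if (failures >= threshold) openedAt = Date.now();
      throw err;
    }
  };
}
```

Rejected flushes surface as flush errors in the controller's metrics, so the drop rate alert from the practices above also covers the open-circuit case.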
Common Mistakes
- Unbounded queues (OOM crash)
- No metrics on drops
- Not draining on shutdown
- Wrong strategy for data criticality
- No adaptive rate adjustment
Related Skills
- Circuit Breaker
- Background Jobs
- Health Checks