checkpoint-resume
Checkpoint & Resume Processing
Exactly-once processing semantics with distributed coordination for file-based data pipelines.
When to Use This Skill
- Processing large file batches across multiple workers
- Need to resume after worker crashes
- Preventing duplicate processing of the same file
- Tracking processing progress and statistics
Core Concepts
The pattern provides:
- Atomic file claiming (only one worker processes each file)
- Status tracking (pending, processing, completed, failed)
- Automatic retry with configurable limits
- In-memory fallback for development/testing
```
┌──────────┐      ┌─────────────────┐      ┌──────────┐
│ Worker 1 │────▶│  Checkpoint DB   │◀────│ Worker 2 │
└──────────┘      └─────────────────┘      └──────────┘
      │                    │                    │
      ▼                    ▼                    ▼
claim_file()        atomic claims         claim_file()
process()           status tracking       process()
complete()          retry logic           complete()
```
Implementation
Database Schema (PostgreSQL/Supabase)
```sql
CREATE TABLE file_checkpoints (
  file_url TEXT PRIMARY KEY,
  file_type TEXT NOT NULL,
  file_timestamp TIMESTAMPTZ NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending',
  records_total INTEGER DEFAULT 0,
  records_filtered INTEGER DEFAULT 0,
  records_persisted INTEGER DEFAULT 0,
  processing_time_ms INTEGER DEFAULT 0,
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  processed_by TEXT,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_checkpoints_status ON file_checkpoints(status);

-- Atomic claim function
CREATE OR REPLACE FUNCTION claim_file(
  p_file_url TEXT,
  p_file_type TEXT,
  p_file_timestamp TIMESTAMPTZ,
  p_worker_id TEXT
) RETURNS BOOLEAN AS $$
DECLARE
  v_claimed BOOLEAN := FALSE;
BEGIN
  -- Try to insert a new record; ON CONFLICT leaves FOUND false if the file already exists
  INSERT INTO file_checkpoints (file_url, file_type, file_timestamp, status, processed_by, started_at)
  VALUES (p_file_url, p_file_type, p_file_timestamp, 'processing', p_worker_id, NOW())
  ON CONFLICT (file_url) DO NOTHING;

  IF FOUND THEN
    v_claimed := TRUE;
  ELSE
    -- Otherwise, re-claim a failed file if it is under the retry limit
    UPDATE file_checkpoints
    SET status = 'processing', processed_by = p_worker_id, started_at = NOW(), retry_count = retry_count + 1
    WHERE file_url = p_file_url AND status = 'failed' AND retry_count < 3;
    v_claimed := FOUND;
  END IF;

  RETURN v_claimed;
END;
$$ LANGUAGE plpgsql;
```
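The TypeScript manager below also calls a `complete_file` RPC that this schema section does not define. A minimal sketch of it, assuming it simply records final stats against the table above, might look like:

```sql
-- Hypothetical companion to claim_file: record final stats and mark the file done
CREATE OR REPLACE FUNCTION complete_file(
  p_file_url TEXT,
  p_records_total INTEGER,
  p_records_filtered INTEGER,
  p_records_persisted INTEGER,
  p_processing_time_ms INTEGER
) RETURNS VOID AS $$
BEGIN
  UPDATE file_checkpoints
  SET status = 'completed',
      records_total = p_records_total,
      records_filtered = p_records_filtered,
      records_persisted = p_records_persisted,
      processing_time_ms = p_processing_time_ms,
      completed_at = NOW()
  WHERE file_url = p_file_url;
END;
$$ LANGUAGE plpgsql;
```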
TypeScript
```typescript
// DatabaseClient is assumed to be a Supabase-style client
// exposing rpc() and from().update()/select() query builders.

interface FileCheckpoint {
  fileUrl: string;
  fileType: string;
  fileTimestamp: Date;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  recordsTotal: number;
  recordsFiltered: number;
  recordsPersisted: number;
  processingTimeMs: number;
  errorMessage?: string;
  retryCount: number;
  processedBy?: string;
}

interface ProcessingStats {
  totalRows: number;
  filteredRows: number;
  persistedRows: number;
  durationMs: number;
}

class CheckpointManager {
  private workerId: string;
  private inMemory = new Map<string, FileCheckpoint>();
  private useInMemory = false;

  constructor(
    private getClient: () => DatabaseClient | null,
    workerId?: string
  ) {
    this.workerId = workerId ||
      `worker_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
  }

  async claimFile(
    fileUrl: string,
    fileType: string,
    fileTimestamp: Date
  ): Promise<boolean> {
    const client = this.getClient();
    if (client) {
      try {
        const result = await client.rpc('claim_file', {
          p_file_url: fileUrl,
          p_file_type: fileType,
          p_file_timestamp: fileTimestamp.toISOString(),
          p_worker_id: this.workerId,
        });
        if (!result.error) return result.data === true;
      } catch (e) {
        console.warn('Database unavailable, using in-memory');
      }
    }

    // Fallback to in-memory (single-worker mode)
    this.useInMemory = true;
    const existing = this.inMemory.get(fileUrl);
    if (existing && (existing.status !== 'failed' || existing.retryCount >= 3)) {
      return false;
    }
    this.inMemory.set(fileUrl, {
      fileUrl,
      fileType,
      fileTimestamp,
      status: 'processing',
      recordsTotal: 0,
      recordsFiltered: 0,
      recordsPersisted: 0,
      processingTimeMs: 0,
      // Preserve the existing count on re-claim so the retry limit actually holds
      retryCount: existing?.retryCount ?? 0,
      processedBy: this.workerId,
    });
    return true;
  }

  async completeFile(fileUrl: string, stats: ProcessingStats): Promise<void> {
    const client = this.getClient();
    if (client && !this.useInMemory) {
      await client.rpc('complete_file', {
        p_file_url: fileUrl,
        p_records_total: stats.totalRows,
        p_records_filtered: stats.filteredRows,
        p_records_persisted: stats.persistedRows,
        p_processing_time_ms: stats.durationMs,
      });
      return;
    }
    const checkpoint = this.inMemory.get(fileUrl);
    if (checkpoint) {
      checkpoint.status = 'completed';
      checkpoint.recordsTotal = stats.totalRows;
      checkpoint.recordsFiltered = stats.filteredRows;
      checkpoint.recordsPersisted = stats.persistedRows;
      checkpoint.processingTimeMs = stats.durationMs;
    }
  }

  async failFile(fileUrl: string, errorMessage: string): Promise<void> {
    const client = this.getClient();
    if (client && !this.useInMemory) {
      await client
        .from('file_checkpoints')
        .update({ status: 'failed', error_message: errorMessage })
        .eq('file_url', fileUrl);
      return;
    }
    const checkpoint = this.inMemory.get(fileUrl);
    if (checkpoint) {
      checkpoint.status = 'failed';
      checkpoint.errorMessage = errorMessage;
      checkpoint.retryCount++;
    }
  }

  async isProcessed(fileUrl: string): Promise<boolean> {
    const client = this.getClient();
    if (client && !this.useInMemory) {
      const { data } = await client
        .from('file_checkpoints')
        .select('status')
        .eq('file_url', fileUrl)
        .single();
      return data?.status === 'completed';
    }
    return this.inMemory.get(fileUrl)?.status === 'completed';
  }

  getWorkerId(): string {
    return this.workerId;
  }
}
```
Usage Examples
Processing Files
```typescript
const checkpoint = new CheckpointManager(getDbClient);

async function processFiles(fileUrls: string[]) {
  for (const url of fileUrls) {
    // Try to claim; another worker may already own this file
    const claimed = await checkpoint.claimFile(url, 'events', new Date());
    if (!claimed) {
      console.log(`Skipping ${url} - already claimed`);
      continue;
    }

    const startTime = Date.now();
    try {
      const result = await processFile(url);
      await checkpoint.completeFile(url, {
        totalRows: result.total,
        filteredRows: result.filtered,
        persistedRows: result.persisted,
        durationMs: Date.now() - startTime,
      });
    } catch (error) {
      // Caught values are `unknown` in strict TypeScript; narrow before reading .message
      await checkpoint.failFile(url, error instanceof Error ? error.message : String(error));
    }
  }
}
```
Best Practices
- Use database functions for atomic claims - prevents race conditions
- Always have in-memory fallback for dev/testing
- Track retry count to prevent infinite loops (max 3 retries)
- Include processing stats for observability
- Generate unique worker IDs to track which worker processed what
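The stats columns make progress easy to inspect while a batch is running. A quick monitoring query (a sketch against the schema above, not part of the pattern itself):

```sql
-- Progress overview: file counts and throughput per status
SELECT status,
       COUNT(*)                AS files,
       SUM(records_persisted)  AS rows_persisted,
       AVG(processing_time_ms) AS avg_ms
FROM file_checkpoints
GROUP BY status
ORDER BY status;
```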
Common Mistakes
- Not using atomic operations for claiming (race conditions)
- No retry limit (infinite retry loops)
- Forgetting in-memory fallback (breaks local development)
- Not tracking processing statistics (can't debug issues)
- Using file path instead of URL as key (path changes between environments)
Related Patterns
- batch-processing - Batch database operations
- dead-letter-queue - Handle permanently failed files
- distributed-lock - Coordinate between workers