checkpoint-resume


Checkpoint & Resume Processing


Exactly-once processing semantics with distributed coordination for file-based data pipelines.

When to Use This Skill


  • Processing large file batches across multiple workers
  • Need to resume after worker crashes
  • Preventing duplicate processing of the same file
  • Tracking processing progress and statistics

Core Concepts


The pattern provides:
  • Atomic file claiming (only one worker processes each file)
  • Status tracking (pending, processing, completed, failed)
  • Automatic retry with configurable limits
  • In-memory fallback for development/testing
┌──────────┐     ┌─────────────────┐     ┌──────────┐
│ Worker 1 │────▶│  Checkpoint DB  │◀────│ Worker 2 │
└──────────┘     └─────────────────┘     └──────────┘
     │                   │                    │
     ▼                   ▼                    ▼
  claim_file()     atomic claims        claim_file()
  process()        status tracking      process()
  complete()       retry logic          complete()
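The status tracking above can be sketched as a small transition map. This is an illustrative sketch of the lifecycle, not part of the pattern's required API; the `canTransition` helper is an assumption for demonstration:

```typescript
// Allowed checkpoint status transitions. A failed file may return to
// 'processing' when a worker retries it; 'completed' is terminal.
type Status = 'pending' | 'processing' | 'completed' | 'failed';

const TRANSITIONS: Record<Status, Status[]> = {
  pending: ['processing'],
  processing: ['completed', 'failed'],
  failed: ['processing'], // retry path, subject to the retry limit
  completed: [],          // terminal state
};

function canTransition(from: Status, to: Status): boolean {
  return TRANSITIONS[from].includes(to);
}
```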

Implementation


Database Schema (PostgreSQL/Supabase)


sql
CREATE TABLE file_checkpoints (
  file_url TEXT PRIMARY KEY,
  file_type TEXT NOT NULL,
  file_timestamp TIMESTAMPTZ NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending',
  records_total INTEGER DEFAULT 0,
  records_filtered INTEGER DEFAULT 0,
  records_persisted INTEGER DEFAULT 0,
  processing_time_ms INTEGER DEFAULT 0,
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  processed_by TEXT,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_checkpoints_status ON file_checkpoints(status);

-- Atomic claim function
CREATE OR REPLACE FUNCTION claim_file(
  p_file_url TEXT,
  p_file_type TEXT,
  p_file_timestamp TIMESTAMPTZ,
  p_worker_id TEXT
) RETURNS BOOLEAN AS $$
DECLARE
  v_claimed BOOLEAN := FALSE;
BEGIN
  -- Try to insert new record
  INSERT INTO file_checkpoints (file_url, file_type, file_timestamp, status, processed_by, started_at)
  VALUES (p_file_url, p_file_type, p_file_timestamp, 'processing', p_worker_id, NOW())
  ON CONFLICT (file_url) DO NOTHING;
  
  IF FOUND THEN
    v_claimed := TRUE;
  ELSE
    -- Check if we can retry a failed file
    UPDATE file_checkpoints
    SET status = 'processing', processed_by = p_worker_id, started_at = NOW(), retry_count = retry_count + 1
    WHERE file_url = p_file_url AND status = 'failed' AND retry_count < 3;
    
    v_claimed := FOUND;
  END IF;
  
  RETURN v_claimed;
END;
$$ LANGUAGE plpgsql;
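The TypeScript client below calls a `complete_file` RPC that the schema above does not define. A sketch of what it might look like, assuming the column names from the table above (the function name and parameter order are inferred from the client code):

```sql
CREATE OR REPLACE FUNCTION complete_file(
  p_file_url TEXT,
  p_records_total INTEGER,
  p_records_filtered INTEGER,
  p_records_persisted INTEGER,
  p_processing_time_ms INTEGER
) RETURNS VOID AS $$
BEGIN
  UPDATE file_checkpoints
  SET status = 'completed',
      records_total = p_records_total,
      records_filtered = p_records_filtered,
      records_persisted = p_records_persisted,
      processing_time_ms = p_processing_time_ms,
      completed_at = NOW()
  WHERE file_url = p_file_url;
END;
$$ LANGUAGE plpgsql;
```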

TypeScript


typescript
interface FileCheckpoint {
  fileUrl: string;
  fileType: string;
  fileTimestamp: Date;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  recordsTotal: number;
  recordsFiltered: number;
  recordsPersisted: number;
  processingTimeMs: number;
  errorMessage?: string;
  retryCount: number;
  processedBy?: string;
}

interface ProcessingStats {
  totalRows: number;
  filteredRows: number;
  persistedRows: number;
  durationMs: number;
}

// DatabaseClient is assumed to expose rpc() and from() (e.g. a Supabase client).
class CheckpointManager {
  private workerId: string;
  private inMemory = new Map<string, FileCheckpoint>();
  private useInMemory = false;

  constructor(
    private getClient: () => DatabaseClient | null,
    workerId?: string
  ) {
    this.workerId = workerId || 
      `worker_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
  }

  async claimFile(
    fileUrl: string, 
    fileType: string, 
    fileTimestamp: Date
  ): Promise<boolean> {
    const client = this.getClient();
    
    if (client) {
      try {
        const result = await client.rpc('claim_file', {
          p_file_url: fileUrl,
          p_file_type: fileType,
          p_file_timestamp: fileTimestamp.toISOString(),
          p_worker_id: this.workerId,
        });
        
        if (!result.error) return result.data === true;
      } catch (e) {
        console.warn('Database unavailable, using in-memory');
      }
    }

    // Fallback to in-memory (single-worker mode)
    this.useInMemory = true;
    
    if (this.inMemory.has(fileUrl)) {
      const existing = this.inMemory.get(fileUrl)!;
      if (existing.status !== 'failed' || existing.retryCount >= 3) {
        return false;
      }
    }

    const prior = this.inMemory.get(fileUrl);

    this.inMemory.set(fileUrl, {
      fileUrl,
      fileType,
      fileTimestamp,
      status: 'processing',
      recordsTotal: 0,
      recordsFiltered: 0,
      recordsPersisted: 0,
      processingTimeMs: 0,
      retryCount: prior?.retryCount ?? 0, // preserve count across retries
      processedBy: this.workerId,
    });

    return true;
  }

  async completeFile(fileUrl: string, stats: ProcessingStats): Promise<void> {
    const client = this.getClient();
    
    if (client && !this.useInMemory) {
      await client.rpc('complete_file', {
        p_file_url: fileUrl,
        p_records_total: stats.totalRows,
        p_records_filtered: stats.filteredRows,
        p_records_persisted: stats.persistedRows,
        p_processing_time_ms: stats.durationMs,
      });
      return;
    }

    const checkpoint = this.inMemory.get(fileUrl);
    if (checkpoint) {
      checkpoint.status = 'completed';
      checkpoint.recordsTotal = stats.totalRows;
      checkpoint.recordsFiltered = stats.filteredRows;
      checkpoint.recordsPersisted = stats.persistedRows;
      checkpoint.processingTimeMs = stats.durationMs;
    }
  }

  async failFile(fileUrl: string, errorMessage: string): Promise<void> {
    const client = this.getClient();
    
    if (client && !this.useInMemory) {
      await client
        .from('file_checkpoints')
        .update({ status: 'failed', error_message: errorMessage })
        .eq('file_url', fileUrl);
      return;
    }

    const checkpoint = this.inMemory.get(fileUrl);
    if (checkpoint) {
      checkpoint.status = 'failed';
      checkpoint.errorMessage = errorMessage;
      checkpoint.retryCount++;
    }
  }

  async isProcessed(fileUrl: string): Promise<boolean> {
    const client = this.getClient();
    
    if (client && !this.useInMemory) {
      const { data } = await client
        .from('file_checkpoints')
        .select('status')
        .eq('file_url', fileUrl)
        .single();
      
      return data?.status === 'completed';
    }

    return this.inMemory.get(fileUrl)?.status === 'completed';
  }

  getWorkerId(): string {
    return this.workerId;
  }
}
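The claim/fail/retry semantics can be exercised in isolation. Below is a standalone simplification of the in-memory fallback path (a sketch for testing the semantics, not the `CheckpointManager` class itself; `InMemoryClaims` and `MAX_RETRIES` are illustrative names):

```typescript
type ClaimStatus = 'processing' | 'completed' | 'failed';
interface Claim { status: ClaimStatus; retryCount: number }

const MAX_RETRIES = 3;

class InMemoryClaims {
  private claims = new Map<string, Claim>();

  // Returns true only if this caller won the claim: either the first
  // claim on the file, or a retry of a failed file under the limit.
  claim(fileUrl: string): boolean {
    const existing = this.claims.get(fileUrl);
    if (existing) {
      if (existing.status !== 'failed' || existing.retryCount >= MAX_RETRIES) {
        return false;
      }
      existing.status = 'processing';
      return true;
    }
    this.claims.set(fileUrl, { status: 'processing', retryCount: 0 });
    return true;
  }

  fail(fileUrl: string): void {
    const c = this.claims.get(fileUrl);
    if (c) { c.status = 'failed'; c.retryCount++; }
  }

  complete(fileUrl: string): void {
    const c = this.claims.get(fileUrl);
    if (c) c.status = 'completed';
  }
}
```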

Usage Examples


Processing Files


typescript
const checkpoint = new CheckpointManager(getDbClient);

async function processFiles(fileUrls: string[]) {
  for (const url of fileUrls) {
    // Try to claim
    const claimed = await checkpoint.claimFile(url, 'events', new Date());
    if (!claimed) {
      console.log(`Skipping ${url} - already claimed`);
      continue;
    }

    const startTime = Date.now();
    
    try {
      const result = await processFile(url);
      
      await checkpoint.completeFile(url, {
        totalRows: result.total,
        filteredRows: result.filtered,
        persistedRows: result.persisted,
        durationMs: Date.now() - startTime,
      });
      
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      await checkpoint.failFile(url, message);
    }
  }
}
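With atomic claims, several workers can safely scan the same file list: whichever worker claims a file first wins, and the rest skip it. A single-process simulation of that coordination (a sketch; the shared Map stands in for the checkpoint table, and its check-then-set is atomic here only because JavaScript callbacks run to completion — in production the database's claim_file function provides the guarantee):

```typescript
const claims = new Map<string, string>(); // fileUrl -> workerId

// Synchronous check-then-set: safe within one JS event-loop turn.
function claim(fileUrl: string, workerId: string): boolean {
  if (claims.has(fileUrl)) return false;
  claims.set(fileUrl, workerId);
  return true;
}

async function runWorker(workerId: string, files: string[], processed: string[]) {
  for (const file of files) {
    if (!claim(file, workerId)) continue; // another worker got it first
    await new Promise((r) => setTimeout(r, 1)); // simulate processing work
    processed.push(file);
  }
}

async function main(): Promise<string[]> {
  const files = ['f1.csv', 'f2.csv', 'f3.csv', 'f4.csv'];
  const processed: string[] = [];
  await Promise.all([
    runWorker('worker_a', files, processed),
    runWorker('worker_b', files, processed),
  ]);
  return processed;
}
```

Because each claim happens before the worker yields, every file is processed exactly once no matter how the two workers interleave.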

Best Practices


  1. Use database functions for atomic claims - prevents race conditions
  2. Always have in-memory fallback for dev/testing
  3. Track retry count to prevent infinite loops (max 3 retries)
  4. Include processing stats for observability
  5. Generate unique worker IDs to track which worker processed what
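Practice 5 can be sketched as a small helper; this mirrors the ID format used in the CheckpointManager constructor above (the `makeWorkerId` function name is illustrative):

```typescript
// Generates a worker ID unique enough to attribute claims in the
// checkpoint table: a prefix, the current timestamp, and a short
// random base-36 suffix.
function makeWorkerId(prefix = 'worker'): string {
  return `${prefix}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
}
```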

Common Mistakes


  • Not using atomic operations for claiming (race conditions)
  • No retry limit (infinite retry loops)
  • Forgetting in-memory fallback (breaks local development)
  • Not tracking processing statistics (can't debug issues)
  • Using file path instead of URL as key (path changes between environments)

Related Patterns


  • batch-processing - Batch database operations
  • dead-letter-queue - Handle permanently failed files
  • distributed-lock - Coordinate between workers