
Batch Processing


Batching database operations yields a 30-40% throughput improvement, with a graceful fallback to sequential processing for reliability.

When to Use This Skill


  • Processing multiple related records (invoices, orders, events)
  • Network latency is significant (cloud databases)
  • Writes are independent (no inter-record dependencies)
  • You can implement fallback for reliability

Core Concepts


Sequential processing is slow because each item requires multiple DB round trips. The solution is to collect all data first, then execute batch operations:
Sequential (slow):
Item 1 → DB → DB → DB
Item 2 → DB → DB → DB
Item 3 → DB → DB → DB

Batched (fast):
Item 1 → collect
Item 2 → collect
Item 3 → collect
All items → BATCH INSERT
Key insight: run the mapping step sequentially (fuzzy matching needs context), but batch the writes (each write is independent).
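The payoff comes from cutting network round trips. A back-of-envelope sketch (the item count, round-trip count, and latency figures are illustrative assumptions, not measurements):

```python
# Cost of per-item round trips vs. collect-then-batch, ignoring server-side work.
ITEMS = 100
ROUND_TRIPS_PER_ITEM = 3   # e.g. mapping lookup, insert, inventory update
LATENCY_S = 0.05           # assume 50 ms per round trip to a cloud database

sequential_s = ITEMS * ROUND_TRIPS_PER_ITEM * LATENCY_S

# Mapping still runs once per item, but the insert and the inventory
# update collapse into two batch calls:
batched_s = (ITEMS + 2) * LATENCY_S

print(f"sequential: {sequential_s:.1f}s, batched: {batched_s:.1f}s")
# sequential: 15.0s, batched: 5.1s
```

Even with the mapping step still sequential, eliminating the per-item write round trips dominates the savings.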

Implementation


Python

python
import logging
import time
from decimal import Decimal
from typing import Dict, List

logger = logging.getLogger(__name__)

class BatchProcessor:
    """
    Batch-optimized processor with fallback
    """
    
    def process_batch(self, items: List[Dict], user_id: str) -> Dict:
        start_time = time.perf_counter()
        
        # Collectors for batch operations
        transactions_to_create = []
        inventory_updates = {}
        failed_items = []
        items_processed = 0
        
        # Step 1: Process mappings sequentially (context-dependent)
        for idx, item in enumerate(items, 1):
            try:
                # Business logic that needs context
                mapping = self.find_or_create_mapping(item)
                
                # Collect for batch insert
                transactions_to_create.append({
                    "user_id": user_id,
                    "item_id": mapping['item_id'],
                    "quantity": float(item['quantity']),
                    "unit_cost": float(item['unit_price']),
                })
                
                # Aggregate inventory updates by item
                item_id = mapping['item_id']
                if item_id not in inventory_updates:
                    inventory_updates[item_id] = Decimal('0')
                inventory_updates[item_id] += Decimal(str(item['quantity']))
                
                items_processed += 1
                
            except Exception as e:
                failed_items.append({
                    "line": idx,
                    "error": str(e)
                })
                continue
        
        # Step 2: BATCH INSERT transactions
        if transactions_to_create:
            try:
                self.client.table("transactions").insert(
                    transactions_to_create
                ).execute()
            except Exception as e:
                # CRITICAL: Fall back to sequential processing on batch failure
                logger.error(f"Batch insert failed, falling back: {e}")
                return self._fallback_to_sequential(items, user_id)
        
        # Step 3: BATCH UPDATE inventory (aggregate first)
        if inventory_updates:
            self._batch_update_inventory(inventory_updates)
        
        return {
            "status": "partial_success" if failed_items else "success",
            "items_processed": items_processed,
            "items_failed": len(failed_items),
            "failed_items": failed_items or None,
            "processing_time_seconds": round(time.perf_counter() - start_time, 2)
        }
    
    def _batch_update_inventory(self, updates: Dict[str, Decimal]):
        """Batch query, individual updates (Supabase limitation)"""
        item_ids = list(updates.keys())
        
        # Get current quantities in one query
        current = self.client.table("inventory").select(
            "id, quantity"
        ).in_("id", item_ids).execute()
        
        # Apply updates
        for item in current.data:
            item_id = item['id']
            new_qty = Decimal(str(item['quantity'])) + updates[item_id]
            self.client.table("inventory").update({
                "quantity": float(new_qty)
            }).eq("id", item_id).execute()
    
    def _fallback_to_sequential(self, items: List[Dict], user_id: str) -> Dict:
        """Fallback ensures data integrity when the batch insert fails"""
        logger.warning("Falling back to sequential processing")
        failed_items = []
        # Process one item at a time so a single bad record cannot sink the batch
        for idx, item in enumerate(items, 1):
            try:
                self.process_single(item, user_id)
            except Exception as e:
                failed_items.append({"line": idx, "error": str(e)})
        return {
            "status": "partial_success" if failed_items else "success",
            "items_processed": len(items) - len(failed_items),
            "items_failed": len(failed_items),
            "failed_items": failed_items or None,
        }

TypeScript

typescript
interface BatchResult {
  status: 'success' | 'partial_success' | 'failed';
  itemsProcessed: number;
  itemsFailed: number;
  failedItems?: { line: number; error: string }[];
  processingTimeMs: number;
}

class BatchProcessor {
  async processBatch(items: Item[], userId: string): Promise<BatchResult> {
    const startTime = Date.now();
    
    const transactionsToCreate: Transaction[] = [];
    const inventoryUpdates = new Map<string, number>();
    const failedItems: { line: number; error: string }[] = [];
    let itemsProcessed = 0;
    
    // Step 1: Process mappings sequentially
    for (let idx = 0; idx < items.length; idx++) {
      try {
        const mapping = await this.findOrCreateMapping(items[idx]);
        
        transactionsToCreate.push({
          userId,
          itemId: mapping.itemId,
          quantity: items[idx].quantity,
          unitCost: items[idx].unitPrice,
        });
        
        // Aggregate updates
        const current = inventoryUpdates.get(mapping.itemId) || 0;
        inventoryUpdates.set(mapping.itemId, current + items[idx].quantity);
        
        itemsProcessed++;
      } catch (error) {
        failedItems.push({ line: idx + 1, error: (error as Error).message });
      }
    }
    
    // Step 2: Batch insert
    if (transactionsToCreate.length > 0) {
      try {
        await this.db.transactions.insertMany(transactionsToCreate);
      } catch (error) {
        return this.fallbackToSequential(items, userId);
      }
    }
    
    // Step 3: Batch update inventory
    await this.batchUpdateInventory(inventoryUpdates);
    
    return {
      status: failedItems.length > 0 ? 'partial_success' : 'success',
      itemsProcessed,
      itemsFailed: failedItems.length,
      failedItems: failedItems.length > 0 ? failedItems : undefined,
      processingTimeMs: Date.now() - startTime,
    };
  }
}

Usage Examples


Invoice Processing


python
processor = BatchProcessor()

result = processor.process_batch(
    items=invoice_data['line_items'],
    user_id=user_id
)

if result['status'] == 'partial_success':
    logger.warning(f"Some items failed: {result['failed_items']}")

Best Practices


  1. Sequential mapping, batched writes - fuzzy matching needs context, writes don't
  2. Always implement fallback - batch operations can fail, sequential is reliable
  3. Aggregate before update - combine multiple updates to same record
  4. Handle partial success - one bad item shouldn't fail the entire batch
  5. Chunk large batches - 500 records max to avoid timeouts
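Practice 5 can be handled with a small helper; a minimal sketch (the 500-record default mirrors the guideline above):

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

def chunked(records: Sequence[T], size: int = 500) -> Iterator[List[T]]:
    """Yield successive chunks of at most `size` records."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(records), size):
        yield list(records[start:start + size])

# Each chunk becomes one bounded batch insert instead of one giant request:
chunks = list(chunked(list(range(1200))))
print([len(c) for c in chunks])  # [500, 500, 200]
```

Each chunk can then be passed to `process_batch` in turn, keeping individual requests well under payload and timeout limits.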

Common Mistakes


  • Batching operations that depend on each other's results
  • No fallback when batch operations fail
  • Not aggregating updates to the same record
  • Collecting too many records before writing (memory pressure)
  • Not logging individual items when batch fails (lose context)
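The "not aggregating" mistake is easy to avoid with a `defaultdict`; a minimal sketch using the same `Decimal` aggregation as the implementation above (the line items are made-up sample data):

```python
from collections import defaultdict
from decimal import Decimal

line_items = [
    {"item_id": "sku-1", "quantity": "2.5"},
    {"item_id": "sku-1", "quantity": "1.5"},
    {"item_id": "sku-2", "quantity": "3"},
]

# Aggregate per item BEFORE touching the database: three line items
# collapse into two inventory updates instead of three.
updates = defaultdict(lambda: Decimal("0"))
for item in line_items:
    updates[item["item_id"]] += Decimal(item["quantity"])

print(dict(updates))  # {'sku-1': Decimal('4.0'), 'sku-2': Decimal('3')}
```

Using `Decimal` built from strings avoids the float rounding drift that accumulating quantities with `float` would introduce.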

Related Patterns


  • checkpoint-resume - Resume processing after failures
  • idempotency - Prevent duplicate processing on retry
  • dead-letter-queue - Handle failed items