batch-processing-jobs

Batch Processing Jobs

Overview

Implement scalable batch processing systems that handle large-scale data processing, scheduled tasks, and asynchronous operations efficiently.

When to Use

  • Processing large datasets
  • Scheduled report generation
  • Email/notification campaigns
  • Data imports and exports
  • Image/video processing
  • ETL pipelines
  • Cleanup and maintenance tasks
  • Long-running computations
  • Bulk data updates

Architecture Patterns

┌─────────────┐      ┌─────────────┐      ┌──────────┐
│  Producer   │─────▶│    Queue    │─────▶│  Worker  │
└─────────────┘      └─────────────┘      └──────────┘
                           │                     │
                           │                     ▼
                           │              ┌──────────┐
                           └─────────────▶│  Result  │
                                         │  Storage │
                                         └──────────┘
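
The flow in the diagram can be sketched with the Python standard library alone. This is a minimal illustration, not a production design: the in-memory `queue.Queue` stands in for a real broker such as Redis, the `results` dictionary stands in for result storage, and the name `run_pipeline` is a hypothetical helper introduced here.

```python
import queue
import threading


def run_pipeline(items, worker_count=2):
    """Push items through producer -> queue -> workers -> result storage."""
    jobs = queue.Queue()             # the queue between producer and workers
    results = {}                     # stands in for result storage, keyed by job id
    results_lock = threading.Lock()  # protects results across worker threads

    def worker():
        while True:
            job = jobs.get()
            if job is None:          # sentinel: no more work for this worker
                break
            processed = {**job, "processed": True}
            with results_lock:
                results[job["id"]] = processed

    workers = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in workers:
        thread.start()

    # Producer: enqueue one job per item, then one sentinel per worker.
    for item in items:
        jobs.put(item)
    for _ in workers:
        jobs.put(None)

    for thread in workers:
        thread.join()
    return results
```

Calling `run_pipeline([{"id": 1}, {"id": 2}])` returns a dictionary with one processed entry per job id. A real deployment would replace the in-process queue with a durable broker so that jobs survive a worker crash.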

Implementation Examples

1. Bull Queue (Node.js)

typescript
import Queue from 'bull';
import { v4 as uuidv4 } from 'uuid';

interface JobData {
  id: string;
  type: string;
  payload: any;
  userId?: string;
  metadata?: Record<string, any>;
}

interface JobResult {
  jobId?: string | number; // set when a result is copied to the results queue
  success: boolean;
  data?: any;
  error?: string;
  processedAt: number;
  duration: number;
}

class BatchProcessor {
  private queue: Queue.Queue<JobData>;
  private resultQueue: Queue.Queue<JobResult>;

  constructor(redisUrl: string) {
    // Main processing queue
    this.queue = new Queue('batch-jobs', redisUrl, {
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000
        },
        removeOnComplete: 1000,
        removeOnFail: 5000,
        timeout: 300000 // 5 minutes
      },
      settings: {
        maxStalledCount: 2,
        stalledInterval: 30000
      }
    });

    // Results queue
    this.resultQueue = new Queue('batch-results', redisUrl);

    this.setupProcessors();
    this.setupEvents();
  }

  private setupProcessors(): void {
    // Data processing job
    this.queue.process('process-data', 10, async (job) => {
      const startTime = Date.now();
      const { payload } = job.data;

      job.log(`Processing ${payload.items?.length || 0} items`);

      try {
        // Update progress
        await job.progress(0);

        const results = await this.processDataBatch(
          payload.items,
          (progress) => job.progress(progress)
        );

        const duration = Date.now() - startTime;

        return {
          success: true,
          data: results,
          processedAt: Date.now(),
          duration
        };
      } catch (error: any) {
        // Include the elapsed time so failures can be correlated with timeouts
        const duration = Date.now() - startTime;
        throw new Error(
          `Processing failed after ${duration}ms: ${error.message}`
        );
      }
    });

    // Report generation job
    this.queue.process('generate-report', 2, async (job) => {
      const { payload } = job.data;

      const report = await this.generateReport(
        payload.type,
        payload.filters,
        payload.format
      );

      return {
        success: true,
        data: {
          reportId: uuidv4(),
          url: report.url,
          size: report.size
        },
        processedAt: Date.now(),
        duration: 0
      };
    });

    // Email batch job
    this.queue.process('send-emails', 5, async (job) => {
      const { payload } = job.data;
      const { recipients, template, data } = payload;

      const results = await this.sendEmailBatch(
        recipients,
        template,
        data
      );

      return {
        success: true,
        data: {
          sent: results.successful,
          failed: results.failed
        },
        processedAt: Date.now(),
        duration: 0
      };
    });
  }

  private setupEvents(): void {
    this.queue.on('completed', (job, result) => {
      console.log(`Job ${job.id} completed:`, result);

      // Store result
      this.resultQueue.add({
        jobId: job.id,
        ...result
      });
    });

    this.queue.on('failed', (job, error) => {
      console.error(`Job ${job?.id} failed:`, error.message);

      // Store failure
      this.resultQueue.add({
        jobId: job?.id,
        success: false,
        error: error.message,
        processedAt: Date.now(),
        duration: 0
      });
    });

    this.queue.on('progress', (job, progress) => {
      console.log(`Job ${job.id} progress: ${progress}%`);
    });

    this.queue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled`);
    });
  }

  async addJob(
    type: string,
    payload: any,
    options?: Queue.JobOptions
  ): Promise<Queue.Job<JobData>> {
    const jobData: JobData = {
      id: uuidv4(),
      type,
      payload,
      metadata: {
        createdAt: Date.now()
      }
    };

    return this.queue.add(type, jobData, options);
  }

  async addBulkJobs(
    jobs: Array<{ type: string; payload: any; options?: Queue.JobOptions }>
  ): Promise<Queue.Job<JobData>[]> {
    const bulkData = jobs.map(({ type, payload, options }) => ({
      name: type,
      data: {
        id: uuidv4(),
        type,
        payload,
        metadata: { createdAt: Date.now() }
      },
      opts: options || {}
    }));

    return this.queue.addBulk(bulkData);
  }

  async scheduleJob(
    type: string,
    payload: any,
    cronExpression: string
  ): Promise<Queue.Job<JobData>> {
    return this.addJob(type, payload, {
      repeat: {
        cron: cronExpression
      }
    });
  }

  private async processDataBatch(
    items: any[],
    onProgress: (progress: number) => Promise<void>
  ): Promise<any[]> {
    const results = [];
    const total = items.length;

    for (let i = 0; i < total; i++) {
      const result = await this.processItem(items[i]);
      results.push(result);

      // Update progress
      const progress = Math.round(((i + 1) / total) * 100);
      await onProgress(progress);
    }

    return results;
  }

  private async processItem(item: any): Promise<any> {
    // Simulate processing
    await new Promise(resolve => setTimeout(resolve, 100));
    return { ...item, processed: true };
  }

  private async generateReport(
    type: string,
    filters: any,
    format: string
  ): Promise<any> {
    // Simulate report generation
    return {
      url: `https://cdn.example.com/reports/${uuidv4()}.${format}`,
      size: 1024 * 1024
    };
  }

  private async sendEmailBatch(
    recipients: string[],
    template: string,
    data: any
  ): Promise<{ successful: number; failed: number }> {
    // Simulate email sending
    return {
      successful: recipients.length,
      failed: 0
    };
  }

  async getJobStatus(jobId: string): Promise<any> {
    const job = await this.queue.getJob(jobId);
    if (!job) return null;

    const state = await job.getState();
    const logs = await this.queue.getJobLogs(jobId);

    return {
      id: job.id,
      name: job.name,
      data: job.data,
      state,
      progress: job.progress(),
      attempts: job.attemptsMade,
      failedReason: job.failedReason,
      finishedOn: job.finishedOn,
      processedOn: job.processedOn,
      logs: logs.logs
    };
  }

  async getQueueStats(): Promise<any> {
    const [
      waiting,
      active,
      completed,
      failed,
      delayed,
      paused
    ] = await Promise.all([
      this.queue.getWaitingCount(),
      this.queue.getActiveCount(),
      this.queue.getCompletedCount(),
      this.queue.getFailedCount(),
      this.queue.getDelayedCount(),
      this.queue.getPausedCount()
    ]);

    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      paused
    };
  }

  async pause(): Promise<void> {
    await this.queue.pause();
  }

  async resume(): Promise<void> {
    await this.queue.resume();
  }

  async clean(grace: number = 0): Promise<void> {
    await this.queue.clean(grace, 'completed');
    await this.queue.clean(grace, 'failed');
  }

  async close(): Promise<void> {
    await this.queue.close();
    await this.resultQueue.close();
  }
}

// Usage
const processor = new BatchProcessor('redis://localhost:6379');

// Add single job
const job = await processor.addJob('process-data', {
  items: [{ id: 1 }, { id: 2 }, { id: 3 }]
});

// Add bulk jobs
await processor.addBulkJobs([
  {
    type: 'process-data',
    payload: { items: [/* ... */] }
  },
  {
    type: 'generate-report',
    payload: { type: 'sales', format: 'pdf' }
  }
]);

// Schedule recurring job
await processor.scheduleJob(
  'generate-report',
  { type: 'daily-summary' },
  '0 0 * * *' // Daily at midnight
);

// Check status
const status = await processor.getJobStatus(job.id!);
console.log('Job status:', status);

// Get queue stats
const stats = await processor.getQueueStats();
console.log('Queue stats:', stats);

2. Celery-Style Worker (Python)

python
from celery import Celery, Task
from celery.schedules import crontab
from typing import List, Any, Dict
import time
import logging

# Initialize Celery
app = Celery(
    'batch_processor',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configure Celery
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,  # 5 minutes
    task_soft_time_limit=270,  # 4.5 minutes
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
)

# Periodic tasks
app.conf.beat_schedule = {
    'daily-report': {
        'task': 'tasks.generate_daily_report',
        'schedule': crontab(hour=0, minute=0),
    },
    'cleanup-old-data': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),
    },
}

logger = logging.getLogger(__name__)


class CallbackTask(Task):
    """Base task with callback support."""

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f"Task {task_id} succeeded: {retval}")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f"Task {task_id} failed: {exc}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f"Task {task_id} retrying: {exc}")


@app.task(base=CallbackTask, bind=True, max_retries=3)
def process_batch_data(self, items: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Process batch of data items."""
    try:
        results = []
        total = len(items)

        for i, item in enumerate(items):
            # Process item
            result = process_single_item(item)
            results.append(result)

            # Update progress
            progress = int((i + 1) / total * 100)
            self.update_state(
                state='PROGRESS',
                meta={'current': i + 1, 'total': total, 'percent': progress}
            )

        return {
            'processed': len(results),
            'success': True,
            'results': results
        }

    except Exception as exc:
        logger.error(f"Batch processing failed: {exc}")
        raise self.retry(exc=exc, countdown=60)  # Retry after 1 minute


@app.task
def process_single_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Process single item."""
    # Simulate processing
    time.sleep(0.1)
    return {
        'id': item.get('id'),
        'processed': True,
        'timestamp': time.time()
    }


@app.task(bind=True)
def generate_report(
    self,
    report_type: str,
    filters: Dict[str, Any],
    format: str = 'pdf'
) -> Dict[str, str]:
    """Generate report."""
    logger.info(f"Generating {report_type} report in {format} format")

    self.update_state(state='PROGRESS', meta={'step': 'gathering_data'})
    # Gather data
    time.sleep(2)

    self.update_state(state='PROGRESS', meta={'step': 'processing'})
    # Process data
    time.sleep(2)

    self.update_state(state='PROGRESS', meta={'step': 'generating'})
    # Generate report
    time.sleep(2)

    return {
        'report_id': f"report-{int(time.time())}",
        'url': f"https://cdn.example.com/reports/report.{format}",
        'format': format
    }


@app.task
def send_email_batch(
    recipients: List[str],
    template: str,
    context: Dict[str, Any]
) -> Dict[str, int]:
    """Send batch of emails."""
    successful = 0
    failed = 0

    for recipient in recipients:
        try:
            send_email(recipient, template, context)
            successful += 1
        except Exception as e:
            logger.error(f"Failed to send email to {recipient}: {e}")
            failed += 1

    return {
        'successful': successful,
        'failed': failed,
        'total': len(recipients)
    }


@app.task
def generate_daily_report():
    """Scheduled task: Generate daily report."""
    logger.info("Generating daily report")
    generate_report.delay('daily', {}, 'pdf')


@app.task
def cleanup_old_data():
    """Scheduled task: Clean up old data."""
    logger.info("Cleaning up old data")
    # Cleanup logic here


def send_email(recipient: str, template: str, context: Dict[str, Any]):
    """Send single email."""
    logger.info(f"Sending email to {recipient}")
    # Email sending logic


# Task chaining and grouping
from celery import chain, group, chord


def process_in_chunks(items: List[Any], chunk_size: int = 100):
    """Process items in parallel chunks."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

    # Process chunks in parallel
    job = group(process_batch_data.s(chunk) for chunk in chunks)
    result = job.apply_async()

    return result


def process_with_callback(items: List[Any]):
    """Process items and call callback when done."""
    callback = send_notification.s()
    header = group(process_batch_data.s(chunk) for chunk in [items])

    # Use chord to call callback after all tasks complete
    job = chord(header)(callback)
    return job


@app.task
def send_notification(results):
    """Callback task after batch processing."""
    logger.info(f"All tasks completed: {len(results)} results")


# Usage examples
if __name__ == '__main__':
    # Enqueue task
    result = process_batch_data.delay([
        {'id': 1, 'value': 'a'},
        {'id': 2, 'value': 'b'}
    ])

    # Check task status
    print(f"Task ID: {result.id}")
    print(f"Status: {result.status}")

    # Wait for result (blocking)
    final_result = result.get(timeout=10)
    print(f"Result: {final_result}")

    # Process in chunks
    items = [{'id': i} for i in range(1000)]
    chunk_result = process_in_chunks(items, chunk_size=100)

    # Check group result
    print(f"Chunks: {len(chunk_result)}")

3. Cron Job Scheduler

typescript
import cron from 'node-cron';

interface ScheduledJob {
  name: string;
  schedule: string;
  handler: () => Promise<void>;
  enabled: boolean;
  lastRun?: Date;
  nextRun?: Date;
}

class JobScheduler {
  private jobs: Map<string, cron.ScheduledTask> = new Map();
  private jobConfigs: Map<string, ScheduledJob> = new Map();

  register(job: ScheduledJob): void {
    if (this.jobs.has(job.name)) {
      throw new Error(`Job ${job.name} already registered`);
    }

    // Validate cron expression
    if (!cron.validate(job.schedule)) {
      throw new Error(`Invalid cron expression: ${job.schedule}`);
    }

    const task = cron.schedule(job.schedule, async () => {
      if (!job.enabled) return;

      console.log(`Running job: ${job.name}`);
      const startTime = Date.now();

      try {
        await job.handler();

        const duration = Date.now() - startTime;
        console.log(`Job ${job.name} completed in ${duration}ms`);

        job.lastRun = new Date();
      } catch (error) {
        console.error(`Job ${job.name} failed:`, error);
      }
    });

    this.jobs.set(job.name, task);
    this.jobConfigs.set(job.name, job);

    if (job.enabled) {
      task.start();
    }
  }

  start(name: string): void {
    const task = this.jobs.get(name);
    if (!task) {
      throw new Error(`Job ${name} not found`);
    }

    task.start();

    const config = this.jobConfigs.get(name)!;
    config.enabled = true;
  }

  stop(name: string): void {
    const task = this.jobs.get(name);
    if (!task) {
      throw new Error(`Job ${name} not found`);
    }

    task.stop();

    const config = this.jobConfigs.get(name)!;
    config.enabled = false;
  }

  remove(name: string): void {
    const task = this.jobs.get(name);
    if (task) {
      task.destroy();
      this.jobs.delete(name);
      this.jobConfigs.delete(name);
    }
  }

  getJobs(): ScheduledJob[] {
    return Array.from(this.jobConfigs.values());
  }
}

// Usage
const scheduler = new JobScheduler();

// Register jobs
scheduler.register({
  name: 'daily-backup',
  schedule: '0 2 * * *', // 2 AM daily
  enabled: true,
  handler: async () => {
    console.log('Running daily backup...');
    // Backup logic
  }
});

scheduler.register({
  name: 'hourly-cleanup',
  schedule: '0 * * * *', // Every hour
  enabled: true,
  handler: async () => {
    console.log('Running cleanup...');
    // Cleanup logic
  }
});

scheduler.register({
  name: 'weekly-report',
  schedule: '0 9 * * 1', // Monday 9 AM
  enabled: true,
  handler: async () => {
    console.log('Generating weekly report...');
    // Report generation
  }
});
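
To make the schedule strings above easier to read, the following sketch checks whether a five-field cron expression (minute, hour, day of month, month, day of week) fires at a given time. It is a deliberately simplified illustration: it accepts only `*` and single integers, not ranges, lists, or step values, and the helper names `field_matches` and `cron_matches` are hypothetical, not part of node-cron.

```python
from datetime import datetime


def field_matches(field, value):
    """A field matches if it is the wildcard or equals the value exactly."""
    return field == "*" or int(field) == value


def cron_matches(expression, when):
    """Return True if `when` satisfies a five-field cron expression.

    Simplification: only '*' and single integers are supported.
    """
    minute, hour, day, month, weekday = expression.split()
    # Python counts Monday as 0; cron counts Sunday as 0.
    cron_weekday = (when.weekday() + 1) % 7
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(day, when.day)
        and field_matches(month, when.month)
        and field_matches(weekday, cron_weekday)
    )
```

For example, `cron_matches('0 9 * * 1', datetime(2024, 1, 1, 9, 0))` is true because 1 January 2024 was a Monday, while the same expression does not match 09:00 on the following Tuesday.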

Best Practices

✅ DO

  • Implement idempotency for all jobs
  • Use job queues for distributed processing
  • Monitor job success/failure rates
  • Implement retry logic with exponential backoff
  • Set appropriate timeouts
  • Log job execution details
  • Use dead letter queues for failed jobs
  • Implement job priority levels
  • Batch similar operations together
  • Use connection pooling
  • Implement graceful shutdown
  • Monitor queue depth and processing time
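
Three of the practices above (idempotency, retries with exponential backoff, and a dead letter queue) can be combined in a small runner. The sketch below is one possible shape under stated assumptions: `done_keys` and `dead_letters` are plain in-memory collections standing in for durable storage, the `idempotency_key` field is a hypothetical convention, and the delay formula `base * 2 ** (attempt - 1)` is a generic doubling scheme rather than the formula of any particular queue library.

```python
import time


def backoff_delay(attempt, base=2.0, cap=60.0):
    """Seconds to wait after failed attempt number `attempt` (1-based)."""
    return min(cap, base * 2 ** (attempt - 1))


def run_with_retries(job, handler, done_keys, dead_letters,
                     max_attempts=3, sleep=time.sleep):
    """Run handler(job) at most max_attempts times.

    Returns "skipped" if the job was already completed, "done" on success,
    or "dead-lettered" once every attempt has failed.
    """
    key = job["idempotency_key"]
    if key in done_keys:                 # idempotency: never redo finished work
        return "skipped"

    for attempt in range(1, max_attempts + 1):
        try:
            handler(job)
        except Exception as exc:
            if attempt == max_attempts:  # bounded retries, then park the job
                dead_letters.append({"job": job, "error": str(exc)})
                return "dead-lettered"
            sleep(backoff_delay(attempt))
        else:
            done_keys.add(key)
            return "done"
```

Passing `sleep` as a parameter keeps the function testable without real waiting. In a real system the completed-key set would live in a database or cache shared by all workers.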

❌ DON'T

  • Process jobs synchronously in request handlers
  • Ignore failed jobs
  • Set unlimited retries
  • Skip monitoring and alerting
  • Process jobs without timeouts
  • Store large payloads in queue
  • Forget to clean up completed jobs
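
One common way to avoid storing large payloads in the queue is to keep the payload elsewhere and enqueue only a reference to it. The sketch below illustrates that idea under assumptions: `blob_store` is a plain dictionary standing in for object storage, the 1024-byte threshold is arbitrary, and `make_job` and `load_payload` are hypothetical helper names.

```python
import json
import uuid

MAX_INLINE_BYTES = 1024  # arbitrary threshold chosen for illustration


def make_job(job_type, payload, blob_store, max_inline_bytes=MAX_INLINE_BYTES):
    """Build a queue message, moving oversized payloads to external storage."""
    encoded = json.dumps(payload).encode("utf-8")
    if len(encoded) <= max_inline_bytes:
        return {"type": job_type, "payload": payload}

    # Too large for the queue: store it and enqueue only a reference.
    ref = f"payloads/{uuid.uuid4()}"
    blob_store[ref] = encoded
    return {"type": job_type, "payload_ref": ref}


def load_payload(job, blob_store):
    """Resolve the payload inside a worker, whether inline or by reference."""
    if "payload_ref" in job:
        return json.loads(blob_store[job["payload_ref"]].decode("utf-8"))
    return job["payload"]
```

The worker calls `load_payload` at the start of processing, so the queue itself only ever carries small messages.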

Resources
