job-state-machine

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Job State Machine

任务状态机

Validated state transitions for async jobs.
为异步任务提供经过验证的状态转换机制。

When to Use This Skill

何时使用该技能

  • Processing async jobs (image generation, exports, etc.)
  • Need progress tracking for long operations
  • Want to link created assets to jobs
  • Need reliable state management
  • 处理异步任务(如图像生成、导出等)
  • 需要对长时间操作进行进度跟踪
  • 希望将生成的资产与任务关联
  • 需要可靠的状态管理

Core Concepts

核心概念

  1. Defined states - QUEUED → PROCESSING → COMPLETED/FAILED/PARTIAL
  2. Valid transitions - Only allowed state changes
  3. Terminal states - Jobs must reach an end state
  4. Asset linking - Track outputs created by jobs
  1. 定义状态 - QUEUED → PROCESSING → COMPLETED/FAILED/PARTIAL
  2. 合法转换 - 仅允许指定的状态变更
  3. 终端状态 - 任务必须进入结束状态
  4. 资产关联 - 跟踪任务生成的输出内容

State Machine

状态机

         QUEUED
       PROCESSING
       /    |    \
      ▼     ▼     ▼
COMPLETED PARTIAL FAILED
   (terminal states)
         QUEUED
       PROCESSING
       /    |    \
      ▼     ▼     ▼
COMPLETED PARTIAL FAILED
   (terminal states)

TypeScript Implementation

TypeScript 实现

Types

类型定义

typescript
// types.ts
export enum JobStatus {
  QUEUED = 'queued',
  PROCESSING = 'processing',
  COMPLETED = 'completed',
  FAILED = 'failed',
  PARTIAL = 'partial',
}

export const VALID_TRANSITIONS: Record<JobStatus, JobStatus[]> = {
  [JobStatus.QUEUED]: [JobStatus.PROCESSING],
  [JobStatus.PROCESSING]: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED],
  [JobStatus.COMPLETED]: [],
  [JobStatus.FAILED]: [],
  [JobStatus.PARTIAL]: [],
};

export function isTerminalState(status: JobStatus): boolean {
  return VALID_TRANSITIONS[status].length === 0;
}

export interface Job {
  id: string;
  userId: string;
  jobType: string;
  status: JobStatus;
  progress: number;
  errorMessage?: string;
  parameters?: Record<string, unknown>;
  result?: Record<string, unknown>;
  createdAt: Date;
  updatedAt: Date;
  completedAt?: Date;
}

export interface Asset {
  id: string;
  jobId: string;
  userId: string;
  assetType: string;
  url: string;
  storagePath: string;
  fileSize: number;
  createdAt: Date;
}
typescript
// types.ts
export enum JobStatus {
  QUEUED = 'queued',
  PROCESSING = 'processing',
  COMPLETED = 'completed',
  FAILED = 'failed',
  PARTIAL = 'partial',
}

export const VALID_TRANSITIONS: Record<JobStatus, JobStatus[]> = {
  [JobStatus.QUEUED]: [JobStatus.PROCESSING],
  [JobStatus.PROCESSING]: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED],
  [JobStatus.COMPLETED]: [],
  [JobStatus.FAILED]: [],
  [JobStatus.PARTIAL]: [],
};

export function isTerminalState(status: JobStatus): boolean {
  return VALID_TRANSITIONS[status].length === 0;
}

export interface Job {
  id: string;
  userId: string;
  jobType: string;
  status: JobStatus;
  progress: number;
  errorMessage?: string;
  parameters?: Record<string, unknown>;
  result?: Record<string, unknown>;
  createdAt: Date;
  updatedAt: Date;
  completedAt?: Date;
}

export interface Asset {
  id: string;
  jobId: string;
  userId: string;
  assetType: string;
  url: string;
  storagePath: string;
  fileSize: number;
  createdAt: Date;
}

Job Service

任务服务

typescript
// job-service.ts
import { Job, JobStatus, Asset, VALID_TRANSITIONS, isTerminalState } from './types';

export class InvalidStateTransitionError extends Error {
  constructor(public currentStatus: JobStatus, public targetStatus: JobStatus) {
    super(`Cannot transition from '${currentStatus}' to '${targetStatus}'`);
    this.name = 'InvalidStateTransitionError';
  }
}

export class JobService {
  constructor(private db: Database) {}

  async createJob(
    userId: string,
    jobType: string,
    parameters?: Record<string, unknown>
  ): Promise<Job> {
    const job: Job = {
      id: crypto.randomUUID(),
      userId,
      jobType,
      status: JobStatus.QUEUED,
      progress: 0,
      parameters,
      createdAt: new Date(),
      updatedAt: new Date(),
    };

    await this.db.jobs.insert(job);
    return job;
  }

  async getJob(jobId: string, userId: string): Promise<Job> {
    const job = await this.db.jobs.findOne({ id: jobId });
    if (!job) throw new Error('Job not found');
    if (job.userId !== userId) throw new Error('Unauthorized');
    return job;
  }

  async transitionStatus(
    jobId: string,
    targetStatus: JobStatus,
    options: {
      progress?: number;
      errorMessage?: string;
      result?: Record<string, unknown>;
    } = {}
  ): Promise<Job> {
    const job = await this.db.jobs.findOne({ id: jobId });
    if (!job) throw new Error('Job not found');

    // Allow same-state for progress updates
    if (job.status !== targetStatus) {
      if (!VALID_TRANSITIONS[job.status].includes(targetStatus)) {
        throw new InvalidStateTransitionError(job.status, targetStatus);
      }
    }

    const updates: Partial<Job> = {
      status: targetStatus,
      progress: options.progress ?? job.progress,
      updatedAt: new Date(),
    };

    if (options.errorMessage !== undefined) {
      updates.errorMessage = options.errorMessage;
    }

    if (options.result !== undefined) {
      updates.result = options.result;
    }

    if (isTerminalState(targetStatus)) {
      updates.completedAt = new Date();
    }

    await this.db.jobs.update({ id: jobId }, updates);
    return { ...job, ...updates };
  }

  async updateProgress(jobId: string, progress: number): Promise<Job> {
    const job = await this.db.jobs.findOne({ id: jobId });
    if (!job) throw new Error('Job not found');

    if (job.status !== JobStatus.PROCESSING) {
      console.warn(`Ignoring progress update for job ${jobId} in ${job.status} state`);
      return job;
    }

    return this.transitionStatus(jobId, JobStatus.PROCESSING, { progress });
  }

  async markCompleted(jobId: string, result?: Record<string, unknown>): Promise<Job> {
    return this.transitionStatus(jobId, JobStatus.COMPLETED, { progress: 100, result });
  }

  async markFailed(jobId: string, errorMessage: string): Promise<Job> {
    return this.transitionStatus(jobId, JobStatus.FAILED, { errorMessage });
  }

  async markPartial(
    jobId: string,
    result?: Record<string, unknown>,
    errorMessage?: string
  ): Promise<Job> {
    return this.transitionStatus(jobId, JobStatus.PARTIAL, {
      progress: 100,
      result,
      errorMessage,
    });
  }

  async createAsset(
    jobId: string,
    userId: string,
    assetType: string,
    url: string,
    storagePath: string,
    fileSize: number
  ): Promise<Asset> {
    const asset: Asset = {
      id: crypto.randomUUID(),
      jobId,
      userId,
      assetType,
      url,
      storagePath,
      fileSize,
      createdAt: new Date(),
    };

    await this.db.assets.insert(asset);
    return asset;
  }

  async getJobAssets(jobId: string, userId: string): Promise<Asset[]> {
    await this.getJob(jobId, userId); // Verify ownership
    return this.db.assets.find({ jobId });
  }
}
typescript
// job-service.ts
import { Job, JobStatus, Asset, VALID_TRANSITIONS, isTerminalState } from './types';

export class InvalidStateTransitionError extends Error {
  constructor(public currentStatus: JobStatus, public targetStatus: JobStatus) {
    super(`Cannot transition from '${currentStatus}' to '${targetStatus}'`);
    this.name = 'InvalidStateTransitionError';
  }
}

export class JobService {
  constructor(private db: Database) {}

  async createJob(
    userId: string,
    jobType: string,
    parameters?: Record<string, unknown>
  ): Promise<Job> {
    const job: Job = {
      id: crypto.randomUUID(),
      userId,
      jobType,
      status: JobStatus.QUEUED,
      progress: 0,
      parameters,
      createdAt: new Date(),
      updatedAt: new Date(),
    };

    await this.db.jobs.insert(job);
    return job;
  }

  async getJob(jobId: string, userId: string): Promise<Job> {
    const job = await this.db.jobs.findOne({ id: jobId });
    if (!job) throw new Error('Job not found');
    if (job.userId !== userId) throw new Error('Unauthorized');
    return job;
  }

  async transitionStatus(
    jobId: string,
    targetStatus: JobStatus,
    options: {
      progress?: number;
      errorMessage?: string;
      result?: Record<string, unknown>;
    } = {}
  ): Promise<Job> {
    const job = await this.db.jobs.findOne({ id: jobId });
    if (!job) throw new Error('Job not found');

    // Allow same-state for progress updates
    if (job.status !== targetStatus) {
      if (!VALID_TRANSITIONS[job.status].includes(targetStatus)) {
        throw new InvalidStateTransitionError(job.status, targetStatus);
      }
    }

    const updates: Partial<Job> = {
      status: targetStatus,
      progress: options.progress ?? job.progress,
      updatedAt: new Date(),
    };

    if (options.errorMessage !== undefined) {
      updates.errorMessage = options.errorMessage;
    }

    if (options.result !== undefined) {
      updates.result = options.result;
    }

    if (isTerminalState(targetStatus)) {
      updates.completedAt = new Date();
    }

    await this.db.jobs.update({ id: jobId }, updates);
    return { ...job, ...updates };
  }

  async updateProgress(jobId: string, progress: number): Promise<Job> {
    const job = await this.db.jobs.findOne({ id: jobId });
    if (!job) throw new Error('Job not found');

    if (job.status !== JobStatus.PROCESSING) {
      console.warn(`Ignoring progress update for job ${jobId} in ${job.status} state`);
      return job;
    }

    return this.transitionStatus(jobId, JobStatus.PROCESSING, { progress });
  }

  async markCompleted(jobId: string, result?: Record<string, unknown>): Promise<Job> {
    return this.transitionStatus(jobId, JobStatus.COMPLETED, { progress: 100, result });
  }

  async markFailed(jobId: string, errorMessage: string): Promise<Job> {
    return this.transitionStatus(jobId, JobStatus.FAILED, { errorMessage });
  }

  async markPartial(
    jobId: string,
    result?: Record<string, unknown>,
    errorMessage?: string
  ): Promise<Job> {
    return this.transitionStatus(jobId, JobStatus.PARTIAL, {
      progress: 100,
      result,
      errorMessage,
    });
  }

  async createAsset(
    jobId: string,
    userId: string,
    assetType: string,
    url: string,
    storagePath: string,
    fileSize: number
  ): Promise<Asset> {
    const asset: Asset = {
      id: crypto.randomUUID(),
      jobId,
      userId,
      assetType,
      url,
      storagePath,
      fileSize,
      createdAt: new Date(),
    };

    await this.db.assets.insert(asset);
    return asset;
  }

  async getJobAssets(jobId: string, userId: string): Promise<Asset[]> {
    await this.getJob(jobId, userId); // Verify ownership
    return this.db.assets.find({ jobId });
  }
}

Python Implementation

Python 实现

python
undefined
python
undefined

job_service.py

job_service.py

from enum import Enum from dataclasses import dataclass from datetime import datetime from typing import Optional, Dict, Any, List from uuid import uuid4
class JobStatus(str, Enum): QUEUED = "queued" PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" PARTIAL = "partial"
VALID_TRANSITIONS = { JobStatus.QUEUED: [JobStatus.PROCESSING], JobStatus.PROCESSING: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED], JobStatus.COMPLETED: [], JobStatus.FAILED: [], JobStatus.PARTIAL: [], }
def is_terminal_state(status: JobStatus) -> bool: return len(VALID_TRANSITIONS.get(status, [])) == 0
@dataclass class Job: id: str user_id: str job_type: str status: JobStatus progress: int created_at: datetime updated_at: datetime error_message: Optional[str] = None parameters: Optional[Dict[str, Any]] = None result: Optional[Dict[str, Any]] = None completed_at: Optional[datetime] = None
class InvalidStateTransitionError(Exception): def init(self, current: JobStatus, target: JobStatus): self.current = current self.target = target super().init(f"Cannot transition from '{current}' to '{target}'")
class JobService: def init(self, db): self.db = db
async def create_job(
    self,
    user_id: str,
    job_type: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> Job:
    now = datetime.utcnow()
    job = Job(
        id=str(uuid4()),
        user_id=user_id,
        job_type=job_type,
        status=JobStatus.QUEUED,
        progress=0,
        parameters=parameters,
        created_at=now,
        updated_at=now,
    )
    await self.db.jobs.insert(job)
    return job

async def transition_status(
    self,
    job_id: str,
    target_status: JobStatus,
    progress: Optional[int] = None,
    error_message: Optional[str] = None,
    result: Optional[Dict[str, Any]] = None,
) -> Job:
    job = await self.db.jobs.find_one(id=job_id)
    if not job:
        raise ValueError("Job not found")

    if job.status != target_status:
        if target_status not in VALID_TRANSITIONS.get(job.status, []):
            raise InvalidStateTransitionError(job.status, target_status)

    job.status = target_status
    job.updated_at = datetime.utcnow()
    
    if progress is not None:
        job.progress = progress
    if error_message is not None:
        job.error_message = error_message
    if result is not None:
        job.result = result
    if is_terminal_state(target_status):
        job.completed_at = datetime.utcnow()

    await self.db.jobs.update(job)
    return job

async def mark_completed(self, job_id: str, result: Optional[Dict] = None) -> Job:
    return await self.transition_status(
        job_id, JobStatus.COMPLETED, progress=100, result=result
    )

async def mark_failed(self, job_id: str, error_message: str) -> Job:
    return await self.transition_status(
        job_id, JobStatus.FAILED, error_message=error_message
    )
undefined
from enum import Enum from dataclasses import dataclass from datetime import datetime from typing import Optional, Dict, Any, List from uuid import uuid4
class JobStatus(str, Enum): QUEUED = "queued" PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" PARTIAL = "partial"
VALID_TRANSITIONS = { JobStatus.QUEUED: [JobStatus.PROCESSING], JobStatus.PROCESSING: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED], JobStatus.COMPLETED: [], JobStatus.FAILED: [], JobStatus.PARTIAL: [], }
def is_terminal_state(status: JobStatus) -> bool: return len(VALID_TRANSITIONS.get(status, [])) == 0
@dataclass class Job: id: str user_id: str job_type: str status: JobStatus progress: int created_at: datetime updated_at: datetime error_message: Optional[str] = None parameters: Optional[Dict[str, Any]] = None result: Optional[Dict[str, Any]] = None completed_at: Optional[datetime] = None
class InvalidStateTransitionError(Exception): def init(self, current: JobStatus, target: JobStatus): self.current = current self.target = target super().init(f"Cannot transition from '{current}' to '{target}'")
class JobService: def init(self, db): self.db = db
async def create_job(
    self,
    user_id: str,
    job_type: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> Job:
    now = datetime.utcnow()
    job = Job(
        id=str(uuid4()),
        user_id=user_id,
        job_type=job_type,
        status=JobStatus.QUEUED,
        progress=0,
        parameters=parameters,
        created_at=now,
        updated_at=now,
    )
    await self.db.jobs.insert(job)
    return job

async def transition_status(
    self,
    job_id: str,
    target_status: JobStatus,
    progress: Optional[int] = None,
    error_message: Optional[str] = None,
    result: Optional[Dict[str, Any]] = None,
) -> Job:
    job = await self.db.jobs.find_one(id=job_id)
    if not job:
        raise ValueError("Job not found")

    if job.status != target_status:
        if target_status not in VALID_TRANSITIONS.get(job.status, []):
            raise InvalidStateTransitionError(job.status, target_status)

    job.status = target_status
    job.updated_at = datetime.utcnow()
    
    if progress is not None:
        job.progress = progress
    if error_message is not None:
        job.error_message = error_message
    if result is not None:
        job.result = result
    if is_terminal_state(target_status):
        job.completed_at = datetime.utcnow()

    await self.db.jobs.update(job)
    return job

async def mark_completed(self, job_id: str, result: Optional[Dict] = None) -> Job:
    return await self.transition_status(
        job_id, JobStatus.COMPLETED, progress=100, result=result
    )

async def mark_failed(self, job_id: str, error_message: str) -> Job:
    return await self.transition_status(
        job_id, JobStatus.FAILED, error_message=error_message
    )
undefined

Worker Integration

Worker 集成

typescript
async function processJob(jobId: string) {
  const jobService = getJobService();

  try {
    // Transition to PROCESSING
    await jobService.transitionStatus(jobId, JobStatus.PROCESSING, { progress: 0 });

    const job = await jobService.getJobInternal(jobId);

    // Process with progress updates
    await jobService.updateProgress(jobId, 25);
    const result1 = await doStep1(job.parameters);

    await jobService.updateProgress(jobId, 50);
    const result2 = await doStep2(result1);

    await jobService.updateProgress(jobId, 75);
    const assetUrl = await uploadResult(result2);

    // Create asset
    await jobService.createAsset(
      jobId,
      job.userId,
      job.jobType,
      assetUrl,
      `${job.userId}/${jobId}/result.png`,
      result2.length
    );

    // Mark completed
    await jobService.markCompleted(jobId, { assetCount: 1 });

  } catch (error) {
    console.error(`Job ${jobId} failed:`, error);
    await jobService.markFailed(jobId, error.message);
  }
}
typescript
async function processJob(jobId: string) {
  const jobService = getJobService();

  try {
    // Transition to PROCESSING
    await jobService.transitionStatus(jobId, JobStatus.PROCESSING, { progress: 0 });

    const job = await jobService.getJobInternal(jobId);

    // Process with progress updates
    await jobService.updateProgress(jobId, 25);
    const result1 = await doStep1(job.parameters);

    await jobService.updateProgress(jobId, 50);
    const result2 = await doStep2(result1);

    await jobService.updateProgress(jobId, 75);
    const assetUrl = await uploadResult(result2);

    // Create asset
    await jobService.createAsset(
      jobId,
      job.userId,
      job.jobType,
      assetUrl,
      `${job.userId}/${jobId}/result.png`,
      result2.length
    );

    // Mark completed
    await jobService.markCompleted(jobId, { assetCount: 1 });

  } catch (error) {
    console.error(`Job ${jobId} failed:`, error);
    await jobService.markFailed(jobId, error.message);
  }
}

Database Schema

数据库模式

sql
CREATE TABLE jobs (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL REFERENCES users(id),
    job_type VARCHAR(50) NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'queued',
    progress INTEGER NOT NULL DEFAULT 0,
    error_message TEXT,
    parameters JSONB,
    result JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ,
    
    CONSTRAINT valid_status CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'partial')),
    CONSTRAINT valid_progress CHECK (progress >= 0 AND progress <= 100)
);

CREATE INDEX idx_jobs_user_status ON jobs(user_id, status);

CREATE TABLE assets (
    id UUID PRIMARY KEY,
    job_id UUID NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
    user_id UUID NOT NULL REFERENCES users(id),
    asset_type VARCHAR(50) NOT NULL,
    url TEXT NOT NULL,
    storage_path TEXT NOT NULL,
    file_size INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_assets_job_id ON assets(job_id);
sql
CREATE TABLE jobs (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL REFERENCES users(id),
    job_type VARCHAR(50) NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'queued',
    progress INTEGER NOT NULL DEFAULT 0,
    error_message TEXT,
    parameters JSONB,
    result JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ,
    
    CONSTRAINT valid_status CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'partial')),
    CONSTRAINT valid_progress CHECK (progress >= 0 AND progress <= 100)
);

CREATE INDEX idx_jobs_user_status ON jobs(user_id, status);

CREATE TABLE assets (
    id UUID PRIMARY KEY,
    job_id UUID NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
    user_id UUID NOT NULL REFERENCES users(id),
    asset_type VARCHAR(50) NOT NULL,
    url TEXT NOT NULL,
    storage_path TEXT NOT NULL,
    file_size INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_assets_job_id ON assets(job_id);

Best Practices

最佳实践

  1. Validate all transitions - Never skip state machine validation
  2. Always reach terminal - Jobs must complete, fail, or partial
  3. Track progress - Provide feedback for long operations
  4. Link assets - Maintain relationship for cleanup
  5. Log transitions - Essential for debugging
  1. 验证所有状态转换 - 绝不跳过状态机验证
  2. 确保进入终端状态 - 任务必须完成、失败或部分完成
  3. 跟踪进度 - 为长时间操作提供反馈
  4. 关联资产 - 维护关联关系以便清理
  5. 记录状态转换 - 对调试至关重要

Common Mistakes

常见错误

  • Allowing invalid state transitions
  • Jobs stuck in non-terminal states
  • Not tracking progress for long jobs
  • Orphaned assets without job links
  • Missing error messages on failure
  • 允许无效的状态转换
  • 任务卡在非终端状态
  • 未跟踪长时间任务的进度
  • 存在与任务无关联的孤立资产
  • 失败时未记录错误信息

Related Skills

相关技能

  • Background Jobs
  • Dead Letter Queue
  • Graceful Shutdown
  • 后台任务
  • 死信队列
  • 优雅关闭