job-state-machine
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseJob State Machine
任务状态机
Validated state transitions for async jobs.
为异步任务提供经过验证的状态转换机制。
When to Use This Skill
何时使用该技能
- Processing async jobs (image generation, exports, etc.)
- Need progress tracking for long operations
- Want to link created assets to jobs
- Need reliable state management
- 处理异步任务(如图像生成、导出等)
- 需要对长时间操作进行进度跟踪
- 希望将生成的资产与任务关联
- 需要可靠的状态管理
Core Concepts
核心概念
- Defined states - QUEUED → PROCESSING → COMPLETED/FAILED/PARTIAL
- Valid transitions - Only allowed state changes
- Terminal states - Jobs must reach an end state
- Asset linking - Track outputs created by jobs
- 定义状态 - QUEUED → PROCESSING → COMPLETED/FAILED/PARTIAL
- 合法转换 - 仅允许指定的状态变更
- 终端状态 - 任务必须进入结束状态
- 资产关联 - 跟踪任务生成的输出内容
State Machine
状态机
QUEUED
│
▼
PROCESSING
/ | \
▼ ▼ ▼
COMPLETED PARTIAL FAILED
(terminal states) QUEUED
│
▼
PROCESSING
/ | \
▼ ▼ ▼
COMPLETED PARTIAL FAILED
(terminal states)TypeScript Implementation
TypeScript 实现
Types
类型定义
typescript
// types.ts
export enum JobStatus {
QUEUED = 'queued',
PROCESSING = 'processing',
COMPLETED = 'completed',
FAILED = 'failed',
PARTIAL = 'partial',
}
export const VALID_TRANSITIONS: Record<JobStatus, JobStatus[]> = {
[JobStatus.QUEUED]: [JobStatus.PROCESSING],
[JobStatus.PROCESSING]: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED],
[JobStatus.COMPLETED]: [],
[JobStatus.FAILED]: [],
[JobStatus.PARTIAL]: [],
};
export function isTerminalState(status: JobStatus): boolean {
return VALID_TRANSITIONS[status].length === 0;
}
export interface Job {
id: string;
userId: string;
jobType: string;
status: JobStatus;
progress: number;
errorMessage?: string;
parameters?: Record<string, unknown>;
result?: Record<string, unknown>;
createdAt: Date;
updatedAt: Date;
completedAt?: Date;
}
export interface Asset {
id: string;
jobId: string;
userId: string;
assetType: string;
url: string;
storagePath: string;
fileSize: number;
createdAt: Date;
}typescript
// types.ts
export enum JobStatus {
QUEUED = 'queued',
PROCESSING = 'processing',
COMPLETED = 'completed',
FAILED = 'failed',
PARTIAL = 'partial',
}
export const VALID_TRANSITIONS: Record<JobStatus, JobStatus[]> = {
[JobStatus.QUEUED]: [JobStatus.PROCESSING],
[JobStatus.PROCESSING]: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED],
[JobStatus.COMPLETED]: [],
[JobStatus.FAILED]: [],
[JobStatus.PARTIAL]: [],
};
export function isTerminalState(status: JobStatus): boolean {
return VALID_TRANSITIONS[status].length === 0;
}
export interface Job {
id: string;
userId: string;
jobType: string;
status: JobStatus;
progress: number;
errorMessage?: string;
parameters?: Record<string, unknown>;
result?: Record<string, unknown>;
createdAt: Date;
updatedAt: Date;
completedAt?: Date;
}
export interface Asset {
id: string;
jobId: string;
userId: string;
assetType: string;
url: string;
storagePath: string;
fileSize: number;
createdAt: Date;
}Job Service
任务服务
typescript
// job-service.ts
import { Job, JobStatus, Asset, VALID_TRANSITIONS, isTerminalState } from './types';
export class InvalidStateTransitionError extends Error {
constructor(public currentStatus: JobStatus, public targetStatus: JobStatus) {
super(`Cannot transition from '${currentStatus}' to '${targetStatus}'`);
this.name = 'InvalidStateTransitionError';
}
}
export class JobService {
constructor(private db: Database) {}
async createJob(
userId: string,
jobType: string,
parameters?: Record<string, unknown>
): Promise<Job> {
const job: Job = {
id: crypto.randomUUID(),
userId,
jobType,
status: JobStatus.QUEUED,
progress: 0,
parameters,
createdAt: new Date(),
updatedAt: new Date(),
};
await this.db.jobs.insert(job);
return job;
}
async getJob(jobId: string, userId: string): Promise<Job> {
const job = await this.db.jobs.findOne({ id: jobId });
if (!job) throw new Error('Job not found');
if (job.userId !== userId) throw new Error('Unauthorized');
return job;
}
async transitionStatus(
jobId: string,
targetStatus: JobStatus,
options: {
progress?: number;
errorMessage?: string;
result?: Record<string, unknown>;
} = {}
): Promise<Job> {
const job = await this.db.jobs.findOne({ id: jobId });
if (!job) throw new Error('Job not found');
// Allow same-state for progress updates
if (job.status !== targetStatus) {
if (!VALID_TRANSITIONS[job.status].includes(targetStatus)) {
throw new InvalidStateTransitionError(job.status, targetStatus);
}
}
const updates: Partial<Job> = {
status: targetStatus,
progress: options.progress ?? job.progress,
updatedAt: new Date(),
};
if (options.errorMessage !== undefined) {
updates.errorMessage = options.errorMessage;
}
if (options.result !== undefined) {
updates.result = options.result;
}
if (isTerminalState(targetStatus)) {
updates.completedAt = new Date();
}
await this.db.jobs.update({ id: jobId }, updates);
return { ...job, ...updates };
}
async updateProgress(jobId: string, progress: number): Promise<Job> {
const job = await this.db.jobs.findOne({ id: jobId });
if (!job) throw new Error('Job not found');
if (job.status !== JobStatus.PROCESSING) {
console.warn(`Ignoring progress update for job ${jobId} in ${job.status} state`);
return job;
}
return this.transitionStatus(jobId, JobStatus.PROCESSING, { progress });
}
async markCompleted(jobId: string, result?: Record<string, unknown>): Promise<Job> {
return this.transitionStatus(jobId, JobStatus.COMPLETED, { progress: 100, result });
}
async markFailed(jobId: string, errorMessage: string): Promise<Job> {
return this.transitionStatus(jobId, JobStatus.FAILED, { errorMessage });
}
async markPartial(
jobId: string,
result?: Record<string, unknown>,
errorMessage?: string
): Promise<Job> {
return this.transitionStatus(jobId, JobStatus.PARTIAL, {
progress: 100,
result,
errorMessage,
});
}
async createAsset(
jobId: string,
userId: string,
assetType: string,
url: string,
storagePath: string,
fileSize: number
): Promise<Asset> {
const asset: Asset = {
id: crypto.randomUUID(),
jobId,
userId,
assetType,
url,
storagePath,
fileSize,
createdAt: new Date(),
};
await this.db.assets.insert(asset);
return asset;
}
async getJobAssets(jobId: string, userId: string): Promise<Asset[]> {
await this.getJob(jobId, userId); // Verify ownership
return this.db.assets.find({ jobId });
}
}typescript
// job-service.ts
import { Job, JobStatus, Asset, VALID_TRANSITIONS, isTerminalState } from './types';
export class InvalidStateTransitionError extends Error {
constructor(public currentStatus: JobStatus, public targetStatus: JobStatus) {
super(`Cannot transition from '${currentStatus}' to '${targetStatus}'`);
this.name = 'InvalidStateTransitionError';
}
}
export class JobService {
constructor(private db: Database) {}
async createJob(
userId: string,
jobType: string,
parameters?: Record<string, unknown>
): Promise<Job> {
const job: Job = {
id: crypto.randomUUID(),
userId,
jobType,
status: JobStatus.QUEUED,
progress: 0,
parameters,
createdAt: new Date(),
updatedAt: new Date(),
};
await this.db.jobs.insert(job);
return job;
}
async getJob(jobId: string, userId: string): Promise<Job> {
const job = await this.db.jobs.findOne({ id: jobId });
if (!job) throw new Error('Job not found');
if (job.userId !== userId) throw new Error('Unauthorized');
return job;
}
async transitionStatus(
jobId: string,
targetStatus: JobStatus,
options: {
progress?: number;
errorMessage?: string;
result?: Record<string, unknown>;
} = {}
): Promise<Job> {
const job = await this.db.jobs.findOne({ id: jobId });
if (!job) throw new Error('Job not found');
// Allow same-state for progress updates
if (job.status !== targetStatus) {
if (!VALID_TRANSITIONS[job.status].includes(targetStatus)) {
throw new InvalidStateTransitionError(job.status, targetStatus);
}
}
const updates: Partial<Job> = {
status: targetStatus,
progress: options.progress ?? job.progress,
updatedAt: new Date(),
};
if (options.errorMessage !== undefined) {
updates.errorMessage = options.errorMessage;
}
if (options.result !== undefined) {
updates.result = options.result;
}
if (isTerminalState(targetStatus)) {
updates.completedAt = new Date();
}
await this.db.jobs.update({ id: jobId }, updates);
return { ...job, ...updates };
}
async updateProgress(jobId: string, progress: number): Promise<Job> {
const job = await this.db.jobs.findOne({ id: jobId });
if (!job) throw new Error('Job not found');
if (job.status !== JobStatus.PROCESSING) {
console.warn(`Ignoring progress update for job ${jobId} in ${job.status} state`);
return job;
}
return this.transitionStatus(jobId, JobStatus.PROCESSING, { progress });
}
async markCompleted(jobId: string, result?: Record<string, unknown>): Promise<Job> {
return this.transitionStatus(jobId, JobStatus.COMPLETED, { progress: 100, result });
}
async markFailed(jobId: string, errorMessage: string): Promise<Job> {
return this.transitionStatus(jobId, JobStatus.FAILED, { errorMessage });
}
async markPartial(
jobId: string,
result?: Record<string, unknown>,
errorMessage?: string
): Promise<Job> {
return this.transitionStatus(jobId, JobStatus.PARTIAL, {
progress: 100,
result,
errorMessage,
});
}
async createAsset(
jobId: string,
userId: string,
assetType: string,
url: string,
storagePath: string,
fileSize: number
): Promise<Asset> {
const asset: Asset = {
id: crypto.randomUUID(),
jobId,
userId,
assetType,
url,
storagePath,
fileSize,
createdAt: new Date(),
};
await this.db.assets.insert(asset);
return asset;
}
async getJobAssets(jobId: string, userId: string): Promise<Asset[]> {
await this.getJob(jobId, userId); // Verify ownership
return this.db.assets.find({ jobId });
}
}Python Implementation
Python 实现
python
undefinedpython
undefinedjob_service.py
job_service.py
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any, List
from uuid import uuid4
class JobStatus(str, Enum):
QUEUED = "queued"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
PARTIAL = "partial"
VALID_TRANSITIONS = {
JobStatus.QUEUED: [JobStatus.PROCESSING],
JobStatus.PROCESSING: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED],
JobStatus.COMPLETED: [],
JobStatus.FAILED: [],
JobStatus.PARTIAL: [],
}
def is_terminal_state(status: JobStatus) -> bool:
return len(VALID_TRANSITIONS.get(status, [])) == 0
@dataclass
class Job:
id: str
user_id: str
job_type: str
status: JobStatus
progress: int
created_at: datetime
updated_at: datetime
error_message: Optional[str] = None
parameters: Optional[Dict[str, Any]] = None
result: Optional[Dict[str, Any]] = None
completed_at: Optional[datetime] = None
class InvalidStateTransitionError(Exception):
def init(self, current: JobStatus, target: JobStatus):
self.current = current
self.target = target
super().init(f"Cannot transition from '{current}' to '{target}'")
class JobService:
def init(self, db):
self.db = db
async def create_job(
self,
user_id: str,
job_type: str,
parameters: Optional[Dict[str, Any]] = None,
) -> Job:
now = datetime.utcnow()
job = Job(
id=str(uuid4()),
user_id=user_id,
job_type=job_type,
status=JobStatus.QUEUED,
progress=0,
parameters=parameters,
created_at=now,
updated_at=now,
)
await self.db.jobs.insert(job)
return job
async def transition_status(
self,
job_id: str,
target_status: JobStatus,
progress: Optional[int] = None,
error_message: Optional[str] = None,
result: Optional[Dict[str, Any]] = None,
) -> Job:
job = await self.db.jobs.find_one(id=job_id)
if not job:
raise ValueError("Job not found")
if job.status != target_status:
if target_status not in VALID_TRANSITIONS.get(job.status, []):
raise InvalidStateTransitionError(job.status, target_status)
job.status = target_status
job.updated_at = datetime.utcnow()
if progress is not None:
job.progress = progress
if error_message is not None:
job.error_message = error_message
if result is not None:
job.result = result
if is_terminal_state(target_status):
job.completed_at = datetime.utcnow()
await self.db.jobs.update(job)
return job
async def mark_completed(self, job_id: str, result: Optional[Dict] = None) -> Job:
return await self.transition_status(
job_id, JobStatus.COMPLETED, progress=100, result=result
)
async def mark_failed(self, job_id: str, error_message: str) -> Job:
return await self.transition_status(
job_id, JobStatus.FAILED, error_message=error_message
)undefinedfrom enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any, List
from uuid import uuid4
class JobStatus(str, Enum):
QUEUED = "queued"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
PARTIAL = "partial"
VALID_TRANSITIONS = {
JobStatus.QUEUED: [JobStatus.PROCESSING],
JobStatus.PROCESSING: [JobStatus.COMPLETED, JobStatus.PARTIAL, JobStatus.FAILED],
JobStatus.COMPLETED: [],
JobStatus.FAILED: [],
JobStatus.PARTIAL: [],
}
def is_terminal_state(status: JobStatus) -> bool:
return len(VALID_TRANSITIONS.get(status, [])) == 0
@dataclass
class Job:
id: str
user_id: str
job_type: str
status: JobStatus
progress: int
created_at: datetime
updated_at: datetime
error_message: Optional[str] = None
parameters: Optional[Dict[str, Any]] = None
result: Optional[Dict[str, Any]] = None
completed_at: Optional[datetime] = None
class InvalidStateTransitionError(Exception):
def init(self, current: JobStatus, target: JobStatus):
self.current = current
self.target = target
super().init(f"Cannot transition from '{current}' to '{target}'")
class JobService:
def init(self, db):
self.db = db
async def create_job(
self,
user_id: str,
job_type: str,
parameters: Optional[Dict[str, Any]] = None,
) -> Job:
now = datetime.utcnow()
job = Job(
id=str(uuid4()),
user_id=user_id,
job_type=job_type,
status=JobStatus.QUEUED,
progress=0,
parameters=parameters,
created_at=now,
updated_at=now,
)
await self.db.jobs.insert(job)
return job
async def transition_status(
self,
job_id: str,
target_status: JobStatus,
progress: Optional[int] = None,
error_message: Optional[str] = None,
result: Optional[Dict[str, Any]] = None,
) -> Job:
job = await self.db.jobs.find_one(id=job_id)
if not job:
raise ValueError("Job not found")
if job.status != target_status:
if target_status not in VALID_TRANSITIONS.get(job.status, []):
raise InvalidStateTransitionError(job.status, target_status)
job.status = target_status
job.updated_at = datetime.utcnow()
if progress is not None:
job.progress = progress
if error_message is not None:
job.error_message = error_message
if result is not None:
job.result = result
if is_terminal_state(target_status):
job.completed_at = datetime.utcnow()
await self.db.jobs.update(job)
return job
async def mark_completed(self, job_id: str, result: Optional[Dict] = None) -> Job:
return await self.transition_status(
job_id, JobStatus.COMPLETED, progress=100, result=result
)
async def mark_failed(self, job_id: str, error_message: str) -> Job:
return await self.transition_status(
job_id, JobStatus.FAILED, error_message=error_message
)undefinedWorker Integration
Worker 集成
typescript
async function processJob(jobId: string) {
const jobService = getJobService();
try {
// Transition to PROCESSING
await jobService.transitionStatus(jobId, JobStatus.PROCESSING, { progress: 0 });
const job = await jobService.getJobInternal(jobId);
// Process with progress updates
await jobService.updateProgress(jobId, 25);
const result1 = await doStep1(job.parameters);
await jobService.updateProgress(jobId, 50);
const result2 = await doStep2(result1);
await jobService.updateProgress(jobId, 75);
const assetUrl = await uploadResult(result2);
// Create asset
await jobService.createAsset(
jobId,
job.userId,
job.jobType,
assetUrl,
`${job.userId}/${jobId}/result.png`,
result2.length
);
// Mark completed
await jobService.markCompleted(jobId, { assetCount: 1 });
} catch (error) {
console.error(`Job ${jobId} failed:`, error);
await jobService.markFailed(jobId, error.message);
}
}typescript
async function processJob(jobId: string) {
const jobService = getJobService();
try {
// Transition to PROCESSING
await jobService.transitionStatus(jobId, JobStatus.PROCESSING, { progress: 0 });
const job = await jobService.getJobInternal(jobId);
// Process with progress updates
await jobService.updateProgress(jobId, 25);
const result1 = await doStep1(job.parameters);
await jobService.updateProgress(jobId, 50);
const result2 = await doStep2(result1);
await jobService.updateProgress(jobId, 75);
const assetUrl = await uploadResult(result2);
// Create asset
await jobService.createAsset(
jobId,
job.userId,
job.jobType,
assetUrl,
`${job.userId}/${jobId}/result.png`,
result2.length
);
// Mark completed
await jobService.markCompleted(jobId, { assetCount: 1 });
} catch (error) {
console.error(`Job ${jobId} failed:`, error);
await jobService.markFailed(jobId, error.message);
}
}Database Schema
数据库模式
sql
CREATE TABLE jobs (
id UUID PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(id),
job_type VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'queued',
progress INTEGER NOT NULL DEFAULT 0,
error_message TEXT,
parameters JSONB,
result JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
CONSTRAINT valid_status CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'partial')),
CONSTRAINT valid_progress CHECK (progress >= 0 AND progress <= 100)
);
CREATE INDEX idx_jobs_user_status ON jobs(user_id, status);
CREATE TABLE assets (
id UUID PRIMARY KEY,
job_id UUID NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id),
asset_type VARCHAR(50) NOT NULL,
url TEXT NOT NULL,
storage_path TEXT NOT NULL,
file_size INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_assets_job_id ON assets(job_id);sql
CREATE TABLE jobs (
id UUID PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(id),
job_type VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'queued',
progress INTEGER NOT NULL DEFAULT 0,
error_message TEXT,
parameters JSONB,
result JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
CONSTRAINT valid_status CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'partial')),
CONSTRAINT valid_progress CHECK (progress >= 0 AND progress <= 100)
);
CREATE INDEX idx_jobs_user_status ON jobs(user_id, status);
CREATE TABLE assets (
id UUID PRIMARY KEY,
job_id UUID NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id),
asset_type VARCHAR(50) NOT NULL,
url TEXT NOT NULL,
storage_path TEXT NOT NULL,
file_size INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_assets_job_id ON assets(job_id);Best Practices
最佳实践
- Validate all transitions - Never skip state machine validation
- Always reach terminal - Jobs must complete, fail, or partial
- Track progress - Provide feedback for long operations
- Link assets - Maintain relationship for cleanup
- Log transitions - Essential for debugging
- 验证所有状态转换 - 绝不跳过状态机验证
- 确保进入终端状态 - 任务必须完成、失败或部分完成
- 跟踪进度 - 为长时间操作提供反馈
- 关联资产 - 维护关联关系以便清理
- 记录状态转换 - 对调试至关重要
Common Mistakes
常见错误
- Allowing invalid state transitions
- Jobs stuck in non-terminal states
- Not tracking progress for long jobs
- Orphaned assets without job links
- Missing error messages on failure
- 允许无效的状态转换
- 任务卡在非终端状态
- 未跟踪长时间任务的进度
- 存在与任务无关联的孤立资产
- 失败时未记录错误信息
Related Skills
相关技能
- Background Jobs
- Dead Letter Queue
- Graceful Shutdown
- 后台任务
- 死信队列
- 优雅关闭