dust-temporal
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCreating Temporal Workflows
创建Temporal工作流
This skill guides you through creating Temporal workflows for durable background processing.
本技能将指导你创建用于持久化后台处理的Temporal工作流。
Quick Reference
快速参考
Files Structure (per queue)
文件结构(每个队列)
temporal/your_queue/
├── config.ts # Queue name and version
├── helpers.ts # Workflow ID generators
├── activities.ts # Activity implementations (DB, API calls)
├── workflows.ts # Workflow orchestration
├── worker.ts # Worker setup
└── client.ts # Workflow launcher functionstemporal/your_queue/
├── config.ts # 队列名称和版本
├── helpers.ts # Workflow ID生成器
├── activities.ts # Activity实现(数据库、API调用)
├── workflows.ts # Workflow编排逻辑
├── worker.ts # Worker配置
└── client.ts # Workflow启动函数Key Concepts
核心概念
- Workflow: Durable, deterministic function that orchestrates activities
- Activity: Non-deterministic function with side effects (DB, API calls)
- Task Queue: Named queue where workflows/activities execute
- Workflow ID: Unique identifier for idempotency
- Workflow: 用于编排Activities的持久化、确定性函数
- Activity: 带有副作用的非确定性函数(数据库、API调用等)
- Task Queue: Workflow/Activity执行的命名队列
- Workflow ID: 用于保证幂等性的唯一标识符
Step-by-Step Implementation
分步实现
Step 1: Create Queue Configuration
步骤1:创建队列配置
Create :
temporal/your_queue/config.tstypescript
const QUEUE_VERSION = 1;
export const QUEUE_NAME = `your-queue-v${QUEUE_VERSION}`;创建 :
temporal/your_queue/config.tstypescript
const QUEUE_VERSION = 1;
export const QUEUE_NAME = `your-queue-v${QUEUE_VERSION}`;Step 2: Create Workflow ID Helper
步骤2:创建Workflow ID辅助函数
Create :
temporal/your_queue/helpers.tstypescript
export function makeYourWorkflowId({ entityId }: { entityId: string }): string {
return `your-workflow-${entityId}`;
}Important: Workflow IDs must be deterministic (same inputs = same ID) for idempotency.
创建 :
temporal/your_queue/helpers.tstypescript
export function makeYourWorkflowId({ entityId }: { entityId: string }): string {
return `your-workflow-${entityId}`;
}重要提示: 为保证幂等性,Workflow ID必须是确定性的(相同输入对应相同ID)。
Step 3: Create Activities
步骤3:创建Activities
Create :
temporal/your_queue/activities.tstypescript
import { YourResource } from "@app/lib/resources/your_resource";
import logger from "@app/logger/logger";
export async function yourActivity({
entityId,
workspaceId,
}: {
entityId: string;
workspaceId: number;
}): Promise<void> {
const entity = await YourResource.fetchById(entityId);
if (!entity) {
throw new Error(`Entity not found: ${entityId}`);
}
const result = await entity.doSomething();
if (result.isErr()) {
logger.error({ entityId, error: result.error }, "Failed to process entity");
throw new Error(`Failed to process: ${result.error.message}`);
}
}Guidelines: Activities perform side effects, can throw (Temporal retries), should be idempotent.
创建 :
temporal/your_queue/activities.tstypescript
import { YourResource } from "@app/lib/resources/your_resource";
import logger from "@app/logger/logger";
export async function yourActivity({
entityId,
workspaceId,
}: {
entityId: string;
workspaceId: number;
}): Promise<void> {
const entity = await YourResource.fetchById(entityId);
if (!entity) {
throw new Error(`Entity not found: ${entityId}`);
}
const result = await entity.doSomething();
if (result.isErr()) {
logger.error({ entityId, error: result.error }, "Failed to process entity");
throw new Error(`Failed to process: ${result.error.message}`);
}
}规则说明: Activities用于执行有副作用的操作,可以抛出错误(Temporal会自动重试),且应该具备幂等性。
Step 4: Create Workflow
步骤4:创建Workflow
Create :
temporal/your_queue/workflows.tstypescript
import { proxyActivities } from "@temporalio/workflow";
import type * as activities from "@app/temporal/your_queue/activities";
const { yourActivity } = proxyActivities<typeof activities>({
startToCloseTimeout: "5 minutes",
});
export async function yourWorkflow({
entityId,
workspaceId,
}: {
entityId: string;
workspaceId: number;
}): Promise<void> {
await yourActivity({ entityId, workspaceId });
}Guidelines: Workflows are deterministic - no , , etc.
Math.random()Date.now()创建 :
temporal/your_queue/workflows.tstypescript
import { proxyActivities } from "@temporalio/workflow";
import type * as activities from "@app/temporal/your_queue/activities";
const { yourActivity } = proxyActivities<typeof activities>({
startToCloseTimeout: "5 minutes",
});
export async function yourWorkflow({
entityId,
workspaceId,
}: {
entityId: string;
workspaceId: number;
}): Promise<void> {
await yourActivity({ entityId, workspaceId });
}规则说明: Workflows必须是确定性的 - 不允许使用、等非确定性方法。
Math.random()Date.now()Step 5: Create Client Launcher
步骤5:创建客户端启动器
Create :
temporal/your_queue/client.tstypescript
import { WorkflowExecutionAlreadyStartedError } from "@temporalio/client";
import { getTemporalClientForFrontNamespace } from "@app/lib/temporal";
import logger from "@app/logger/logger";
import { QUEUE_NAME } from "@app/temporal/your_queue/config";
import { makeYourWorkflowId } from "@app/temporal/your_queue/helpers";
import { yourWorkflow } from "@app/temporal/your_queue/workflows";
import type { Result } from "@app/types";
import { Err, normalizeError, Ok } from "@app/types";
export async function launchYourWorkflow({
entityId,
workspaceId,
}: {
entityId: string;
workspaceId: number;
}): Promise<Result<undefined, Error>> {
const client = await getTemporalClientForFrontNamespace();
const workflowId = makeYourWorkflowId({ entityId });
try {
await client.workflow.start(yourWorkflow, {
args: [{ entityId, workspaceId }],
taskQueue: QUEUE_NAME,
workflowId,
memo: { entityId, workspaceId },
});
return new Ok(undefined);
} catch (e) {
if (!(e instanceof WorkflowExecutionAlreadyStartedError)) {
logger.error({ workflowId, entityId, workspaceId, error: e }, "Failed starting workflow");
}
return new Err(normalizeError(e));
}
}创建 :
temporal/your_queue/client.tstypescript
import { WorkflowExecutionAlreadyStartedError } from "@temporalio/client";
import { getTemporalClientForFrontNamespace } from "@app/lib/temporal";
import logger from "@app/logger/logger";
import { QUEUE_NAME } from "@app/temporal/your_queue/config";
import { makeYourWorkflowId } from "@app/temporal/your_queue/helpers";
import { yourWorkflow } from "@app/temporal/your_queue/workflows";
import type { Result } from "@app/types";
import { Err, normalizeError, Ok } from "@app/types";
export async function launchYourWorkflow({
entityId,
workspaceId,
}: {
entityId: string;
workspaceId: number;
}): Promise<Result<undefined, Error>> {
const client = await getTemporalClientForFrontNamespace();
const workflowId = makeYourWorkflowId({ entityId });
try {
await client.workflow.start(yourWorkflow, {
args: [{ entityId, workspaceId }],
taskQueue: QUEUE_NAME,
workflowId,
memo: { entityId, workspaceId },
});
return new Ok(undefined);
} catch (e) {
if (!(e instanceof WorkflowExecutionAlreadyStartedError)) {
logger.error({ workflowId, entityId, workspaceId, error: e }, "Failed starting workflow");
}
return new Err(normalizeError(e));
}
}Step 6: Create Worker
步骤6:创建Worker
Create :
temporal/your_queue/worker.tstypescript
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";
import { getTemporalWorkerConnection, TEMPORAL_MAXED_CACHED_WORKFLOWS } from "@app/lib/temporal";
import { ActivityInboundLogInterceptor } from "@app/lib/temporal_monitoring";
import logger from "@app/logger/logger";
import * as activities from "@app/temporal/your_queue/activities";
import { getWorkflowConfig } from "@app/temporal/bundle_helper";
import { QUEUE_NAME } from "./config";
export async function runYourQueueWorker() {
const { connection, namespace } = await getTemporalWorkerConnection();
const worker = await Worker.create({
...getWorkflowConfig({
workerName: "your_queue",
getWorkflowsPath: () => require.resolve("./workflows"),
}),
activities,
taskQueue: QUEUE_NAME,
maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
maxConcurrentActivityTaskExecutions: 16,
connection,
namespace,
interceptors: {
activityInbound: [(ctx: Context) => new ActivityInboundLogInterceptor(ctx, logger)],
},
});
await worker.run();
}创建 :
temporal/your_queue/worker.tstypescript
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";
import { getTemporalWorkerConnection, TEMPORAL_MAXED_CACHED_WORKFLOWS } from "@app/lib/temporal";
import { ActivityInboundLogInterceptor } from "@app/lib/temporal_monitoring";
import logger from "@app/logger/logger";
import * as activities from "@app/temporal/your_queue/activities";
import { getWorkflowConfig } from "@app/temporal/bundle_helper";
import { QUEUE_NAME } from "./config";
export async function runYourQueueWorker() {
const { connection, namespace } = await getTemporalWorkerConnection();
const worker = await Worker.create({
...getWorkflowConfig({
workerName: "your_queue",
getWorkflowsPath: () => require.resolve("./workflows"),
}),
activities,
taskQueue: QUEUE_NAME,
maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
maxConcurrentActivityTaskExecutions: 16,
connection,
namespace,
interceptors: {
activityInbound: [(ctx: Context) => new ActivityInboundLogInterceptor(ctx, logger)],
},
});
await worker.run();
}Step 7: Register Worker (Critical!)
步骤7:注册Worker(关键步骤!)
Edit :
temporal/worker_registry.tstypescript
// 1. Add import
import { runYourQueueWorker } from "@app/temporal/your_queue/worker";
// 2. Add to WorkerName type
export type WorkerName =
| "agent_loop"
// ... existing workers
| "your_queue"; // <- Add this
// 3. Add to workerFunctions mapping
export const workerFunctions: Record<WorkerName, () => Promise<void>> = {
// ... existing workers
your_queue: runYourQueueWorker, // <- Add this
};Without registration, workflows will never execute!
编辑 :
temporal/worker_registry.tstypescript
// 1. 添加导入
import { runYourQueueWorker } from "@app/temporal/your_queue/worker";
// 2. 添加到WorkerName类型定义
export type WorkerName =
| "agent_loop"
// ... 现有Worker
| "your_queue"; // <- 添加该行
// 3. 添加到workerFunctions映射中
export const workerFunctions: Record<WorkerName, () => Promise<void>> = {
// ... 现有Worker
your_queue: runYourQueueWorker, // <- 添加该行
};如果不完成注册,Workflow将永远不会执行!
Timeout & Retry Configuration
超时与重试配置
typescript
const { yourActivity } = proxyActivities<typeof activities>({
startToCloseTimeout: "5 minutes",
retry: {
maximumAttempts: 3,
initialInterval: "1s",
backoffCoefficient: 2,
maximumInterval: "1m",
nonRetryableErrorTypes: ["ValidationError"],
},
});typescript
const { yourActivity } = proxyActivities<typeof activities>({
startToCloseTimeout: "5 minutes",
retry: {
maximumAttempts: 3,
initialInterval: "1s",
backoffCoefficient: 2,
maximumInterval: "1m",
nonRetryableErrorTypes: ["ValidationError"],
},
});Validation Checklist
验证检查清单
- Queue config created with versioned name
- Workflow ID helper is deterministic
- Activities handle errors properly
- Workflow uses with appropriate timeouts
proxyActivities - Client returns and handles
Result<>WorkflowExecutionAlreadyStartedError - Worker registered in
worker_registry.ts - Tested locally
- 已创建带版本号命名的队列配置
- Workflow ID辅助函数是确定性的
- Activities已正确处理错误
- Workflow使用并配置了合理的超时时间
proxyActivities - 客户端返回并处理了
Result<>WorkflowExecutionAlreadyStartedError - Worker已在中注册
worker_registry.ts - 已在本地完成测试
Reference Examples
参考示例
See directory for existing implementations.
temporal/查看目录下的现有实现。
temporal/