dust-temporal

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Creating Temporal Workflows

创建Temporal工作流

This skill guides you through creating Temporal workflows for durable background processing.
本技能将指导你创建用于持久化后台处理的Temporal工作流。

Quick Reference

快速参考

Files Structure (per queue)

文件结构(每个队列)

temporal/your_queue/
├── config.ts          # Queue name and version
├── helpers.ts         # Workflow ID generators
├── activities.ts      # Activity implementations (DB, API calls)
├── workflows.ts       # Workflow orchestration
├── worker.ts          # Worker setup
└── client.ts          # Workflow launcher functions
temporal/your_queue/
├── config.ts          # 队列名称和版本
├── helpers.ts         # Workflow ID生成器
├── activities.ts      # Activity实现(数据库、API调用)
├── workflows.ts       # Workflow编排逻辑
├── worker.ts          # Worker配置
└── client.ts          # Workflow启动函数

Key Concepts

核心概念

  • Workflow: Durable, deterministic function that orchestrates activities
  • Activity: Non-deterministic function with side effects (DB, API calls)
  • Task Queue: Named queue where workflows/activities execute
  • Workflow ID: Unique identifier for idempotency
  • Workflow: 用于编排Activities的持久化、确定性函数
  • Activity: 带有副作用的非确定性函数(数据库、API调用等)
  • Task Queue: Workflow/Activity执行的命名队列
  • Workflow ID: 用于保证幂等性的唯一标识符

Step-by-Step Implementation

分步实现

Step 1: Create Queue Configuration

步骤1:创建队列配置

Create
temporal/your_queue/config.ts
:
typescript
const QUEUE_VERSION = 1;
export const QUEUE_NAME = `your-queue-v${QUEUE_VERSION}`;
创建
temporal/your_queue/config.ts
:
typescript
const QUEUE_VERSION = 1;
export const QUEUE_NAME = `your-queue-v${QUEUE_VERSION}`;

Step 2: Create Workflow ID Helper

步骤2:创建Workflow ID辅助函数

Create
temporal/your_queue/helpers.ts
:
typescript
export function makeYourWorkflowId({ entityId }: { entityId: string }): string {
  return `your-workflow-${entityId}`;
}
Important: Workflow IDs must be deterministic (same inputs = same ID) for idempotency.
创建
temporal/your_queue/helpers.ts
:
typescript
export function makeYourWorkflowId({ entityId }: { entityId: string }): string {
  return `your-workflow-${entityId}`;
}
重要提示: 为保证幂等性,Workflow ID必须是确定性的(相同输入对应相同ID)。

Step 3: Create Activities

步骤3:创建Activities

Create
temporal/your_queue/activities.ts
:
typescript
import { YourResource } from "@app/lib/resources/your_resource";
import logger from "@app/logger/logger";

export async function yourActivity({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<void> {
  const entity = await YourResource.fetchById(entityId);
  if (!entity) {
    throw new Error(`Entity not found: ${entityId}`);
  }

  const result = await entity.doSomething();
  if (result.isErr()) {
    logger.error({ entityId, error: result.error }, "Failed to process entity");
    throw new Error(`Failed to process: ${result.error.message}`);
  }
}
Guidelines: Activities perform side effects, can throw (Temporal retries), should be idempotent.
创建
temporal/your_queue/activities.ts
:
typescript
import { YourResource } from "@app/lib/resources/your_resource";
import logger from "@app/logger/logger";

export async function yourActivity({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<void> {
  const entity = await YourResource.fetchById(entityId);
  if (!entity) {
    throw new Error(`Entity not found: ${entityId}`);
  }

  const result = await entity.doSomething();
  if (result.isErr()) {
    logger.error({ entityId, error: result.error }, "Failed to process entity");
    throw new Error(`Failed to process: ${result.error.message}`);
  }
}
规则说明: Activities用于执行有副作用的操作,可以抛出错误(Temporal会自动重试),且应该具备幂等性。

Step 4: Create Workflow

步骤4:创建Workflow

Create
temporal/your_queue/workflows.ts
:
typescript
import { proxyActivities } from "@temporalio/workflow";
import type * as activities from "@app/temporal/your_queue/activities";

const { yourActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "5 minutes",
});

export async function yourWorkflow({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<void> {
  await yourActivity({ entityId, workspaceId });
}
Guidelines: Workflows are deterministic - no
Math.random()
,
Date.now()
, etc.
创建
temporal/your_queue/workflows.ts
:
typescript
import { proxyActivities } from "@temporalio/workflow";
import type * as activities from "@app/temporal/your_queue/activities";

const { yourActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "5 minutes",
});

export async function yourWorkflow({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<void> {
  await yourActivity({ entityId, workspaceId });
}
规则说明: Workflows必须是确定性的 - 不允许使用
Math.random()
Date.now()
等非确定性方法。

Step 5: Create Client Launcher

步骤5:创建客户端启动器

Create
temporal/your_queue/client.ts
:
typescript
import { WorkflowExecutionAlreadyStartedError } from "@temporalio/client";
import { getTemporalClientForFrontNamespace } from "@app/lib/temporal";
import logger from "@app/logger/logger";
import { QUEUE_NAME } from "@app/temporal/your_queue/config";
import { makeYourWorkflowId } from "@app/temporal/your_queue/helpers";
import { yourWorkflow } from "@app/temporal/your_queue/workflows";
import type { Result } from "@app/types";
import { Err, normalizeError, Ok } from "@app/types";

export async function launchYourWorkflow({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<Result<undefined, Error>> {
  const client = await getTemporalClientForFrontNamespace();
  const workflowId = makeYourWorkflowId({ entityId });

  try {
    await client.workflow.start(yourWorkflow, {
      args: [{ entityId, workspaceId }],
      taskQueue: QUEUE_NAME,
      workflowId,
      memo: { entityId, workspaceId },
    });
    return new Ok(undefined);
  } catch (e) {
    if (!(e instanceof WorkflowExecutionAlreadyStartedError)) {
      logger.error({ workflowId, entityId, workspaceId, error: e }, "Failed starting workflow");
    }
    return new Err(normalizeError(e));
  }
}
创建
temporal/your_queue/client.ts
:
typescript
import { WorkflowExecutionAlreadyStartedError } from "@temporalio/client";
import { getTemporalClientForFrontNamespace } from "@app/lib/temporal";
import logger from "@app/logger/logger";
import { QUEUE_NAME } from "@app/temporal/your_queue/config";
import { makeYourWorkflowId } from "@app/temporal/your_queue/helpers";
import { yourWorkflow } from "@app/temporal/your_queue/workflows";
import type { Result } from "@app/types";
import { Err, normalizeError, Ok } from "@app/types";

export async function launchYourWorkflow({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<Result<undefined, Error>> {
  const client = await getTemporalClientForFrontNamespace();
  const workflowId = makeYourWorkflowId({ entityId });

  try {
    await client.workflow.start(yourWorkflow, {
      args: [{ entityId, workspaceId }],
      taskQueue: QUEUE_NAME,
      workflowId,
      memo: { entityId, workspaceId },
    });
    return new Ok(undefined);
  } catch (e) {
    if (!(e instanceof WorkflowExecutionAlreadyStartedError)) {
      logger.error({ workflowId, entityId, workspaceId, error: e }, "Failed starting workflow");
    }
    return new Err(normalizeError(e));
  }
}

Step 6: Create Worker

步骤6:创建Worker

Create
temporal/your_queue/worker.ts
:
typescript
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";
import { getTemporalWorkerConnection, TEMPORAL_MAXED_CACHED_WORKFLOWS } from "@app/lib/temporal";
import { ActivityInboundLogInterceptor } from "@app/lib/temporal_monitoring";
import logger from "@app/logger/logger";
import * as activities from "@app/temporal/your_queue/activities";
import { getWorkflowConfig } from "@app/temporal/bundle_helper";
import { QUEUE_NAME } from "./config";

export async function runYourQueueWorker() {
  const { connection, namespace } = await getTemporalWorkerConnection();

  const worker = await Worker.create({
    ...getWorkflowConfig({
      workerName: "your_queue",
      getWorkflowsPath: () => require.resolve("./workflows"),
    }),
    activities,
    taskQueue: QUEUE_NAME,
    maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
    maxConcurrentActivityTaskExecutions: 16,
    connection,
    namespace,
    interceptors: {
      activityInbound: [(ctx: Context) => new ActivityInboundLogInterceptor(ctx, logger)],
    },
  });

  await worker.run();
}
创建
temporal/your_queue/worker.ts
:
typescript
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";
import { getTemporalWorkerConnection, TEMPORAL_MAXED_CACHED_WORKFLOWS } from "@app/lib/temporal";
import { ActivityInboundLogInterceptor } from "@app/lib/temporal_monitoring";
import logger from "@app/logger/logger";
import * as activities from "@app/temporal/your_queue/activities";
import { getWorkflowConfig } from "@app/temporal/bundle_helper";
import { QUEUE_NAME } from "./config";

export async function runYourQueueWorker() {
  const { connection, namespace } = await getTemporalWorkerConnection();

  const worker = await Worker.create({
    ...getWorkflowConfig({
      workerName: "your_queue",
      getWorkflowsPath: () => require.resolve("./workflows"),
    }),
    activities,
    taskQueue: QUEUE_NAME,
    maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
    maxConcurrentActivityTaskExecutions: 16,
    connection,
    namespace,
    interceptors: {
      activityInbound: [(ctx: Context) => new ActivityInboundLogInterceptor(ctx, logger)],
    },
  });

  await worker.run();
}

Step 7: Register Worker (Critical!)

步骤7:注册Worker(关键步骤!)

Edit
temporal/worker_registry.ts
:
typescript
// 1. Add import
import { runYourQueueWorker } from "@app/temporal/your_queue/worker";

// 2. Add to WorkerName type
export type WorkerName =
  | "agent_loop"
  // ... existing workers
  | "your_queue"; // <- Add this

// 3. Add to workerFunctions mapping
export const workerFunctions: Record<WorkerName, () => Promise<void>> = {
  // ... existing workers
  your_queue: runYourQueueWorker, // <- Add this
};
Without registration, workflows will never execute!
编辑
temporal/worker_registry.ts
:
typescript
// 1. 添加导入
import { runYourQueueWorker } from "@app/temporal/your_queue/worker";

// 2. 添加到WorkerName类型定义
export type WorkerName =
  | "agent_loop"
  // ... 现有Worker
  | "your_queue"; // <- 添加该行

// 3. 添加到workerFunctions映射中
export const workerFunctions: Record<WorkerName, () => Promise<void>> = {
  // ... 现有Worker
  your_queue: runYourQueueWorker, // <- 添加该行
};
如果不完成注册,Workflow将永远不会执行!

Timeout & Retry Configuration

超时与重试配置

typescript
const { yourActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "5 minutes",
  retry: {
    maximumAttempts: 3,
    initialInterval: "1s",
    backoffCoefficient: 2,
    maximumInterval: "1m",
    nonRetryableErrorTypes: ["ValidationError"],
  },
});
typescript
const { yourActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "5 minutes",
  retry: {
    maximumAttempts: 3,
    initialInterval: "1s",
    backoffCoefficient: 2,
    maximumInterval: "1m",
    nonRetryableErrorTypes: ["ValidationError"],
  },
});

Validation Checklist

验证检查清单

  • Queue config created with versioned name
  • Workflow ID helper is deterministic
  • Activities handle errors properly
  • Workflow uses
    proxyActivities
    with appropriate timeouts
  • Client returns
    Result<>
    and handles
    WorkflowExecutionAlreadyStartedError
  • Worker registered in
    worker_registry.ts
  • Tested locally
  • 已创建带版本号命名的队列配置
  • Workflow ID辅助函数是确定性的
  • Activities已正确处理错误
  • Workflow使用
    proxyActivities
    并配置了合理的超时时间
  • 客户端返回
    Result<>
    并处理了
    WorkflowExecutionAlreadyStartedError
  • Worker已在
    worker_registry.ts
    中注册
  • 已在本地完成测试

Reference Examples

参考示例

See
temporal/
directory for existing implementations.
查看
temporal/
目录下的现有实现。