workflow

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

CRITICAL: Always Use Correct
workflow
Documentation

重要提示:始终使用正确的
workflow
文档

Your knowledge of
workflow
is outdated.
The
workflow
documentation outlined below matches the installed version of the Workflow DevKit. Follow these instructions before starting on any
workflow
-related tasks:
Search the bundled documentation in
node_modules/workflow/docs/
:
  1. Find docs:
    glob "node_modules/workflow/docs/**/*.mdx"
  2. Search content:
    grep "your query" node_modules/workflow/docs/
Documentation structure in
node_modules/workflow/docs/
:
  • getting-started/
    - Framework setup (next.mdx, express.mdx, hono.mdx, etc.)
  • foundations/
    - Core concepts (workflows-and-steps.mdx, hooks.mdx, streaming.mdx, etc.)
  • api-reference/workflow/
    - API docs (sleep.mdx, create-hook.mdx, fatal-error.mdx, etc.)
  • api-reference/workflow-api/
    - Client API (start.mdx, get-run.mdx, resume-hook.mdx, etc.)
  • ai/
    - AI SDK integration docs
  • errors/
    - Error code documentation
Related packages also include bundled docs:
  • @workflow/ai
    :
    node_modules/@workflow/ai/docs/
    - DurableAgent and AI integration
  • @workflow/core
    :
    node_modules/@workflow/core/docs/
    - Core runtime (foundations, how-it-works)
  • @workflow/next
    :
    node_modules/@workflow/next/docs/
    - Next.js integration
When in doubt, update to the latest version of the Workflow DevKit.
你所掌握的
workflow
相关知识已经过时。
下文列出的
workflow
文档与你安装的Workflow DevKit版本匹配。在开始任何与
workflow
相关的任务前,请遵循以下指引:
请在
node_modules/workflow/docs/
中搜索内置文档:
  1. 查找文档
    glob "node_modules/workflow/docs/**/*.mdx"
  2. 搜索内容
    grep "your query" node_modules/workflow/docs/
node_modules/workflow/docs/
下的文档结构:
  • getting-started/
    - 框架配置(next.mdx、express.mdx、hono.mdx等)
  • foundations/
    - 核心概念(workflows-and-steps.mdx、hooks.mdx、streaming.mdx等)
  • api-reference/workflow/
    - API文档(sleep.mdx、create-hook.mdx、fatal-error.mdx等)
  • api-reference/workflow-api/
    - 客户端API(start.mdx、get-run.mdx、resume-hook.mdx等)
  • ai/
    - AI SDK集成文档
  • errors/
    - 错误码文档
相关依赖包也包含内置文档:
  • @workflow/ai
    node_modules/@workflow/ai/docs/
    - DurableAgent与AI集成相关内容
  • @workflow/core
    node_modules/@workflow/core/docs/
    - 核心运行时(基础概念、运行原理解析)
  • @workflow/next
    node_modules/@workflow/next/docs/
    - Next.js集成相关内容
如果存在疑问,请先升级到最新版本的Workflow DevKit。

Official Resources

官方资源

Quick Reference

快速参考

Directives:
typescript
"use workflow";  // First line - makes async function durable
"use step";      // First line - makes function a cached, retryable unit
Essential imports:
typescript
// Workflow primitives
import { sleep, fetch, createHook, createWebhook, getWritable } from "workflow";
import { FatalError, RetryableError } from "workflow";
import { getWorkflowMetadata, getStepMetadata } from "workflow";

// API operations
import { start, getRun, resumeHook, resumeWebhook } from "workflow/api";

// Framework integrations
import { withWorkflow } from "workflow/next";
import { workflow } from "workflow/vite";
import { workflow } from "workflow/astro";
// Or use modules: ["workflow/nitro"] for Nitro/Nuxt

// AI agent
import { DurableAgent } from "@workflow/ai/agent";
指令:
typescript
"use workflow";  // 代码第一行声明,让异步函数具备持久化能力
"use step";      // 代码第一行声明,让函数成为可缓存、可重试的执行单元
核心导入语句:
typescript
// Workflow基础原语
import { sleep, fetch, createHook, createWebhook, getWritable } from "workflow";
import { FatalError, RetryableError } from "workflow";
import { getWorkflowMetadata, getStepMetadata } from "workflow";

// API操作
import { start, getRun, resumeHook, resumeWebhook } from "workflow/api";

// 框架集成
import { withWorkflow } from "workflow/next";
import { workflow } from "workflow/vite";
import { workflow } from "workflow/astro";
// 针对Nitro/Nuxt可使用模块:["workflow/nitro"]

// AI agent
import { DurableAgent } from "@workflow/ai/agent";

Prefer Step Functions to Avoid Sandbox Errors

优先使用Step函数避免沙箱错误

"use workflow"
functions run in a sandboxed VM.
"use step"
functions have full Node.js access. Put your logic in steps and use the workflow function purely for orchestration.
typescript
// Steps have full Node.js and npm access
async function fetchUserData(userId: string) {
  "use step";
  const response = await fetch(`https://api.example.com/users/${userId}`);
  return response.json();
}

async function processWithAI(data: any) {
  "use step";
  // AI SDK works in steps without workarounds
  return await generateText({
    model: openai("gpt-4"),
    prompt: `Process: ${JSON.stringify(data)}`,
  });
}

// Workflow orchestrates steps - no sandbox issues
export async function dataProcessingWorkflow(userId: string) {
  "use workflow";
  const data = await fetchUserData(userId);
  const processed = await processWithAI(data);
  return { success: true, processed };
}
Benefits: Steps have automatic retry, results are persisted for replay, and no sandbox restrictions.
声明
"use workflow"
的函数运行在沙箱VM中,声明
"use step"
的函数拥有完整的Node.js访问权限。请将业务逻辑放在step中,workflow函数仅负责编排逻辑。
typescript
// Step函数拥有完整的Node.js和npm包访问权限
async function fetchUserData(userId: string) {
  "use step";
  const response = await fetch(`https://api.example.com/users/${userId}`);
  return response.json();
}

async function processWithAI(data: any) {
  "use step";
  // AI SDK可以直接在step中运行,无需额外适配
  return await generateText({
    model: openai("gpt-4"),
    prompt: `Process: ${JSON.stringify(data)}`,
  });
}

// Workflow仅负责编排step,不会出现沙箱问题
export async function dataProcessingWorkflow(userId: string) {
  "use workflow";
  const data = await fetchUserData(userId);
  const processed = await processWithAI(data);
  return { success: true, processed };
}
优势: Step会自动重试,执行结果会持久化支持重放,且没有沙箱权限限制。

Workflow Sandbox Limitations

Workflow沙箱限制

When you need logic directly in a workflow function (not in a step), these restrictions apply:
LimitationWorkaround
No
fetch()
import { fetch } from "workflow"
then
globalThis.fetch = fetch
No
setTimeout
/
setInterval
Use
sleep("5s")
from
"workflow"
No Node.js modules (fs, crypto, etc.)Move to a step function
Example - Using fetch in workflow context:
typescript
import { fetch } from "workflow";

export async function myWorkflow() {
  "use workflow";
  globalThis.fetch = fetch;  // Required for AI SDK and HTTP libraries
  // Now generateText() and other libraries work
}
Note:
DurableAgent
from
@workflow/ai
handles the fetch assignment automatically.
如果你需要直接在workflow函数(而非step)中编写逻辑,需要遵循以下限制:
限制解决方案
不支持
fetch()
"workflow"
导入
fetch
,然后执行
globalThis.fetch = fetch
不支持
setTimeout
/
setInterval
使用
"workflow"
提供的
sleep("5s")
不支持Node.js内置模块(fs、crypto等)将逻辑迁移到step函数中
示例 - 在workflow上下文中使用fetch:
typescript
import { fetch } from "workflow";

export async function myWorkflow() {
  "use workflow";
  globalThis.fetch = fetch;  // AI SDK和其他HTTP库需要该配置才能正常运行
  // 现在generateText()和其他库可以正常工作了
}
注意:
@workflow/ai
提供的
DurableAgent
会自动处理fetch的挂载配置,无需手动操作。

DurableAgent — AI Agents in Workflows

DurableAgent — 工作流中的AI Agents

Use
DurableAgent
to build AI agents that maintain state and survive interruptions. It handles the workflow sandbox automatically (no manual
globalThis.fetch
needed).
typescript
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";

async function lookupData({ query }: { query: string }) {
  "use step";
  // Step functions have full Node.js access
  return `Results for "${query}"`;
}

export async function myAgentWorkflow(userMessage: string) {
  "use workflow";

  const agent = new DurableAgent({
    model: "anthropic/claude-sonnet-4-5",
    system: "You are a helpful assistant.",
    tools: {
      lookupData: {
        description: "Search for information",
        inputSchema: z.object({ query: z.string() }),
        execute: lookupData,
      },
    },
  });

  const result = await agent.stream({
    messages: [{ role: "user", content: userMessage }],
    writable: getWritable<UIMessageChunk>(),
    maxSteps: 10,
  });

  return result.messages;
}
Key points:
  • getWritable<UIMessageChunk>()
    streams output to the workflow run's default stream
  • Tool
    execute
    functions that need Node.js/npm access should use
    "use step"
  • Tool
    execute
    functions that use workflow primitives (
    sleep()
    ,
    createHook()
    ) should NOT use
    "use step"
    — they run at the workflow level
  • maxSteps
    limits the number of LLM calls (default is unlimited)
  • Multi-turn: pass
    result.messages
    plus new user messages to subsequent
    agent.stream()
    calls
For more details on
DurableAgent
, check the AI docs in
node_modules/@workflow/ai/docs/
.
使用
DurableAgent
可以构建具备状态持久化、支持中断恢复的AI Agent。它会自动适配workflow沙箱(无需手动配置
globalThis.fetch
)。
typescript
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";

async function lookupData({ query }: { query: string }) {
  "use step";
  // Step函数拥有完整的Node.js访问权限
  return `Results for "${query}"`;
}

export async function myAgentWorkflow(userMessage: string) {
  "use workflow";

  const agent = new DurableAgent({
    model: "anthropic/claude-sonnet-4-5",
    system: "You are a helpful assistant.",
    tools: {
      lookupData: {
        description: "Search for information",
        inputSchema: z.object({ query: z.string() }),
        execute: lookupData,
      },
    },
  });

  const result = await agent.stream({
    messages: [{ role: "user", content: userMessage }],
    writable: getWritable<UIMessageChunk>(),
    maxSteps: 10,
  });

  return result.messages;
}
核心要点:
  • getWritable<UIMessageChunk>()
    会将输出流写入workflow运行实例的默认流中
  • 需要Node.js/npm访问权限的工具
    execute
    函数应该声明
    "use step"
  • 使用workflow原语(
    sleep()
    createHook()
    )的工具
    execute
    函数不要声明
    "use step"
    ,它们会在workflow层级运行
  • maxSteps
    限制LLM调用的次数(默认无限制)
  • 多轮对话场景:可将
    result.messages
    加上新的用户消息传入后续的
    agent.stream()
    调用
如需了解更多
DurableAgent
相关内容,请查看
node_modules/@workflow/ai/docs/
下的AI文档。

Starting Workflows & Child Workflows

启动工作流与子工作流

Use
start()
to launch workflows from API routes.
start()
cannot be called directly in workflow context
— wrap it in a step function.
typescript
import { start } from "workflow/api";

// From an API route — works directly
export async function POST() {
  const run = await start(myWorkflow, [arg1, arg2]);
  return Response.json({ runId: run.runId });
}

// No-args workflow
const run = await start(noArgWorkflow);
Starting child workflows from inside a workflow — must use a step:
typescript
import { start } from "workflow/api";

// Wrap start() in a step function
async function triggerChild(data: string) {
  "use step";
  const run = await start(childWorkflow, [data]);
  return run.runId;
}

export async function parentWorkflow() {
  "use workflow";
  const childRunId = await triggerChild("some data");  // Fire-and-forget via step
  await sleep("1h");
}
start()
returns immediately — it doesn't wait for the workflow to complete. Use
run.returnValue
to await completion.
使用
start()
从API路由中启动工作流。
start()
不能直接在workflow上下文中调用
—— 请将其封装在step函数中。
typescript
import { start } from "workflow/api";

// 在API路由中可以直接调用
export async function POST() {
  const run = await start(myWorkflow, [arg1, arg2]);
  return Response.json({ runId: run.runId });
}

// 无参数的工作流
const run = await start(noArgWorkflow);
从工作流内部启动子工作流 —— 必须使用step封装:
typescript
import { start } from "workflow/api";

// 将start()封装在step函数中
async function triggerChild(data: string) {
  "use step";
  const run = await start(childWorkflow, [data]);
  return run.runId;
}

export async function parentWorkflow() {
  "use workflow";
  const childRunId = await triggerChild("some data");  // 通过step实现发后即忘的调用
  await sleep("1h");
}
start()
会立即返回,不会等待工作流执行完成。你可以使用
run.returnValue
来等待工作流执行结束。

Hooks — Pause & Resume with External Events

Hooks — 通过外部事件暂停与恢复

Hooks let workflows wait for external data. Use
createHook()
inside a workflow and
resumeHook()
from API routes. Deterministic tokens are for
createHook()
+
resumeHook()
(server-side) only.
createWebhook()
always generates random tokens — do not pass a
token
option to
createWebhook()
.
Hook允许工作流等待外部数据输入。你可以在workflow内部使用
createHook()
,在API路由中使用
resumeHook()
来恢复执行。确定性token仅适用于
createHook()
+
resumeHook()
(服务端场景)组合。
createWebhook()
总是会生成随机token,请勿给
createWebhook()
传递
token
参数。

Single event

单次事件

typescript
import { createHook } from "workflow";

export async function approvalWorkflow() {
  "use workflow";

  const hook = createHook<{ approved: boolean }>({
    token: "approval-123",  // deterministic token for external systems
  });

  const result = await hook;  // Workflow suspends here
  return result.approved;
}
typescript
import { createHook } from "workflow";

export async function approvalWorkflow() {
  "use workflow";

  const hook = createHook<{ approved: boolean }>({
    token: "approval-123",  // 给外部系统使用的确定性token
  });

  const result = await hook;  // 工作流会在此处暂停
  return result.approved;
}

Multiple events (iterable hooks)

多次事件(可迭代Hook)

Hooks implement
AsyncIterable
— use
for await...of
to receive multiple events:
typescript
import { createHook } from "workflow";

export async function chatWorkflow(channelId: string) {
  "use workflow";

  const hook = createHook<{ text: string; done?: boolean }>({
    token: `chat-${channelId}`,
  });

  for await (const event of hook) {
    await processMessage(event.text);
    if (event.done) break;
  }
}
Each
resumeHook(token, payload)
call delivers the next value to the loop.
Hook实现了
AsyncIterable
接口 —— 你可以使用
for await...of
来接收多个事件:
typescript
import { createHook } from "workflow";

export async function chatWorkflow(channelId: string) {
  "use workflow";

  const hook = createHook<{ text: string; done?: boolean }>({
    token: `chat-${channelId}`,
  });

  for await (const event of hook) {
    await processMessage(event.text);
    if (event.done) break;
  }
}
每次调用
resumeHook(token, payload)
都会给循环传递下一个值。

Resuming from API routes

从API路由恢复执行

typescript
import { resumeHook } from "workflow/api";

export async function POST(req: Request) {
  const { token, data } = await req.json();
  await resumeHook(token, data);
  return new Response("ok");
}
typescript
import { resumeHook } from "workflow/api";

export async function POST(req: Request) {
  const { token, data } = await req.json();
  await resumeHook(token, data);
  return new Response("ok");
}

Error Handling

错误处理

Use
FatalError
for permanent failures (no retry),
RetryableError
for transient failures:
typescript
import { FatalError, RetryableError } from "workflow";

if (res.status >= 400 && res.status < 500) {
  throw new FatalError(`Client error: ${res.status}`);
}
if (res.status === 429) {
  throw new RetryableError("Rate limited", { retryAfter: "5m" });
}
永久失败(无需重试)使用
FatalError
,临时失败使用
RetryableError
typescript
import { FatalError, RetryableError } from "workflow";

if (res.status >= 400 && res.status < 500) {
  throw new FatalError(`Client error: ${res.status}`);
}
if (res.status === 429) {
  throw new RetryableError("Rate limited", { retryAfter: "5m" });
}

Serialization

序列化规则

All data passed to/from workflows and steps must be serializable.
Supported types: string, number, boolean, null, undefined, bigint, plain objects, arrays, Date, RegExp, URL, URLSearchParams, Map, Set, Headers, ArrayBuffer, typed arrays, Request, Response, ReadableStream, WritableStream.
Not supported: Functions, class instances, Symbols, WeakMap/WeakSet. Pass data, not callbacks.
所有传入/传出工作流和step的数据必须可序列化。
支持的类型: 字符串、数字、布尔值、null、undefined、bigint、普通对象、数组、Date、RegExp、URL、URLSearchParams、Map、Set、Headers、ArrayBuffer、类型化数组、Request、Response、ReadableStream、WritableStream。
不支持的类型: 函数、类实例、Symbol、WeakMap/WeakSet。请传递数据而非回调函数。

Streaming

流处理

Use
getWritable()
to stream data from workflows.
getWritable()
can be called in both workflow and step contexts, but you cannot interact with the stream (call
getWriter()
,
write()
,
close()
) directly in a workflow function. The stream must be passed to step functions for actual I/O, or steps can call
getWritable()
themselves.
Get the stream in a workflow, pass it to a step:
typescript
import { getWritable } from "workflow";

export async function myWorkflow() {
  "use workflow";
  const writable = getWritable();
  await writeData(writable, "hello world");
}

async function writeData(writable: WritableStream, chunk: string) {
  "use step";
  const writer = writable.getWriter();
  try {
    await writer.write(chunk);
  } finally {
    writer.releaseLock();
  }
}
Call
getWritable()
directly inside a step (no need to pass it):
typescript
import { getWritable } from "workflow";

async function streamData(chunk: string) {
  "use step";
  const writer = getWritable().getWriter();
  try {
    await writer.write(chunk);
  } finally {
    writer.releaseLock();
  }
}
使用
getWritable()
从工作流中流式输出数据。
getWritable()
可以在workflow和step两种上下文中调用,但你不能直接在workflow函数中操作流(调用
getWriter()
write()
close()
)。必须将流传入step函数执行实际的I/O操作,或者step函数可以自行调用
getWritable()
在workflow中获取流,传递给step使用:
typescript
import { getWritable } from "workflow";

export async function myWorkflow() {
  "use workflow";
  const writable = getWritable();
  await writeData(writable, "hello world");
}

async function writeData(writable: WritableStream, chunk: string) {
  "use step";
  const writer = writable.getWriter();
  try {
    await writer.write(chunk);
  } finally {
    writer.releaseLock();
  }
}
直接在step内部调用
getWritable()
(无需传递流):
typescript
import { getWritable } from "workflow";

async function streamData(chunk: string) {
  "use step";
  const writer = getWritable().getWriter();
  try {
    await writer.write(chunk);
  } finally {
    writer.releaseLock();
  }
}

Namespaced Streams

命名空间流

Use
getWritable({ namespace: 'name' })
to create multiple independent streams for different types of data. This is useful for separating logs from primary output, different log levels, agent outputs, metrics, or any distinct data channels. Long-running workflows benefit from namespaced streams because you can replay only the important events (e.g., final results) while keeping verbose logs in a separate stream.
Example: Log levels and agent output separation:
typescript
import { getWritable } from "workflow";

type LogEntry = { level: "debug" | "info" | "warn" | "error"; message: string; timestamp: number };
type AgentOutput = { type: "thought" | "action" | "result"; content: string };

async function logDebug(message: string) {
  "use step";
  const writer = getWritable<LogEntry>({ namespace: "logs:debug" }).getWriter();
  try {
    await writer.write({ level: "debug", message, timestamp: Date.now() });
  } finally {
    writer.releaseLock();
  }
}

async function logInfo(message: string) {
  "use step";
  const writer = getWritable<LogEntry>({ namespace: "logs:info" }).getWriter();
  try {
    await writer.write({ level: "info", message, timestamp: Date.now() });
  } finally {
    writer.releaseLock();
  }
}

async function emitAgentThought(thought: string) {
  "use step";
  const writer = getWritable<AgentOutput>({ namespace: "agent:thoughts" }).getWriter();
  try {
    await writer.write({ type: "thought", content: thought });
  } finally {
    writer.releaseLock();
  }
}

async function emitAgentResult(result: string) {
  "use step";
  // Important results go to the default stream for easy replay
  const writer = getWritable<AgentOutput>().getWriter();
  try {
    await writer.write({ type: "result", content: result });
  } finally {
    writer.releaseLock();
  }
}

export async function agentWorkflow(task: string) {
  "use workflow";
  
  await logInfo(`Starting task: ${task}`);
  await logDebug("Initializing agent context");
  await emitAgentThought("Analyzing the task requirements...");
  
  // ... agent processing ...
  
  await emitAgentResult("Task completed successfully");
  await logInfo("Workflow finished");
}
Consuming namespaced streams:
typescript
import { start, getRun } from "workflow/api";
import { agentWorkflow } from "./workflows/agent";

export async function POST(request: Request) {
  const run = await start(agentWorkflow, ["process data"]);

  // Access specific streams by namespace
  const results = run.getReadable({ namespace: undefined }); // Default stream (important results)
  const infoLogs = run.getReadable({ namespace: "logs:info" });
  const debugLogs = run.getReadable({ namespace: "logs:debug" });
  const thoughts = run.getReadable({ namespace: "agent:thoughts" });

  // Return only important results for most clients
  return new Response(results, { headers: { "Content-Type": "application/json" } });
}

// Resume from a specific point (useful for long sessions)
export async function GET(request: Request) {
  const { searchParams } = new URL(request.url);
  const runId = searchParams.get("runId")!;
  const startIndex = parseInt(searchParams.get("startIndex") || "0", 10);
  
  const run = getRun(runId);
  // Resume only the important stream, skip verbose debug logs
  const stream = run.getReadable({ startIndex });
  
  return new Response(stream);
}
Pro tip: For very long-running sessions (50+ minutes), namespaced streams help manage replay performance. Put verbose/debug output in separate namespaces so you can replay just the important events quickly.
使用
getWritable({ namespace: 'name' })
可以创建多个独立的流,用于承载不同类型的数据。这适用于区分日志和主输出、不同日志级别、Agent输出、指标或者任何独立的数据通道。长时间运行的工作流可以从命名空间流中获益,因为你可以仅重放重要事件(比如最终结果),而将详细日志放在单独的流中。
示例:日志级别与Agent输出分离:
typescript
import { getWritable } from "workflow";

type LogEntry = { level: "debug" | "info" | "warn" | "error"; message: string; timestamp: number };
type AgentOutput = { type: "thought" | "action" | "result"; content: string };

async function logDebug(message: string) {
  "use step";
  const writer = getWritable<LogEntry>({ namespace: "logs:debug" }).getWriter();
  try {
    await writer.write({ level: "debug", message, timestamp: Date.now() });
  } finally {
    writer.releaseLock();
  }
}

async function logInfo(message: string) {
  "use step";
  const writer = getWritable<LogEntry>({ namespace: "logs:info" }).getWriter();
  try {
    await writer.write({ level: "info", message, timestamp: Date.now() });
  } finally {
    writer.releaseLock();
  }
}

async function emitAgentThought(thought: string) {
  "use step";
  const writer = getWritable<AgentOutput>({ namespace: "agent:thoughts" }).getWriter();
  try {
    await writer.write({ type: "thought", content: thought });
  } finally {
    writer.releaseLock();
  }
}

async function emitAgentResult(result: string) {
  "use step";
  // 重要结果写入默认流,方便重放
  const writer = getWritable<AgentOutput>().getWriter();
  try {
    await writer.write({ type: "result", content: result });
  } finally {
    writer.releaseLock();
  }
}

export async function agentWorkflow(task: string) {
  "use workflow";
  
  await logInfo(`Starting task: ${task}`);
  await logDebug("Initializing agent context");
  await emitAgentThought("Analyzing the task requirements...");
  
  // ... Agent处理逻辑 ...
  
  await emitAgentResult("Task completed successfully");
  await logInfo("Workflow finished");
}
消费命名空间流:
typescript
import { start, getRun } from "workflow/api";
import { agentWorkflow } from "./workflows/agent";

export async function POST(request: Request) {
  const run = await start(agentWorkflow, ["process data"]);

  // 通过命名空间访问特定流
  const results = run.getReadable({ namespace: undefined }); // 默认流(重要结果)
  const infoLogs = run.getReadable({ namespace: "logs:info" });
  const debugLogs = run.getReadable({ namespace: "logs:debug" });
  const thoughts = run.getReadable({ namespace: "agent:thoughts" });

  // 多数客户端仅返回重要结果即可
  return new Response(results, { headers: { "Content-Type": "application/json" } });
}

// 从特定点恢复(适用于长会话场景)
export async function GET(request: Request) {
  const { searchParams } = new URL(request.url);
  const runId = searchParams.get("runId")!;
  const startIndex = parseInt(searchParams.get("startIndex") || "0", 10);
  
  const run = getRun(runId);
  // 仅恢复重要流,跳过冗长的调试日志
  const stream = run.getReadable({ startIndex });
  
  return new Response(stream);
}
实用技巧: 对于运行时间非常长的会话(50分钟以上),命名空间流可以提升重放性能。将冗长/调试输出放在单独的命名空间中,这样你可以快速重放仅需的重要事件。

Debugging

调试

bash
undefined
bash
undefined

Check workflow endpoints are reachable

检查workflow端点是否可达

npx workflow health npx workflow health --port 3001 # Non-default port
npx workflow health npx workflow health --port 3001 # 非默认端口

Visual dashboard for runs

运行实例可视化面板

npx workflow web npx workflow web <run_id>
npx workflow web npx workflow web <run_id>

CLI inspection (use --json for machine-readable output, --help for full usage)

CLI查询(使用--json输出机器可读格式,--help查看完整用法)

npx workflow inspect runs npx workflow inspect run <run_id>
npx workflow inspect runs npx workflow inspect run <run_id>

For Vercel-deployed projects, specify backend and project

对于部署在Vercel的项目,指定后端和项目信息

npx workflow inspect runs --backend vercel --project <project-name> --team <team-slug> npx workflow inspect run <run_id> --backend vercel --project <project-name> --team <team-slug>
npx workflow inspect runs --backend vercel --project <project-name> --team <team-slug> npx workflow inspect run <run_id> --backend vercel --project <project-name> --team <team-slug>

Open Vercel dashboard in browser for a specific run

在浏览器中打开指定运行实例的Vercel面板

npx workflow inspect run <run_id> --web npx workflow web <run_id> --backend vercel --project <project-name> --team <team-slug>
npx workflow inspect run <run_id> --web npx workflow web <run_id> --backend vercel --project <project-name> --team <team-slug>

Cancel a running workflow

取消运行中的工作流

npx workflow cancel <run_id> npx workflow cancel <run_id> --backend vercel --project <project-name> --team <team-slug>
npx workflow cancel <run_id> npx workflow cancel <run_id> --backend vercel --project <project-name> --team <team-slug>

--env defaults to "production"; use --env preview for preview deployments

--env默认为"production";预览部署环境请使用--env preview


**Debugging tips:**
- Use `--json` (`-j`) on any command for machine-readable output
- Use `--web` to open the Vercel Observability dashboard in your browser
- Use `--help` on any command for full usage details
- Only import workflow APIs you actually use. Unused imports can cause 500 errors.

**调试技巧:**
- 任何命令都可以使用`--json`(简写`-j`)输出机器可读格式
- 使用`--web`可以在浏览器中打开Vercel可观测性面板
- 任何命令都可以使用`--help`查看完整用法
- 仅导入你实际使用的workflow API,未使用的导入可能会导致500错误。

Testing Workflows

测试工作流

Workflow DevKit provides a Vitest plugin for testing workflows in-process — no running server required.
Unit testing steps: Steps are just functions; without the compiler,
"use step"
is a no-op. Test them directly:
typescript
import { describe, it, expect } from "vitest";
import { createUser } from "./user-signup";

describe("createUser step", () => {
  it("should create a user", async () => {
    const user = await createUser("test@example.com");
    expect(user.email).toBe("test@example.com");
  });
});
Integration testing: Use
@workflow/vitest
for workflows using
sleep()
, hooks, webhooks, or retries:
typescript
// vitest.integration.config.ts
import { defineConfig } from "vitest/config";
import { workflow } from "@workflow/vitest";

export default defineConfig({
  plugins: [workflow()],
  test: {
    include: ["**/*.integration.test.ts"],
    testTimeout: 60_000,
  },
});
typescript
// approval.integration.test.ts
import { describe, it, expect } from "vitest";
import { start, getRun, resumeHook } from "workflow/api";
import { waitForHook, waitForSleep } from "@workflow/vitest";
import { approvalWorkflow } from "./approval";

describe("approvalWorkflow", () => {
  it("should publish when approved", async () => {
    const run = await start(approvalWorkflow, ["doc-123"]);

    // Wait for the hook, then resume it
    await waitForHook(run, { token: "approval:doc-123" });
    await resumeHook("approval:doc-123", { approved: true, reviewer: "alice" });

    // Wait for sleep, then wake it up
    const sleepId = await waitForSleep(run);
    await getRun(run.runId).wakeUp({ correlationIds: [sleepId] });

    const result = await run.returnValue;
    expect(result).toEqual({ status: "published", reviewer: "alice" });
  });
});
Testing webhooks: Use
resumeWebhook()
with a
Request
object — no HTTP server needed:
typescript
import { start, resumeWebhook } from "workflow/api";
import { waitForHook } from "@workflow/vitest";

const run = await start(ingestWorkflow, ["ep-1"]);
const hook = await waitForHook(run);  // Discovers the random webhook token
await resumeWebhook(hook.token, new Request("https://example.com/webhook", {
  method: "POST",
  body: JSON.stringify({ event: "order.created" }),
}));
Key APIs:
  • start()
    — trigger a workflow
  • run.returnValue
    — await workflow completion
  • waitForHook(run, { token? })
    /
    waitForSleep(run)
    — wait for workflow to reach a pause point
  • resumeHook(token, data)
    /
    resumeWebhook(token, request)
    — resume paused workflows
  • getRun(runId).wakeUp({ correlationIds })
    — skip
    sleep()
    calls
Best practices:
  • Keep unit tests (no plugin) and integration tests (
    workflow()
    plugin) in separate configs
  • Use deterministic hook tokens based on test data for easier resumption
  • Set generous
    testTimeout
    — workflows may run longer than typical unit tests
  • vi.mock()
    does not work in integration tests — step dependencies are bundled by esbuild
Workflow DevKit提供了Vitest插件,可在进程内测试工作流,无需运行额外服务。
Step单元测试: Step本质就是函数,没有编译器的情况下
"use step"
是空操作。你可以直接测试:
typescript
import { describe, it, expect } from "vitest";
import { createUser } from "./user-signup";

describe("createUser step", () => {
  it("should create a user", async () => {
    const user = await createUser("test@example.com");
    expect(user.email).toBe("test@example.com");
  });
});
集成测试: 对于使用了
sleep()
、hook、webhook或者重试的工作流,请使用
@workflow/vitest
typescript
// vitest.integration.config.ts
import { defineConfig } from "vitest/config";
import { workflow } from "@workflow/vitest";

export default defineConfig({
  plugins: [workflow()],
  test: {
    include: ["**/*.integration.test.ts"],
    testTimeout: 60_000,
  },
});
typescript
// approval.integration.test.ts
import { describe, it, expect } from "vitest";
import { start, getRun, resumeHook } from "workflow/api";
import { waitForHook, waitForSleep } from "@workflow/vitest";
import { approvalWorkflow } from "./approval";

describe("approvalWorkflow", () => {
  it("should publish when approved", async () => {
    const run = await start(approvalWorkflow, ["doc-123"]);

    // 等待hook触发,然后恢复执行
    await waitForHook(run, { token: "approval:doc-123" });
    await resumeHook("approval:doc-123", { approved: true, reviewer: "alice" });

    // 等待sleep触发,然后唤醒
    const sleepId = await waitForSleep(run);
    await getRun(run.runId).wakeUp({ correlationIds: [sleepId] });

    const result = await run.returnValue;
    expect(result).toEqual({ status: "published", reviewer: "alice" });
  });
});
测试Webhook: 可以直接给
resumeWebhook()
传递
Request
对象,无需启动HTTP服务:
typescript
import { start, resumeWebhook } from "workflow/api";
import { waitForHook } from "@workflow/vitest";

const run = await start(ingestWorkflow, ["ep-1"]);
const hook = await waitForHook(run);  // 自动发现随机的webhook token
await resumeWebhook(hook.token, new Request("https://example.com/webhook", {
  method: "POST",
  body: JSON.stringify({ event: "order.created" }),
}));
核心API:
  • start()
    — 触发工作流
  • run.returnValue
    — 等待工作流执行完成
  • waitForHook(run, { token? })
    /
    waitForSleep(run)
    — 等待工作流到达暂停点
  • resumeHook(token, data)
    /
    resumeWebhook(token, request)
    — 恢复暂停的工作流
  • getRun(runId).wakeUp({ correlationIds })
    — 跳过
    sleep()
    调用
最佳实践:
  • 将单元测试(无需插件)和集成测试(需要
    workflow()
    插件)放在不同的配置文件中
  • 基于测试数据使用确定性的hook token,方便恢复执行
  • 设置足够长的
    testTimeout
    —— 工作流的运行时间通常比普通单元测试长
  • vi.mock()
    在集成测试中不生效 —— step的依赖会被esbuild打包