workflow
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCRITICAL: Always Use Correct workflow
Documentation
workflow重要提示:始终使用正确的workflow
文档
workflowYour knowledge of is outdated.
workflowThe documentation outlined below matches the installed version of the Workflow DevKit.
Follow these instructions before starting on any -related tasks:
workflowworkflowSearch the bundled documentation in :
node_modules/workflow/docs/- Find docs:
glob "node_modules/workflow/docs/**/*.mdx" - Search content:
grep "your query" node_modules/workflow/docs/
Documentation structure in :
node_modules/workflow/docs/- - Framework setup (next.mdx, express.mdx, hono.mdx, etc.)
getting-started/ - - Core concepts (workflows-and-steps.mdx, hooks.mdx, streaming.mdx, etc.)
foundations/ - - API docs (sleep.mdx, create-hook.mdx, fatal-error.mdx, etc.)
api-reference/workflow/ - - Client API (start.mdx, get-run.mdx, resume-hook.mdx, etc.)
api-reference/workflow-api/ - - AI SDK integration docs
ai/ - - Error code documentation
errors/
Related packages also include bundled docs:
- :
@workflow/ai- DurableAgent and AI integrationnode_modules/@workflow/ai/docs/ - :
@workflow/core- Core runtime (foundations, how-it-works)node_modules/@workflow/core/docs/ - :
@workflow/next- Next.js integrationnode_modules/@workflow/next/docs/
When in doubt, update to the latest version of the Workflow DevKit.
你所掌握的相关知识已经过时。
workflow下文列出的文档与你安装的Workflow DevKit版本匹配。在开始任何与相关的任务前,请遵循以下指引:
workflowworkflow请在中搜索内置文档:
node_modules/workflow/docs/- 查找文档:
glob "node_modules/workflow/docs/**/*.mdx" - 搜索内容:
grep "your query" node_modules/workflow/docs/
node_modules/workflow/docs/- - 框架配置(next.mdx、express.mdx、hono.mdx等)
getting-started/ - - 核心概念(workflows-and-steps.mdx、hooks.mdx、streaming.mdx等)
foundations/ - - API文档(sleep.mdx、create-hook.mdx、fatal-error.mdx等)
api-reference/workflow/ - - 客户端API(start.mdx、get-run.mdx、resume-hook.mdx等)
api-reference/workflow-api/ - - AI SDK集成文档
ai/ - - 错误码文档
errors/
相关依赖包也包含内置文档:
- :
@workflow/ai- DurableAgent与AI集成相关内容node_modules/@workflow/ai/docs/ - :
@workflow/core- 核心运行时(基础概念、运行原理解析)node_modules/@workflow/core/docs/ - :
@workflow/next- Next.js集成相关内容node_modules/@workflow/next/docs/
如果存在疑问,请先升级到最新版本的Workflow DevKit。
Official Resources
官方资源
- Website: https://useworkflow.dev
- GitHub: https://github.com/vercel/workflow
Quick Reference
快速参考
Directives:
typescript
"use workflow"; // First line - makes async function durable
"use step"; // First line - makes function a cached, retryable unitEssential imports:
typescript
// Workflow primitives
import { sleep, fetch, createHook, createWebhook, getWritable } from "workflow";
import { FatalError, RetryableError } from "workflow";
import { getWorkflowMetadata, getStepMetadata } from "workflow";
// API operations
import { start, getRun, resumeHook, resumeWebhook } from "workflow/api";
// Framework integrations
import { withWorkflow } from "workflow/next";
import { workflow } from "workflow/vite";
import { workflow } from "workflow/astro";
// Or use modules: ["workflow/nitro"] for Nitro/Nuxt
// AI agent
import { DurableAgent } from "@workflow/ai/agent";指令:
typescript
"use workflow"; // 代码第一行声明,让异步函数具备持久化能力
"use step"; // 代码第一行声明,让函数成为可缓存、可重试的执行单元核心导入语句:
typescript
// Workflow基础原语
import { sleep, fetch, createHook, createWebhook, getWritable } from "workflow";
import { FatalError, RetryableError } from "workflow";
import { getWorkflowMetadata, getStepMetadata } from "workflow";
// API操作
import { start, getRun, resumeHook, resumeWebhook } from "workflow/api";
// 框架集成
import { withWorkflow } from "workflow/next";
import { workflow } from "workflow/vite";
import { workflow } from "workflow/astro";
// 针对Nitro/Nuxt可使用模块:["workflow/nitro"]
// AI agent
import { DurableAgent } from "@workflow/ai/agent";Prefer Step Functions to Avoid Sandbox Errors
优先使用Step函数避免沙箱错误
"use workflow""use step"typescript
// Steps have full Node.js and npm access
async function fetchUserData(userId: string) {
"use step";
const response = await fetch(`https://api.example.com/users/${userId}`);
return response.json();
}
async function processWithAI(data: any) {
"use step";
// AI SDK works in steps without workarounds
return await generateText({
model: openai("gpt-4"),
prompt: `Process: ${JSON.stringify(data)}`,
});
}
// Workflow orchestrates steps - no sandbox issues
export async function dataProcessingWorkflow(userId: string) {
"use workflow";
const data = await fetchUserData(userId);
const processed = await processWithAI(data);
return { success: true, processed };
}Benefits: Steps have automatic retry, results are persisted for replay, and no sandbox restrictions.
声明的函数运行在沙箱VM中,声明的函数拥有完整的Node.js访问权限。请将业务逻辑放在step中,workflow函数仅负责编排逻辑。
"use workflow""use step"typescript
// Step函数拥有完整的Node.js和npm包访问权限
async function fetchUserData(userId: string) {
"use step";
const response = await fetch(`https://api.example.com/users/${userId}`);
return response.json();
}
async function processWithAI(data: any) {
"use step";
// AI SDK可以直接在step中运行,无需额外适配
return await generateText({
model: openai("gpt-4"),
prompt: `Process: ${JSON.stringify(data)}`,
});
}
// Workflow仅负责编排step,不会出现沙箱问题
export async function dataProcessingWorkflow(userId: string) {
"use workflow";
const data = await fetchUserData(userId);
const processed = await processWithAI(data);
return { success: true, processed };
}优势: Step会自动重试,执行结果会持久化支持重放,且没有沙箱权限限制。
Workflow Sandbox Limitations
Workflow沙箱限制
When you need logic directly in a workflow function (not in a step), these restrictions apply:
| Limitation | Workaround |
|---|---|
No | |
No | Use |
| No Node.js modules (fs, crypto, etc.) | Move to a step function |
Example - Using fetch in workflow context:
typescript
import { fetch } from "workflow";
export async function myWorkflow() {
"use workflow";
globalThis.fetch = fetch; // Required for AI SDK and HTTP libraries
// Now generateText() and other libraries work
}Note: from handles the fetch assignment automatically.
DurableAgent@workflow/ai如果你需要直接在workflow函数(而非step)中编写逻辑,需要遵循以下限制:
| 限制 | 解决方案 |
|---|---|
不支持 | 从 |
不支持 | 使用 |
| 不支持Node.js内置模块(fs、crypto等) | 将逻辑迁移到step函数中 |
示例 - 在workflow上下文中使用fetch:
typescript
import { fetch } from "workflow";
export async function myWorkflow() {
"use workflow";
globalThis.fetch = fetch; // AI SDK和其他HTTP库需要该配置才能正常运行
// 现在generateText()和其他库可以正常工作了
}注意: 提供的会自动处理fetch的挂载配置,无需手动操作。
@workflow/aiDurableAgentDurableAgent — AI Agents in Workflows
DurableAgent — 工作流中的AI Agents
Use to build AI agents that maintain state and survive interruptions. It handles the workflow sandbox automatically (no manual needed).
DurableAgentglobalThis.fetchtypescript
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";
async function lookupData({ query }: { query: string }) {
"use step";
// Step functions have full Node.js access
return `Results for "${query}"`;
}
export async function myAgentWorkflow(userMessage: string) {
"use workflow";
const agent = new DurableAgent({
model: "anthropic/claude-sonnet-4-5",
system: "You are a helpful assistant.",
tools: {
lookupData: {
description: "Search for information",
inputSchema: z.object({ query: z.string() }),
execute: lookupData,
},
},
});
const result = await agent.stream({
messages: [{ role: "user", content: userMessage }],
writable: getWritable<UIMessageChunk>(),
maxSteps: 10,
});
return result.messages;
}Key points:
- streams output to the workflow run's default stream
getWritable<UIMessageChunk>() - Tool functions that need Node.js/npm access should use
execute"use step" - Tool functions that use workflow primitives (
execute,sleep()) should NOT usecreateHook()— they run at the workflow level"use step" - limits the number of LLM calls (default is unlimited)
maxSteps - Multi-turn: pass plus new user messages to subsequent
result.messagescallsagent.stream()
For more details on , check the AI docs in .
DurableAgentnode_modules/@workflow/ai/docs/使用可以构建具备状态持久化、支持中断恢复的AI Agent。它会自动适配workflow沙箱(无需手动配置)。
DurableAgentglobalThis.fetchtypescript
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";
async function lookupData({ query }: { query: string }) {
"use step";
// Step函数拥有完整的Node.js访问权限
return `Results for "${query}"`;
}
export async function myAgentWorkflow(userMessage: string) {
"use workflow";
const agent = new DurableAgent({
model: "anthropic/claude-sonnet-4-5",
system: "You are a helpful assistant.",
tools: {
lookupData: {
description: "Search for information",
inputSchema: z.object({ query: z.string() }),
execute: lookupData,
},
},
});
const result = await agent.stream({
messages: [{ role: "user", content: userMessage }],
writable: getWritable<UIMessageChunk>(),
maxSteps: 10,
});
return result.messages;
}核心要点:
- 会将输出流写入workflow运行实例的默认流中
getWritable<UIMessageChunk>() - 需要Node.js/npm访问权限的工具函数应该声明
execute"use step" - 使用workflow原语(、
sleep())的工具createHook()函数不要声明execute,它们会在workflow层级运行"use step" - 限制LLM调用的次数(默认无限制)
maxSteps - 多轮对话场景:可将加上新的用户消息传入后续的
result.messages调用agent.stream()
如需了解更多相关内容,请查看下的AI文档。
DurableAgentnode_modules/@workflow/ai/docs/Starting Workflows & Child Workflows
启动工作流与子工作流
Use to launch workflows from API routes. cannot be called directly in workflow context — wrap it in a step function.
start()start()typescript
import { start } from "workflow/api";
// From an API route — works directly
export async function POST() {
const run = await start(myWorkflow, [arg1, arg2]);
return Response.json({ runId: run.runId });
}
// No-args workflow
const run = await start(noArgWorkflow);Starting child workflows from inside a workflow — must use a step:
typescript
import { start } from "workflow/api";
// Wrap start() in a step function
async function triggerChild(data: string) {
"use step";
const run = await start(childWorkflow, [data]);
return run.runId;
}
export async function parentWorkflow() {
"use workflow";
const childRunId = await triggerChild("some data"); // Fire-and-forget via step
await sleep("1h");
}start()run.returnValue使用从API路由中启动工作流。不能直接在workflow上下文中调用 —— 请将其封装在step函数中。
start()start()typescript
import { start } from "workflow/api";
// 在API路由中可以直接调用
export async function POST() {
const run = await start(myWorkflow, [arg1, arg2]);
return Response.json({ runId: run.runId });
}
// 无参数的工作流
const run = await start(noArgWorkflow);从工作流内部启动子工作流 —— 必须使用step封装:
typescript
import { start } from "workflow/api";
// 将start()封装在step函数中
async function triggerChild(data: string) {
"use step";
const run = await start(childWorkflow, [data]);
return run.runId;
}
export async function parentWorkflow() {
"use workflow";
const childRunId = await triggerChild("some data"); // 通过step实现发后即忘的调用
await sleep("1h");
}start()run.returnValueHooks — Pause & Resume with External Events
Hooks — 通过外部事件暂停与恢复
Hooks let workflows wait for external data. Use inside a workflow and from API routes. Deterministic tokens are for + (server-side) only. always generates random tokens — do not pass a option to .
createHook()resumeHook()createHook()resumeHook()createWebhook()tokencreateWebhook()Hook允许工作流等待外部数据输入。你可以在workflow内部使用,在API路由中使用来恢复执行。确定性token仅适用于 + (服务端场景)组合。总是会生成随机token,请勿给传递参数。
createHook()resumeHook()createHook()resumeHook()createWebhook()createWebhook()tokenSingle event
单次事件
typescript
import { createHook } from "workflow";
export async function approvalWorkflow() {
"use workflow";
const hook = createHook<{ approved: boolean }>({
token: "approval-123", // deterministic token for external systems
});
const result = await hook; // Workflow suspends here
return result.approved;
}typescript
import { createHook } from "workflow";
export async function approvalWorkflow() {
"use workflow";
const hook = createHook<{ approved: boolean }>({
token: "approval-123", // 给外部系统使用的确定性token
});
const result = await hook; // 工作流会在此处暂停
return result.approved;
}Multiple events (iterable hooks)
多次事件(可迭代Hook)
Hooks implement — use to receive multiple events:
AsyncIterablefor await...oftypescript
import { createHook } from "workflow";
export async function chatWorkflow(channelId: string) {
"use workflow";
const hook = createHook<{ text: string; done?: boolean }>({
token: `chat-${channelId}`,
});
for await (const event of hook) {
await processMessage(event.text);
if (event.done) break;
}
}Each call delivers the next value to the loop.
resumeHook(token, payload)Hook实现了接口 —— 你可以使用来接收多个事件:
AsyncIterablefor await...oftypescript
import { createHook } from "workflow";
export async function chatWorkflow(channelId: string) {
"use workflow";
const hook = createHook<{ text: string; done?: boolean }>({
token: `chat-${channelId}`,
});
for await (const event of hook) {
await processMessage(event.text);
if (event.done) break;
}
}每次调用都会给循环传递下一个值。
resumeHook(token, payload)Resuming from API routes
从API路由恢复执行
typescript
import { resumeHook } from "workflow/api";
export async function POST(req: Request) {
const { token, data } = await req.json();
await resumeHook(token, data);
return new Response("ok");
}typescript
import { resumeHook } from "workflow/api";
export async function POST(req: Request) {
const { token, data } = await req.json();
await resumeHook(token, data);
return new Response("ok");
}Error Handling
错误处理
Use for permanent failures (no retry), for transient failures:
FatalErrorRetryableErrortypescript
import { FatalError, RetryableError } from "workflow";
if (res.status >= 400 && res.status < 500) {
throw new FatalError(`Client error: ${res.status}`);
}
if (res.status === 429) {
throw new RetryableError("Rate limited", { retryAfter: "5m" });
}永久失败(无需重试)使用,临时失败使用:
FatalErrorRetryableErrortypescript
import { FatalError, RetryableError } from "workflow";
if (res.status >= 400 && res.status < 500) {
throw new FatalError(`Client error: ${res.status}`);
}
if (res.status === 429) {
throw new RetryableError("Rate limited", { retryAfter: "5m" });
}Serialization
序列化规则
All data passed to/from workflows and steps must be serializable.
Supported types: string, number, boolean, null, undefined, bigint, plain objects, arrays, Date, RegExp, URL, URLSearchParams, Map, Set, Headers, ArrayBuffer, typed arrays, Request, Response, ReadableStream, WritableStream.
Not supported: Functions, class instances, Symbols, WeakMap/WeakSet. Pass data, not callbacks.
所有传入/传出工作流和step的数据必须可序列化。
支持的类型: 字符串、数字、布尔值、null、undefined、bigint、普通对象、数组、Date、RegExp、URL、URLSearchParams、Map、Set、Headers、ArrayBuffer、类型化数组、Request、Response、ReadableStream、WritableStream。
不支持的类型: 函数、类实例、Symbol、WeakMap/WeakSet。请传递数据而非回调函数。
Streaming
流处理
Use to stream data from workflows. can be called in both workflow and step contexts, but you cannot interact with the stream (call , , ) directly in a workflow function. The stream must be passed to step functions for actual I/O, or steps can call themselves.
getWritable()getWritable()getWriter()write()close()getWritable()Get the stream in a workflow, pass it to a step:
typescript
import { getWritable } from "workflow";
export async function myWorkflow() {
"use workflow";
const writable = getWritable();
await writeData(writable, "hello world");
}
async function writeData(writable: WritableStream, chunk: string) {
"use step";
const writer = writable.getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}Call directly inside a step (no need to pass it):
getWritable()typescript
import { getWritable } from "workflow";
async function streamData(chunk: string) {
"use step";
const writer = getWritable().getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}使用从工作流中流式输出数据。可以在workflow和step两种上下文中调用,但你不能直接在workflow函数中操作流(调用、、)。必须将流传入step函数执行实际的I/O操作,或者step函数可以自行调用。
getWritable()getWritable()getWriter()write()close()getWritable()在workflow中获取流,传递给step使用:
typescript
import { getWritable } from "workflow";
export async function myWorkflow() {
"use workflow";
const writable = getWritable();
await writeData(writable, "hello world");
}
async function writeData(writable: WritableStream, chunk: string) {
"use step";
const writer = writable.getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}直接在step内部调用(无需传递流):
getWritable()typescript
import { getWritable } from "workflow";
async function streamData(chunk: string) {
"use step";
const writer = getWritable().getWriter();
try {
await writer.write(chunk);
} finally {
writer.releaseLock();
}
}Namespaced Streams
命名空间流
Use to create multiple independent streams for different types of data. This is useful for separating logs from primary output, different log levels, agent outputs, metrics, or any distinct data channels. Long-running workflows benefit from namespaced streams because you can replay only the important events (e.g., final results) while keeping verbose logs in a separate stream.
getWritable({ namespace: 'name' })Example: Log levels and agent output separation:
typescript
import { getWritable } from "workflow";
type LogEntry = { level: "debug" | "info" | "warn" | "error"; message: string; timestamp: number };
type AgentOutput = { type: "thought" | "action" | "result"; content: string };
async function logDebug(message: string) {
"use step";
const writer = getWritable<LogEntry>({ namespace: "logs:debug" }).getWriter();
try {
await writer.write({ level: "debug", message, timestamp: Date.now() });
} finally {
writer.releaseLock();
}
}
async function logInfo(message: string) {
"use step";
const writer = getWritable<LogEntry>({ namespace: "logs:info" }).getWriter();
try {
await writer.write({ level: "info", message, timestamp: Date.now() });
} finally {
writer.releaseLock();
}
}
async function emitAgentThought(thought: string) {
"use step";
const writer = getWritable<AgentOutput>({ namespace: "agent:thoughts" }).getWriter();
try {
await writer.write({ type: "thought", content: thought });
} finally {
writer.releaseLock();
}
}
async function emitAgentResult(result: string) {
"use step";
// Important results go to the default stream for easy replay
const writer = getWritable<AgentOutput>().getWriter();
try {
await writer.write({ type: "result", content: result });
} finally {
writer.releaseLock();
}
}
export async function agentWorkflow(task: string) {
"use workflow";
await logInfo(`Starting task: ${task}`);
await logDebug("Initializing agent context");
await emitAgentThought("Analyzing the task requirements...");
// ... agent processing ...
await emitAgentResult("Task completed successfully");
await logInfo("Workflow finished");
}Consuming namespaced streams:
typescript
import { start, getRun } from "workflow/api";
import { agentWorkflow } from "./workflows/agent";
export async function POST(request: Request) {
const run = await start(agentWorkflow, ["process data"]);
// Access specific streams by namespace
const results = run.getReadable({ namespace: undefined }); // Default stream (important results)
const infoLogs = run.getReadable({ namespace: "logs:info" });
const debugLogs = run.getReadable({ namespace: "logs:debug" });
const thoughts = run.getReadable({ namespace: "agent:thoughts" });
// Return only important results for most clients
return new Response(results, { headers: { "Content-Type": "application/json" } });
}
// Resume from a specific point (useful for long sessions)
export async function GET(request: Request) {
const { searchParams } = new URL(request.url);
const runId = searchParams.get("runId")!;
const startIndex = parseInt(searchParams.get("startIndex") || "0", 10);
const run = getRun(runId);
// Resume only the important stream, skip verbose debug logs
const stream = run.getReadable({ startIndex });
return new Response(stream);
}Pro tip: For very long-running sessions (50+ minutes), namespaced streams help manage replay performance. Put verbose/debug output in separate namespaces so you can replay just the important events quickly.
使用可以创建多个独立的流,用于承载不同类型的数据。这适用于区分日志和主输出、不同日志级别、Agent输出、指标或者任何独立的数据通道。长时间运行的工作流可以从命名空间流中获益,因为你可以仅重放重要事件(比如最终结果),而将详细日志放在单独的流中。
getWritable({ namespace: 'name' })示例:日志级别与Agent输出分离:
typescript
import { getWritable } from "workflow";
type LogEntry = { level: "debug" | "info" | "warn" | "error"; message: string; timestamp: number };
type AgentOutput = { type: "thought" | "action" | "result"; content: string };
async function logDebug(message: string) {
"use step";
const writer = getWritable<LogEntry>({ namespace: "logs:debug" }).getWriter();
try {
await writer.write({ level: "debug", message, timestamp: Date.now() });
} finally {
writer.releaseLock();
}
}
async function logInfo(message: string) {
"use step";
const writer = getWritable<LogEntry>({ namespace: "logs:info" }).getWriter();
try {
await writer.write({ level: "info", message, timestamp: Date.now() });
} finally {
writer.releaseLock();
}
}
async function emitAgentThought(thought: string) {
"use step";
const writer = getWritable<AgentOutput>({ namespace: "agent:thoughts" }).getWriter();
try {
await writer.write({ type: "thought", content: thought });
} finally {
writer.releaseLock();
}
}
async function emitAgentResult(result: string) {
"use step";
// 重要结果写入默认流,方便重放
const writer = getWritable<AgentOutput>().getWriter();
try {
await writer.write({ type: "result", content: result });
} finally {
writer.releaseLock();
}
}
export async function agentWorkflow(task: string) {
"use workflow";
await logInfo(`Starting task: ${task}`);
await logDebug("Initializing agent context");
await emitAgentThought("Analyzing the task requirements...");
// ... Agent处理逻辑 ...
await emitAgentResult("Task completed successfully");
await logInfo("Workflow finished");
}消费命名空间流:
typescript
import { start, getRun } from "workflow/api";
import { agentWorkflow } from "./workflows/agent";
export async function POST(request: Request) {
const run = await start(agentWorkflow, ["process data"]);
// 通过命名空间访问特定流
const results = run.getReadable({ namespace: undefined }); // 默认流(重要结果)
const infoLogs = run.getReadable({ namespace: "logs:info" });
const debugLogs = run.getReadable({ namespace: "logs:debug" });
const thoughts = run.getReadable({ namespace: "agent:thoughts" });
// 多数客户端仅返回重要结果即可
return new Response(results, { headers: { "Content-Type": "application/json" } });
}
// 从特定点恢复(适用于长会话场景)
export async function GET(request: Request) {
const { searchParams } = new URL(request.url);
const runId = searchParams.get("runId")!;
const startIndex = parseInt(searchParams.get("startIndex") || "0", 10);
const run = getRun(runId);
// 仅恢复重要流,跳过冗长的调试日志
const stream = run.getReadable({ startIndex });
return new Response(stream);
}实用技巧: 对于运行时间非常长的会话(50分钟以上),命名空间流可以提升重放性能。将冗长/调试输出放在单独的命名空间中,这样你可以快速重放仅需的重要事件。
Debugging
调试
bash
undefinedbash
undefinedCheck workflow endpoints are reachable
检查workflow端点是否可达
npx workflow health
npx workflow health --port 3001 # Non-default port
npx workflow health
npx workflow health --port 3001 # 非默认端口
Visual dashboard for runs
运行实例可视化面板
npx workflow web
npx workflow web <run_id>
npx workflow web
npx workflow web <run_id>
CLI inspection (use --json for machine-readable output, --help for full usage)
CLI查询(使用--json输出机器可读格式,--help查看完整用法)
npx workflow inspect runs
npx workflow inspect run <run_id>
npx workflow inspect runs
npx workflow inspect run <run_id>
For Vercel-deployed projects, specify backend and project
对于部署在Vercel的项目,指定后端和项目信息
npx workflow inspect runs --backend vercel --project <project-name> --team <team-slug>
npx workflow inspect run <run_id> --backend vercel --project <project-name> --team <team-slug>
npx workflow inspect runs --backend vercel --project <project-name> --team <team-slug>
npx workflow inspect run <run_id> --backend vercel --project <project-name> --team <team-slug>
Open Vercel dashboard in browser for a specific run
在浏览器中打开指定运行实例的Vercel面板
npx workflow inspect run <run_id> --web
npx workflow web <run_id> --backend vercel --project <project-name> --team <team-slug>
npx workflow inspect run <run_id> --web
npx workflow web <run_id> --backend vercel --project <project-name> --team <team-slug>
Cancel a running workflow
取消运行中的工作流
npx workflow cancel <run_id>
npx workflow cancel <run_id> --backend vercel --project <project-name> --team <team-slug>
npx workflow cancel <run_id>
npx workflow cancel <run_id> --backend vercel --project <project-name> --team <team-slug>
--env defaults to "production"; use --env preview for preview deployments
--env默认为"production";预览部署环境请使用--env preview
**Debugging tips:**
- Use `--json` (`-j`) on any command for machine-readable output
- Use `--web` to open the Vercel Observability dashboard in your browser
- Use `--help` on any command for full usage details
- Only import workflow APIs you actually use. Unused imports can cause 500 errors.
**调试技巧:**
- 任何命令都可以使用`--json`(简写`-j`)输出机器可读格式
- 使用`--web`可以在浏览器中打开Vercel可观测性面板
- 任何命令都可以使用`--help`查看完整用法
- 仅导入你实际使用的workflow API,未使用的导入可能会导致500错误。Testing Workflows
测试工作流
Workflow DevKit provides a Vitest plugin for testing workflows in-process — no running server required.
Unit testing steps: Steps are just functions; without the compiler, is a no-op. Test them directly:
"use step"typescript
import { describe, it, expect } from "vitest";
import { createUser } from "./user-signup";
describe("createUser step", () => {
it("should create a user", async () => {
const user = await createUser("test@example.com");
expect(user.email).toBe("test@example.com");
});
});Integration testing: Use for workflows using , hooks, webhooks, or retries:
@workflow/vitestsleep()typescript
// vitest.integration.config.ts
import { defineConfig } from "vitest/config";
import { workflow } from "@workflow/vitest";
export default defineConfig({
plugins: [workflow()],
test: {
include: ["**/*.integration.test.ts"],
testTimeout: 60_000,
},
});typescript
// approval.integration.test.ts
import { describe, it, expect } from "vitest";
import { start, getRun, resumeHook } from "workflow/api";
import { waitForHook, waitForSleep } from "@workflow/vitest";
import { approvalWorkflow } from "./approval";
describe("approvalWorkflow", () => {
it("should publish when approved", async () => {
const run = await start(approvalWorkflow, ["doc-123"]);
// Wait for the hook, then resume it
await waitForHook(run, { token: "approval:doc-123" });
await resumeHook("approval:doc-123", { approved: true, reviewer: "alice" });
// Wait for sleep, then wake it up
const sleepId = await waitForSleep(run);
await getRun(run.runId).wakeUp({ correlationIds: [sleepId] });
const result = await run.returnValue;
expect(result).toEqual({ status: "published", reviewer: "alice" });
});
});Testing webhooks: Use with a object — no HTTP server needed:
resumeWebhook()Requesttypescript
import { start, resumeWebhook } from "workflow/api";
import { waitForHook } from "@workflow/vitest";
const run = await start(ingestWorkflow, ["ep-1"]);
const hook = await waitForHook(run); // Discovers the random webhook token
await resumeWebhook(hook.token, new Request("https://example.com/webhook", {
method: "POST",
body: JSON.stringify({ event: "order.created" }),
}));Key APIs:
- — trigger a workflow
start() - — await workflow completion
run.returnValue - /
waitForHook(run, { token? })— wait for workflow to reach a pause pointwaitForSleep(run) - /
resumeHook(token, data)— resume paused workflowsresumeWebhook(token, request) - — skip
getRun(runId).wakeUp({ correlationIds })callssleep()
Best practices:
- Keep unit tests (no plugin) and integration tests (plugin) in separate configs
workflow() - Use deterministic hook tokens based on test data for easier resumption
- Set generous — workflows may run longer than typical unit tests
testTimeout - does not work in integration tests — step dependencies are bundled by esbuild
vi.mock()
Workflow DevKit提供了Vitest插件,可在进程内测试工作流,无需运行额外服务。
Step单元测试: Step本质就是函数,没有编译器的情况下是空操作。你可以直接测试:
"use step"typescript
import { describe, it, expect } from "vitest";
import { createUser } from "./user-signup";
describe("createUser step", () => {
it("should create a user", async () => {
const user = await createUser("test@example.com");
expect(user.email).toBe("test@example.com");
});
});集成测试: 对于使用了、hook、webhook或者重试的工作流,请使用:
sleep()@workflow/vitesttypescript
// vitest.integration.config.ts
import { defineConfig } from "vitest/config";
import { workflow } from "@workflow/vitest";
export default defineConfig({
plugins: [workflow()],
test: {
include: ["**/*.integration.test.ts"],
testTimeout: 60_000,
},
});typescript
// approval.integration.test.ts
import { describe, it, expect } from "vitest";
import { start, getRun, resumeHook } from "workflow/api";
import { waitForHook, waitForSleep } from "@workflow/vitest";
import { approvalWorkflow } from "./approval";
describe("approvalWorkflow", () => {
it("should publish when approved", async () => {
const run = await start(approvalWorkflow, ["doc-123"]);
// 等待hook触发,然后恢复执行
await waitForHook(run, { token: "approval:doc-123" });
await resumeHook("approval:doc-123", { approved: true, reviewer: "alice" });
// 等待sleep触发,然后唤醒
const sleepId = await waitForSleep(run);
await getRun(run.runId).wakeUp({ correlationIds: [sleepId] });
const result = await run.returnValue;
expect(result).toEqual({ status: "published", reviewer: "alice" });
});
});测试Webhook: 可以直接给传递对象,无需启动HTTP服务:
resumeWebhook()Requesttypescript
import { start, resumeWebhook } from "workflow/api";
import { waitForHook } from "@workflow/vitest";
const run = await start(ingestWorkflow, ["ep-1"]);
const hook = await waitForHook(run); // 自动发现随机的webhook token
await resumeWebhook(hook.token, new Request("https://example.com/webhook", {
method: "POST",
body: JSON.stringify({ event: "order.created" }),
}));核心API:
- — 触发工作流
start() - — 等待工作流执行完成
run.returnValue - /
waitForHook(run, { token? })— 等待工作流到达暂停点waitForSleep(run) - /
resumeHook(token, data)— 恢复暂停的工作流resumeWebhook(token, request) - — 跳过
getRun(runId).wakeUp({ correlationIds })调用sleep()
最佳实践:
- 将单元测试(无需插件)和集成测试(需要插件)放在不同的配置文件中
workflow() - 基于测试数据使用确定性的hook token,方便恢复执行
- 设置足够长的—— 工作流的运行时间通常比普通单元测试长
testTimeout - 在集成测试中不生效 —— step的依赖会被esbuild打包
vi.mock()