using-workflows
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseWorking with Workflows
工作流使用指南
Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
创建并运行包含步骤、流处理和Agent执行的持久化工作流,涵盖工作流的启动、恢复以及结果持久化。
Working with Workflows
工作流使用指南
Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
See:
- Resource: in Fullstack Recipes
using-workflows - URL: https://fullstackrecipes.com/recipes/using-workflows
创建并运行包含步骤、流处理和Agent执行的持久化工作流,涵盖工作流的启动、恢复以及结果持久化。
参考:
- 资源:Fullstack Recipes中的
using-workflows - 链接:https://fullstackrecipes.com/recipes/using-workflows
Workflow Folder Structure
工作流文件夹结构
Each workflow has its own subfolder in :
src/workflows/src/workflows/
steps/ # Shared step functions
stream.ts # UI message stream helpers
chat/
index.ts # Workflow orchestration function ("use workflow")
steps/ # Workflow-specific steps ("use step")
history.ts
logger.ts
name-chat.ts
types.ts # Workflow-specific types- - Shared step functions reusable across workflows (e.g., stream helpers).
workflows/steps/ - - Contains the main workflow function with the
index.tsdirective. Orchestrates the workflow by calling step functions."use workflow" - - Contains individual step functions with the
steps/directive. Each step is a durable checkpoint."use step" - - Type definitions for the workflow's UI messages.
types.ts
每个工作流在目录下都有独立的子文件夹:
src/workflows/src/workflows/
steps/ # Shared step functions
stream.ts # UI message stream helpers
chat/
index.ts # Workflow orchestration function ("use workflow")
steps/ # Workflow-specific steps ("use step")
history.ts
logger.ts
name-chat.ts
types.ts # Workflow-specific types- - 可在多个工作流中复用的共享步骤函数(例如流处理工具)。
workflows/steps/ - - 包含带有
index.ts指令的主工作流函数,通过调用步骤函数来编排工作流。"use workflow" - - 包含带有
steps/指令的独立步骤函数,每个步骤都是一个持久化检查点。"use step" - - 工作流专属的UI消息类型定义。
types.ts
Creating a Workflow
创建工作流
Define workflows with the directive:
"use workflow"typescript
// src/workflows/chat/index.ts
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
// Persist user message
await persistUserMessage({ chatId, message: userMessage });
// Create assistant placeholder with runId for resumption
const messageId = await createAssistantMessage({
chatId,
runId: workflowRunId,
});
// Get message history
const history = await getMessageHistory(chatId);
// Start the UI message stream
await startStream(messageId);
// Run agent with streaming
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
// Persist and finalize
await persistMessageParts({ chatId, messageId, parts });
// Finish the UI message stream
await finishStream();
await removeRunId(messageId);
}使用指令定义工作流:
"use workflow"typescript
// src/workflows/chat/index.ts
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
// Persist user message
await persistUserMessage({ chatId, message: userMessage });
// Create assistant placeholder with runId for resumption
const messageId = await createAssistantMessage({
chatId,
runId: workflowRunId,
});
// Get message history
const history = await getMessageHistory(chatId);
// Start the UI message stream
await startStream(messageId);
// Run agent with streaming
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
// Persist and finalize
await persistMessageParts({ chatId, messageId, parts });
// Finish the UI message stream
await finishStream();
await removeRunId(messageId);
}Starting a Workflow
启动工作流
Use the function from :
startworkflow/apitypescript
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";
const run = await start(chatWorkflow, [{ chatId, userMessage }]);
// run.runId - unique identifier for this run
// run.readable - stream of UI message chunks使用中的函数:
workflow/apistarttypescript
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";
const run = await start(chatWorkflow, [{ chatId, userMessage }]);
// run.runId - unique identifier for this run
// run.readable - stream of UI message chunksResuming a Workflow Stream
恢复工作流流
Use to reconnect to an in-progress or completed workflow:
getRuntypescript
import { getRun } from "workflow/api";
const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });使用重新连接到正在进行或已完成的工作流:
getRuntypescript
import { getRun } from "workflow/api";
const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });Using Steps
使用步骤
Steps are durable checkpoints that persist their results:
typescript
async function getMessageHistory(chatId: string) {
"use step";
const dbMessages = await getChatMessages(chatId);
return convertDbMessagesToUIMessages(dbMessages);
}步骤是持久化检查点,会保存执行结果:
typescript
async function getMessageHistory(chatId: string) {
"use step";
const dbMessages = await getChatMessages(chatId);
return convertDbMessagesToUIMessages(dbMessages);
}Streaming UIMessageChunks
流式传输UIMessageChunks
When streaming responses to clients (e.g., chat messages), you must signal the start and end of the stream. This is required for proper stream framing with .
UIMessageChunkWorkflowChatTransportAlways call before and after:
startStream()agent.run()finishStream()typescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
export async function chatWorkflow({ chatId, messageId }) {
"use workflow";
const history = await getMessageHistory(chatId);
// Signal stream start with the message ID
await startStream(messageId);
// Run agent - streams UIMessageChunks to the client
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, messageId, parts });
// Signal stream end and close the writable
await finishStream();
}The stream step functions write messages:
UIMessageChunk- - Writes
startStream(messageId)to signal a new message{ type: "start", messageId } - - Writes
finishStream()and closes the stream{ type: "finish", finishReason: "stop" }
Without these signals, the client's cannot properly parse the streamed response.
WorkflowChatTransport向客户端流式传输响应(例如聊天消息)时,必须标记流的开始和结束。这是正确解析流响应的必要条件。
UIMessageChunkWorkflowChatTransport务必在之前调用,之后调用:
agent.run()startStream()finishStream()typescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
export async function chatWorkflow({ chatId, messageId }) {
"use workflow";
const history = await getMessageHistory(chatId);
// Signal stream start with the message ID
await startStream(messageId);
// Run agent - streams UIMessageChunks to the client
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, messageId, parts });
// Signal stream end and close the writable
await finishStream();
}流步骤函数会写入消息:
UIMessageChunk- - 写入
startStream(messageId)以标记新消息开始{ type: "start", messageId } - - 写入
finishStream()并关闭流{ type: "finish", finishReason: "stop" }
如果没有这些标记,客户端的将无法正确解析流式响应。
WorkflowChatTransportGetting Workflow Metadata
获取工作流元数据
Access the current run's metadata:
typescript
import { getWorkflowMetadata } from "workflow";
export async function chatWorkflow({ chatId }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
// Store runId for resumption
await createAssistantMessage({ chatId, runId: workflowRunId });
}访问当前运行实例的元数据:
typescript
import { getWorkflowMetadata } from "workflow";
export async function chatWorkflow({ chatId }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
// Store runId for resumption
await createAssistantMessage({ chatId, runId: workflowRunId });
}Workflow-Safe Logging
工作流安全日志
The workflow runtime doesn't support Node.js modules. Wrap logger calls in steps:
typescript
// src/workflows/chat/steps/logger.ts
import { logger } from "@/lib/logging/logger";
export async function log(
level: "info" | "warn" | "error" | "debug",
message: string,
data?: Record<string, unknown>,
): Promise<void> {
"use step";
if (data) {
logger[level](data, message);
} else {
logger[level](message);
}
}工作流运行时不支持Node.js模块,需将日志调用包装在步骤中:
typescript
// src/workflows/chat/steps/logger.ts
import { logger } from "@/lib/logging/logger";
export async function log(
level: "info" | "warn" | "error" | "debug",
message: string,
data?: Record<string, unknown>,
): Promise<void> {
"use step";
if (data) {
logger[level](data, message);
} else {
logger[level](message);
}
}Running Agents in Workflows
在工作流中运行Agent
Use the custom class for full streaming control:
Agenttypescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const messageId = await createAssistantMessage({ chatId, runId });
const history = await getMessageHistory(chatId);
await startStream(messageId);
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, messageId, parts });
await finishStream();
}使用自定义类实现完整的流控制:
Agenttypescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const messageId = await createAssistantMessage({ chatId, runId });
const history = await getMessageHistory(chatId);
await startStream(messageId);
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, messageId, parts });
await finishStream();
}Persisting Workflow Results
持久化工作流结果
Save agent output using step functions. The function validates that generic (returned by agents) match your application's specific tool and data types:
assertChatAgentPartsUIMessage["parts"]typescript
// src/workflows/chat/steps/history.ts
import type { UIMessage } from "ai";
import { insertMessageParts } from "@/lib/chat/queries";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
export async function persistMessageParts({
chatId,
messageId,
parts,
}: {
chatId: string;
messageId: string;
parts: UIMessage["parts"];
}): Promise<void> {
"use step";
assertChatAgentParts(parts);
await insertMessageParts(chatId, messageId, parts);
// Update chat timestamp
await db
.update(chats)
.set({ updatedAt: new Date() })
.where(eq(chats.id, chatId));
}使用步骤函数保存Agent输出。函数用于验证Agent返回的通用是否与应用的特定工具和数据类型匹配:
assertChatAgentPartsUIMessage["parts"]typescript
// src/workflows/chat/steps/history.ts
import type { UIMessage } from "ai";
import { insertMessageParts } from "@/lib/chat/queries";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
export async function persistMessageParts({
chatId,
messageId,
parts,
}: {
chatId: string;
messageId: string;
parts: UIMessage["parts"];
}): Promise<void> {
"use step";
assertChatAgentParts(parts);
await insertMessageParts(chatId, messageId, parts);
// Update chat timestamp
await db
.update(chats)
.set({ updatedAt: new Date() })
.where(eq(chats.id, chatId));
}