Loading...
Loading...
Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
npx skill4agent add andrelandgraf/fullstackrecipes using-workflows

Skill: `using-workflows`

Workflows live under `src/workflows/`:

src/workflows/
steps/ # Shared step functions
stream.ts # UI message stream helpers
chat/
index.ts # Workflow orchestration function ("use workflow")
steps/ # Workflow-specific steps ("use step")
history.ts
logger.ts
name-chat.ts
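types.ts # Workflow-specific types

- `workflows/steps/`: shared step functions
- `index.ts`: the `"use workflow"` orchestration function
- `steps/`: workflow-specific `"use step"` functions
- `types.ts`: workflow-specific types

Workflow functions are marked with the `"use workflow"` directive:

// src/workflows/chat/index.ts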
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
/**
 * Chat workflow entry point, marked with the "use workflow" directive.
 *
 * Runs these awaited calls in order: persist the user message, create the
 * assistant message tagged with this run's id, load the chat history, start
 * the UI message stream, run the chat agent against the workflow writable,
 * persist the parts the agent produced, finish the stream, and finally call
 * removeRunId for the assistant message.
 *
 * @param chatId - Identifier of the chat being continued.
 * @param userMessage - Incoming user message; persisted before the agent runs.
 */
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
// Run id of this workflow execution; stored on the assistant message below.
const { workflowRunId } = getWorkflowMetadata();
// Persist user message
await persistUserMessage({ chatId, message: userMessage });
// Create assistant placeholder with runId for resumption
const messageId = await createAssistantMessage({
chatId,
runId: workflowRunId,
});
// Get message history
const history = await getMessageHistory(chatId);
// Start the UI message stream
await startStream(messageId);
// Run agent with streaming; the agent writes to the workflow's writable
// and is capped at 10 steps.
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
// Persist and finalize
await persistMessageParts({ chatId, messageId, parts });
// Finish the UI message stream
await finishStream();
// NOTE(review): presumably clears the run id stored by createAssistantMessage
// once the run has completed; confirm against removeRunId's implementation.
await removeRunId(messageId);
}
startworkflow/apiimport { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";
const run = await start(chatWorkflow, [{ chatId, userMessage }]);
// run.runId - unique identifier for this run
// run.readable - stream of UI message chunks

To reattach to an existing run, use `getRun`:

import { getRun } from "workflow/api";
const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });
/**
 * Step function (marked with "use step") that loads the stored messages of a
 * chat and returns them converted to UI messages.
 *
 * @param chatId - Identifier of the chat whose messages are loaded.
 * @returns The result of convertDbMessagesToUIMessages for that chat's rows.
 */
async function getMessageHistory(chatId: string) {
  "use step";
  return convertDbMessagesToUIMessages(await getChatMessages(chatId));
}
UIMessageChunkWorkflowChatTransportstartStream()agent.run()finishStream()import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
/**
 * Streaming-focused variant of the chat workflow ("use workflow").
 *
 * Loads the chat history, starts the UI message stream for an existing
 * assistant message, runs the chat agent against the workflow writable,
 * persists the produced parts, and then finishes the stream.
 *
 * @param chatId - Identifier of the chat being continued.
 * @param messageId - Identifier of the assistant message the stream is for.
 */
export async function chatWorkflow({ chatId, messageId }) {
"use workflow";
const history = await getMessageHistory(chatId);
// Signal stream start with the message ID
await startStream(messageId);
// Run agent - streams UIMessageChunks to the client
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
// Parts are persisted before the stream is finished.
await persistMessageParts({ chatId, messageId, parts });
// Signal stream end and close the writable
await finishStream();
}
UIMessageChunkstartStream(messageId){ type: "start", messageId }finishStream(){ type: "finish", finishReason: "stop" }WorkflowChatTransportimport { getWorkflowMetadata } from "workflow";
/**
 * Minimal workflow ("use workflow") showing how the current run id is read
 * from the workflow metadata and passed to createAssistantMessage.
 *
 * @param chatId - Identifier of the chat the assistant message belongs to.
 */
export async function chatWorkflow({ chatId }) {
"use workflow";
// workflowRunId identifies this workflow execution.
const { workflowRunId } = getWorkflowMetadata();
// Store runId for resumption
await createAssistantMessage({ chatId, runId: workflowRunId });
}
// src/workflows/chat/steps/logger.ts
import { logger } from "@/lib/logging/logger";
/**
 * Step function ("use step") that forwards one log entry to the shared logger.
 *
 * @param level - Name of the logger method to call.
 * @param message - Log message text.
 * @param data - Optional structured fields; when truthy they are passed to
 *   the logger as the first argument, ahead of the message.
 */
export async function log(
  level: "info" | "warn" | "error" | "debug",
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";
  // Without structured data the logger receives only the message.
  if (!data) {
    logger[level](message);
    return;
  }
  logger[level](data, message);
}
Agentimport { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
/**
 * Agent-focused variant of the chat workflow ("use workflow").
 *
 * Creates the assistant message, loads the chat history, starts the UI
 * message stream, runs the chat agent against the workflow writable,
 * persists the produced parts, and finishes the stream.
 *
 * NOTE(review): `userMessage` is destructured but never used in this snippet.
 *
 * @param chatId - Identifier of the chat being continued.
 * @param userMessage - Incoming user message (unused here).
 */
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
// NOTE(review): `runId` is not declared anywhere in this snippet. Presumably
// it should be `workflowRunId` from getWorkflowMetadata(), which would also
// need importing from "workflow"; confirm before copying this example.
const messageId = await createAssistantMessage({ chatId, runId });
const history = await getMessageHistory(chatId);
await startStream(messageId);
// The agent writes to the workflow's writable and is capped at 10 steps.
const { parts } = await chatAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, messageId, parts });
await finishStream();
}
assertChatAgentPartsUIMessage["parts"]// src/workflows/chat/steps/history.ts
import type { UIMessage } from "ai";
import { insertMessageParts } from "@/lib/chat/queries";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
/**
 * Step function ("use step") that stores the parts of an assistant message
 * and then bumps the owning chat's `updatedAt` timestamp.
 *
 * The parts are passed through assertChatAgentParts before being inserted.
 *
 * NOTE(review): `db`, `chats` and `eq` are not among the imports shown with
 * this snippet; presumably they come from the project's database module and
 * its query builder. Confirm they are in scope.
 *
 * @param chatId - Identifier of the chat that owns the message.
 * @param messageId - Identifier of the message the parts belong to.
 * @param parts - Message parts to check and insert.
 */
export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  // Check the parts before anything is written.
  assertChatAgentParts(parts);
  await insertMessageParts(chatId, messageId, parts);

  // Update chat timestamp
  const touchChat = db.update(chats).set({ updatedAt: new Date() });
  await touchChat.where(eq(chats.id, chatId));
}