Upstash Workflow Implementation Guide
This guide covers the standard patterns for implementing Upstash Workflow + QStash async workflows in the LobeHub codebase.
🎯 The Three Core Patterns
All workflows in LobeHub follow the same 3-layer architecture with three essential patterns:
- 🔍 Dry-Run Mode - Get statistics without triggering actual execution
- 🌟 Fan-Out Pattern - Split large batches into smaller chunks for parallel processing
- 🎯 Single Task Execution - Each workflow execution processes ONE item only
These patterns ensure scalable, debuggable, and cost-efficient async workflows.
Table of Contents
- Architecture Overview
- Core Patterns
- File Structure
- Cloud Project Configuration
- Implementation Patterns
- Best Practices
- Examples
Architecture Overview
Standard 3-Layer Pattern
All workflows follow a standard 3-layer architecture:
```
Layer 1: Entry Point (process-*)
├─ Validates prerequisites
├─ Calculates total items to process
├─ Filters existing items
├─ Supports dry-run mode (statistics only)
└─ Triggers Layer 2 if work needed

Layer 2: Pagination (paginate-*)
├─ Handles cursor-based pagination
├─ Implements fan-out for large batches
├─ Recursively processes all pages
└─ Triggers Layer 3 for each item

Layer 3: Single Task Execution (execute-*/generate-*)
└─ Performs actual business logic for ONE item
```

Examples: `welcome-placeholder`, `agent-welcome`
Core Patterns
1. Dry-Run Mode
Purpose: Get statistics without triggering actual execution
Pattern:
```typescript
// Layer 1: Entry Point
if (dryRun) {
  console.log('[workflow:process] Dry run mode, returning statistics only');
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
  };
}
```

Use Case: Check how many items will be processed before committing to execution
Response:
```typescript
{
  success: true,
  totalEligible: 100,
  toProcess: 80,
  alreadyProcessed: 20,
  dryRun: true,
  message: "[DryRun] Would process 80 items"
}
```
2. Fan-Out Pattern
Purpose: Split large batches into smaller chunks for parallel processing
Pattern:
```typescript
// Layer 2: Pagination
const CHUNK_SIZE = 20;

if (itemIds.length > CHUNK_SIZE) {
  // Fan-out to smaller chunks
  const chunks = chunk(itemIds, CHUNK_SIZE);
  console.log('[workflow:paginate] Fan-out mode:', {
    chunks: chunks.length,
    chunkSize: CHUNK_SIZE,
    totalItems: itemIds.length,
  });

  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`workflow:fanout:${idx + 1}/${chunks.length}`, () =>
        WorkflowClass.triggerPaginateItems({ itemIds: ids }),
      ),
    ),
  );
}
```

Use Case: Avoid hitting workflow step limits by splitting large batches
Configuration:
- `PAGE_SIZE = 50` - Items per pagination page
- `CHUNK_SIZE = 20` - Items per fan-out chunk
- If a batch exceeds CHUNK_SIZE, split it into chunks and recursively trigger pagination
3. Single Task Execution
Purpose: Execute business logic for ONE item at a time
Pattern:
```typescript
// Layer 3: Single Task Execution
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};
    if (!itemId) {
      return { success: false, error: 'Missing itemId' };
    }

    // Get item
    const item = await context.run('workflow:get-item', async () => {
      return getItem(itemId);
    });

    // Execute business logic for THIS item only
    const result = await context.run('workflow:execute', async () => {
      return processItem(item);
    });

    // Save result for THIS item
    await context.run('workflow:save', async () => {
      return saveResult(itemId, result);
    });

    return { success: true, itemId, result };
  },
  {
    flowControl: {
      key: 'workflow.execute',
      parallelism: 10,
      ratePerSecond: 5,
    },
  },
);
```

Key Principles:
- Each workflow execution handles exactly ONE item
- Parallelism controlled by `flowControl` config
- Multiple items processed via Layer 2 triggering multiple Layer 3 executions
File Structure
Directory Layout
```
src/
├── app/(backend)/api/workflows/
│   └── {workflow-name}/
│       ├── process-{entities}/route.ts   # Layer 1
│       ├── paginate-{entities}/route.ts  # Layer 2
│       └── execute-{entity}/route.ts     # Layer 3
│
└── server/workflows/
    └── {workflowName}/
        └── index.ts                      # Workflow class
```
Cloud Project Configuration
For lobehub-cloud specific configurations (re-exports, cloud-only workflows, deployment patterns), see:
📄 Cloud Configuration Guide
Implementation Patterns
1. Workflow Class
Location:
`src/server/workflows/{workflowName}/index.ts`

```typescript
import { Client } from '@upstash/workflow';
import debug from 'debug';

const log = debug('lobe-server:workflows:{workflow-name}');

// Workflow paths
const WORKFLOW_PATHS = {
  processItems: '/api/workflows/{workflow-name}/process-items',
  paginateItems: '/api/workflows/{workflow-name}/paginate-items',
  executeItem: '/api/workflows/{workflow-name}/execute-item',
} as const;

// Payload types
export interface ProcessItemsPayload {
  dryRun?: boolean;
  force?: boolean;
}

export interface PaginateItemsPayload {
  cursor?: string;
  itemIds?: string[]; // For fanout chunks
}

export interface ExecuteItemPayload {
  itemId: string;
}

/**
 * Get workflow URL using APP_URL
 */
const getWorkflowUrl = (path: string): string => {
  const baseUrl = process.env.APP_URL;
  if (!baseUrl) throw new Error('APP_URL is required to trigger workflows');
  return new URL(path, baseUrl).toString();
};

/**
 * Get workflow client
 */
const getWorkflowClient = (): Client => {
  const token = process.env.QSTASH_TOKEN;
  if (!token) throw new Error('QSTASH_TOKEN is required to trigger workflows');
  const config: ConstructorParameters<typeof Client>[0] = { token };
  if (process.env.QSTASH_URL) {
    (config as Record<string, unknown>).url = process.env.QSTASH_URL;
  }
  return new Client(config);
};

/**
 * {Workflow Name} Workflow
 */
export class {WorkflowName}Workflow {
  private static client: Client;

  private static getClient(): Client {
    if (!this.client) {
      this.client = getWorkflowClient();
    }
    return this.client;
  }

  /**
   * Trigger workflow to process items (entry point)
   */
  static triggerProcessItems(payload: ProcessItemsPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.processItems);
    log('Triggering process-items workflow');
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Trigger workflow to paginate items
   */
  static triggerPaginateItems(payload: PaginateItemsPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.paginateItems);
    log('Triggering paginate-items workflow');
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Trigger workflow to execute a single item
   */
  static triggerExecuteItem(payload: ExecuteItemPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.executeItem);
    log('Triggering execute-item workflow: %s', payload.itemId);
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Filter items that need processing (e.g., check Redis cache, database state)
   */
  static async filterItemsNeedingProcessing(itemIds: string[]): Promise<string[]> {
    if (itemIds.length === 0) return [];
    // Check existing state (Redis, database, etc.)
    // Return items that need processing
    return itemIds;
  }
}
```
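A note on `getWorkflowUrl`: it leans on the WHATWG `URL` constructor to join the workflow path against `APP_URL`, which tolerates a trailing slash on the base. A minimal sketch of that resolution (`example.com` is a placeholder value):

```typescript
// Sketch of the path resolution getWorkflowUrl relies on: an absolute path
// joined against a base URL, with or without a trailing slash on the base.
const resolveWorkflowUrl = (path: string, baseUrl: string): string =>
  new URL(path, baseUrl).toString();

resolveWorkflowUrl('/api/workflows/demo/process-items', 'https://example.com');
// → 'https://example.com/api/workflows/demo/process-items'
```

This is why `APP_URL` can be configured either way without breaking trigger URLs.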
2. Layer 1: Entry Point (process-*)
Purpose: Validates prerequisites, calculates statistics, supports dryRun mode
```typescript
import { serve } from '@upstash/workflow/nextjs';

import { getServerDB } from '@/database/server';
import { WorkflowClass, type ProcessPayload } from '@/server/workflows/{workflowName}';

/**
 * Entry workflow for {workflow description}
 * 1. Get all eligible items
 * 2. Filter items that already have results
 * 3. If dryRun, return statistics only
 * 4. If no items need processing, return early
 * 5. Trigger paginate workflow
 */
export const { POST } = serve<ProcessPayload>(
  async (context) => {
    const { dryRun, force } = context.requestPayload ?? {};
    console.log('[{workflow}:process] Starting with payload:', { dryRun, force });

    // Get all eligible items
    const allItemIds = await context.run('{workflow}:get-all-items', async () => {
      const db = await getServerDB();
      // Query database for eligible items
      const items = await db.query(...);
      return items.map((item) => item.id);
    });

    console.log('[{workflow}:process] Total eligible items:', allItemIds.length);

    if (allItemIds.length === 0) {
      return {
        success: true,
        totalEligible: 0,
        message: 'No eligible items found',
      };
    }

    // Filter items that need processing
    const itemsNeedingProcessing = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(allItemIds),
    );

    const result = {
      success: true,
      totalEligible: allItemIds.length,
      toProcess: itemsNeedingProcessing.length,
      alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length,
    };

    console.log('[{workflow}:process] Check result:', result);

    // If dryRun mode, return statistics only
    if (dryRun) {
      console.log('[{workflow}:process] Dry run mode, returning statistics only');
      return {
        ...result,
        dryRun: true,
        message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
      };
    }

    // If no items need processing, return early
    if (itemsNeedingProcessing.length === 0) {
      console.log('[{workflow}:process] All items already processed');
      return {
        ...result,
        message: 'All items already processed',
      };
    }

    // Trigger paginate workflow
    console.log('[{workflow}:process] Triggering paginate workflow');
    await context.run('{workflow}:trigger-paginate', () => WorkflowClass.triggerPaginateItems({}));

    return {
      ...result,
      message: `Triggered pagination for ${itemsNeedingProcessing.length} items`,
    };
  },
  {
    flowControl: {
      key: '{workflow}.process',
      parallelism: 1,
      ratePerSecond: 1,
    },
  },
);
```
3. Layer 2: Pagination (paginate-*)
Purpose: Handles cursor-based pagination, implements fanout for large batches
```typescript
import { serve } from '@upstash/workflow/nextjs';
import { chunk } from 'es-toolkit/compat';

import { getServerDB } from '@/database/server';
import { WorkflowClass, type PaginatePayload } from '@/server/workflows/{workflowName}';

const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;

/**
 * Paginate items workflow - handles pagination and fanout
 * 1. If specific itemIds provided (from fanout), process them directly
 * 2. Otherwise, paginate through all items using cursor
 * 3. Filter items that need processing
 * 4. If batch > CHUNK_SIZE, fanout to smaller chunks
 * 5. Trigger execute workflow for each item
 * 6. Schedule next page if cursor exists
 */
export const { POST } = serve<PaginatePayload>(
  async (context) => {
    const { cursor, itemIds: payloadItemIds } = context.requestPayload ?? {};
    console.log('[{workflow}:paginate] Starting with payload:', {
      cursor,
      itemIdsCount: payloadItemIds?.length ?? 0,
    });

    // If specific itemIds are provided, process them directly (from fanout)
    if (payloadItemIds && payloadItemIds.length > 0) {
      console.log('[{workflow}:paginate] Processing specific itemIds:', {
        count: payloadItemIds.length,
      });

      await Promise.all(
        payloadItemIds.map((itemId) =>
          context.run(`{workflow}:execute:${itemId}`, () =>
            WorkflowClass.triggerExecuteItem({ itemId }),
          ),
        ),
      );

      return {
        success: true,
        processedItems: payloadItemIds.length,
      };
    }

    // Paginate through all items
    const itemBatch = await context.run('{workflow}:get-batch', async () => {
      const db = await getServerDB();
      // Query database with cursor and PAGE_SIZE
      const items = await db.query(...);

      if (!items.length) return { ids: [] };

      const last = items.at(-1);
      return {
        ids: items.map((item) => item.id),
        cursor: last ? last.id : undefined,
      };
    });

    const batchItemIds = itemBatch.ids;
    const nextCursor = 'cursor' in itemBatch ? itemBatch.cursor : undefined;

    console.log('[{workflow}:paginate] Got batch:', {
      batchSize: batchItemIds.length,
      nextCursor,
    });

    if (batchItemIds.length === 0) {
      console.log('[{workflow}:paginate] No more items, pagination complete');
      return { success: true, message: 'Pagination complete' };
    }

    // Filter items that need processing
    const itemIds = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(batchItemIds),
    );

    console.log('[{workflow}:paginate] After filtering:', {
      needProcessing: itemIds.length,
      skipped: batchItemIds.length - itemIds.length,
    });

    // Process items if any need processing
    if (itemIds.length > 0) {
      if (itemIds.length > CHUNK_SIZE) {
        // Fanout to smaller chunks
        const chunks = chunk(itemIds, CHUNK_SIZE);
        console.log('[{workflow}:paginate] Fanout mode:', {
          chunks: chunks.length,
          chunkSize: CHUNK_SIZE,
          totalItems: itemIds.length,
        });

        await Promise.all(
          chunks.map((ids, idx) =>
            context.run(`{workflow}:fanout:${idx + 1}/${chunks.length}`, () =>
              WorkflowClass.triggerPaginateItems({ itemIds: ids }),
            ),
          ),
        );
      } else {
        // Process directly
        console.log('[{workflow}:paginate] Processing items directly:', {
          count: itemIds.length,
        });

        await Promise.all(
          itemIds.map((itemId) =>
            context.run(`{workflow}:execute:${itemId}`, () =>
              WorkflowClass.triggerExecuteItem({ itemId }),
            ),
          ),
        );
      }
    }

    // Schedule next page
    if (nextCursor) {
      console.log('[{workflow}:paginate] Scheduling next page:', { nextCursor });
      await context.run('{workflow}:next-page', () =>
        WorkflowClass.triggerPaginateItems({ cursor: nextCursor }),
      );
    } else {
      console.log('[{workflow}:paginate] No more pages');
    }

    return {
      success: true,
      processedItems: itemIds.length,
      skippedItems: batchItemIds.length - itemIds.length,
      nextCursor: nextCursor ?? null,
    };
  },
  {
    flowControl: {
      key: '{workflow}.paginate',
      parallelism: 20,
      ratePerSecond: 5,
    },
  },
);
```
4. Layer 3: Execution (execute-*/generate-*)
Purpose: Performs actual business logic
```typescript
import { serve } from '@upstash/workflow/nextjs';

import { getServerDB } from '@/database/server';
import { WorkflowClass, type ExecutePayload } from '@/server/workflows/{workflowName}';

/**
 * Execute item workflow - performs actual business logic
 * 1. Get item data
 * 2. Perform business logic (AI generation, data processing, etc.)
 * 3. Save results
 */
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};
    console.log('[{workflow}:execute] Starting:', { itemId });

    if (!itemId) {
      return { success: false, error: 'Missing itemId' };
    }

    const db = await getServerDB();

    // Get item data
    const item = await context.run('{workflow}:get-item', async () => {
      // Query database for the item
      return db.query(...);
    });

    if (!item) {
      return { success: false, error: 'Item not found' };
    }

    // Perform business logic
    const result = await context.run('{workflow}:process-item', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.generate(); // or process(), execute(), etc.
    });

    // Save results
    await context.run('{workflow}:save-result', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.saveToRedis(result); // or saveToDatabase(), etc.
    });

    console.log('[{workflow}:execute] Completed:', { itemId });

    return {
      success: true,
      itemId,
      result,
    };
  },
  {
    flowControl: {
      key: '{workflow}.execute',
      parallelism: 10,
      ratePerSecond: 5,
    },
  },
);
```
Best Practices
1. Error Handling
```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    // Validate required parameters
    if (!itemId) {
      return { success: false, error: 'Missing itemId in payload' };
    }

    try {
      // Perform work
      const result = await context.run('step-name', () => doWork(itemId));
      return { success: true, itemId, result };
    } catch (error) {
      console.error('[workflow:error]', error);
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error',
      };
    }
  },
  { flowControl: { ... } },
);
```
2. Logging
Use consistent log prefixes and structured logging:
```typescript
console.log('[{workflow}:{layer}] Starting with payload:', payload);
console.log('[{workflow}:{layer}] Processing items:', { count: items.length });
console.log('[{workflow}:{layer}] Completed:', result);
console.error('[{workflow}:{layer}:error]', error);
```
3. Return Values
Return consistent response shapes:
```typescript
// Success response
return {
  success: true,
  itemId,
  result,
  message: 'Optional success message',
};

// Error response
return {
  success: false,
  error: 'Error description',
  itemId, // Include context if available
};

// Statistics response (for entry point)
return {
  success: true,
  totalEligible: 100,
  toProcess: 80,
  alreadyProcessed: 20,
  dryRun: true, // If applicable
  message: 'Summary message',
};
```
4. flowControl Configuration
Purpose: Control concurrency and rate limiting for workflow executions
Tune concurrency based on layer:
```typescript
// Layer 1: Entry point - single instance only
flowControl: {
  key: '{workflow}.process',
  parallelism: 1, // Only 1 process workflow at a time
  ratePerSecond: 1, // 1 execution per second
}

// Layer 2: Pagination - moderate concurrency
flowControl: {
  key: '{workflow}.paginate',
  parallelism: 20, // Up to 20 pagination workflows in parallel
  ratePerSecond: 5, // 5 new executions per second
}

// Layer 3: Single task execution - high concurrency
flowControl: {
  key: '{workflow}.execute',
  parallelism: 10, // Up to 10 items processed in parallel
  ratePerSecond: 5, // 5 new items per second
}
```

Guidelines:
- Layer 1: Always use `parallelism: 1` to avoid duplicate processing
- Layer 2: Moderate concurrency for pagination (typically 10-20)
- Layer 3: Higher concurrency for parallel item processing (typically 5-10)
- Adjust `ratePerSecond` based on external API rate limits or resource constraints
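A back-of-envelope consequence of `ratePerSecond`: it bounds how fast executions can even start, independent of `parallelism`. The helper below is hypothetical (not an Upstash API), just the arithmetic made explicit:

```typescript
// Lower bound (in seconds) on dispatching N executions when flowControl
// admits at most ratePerSecond new executions per second.
const minDispatchSeconds = (items: number, ratePerSecond: number): number =>
  Math.ceil(items / ratePerSecond);

minDispatchSeconds(1000, 5); // → 200 seconds just to start 1000 Layer 3 items
```

If backlog drain time matters, raise `ratePerSecond` toward whatever your external APIs tolerate before raising `parallelism`.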
5. context.run() Best Practices
- Use descriptive step names with prefixes: `{workflow}:step-name`
- Each step should be idempotent (safe to retry)
- Don't nest context.run() calls - keep them flat
- Use unique step names when processing multiple items:
```typescript
// Good: Unique step names
await Promise.all(
  items.map((item) => context.run(`{workflow}:execute:${item.id}`, () => processItem(item))),
);

// Bad: Same step name for all items
await Promise.all(
  items.map((item) =>
    context.run(`{workflow}:execute`, () =>
      // ❌ Not unique
      processItem(item),
    ),
  ),
);
```
6. Payload Validation
Always validate required parameters at the start:
```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId, configId } = context.requestPayload ?? {};

    // Validate at the start
    if (!itemId) {
      return { success: false, error: 'Missing itemId in payload' };
    }
    if (!configId) {
      return { success: false, error: 'Missing configId in payload' };
    }

    // Proceed with work...
  },
  { flowControl: { ... } },
);
```
7. Database Connection
Get database connection once per workflow:
```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const db = await getServerDB(); // Get once

    // Use in multiple steps
    const item = await context.run('get-item', async () => {
      return itemModel.findById(db, itemId);
    });

    const result = await context.run('save-result', async () => {
      return resultModel.create(db, result);
    });
  },
  { flowControl: { ... } },
);
```
```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};
    const db = await getServerDB(); // 仅获取一次

    // 在多个步骤中使用
    const item = await context.run('get-item', async () => {
      return itemModel.findById(db, itemId);
    });

    const result = await context.run('save-result', async () => {
      return resultModel.create(db, item);
    });
  },
  { flowControl: { ... } },
);
```

8. Testing
8. 测试
Create integration tests for workflows:
```typescript
describe('WorkflowName', () => {
  it('should process items successfully', async () => {
    // Setup test data
    const items = await createTestItems();

    // Trigger workflow
    await WorkflowClass.triggerProcessItems({ dryRun: false });

    // Wait for completion (use polling or webhook)
    await waitForCompletion();

    // Verify results
    const results = await getResults();
    expect(results).toHaveLength(items.length);
  });

  it('should support dryRun mode', async () => {
    const result = await WorkflowClass.triggerProcessItems({ dryRun: true });

    expect(result).toMatchObject({
      success: true,
      dryRun: true,
      totalEligible: expect.any(Number),
      toProcess: expect.any(Number),
    });
  });
});
```

为工作流创建集成测试:
```typescript
describe('WorkflowName', () => {
  it('应该成功处理项目', async () => {
    // 设置测试数据
    const items = await createTestItems();

    // 触发工作流
    await WorkflowClass.triggerProcessItems({ dryRun: false });

    // 等待完成(使用轮询或 webhook)
    await waitForCompletion();

    // 验证结果
    const results = await getResults();
    expect(results).toHaveLength(items.length);
  });

  it('应该支持试运行模式', async () => {
    const result = await WorkflowClass.triggerProcessItems({ dryRun: true });

    expect(result).toMatchObject({
      success: true,
      dryRun: true,
      totalEligible: expect.any(Number),
      toProcess: expect.any(Number),
    });
  });
});
```

Examples
示例
Example 1: Welcome Placeholder
示例1:欢迎占位符
Use Case: Generate AI-powered welcome placeholders for users
Structure:
- Layer 1: process-users - Entry point, checks eligible users
- Layer 2: paginate-users - Paginates through active users
- Layer 3: generate-user - Generates placeholders for ONE user
Core Patterns Demonstrated:
- Dry-Run Mode:
```typescript
// Layer 1: process-users
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${usersNeedingGeneration.length} users`,
  };
}
```

- Fan-Out Pattern:
```typescript
// Layer 2: paginate-users
if (userIds.length > CHUNK_SIZE) {
  const chunks = chunk(userIds, CHUNK_SIZE);
  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`welcome-placeholder:fanout:${idx + 1}/${chunks.length}`, () =>
        WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }),
      ),
    ),
  );
}
```

- Single Task Execution:
```typescript
// Layer 3: generate-user
export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => {
  const { userId } = context.requestPayload ?? {};

  // Execute for ONE user only
  const workflow = new WelcomePlaceholderWorkflow(db, userId);
  const placeholders = await context.run('generate', () => workflow.generate());

  return { success: true, userId, placeholdersCount: placeholders.length };
});
```

Key Features:
- ✅ Filters users who already have cached placeholders in Redis
- ✅ Supports paidOnly flag to process only subscribed users
- ✅ Supports dryRun mode for statistics
- ✅ Uses fan-out for large user batches (CHUNK_SIZE=20)
- ✅ Each execution processes exactly ONE user
Files:
- /api/workflows/welcome-placeholder/process-users/route.ts
- /api/workflows/welcome-placeholder/paginate-users/route.ts
- /api/workflows/welcome-placeholder/generate-user/route.ts
- /server/workflows/welcomePlaceholder/index.ts
使用场景: 为用户生成AI驱动的欢迎占位符
结构:
- 第一层: process-users - 入口点,检查符合条件的用户
- 第二层: paginate-users - 分页遍历活跃用户
- 第三层: generate-user - 为单个用户生成占位符
核心模式演示:
- 试运行模式:
```typescript
// 第一层:process-users
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[试运行] 将为 ${usersNeedingGeneration.length} 个用户生成内容`,
  };
}
```

- 扇出模式:
```typescript
// 第二层:paginate-users
if (userIds.length > CHUNK_SIZE) {
  const chunks = chunk(userIds, CHUNK_SIZE);
  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`welcome-placeholder:fanout:${idx + 1}/${chunks.length}`, () =>
        WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }),
      ),
    ),
  );
}
```

- 单任务执行:
```typescript
// 第三层:generate-user
export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => {
  const { userId } = context.requestPayload ?? {};

  // 仅为单个用户执行
  const workflow = new WelcomePlaceholderWorkflow(db, userId);
  const placeholders = await context.run('generate', () => workflow.generate());

  return { success: true, userId, placeholdersCount: placeholders.length };
});
```

核心特性:
- ✅ 过滤Redis中已有缓存占位符的用户
- ✅ 支持 paidOnly 标志仅处理订阅用户
- ✅ 支持 dryRun 模式获取统计数据
- ✅ 对大量用户批量使用扇出(CHUNK_SIZE=20)
- ✅ 每次执行仅处理一个用户
文件:
- /api/workflows/welcome-placeholder/process-users/route.ts
- /api/workflows/welcome-placeholder/paginate-users/route.ts
- /api/workflows/welcome-placeholder/generate-user/route.ts
- /server/workflows/welcomePlaceholder/index.ts
Example 2: Agent Welcome
示例2:Agent欢迎信息
Use Case: Generate welcome messages and open questions for AI agents
Structure:
- Layer 1: process-agents - Entry point, checks eligible agents
- Layer 2: paginate-agents - Paginates through active agents
- Layer 3: generate-agent - Generates welcome data for ONE agent
Core Patterns Demonstrated:
- Dry-Run Mode:
```typescript
// Layer 1: process-agents
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${agentsNeedingGeneration.length} agents`,
  };
}
```

- Fan-Out Pattern: Same as welcome-placeholder
- Single Task Execution:
```typescript
// Layer 3: generate-agent
export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => {
  const { agentId } = context.requestPayload ?? {};

  // Execute for ONE agent only
  const workflow = new AgentWelcomeWorkflow(db, agentId);
  const data = await context.run('generate', () => workflow.generate());

  return { success: true, agentId, data };
});
```

Key Features:
- ✅ Filters agents that already have cached data in Redis
- ✅ Supports paidOnly flag for subscribed users' agents only
- ✅ Supports dryRun mode for statistics
- ✅ Uses fan-out for large agent batches (CHUNK_SIZE=20)
- ✅ Each execution processes exactly ONE agent
Files:
- /api/workflows/agent-welcome/process-agents/route.ts
- /api/workflows/agent-welcome/paginate-agents/route.ts
- /api/workflows/agent-welcome/generate-agent/route.ts
- /server/workflows/agentWelcome/index.ts
使用场景: 为AI Agent生成欢迎消息和开放式问题
结构:
- 第一层: process-agents - 入口点,检查符合条件的Agent
- 第二层: paginate-agents - 分页遍历活跃Agent
- 第三层: generate-agent - 为单个Agent生成欢迎数据
核心模式演示:
- 试运行模式:
```typescript
// 第一层:process-agents
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[试运行] 将为 ${agentsNeedingGeneration.length} 个Agent生成内容`,
  };
}
```

- 扇出模式: 和欢迎占位符示例相同
- 单任务执行:
```typescript
// 第三层:generate-agent
export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => {
  const { agentId } = context.requestPayload ?? {};

  // 仅为单个Agent执行
  const workflow = new AgentWelcomeWorkflow(db, agentId);
  const data = await context.run('generate', () => workflow.generate());

  return { success: true, agentId, data };
});
```

核心特性:
- ✅ 过滤Redis中已有缓存数据的Agent
- ✅ 支持 paidOnly 标志仅处理订阅用户的Agent
- ✅ 支持 dryRun 模式获取统计数据
- ✅ 对大量Agent批量使用扇出(CHUNK_SIZE=20)
- ✅ 每次执行仅处理一个Agent
文件:
- /api/workflows/agent-welcome/process-agents/route.ts
- /api/workflows/agent-welcome/paginate-agents/route.ts
- /api/workflows/agent-welcome/generate-agent/route.ts
- /server/workflows/agentWelcome/index.ts
Key Takeaways from Examples
示例核心要点
Both workflows follow the exact same pattern:

- Layer 1 (Entry Point):
  - Calculate statistics
  - Filter existing items
  - Support dry-run mode
  - Trigger pagination only if needed
- Layer 2 (Pagination):
  - Paginate with cursor (PAGE_SIZE=50)
  - Fan-out large batches (CHUNK_SIZE=20)
  - Trigger Layer 3 for each item
  - Recursively process all pages
- Layer 3 (Execution):
  - Process ONE item per execution
  - Perform business logic
  - Save results
  - Return success/failure

The only differences are:
- Entity type (users vs agents)
- Business logic (placeholder generation vs welcome generation)
- Data source (different database queries)
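The fan-out split used in Layer 2 is plain array chunking. For reference, a minimal chunk helper with the same behavior as lodash's `chunk`, which the examples rely on:

```typescript
// Minimal chunk helper, equivalent to lodash `chunk`: splits an array into
// groups of at most `size` items; the last group may be smaller.
const chunk = <T>(items: T[], size: number): T[][] => {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
};

// With PAGE_SIZE=50 and CHUNK_SIZE=20, one page fans out into 3 chunks
// of lengths 20, 20, and 10.
```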
两个工作流都遵循完全相同的模式:

- 第一层(入口点):
  - 计算统计数据
  - 过滤现有项目
  - 支持试运行模式
  - 仅在需要时触发分页
- 第二层(分页层):
  - 使用游标分页(PAGE_SIZE=50)
  - 对大批量任务扇出(CHUNK_SIZE=20)
  - 为每个项目触发第三层
  - 递归处理所有分页
- 第三层(执行层):
  - 每次执行处理一个项目
  - 执行业务逻辑
  - 保存结果
  - 返回成功/失败状态

唯一的区别是:
- 实体类型(用户 vs Agent)
- 业务逻辑(占位符生成 vs 欢迎信息生成)
- 数据源(不同的数据库查询)
Common Pitfalls
常见陷阱
❌ Don't: Use context.run() without unique names
❌ 不要:使用context.run()时不指定唯一名称
```typescript
// Bad: Same step name when processing multiple items
await Promise.all(items.map((item) => context.run('process', () => process(item))));
```

```typescript
// Good: Unique step names
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));
```

```typescript
// 坏:处理多个项目时使用相同的步骤名称
await Promise.all(items.map((item) => context.run('process', () => process(item))));
```

```typescript
// 好:使用唯一的步骤名称
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));
```

❌ Don't: Forget to validate payload parameters
❌ 不要:忘记验证负载参数
```typescript
// Bad: No validation
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};
  const result = await process(itemId); // May fail with undefined
});
```

```typescript
// Good: Validate early
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};
  if (!itemId) {
    return { success: false, error: 'Missing itemId' };
  }
  const result = await process(itemId);
});
```

```typescript
// 坏:没有验证
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};
  const result = await process(itemId); // 可能因为undefined失败
});
```

```typescript
// 好:提前验证
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};
  if (!itemId) {
    return { success: false, error: '缺少 itemId' };
  }
  const result = await process(itemId);
});
```

❌ Don't: Skip filtering existing items
❌ 不要:跳过过滤现有项目
```typescript
// Bad: No filtering, may duplicate work
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));
```

```typescript
// Good: Filter existing items first
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));
```

```typescript
// 坏:没有过滤,可能重复工作
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));
```

```typescript
// 好:先过滤现有项目
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));
```

❌ Don't: Use inconsistent logging
❌ 不要:使用不一致的日志记录
```typescript
// Bad: Inconsistent prefixes and formats
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);
```

```typescript
// Good: Consistent structured logging
console.log('[workflow:layer] Starting with payload:', payload);
console.log('[workflow:layer] Processing item:', { itemId });
console.log('[workflow:layer] Completed:', { itemId, result });
```

```typescript
// 坏:前缀和格式不一致
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);
```

```typescript
// 好:统一的结构化日志
console.log('[workflow:layer] 启动,负载:', payload);
console.log('[workflow:layer] 处理项目:', { itemId });
console.log('[workflow:layer] 完成:', { itemId, result });
```

Environment Variables Required
所需环境变量
Required for all workflows
所有工作流必填

```bash
APP_URL=https://your-app.com # Base URL for workflow endpoints
QSTASH_TOKEN=qstash_xxx      # QStash authentication token
```

```bash
APP_URL=https://your-app.com # 工作流端点的基础URL
QSTASH_TOKEN=qstash_xxx      # QStash认证令牌
```

Optional (for custom QStash URL)
可选(用于自定义QStash URL)

```bash
QSTASH_URL=https://custom-qstash.com # Custom QStash endpoint
```

```bash
QSTASH_URL=https://custom-qstash.com # 自定义QStash端点
```

---

Checklist for New Workflows
新工作流检查清单
Planning Phase
规划阶段
- Identify entity to process (users, agents, items, etc.)
- Define business logic for single item execution
- Determine filtering logic (Redis cache, database state, etc.)
- 确定要处理的实体(用户、Agent、项目等)
- 定义单个项目执行的业务逻辑
- 确定过滤逻辑(Redis缓存、数据库状态等)
Implementation Phase
实现阶段
- Define payload types with proper TypeScript interfaces
- Create workflow class with static trigger methods
- Layer 1: Implement entry point with dry-run support
- Layer 1: Add filtering logic to avoid duplicate work
- Layer 2: Implement pagination with fan-out logic
- Layer 3: Implement single task execution (ONE item per run)
- Configure appropriate flowControl for each layer
- Add consistent logging with workflow prefixes
- Validate all required payload parameters
- Use unique context.run() step names
- 使用合适的TypeScript接口定义负载类型
- 创建带有静态触发方法的工作流类
- 第一层:实现支持试运行的入口点
- 第一层:添加过滤逻辑避免重复工作
- 第二层:实现带有扇出逻辑的分页
- 第三层:实现单任务执行(每次运行处理一个项目)
- 为每一层配置合适的flowControl
- 添加带有工作流前缀的统一日志
- 验证所有必填负载参数
- 使用唯一的context.run()步骤名称
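For the "workflow class with static trigger methods" item, a hedged sketch of the shape such a class typically takes. The buildTriggerRequest helper, the ItemWorkflow name, and the route path are hypothetical; the real trigger publishes the payload to the next layer's route via QStash (e.g. with @upstash/qstash's Client.publishJSON):

```typescript
// Hypothetical helper: builds the publish request for a workflow route.
// Both the helper and the path layout are illustrative assumptions.
const buildTriggerRequest = (baseUrl: string, path: string, body: object) => ({
  body,
  url: `${baseUrl}/api/workflows/${path}`,
});

// Hypothetical workflow class with a static trigger method per layer.
class ItemWorkflow {
  static async triggerProcessItems(payload: { dryRun?: boolean }) {
    const req = buildTriggerRequest(process.env.APP_URL!, 'item/process-items', payload);
    // In a real trigger, publish the request via QStash, e.g.:
    //   const client = new Client({ token: process.env.QSTASH_TOKEN! });
    //   await client.publishJSON(req);
    return req;
  }
}
```

Keeping the triggers as static methods gives every layer one well-typed entry point, so routes never hand-build URLs or payloads.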
Quality & Deployment
质量与部署
- Return consistent response shapes
- Configure cloud deployment (see Cloud Guide if using lobehub-cloud)
- Write integration tests
- Test with dry-run mode first
- Test with small batch before full rollout
- 返回统一的响应结构
- 配置云部署(如果使用lobehub-cloud请参考云指南)
- 编写集成测试
- 先使用试运行模式测试
- 全量发布前先使用小批量测试
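For the "consistent response shapes" item, one way to pin the envelope down is a shared result type with two small constructors. This is a hypothetical sketch, not a type from the codebase:

```typescript
// Hypothetical shared envelope: every layer returns success/error in the
// same shape, so logs and dashboards can be parsed uniformly.
type Ok = { success: true; dryRun?: boolean; [key: string]: unknown };
type Fail = { success: false; error: string };
type WorkflowResult = Ok | Fail;

const ok = (data: Record<string, unknown>, dryRun = false): WorkflowResult =>
  dryRun ? { ...data, success: true, dryRun: true } : { ...data, success: true };

const fail = (error: string): WorkflowResult => ({ success: false, error });
```

A Layer 3 handler would then end with `return ok({ userId, placeholdersCount })` or `return fail('Missing userId')`, matching the response shapes shown throughout this guide.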
Additional Resources
额外资源
- Upstash Workflow Documentation
- QStash Documentation
- Example Workflows in Codebase
- Workflow Classes
- Upstash Workflow 文档
- QStash 文档
- 代码库中的示例工作流
- 工作流类