upstash-workflow

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Upstash Workflow Implementation Guide

Upstash Workflow 实现指南

This guide covers the standard patterns for implementing Upstash Workflow + QStash async workflows in the LobeHub codebase.
本指南介绍了在 LobeHub 代码库中实现 Upstash Workflow + QStash 异步工作流的标准模式。

🎯 The Three Core Patterns

🎯 三大核心模式

All workflows in LobeHub follow the same 3-layer architecture with three essential patterns:
  1. 🔍 Dry-Run Mode - Get statistics without triggering actual execution
  2. 🌟 Fan-Out Pattern - Split large batches into smaller chunks for parallel processing
  3. 🎯 Single Task Execution - Each workflow execution processes ONE item only
These patterns ensure scalable, debuggable, and cost-efficient async workflows.
LobeHub 中所有的工作流都遵循相同的三层架构,包含三个核心模式:
  1. 🔍 试运行模式 - 无需触发实际执行即可获取统计数据
  2. 🌟 扇出模式 - 将大批量任务拆分为更小的块进行并行处理
  3. 🎯 单任务执行 - 每次工作流执行仅处理一个项目
这些模式可以保证异步工作流具备可扩展性、可调试性和成本效益。

Table of Contents

目录

Architecture Overview

架构概览

Standard 3-Layer Pattern

标准三层模式

All workflows follow a standard 3-layer architecture:
Layer 1: Entry Point (process-*)
  ├─ Validates prerequisites
  ├─ Calculates total items to process
  ├─ Filters existing items
  ├─ Supports dry-run mode (statistics only)
  └─ Triggers Layer 2 if work needed

Layer 2: Pagination (paginate-*)
  ├─ Handles cursor-based pagination
  ├─ Implements fan-out for large batches
  ├─ Recursively processes all pages
  └─ Triggers Layer 3 for each item

Layer 3: Single Task Execution (execute-*/generate-*)
  └─ Performs actual business logic for ONE item
Examples:
welcome-placeholder
,
agent-welcome

所有工作流都遵循标准的三层架构:
Layer 1: 入口点 (process-*)
  ├─ 验证前置条件
  ├─ 计算待处理的项目总数
  ├─ 过滤已存在的项目
  ├─ 支持试运行模式(仅统计数据)
  └─ 如有需要则触发第二层

Layer 2: 分页层 (paginate-*)
  ├─ 处理基于游标分页的逻辑
  ├─ 对大批量任务实现扇出
  ├─ 递归处理所有分页
  └─ 为每个项目触发第三层

Layer 3: 单任务执行层 (execute-*/generate-*)
  └─ 为单个项目执行业务逻辑
示例:
welcome-placeholder
,
agent-welcome

Core Patterns

核心模式

1. Dry-Run Mode

1. 试运行模式

Purpose: Get statistics without triggering actual execution
Pattern:
typescript
// Layer 1: Entry Point
if (dryRun) {
  console.log('[workflow:process] Dry run mode, returning statistics only');
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
  };
}
Use Case: Check how many items will be processed before committing to execution
Response:
typescript
{
  success: true,
  totalEligible: 100,
  toProcess: 80,
  alreadyProcessed: 20,
  dryRun: true,
  message: "[DryRun] Would process 80 items"
}
用途: 无需触发实际执行即可获取统计数据
模式:
typescript
// 第一层:入口点
if (dryRun) {
  console.log('[workflow:process] 试运行模式,仅返回统计数据');
  return {
    ...result,
    dryRun: true,
    message: `[试运行] 将处理 ${itemsNeedingProcessing.length} 个项目`,
  };
}
使用场景: 在确认执行前检查将处理的项目数量
响应:
typescript
{
  success: true,
  totalEligible: 100,
  toProcess: 80,
  alreadyProcessed: 20,
  dryRun: true,
  message: "[试运行] 将处理 80 个项目"
}

2. Fan-Out Pattern

2. 扇出模式

Purpose: Split large batches into smaller chunks for parallel processing
Pattern:
typescript
// Layer 2: Pagination
const CHUNK_SIZE = 20;

if (itemIds.length > CHUNK_SIZE) {
  // Fan-out to smaller chunks
  const chunks = chunk(itemIds, CHUNK_SIZE);
  console.log('[workflow:paginate] Fan-out mode:', {
    chunks: chunks.length,
    chunkSize: CHUNK_SIZE,
    totalItems: itemIds.length,
  });

  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`workflow:fanout:${idx + 1}/${chunks.length}`, () =>
        WorkflowClass.triggerPaginateItems({ itemIds: ids }),
      ),
    ),
  );
}
Use Case: Avoid hitting workflow step limits by splitting large batches
Configuration:
  • PAGE_SIZE = 50
    - Items per pagination page
  • CHUNK_SIZE = 20
    - Items per fan-out chunk
  • If batch > CHUNK_SIZE, split into chunks and recursively trigger pagination
用途: 将大批量任务拆分为更小的块进行并行处理
模式:
typescript
// 第二层:分页层
const CHUNK_SIZE = 20;

if (itemIds.length > CHUNK_SIZE) {
  // 扇出为更小的块
  const chunks = chunk(itemIds, CHUNK_SIZE);
  console.log('[workflow:paginate] 扇出模式:', {
    chunks: chunks.length,
    chunkSize: CHUNK_SIZE,
    totalItems: itemIds.length,
  });

  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`workflow:fanout:${idx + 1}/${chunks.length}`, () =>
        WorkflowClass.triggerPaginateItems({ itemIds: ids }),
      ),
    ),
  );
}
使用场景: 通过拆分大批量任务避免触及工作流步骤限制
配置:
  • PAGE_SIZE = 50
    - 每个分页的项目数
  • CHUNK_SIZE = 20
    - 每个扇出块的项目数
  • 如果批量任务数大于 CHUNK_SIZE,则拆分为多个块并递归触发分页

3. Single Task Execution

3. 单任务执行

Purpose: Execute business logic for ONE item at a time
Pattern:
typescript
// Layer 3: Single Task Execution
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    if (!itemId) {
      return { success: false, error: 'Missing itemId' };
    }

    // Get item
    const item = await context.run('workflow:get-item', async () => {
      return getItem(itemId);
    });

    // Execute business logic for THIS item only
    const result = await context.run('workflow:execute', async () => {
      return processItem(item);
    });

    // Save result for THIS item
    await context.run('workflow:save', async () => {
      return saveResult(itemId, result);
    });

    return { success: true, itemId, result };
  },
  {
    flowControl: {
      key: 'workflow.execute',
      parallelism: 10,
      ratePerSecond: 5,
    },
  },
);
Key Principles:
  • Each workflow execution handles exactly ONE item
  • Parallelism controlled by
    flowControl
    config
  • Multiple items processed via Layer 2 triggering multiple Layer 3 executions

用途: 一次仅为一个项目执行业务逻辑
模式:
typescript
// 第三层:单任务执行层
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    if (!itemId) {
      return { success: false, error: '缺少 itemId' };
    }

    // 获取项目
    const item = await context.run('workflow:get-item', async () => {
      return getItem(itemId);
    });

    // 仅为当前项目执行业务逻辑
    const result = await context.run('workflow:execute', async () => {
      return processItem(item);
    });

    // 保存当前项目的结果
    await context.run('workflow:save', async () => {
      return saveResult(itemId, result);
    });

    return { success: true, itemId, result };
  },
  {
    flowControl: {
      key: 'workflow.execute',
      parallelism: 10,
      ratePerSecond: 5,
    },
  },
);
核心原则:
  • 每次工作流执行仅处理一个项目
  • 并行度由
    flowControl
    配置控制
  • 多个项目通过第二层触发多次第三层执行来处理

File Structure

文件结构

Directory Layout

目录布局

src/
├── app/(backend)/api/workflows/
│   └── {workflow-name}/
│       ├── process-{entities}/route.ts      # Layer 1
│       ├── paginate-{entities}/route.ts     # Layer 2
│       └── execute-{entity}/route.ts        # Layer 3
└── server/workflows/
    └── {workflowName}/
        └── index.ts                          # Workflow class
src/
├── app/(backend)/api/workflows/
│   └── {workflow-name}/
│       ├── process-{entities}/route.ts      # 第一层
│       ├── paginate-{entities}/route.ts     # 第二层
│       └── execute-{entity}/route.ts        # 第三层
└── server/workflows/
    └── {workflowName}/
        └── index.ts                          # 工作流类

Cloud Project Configuration

云项目配置

For lobehub-cloud specific configurations (re-exports, cloud-only workflows, deployment patterns), see:
📄 Cloud Configuration Guide

如需了解 lobehub-cloud 特定配置(重新导出、仅云端工作流、部署模式),请查看:
📄 云配置指南

Implementation Patterns

实现模式

1. Workflow Class

1. 工作流类

Location:
src/server/workflows/{workflowName}/index.ts
typescript
import { Client } from '@upstash/workflow';
import debug from 'debug';

const log = debug('lobe-server:workflows:{workflow-name}');

// Workflow paths
const WORKFLOW_PATHS = {
  processItems: '/api/workflows/{workflow-name}/process-items',
  paginateItems: '/api/workflows/{workflow-name}/paginate-items',
  executeItem: '/api/workflows/{workflow-name}/execute-item',
} as const;

// Payload types
export interface ProcessItemsPayload {
  dryRun?: boolean;
  force?: boolean;
}

export interface PaginateItemsPayload {
  cursor?: string;
  itemIds?: string[]; // For fanout chunks
}

export interface ExecuteItemPayload {
  itemId: string;
}

/**
 * Get workflow URL using APP_URL
 */
const getWorkflowUrl = (path: string): string => {
  const baseUrl = process.env.APP_URL;
  if (!baseUrl) throw new Error('APP_URL is required to trigger workflows');
  return new URL(path, baseUrl).toString();
};

/**
 * Get workflow client
 */
const getWorkflowClient = (): Client => {
  const token = process.env.QSTASH_TOKEN;
  if (!token) throw new Error('QSTASH_TOKEN is required to trigger workflows');

  const config: ConstructorParameters<typeof Client>[0] = { token };
  if (process.env.QSTASH_URL) {
    (config as Record<string, unknown>).url = process.env.QSTASH_URL;
  }
  return new Client(config);
};

/**
 * {Workflow Name} Workflow
 */
export class {WorkflowName}Workflow {
  private static client: Client;

  private static getClient(): Client {
    if (!this.client) {
      this.client = getWorkflowClient();
    }
    return this.client;
  }

  /**
   * Trigger workflow to process items (entry point)
   */
  static triggerProcessItems(payload: ProcessItemsPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.processItems);
    log('Triggering process-items workflow');
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Trigger workflow to paginate items
   */
  static triggerPaginateItems(payload: PaginateItemsPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.paginateItems);
    log('Triggering paginate-items workflow');
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Trigger workflow to execute a single item
   */
  static triggerExecuteItem(payload: ExecuteItemPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.executeItem);
    log('Triggering execute-item workflow: %s', payload.itemId);
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Filter items that need processing (e.g., check Redis cache, database state)
   */
  static async filterItemsNeedingProcessing(itemIds: string[]): Promise<string[]> {
    if (itemIds.length === 0) return [];

    // Check existing state (Redis, database, etc.)
    // Return items that need processing

    return itemIds;
  }
}
位置:
src/server/workflows/{workflowName}/index.ts
typescript
import { Client } from '@upstash/workflow';
import debug from 'debug';

const log = debug('lobe-server:workflows:{workflow-name}');

// 工作流路径
const WORKFLOW_PATHS = {
  processItems: '/api/workflows/{workflow-name}/process-items',
  paginateItems: '/api/workflows/{workflow-name}/paginate-items',
  executeItem: '/api/workflows/{workflow-name}/execute-item',
} as const;

// 负载类型
export interface ProcessItemsPayload {
  dryRun?: boolean;
  force?: boolean;
}

export interface PaginateItemsPayload {
  cursor?: string;
  itemIds?: string[]; // 用于扇出块
}

export interface ExecuteItemPayload {
  itemId: string;
}

/**
 * 使用 APP_URL 获取工作流 URL
 */
const getWorkflowUrl = (path: string): string => {
  const baseUrl = process.env.APP_URL;
  if (!baseUrl) throw new Error('触发工作流需要配置 APP_URL');
  return new URL(path, baseUrl).toString();
};

/**
 * 获取工作流客户端
 */
const getWorkflowClient = (): Client => {
  const token = process.env.QSTASH_TOKEN;
  if (!token) throw new Error('触发工作流需要配置 QSTASH_TOKEN');

  const config: ConstructorParameters<typeof Client>[0] = { token };
  if (process.env.QSTASH_URL) {
    (config as Record<string, unknown>).url = process.env.QSTASH_URL;
  }
  return new Client(config);
};

/**
 * {工作流名称} 工作流
 */
export class {WorkflowName}Workflow {
  private static client: Client;

  private static getClient(): Client {
    if (!this.client) {
      this.client = getWorkflowClient();
    }
    return this.client;
  }

  /**
   * 触发工作流处理项目(入口点)
   */
  static triggerProcessItems(payload: ProcessItemsPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.processItems);
    log('触发 process-items 工作流');
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * 触发工作流分页处理项目
   */
  static triggerPaginateItems(payload: PaginateItemsPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.paginateItems);
    log('触发 paginate-items 工作流');
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * 触发工作流执行单个项目
   */
  static triggerExecuteItem(payload: ExecuteItemPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.executeItem);
    log('触发 execute-item 工作流: %s', payload.itemId);
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * 过滤需要处理的项目(例如检查 Redis 缓存、数据库状态)
   */
  static async filterItemsNeedingProcessing(itemIds: string[]): Promise<string[]> {
    if (itemIds.length === 0) return [];

    // 检查现有状态(Redis、数据库等)
    // 返回需要处理的项目

    return itemIds;
  }
}

2. Layer 1: Entry Point (process-*)

2. 第一层:入口点 (process-*)

Purpose: Validates prerequisites, calculates statistics, supports dryRun mode
typescript
import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ProcessPayload } from '@/server/workflows/{workflowName}';

/**
 * Entry workflow for {workflow description}
 * 1. Get all eligible items
 * 2. Filter items that already have results
 * 3. If dryRun, return statistics only
 * 4. If no items need processing, return early
 * 5. Trigger paginate workflow
 */
export const { POST } = serve<ProcessPayload>(
  async (context) => {
    const { dryRun, force } = context.requestPayload ?? {};

    console.log('[{workflow}:process] Starting with payload:', { dryRun, force });

    // Get all eligible items
    const allItemIds = await context.run('{workflow}:get-all-items', async () => {
      const db = await getServerDB();
      // Query database for eligible items
      return items.map((item) => item.id);
    });

    console.log('[{workflow}:process] Total eligible items:', allItemIds.length);

    if (allItemIds.length === 0) {
      return {
        success: true,
        totalEligible: 0,
        message: 'No eligible items found',
      };
    }

    // Filter items that need processing
    const itemsNeedingProcessing = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(allItemIds),
    );

    const result = {
      success: true,
      totalEligible: allItemIds.length,
      toProcess: itemsNeedingProcessing.length,
      alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length,
    };

    console.log('[{workflow}:process] Check result:', result);

    // If dryRun mode, return statistics only
    if (dryRun) {
      console.log('[{workflow}:process] Dry run mode, returning statistics only');
      return {
        ...result,
        dryRun: true,
        message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
      };
    }

    // If no items need processing, return early
    if (itemsNeedingProcessing.length === 0) {
      console.log('[{workflow}:process] All items already processed');
      return {
        ...result,
        message: 'All items already processed',
      };
    }

    // Trigger paginate workflow
    console.log('[{workflow}:process] Triggering paginate workflow');
    await context.run('{workflow}:trigger-paginate', () => WorkflowClass.triggerPaginateItems({}));

    return {
      ...result,
      message: `Triggered pagination for ${itemsNeedingProcessing.length} items`,
    };
  },
  {
    flowControl: {
      key: '{workflow}.process',
      parallelism: 1,
      ratePerSecond: 1,
    },
  },
);
用途: 验证前置条件、计算统计数据、支持试运行模式
typescript
import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ProcessPayload } from '@/server/workflows/{workflowName}';

/**
 * {工作流描述} 的入口工作流
 * 1. 获取所有符合条件的项目
 * 2. 过滤已有结果的项目
 * 3. 如果是试运行模式,仅返回统计数据
 * 4. 如果没有需要处理的项目,提前返回
 * 5. 触发分页工作流
 */
export const { POST } = serve<ProcessPayload>(
  async (context) => {
    const { dryRun, force } = context.requestPayload ?? {};

    console.log('[{workflow}:process] 启动,负载:', { dryRun, force });

    // 获取所有符合条件的项目
    const allItemIds = await context.run('{workflow}:get-all-items', async () => {
      const db = await getServerDB();
      // 查询数据库获取符合条件的项目
      return items.map((item) => item.id);
    });

    console.log('[{workflow}:process] 符合条件的项目总数:', allItemIds.length);

    if (allItemIds.length === 0) {
      return {
        success: true,
        totalEligible: 0,
        message: '未找到符合条件的项目',
      };
    }

    // 过滤需要处理的项目
    const itemsNeedingProcessing = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(allItemIds),
    );

    const result = {
      success: true,
      totalEligible: allItemIds.length,
      toProcess: itemsNeedingProcessing.length,
      alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length,
    };

    console.log('[{workflow}:process] 检查结果:', result);

    // 如果是试运行模式,仅返回统计数据
    if (dryRun) {
      console.log('[{workflow}:process] 试运行模式,仅返回统计数据');
      return {
        ...result,
        dryRun: true,
        message: `[试运行] 将处理 ${itemsNeedingProcessing.length} 个项目`,
      };
    }

    // 如果没有需要处理的项目,提前返回
    if (itemsNeedingProcessing.length === 0) {
      console.log('[{workflow}:process] 所有项目已处理完成');
      return {
        ...result,
        message: '所有项目已处理完成',
      };
    }

    // 触发分页工作流
    console.log('[{workflow}:process] 触发分页工作流');
    await context.run('{workflow}:trigger-paginate', () => WorkflowClass.triggerPaginateItems({}));

    return {
      ...result,
      message: `已触发 ${itemsNeedingProcessing.length} 个项目的分页处理`,
    };
  },
  {
    flowControl: {
      key: '{workflow}.process',
      parallelism: 1,
      ratePerSecond: 1,
    },
  },
);

3. Layer 2: Pagination (paginate-*)

3. 第二层:分页层 (paginate-*)

Purpose: Handles cursor-based pagination, implements fanout for large batches
typescript
import { serve } from '@upstash/workflow/nextjs';
import { chunk } from 'es-toolkit/compat';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type PaginatePayload } from '@/server/workflows/{workflowName}';

const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;

/**
 * Paginate items workflow - handles pagination and fanout
 * 1. If specific itemIds provided (from fanout), process them directly
 * 2. Otherwise, paginate through all items using cursor
 * 3. Filter items that need processing
 * 4. If batch > CHUNK_SIZE, fanout to smaller chunks
 * 5. Trigger execute workflow for each item
 * 6. Schedule next page if cursor exists
 */
export const { POST } = serve<PaginatePayload>(
  async (context) => {
    const { cursor, itemIds: payloadItemIds } = context.requestPayload ?? {};

    console.log('[{workflow}:paginate] Starting with payload:', {
      cursor,
      itemIdsCount: payloadItemIds?.length ?? 0,
    });

    // If specific itemIds are provided, process them directly (from fanout)
    if (payloadItemIds && payloadItemIds.length > 0) {
      console.log('[{workflow}:paginate] Processing specific itemIds:', {
        count: payloadItemIds.length,
      });

      await Promise.all(
        payloadItemIds.map((itemId) =>
          context.run(`{workflow}:execute:${itemId}`, () =>
            WorkflowClass.triggerExecuteItem({ itemId }),
          ),
        ),
      );

      return {
        success: true,
        processedItems: payloadItemIds.length,
      };
    }

    // Paginate through all items
    const itemBatch = await context.run('{workflow}:get-batch', async () => {
      const db = await getServerDB();
      // Query database with cursor and PAGE_SIZE
      const items = await db.query(...);

      if (!items.length) return { ids: [] };

      const last = items.at(-1);
      return {
        ids: items.map(item => item.id),
        cursor: last ? last.id : undefined,
      };
    });

    const batchItemIds = itemBatch.ids;
    const nextCursor = 'cursor' in itemBatch ? itemBatch.cursor : undefined;

    console.log('[{workflow}:paginate] Got batch:', {
      batchSize: batchItemIds.length,
      nextCursor,
    });

    if (batchItemIds.length === 0) {
      console.log('[{workflow}:paginate] No more items, pagination complete');
      return { success: true, message: 'Pagination complete' };
    }

    // Filter items that need processing
    const itemIds = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(batchItemIds),
    );

    console.log('[{workflow}:paginate] After filtering:', {
      needProcessing: itemIds.length,
      skipped: batchItemIds.length - itemIds.length,
    });

    // Process items if any need processing
    if (itemIds.length > 0) {
      if (itemIds.length > CHUNK_SIZE) {
        // Fanout to smaller chunks
        const chunks = chunk(itemIds, CHUNK_SIZE);
        console.log('[{workflow}:paginate] Fanout mode:', {
          chunks: chunks.length,
          chunkSize: CHUNK_SIZE,
          totalItems: itemIds.length,
        });

        await Promise.all(
          chunks.map((ids, idx) =>
            context.run(`{workflow}:fanout:${idx + 1}/${chunks.length}`, () =>
              WorkflowClass.triggerPaginateItems({ itemIds: ids }),
            ),
          ),
        );
      } else {
        // Process directly
        console.log('[{workflow}:paginate] Processing items directly:', {
          count: itemIds.length,
        });

        await Promise.all(
          itemIds.map((itemId) =>
            context.run(`{workflow}:execute:${itemId}`, () =>
              WorkflowClass.triggerExecuteItem({ itemId }),
            ),
          ),
        );
      }
    }

    // Schedule next page
    if (nextCursor) {
      console.log('[{workflow}:paginate] Scheduling next page:', { nextCursor });
      await context.run('{workflow}:next-page', () =>
        WorkflowClass.triggerPaginateItems({ cursor: nextCursor }),
      );
    } else {
      console.log('[{workflow}:paginate] No more pages');
    }

    return {
      success: true,
      processedItems: itemIds.length,
      skippedItems: batchItemIds.length - itemIds.length,
      nextCursor: nextCursor ?? null,
    };
  },
  {
    flowControl: {
      key: '{workflow}.paginate',
      parallelism: 20,
      ratePerSecond: 5,
    },
  },
);
用途: 处理基于游标分页的逻辑,为大批量任务实现扇出
typescript
import { serve } from '@upstash/workflow/nextjs';
import { chunk } from 'es-toolkit/compat';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type PaginatePayload } from '@/server/workflows/{workflowName}';

const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;

/**
 * 分页项目工作流 - 处理分页和扇出逻辑
 * 1. 如果提供了特定的 itemIds(来自扇出),直接处理
 * 2. 否则,使用游标分页遍历所有项目
 * 3. 过滤需要处理的项目
 * 4. 如果批量任务数大于 CHUNK_SIZE,扇出为更小的块
 * 5. 为每个项目触发执行工作流
 * 6. 如果存在游标,调度下一页处理
 */
export const { POST } = serve<PaginatePayload>(
  async (context) => {
    const { cursor, itemIds: payloadItemIds } = context.requestPayload ?? {};

    console.log('[{workflow}:paginate] 启动,负载:', {
      cursor,
      itemIdsCount: payloadItemIds?.length ?? 0,
    });

    // 如果提供了特定的 itemIds,直接处理(来自扇出)
    if (payloadItemIds && payloadItemIds.length > 0) {
      console.log('[{workflow}:paginate] 处理指定的 itemIds:', {
        count: payloadItemIds.length,
      });

      await Promise.all(
        payloadItemIds.map((itemId) =>
          context.run(`{workflow}:execute:${itemId}`, () =>
            WorkflowClass.triggerExecuteItem({ itemId }),
          ),
        ),
      );

      return {
        success: true,
        processedItems: payloadItemIds.length,
      };
    }

    // 分页遍历所有项目
    const itemBatch = await context.run('{workflow}:get-batch', async () => {
      const db = await getServerDB();
      // 使用游标和 PAGE_SIZE 查询数据库
      const items = await db.query(...);

      if (!items.length) return { ids: [] };

      const last = items.at(-1);
      return {
        ids: items.map(item => item.id),
        cursor: last ? last.id : undefined,
      };
    });

    const batchItemIds = itemBatch.ids;
    const nextCursor = 'cursor' in itemBatch ? itemBatch.cursor : undefined;

    console.log('[{workflow}:paginate] 获取到批量数据:', {
      batchSize: batchItemIds.length,
      nextCursor,
    });

    if (batchItemIds.length === 0) {
      console.log('[{workflow}:paginate] 没有更多项目,分页完成');
      return { success: true, message: '分页完成' };
    }

    // 过滤需要处理的项目
    const itemIds = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(batchItemIds),
    );

    console.log('[{workflow}:paginate] 过滤后:', {
      needProcessing: itemIds.length,
      skipped: batchItemIds.length - itemIds.length,
    });

    // 如果有需要处理的项目则处理
    if (itemIds.length > 0) {
      if (itemIds.length > CHUNK_SIZE) {
        // 扇出为更小的块
        const chunks = chunk(itemIds, CHUNK_SIZE);
        console.log('[{workflow}:paginate] 扇出模式:', {
          chunks: chunks.length,
          chunkSize: CHUNK_SIZE,
          totalItems: itemIds.length,
        });

        await Promise.all(
          chunks.map((ids, idx) =>
            context.run(`{workflow}:fanout:${idx + 1}/${chunks.length}`, () =>
              WorkflowClass.triggerPaginateItems({ itemIds: ids }),
            ),
          ),
        );
      } else {
        // 直接处理
        console.log('[{workflow}:paginate] 直接处理项目:', {
          count: itemIds.length,
        });

        await Promise.all(
          itemIds.map((itemId) =>
            context.run(`{workflow}:execute:${itemId}`, () =>
              WorkflowClass.triggerExecuteItem({ itemId }),
            ),
          ),
        );
      }
    }

    // 调度下一页
    if (nextCursor) {
      console.log('[{workflow}:paginate] 调度下一页:', { nextCursor });
      await context.run('{workflow}:next-page', () =>
        WorkflowClass.triggerPaginateItems({ cursor: nextCursor }),
      );
    } else {
      console.log('[{workflow}:paginate] 没有更多页面');
    }

    return {
      success: true,
      processedItems: itemIds.length,
      skippedItems: batchItemIds.length - itemIds.length,
      nextCursor: nextCursor ?? null,
    };
  },
  {
    flowControl: {
      key: '{workflow}.paginate',
      parallelism: 20,
      ratePerSecond: 5,
    },
  },
);

4. Layer 3: Execution (execute-/generate-)

4. 第三层:执行层 (execute-/generate-)

Purpose: Performs actual business logic
typescript
import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ExecutePayload } from '@/server/workflows/{workflowName}';

/**
 * Execute item workflow - performs actual business logic
 * 1. Get item data
 * 2. Perform business logic (AI generation, data processing, etc.)
 * 3. Save results
 */
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    console.log('[{workflow}:execute] Starting:', { itemId });

    if (!itemId) {
      return { success: false, error: 'Missing itemId' };
    }

    const db = await getServerDB();

    // Get item data
    const item = await context.run('{workflow}:get-item', async () => {
      // Query database for item
      return item;
    });

    if (!item) {
      return { success: false, error: 'Item not found' };
    }

    // Perform business logic
    const result = await context.run('{workflow}:process-item', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.generate(); // or process(), execute(), etc.
    });

    // Save results
    await context.run('{workflow}:save-result', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.saveToRedis(result); // or saveToDatabase(), etc.
    });

    console.log('[{workflow}:execute] Completed:', { itemId });

    return {
      success: true,
      itemId,
      result,
    };
  },
  {
    flowControl: {
      key: '{workflow}.execute',
      parallelism: 10,
      ratePerSecond: 5,
    },
  },
);

用途: 执行实际的业务逻辑
typescript
import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ExecutePayload } from '@/server/workflows/{workflowName}';

/**
 * 执行项目工作流 - 执行实际的业务逻辑
 * 1. 获取项目数据
 * 2. 执行业务逻辑(AI 生成、数据处理等)
 * 3. 保存结果
 */
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    console.log('[{workflow}:execute] 启动:', { itemId });

    if (!itemId) {
      return { success: false, error: '缺少 itemId' };
    }

    const db = await getServerDB();

    // 获取项目数据
    const item = await context.run('{workflow}:get-item', async () => {
      // 查询数据库获取项目
      return item;
    });

    if (!item) {
      return { success: false, error: '项目不存在' };
    }

    // 执行业务逻辑
    const result = await context.run('{workflow}:process-item', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.generate(); // 或 process(), execute() 等
    });

    // 保存结果
    await context.run('{workflow}:save-result', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.saveToRedis(result); // 或 saveToDatabase() 等
    });

    console.log('[{workflow}:execute] 完成:', { itemId });

    return {
      success: true,
      itemId,
      result,
    };
  },
  {
    flowControl: {
      key: '{workflow}.execute',
      parallelism: 10,
      ratePerSecond: 5,
    },
  },
);

Best Practices

最佳实践

1. Error Handling

1. 错误处理

typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    // Validate required parameters
    if (!itemId) {
      return { success: false, error: 'Missing itemId in payload' };
    }

    try {
      // Perform work
      const result = await context.run('step-name', () => doWork(itemId));

      return { success: true, itemId, result };
    } catch (error) {
      console.error('[workflow:error]', error);
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error'
      };
    }
  },
  { flowControl: { ... } },
);
typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    // 验证必填参数
    if (!itemId) {
      return { success: false, error: '负载中缺少 itemId' };
    }

    try {
      // 执行任务
      const result = await context.run('step-name', () => doWork(itemId));

      return { success: true, itemId, result };
    } catch (error) {
      console.error('[workflow:error]', error);
      return {
        success: false,
        error: error instanceof Error ? error.message : '未知错误'
      };
    }
  },
  { flowControl: { ... } },
);

2. Logging

2. 日志记录

Use consistent log prefixes and structured logging:
typescript
console.log('[{workflow}:{layer}] Starting with payload:', payload);
console.log('[{workflow}:{layer}] Processing items:', { count: items.length });
console.log('[{workflow}:{layer}] Completed:', result);
console.error('[{workflow}:{layer}:error]', error);
使用统一的日志前缀和结构化日志:
typescript
console.log('[{workflow}:{layer}] 启动,负载:', payload);
console.log('[{workflow}:{layer}] 处理项目:', { count: items.length });
console.log('[{workflow}:{layer}] 完成:', result);
console.error('[{workflow}:{layer}:error]', error);

3. Return Values

3. 返回值

Return consistent response shapes:
typescript
// Success response
return {
  success: true,
  itemId,
  result,
  message: 'Optional success message',
};

// Error response
return {
  success: false,
  error: 'Error description',
  itemId, // Include context if available
};

// Statistics response (for entry point)
return {
  success: true,
  totalEligible: 100,
  toProcess: 80,
  alreadyProcessed: 20,
  dryRun: true, // If applicable
  message: 'Summary message',
};
返回统一的响应结构:
typescript
// 成功响应
return {
  success: true,
  itemId,
  result,
  message: '可选的成功消息',
};

// 错误响应
return {
  success: false,
  error: '错误描述',
  itemId, // 如有上下文请包含
};

// 统计响应(入口点使用)
return {
  success: true,
  totalEligible: 100,
  toProcess: 80,
  alreadyProcessed: 20,
  dryRun: true, // 如适用
  message: '摘要消息',
};

4. flowControl Configuration

4. flowControl 配置

Purpose: Control concurrency and rate limiting for workflow executions
Tune concurrency based on layer:
typescript
// Layer 1: Entry point - single instance only
flowControl: {
  key: '{workflow}.process',
  parallelism: 1,        // Only 1 process workflow at a time
  ratePerSecond: 1,      // 1 execution per second
}

// Layer 2: Pagination - moderate concurrency
flowControl: {
  key: '{workflow}.paginate',
  parallelism: 20,       // Up to 20 pagination workflows in parallel
  ratePerSecond: 5,      // 5 new executions per second
}

// Layer 3: Single task execution - high concurrency
flowControl: {
  key: '{workflow}.execute',
  parallelism: 10,       // Up to 10 items processed in parallel
  ratePerSecond: 5,      // 5 new items per second
}
Guidelines:
  • Layer 1: Always use
    parallelism: 1
    to avoid duplicate processing
  • Layer 2: Moderate concurrency for pagination (typically 10-20)
  • Layer 3: Higher concurrency for parallel item processing (typically 5-10)
  • Adjust
    ratePerSecond
    based on external API rate limits or resource constraints
用途: 控制工作流执行的并发数和速率限制
根据层级调整并发度:
typescript
// 第一层:入口点 - 仅允许单实例
flowControl: {
  key: '{workflow}.process',
  parallelism: 1,        // 一次仅运行一个处理工作流
  ratePerSecond: 1,      // 每秒执行1次
}

// 第二层:分页层 - 中等并发
flowControl: {
  key: '{workflow}.paginate',
  parallelism: 20,       // 最多同时运行20个分页工作流
  ratePerSecond: 5,      // 每秒新增5次执行
}

// 第三层:单任务执行层 - 高并发
flowControl: {
  key: '{workflow}.execute',
  parallelism: 10,       // 最多同时处理10个项目
  ratePerSecond: 5,      // 每秒新增5个项目处理
}
指南:
  • 第一层: 始终使用
    parallelism: 1
    避免重复处理
  • 第二层: 分页使用中等并发(通常10-20)
  • 第三层: 并行项目处理使用更高并发(通常5-10)
  • 根据外部API速率限制或资源约束调整
    ratePerSecond

5. context.run() Best Practices

5. context.run() 最佳实践

  • Use descriptive step names with prefixes:
    {workflow}:step-name
  • Each step should be idempotent (safe to retry)
  • Don't nest context.run() calls - keep them flat
  • Use unique step names when processing multiple items:
typescript
// Good: Unique step names
await Promise.all(
  items.map((item) => context.run(`{workflow}:execute:${item.id}`, () => processItem(item))),
);

// Bad: Same step name for all items
await Promise.all(
  items.map((item) =>
    context.run(`{workflow}:execute`, () =>
      // ❌ Not unique
      processItem(item),
    ),
  ),
);
  • 使用带前缀的描述性步骤名称:
    {workflow}:step-name
  • 每个步骤应该是幂等的(重试安全)
  • 不要嵌套 context.run() 调用 - 保持扁平化
  • 处理多个项目时使用唯一的步骤名称:
typescript
// 好:唯一的步骤名称
await Promise.all(
  items.map((item) => context.run(`{workflow}:execute:${item.id}`, () => processItem(item))),
);

// 坏:所有项目使用相同的步骤名称
await Promise.all(
  items.map((item) =>
    context.run(`{workflow}:execute`, () =>
      // ❌ 不唯一
      processItem(item),
    ),
  ),
);

6. Payload Validation

6. 负载验证

Always validate required parameters at the start:
typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId, configId } = context.requestPayload ?? {};

    // Validate at the start
    if (!itemId) {
      return { success: false, error: 'Missing itemId in payload' };
    }

    if (!configId) {
      return { success: false, error: 'Missing configId in payload' };
    }

    // Proceed with work...
  },
  { flowControl: { ... } },
);
始终在开头验证必填参数:
typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId, configId } = context.requestPayload ?? {};

    // 开头验证
    if (!itemId) {
      return { success: false, error: '负载中缺少 itemId' };
    }

    if (!configId) {
      return { success: false, error: '负载中缺少 configId' };
    }

    // 继续执行任务...
  },
  { flowControl: { ... } },
);

7. Database Connection

7. 数据库连接

Get database connection once per workflow:
typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const db = await getServerDB(); // Get once

    // Use in multiple steps
    const item = await context.run('get-item', async () => {
      return itemModel.findById(db, itemId);
    });

    const result = await context.run('save-result', async () => {
      return resultModel.create(db, result);
    });
  },
  { flowControl: { ... } },
);
每个工作流仅获取一次数据库连接:
typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const db = await getServerDB(); // 仅获取一次

    // 在多个步骤中使用
    const item = await context.run('get-item', async () => {
      return itemModel.findById(db, itemId);
    });

    const result = await context.run('save-result', async () => {
      return resultModel.create(db, result);
    });
  },
  { flowControl: { ... } },
);

8. Testing

8. 测试

Create integration tests for workflows:
typescript
describe('WorkflowName', () => {
  it('should process items successfully', async () => {
    // Setup test data
    const items = await createTestItems();

    // Trigger workflow
    await WorkflowClass.triggerProcessItems({ dryRun: false });

    // Wait for completion (use polling or webhook)
    await waitForCompletion();

    // Verify results
    const results = await getResults();
    expect(results).toHaveLength(items.length);
  });

  it('should support dryRun mode', async () => {
    const result = await WorkflowClass.triggerProcessItems({ dryRun: true });

    expect(result).toMatchObject({
      success: true,
      dryRun: true,
      totalEligible: expect.any(Number),
      toProcess: expect.any(Number),
    });
  });
});

为工作流创建集成测试:
typescript
describe('WorkflowName', () => {
  it('应该成功处理项目', async () => {
    // 设置测试数据
    const items = await createTestItems();

    // 触发工作流
    await WorkflowClass.triggerProcessItems({ dryRun: false });

    // 等待完成(使用轮询或webhook)
    await waitForCompletion();

    // 验证结果
    const results = await getResults();
    expect(results).toHaveLength(items.length);
  });

  it('应该支持试运行模式', async () => {
    const result = await WorkflowClass.triggerProcessItems({ dryRun: true });

    expect(result).toMatchObject({
      success: true,
      dryRun: true,
      totalEligible: expect.any(Number),
      toProcess: expect.any(Number),
    });
  });
});

Examples

示例

Example 1: Welcome Placeholder

示例1:欢迎占位符

Use Case: Generate AI-powered welcome placeholders for users
Structure:
  • Layer 1:
    process-users
    - Entry point, checks eligible users
  • Layer 2:
    paginate-users
    - Paginates through active users
  • Layer 3:
    generate-user
    - Generates placeholders for ONE user
Core Patterns Demonstrated:
  1. Dry-Run Mode:
typescript
// Layer 1: process-users
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${usersNeedingGeneration.length} users`,
  };
}
  1. Fan-Out Pattern:
typescript
// Layer 2: paginate-users
if (userIds.length > CHUNK_SIZE) {
  const chunks = chunk(userIds, CHUNK_SIZE);
  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`welcome-placeholder:fanout:${idx + 1}/${chunks.length}`, () =>
        WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }),
      ),
    ),
  );
}
  1. Single Task Execution:
typescript
// Layer 3: generate-user
export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => {
  const { userId } = context.requestPayload ?? {};

  // Execute for ONE user only
  const workflow = new WelcomePlaceholderWorkflow(db, userId);
  const placeholders = await context.run('generate', () => workflow.generate());

  return { success: true, userId, placeholdersCount: placeholders.length };
});
Key Features:
  • ✅ Filters users who already have cached placeholders in Redis
  • ✅ Supports
    paidOnly
    flag to process only subscribed users
  • ✅ Supports
    dryRun
    mode for statistics
  • ✅ Uses fan-out for large user batches (CHUNK_SIZE=20)
  • ✅ Each execution processes exactly ONE user
Files:
  • /api/workflows/welcome-placeholder/process-users/route.ts
  • /api/workflows/welcome-placeholder/paginate-users/route.ts
  • /api/workflows/welcome-placeholder/generate-user/route.ts
  • /server/workflows/welcomePlaceholder/index.ts
使用场景: 为用户生成AI驱动的欢迎占位符
结构:
  • 第一层:
    process-users
    - 入口点,检查符合条件的用户
  • 第二层:
    paginate-users
    - 分页遍历活跃用户
  • 第三层:
    generate-user
    - 为单个用户生成占位符
核心模式演示:
  1. 试运行模式:
typescript
// 第一层:process-users
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[试运行] 将为 ${usersNeedingGeneration.length} 个用户生成内容`,
  };
}
  1. 扇出模式:
typescript
// 第二层:paginate-users
if (userIds.length > CHUNK_SIZE) {
  const chunks = chunk(userIds, CHUNK_SIZE);
  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`welcome-placeholder:fanout:${idx + 1}/${chunks.length}`, () =>
        WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }),
      ),
    ),
  );
}
  1. 单任务执行:
typescript
// 第三层:generate-user
export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => {
  const { userId } = context.requestPayload ?? {};

  // 仅为单个用户执行
  const workflow = new WelcomePlaceholderWorkflow(db, userId);
  const placeholders = await context.run('generate', () => workflow.generate());

  return { success: true, userId, placeholdersCount: placeholders.length };
});
核心特性:
  • ✅ 过滤Redis中已有缓存占位符的用户
  • ✅ 支持
    paidOnly
    标志仅处理订阅用户
  • ✅ 支持
    dryRun
    模式获取统计数据
  • ✅ 对大量用户批量使用扇出(CHUNK_SIZE=20)
  • ✅ 每次执行仅处理一个用户
文件:
  • /api/workflows/welcome-placeholder/process-users/route.ts
  • /api/workflows/welcome-placeholder/paginate-users/route.ts
  • /api/workflows/welcome-placeholder/generate-user/route.ts
  • /server/workflows/welcomePlaceholder/index.ts

Example 2: Agent Welcome

示例2:Agent欢迎信息

Use Case: Generate welcome messages and open questions for AI agents
Structure:
  • Layer 1:
    process-agents
    - Entry point, checks eligible agents
  • Layer 2:
    paginate-agents
    - Paginates through active agents
  • Layer 3:
    generate-agent
    - Generates welcome data for ONE agent
Core Patterns Demonstrated:
  1. Dry-Run Mode:
typescript
// Layer 1: process-agents
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${agentsNeedingGeneration.length} agents`,
  };
}
  1. Fan-Out Pattern: Same as welcome-placeholder
  2. Single Task Execution:
typescript
// Layer 3: generate-agent
export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => {
  const { agentId } = context.requestPayload ?? {};

  // Execute for ONE agent only
  const workflow = new AgentWelcomeWorkflow(db, agentId);
  const data = await context.run('generate', () => workflow.generate());

  return { success: true, agentId, data };
});
Key Features:
  • ✅ Filters agents who already have cached data in Redis
  • ✅ Supports
    paidOnly
    flag for subscribed users' agents only
  • ✅ Supports
    dryRun
    mode for statistics
  • ✅ Uses fan-out for large agent batches (CHUNK_SIZE=20)
  • ✅ Each execution processes exactly ONE agent
Files:
  • /api/workflows/agent-welcome/process-agents/route.ts
  • /api/workflows/agent-welcome/paginate-agents/route.ts
  • /api/workflows/agent-welcome/generate-agent/route.ts
  • /server/workflows/agentWelcome/index.ts

使用场景: 为AI Agent生成欢迎消息和开放式问题
结构:
  • 第一层:
    process-agents
    - 入口点,检查符合条件的Agent
  • 第二层:
    paginate-agents
    - 分页遍历活跃Agent
  • 第三层:
    generate-agent
    - 为单个Agent生成欢迎数据
核心模式演示:
  1. 试运行模式:
typescript
// 第一层:process-agents
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[试运行] 将为 ${agentsNeedingGeneration.length} 个Agent生成内容`,
  };
}
  1. 扇出模式: 和欢迎占位符示例相同
  2. 单任务执行:
typescript
// 第三层:generate-agent
export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => {
  const { agentId } = context.requestPayload ?? {};

  // 仅为单个Agent执行
  const workflow = new AgentWelcomeWorkflow(db, agentId);
  const data = await context.run('generate', () => workflow.generate());

  return { success: true, agentId, data };
});
核心特性:
  • ✅ 过滤Redis中已有缓存数据的Agent
  • ✅ 支持
    paidOnly
    标志仅处理订阅用户的Agent
  • ✅ 支持
    dryRun
    模式获取统计数据
  • ✅ 对大量Agent批量使用扇出(CHUNK_SIZE=20)
  • ✅ 每次执行仅处理一个Agent
文件:
  • /api/workflows/agent-welcome/process-agents/route.ts
  • /api/workflows/agent-welcome/paginate-agents/route.ts
  • /api/workflows/agent-welcome/generate-agent/route.ts
  • /server/workflows/agentWelcome/index.ts

Key Takeaways from Examples

示例核心要点

Both workflows follow the exact same pattern:
  1. Layer 1 (Entry Point):
    • Calculate statistics
    • Filter existing items
    • Support dry-run mode
    • Trigger pagination only if needed
  2. Layer 2 (Pagination):
    • Paginate with cursor (PAGE_SIZE=50)
    • Fan-out large batches (CHUNK_SIZE=20)
    • Trigger Layer 3 for each item
    • Recursively process all pages
  3. Layer 3 (Execution):
    • Process ONE item per execution
    • Perform business logic
    • Save results
    • Return success/failure
The only differences are:
  • Entity type (users vs agents)
  • Business logic (placeholder generation vs welcome generation)
  • Data source (different database queries)

两个工作流都遵循完全相同的模式:
  1. 第一层(入口点):
    • 计算统计数据
    • 过滤现有项目
    • 支持试运行模式
    • 仅在需要时触发分页
  2. 第二层(分页层):
    • 使用游标分页(PAGE_SIZE=50)
    • 对大批量任务扇出(CHUNK_SIZE=20)
    • 为每个项目触发第三层
    • 递归处理所有分页
  3. 第三层(执行层):
    • 每次执行处理一个项目
    • 执行业务逻辑
    • 保存结果
    • 返回成功/失败状态
唯一的区别是:
  • 实体类型(用户 vs Agent)
  • 业务逻辑(占位符生成 vs 欢迎信息生成)
  • 数据源(不同的数据库查询)

Common Pitfalls

常见陷阱

❌ Don't: Use context.run() without unique names

❌ 不要:使用context.run()时不指定唯一名称

typescript
// Bad: Same step name when processing multiple items
await Promise.all(items.map((item) => context.run('process', () => process(item))));
typescript
// Good: Unique step names
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));
typescript
// 坏:处理多个项目时使用相同的步骤名称
await Promise.all(items.map((item) => context.run('process', () => process(item))));
typescript
// 好:使用唯一的步骤名称
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));

❌ Don't: Forget to validate payload parameters

❌ 不要:忘记验证负载参数

typescript
// Bad: No validation
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};
  const result = await process(itemId); // May fail with undefined
});
typescript
// Good: Validate early
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};

  if (!itemId) {
    return { success: false, error: 'Missing itemId' };
  }

  const result = await process(itemId);
});
typescript
// 坏:没有验证
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};
  const result = await process(itemId); // 可能因为undefined失败
});
typescript
// 好:提前验证
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};

  if (!itemId) {
    return { success: false, error: '缺少 itemId' };
  }

  const result = await process(itemId);
});

❌ Don't: Skip filtering existing items

❌ 不要:跳过过滤现有项目

typescript
// Bad: No filtering, may duplicate work
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));
typescript
// Good: Filter existing items first
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));
typescript
// 坏:没有过滤,可能重复工作
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));
typescript
// 好:先过滤现有项目
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));

❌ Don't: Use inconsistent logging

❌ 不要:使用不一致的日志记录

typescript
// Bad: Inconsistent prefixes and formats
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);
typescript
// Good: Consistent structured logging
console.log('[workflow:layer] Starting with payload:', payload);
console.log('[workflow:layer] Processing item:', { itemId });
console.log('[workflow:layer] Completed:', { itemId, result });

typescript
// 坏:前缀和格式不一致
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);
typescript
// 好:统一的结构化日志
console.log('[workflow:layer] 启动,负载:', payload);
console.log('[workflow:layer] 处理项目:', { itemId });
console.log('[workflow:layer] 完成:', { itemId, result });

Environment Variables Required

所需环境变量

bash
undefined
bash
undefined

Required for all workflows

所有工作流必填

APP_URL=https://your-app.com # Base URL for workflow endpoints QSTASH_TOKEN=qstash_xxx # QStash authentication token
APP_URL=https://your-app.com # 工作流端点的基础URL QSTASH_TOKEN=qstash_xxx # QStash认证令牌

Optional (for custom QStash URL)

可选(用于自定义QStash URL)

QSTASH_URL=https://custom-qstash.com # Custom QStash endpoint

---
QSTASH_URL=https://custom-qstash.com # 自定义QStash端点

---

Checklist for New Workflows

新工作流检查清单

Planning Phase

规划阶段

  • Identify entity to process (users, agents, items, etc.)
  • Define business logic for single item execution
  • Determine filtering logic (Redis cache, database state, etc.)
  • 确定要处理的实体(用户、Agent、项目等)
  • 定义单个项目执行的业务逻辑
  • 确定过滤逻辑(Redis缓存、数据库状态等)

Implementation Phase

实现阶段

  • Define payload types with proper TypeScript interfaces
  • Create workflow class with static trigger methods
  • Layer 1: Implement entry point with dry-run support
  • Layer 1: Add filtering logic to avoid duplicate work
  • Layer 2: Implement pagination with fan-out logic
  • Layer 3: Implement single task execution (ONE item per run)
  • Configure appropriate flowControl for each layer
  • Add consistent logging with workflow prefixes
  • Validate all required payload parameters
  • Use unique context.run() step names
  • 使用合适的TypeScript接口定义负载类型
  • 创建带有静态触发方法的工作流类
  • 第一层:实现支持试运行的入口点
  • 第一层:添加过滤逻辑避免重复工作
  • 第二层:实现带有扇出逻辑的分页
  • 第三层:实现单任务执行(每次运行处理一个项目)
  • 为每一层配置合适的flowControl
  • 添加带有工作流前缀的统一日志
  • 验证所有必填负载参数
  • 使用唯一的context.run()步骤名称

Quality & Deployment

质量与部署

  • Return consistent response shapes
  • Configure cloud deployment (see Cloud Guide if using lobehub-cloud)
  • Write integration tests
  • Test with dry-run mode first
  • Test with small batch before full rollout

  • 返回统一的响应结构
  • 配置云部署(如果使用lobehub-cloud请参考云指南
  • 编写集成测试
  • 先使用试运行模式测试
  • 全量发布前先使用小批量测试

Additional Resources

额外资源