Loading...
Loading...
Upstash Workflow implementation guide. Use when creating async workflows with QStash, implementing fan-out patterns, or building 3-layer workflow architecture (process → paginate → execute).
npx skill4agent add lobehub/lobehub upstash-workflow

Layer 1: Entry Point (process-*)
├─ Validates prerequisites
├─ Calculates total items to process
├─ Filters existing items
├─ Supports dry-run mode (statistics only)
└─ Triggers Layer 2 if work needed
Layer 2: Pagination (paginate-*)
├─ Handles cursor-based pagination
├─ Implements fan-out for large batches
├─ Recursively processes all pages
└─ Triggers Layer 3 for each item
Layer 3: Single Task Execution (execute-*/generate-*)
└─ Performs actual business logic for ONE item

Examples: welcome-placeholder, agent-welcome

// Layer 1: Entry Point
if (dryRun) {
console.log('[workflow:process] Dry run mode, returning statistics only');
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
};
}

{
success: true,
totalEligible: 100,
toProcess: 80,
alreadyProcessed: 20,
dryRun: true,
message: "[DryRun] Would process 80 items"
}

// Layer 2: Pagination
const CHUNK_SIZE = 20;
if (itemIds.length > CHUNK_SIZE) {
// Fan-out to smaller chunks
const chunks = chunk(itemIds, CHUNK_SIZE);
console.log('[workflow:paginate] Fan-out mode:', {
chunks: chunks.length,
chunkSize: CHUNK_SIZE,
totalItems: itemIds.length,
});
await Promise.all(
chunks.map((ids, idx) =>
context.run(`workflow:fanout:${idx + 1}/${chunks.length}`, () =>
WorkflowClass.triggerPaginateItems({ itemIds: ids }),
),
),
);
}

Constants: PAGE_SIZE = 50, CHUNK_SIZE = 20

// Layer 3: Single Task Execution
export const { POST } = serve<ExecutePayload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
if (!itemId) {
return { success: false, error: 'Missing itemId' };
}
// Get item
const item = await context.run('workflow:get-item', async () => {
return getItem(itemId);
});
// Execute business logic for THIS item only
const result = await context.run('workflow:execute', async () => {
return processItem(item);
});
// Save result for THIS item
await context.run('workflow:save', async () => {
return saveResult(itemId, result);
});
return { success: true, itemId, result };
},
{
flowControl: {
key: 'workflow.execute',
parallelism: 10,
ratePerSecond: 5,
},
},
);

flowControl

src/
├── app/(backend)/api/workflows/
│ └── {workflow-name}/
│ ├── process-{entities}/route.ts # Layer 1
│ ├── paginate-{entities}/route.ts # Layer 2
│ └── execute-{entity}/route.ts # Layer 3
│
└── server/workflows/
└── {workflowName}/
└── index.ts                      # Workflow class

src/server/workflows/{workflowName}/index.ts

import { Client } from '@upstash/workflow';
import debug from 'debug';
const log = debug('lobe-server:workflows:{workflow-name}');
// Workflow paths
const WORKFLOW_PATHS = {
processItems: '/api/workflows/{workflow-name}/process-items',
paginateItems: '/api/workflows/{workflow-name}/paginate-items',
executeItem: '/api/workflows/{workflow-name}/execute-item',
} as const;
// Payload types
export interface ProcessItemsPayload {
dryRun?: boolean;
force?: boolean;
}
export interface PaginateItemsPayload {
cursor?: string;
itemIds?: string[]; // For fanout chunks
}
export interface ExecuteItemPayload {
itemId: string;
}
/**
 * Resolve a workflow route path to an absolute URL.
 *
 * The APP_URL environment variable is the base; throwing when it is
 * absent makes misconfiguration fail fast instead of queueing messages
 * to a broken endpoint.
 */
const getWorkflowUrl = (path: string): string => {
  const appUrl = process.env.APP_URL;
  if (!appUrl) {
    throw new Error('APP_URL is required to trigger workflows');
  }
  const absolute = new URL(path, appUrl);
  return absolute.toString();
};
/**
 * Build a QStash Workflow client from environment configuration.
 *
 * Requires QSTASH_TOKEN and throws early when it is missing so callers
 * fail fast. QSTASH_URL optionally points the client at a custom QStash
 * endpoint (e.g. a local development server).
 */
const getWorkflowClient = (): Client => {
const token = process.env.QSTASH_TOKEN;
if (!token) throw new Error('QSTASH_TOKEN is required to trigger workflows');
const config: ConstructorParameters<typeof Client>[0] = { token };
if (process.env.QSTASH_URL) {
// NOTE(review): the cast is needed because the declared constructor config
// type does not expose a `url` field — confirm against the installed
// @upstash/workflow version before relying on this override.
(config as Record<string, unknown>).url = process.env.QSTASH_URL;
}
return new Client(config);
};
/**
 * {WorkflowName}Workflow — typed trigger facade for the three workflow layers.
 *
 * Template placeholder: replace `{WorkflowName}` / `{workflow-name}` with the
 * concrete workflow name (e.g. `WelcomePlaceholderWorkflow`). Each `trigger*`
 * method enqueues the matching route via the shared QStash client.
 */
export class {WorkflowName}Workflow {
// Lazily-created QStash client, cached across all trigger calls.
private static client: Client;
private static getClient(): Client {
if (!this.client) {
this.client = getWorkflowClient();
}
return this.client;
}
/**
 * Trigger workflow to process items (Layer 1 entry point).
 */
static triggerProcessItems(payload: ProcessItemsPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.processItems);
log('Triggering process-items workflow');
return this.getClient().trigger({ body: payload, url });
}
/**
 * Trigger workflow to paginate items (Layer 2). An empty payload starts
 * from the first page; `itemIds` routes a fan-out chunk directly.
 */
static triggerPaginateItems(payload: PaginateItemsPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.paginateItems);
log('Triggering paginate-items workflow');
return this.getClient().trigger({ body: payload, url });
}
/**
 * Trigger workflow to execute a single item (Layer 3).
 */
static triggerExecuteItem(payload: ExecuteItemPayload) {
const url = getWorkflowUrl(WORKFLOW_PATHS.executeItem);
log('Triggering execute-item workflow: %s', payload.itemId);
return this.getClient().trigger({ body: payload, url });
}
/**
 * Filter items that still need processing (e.g., check Redis cache or
 * database state). This template passes everything through; substitute
 * the real existence check.
 */
static async filterItemsNeedingProcessing(itemIds: string[]): Promise<string[]> {
if (itemIds.length === 0) return [];
// Check existing state (Redis, database, etc.)
// Return items that need processing
return itemIds;
}
}

import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ProcessPayload } from '@/server/workflows/{workflowName}';
/**
* Entry workflow for {workflow description}
* 1. Get all eligible items
* 2. Filter items that already have results
* 3. If dryRun, return statistics only
* 4. If no items need processing, return early
* 5. Trigger paginate workflow
*/
export const { POST } = serve<ProcessPayload>(
  async (context) => {
    // `force` is accepted for interface parity but unused in this template —
    // wire it into filtering when implementing a concrete workflow.
    const { dryRun, force } = context.requestPayload ?? {};
    console.log('[{workflow}:process] Starting with payload:', { dryRun, force });

    // Get all eligible items.
    // Fix: the original returned `items.map(...)` without ever declaring
    // `items`; declare it from the (placeholder) query first.
    const allItemIds = await context.run('{workflow}:get-all-items', async () => {
      const db = await getServerDB();
      // Query database for eligible items (substitute the real query)
      const items: { id: string }[] = await db.query(/* eligible items */);
      return items.map((item) => item.id);
    });
    console.log('[{workflow}:process] Total eligible items:', allItemIds.length);

    if (allItemIds.length === 0) {
      return {
        success: true,
        totalEligible: 0,
        message: 'No eligible items found',
      };
    }

    // Filter out items that already have results
    const itemsNeedingProcessing = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(allItemIds),
    );

    const result = {
      success: true,
      totalEligible: allItemIds.length,
      toProcess: itemsNeedingProcessing.length,
      alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length,
    };
    console.log('[{workflow}:process] Check result:', result);

    // If dryRun mode, return statistics only
    if (dryRun) {
      console.log('[{workflow}:process] Dry run mode, returning statistics only');
      return {
        ...result,
        dryRun: true,
        message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
      };
    }

    // If no items need processing, return early
    if (itemsNeedingProcessing.length === 0) {
      console.log('[{workflow}:process] All items already processed');
      return {
        ...result,
        message: 'All items already processed',
      };
    }

    // Trigger the Layer 2 paginate workflow; empty payload = start at page 1
    console.log('[{workflow}:process] Triggering paginate workflow');
    await context.run('{workflow}:trigger-paginate', () => WorkflowClass.triggerPaginateItems({}));

    return {
      ...result,
      message: `Triggered pagination for ${itemsNeedingProcessing.length} items`,
    };
  },
  {
    flowControl: {
      key: '{workflow}.process',
      parallelism: 1, // entry point: single instance at a time
      ratePerSecond: 1,
    },
  },
);

import { serve } from '@upstash/workflow/nextjs';
import { chunk } from 'es-toolkit/compat';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type PaginatePayload } from '@/server/workflows/{workflowName}';
const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;
/**
* Paginate items workflow - handles pagination and fanout
* 1. If specific itemIds provided (from fanout), process them directly
* 2. Otherwise, paginate through all items using cursor
* 3. Filter items that need processing
* 4. If batch > CHUNK_SIZE, fanout to smaller chunks
* 5. Trigger execute workflow for each item
* 6. Schedule next page if cursor exists
*/
export const { POST } = serve<PaginatePayload>(
async (context) => {
const { cursor, itemIds: payloadItemIds } = context.requestPayload ?? {};
console.log('[{workflow}:paginate] Starting with payload:', {
cursor,
itemIdsCount: payloadItemIds?.length ?? 0,
});
// If specific itemIds are provided, process them directly (from fanout)
if (payloadItemIds && payloadItemIds.length > 0) {
console.log('[{workflow}:paginate] Processing specific itemIds:', {
count: payloadItemIds.length,
});
// Step names embed the itemId so every context.run step stays unique.
await Promise.all(
payloadItemIds.map((itemId) =>
context.run(`{workflow}:execute:${itemId}`, () =>
WorkflowClass.triggerExecuteItem({ itemId }),
),
),
);
return {
success: true,
processedItems: payloadItemIds.length,
};
}
// Paginate through all items
const itemBatch = await context.run('{workflow}:get-batch', async () => {
const db = await getServerDB();
// Query database with cursor and PAGE_SIZE
// NOTE(review): `...` is a template placeholder — substitute the real
// cursor-based query (ordered by id, limited to PAGE_SIZE).
const items = await db.query(...);
if (!items.length) return { ids: [] };
const last = items.at(-1);
return {
ids: items.map(item => item.id),
cursor: last ? last.id : undefined,
};
});
const batchItemIds = itemBatch.ids;
// Empty-batch shape has no `cursor` key, hence the `in` check.
const nextCursor = 'cursor' in itemBatch ? itemBatch.cursor : undefined;
console.log('[{workflow}:paginate] Got batch:', {
batchSize: batchItemIds.length,
nextCursor,
});
if (batchItemIds.length === 0) {
console.log('[{workflow}:paginate] No more items, pagination complete');
return { success: true, message: 'Pagination complete' };
}
// Filter items that need processing
const itemIds = await context.run('{workflow}:filter-existing', () =>
WorkflowClass.filterItemsNeedingProcessing(batchItemIds),
);
console.log('[{workflow}:paginate] After filtering:', {
needProcessing: itemIds.length,
skipped: batchItemIds.length - itemIds.length,
});
// Process items if any need processing
if (itemIds.length > 0) {
if (itemIds.length > CHUNK_SIZE) {
// Fanout to smaller chunks: each chunk re-enters this same route with
// explicit itemIds, keeping any single invocation's fan-out bounded.
const chunks = chunk(itemIds, CHUNK_SIZE);
console.log('[{workflow}:paginate] Fanout mode:', {
chunks: chunks.length,
chunkSize: CHUNK_SIZE,
totalItems: itemIds.length,
});
await Promise.all(
chunks.map((ids, idx) =>
context.run(`{workflow}:fanout:${idx + 1}/${chunks.length}`, () =>
WorkflowClass.triggerPaginateItems({ itemIds: ids }),
),
),
);
} else {
// Process directly
console.log('[{workflow}:paginate] Processing items directly:', {
count: itemIds.length,
});
await Promise.all(
itemIds.map((itemId) =>
context.run(`{workflow}:execute:${itemId}`, () =>
WorkflowClass.triggerExecuteItem({ itemId }),
),
),
);
}
}
// Schedule next page: recursion continues until a batch comes back empty
// or without a cursor.
if (nextCursor) {
console.log('[{workflow}:paginate] Scheduling next page:', { nextCursor });
await context.run('{workflow}:next-page', () =>
WorkflowClass.triggerPaginateItems({ cursor: nextCursor }),
);
} else {
console.log('[{workflow}:paginate] No more pages');
}
return {
success: true,
processedItems: itemIds.length,
skippedItems: batchItemIds.length - itemIds.length,
nextCursor: nextCursor ?? null,
};
},
{
flowControl: {
key: '{workflow}.paginate',
parallelism: 20,
ratePerSecond: 5,
},
},
);

import { serve } from '@upstash/workflow/nextjs';
import { getServerDB } from '@/database/server';
import { WorkflowClass, type ExecutePayload } from '@/server/workflows/{workflowName}';
/**
* Execute item workflow - performs actual business logic
* 1. Get item data
* 2. Perform business logic (AI generation, data processing, etc.)
* 3. Save results
*/
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};
    console.log('[{workflow}:execute] Starting:', { itemId });

    // Validate required parameters up front
    if (!itemId) {
      return { success: false, error: 'Missing itemId' };
    }

    // Get the DB handle once and reuse it across steps
    const db = await getServerDB();

    // Get item data.
    // Fix: the original step returned `item` from inside its own
    // initializer, which throws a TDZ ReferenceError at runtime.
    const item = await context.run('{workflow}:get-item', async () => {
      // Query database for item (substitute the real query)
      return db.query(/* item by id */ itemId);
    });
    if (!item) {
      return { success: false, error: 'Item not found' };
    }

    // Perform business logic for THIS item only
    const result = await context.run('{workflow}:process-item', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.generate(); // or process(), execute(), etc.
    });

    // Save results
    await context.run('{workflow}:save-result', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.saveToRedis(result); // or saveToDatabase(), etc.
    });

    console.log('[{workflow}:execute] Completed:', { itemId });
    return {
      success: true,
      itemId,
      result,
    };
  },
  {
    flowControl: {
      key: '{workflow}.execute',
      parallelism: 10, // up to 10 items in flight
      ratePerSecond: 5,
    },
  },
);

export const { POST } = serve<Payload>(
async (context) => {
const { itemId } = context.requestPayload ?? {};
// Validate required parameters
if (!itemId) {
return { success: false, error: 'Missing itemId in payload' };
}
try {
// Perform work
const result = await context.run('step-name', () => doWork(itemId));
return { success: true, itemId, result };
} catch (error) {
console.error('[workflow:error]', error);
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error'
};
}
},
{ flowControl: { ... } },
);

console.log('[{workflow}:{layer}] Starting with payload:', payload);
console.log('[{workflow}:{layer}] Processing items:', { count: items.length });
console.log('[{workflow}:{layer}] Completed:', result);
console.error('[{workflow}:{layer}:error]', error);

// Success response
return {
success: true,
itemId,
result,
message: 'Optional success message',
};
// Error response
return {
success: false,
error: 'Error description',
itemId, // Include context if available
};
// Statistics response (for entry point)
return {
success: true,
totalEligible: 100,
toProcess: 80,
alreadyProcessed: 20,
dryRun: true, // If applicable
message: 'Summary message',
};

// Layer 1: Entry point - single instance only
flowControl: {
key: '{workflow}.process',
parallelism: 1, // Only 1 process workflow at a time
ratePerSecond: 1, // 1 execution per second
}
// Layer 2: Pagination - moderate concurrency
flowControl: {
key: '{workflow}.paginate',
parallelism: 20, // Up to 20 pagination workflows in parallel
ratePerSecond: 5, // 5 new executions per second
}
// Layer 3: Single task execution - high concurrency
flowControl: {
key: '{workflow}.execute',
parallelism: 10, // Up to 10 items processed in parallel
ratePerSecond: 5, // 5 new items per second
}

Key settings: parallelism: 1, ratePerSecond, step names like {workflow}:step-name

// Good: Unique step names
await Promise.all(
items.map((item) => context.run(`{workflow}:execute:${item.id}`, () => processItem(item))),
);
// Bad: Same step name for all items
await Promise.all(
items.map((item) =>
context.run(`{workflow}:execute`, () =>
// ❌ Not unique
processItem(item),
),
),
);

export const { POST } = serve<Payload>(
async (context) => {
const { itemId, configId } = context.requestPayload ?? {};
// Validate at the start
if (!itemId) {
return { success: false, error: 'Missing itemId in payload' };
}
if (!configId) {
return { success: false, error: 'Missing configId in payload' };
}
// Proceed with work...
},
{ flowControl: { ... } },
);

export const { POST } = serve<Payload>(
async (context) => {
const db = await getServerDB(); // Get once
// Use in multiple steps
const item = await context.run('get-item', async () => {
return itemModel.findById(db, itemId);
});
const result = await context.run('save-result', async () => {
return resultModel.create(db, result);
});
},
{ flowControl: { ... } },
);

describe('WorkflowName', () => {
it('should process items successfully', async () => {
// Setup test data
const items = await createTestItems();
// Trigger workflow
await WorkflowClass.triggerProcessItems({ dryRun: false });
// Wait for completion (use polling or webhook)
await waitForCompletion();
// Verify results
const results = await getResults();
expect(results).toHaveLength(items.length);
});
it('should support dryRun mode', async () => {
const result = await WorkflowClass.triggerProcessItems({ dryRun: true });
expect(result).toMatchObject({
success: true,
dryRun: true,
totalEligible: expect.any(Number),
toProcess: expect.any(Number),
});
});
});

Routes: process-users, paginate-users, generate-user

// Layer 1: process-users
if (dryRun) {
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${usersNeedingGeneration.length} users`,
};
}

// Layer 2: paginate-users
if (userIds.length > CHUNK_SIZE) {
const chunks = chunk(userIds, CHUNK_SIZE);
await Promise.all(
chunks.map((ids, idx) =>
context.run(`welcome-placeholder:fanout:${idx + 1}/${chunks.length}`, () =>
WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }),
),
),
);
}

// Layer 3: generate-user
export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => {
const { userId } = context.requestPayload ?? {};
// Execute for ONE user only
const workflow = new WelcomePlaceholderWorkflow(db, userId);
const placeholders = await context.run('generate', () => workflow.generate());
return { success: true, userId, placeholdersCount: placeholders.length };
});

Options: paidOnly, dryRun

/api/workflows/welcome-placeholder/process-users/route.ts
/api/workflows/welcome-placeholder/paginate-users/route.ts
/api/workflows/welcome-placeholder/generate-user/route.ts
/server/workflows/welcomePlaceholder/index.ts

Routes: process-agents, paginate-agents, generate-agent

// Layer 1: process-agents
if (dryRun) {
return {
...result,
dryRun: true,
message: `[DryRun] Would process ${agentsNeedingGeneration.length} agents`,
};
}

// Layer 3: generate-agent
export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => {
const { agentId } = context.requestPayload ?? {};
// Execute for ONE agent only
const workflow = new AgentWelcomeWorkflow(db, agentId);
const data = await context.run('generate', () => workflow.generate());
return { success: true, agentId, data };
});

Options: paidOnly, dryRun

/api/workflows/agent-welcome/process-agents/route.ts
/api/workflows/agent-welcome/paginate-agents/route.ts
/api/workflows/agent-welcome/generate-agent/route.ts
/server/workflows/agentWelcome/index.ts

// Bad: Same step name when processing multiple items
await Promise.all(items.map((item) => context.run('process', () => process(item))));

// Good: Unique step names
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));

// Bad: No validation
export const { POST } = serve<Payload>(async (context) => {
const { itemId } = context.requestPayload ?? {};
const result = await process(itemId); // May fail with undefined
});

// Good: Validate early
export const { POST } = serve<Payload>(async (context) => {
const { itemId } = context.requestPayload ?? {};
if (!itemId) {
return { success: false, error: 'Missing itemId' };
}
const result = await process(itemId);
});

// Bad: No filtering, may duplicate work
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));

// Good: Filter existing items first
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));

// Bad: Inconsistent prefixes and formats
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);

// Good: Consistent structured logging
console.log('[workflow:layer] Starting with payload:', payload);
console.log('[workflow:layer] Processing item:', { itemId });
console.log('[workflow:layer] Completed:', { itemId, result });

# Required for all workflows
APP_URL=https://your-app.com # Base URL for workflow endpoints
QSTASH_TOKEN=qstash_xxx # QStash authentication token
# Optional (for custom QStash URL)
QSTASH_URL=https://custom-qstash.com # Custom QStash endpoint