cloudflare-workflows
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCloudflare Workflows
Cloudflare Workflows
Status: Production Ready ✅
Last Updated: 2025-10-22
Dependencies: cloudflare-worker-base (for Worker setup)
Latest Versions: wrangler@4.44.0, @cloudflare/workers-types@4.20251014.0
状态:已就绪可用于生产环境 ✅
最后更新:2025-10-22
依赖项:cloudflare-worker-base(用于Worker配置)
最新版本:wrangler@4.44.0, @cloudflare/workers-types@4.20251014.0
Quick Start (10 Minutes)
快速入门(10分钟)
1. Create a Workflow
1. 创建工作流
Use the Cloudflare Workflows starter template:
bash
npm create cloudflare@latest my-workflow -- --template cloudflare/workflows-starter --git --deploy false
cd my-workflowWhat you get:
- WorkflowEntrypoint class template
- Worker to trigger workflows
- Complete wrangler.jsonc configuration
使用Cloudflare Workflows入门模板:
bash
npm create cloudflare@latest my-workflow -- --template cloudflare/workflows-starter --git --deploy false
cd my-workflow你将获得:
- WorkflowEntrypoint类模板
- 用于触发工作流的Worker
- 完整的wrangler.jsonc配置
2. Understand the Basic Structure
2. 理解基础结构
src/index.ts:
typescript
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
type Env = {
MY_WORKFLOW: Workflow;
};
type Params = {
userId: string;
email: string;
};
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Access params from event.payload
const { userId, email } = event.payload;
// Step 1: Do some work
const result = await step.do('process user', async () => {
return { processed: true, userId };
});
// Step 2: Wait before next action
await step.sleep('wait 1 hour', '1 hour');
// Step 3: Continue workflow
await step.do('send email', async () => {
// Send email logic
return { sent: true, email };
});
// Optional: return final state
return { completed: true, userId };
}
}
// Worker to trigger workflow
export default {
async fetch(req: Request, env: Env): Promise<Response> {
// Create new workflow instance
const instance = await env.MY_WORKFLOW.create({
params: { userId: '123', email: 'user@example.com' }
});
return Response.json({
id: instance.id,
status: await instance.status()
});
}
};src/index.ts:
typescript
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
type Env = {
MY_WORKFLOW: Workflow;
};
type Params = {
userId: string;
email: string;
};
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// 从event.payload中获取参数
const { userId, email } = event.payload;
// 步骤1:执行任务
const result = await step.do('process user', async () => {
return { processed: true, userId };
});
// 步骤2:等待后执行下一个操作
await step.sleep('wait 1 hour', '1 hour');
// 步骤3:继续工作流
await step.do('send email', async () => {
// 发送邮件逻辑
return { sent: true, email };
});
// 可选:返回最终状态
return { completed: true, userId };
}
}
// 用于触发工作流的Worker
export default {
async fetch(req: Request, env: Env): Promise<Response> {
// 创建新的工作流实例
const instance = await env.MY_WORKFLOW.create({
params: { userId: '123', email: 'user@example.com' }
});
return Response.json({
id: instance.id,
status: await instance.status()
});
}
};3. Configure Wrangler
3. 配置Wrangler
wrangler.jsonc:
jsonc
{
"name": "my-workflow",
"main": "src/index.ts",
"compatibility_date": "2025-10-22",
"workflows": [
{
"name": "my-workflow",
"binding": "MY_WORKFLOW",
"class_name": "MyWorkflow"
}
]
}wrangler.jsonc:
jsonc
{
"name": "my-workflow",
"main": "src/index.ts",
"compatibility_date": "2025-10-22",
"workflows": [
{
"name": "my-workflow",
"binding": "MY_WORKFLOW",
"class_name": "MyWorkflow"
}
]
}4. Deploy and Test
4. 部署与测试
bash
undefinedbash
undefinedDeploy workflow
部署工作流
npm run deploy
npm run deploy
Trigger workflow (visit in browser or curl)
触发工作流(在浏览器访问或使用curl)
curl https://my-workflow.<subdomain>.workers.dev/
curl https://my-workflow.<subdomain>.workers.dev/
View workflow instances
查看工作流实例
npx wrangler workflows instances list my-workflow
npx wrangler workflows instances list my-workflow
Check instance status
检查实例状态
npx wrangler workflows instances describe my-workflow <instance-id>
---npx wrangler workflows instances describe my-workflow <instance-id>
---WorkflowEntrypoint Class
WorkflowEntrypoint类
Extend WorkflowEntrypoint
继承WorkflowEntrypoint
Every Workflow must extend and implement a method:
WorkflowEntrypointrun()typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Workflow steps here
}
}Type Parameters:
- - Environment bindings (KV, D1, R2, etc.)
Env - - Type of workflow parameters passed via
Paramsevent.payload
每个工作流必须继承并实现方法:
WorkflowEntrypointrun()typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// 工作流步骤写在这里
}
}类型参数:
- - 环境绑定(KV、D1、R2等)
Env - - 通过
Params传递的工作流参数类型event.payload
run() Method
run()方法
typescript
async run(
event: WorkflowEvent<Params>,
step: WorkflowStep
): Promise<T | void>Parameters:
- - Contains workflow metadata and payload
event - - Provides step methods (do, sleep, sleepUntil, waitForEvent)
step
Returns:
- Optional return value (must be serializable)
- Return value available via instance.status()
Example:
typescript
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
const { orderId, customerId } = event.payload;
// Access bindings via this.env
const order = await this.env.DB.prepare(
'SELECT * FROM orders WHERE id = ?'
).bind(orderId).first();
const result = await step.do('process payment', async () => {
// Payment processing
return { paid: true, amount: order.total };
});
// Return final state
return {
orderId,
status: 'completed',
paidAmount: result.amount
};
}
}typescript
async run(
event: WorkflowEvent<Params>,
step: WorkflowStep
): Promise<T | void>参数:
- - 包含工作流元数据和负载
event - - 提供步骤方法(do、sleep、sleepUntil、waitForEvent)
step
返回值:
- 可选返回值(必须可序列化)
- 返回值可通过instance.status()获取
示例:
typescript
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
const { orderId, customerId } = event.payload;
// 通过this.env访问绑定资源
const order = await this.env.DB.prepare(
'SELECT * FROM orders WHERE id = ?'
).bind(orderId).first();
const result = await step.do('process payment', async () => {
// 支付处理逻辑
return { paid: true, amount: order.total };
});
// 返回最终状态
return {
orderId,
status: 'completed',
paidAmount: result.amount
};
}
}Step Methods
步骤方法
step.do() - Execute Work
step.do() - 执行任务
typescript
step.do<T>(
name: string,
config?: WorkflowStepConfig,
callback: () => Promise<T>
): Promise<T>OR (config is optional):
typescript
step.do<T>(
name: string,
callback: () => Promise<T>
): Promise<T>Parameters:
- - Step name (for observability)
name - (optional) - Retry configuration
config - - Async function that does the work
callback
Returns:
- The value returned from callback (must be serializable)
Example:
typescript
// Simple step
const files = await step.do('fetch files', async () => {
const response = await fetch('https://api.example.com/files');
return await response.json();
});
// Step with retry config
const result = await step.do(
'call payment API',
{
retries: {
limit: 10,
delay: '10 seconds',
backoff: 'exponential'
},
timeout: '5 minutes'
},
async () => {
const response = await fetch('https://payment-api.example.com/charge', {
method: 'POST',
body: JSON.stringify({ amount: 100 })
});
return await response.json();
}
);CRITICAL - Serialization:
- Return value must be JSON serializable
- ✅ Allowed: string, number, boolean, Array, Object, null
- ❌ Forbidden: Function, Symbol, circular references, undefined
- Step will throw error if return value isn't serializable
typescript
step.do<T>(
name: string,
config?: WorkflowStepConfig,
callback: () => Promise<T>
): Promise<T>或(config为可选参数):
typescript
step.do<T>(
name: string,
callback: () => Promise<T>
): Promise<T>参数:
- - 步骤名称(用于可观测性)
name - (可选)- 重试配置
config - - 执行任务的异步函数
callback
返回值:
- callback返回的值(必须可序列化)
示例:
typescript
// 简单步骤
const files = await step.do('fetch files', async () => {
const response = await fetch('https://api.example.com/files');
return await response.json();
});
// 带重试配置的步骤
const result = await step.do(
'call payment API',
{
retries: {
limit: 10,
delay: '10 seconds',
backoff: 'exponential'
},
timeout: '5 minutes'
},
async () => {
const response = await fetch('https://payment-api.example.com/charge', {
method: 'POST',
body: JSON.stringify({ amount: 100 })
});
return await response.json();
}
);关键注意事项 - 序列化:
- 返回值必须可JSON序列化
- ✅ 允许类型:字符串、数字、布尔值、数组、对象、null
- ❌ 禁止类型:函数、Symbol、循环引用、undefined
- 如果返回值不可序列化,步骤会抛出错误
step.sleep() - Relative Sleep
step.sleep() - 相对时间等待
typescript
step.sleep(name: string, duration: WorkflowDuration): Promise<void>Parameters:
- - Step name
name - - Number (milliseconds) or human-readable string
duration
Accepted units:
- /
"second""seconds" - /
"minute""minutes" - /
"hour""hours" - /
"day""days" - /
"week""weeks" - /
"month""months" - /
"year""years"
Examples:
typescript
// Sleep for 5 minutes
await step.sleep('wait 5 minutes', '5 minutes');
// Sleep for 1 hour
await step.sleep('hourly delay', '1 hour');
// Sleep for 2 days
await step.sleep('wait 2 days', '2 days');
// Sleep using milliseconds
await step.sleep('wait 30 seconds', 30000);
// Common pattern: schedule daily task
await step.do('send daily report', async () => {
// Send report
});
await step.sleep('wait until tomorrow', '1 day');
// Workflow continues next dayPriority:
- Workflows resuming from sleep take priority over new instances
- Ensures older workflows complete before new ones start
typescript
step.sleep(name: string, duration: WorkflowDuration): Promise<void>参数:
- - 步骤名称
name - - 数字(毫秒)或人类可读的字符串
duration
支持的单位:
- /
"second""seconds" - /
"minute""minutes" - /
"hour""hours" - /
"day""days" - /
"week""weeks" - /
"month""months" - /
"year""years"
示例:
typescript
// 等待5分钟
await step.sleep('wait 5 minutes', '5 minutes');
// 等待1小时
await step.sleep('hourly delay', '1 hour');
// 等待2天
await step.sleep('wait 2 days', '2 days');
// 使用毫秒等待30秒
await step.sleep('wait 30 seconds', 30000);
// 常见模式:调度每日任务
await step.do('send daily report', async () => {
// 发送报告
});
await step.sleep('wait until tomorrow', '1 day');
// 工作流次日继续执行优先级:
- 从睡眠中恢复的工作流优先于新实例
- 确保旧工作流在新实例启动前完成
step.sleepUntil() - Sleep to Specific Date
step.sleepUntil() - 等待至指定日期
typescript
step.sleepUntil(
name: string,
timestamp: Date | number
): Promise<void>Parameters:
- - Step name
name - - Date object or UNIX timestamp (milliseconds)
timestamp
Examples:
typescript
// Sleep until specific date
const launchDate = new Date('2025-12-25T00:00:00Z');
await step.sleepUntil('wait for launch', launchDate);
// Sleep until UNIX timestamp
const timestamp = Date.parse('24 Oct 2024 13:00:00 UTC');
await step.sleepUntil('wait until time', timestamp);
// Sleep until next Monday 9am UTC
const nextMonday = new Date();
nextMonday.setDate(nextMonday.getDate() + ((1 + 7 - nextMonday.getDay()) % 7 || 7));
nextMonday.setUTCHours(9, 0, 0, 0);
await step.sleepUntil('wait until Monday 9am', nextMonday);
// Schedule work at specific time
await step.do('prepare campaign', async () => {
// Prepare marketing campaign
});
const campaignLaunch = new Date('2025-11-01T12:00:00Z');
await step.sleepUntil('wait for campaign launch', campaignLaunch);
await step.do('launch campaign', async () => {
// Launch campaign
});typescript
step.sleepUntil(
name: string,
timestamp: Date | number
): Promise<void>参数:
- - 步骤名称
name - - Date对象或UNIX时间戳(毫秒)
timestamp
示例:
typescript
// 等待至指定日期
const launchDate = new Date('2025-12-25T00:00:00Z');
await step.sleepUntil('wait for launch', launchDate);
// 等待至UNIX时间戳
const timestamp = Date.parse('24 Oct 2024 13:00:00 UTC');
await step.sleepUntil('wait until time', timestamp);
// 等待至下周一UTC时间9点
const nextMonday = new Date();
nextMonday.setDate(nextMonday.getDate() + ((1 + 7 - nextMonday.getDay()) % 7 || 7));
nextMonday.setUTCHours(9, 0, 0, 0);
await step.sleepUntil('wait until Monday 9am', nextMonday);
// 在指定时间调度任务
await step.do('prepare campaign', async () => {
// 准备营销活动
});
const campaignLaunch = new Date('2025-11-01T12:00:00Z');
await step.sleepUntil('wait for campaign launch', campaignLaunch);
await step.do('launch campaign', async () => {
// 启动营销活动
});step.waitForEvent() - Wait for External Event
step.waitForEvent() - 等待外部事件
typescript
step.waitForEvent<T>(
name: string,
options: { type: string; timeout?: string | number }
): Promise<T>Parameters:
- - Step name
name - - Event type to match
options.type - (optional) - Max wait time (default: 24 hours)
options.timeout
Returns:
- The event payload sent via
instance.sendEvent()
Example:
typescript
export class PaymentWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Create payment intent
await step.do('create payment intent', async () => {
// Call Stripe API
});
// Wait for webhook from Stripe (max 1 hour)
const webhookData = await step.waitForEvent<StripeWebhook>(
'wait for payment confirmation',
{ type: 'stripe-webhook', timeout: '1 hour' }
);
// Continue based on webhook
if (webhookData.status === 'succeeded') {
await step.do('fulfill order', async () => {
// Fulfill order
});
} else {
await step.do('handle failed payment', async () => {
// Handle failure
});
}
}
}
// Worker receives webhook and sends event to workflow
export default {
async fetch(req: Request, env: Env): Promise<Response> {
if (req.url.includes('/webhook/stripe')) {
const webhookData = await req.json();
// Get workflow instance by ID (stored when created)
const instance = await env.PAYMENT_WORKFLOW.get(instanceId);
// Send event to waiting workflow
await instance.sendEvent({
type: 'stripe-webhook',
payload: webhookData
});
return new Response('OK');
}
}
};Timeout behavior:
- If timeout expires, throws error and workflow can retry or fail
- Wrap in try-catch if timeout should not fail workflow
typescript
try {
const event = await step.waitForEvent('wait for user input', {
type: 'user-submitted',
timeout: '10 minutes'
});
} catch (error) {
// Timeout occurred - handle gracefully
await step.do('send reminder', async () => {
// Send reminder to user
});
}typescript
step.waitForEvent<T>(
name: string,
options: { type: string; timeout?: string | number }
): Promise<T>参数:
- - 步骤名称
name - - 要匹配的事件类型
options.type - (可选)- 最长等待时间(默认:24小时)
options.timeout
返回值:
- 通过发送的事件负载
instance.sendEvent()
示例:
typescript
export class PaymentWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// 创建支付意向
await step.do('create payment intent', async () => {
// 调用Stripe API
});
// 等待Stripe的Webhook(最长1小时)
const webhookData = await step.waitForEvent<StripeWebhook>(
'wait for payment confirmation',
{ type: 'stripe-webhook', timeout: '1 hour' }
);
// 根据Webhook内容继续执行
if (webhookData.status === 'succeeded') {
await step.do('fulfill order', async () => {
// 履行订单
});
} else {
await step.do('handle failed payment', async () => {
// 处理支付失败
});
}
}
}
// Worker接收Webhook并向工作流发送事件
export default {
async fetch(req: Request, env: Env): Promise<Response> {
if (req.url.includes('/webhook/stripe')) {
const webhookData = await req.json();
// 通过ID获取工作流实例(创建时已存储)
const instance = await env.PAYMENT_WORKFLOW.get(instanceId);
// 向等待中的工作流发送事件
await instance.sendEvent({
type: 'stripe-webhook',
payload: webhookData
});
return new Response('OK');
}
}
};超时行为:
- 如果超时,会抛出错误,工作流可重试或失败
- 如果超时不应导致工作流失败,可包裹在try-catch中
typescript
try {
const event = await step.waitForEvent('wait for user input', {
type: 'user-submitted',
timeout: '10 minutes'
});
} catch (error) {
// 发生超时 - 优雅处理
await step.do('send reminder', async () => {
// 向用户发送提醒
});
}WorkflowStepConfig
WorkflowStepConfig
Configure retry behavior for individual steps:
typescript
interface WorkflowStepConfig {
retries?: {
limit: number; // Max retry attempts (Infinity allowed)
delay: string | number; // Delay between retries
backoff?: 'constant' | 'linear' | 'exponential';
};
timeout?: string | number; // Max time per attempt
}为单个步骤配置重试行为:
typescript
interface WorkflowStepConfig {
retries?: {
limit: number; // 最大重试次数(允许设为Infinity)
delay: string | number; // 重试间隔
backoff?: 'constant' | 'linear' | 'exponential';
};
timeout?: string | number; // 单次尝试最长时间
}Default Configuration
默认配置
If no config provided, Workflows uses:
typescript
{
retries: {
limit: 5,
delay: 10000, // 10 seconds
backoff: 'exponential'
},
timeout: '10 minutes'
}如果未提供配置,Workflows会使用以下默认值:
typescript
{
retries: {
limit: 5,
delay: 10000, // 10秒
backoff: 'exponential'
},
timeout: '10 minutes'
}Retry Examples
重试示例
Constant Backoff (same delay each time):
typescript
await step.do(
'send email',
{
retries: {
limit: 3,
delay: '30 seconds',
backoff: 'constant' // Always wait 30 seconds
}
},
async () => {
// Send email
}
);Linear Backoff (increasing delay):
typescript
await step.do(
'poll API',
{
retries: {
limit: 5,
delay: '1 minute',
backoff: 'linear' // 1m, 2m, 3m, 4m, 5m
}
},
async () => {
// Poll API
}
);Exponential Backoff (recommended for most cases):
typescript
await step.do(
'call rate-limited API',
{
retries: {
limit: 10,
delay: '10 seconds',
backoff: 'exponential' // 10s, 20s, 40s, 80s, 160s, ...
},
timeout: '5 minutes'
},
async () => {
// API call
}
);Unlimited Retries:
typescript
await step.do(
'critical operation',
{
retries: {
limit: Infinity, // Retry forever
delay: '1 minute',
backoff: 'exponential'
}
},
async () => {
// Operation that must succeed eventually
}
);No Retries:
typescript
await step.do(
'non-idempotent operation',
{
retries: {
limit: 0 // Fail immediately on error
}
},
async () => {
// One-time operation
}
);恒定间隔重试(每次延迟相同):
typescript
await step.do(
'send email',
{
retries: {
limit: 3,
delay: '30 seconds',
backoff: 'constant' // 始终等待30秒
}
},
async () => {
// 发送邮件
}
);线性间隔重试(延迟逐渐增加):
typescript
await step.do(
'poll API',
{
retries: {
limit: 5,
delay: '1 minute',
backoff: 'linear' // 1分钟、2分钟、3分钟、4分钟、5分钟
}
},
async () => {
// 轮询API
}
);指数间隔重试(大多数场景推荐):
typescript
await step.do(
'call rate-limited API',
{
retries: {
limit: 10,
delay: '10 seconds',
backoff: 'exponential' // 10秒、20秒、40秒、80秒、160秒...
},
timeout: '5 minutes'
},
async () => {
// API调用
}
);无限重试:
typescript
await step.do(
'critical operation',
{
retries: {
limit: Infinity, // 一直重试
delay: '1 minute',
backoff: 'exponential'
}
},
async () => {
// 必须最终成功的操作
}
);不重试:
typescript
await step.do(
'non-idempotent operation',
{
retries: {
limit: 0 // 出错时立即失败
}
},
async () => {
// 一次性操作
}
);Error Handling
错误处理
NonRetryableError
NonRetryableError
Force workflow to fail immediately without retrying:
typescript
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
await step.do('validate input', async () => {
if (!event.payload.userId) {
throw new NonRetryableError('userId is required');
}
// Validate user exists
const user = await this.env.DB.prepare(
'SELECT * FROM users WHERE id = ?'
).bind(event.payload.userId).first();
if (!user) {
// Terminal error - retrying won't help
throw new NonRetryableError('User not found');
}
return user;
});
}
}When to use NonRetryableError:
- ✅ Authentication/authorization failures
- ✅ Invalid input that won't change
- ✅ Resource doesn't exist (404)
- ✅ Validation errors
- ❌ Network failures (should retry)
- ❌ Rate limits (should retry with backoff)
- ❌ Temporary service outages (should retry)
强制工作流立即失败,不进行重试:
typescript
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
await step.do('validate input', async () => {
if (!event.payload.userId) {
throw new NonRetryableError('userId is required');
}
// 验证用户是否存在
const user = await this.env.DB.prepare(
'SELECT * FROM users WHERE id = ?'
).bind(event.payload.userId).first();
if (!user) {
// 终止错误 - 重试无意义
throw new NonRetryableError('User not found');
}
return user;
});
}
}何时使用NonRetryableError:
- ✅ 认证/授权失败
- ✅ 不会变更的无效输入
- ✅ 资源不存在(404)
- ✅ 验证错误
- ❌ 网络故障(应重试)
- ❌ 速率限制(应带间隔重试)
- ❌ 临时服务中断(应重试)
Catch Errors to Continue Workflow
捕获错误以继续工作流
Prevent entire workflow from failing by catching step errors:
typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Critical step - workflow fails if this fails
await step.do('process payment', async () => {
// Payment processing
});
// Optional step - workflow continues even if it fails
try {
await step.do('send confirmation email', async () => {
// Email sending
});
} catch (error) {
console.log(`Email failed: ${error.message}`);
// Do cleanup or alternative action
await step.do('log email failure', async () => {
await this.env.DB.prepare(
'INSERT INTO failed_emails (user_id, error) VALUES (?, ?)'
).bind(event.payload.userId, error.message).run();
});
}
// Workflow continues
await step.do('update order status', async () => {
// Update status
});
}
}Pattern: Graceful degradation:
typescript
// Try primary service, fall back to secondary
let result;
try {
result = await step.do('call primary API', async () => {
return await callPrimaryAPI();
});
} catch (error) {
console.log('Primary API failed, trying backup');
result = await step.do('call backup API', async () => {
return await callBackupAPI();
});
}通过捕获步骤错误,避免整个工作流失败:
typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// 关键步骤 - 此步骤失败则工作流失败
await step.do('process payment', async () => {
// 支付处理
});
// 可选步骤 - 即使失败,工作流仍继续执行
try {
await step.do('send confirmation email', async () => {
// 发送邮件
});
} catch (error) {
console.log(`Email failed: ${error.message}`);
// 执行清理或替代操作
await step.do('log email failure', async () => {
await this.env.DB.prepare(
'INSERT INTO failed_emails (user_id, error) VALUES (?, ?)'
).bind(event.payload.userId, error.message).run();
});
}
// 工作流继续执行
await step.do('update order status', async () => {
// 更新状态
});
}
}模式:优雅降级:
typescript
// 尝试主服务,失败则回退到备用服务
let result;
try {
result = await step.do('call primary API', async () => {
return await callPrimaryAPI();
});
} catch (error) {
console.log('Primary API failed, trying backup');
result = await step.do('call backup API', async () => {
return await callBackupAPI();
});
}Triggering Workflows
触发工作流
From Workers
从Worker触发
Configure binding in wrangler.jsonc:
jsonc
{
"name": "trigger-worker",
"main": "src/index.ts",
"compatibility_date": "2025-10-22",
"workflows": [
{
"name": "my-workflow",
"binding": "MY_WORKFLOW",
"class_name": "MyWorkflow",
"script_name": "workflow-worker" // If workflow is in different Worker
}
]
}Trigger from Worker:
typescript
type Env = {
MY_WORKFLOW: Workflow;
};
export default {
async fetch(req: Request, env: Env): Promise<Response> {
// Create new workflow instance
const instance = await env.MY_WORKFLOW.create({
params: {
userId: '123',
email: 'user@example.com'
}
});
// Return instance ID
return Response.json({
id: instance.id,
status: await instance.status()
});
}
};在wrangler.jsonc中配置绑定:
jsonc
{
"name": "trigger-worker",
"main": "src/index.ts",
"compatibility_date": "2025-10-22",
"workflows": [
{
"name": "my-workflow",
"binding": "MY_WORKFLOW",
"class_name": "MyWorkflow",
"script_name": "workflow-worker" // 如果工作流在其他Worker中
}
]
}从Worker触发:
typescript
type Env = {
MY_WORKFLOW: Workflow;
};
export default {
async fetch(req: Request, env: Env): Promise<Response> {
// 创建新的工作流实例
const instance = await env.MY_WORKFLOW.create({
params: {
userId: '123',
email: 'user@example.com'
}
});
// 返回实例ID
return Response.json({
id: instance.id,
status: await instance.status()
});
}
};Get Instance Status
获取实例状态
typescript
// Get instance by ID
const instance = await env.MY_WORKFLOW.get(instanceId);
// Get status
const status = await instance.status();
console.log(status);
// {
// status: 'running' | 'complete' | 'errored' | 'queued' | 'unknown',
// error: string | null,
// output: any // Return value from run() if complete
// }typescript
// 通过ID获取实例
const instance = await env.MY_WORKFLOW.get(instanceId);
// 获取状态
const status = await instance.status();
console.log(status);
// {
// status: 'running' | 'complete' | 'errored' | 'queued' | 'unknown',
// error: string | null,
// output: any // 如果已完成,为run()的返回值
// }Send Events to Running Instance
向运行中的实例发送事件
typescript
// Get instance
const instance = await env.MY_WORKFLOW.get(instanceId);
// Send event (will be received by step.waitForEvent)
await instance.sendEvent({
type: 'user-action',
payload: { action: 'approved' }
});typescript
// 获取实例
const instance = await env.MY_WORKFLOW.get(instanceId);
// 发送事件(将被step.waitForEvent接收)
await instance.sendEvent({
type: 'user-action',
payload: { action: 'approved' }
});Pause and Resume
暂停与恢复
typescript
// Pause instance
await instance.pause();
// Resume instance
await instance.resume();
// Terminate instance
await instance.terminate();typescript
// 暂停实例
await instance.pause();
// 恢复实例
await instance.resume();
// 终止实例
await instance.terminate();Workflow Patterns
工作流模式
Pattern 1: Long-Running Process
模式1:长期运行的流程
typescript
export class VideoProcessingWorkflow extends WorkflowEntrypoint<Env, VideoParams> {
async run(event: WorkflowEvent<VideoParams>, step: WorkflowStep) {
const { videoId } = event.payload;
// Step 1: Upload to processing service
const uploadResult = await step.do('upload video', async () => {
const video = await this.env.MY_BUCKET.get(`videos/${videoId}`);
const response = await fetch('https://processor.example.com/upload', {
method: 'POST',
body: video?.body
});
return await response.json();
});
// Step 2: Wait for processing (could take hours)
await step.sleep('wait for initial processing', '10 minutes');
// Step 3: Poll for completion
let processed = false;
let attempts = 0;
while (!processed && attempts < 20) {
const status = await step.do(`check status attempt ${attempts}`, async () => {
const response = await fetch(
`https://processor.example.com/status/${uploadResult.jobId}`
);
return await response.json();
});
if (status.complete) {
processed = true;
} else {
attempts++;
await step.sleep(`wait before retry ${attempts}`, '5 minutes');
}
}
// Step 4: Download processed video
await step.do('download processed video', async () => {
const response = await fetch(uploadResult.downloadUrl);
const processed = await response.blob();
await this.env.MY_BUCKET.put(`processed/${videoId}`, processed);
});
return { videoId, status: 'complete' };
}
}typescript
export class VideoProcessingWorkflow extends WorkflowEntrypoint<Env, VideoParams> {
async run(event: WorkflowEvent<VideoParams>, step: WorkflowStep) {
const { videoId } = event.payload;
// 步骤1:上传到处理服务
const uploadResult = await step.do('upload video', async () => {
const video = await this.env.MY_BUCKET.get(`videos/${videoId}`);
const response = await fetch('https://processor.example.com/upload', {
method: 'POST',
body: video?.body
});
return await response.json();
});
// 步骤2:等待初始处理(可能需要数小时)
await step.sleep('wait for initial processing', '10 minutes');
// 步骤3:轮询处理完成状态
let processed = false;
let attempts = 0;
while (!processed && attempts < 20) {
const status = await step.do(`check status attempt ${attempts}`, async () => {
const response = await fetch(
`https://processor.example.com/status/${uploadResult.jobId}`
);
return await response.json();
});
if (status.complete) {
processed = true;
} else {
attempts++;
await step.sleep(`wait before retry ${attempts}`, '5 minutes');
}
}
// 步骤4:下载处理后的视频
await step.do('download processed video', async () => {
const response = await fetch(uploadResult.downloadUrl);
const processed = await response.blob();
await this.env.MY_BUCKET.put(`processed/${videoId}`, processed);
});
return { videoId, status: 'complete' };
}
}Pattern 2: Event-Driven Approval Flow
模式2:事件驱动的审批流程
typescript
export class ApprovalWorkflow extends WorkflowEntrypoint<Env, ApprovalParams> {
async run(event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) {
const { requestId, requesterId } = event.payload;
// Step 1: Create approval request
await step.do('create approval request', async () => {
await this.env.DB.prepare(
'INSERT INTO approvals (id, requester_id, status) VALUES (?, ?, ?)'
).bind(requestId, requesterId, 'pending').run();
});
// Step 2: Send notification to approvers
await step.do('notify approvers', async () => {
await sendNotification(requestId);
});
// Step 3: Wait for approval (max 7 days)
let approvalEvent;
try {
approvalEvent = await step.waitForEvent<ApprovalEvent>(
'wait for approval decision',
{ type: 'approval-decision', timeout: '7 days' }
);
} catch (error) {
// Timeout - auto-reject
await step.do('auto-reject due to timeout', async () => {
await this.env.DB.prepare(
'UPDATE approvals SET status = ? WHERE id = ?'
).bind('rejected', requestId).run();
});
return { requestId, status: 'rejected', reason: 'timeout' };
}
// Step 4: Process decision
await step.do('process approval decision', async () => {
await this.env.DB.prepare(
'UPDATE approvals SET status = ?, approver_id = ? WHERE id = ?'
).bind(approvalEvent.approved ? 'approved' : 'rejected', approvalEvent.approverId, requestId).run();
});
// Step 5: Execute approved action (if approved)
if (approvalEvent.approved) {
await step.do('execute approved action', async () => {
// Execute the action
});
}
return { requestId, status: approvalEvent.approved ? 'approved' : 'rejected' };
}
}typescript
export class ApprovalWorkflow extends WorkflowEntrypoint<Env, ApprovalParams> {
async run(event: WorkflowEvent<ApprovalParams>, step: WorkflowStep) {
const { requestId, requesterId } = event.payload;
// 步骤1:创建审批请求
await step.do('create approval request', async () => {
await this.env.DB.prepare(
'INSERT INTO approvals (id, requester_id, status) VALUES (?, ?, ?)'
).bind(requestId, requesterId, 'pending').run();
});
// 步骤2:向审批人发送通知
await step.do('notify approvers', async () => {
await sendNotification(requestId);
});
// 步骤3:等待审批(最长7天)
let approvalEvent;
try {
approvalEvent = await step.waitForEvent<ApprovalEvent>(
'wait for approval decision',
{ type: 'approval-decision', timeout: '7 days' }
);
} catch (error) {
// 超时 - 自动拒绝
await step.do('auto-reject due to timeout', async () => {
await this.env.DB.prepare(
'UPDATE approvals SET status = ? WHERE id = ?'
).bind('rejected', requestId).run();
});
return { requestId, status: 'rejected', reason: 'timeout' };
}
// 步骤4:处理审批结果
await step.do('process approval decision', async () => {
await this.env.DB.prepare(
'UPDATE approvals SET status = ?, approver_id = ? WHERE id = ?'
).bind(approvalEvent.approved ? 'approved' : 'rejected', approvalEvent.approverId, requestId).run();
});
// 步骤5:如果审批通过,执行对应操作
if (approvalEvent.approved) {
await step.do('execute approved action', async () => {
// 执行操作
});
}
return { requestId, status: approvalEvent.approved ? 'approved' : 'rejected' };
}
}Pattern 3: Scheduled Workflow
模式3:定时工作流
typescript
export class DailyReportWorkflow extends WorkflowEntrypoint<Env, ReportParams> {
async run(event: WorkflowEvent<ReportParams>, step: WorkflowStep) {
// Calculate next 9am UTC
const now = new Date();
const tomorrow9am = new Date();
tomorrow9am.setUTCDate(tomorrow9am.getUTCDate() + 1);
tomorrow9am.setUTCHours(9, 0, 0, 0);
// Sleep until tomorrow 9am
await step.sleepUntil('wait until 9am tomorrow', tomorrow9am);
// Generate report
const report = await step.do('generate daily report', async () => {
const results = await this.env.DB.prepare(
'SELECT * FROM metrics WHERE date = ?'
).bind(now.toISOString().split('T')[0]).all();
return {
date: now.toISOString().split('T')[0],
metrics: results.results
};
});
// Send report
await step.do('send report', async () => {
await sendEmail({
to: event.payload.recipients,
subject: `Daily Report - ${report.date}`,
body: formatReport(report.metrics)
});
});
return { sent: true, date: report.date };
}
}typescript
export class DailyReportWorkflow extends WorkflowEntrypoint<Env, ReportParams> {
async run(event: WorkflowEvent<ReportParams>, step: WorkflowStep) {
// 计算下一个UTC时间9点
const now = new Date();
const tomorrow9am = new Date();
tomorrow9am.setUTCDate(tomorrow9am.getUTCDate() + 1);
tomorrow9am.setUTCHours(9, 0, 0, 0);
// 等待到次日9点
await step.sleepUntil('wait until 9am tomorrow', tomorrow9am);
// 生成报告
const report = await step.do('generate daily report', async () => {
const results = await this.env.DB.prepare(
'SELECT * FROM metrics WHERE date = ?'
).bind(now.toISOString().split('T')[0]).all();
return {
date: now.toISOString().split('T')[0],
metrics: results.results
};
});
// 发送报告
await step.do('send report', async () => {
await sendEmail({
to: event.payload.recipients,
subject: `Daily Report - ${report.date}`,
body: formatReport(report.metrics)
});
});
return { sent: true, date: report.date };
}
}Pattern 4: Workflow Chaining
模式4:工作流链式调用
typescript
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
const { orderId } = event.payload;
// Step 1: Process payment
const paymentResult = await step.do('process payment', async () => {
return await processPayment(orderId);
});
// Step 2: Trigger fulfillment workflow
const fulfillmentInstance = await step.do('start fulfillment', async () => {
return await this.env.FULFILLMENT_WORKFLOW.create({
params: {
orderId,
paymentId: paymentResult.id
}
});
});
// Step 3: Wait for fulfillment to complete
await step.sleep('wait for fulfillment', '5 minutes');
// Step 4: Check fulfillment status
const fulfillmentStatus = await step.do('check fulfillment', async () => {
const instance = await this.env.FULFILLMENT_WORKFLOW.get(fulfillmentInstance.id);
return await instance.status();
});
if (fulfillmentStatus.status === 'complete') {
// Step 5: Send confirmation
await step.do('send order confirmation', async () => {
await sendConfirmation(orderId);
});
}
return { orderId, status: 'complete' };
}
}typescript
export class OrderWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
const { orderId } = event.payload;
// 步骤1:处理支付
const paymentResult = await step.do('process payment', async () => {
return await processPayment(orderId);
});
// 步骤2:触发履行工作流
const fulfillmentInstance = await step.do('start fulfillment', async () => {
return await this.env.FULFILLMENT_WORKFLOW.create({
params: {
orderId,
paymentId: paymentResult.id
}
});
});
// 步骤3:等待履行完成
await step.sleep('wait for fulfillment', '5 minutes');
// 步骤4:检查履行状态
const fulfillmentStatus = await step.do('check fulfillment', async () => {
const instance = await this.env.FULFILLMENT_WORKFLOW.get(fulfillmentInstance.id);
return await instance.status();
});
if (fulfillmentStatus.status === 'complete') {
// 步骤5:发送确认邮件
await step.do('send order confirmation', async () => {
await sendConfirmation(orderId);
});
}
return { orderId, status: 'complete' };
}
}Wrangler Commands
Wrangler命令
List Workflow Instances
列出工作流实例
bash
undefinedbash
undefinedList all instances of a workflow
列出某个工作流的所有实例
npx wrangler workflows instances list my-workflow
npx wrangler workflows instances list my-workflow
Filter by status
按状态过滤
npx wrangler workflows instances list my-workflow --status running
npx wrangler workflows instances list my-workflow --status complete
npx wrangler workflows instances list my-workflow --status errored
undefinednpx wrangler workflows instances list my-workflow --status running
npx wrangler workflows instances list my-workflow --status complete
npx wrangler workflows instances list my-workflow --status errored
undefinedDescribe Instance
查看实例详情
bash
undefinedbash
undefinedGet detailed info about specific instance
获取特定实例的详细信息
npx wrangler workflows instances describe my-workflow <instance-id>
npx wrangler workflows instances describe my-workflow <instance-id>
Output shows:
输出内容包括:
- Current status (running/complete/errored)
- 当前状态(running/complete/errored)
- Each step with start/end times
每个步骤的开始/结束时间
- Step outputs
- 步骤输出
- Retry history
- 重试历史
- Any errors
- 错误信息
- Sleep state (if sleeping)
- 睡眠状态(如果正在睡眠)
undefinedundefinedTrigger Workflow (Development)
触发工作流(开发环境)
bash
undefinedbash
undefinedDeploy workflow
部署工作流
npx wrangler deploy
npx wrangler deploy
Trigger via HTTP (if Worker is set up to trigger)
通过HTTP触发(如果Worker已配置触发逻辑)
curl https://my-workflow.<subdomain>.workers.dev/
---curl https://my-workflow.<subdomain>.workers.dev/
---State Persistence
状态持久化
What Can Be Persisted
可持久化的内容
Workflows automatically persist state returned from :
step.do()✅ Serializable Types:
- Primitives: ,
string,number,booleannull - Arrays: ,
[1, 2, 3]['a', 'b', 'c'] - Objects: ,
{ key: 'value' }{ nested: { data: true } } - Nested structures:
{ users: [{ id: 1, name: 'Alice' }] }
❌ Non-Serializable Types:
- Functions:
() => {} - Symbols:
Symbol('key') - Circular references:
const obj = {}; obj.self = obj; - undefined (use null instead)
- Class instances (serialize to plain objects)
Example - Correct Serialization:
typescript
// ✅ Good - all values serializable
const result = await step.do('fetch data', async () => {
return {
users: [
{ id: 1, name: 'Alice', active: true },
{ id: 2, name: 'Bob', active: false }
],
timestamp: Date.now(),
metadata: null
};
});
// ❌ Bad - contains function
const bad = await step.do('bad example', async () => {
return {
data: [1, 2, 3],
transform: (x) => x * 2 // ❌ Function not serializable
};
});
// This will throw an error!Workflows会自动持久化返回的状态:
step.do()✅ 可序列化类型:
- 基本类型:、
string、number、booleannull - 数组:、
[1, 2, 3]['a', 'b', 'c'] - 对象:、
{ key: 'value' }{ nested: { data: true } } - 嵌套结构:
{ users: [{ id: 1, name: 'Alice' }] }
❌ 不可序列化类型:
- 函数:
() => {} - Symbol:
Symbol('key') - 循环引用:
const obj = {}; obj.self = obj; - undefined(请使用null替代)
- 类实例(需序列化为普通对象)
示例 - 正确的序列化:
typescript
// ✅ 正确 - 所有值均可序列化
const result = await step.do('fetch data', async () => {
return {
users: [
{ id: 1, name: 'Alice', active: true },
{ id: 2, name: 'Bob', active: false }
],
timestamp: Date.now(),
metadata: null
};
});
// ❌ 错误 - 包含函数
const bad = await step.do('bad example', async () => {
return {
data: [1, 2, 3],
transform: (x) => x * 2 // ❌ 函数不可序列化
};
});
// 此代码会抛出错误!Access State Across Steps
跨步骤访问状态
typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Step 1: Get data
const userData = await step.do('fetch user', async () => {
return { id: 123, email: 'user@example.com' };
});
// Step 2: Use data from step 1
const orderData = await step.do('create order', async () => {
return {
userId: userData.id, // ✅ Access previous step's data
userEmail: userData.email,
orderId: 'ORD-456'
};
});
// Step 3: Use data from step 1 and 2
await step.do('send confirmation', async () => {
await sendEmail({
to: userData.email, // ✅ Still accessible
subject: `Order ${orderData.orderId} confirmed`
});
});
}
}typescript
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// 步骤1:获取数据
const userData = await step.do('fetch user', async () => {
return { id: 123, email: 'user@example.com' };
});
// 步骤2:使用步骤1的数据
const orderData = await step.do('create order', async () => {
return {
userId: userData.id, // ✅ 访问前一步的数据
userEmail: userData.email,
orderId: 'ORD-456'
};
});
// 步骤3:使用步骤1和步骤2的数据
await step.do('send confirmation', async () => {
await sendEmail({
to: userData.email, // ✅ 仍可访问
subject: `Order ${orderData.orderId} confirmed`
});
});
}
}Observability
可观测性
Built-in Metrics
内置指标
Workflows automatically track:
- Instance status: queued, running, complete, errored, paused
- Step execution: start/end times, duration, success/failure
- Retry history: attempts, errors, delays
- Sleep state: when workflow will wake up
- Output: return values from steps and run()
Workflows会自动跟踪以下指标:
- 实例状态:排队中、运行中、已完成、已出错、已暂停
- 步骤执行:开始/结束时间、持续时长、成功/失败
- 重试历史:尝试次数、错误信息、延迟时间
- 睡眠状态:工作流唤醒时间
- 输出:步骤和run()方法的返回值
View Metrics in Dashboard
在控制台查看指标
Access via Cloudflare dashboard:
- Workers & Pages
- Select your workflow
- View instances and metrics
Metrics include:
- Total instances created
- Success/error rates
- Average execution time
- Step-level performance
通过Cloudflare控制台访问:
- 进入Workers & Pages
- 选择你的工作流
- 查看实例和指标
指标包括:
- 创建的实例总数
- 成功率/错误率
- 平均执行时间
- 步骤级性能
Programmatic Access
程序化访问
typescript
// Get instance status
const instance = await env.MY_WORKFLOW.get(instanceId);
const status = await instance.status();
console.log(status);
// {
// status: 'complete',
// error: null,
// output: { userId: '123', status: 'processed' }
// }typescript
// 获取实例状态
const instance = await env.MY_WORKFLOW.get(instanceId);
const status = await instance.status();
console.log(status);
// {
// status: 'complete',
// error: null,
// output: { userId: '123', status: 'processed' }
// }Limits
限制
| Feature | Limit |
|---|---|
| Max workflow duration | 30 days |
| Max steps per workflow | 10,000 |
| Max sleep/sleepUntil duration | 30 days |
| Max step timeout | 15 minutes |
| Max concurrent instances | Unlimited (autoscales) |
| Max payload size | 128 KB |
| Max step output size | 128 KB |
| Max waitForEvent timeout | 30 days |
| Max retry limit | Infinity (configurable) |
Notes:
- and
step.sleep()do NOT count toward 10,000 step limitstep.sleepUntil() - Workflows can run for up to 30 days total
- Each step execution limited to 15 minutes max
- Retries count as separate attempts, not separate steps
| 功能 | 限制 |
|---|---|
| 工作流最长持续时间 | 30天 |
| 每个工作流的最大步骤数 | 10,000 |
| sleep/sleepUntil最长持续时间 | 30天 |
| 步骤最长超时时间 | 15分钟 |
| 最大并发实例数 | 无限制(自动扩缩容) |
| 最大负载大小 | 128 KB |
| 步骤最大输出大小 | 128 KB |
| waitForEvent最长超时时间 | 30天 |
| 最大重试次数 | Infinity(可配置) |
注意:
- 和
step.sleep()不计入10,000步的限制step.sleepUntil() - 工作流总运行时长最长为30天
- 每个步骤的单次执行最长为15分钟
- 重试会被计为单独的尝试,而非单独的步骤
Pricing
定价
Requires Workers Paid plan ($5/month)
Workflow Executions:
- First 10,000,000 step executions/month: FREE
- After that: $0.30 per million step executions
What counts as a step execution:
- Each call
step.do() - Each retry of a step
- ,
step.sleep(),step.sleepUntil()do NOT countstep.waitForEvent()
Cost examples:
- Workflow with 5 steps, no retries: 5 step executions
- Workflow with 3 steps, 1 step retries 2 times: 5 step executions (3 + 2)
- 10M simple workflows/month (5 steps each): ((50M - 10M) / 1M) × $0.30 = $12/month
需要Workers付费计划(每月5美元)
工作流执行:
- 每月前10,000,000次步骤执行:免费
- 超出部分:每百万次步骤执行0.30美元
什么会被计为步骤执行:
- 每次调用
step.do() - 步骤的每次重试
- 、
step.sleep()、step.sleepUntil()不计入step.waitForEvent()
成本示例:
- 包含5个步骤、无重试的工作流:5次步骤执行
- 包含3个步骤、其中1个步骤重试2次的工作流:5次步骤执行(3+2)
- 每月1000万个简单工作流(每个5步):((50,000,000 - 10,000,000) / 1,000,000) × 0.30 = 每月12美元
Always Do ✅
最佳实践 ✅
- Use descriptive step names - "fetch user data", not "step 1"
- Return serializable values only - primitives, arrays, plain objects
- Use NonRetryableError for terminal errors - auth failures, invalid input
- Configure retry limits - avoid infinite retries unless necessary
- Catch errors for optional steps - use try-catch if step can fail gracefully
- Use exponential backoff for retries - default backoff for most cases
- Validate inputs early - fail fast with NonRetryableError if invalid
- Store workflow instance IDs - save to DB/KV to query status later
- Use waitForEvent for human-in-loop - approvals, external confirmations
- Monitor workflow metrics - track success rates and errors
- 使用描述性的步骤名称 - 比如“获取用户数据”,而非“步骤1”
- 仅返回可序列化的值 - 基本类型、数组、普通对象
- 对终止错误使用NonRetryableError - 认证失败、无效输入等
- 配置重试限制 - 除非必要,避免无限重试
- 为可选步骤添加错误捕获 - 使用try-catch让步骤可优雅失败
- 重试使用指数间隔 - 大多数场景的默认推荐
- 尽早验证输入 - 如果输入无效,使用NonRetryableError快速失败
- 存储工作流实例ID - 保存到数据库/KV中,以便后续查询状态
- 对人工参与的流程使用waitForEvent - 审批、外部确认等场景
- 监控工作流指标 - 跟踪成功率和错误率
Never Do ❌
禁忌 ❌
- Never return functions from steps - will throw serialization error
- Never create circular references - will fail to serialize
- Never assume steps execute immediately - they may retry or sleep
- Never use blocking operations - use step.do() for async work
- Never exceed 128 KB payload/output - will fail
- Never retry non-idempotent operations infinitely - use retry limits
- Never ignore serialization errors - fix the data structure
- Never use workflows for real-time operations - use Durable Objects instead
- Never skip error handling for critical steps - wrap in try-catch or use NonRetryableError
- Never assume step order is guaranteed across retries - each step is independent
- 永远不要从步骤中返回函数 - 会抛出序列化错误
- 永远不要创建循环引用 - 会序列化失败
- 永远不要假设步骤会立即执行 - 步骤可能会重试或进入睡眠
- 永远不要使用阻塞操作 - 异步任务请使用step.do()
- 永远不要超出128 KB的负载/输出限制 - 会导致失败
- 永远不要对非幂等操作无限重试 - 请设置重试限制
- 永远不要忽略序列化错误 - 修复数据结构
- 永远不要将工作流用于实时操作 - 请使用Durable Objects
- 永远不要忽略关键步骤的错误处理 - 使用try-catch或NonRetryableError
- 永远不要假设重试后步骤顺序不变 - 每个步骤都是独立的
Troubleshooting
故障排查
Issue: "Cannot perform I/O on behalf of a different request"
问题:"Cannot perform I/O on behalf of a different request"
Cause: Trying to use I/O objects created in one request context from another request handler
Solution: Always perform I/O within callbacks
step.do()typescript
// ❌ Bad - I/O outside step
const response = await fetch('https://api.example.com/data');
const data = await response.json();
await step.do('use data', async () => {
// Using data from outside step's I/O context
return data; // This will fail!
});
// ✅ Good - I/O inside step
const data = await step.do('fetch data', async () => {
const response = await fetch('https://api.example.com/data');
return await response.json(); // ✅ Correct
});原因: 尝试在一个请求上下文中创建的I/O对象,在另一个请求处理器中使用
解决方案: 始终在的回调中执行I/O操作
step.do()typescript
// ❌ 错误 - I/O操作在步骤外
const response = await fetch('https://api.example.com/data');
const data = await response.json();
await step.do('use data', async () => {
// 使用步骤外I/O操作获取的数据
return data; // 此代码会失败!
});
// ✅ 正确 - I/O操作在步骤内
const data = await step.do('fetch data', async () => {
const response = await fetch('https://api.example.com/data');
return await response.json(); // ✅ 正确
});Issue: NonRetryableError behaves differently in dev vs production
问题:NonRetryableError在开发环境和生产环境表现不同
Known Issue: Throwing NonRetryableError with empty message in dev mode causes retries, but works correctly in production
Workaround: Always provide a message to NonRetryableError
typescript
// ❌ May retry in dev
throw new NonRetryableError();
// ✅ Works consistently
throw new NonRetryableError('User not found');Source: workers-sdk#10113
已知问题: 在开发环境中,抛出不带消息的NonRetryableError会导致重试,但在生产环境中表现正常
解决方法: 始终为NonRetryableError提供错误消息
typescript
// ❌ 在开发环境中可能会重试
throw new NonRetryableError();
// ✅ 在所有环境中表现一致
throw new NonRetryableError('User not found');Issue: "The requested module 'cloudflare:workers' does not provide an export named 'WorkflowEvent'"
问题:"The requested module 'cloudflare:workers' does not provide an export named 'WorkflowEvent'"
Cause: Incorrect import or outdated @cloudflare/workers-types
Solution:
bash
undefined原因: 导入错误或@cloudflare/workers-types版本过时
解决方案:
bash
undefinedUpdate types
更新类型定义
npm install -D @cloudflare/workers-types@latest
npm install -D @cloudflare/workers-types@latest
Ensure correct import
确保导入正确
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';
---import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';
---Issue: Step returns undefined instead of expected value
问题:步骤返回undefined而非预期值
Cause: Step callback doesn't return a value
Solution: Always return from step callbacks
typescript
// ❌ Bad - no return
const result = await step.do('get data', async () => {
const data = await fetchData();
// Missing return!
});
console.log(result); // undefined
// ✅ Good - explicit return
const result = await step.do('get data', async () => {
const data = await fetchData();
return data; // ✅
});原因: 步骤的回调函数没有返回值
解决方案: 确保步骤回调函数有返回值
typescript
// ❌ 错误 - 没有返回值
const result = await step.do('get data', async () => {
const data = await fetchData();
// 缺少return!
});
console.log(result); // undefined
// ✅ 正确 - 显式返回
const result = await step.do('get data', async () => {
const data = await fetchData();
return data; // ✅
});Issue: Workflow instance stuck in "running" state
问题:工作流实例一直处于"running"状态
Possible causes:
- Step is sleeping for long duration
- Step is waiting for event that never arrives
- Step is retrying with long backoff
Solution:
bash
undefined可能原因:
- 步骤正在长时间睡眠
- 步骤在等待永远不会到达的事件
- 步骤在使用长间隔重试
解决方案:
bash
undefinedCheck instance details
查看实例详情
npx wrangler workflows instances describe my-workflow <instance-id>
npx wrangler workflows instances describe my-workflow <instance-id>
Look for:
重点查看:
- Sleep state (will show wake time)
- 睡眠状态(会显示唤醒时间)
- Waiting for event (will show event type and timeout)
- 等待的事件(会显示事件类型和超时时间)
- Retry history (will show attempts and delays)
- 重试历史(会显示尝试次数和延迟)
---
---Production Checklist
生产环境检查清单
Before deploying workflows to production:
- All steps have descriptive names
- Retry limits configured for all steps
- NonRetryableError used for terminal errors
- Critical steps have error handling
- Optional steps wrapped in try-catch
- No non-serializable values returned
- Payload sizes under 128 KB
- Workflow duration under 30 days
- Instance IDs stored for status queries
- Monitoring and alerting configured
- waitForEvent timeouts configured
- Tested in development environment
- Tested retry behavior
- Tested error scenarios
在将工作流部署到生产环境前,请确认:
- 所有步骤都有描述性名称
- 所有步骤都配置了重试限制
- 对终止错误使用了NonRetryableError
- 关键步骤有错误处理
- 可选步骤被包裹在try-catch中
- 没有返回不可序列化的值
- 负载大小不超过128 KB
- 工作流持续时长不超过30天
- 实例ID已存储,可用于查询状态
- 已配置监控和告警
- 已配置waitForEvent的超时时间
- 已在开发环境中测试
- 已测试重试行为
- 已测试错误场景
Related Documentation
相关文档
- Cloudflare Workflows Docs
- Get Started Guide
- Workers API
- Sleeping and Retrying
- Events and Parameters
- Limits
- Pricing
- Changelog
Last Updated: 2025-10-22
Version: 1.0.0
Maintainer: Jeremy Dawes | jeremy@jezweb.net
最后更新:2025-10-22
版本:1.0.0
维护者:Jeremy Dawes | jeremy@jezweb.net