Loading...
Loading...
Build durable workflows with Cloudflare Workflows (GA April 2025). Features step.do, step.sleep, waitForEvent, Vitest testing, automatic retries, and state persistence for long-running tasks. Prevents 12 documented errors. Use when: creating workflows, implementing retries, or troubleshooting NonRetryableError, I/O context, serialization errors, waitForEvent timeouts, getPlatformProxy failures.
npx skill4agent add jezweb/claude-skills cloudflare-workflows
# 1. Scaffold project
npm create cloudflare@latest my-workflow -- --template cloudflare/workflows-starter --git --deploy false
cd my-workflow
# 2. Configure wrangler.jsonc
{
"name": "my-workflow",
"main": "src/index.ts",
"compatibility_date": "2025-11-25",
"workflows": [{
"name": "my-workflow",
"binding": "MY_WORKFLOW",
"class_name": "MyWorkflow"
}]
}
# 3. Create workflow (src/index.ts)
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
/**
 * Minimal workflow: runs a 'process' step, sleeps for one hour, then runs a
 * 'continue' step.
 */
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
/** Workflow entry point; each unit of work below is declared through `step`. */
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// `result` receives whatever the 'process' callback returns (nothing in this placeholder).
const result = await step.do('process', async () => { /* work */ });
// Pause between the two steps.
await step.sleep('wait', '1 hour');
await step.do('continue', async () => { /* more work */ });
}
}
# 4. Deploy and test
npm run deploy
npx wrangler workflows instances list my-workflowWorkflowEntrypointrun()stepwaitForEvent()waitForEvent()wrangler devwaitForEvent()export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
/**
 * Waits for a 'user-action' event three times in a row and logs, for each
 * wait, whether an event arrived or the wait ended in the catch branch
 * (for example because the 5 second timeout elapsed).
 */
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
  for (let i = 0; i < 3; i++) {
    // 1-based counter so the log output lines up with the
    // "Iteration 1 / 2 / 3" notes that follow this example.
    const iteration = i + 1;
    try {
      // Step names stay 0-based (`wait-0`, `wait-1`, `wait-2`) so step identity is unchanged.
      await step.waitForEvent(`wait-${i}`, {
        type: 'user-action',
        timeout: '5 seconds'
      });
      console.log(`Iteration ${iteration}: Received event`);
    } catch {
      console.log(`Iteration ${iteration}: Timeout`);
    }
  }
}
}
// In wrangler dev:
// - Iteration 1: ✅ receives event
// - Iteration 2: ⏱️ times out (expected)
// - Iteration 3: ❌ does not receive event (BUG - event is sent but ignored)MiniflareCoreError [ERR_RUNTIME_FAILURE]: The Workers runtime failed to startgetPlatformProxy()wranglergetPlatformProxy()wrangler.cli.jsonc// Workaround: Separate config for CLI scripts
// wrangler.cli.jsonc (no workflows)
{
"name": "my-worker",
"main": "src/index.ts",
"compatibility_date": "2025-01-20"
// workflows commented out
}
// Use in script:
import { getPlatformProxy } from 'wrangler';
const { env } = await getPlatformProxy({ configPath: './wrangler.cli.jsonc' });instance.not_foundwrangler devworkflow.create()ctx.waitUntil()export default {
/**
 * Creates a workflow instance and redirects the caller to the dashboard.
 *
 * @param req - Incoming request; its URL is the base for the redirect target.
 * @param env - Bindings, including the `MY_WORKFLOW` workflow binding.
 * @param ctx - Execution context used to keep the status call alive after the response.
 * @returns A 302 redirect to `/dashboard` on the same origin as the request.
 */
async fetch(req: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
  const workflow = await env.MY_WORKFLOW.create({ params: { userId: '123' } });
  // ✅ Ensure workflow initialization completes
  ctx.waitUntil(workflow.status());
  // Response.redirect() needs an absolute URL; a bare '/dashboard' cannot be
  // parsed without a base, so resolve it against the incoming request URL.
  const dashboardUrl = new URL('/dashboard', req.url);
  return Response.redirect(dashboardUrl.toString(), 302);
}
};[vitest-worker]: Timeout calling "resolveId"@cloudflare/vitest-pool-workerstestTimeoutexport default defineWorkersConfig({
test: {
testTimeout: 60_000 // Default: 5000ms
}
});isolatedStorage: falseError: Not implemented yetinstance.restart()instance.terminate()wrangler devrunningconst instance = await env.MY_WORKFLOW.get(instanceId);
// ❌ Fails in wrangler dev
await instance.restart(); // Error: Not implemented yet
await instance.terminate(); // Error: Not implemented yet
// ✅ Works in production"Cannot perform I/O on behalf of a different request"step.do()// ❌ Bad - I/O outside step
const response = await fetch('https://api.example.com/data');
const data = await response.json();
await step.do('use data', async () => {
return data; // This will fail!
});
// ✅ Good - I/O inside step
const data = await step.do('fetch data', async () => {
const response = await fetch('https://api.example.com/data');
return await response.json();
});// ❌ Retries in dev, exits in prod
throw new NonRetryableError('');
// ✅ Exits in both environments
throw new NonRetryableError('Validation failed');step.do()step.do()// ❌ BAD - In-memory variable lost on hibernation
let counter = 0;
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
counter = await step.do('increment', async () => counter + 1);
await step.sleep('wait', '1 hour'); // ← Hibernates here, in-memory state lost
console.log(counter); // ❌ Will be 0, not 1!
}
}
// ✅ GOOD - State from step.do() return values persists
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const counter = await step.do('increment', async () => 1);
await step.sleep('wait', '1 hour');
console.log(counter); // ✅ Still 1
}
}Date.now()Math.random()// ❌ BAD - Non-deterministic step name
await step.do(`fetch-data-${Date.now()}`, async () => {
return await fetchExpensiveData();
});
// Every execution creates new cache key → step always re-runs
// ✅ GOOD - Deterministic step name
await step.do('fetch-data', async () => {
return await fetchExpensiveData();
});
// Same cache key → result reused on restart/retrystep.do()// ❌ BAD - Race outside step
const fastest = await Promise.race([fetchA(), fetchB()]);
await step.do('use result', async () => fastest);
// On restart: race runs again, different promise might win
// ✅ GOOD - Race inside step
const fastest = await step.do('fetch fastest', async () => {
return await Promise.race([fetchA(), fetchB()]);
});
// On restart: cached result used, consistent behaviorstep.do()step.do()// ❌ BAD - Side effect outside step
console.log('Workflow started'); // ← Logs multiple times on restart
await step.do('work', async () => { /* work */ });
// ✅ GOOD - Side effects inside step
await step.do('log start', async () => {
console.log('Workflow started'); // ← Logs once (cached)
});// ❌ BAD - Charge customer without check
await step.do('charge', async () => {
return await stripe.charges.create({ amount: 1000, customer: customerId });
});
// If step times out after charge succeeds, retry charges AGAIN!
// ✅ GOOD - Idempotency key makes the retry safe
await step.do('charge', async () => {
  // Stripe returns the original result for a repeated idempotency key instead
  // of charging again. Listing the customer's latest charge is NOT a safe
  // check: any earlier, unrelated charge would be returned and the new payment
  // silently skipped.
  // `event` is the WorkflowEvent passed to run(); its instanceId is stable across retries.
  return await stripe.charges.create(
    { amount: 1000, customer: customerId },
    { idempotencyKey: `charge-${event.instanceId}` }
  );
});step.do<T>(name: string, config?: WorkflowStepConfig, callback: () => Promise<T>): Promise<T>nameconfigcallbackconst result = await step.do('call API', { retries: { limit: 10, delay: '10s', backoff: 'exponential' }, timeout: '5 min' }, async () => {
return await fetch('https://api.example.com/data').then(r => r.json());
});step.sleep(name: string, duration: WorkflowDuration): Promise<void>nameduration"second""minute""hour""day""week""month""year"await step.sleep('wait 5 minutes', '5 minutes');
await step.sleep('wait 1 hour', '1 hour');
await step.sleep('wait 2 days', '2 days');
await step.sleep('wait 30 seconds', 30000); // millisecondsstep.sleepUntil(name: string, timestamp: Date | number): Promise<void>nametimestampawait step.sleepUntil('wait for launch', new Date('2025-12-25T00:00:00Z'));
await step.sleepUntil('wait until time', Date.parse('24 Oct 2024 13:00:00 UTC'));step.waitForEvent<T>(name: string, options: { type: string; timeout?: string | number }): Promise<T>nameoptions.typeoptions.timeoutinstance.sendEvent()export class PaymentWorkflow extends WorkflowEntrypoint<Env, Params> {
/**
 * Creates a payment, waits up to one hour for the matching Stripe webhook,
 * and fulfils the order only when the payment succeeded.
 */
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
  await step.do('create payment', async () => { /* Stripe API */ });
  // waitForEvent resolves to an event envelope; the data supplied through
  // instance.sendEvent({ payload }) is on its `payload` property.
  const webhookEvent = await step.waitForEvent<StripeWebhook>(
    'wait for payment confirmation',
    { type: 'stripe-webhook', timeout: '1 hour' }
  );
  if (webhookEvent.payload.status === 'succeeded') {
    await step.do('fulfill order', async () => { /* fulfill */ });
  }
}
}
// Worker sends event to workflow
export default {
/**
 * Forwards Stripe webhook deliveries to the waiting workflow instance.
 * Every code path returns a Response: other paths get a 404, and a request
 * without an instance ID gets a 400.
 */
async fetch(req: Request, env: Env): Promise<Response> {
  const url = new URL(req.url);
  if (!url.pathname.startsWith('/webhook/stripe')) {
    return new Response('Not found', { status: 404 });
  }
  // NOTE(review): the original example never defined `instanceId`. It is read
  // from the query string here; adapt this to wherever your app stores the ID
  // (for example Stripe metadata set when the payment was created).
  const instanceId = url.searchParams.get('instanceId');
  if (!instanceId) {
    return new Response('Missing instanceId', { status: 400 });
  }
  const instance = await env.PAYMENT_WORKFLOW.get(instanceId);
  await instance.sendEvent({ type: 'stripe-webhook', payload: await req.json() });
  return new Response('OK');
}
};try {
const event = await step.waitForEvent('wait for user', { type: 'user-submitted', timeout: '10 minutes' });
} catch (error) {
await step.do('send reminder', async () => { /* reminder */ });
}interface WorkflowStepConfig {
retries?: {
limit: number; // Max attempts (Infinity allowed)
delay: string | number; // Delay between retries
backoff?: 'constant' | 'linear' | 'exponential';
};
timeout?: string | number; // Max time per attempt
}{ retries: { limit: 5, delay: 10000, backoff: 'exponential' }, timeout: '10 minutes' }// Constant: 30s, 30s, 30s
{ retries: { limit: 3, delay: '30 seconds', backoff: 'constant' } }
// Linear: 1m, 2m, 3m, 4m, 5m
{ retries: { limit: 5, delay: '1 minute', backoff: 'linear' } }
// Exponential (recommended): 10s, 20s, 40s, 80s, 160s, … (pattern continues for the remaining attempts)
{ retries: { limit: 10, delay: '10 seconds', backoff: 'exponential' }, timeout: '5 minutes' }
// Unlimited retries
{ retries: { limit: Infinity, delay: '1 minute', backoff: 'exponential' } }
// No retries
{ retries: { limit: 0 } }import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
await step.do('validate input', async () => {
if (!event.payload.userId) {
throw new NonRetryableError('userId is required');
}
// Validate user exists
const user = await this.env.DB.prepare(
'SELECT * FROM users WHERE id = ?'
).bind(event.payload.userId).first();
if (!user) {
// Terminal error - retrying won't help
throw new NonRetryableError('User not found');
}
return user;
});
}
}export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
/**
 * Runs a critical payment step, then an optional email step whose failure is
 * recorded in the database instead of aborting the workflow, and finally
 * updates the status.
 */
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
  await step.do('process payment', async () => { /* critical */ });
  try {
    await step.do('send email', async () => { /* optional */ });
  } catch (error: unknown) {
    // Caught values are `unknown` under strict mode; narrow before reading `.message`.
    const reason = error instanceof Error ? error.message : String(error);
    await step.do('log failure', async () => {
      await this.env.DB.prepare('INSERT INTO failed_emails VALUES (?, ?)').bind(event.payload.userId, reason).run();
    });
  }
  await step.do('update status', async () => { /* continues */ });
}
}let result;
try {
result = await step.do('call primary API', async () => await callPrimaryAPI());
} catch {
result = await step.do('call backup API', async () => await callBackupAPI());
}{
"workflows": [{
"name": "my-workflow",
"binding": "MY_WORKFLOW",
"class_name": "MyWorkflow",
"script_name": "workflow-worker" // If workflow in different Worker
}]
}const instance = await env.MY_WORKFLOW.create({ params: { userId: '123' } });
return Response.json({ id: instance.id, status: await instance.status() });const instance = await env.MY_WORKFLOW.get(instanceId);
// Status values listed in this guide: 'queued' | 'running' | 'waiting' | 'complete' | 'errored' | 'unknown'
const status = await instance.status(); // { status, error, output }
await instance.sendEvent({ type: 'user-action', payload: { action: 'approved' } });
await instance.pause();
await instance.resume();
await instance.terminate();step.do()stringnumberbooleannull// ✅ Good
const result = await step.do('fetch data', async () => ({
users: [{ id: 1, name: 'Alice' }],
timestamp: Date.now(),
metadata: null
}));
// ❌ Bad - function not serializable
const bad = await step.do('bad', async () => ({ data: [1, 2, 3], transform: (x) => x * 2 })); // Throws error!const userData = await step.do('fetch user', async () => ({ id: 123, email: 'user@example.com' }));
const orderData = await step.do('create order', async () => ({ userId: userData.id, orderId: 'ORD-456' }));
await step.do('send email', async () => sendEmail({ to: userData.email, subject: `Order ${orderData.orderId}` }));const instance = await env.MY_WORKFLOW.get(instanceId);
const status = await instance.status();
console.log(status);
// {
// status: 'complete' | 'running' | 'errored' | 'queued' | 'waiting' | 'unknown',
// error: string | null,
// output: { userId: '123', status: 'processed' }
// }// wrangler.jsonc
{ "limits": { "cpu_ms": 300000 } } // 5 minutes max (default: 30 seconds)

| Feature | Workers Free | Workers Paid |
|---|---|---|
| Max steps per workflow | 1,024 | 1,024 |
| Max state per step | 1 MiB | 1 MiB |
| Max state per instance | 100 MB | 1 GB |
| Max event payload size | 1 MiB | 1 MiB |
| Max sleep/sleepUntil duration | 365 days | 365 days |
| Max waitForEvent timeout | 365 days | 365 days |
| CPU time per step | 10 ms | 30 sec (default), 5 min (max) |
| Duration (wall clock) per step | Unlimited | Unlimited |
| Max workflow executions | 100,000/day | Unlimited |
| Concurrent instances | 25 | 10,000 (Oct 2025, up from 4,500) |
| Instance creation rate | 100/second | 100/second (Oct 2025, 10x faster) |
| Max queued instances | 100,000 | 1,000,000 |
| Max subrequests per instance | 50/request | 1,000/request |
| Retention (completed state) | 3 days | 30 days |
| Max Workflow name length | 64 chars | 64 chars |
| Max instance ID length | 100 chars | 100 chars |
step.sleep()step.sleepUntil()wrangler.jsonc{ "limits": { "cpu_ms": 300000 } }step.do()step.sleep()step.sleepUntil()step.waitForEvent()cloudflare:testnpm install -D vitest@latest @cloudflare/vitest-pool-workers@latestimport { defineWorkersConfig } from '@cloudflare/vitest-pool-workers/config';
export default defineWorkersConfig({ test: { poolOptions: { workers: { miniflare: { bindings: { MY_WORKFLOW: { scriptName: 'workflow' } } } } } } });import { env, introspectWorkflowInstance } from 'cloudflare:test';
it('should complete workflow', async () => {
const instance = await introspectWorkflowInstance(env.MY_WORKFLOW, 'test-123');
try {
await instance.modify(async (m) => {
await m.disableSleeps(); // Skip all sleeps
await m.mockStepResult({ name: 'fetch data' }, { users: [{ id: 1 }] }); // Mock step result
await m.mockEvent({ type: 'approval', payload: { approved: true } }); // Send mock event
await m.mockStepError({ name: 'call API' }, new Error('Network timeout'), 1); // Force error once
});
await env.MY_WORKFLOW.create({ id: 'test-123' });
await expect(instance.waitForStatus('complete')).resolves.not.toThrow();
} finally {
await instance.dispose(); // Cleanup
}
});disableSleeps(steps?)mockStepResult(step, result)mockStepError(step, error, times?)mockEvent(event)forceStepTimeout(step, times?)forceEventTimeout(step)mcp__cloudflare-docs__search_cloudflare_documentation