inngest-steps

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Inngest Steps

Inngest 步骤

Build robust, durable workflows with Inngest's step methods. Each step is a separate HTTP request that can be independently retried and monitored.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
使用Inngest的step方法构建健壮、耐用的工作流。每个步骤都是一个独立的HTTP请求,可以独立重试和监控。
本技能内容聚焦于TypeScript。如果使用Python或Go,请参考Inngest文档获取对应语言的指导。核心概念适用于所有语言。

Core Concept

核心概念

🔄 Critical: Each step re-runs your function from the beginning. Put ALL non-deterministic code (API calls, DB queries, randomness) inside steps, never outside.
📊 Step Limits: Every function has a maximum of 1,000 steps and 4MB total step data.
typescript
// ❌ WRONG - will run 4 times
export default inngest.createFunction(
  { id: "bad-example" },
  { event: "test" },
  async ({ step }) => {
    console.log("This logs 4 times!"); // Outside step = bad
    await step.run("a", () => console.log("a"));
    await step.run("b", () => console.log("b"));
    await step.run("c", () => console.log("c"));
  }
);

// ✅ CORRECT - logs once each
export default inngest.createFunction(
  { id: "good-example" },
  { event: "test" },
  async ({ step }) => {
    await step.run("log-hello", () => console.log("hello"));
    await step.run("a", () => console.log("a"));
    await step.run("b", () => console.log("b"));
    await step.run("c", () => console.log("c"));
  }
);
🔄 重要提示:每个步骤都会从头重新运行你的函数。 所有非确定性代码(API调用、数据库查询、随机逻辑)都必须放在步骤内部,绝对不能放在外部。
📊 步骤限制: 每个函数最多包含1000个步骤,步骤数据总大小上限为4MB。
typescript
// ❌ WRONG - will run 4 times
export default inngest.createFunction(
  { id: "bad-example" },
  { event: "test" },
  async ({ step }) => {
    console.log("This logs 4 times!"); // Outside step = bad
    await step.run("a", () => console.log("a"));
    await step.run("b", () => console.log("b"));
    await step.run("c", () => console.log("c"));
  }
);

// ✅ CORRECT - logs once each
export default inngest.createFunction(
  { id: "good-example" },
  { event: "test" },
  async ({ step }) => {
    await step.run("log-hello", () => console.log("hello"));
    await step.run("a", () => console.log("a"));
    await step.run("b", () => console.log("b"));
    await step.run("c", () => console.log("c"));
  }
);

step.run()

step.run()

Execute retriable code as a step. Each step ID can be reused - Inngest automatically handles counters.
typescript
// Basic usage
const result = await step.run("fetch-user", async () => {
  const user = await db.user.findById(userId);
  return user; // Always return useful data
});

// Synchronous code works too
const transformed = await step.run("transform-data", () => {
  return processData(result);
});

// Side effects (no return needed)
await step.run("send-notification", async () => {
  await sendEmail(user.email, "Welcome!");
});
✅ DO:
  • Put ALL non-deterministic logic inside steps
  • Return useful data for subsequent steps
  • Reuse step IDs in loops (counters handled automatically)
❌ DON'T:
  • Put deterministic logic in steps unnecessarily
  • Forget that each step = separate HTTP request
将可重试代码作为步骤执行。每个步骤ID可以重复使用 - Inngest会自动处理计数。
typescript
// Basic usage
const result = await step.run("fetch-user", async () => {
  const user = await db.user.findById(userId);
  return user; // Always return useful data
});

// Synchronous code works too
const transformed = await step.run("transform-data", () => {
  return processData(result);
});

// Side effects (no return needed)
await step.run("send-notification", async () => {
  await sendEmail(user.email, "Welcome!");
});
✅ 正确做法:
  • 将所有非确定性逻辑放在步骤内部
  • 返回有用数据供后续步骤使用
  • 在循环中重复使用步骤ID(计数由Inngest自动处理)
❌ 错误做法:
  • 不必要地将确定性逻辑放在步骤中
  • 忘记每个步骤对应一个独立的HTTP请求

step.sleep()

step.sleep()

Pause execution without using compute time.
typescript
// Duration strings
await step.sleep("wait-24h", "24h");
await step.sleep("short-delay", "30s");
await step.sleep("weekly-pause", "7d");

// Use in workflows
await step.run("send-welcome", () => sendEmail(email));
await step.sleep("wait-for-engagement", "3d");
await step.run("send-followup", () => sendFollowupEmail(email));
暂停执行且不占用计算资源。
typescript
// Duration strings
await step.sleep("wait-24h", "24h");
await step.sleep("short-delay", "30s");
await step.sleep("weekly-pause", "7d");

// Use in workflows
await step.run("send-welcome", () => sendEmail(email));
await step.sleep("wait-for-engagement", "3d");
await step.run("send-followup", () => sendFollowupEmail(email));

step.sleepUntil()

step.sleepUntil()

Sleep until a specific datetime.
typescript
const reminderDate = new Date("2024-12-25T09:00:00Z");
await step.sleepUntil("wait-for-christmas", reminderDate);

// From event data
const scheduledTime = new Date(event.data.remind_at);
await step.sleepUntil("wait-for-scheduled-time", scheduledTime);
休眠到指定的日期时间。
typescript
const reminderDate = new Date("2024-12-25T09:00:00Z");
await step.sleepUntil("wait-for-christmas", reminderDate);

// From event data
const scheduledTime = new Date(event.data.remind_at);
await step.sleepUntil("wait-for-scheduled-time", scheduledTime);

step.waitForEvent()

step.waitForEvent()

🚨 CRITICAL: waitForEvent ONLY catches events sent AFTER this step executes.
  • ❌ Event sent before waitForEvent runs → will NOT be caught
  • ✅ Event sent after waitForEvent runs → will be caught
  • Always check for
    null
    return (means timeout, event never arrived)
typescript
// Basic event waiting with timeout
const approval = await step.waitForEvent("wait-for-approval", {
  event: "app/invoice.approved",
  timeout: "7d",
  match: "data.invoiceId" // Simple matching
});

// Expression-based matching (CEL syntax)
const subscription = await step.waitForEvent("wait-for-subscription", {
  event: "app/subscription.created",
  timeout: "30d",
  if: "event.data.userId == async.data.userId && async.data.plan == 'pro'"
});

// Handle timeout
if (!approval) {
  await step.run("handle-timeout", () => {
    // Approval never came
    return notifyAccountingTeam();
  });
}
✅ DO:
  • Use unique IDs for matching (userId, sessionId, requestId)
  • Always set reasonable timeouts
  • Handle null return (timeout case)
  • Use with Realtime for human-in-the-loop flows
❌ DON'T:
  • Expect events sent before this step to be handled
  • Use without timeouts in production
🚨 重要提示:waitForEvent仅捕获此步骤执行后发送的事件。
  • ❌ 在waitForEvent运行前发送的事件 → 不会被捕获
  • ✅ 在waitForEvent运行后发送的事件 → 会被捕获
  • 务必检查返回值是否为
    null
    (表示超时,事件从未到达)
typescript
// Basic event waiting with timeout
const approval = await step.waitForEvent("wait-for-approval", {
  event: "app/invoice.approved",
  timeout: "7d",
  match: "data.invoiceId" // Simple matching
});

// Expression-based matching (CEL syntax)
const subscription = await step.waitForEvent("wait-for-subscription", {
  event: "app/subscription.created",
  timeout: "30d",
  if: "event.data.userId == async.data.userId && async.data.plan == 'pro'"
});

// Handle timeout
if (!approval) {
  await step.run("handle-timeout", () => {
    // Approval never came
    return notifyAccountingTeam();
  });
}
✅ 正确做法:
  • 使用唯一ID进行匹配(userId、sessionId、requestId)
  • 始终设置合理的超时时间
  • 处理返回值为null的情况(超时场景)
  • 结合Realtime实现需人工参与的流程
❌ 错误做法:
  • 期望处理在此步骤执行前发送的事件
  • 在生产环境中不设置超时时间

Expression Syntax

表达式语法

In expressions,
event
= the original triggering event,
async
= the new event being matched. See Expression Syntax Reference for full syntax, operators, and patterns.
在表达式中,
event
= 原始触发事件,
async
= 待匹配的事件。完整语法、运算符和模式请参考表达式语法参考

step.waitForSignal()

step.waitForSignal()

Wait for unique signals (not events). Better for 1:1 matching.
typescript
const taskId = "task-" + crypto.randomUUID();

const signal = await step.waitForSignal("wait-for-task-completion", {
  signal: taskId,
  timeout: "1h"
});

// Send signal elsewhere via Inngest API or SDK
// POST /v1/events with signal matching taskId
When to use:
  • waitForEvent: Multiple functions might handle the same event
  • waitForSignal: Exact 1:1 signal to specific function run
等待唯一信号(而非事件)。更适合一对一匹配场景。
typescript
const taskId = "task-" + crypto.randomUUID();

const signal = await step.waitForSignal("wait-for-task-completion", {
  signal: taskId,
  timeout: "1h"
});

// Send signal elsewhere via Inngest API or SDK
// POST /v1/events with signal matching taskId
适用场景:
  • waitForEvent:多个函数可能处理同一个事件
  • waitForSignal:信号与特定函数运行实现精确的一对一匹配

step.sendEvent()

step.sendEvent()

Fan out to other functions without waiting for results.
typescript
// Trigger other functions
await step.sendEvent("notify-systems", {
  name: "user/profile.updated",
  data: { userId: user.id, changes: profileChanges }
});

// Multiple events at once
await step.sendEvent("batch-notifications", [
  { name: "billing/invoice.created", data: { invoiceId } },
  { name: "email/invoice.send", data: { email: user.email, invoiceId } }
]);
Use when: You want to trigger other functions but don't need their results in the current function.
触发其他函数但不等待结果,实现事件扇出。
typescript
// Trigger other functions
await step.sendEvent("notify-systems", {
  name: "user/profile.updated",
  data: { userId: user.id, changes: profileChanges }
});

// Multiple events at once
await step.sendEvent("batch-notifications", [
  { name: "billing/invoice.created", data: { invoiceId } },
  { name: "email/invoice.send", data: { email: user.email, invoiceId } }
]);
适用时机: 你希望触发其他函数,但不需要在当前函数中获取它们的结果。

step.invoke()

step.invoke()

Call other functions and handle their results. Perfect for composition.
typescript
const computeSquare = inngest.createFunction(
  { id: "compute-square" },
  { event: "calculate/square" },
  async ({ event }) => {
    return { result: event.data.number * event.data.number };
  }
);

// Invoke and use result
const square = await step.invoke("get-square", {
  function: computeSquare,
  data: { number: 4 }
});

console.log(square.result); // 16, fully typed!
Great for:
  • Breaking complex workflows into composable functions
  • Reusing logic across multiple workflows
  • Map-reduce patterns
调用其他函数并处理其返回结果,非常适合组合式工作流。
typescript
const computeSquare = inngest.createFunction(
  { id: "compute-square" },
  { event: "calculate/square" },
  async ({ event }) => {
    return { result: event.data.number * event.data.number };
  }
);

// Invoke and use result
const square = await step.invoke("get-square", {
  function: computeSquare,
  data: { number: 4 }
});

console.log(square.result); // 16, fully typed!
适用场景:
  • 将复杂工作流拆分为可组合的函数
  • 在多个工作流中复用逻辑
  • 映射-归约模式

Patterns

模式实践

Loops with Steps

基于步骤的循环

Reuse step IDs - Inngest handles counters automatically.
typescript
const allProducts = [];
let cursor = null;
let hasMore = true;

while (hasMore) {
  // Same ID "fetch-page" reused - counters handled automatically
  const page = await step.run("fetch-page", async () => {
    return shopify.products.list({ cursor, limit: 50 });
  });

  allProducts.push(...page.products);

  if (page.products.length < 50) {
    hasMore = false;
  } else {
    cursor = page.products[49].id;
  }
}

await step.run("process-products", () => {
  return processAllProducts(allProducts);
});
重复使用步骤ID - Inngest会自动处理计数。
typescript
const allProducts = [];
let cursor = null;
let hasMore = true;

while (hasMore) {
  // Same ID "fetch-page" reused - counters handled automatically
  const page = await step.run("fetch-page", async () => {
    return shopify.products.list({ cursor, limit: 50 });
  });

  allProducts.push(...page.products);

  if (page.products.length < 50) {
    hasMore = false;
  } else {
    cursor = page.products[49].id;
  }
}

await step.run("process-products", () => {
  return processAllProducts(allProducts);
});

Parallel Execution

并行执行

Use Promise.all for parallel steps.
typescript
// Create steps without awaiting
const sendEmail = step.run("send-email", async () => {
  return await sendWelcomeEmail(user.email);
});

const updateCRM = step.run("update-crm", async () => {
  return await crmService.addUser(user);
});

const createSubscription = step.run("create-subscription", async () => {
  return await subscriptionService.create(user.id);
});

// Run all in parallel
const [emailId, crmRecord, subscription] = await Promise.all([
  sendEmail,
  updateCRM,
  createSubscription
]);

// Optimization: Enable optimizeParallelism for many parallel steps
export default inngest.createFunction(
  {
    id: "parallel-heavy-function",
    optimizeParallelism: true // Reduces HTTP requests by ~50%
  },
  { event: "process/batch" },
  async ({ event, step }) => {
    const results = await Promise.all(
      event.data.items.map((item, i) =>
        step.run(`process-item-${i}`, () => processItem(item))
      )
    );
  }
);
See inngest-flow-control for concurrency and throttling options.
使用Promise.all实现步骤并行执行。
typescript
// Create steps without awaiting
const sendEmail = step.run("send-email", async () => {
  return await sendWelcomeEmail(user.email);
});

const updateCRM = step.run("update-crm", async () => {
  return await crmService.addUser(user);
});

const createSubscription = step.run("create-subscription", async () => {
  return await subscriptionService.create(user.id);
});

// Run all in parallel
const [emailId, crmRecord, subscription] = await Promise.all([
  sendEmail,
  updateCRM,
  createSubscription
]);

// Optimization: Enable optimizeParallelism for many parallel steps
export default inngest.createFunction(
  {
    id: "parallel-heavy-function",
    optimizeParallelism: true // Reduces HTTP requests by ~50%
  },
  { event: "process/batch" },
  async ({ event, step }) => {
    const results = await Promise.all(
      event.data.items.map((item, i) =>
        step.run(`process-item-${i}`, () => processItem(item))
      )
    );
  }
);
更多并发和限流选项请查看inngest-flow-control

Chunking Jobs

任务分块处理

Perfect for batch processing with parallel steps.
typescript
export default inngest.createFunction(
  { id: "process-large-dataset" },
  { event: "data/process.large" },
  async ({ event, step }) => {
    const chunks = chunkArray(event.data.items, 10);

    // Process chunks in parallel
    const results = await Promise.all(
      chunks.map((chunk, index) =>
        step.run(`process-chunk-${index}`, () => processChunk(chunk))
      )
    );

    // Combine results
    await step.run("combine-results", () => {
      return aggregateResults(results);
    });
  }
);
结合并行步骤实现批量处理的理想方案。
typescript
export default inngest.createFunction(
  { id: "process-large-dataset" },
  { event: "data/process.large" },
  async ({ event, step }) => {
    const chunks = chunkArray(event.data.items, 10);

    // Process chunks in parallel
    const results = await Promise.all(
      chunks.map((chunk, index) =>
        step.run(`process-chunk-${index}`, () => processChunk(chunk))
      )
    );

    // Combine results
    await step.run("combine-results", () => {
      return aggregateResults(results);
    });
  }
);

Key Gotchas

关键注意事项

🔄 Function Re-execution: Code outside steps runs on every step execution ⏰ Event Timing: waitForEvent only catches events sent AFTER the step runs 🔢 Step Limits: Max 1,000 steps per function, 4MB per step output, 32MB per function run in total 📨 HTTP Requests: With
serve
, use
checkpointing
to reduce HTTP requests 🔁 Step IDs: Can be reused in loops - Inngest handles counters ⚡ Parallelism: Use Promise.all, consider optimizeParallelism for many steps
🔄 函数重执行: 步骤外部的代码会在每次步骤执行时重新运行 ⏰ 事件时序: waitForEvent仅捕获此步骤执行后发送的事件 🔢 步骤限制: 每个函数最多1000个步骤,每个步骤输出上限4MB,单个函数运行总数据上限32MB 📨 HTTP请求: 使用
serve
时,启用
checkpointing
可减少HTTP请求次数 🔁 步骤ID: 可在循环中重复使用 - Inngest会自动处理计数 ⚡ 并行性: 使用Promise.all实现并行,对于大量并行步骤可考虑启用optimizeParallelism

Common Use Cases

常见使用场景

  • Human-in-the-loop: waitForEvent + Realtime UI
  • Multi-step onboarding: sleep between steps, waitForEvent for user actions
  • Data processing: Parallel steps for chunked work
  • External integrations: step.run for reliable API calls
  • AI workflows: step.ai for durable LLM orchestration
  • Function composition: step.invoke to build complex workflows
Remember: Steps make your functions durable, observable, and debuggable. Embrace them!
  • 人工参与流程: waitForEvent + Realtime UI
  • 多步骤用户引导: 步骤间休眠,等待用户操作事件
  • 数据处理: 并行步骤处理分块任务
  • 外部集成: step.run实现可靠的API调用
  • AI工作流: step.ai实现耐用的LLM编排
  • 函数组合: step.invoke构建复杂工作流
请记住:步骤让你的函数具备耐用性、可观测性和可调试性。充分利用它们!