inngest-durable-functions
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseInngest Durable Functions
Inngest 持久化函数
Master Inngest's durable execution model for building fault-tolerant, long-running workflows. This skill covers the complete lifecycle from triggers to error handling.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
掌握Inngest的持久化执行模型,用于构建容错、长时间运行的工作流。本技能涵盖从触发器到错误处理的完整生命周期。
本技能聚焦于TypeScript。若使用Python或Go,请参考Inngest文档获取语言专属指导。核心概念适用于所有语言。
Core Concepts You Need to Know
你需要了解的核心概念
Durable Execution Model
持久化执行模型
- Each step should encapsulate side-effects and non-deterministic code
- Memoization prevents re-execution of completed steps
- State persistence survives infrastructure failures
- Automatic retries with configurable retry count
- 每个步骤应封装副作用和非确定性代码
- 记忆化可防止已完成步骤的重复执行
- 状态持久化可在基础设施故障时保留状态
- 自动重试,支持配置重试次数
Step Execution Flow
步骤执行流程
typescript
// ❌ BAD: Non-deterministic logic outside steps
async ({ event, step }) => {
const timestamp = Date.now(); // This runs multiple times!
const result = await step.run("process-data", () => {
return processData(event.data);
});
};
// ✅ GOOD: All non-deterministic logic in steps
async ({ event, step }) => {
const result = await step.run("process-with-timestamp", () => {
const timestamp = Date.now(); // Only runs once
return processData(event.data, timestamp);
});
};typescript
// ❌ 不良示例:步骤外存在非确定性逻辑
async ({ event, step }) => {
const timestamp = Date.now(); // 这段代码会多次执行!
const result = await step.run("process-data", () => {
return processData(event.data);
});
};
// ✅ 良好示例:所有非确定性逻辑都放在步骤内
async ({ event, step }) => {
const result = await step.run("process-with-timestamp", () => {
const timestamp = Date.now(); // 仅执行一次
return processData(event.data, timestamp);
});
};Function Limits
函数限制
Every Inngest function has these hard limits:
- Maximum 1,000 steps per function run
- Maximum 4MB returned data for each step
- Maximum 32MB combined function run state including, event data, step output, and function output
- Each step = separate HTTP request (~50-100ms overhead)
If you're hitting these limits, break your function into smaller functions connected via or .
step.invoke()step.sendEvent()所有Inngest函数都有以下硬性限制:
- 每个函数运行最多包含1000个步骤
- 每个步骤返回的数据最大为4MB
- 函数运行的组合状态(包括事件数据、步骤输出和函数输出)最大为32MB
- 每个步骤对应一次独立的HTTP请求(约50-100ms的开销)
如果遇到这些限制,请将函数拆分为更小的函数,通过或连接。
step.invoke()step.sendEvent()When to Use Steps
何时使用步骤
Always wrap in :
step.run()- API calls and network requests
- Database reads and writes
- File I/O operations
- Any non-deterministic operation
- Anything you want retried independently on failure
Never wrap in :
step.run()- Pure calculations and data transformations
- Simple validation logic
- Deterministic operations with no side effects
- Logging (use outside steps)
务必用包裹:
step.run()- API调用和网络请求
- 数据库读写操作
- 文件I/O操作
- 任何非确定性操作
- 任何希望在失败时独立重试的操作
切勿用包裹:
step.run()- 纯计算和数据转换
- 简单验证逻辑
- 无副作用的确定性操作
- 日志记录(放在步骤外执行)
Function Creation
函数创建
Basic Function Structure
基础函数结构
typescript
const processOrder = inngest.createFunction(
{
id: "process-order", // Unique, never change this
retries: 4, // Default: 4 retries per step
concurrency: 10 // Max concurrent executions
},
{ event: "order/created" }, // Trigger
async ({ event, step }) => {
// Your durable workflow
}
);typescript
const processOrder = inngest.createFunction(
{
id: "process-order", // 唯一标识,请勿修改
retries: 4, // 默认:每个步骤重试4次
concurrency: 10 // 最大并发执行数
},
{ event: "order/created" }, // 触发器
async ({ event, step }) => {
// 你的持久化工作流逻辑
}
);Step IDs and Memoization
步骤ID与记忆化
typescript
// Step IDs can be reused - Inngest handles counters automatically
const data = await step.run("fetch-data", () => fetchUserData());
const more = await step.run("fetch-data", () => fetchOrderData()); // Different execution
// Use descriptive IDs for clarity
await step.run("validate-payment", () => validatePayment(event.data.paymentId));
await step.run("charge-customer", () => chargeCustomer(event.data));
await step.run("send-confirmation", () => sendEmail(event.data.email));typescript
// 步骤ID可重复使用 - Inngest会自动处理计数器
const data = await step.run("fetch-data", () => fetchUserData());
const more = await step.run("fetch-data", () => fetchOrderData()); // 独立的执行实例
// 使用描述性ID提升可读性
await step.run("validate-payment", () => validatePayment(event.data.paymentId));
await step.run("charge-customer", () => chargeCustomer(event.data));
await step.run("send-confirmation", () => sendEmail(event.data.email));Triggers and Events
触发器与事件
Event Triggers
事件触发器
typescript
// Single event trigger
{ event: "user/signup" }
// Event with conditional filter
{
event: "user/action",
if: 'event.data.action == "purchase" && event.data.amount > 100'
}
// Multiple triggers (up to 10)
[
{ event: "user/signup" },
{ event: "user/login", if: 'event.data.firstLogin == true' },
{ cron: "0 9 * * *" } // Daily at 9 AM
]typescript
// 单个事件触发器
{ event: "user/signup" }
// 带条件过滤的事件
{
event: "user/action",
if: 'event.data.action == "purchase" && event.data.amount > 100'
}
// 多个触发器(最多10个)
[
{ event: "user/signup" },
{ event: "user/login", if: 'event.data.firstLogin == true' },
{ cron: "0 9 * * *" } // 每天上午9点执行
]Cron Triggers
定时触发器
typescript
// Basic cron
{
cron: "0 */6 * * *";
} // Every 6 hours
// With timezone
{
cron: "TZ=Europe/Paris 0 12 * * 5";
} // Fridays at noon Paris time
// Combine with events
[
{ event: "manual/report.requested" },
{ cron: "0 0 * * 0" } // Weekly on Sunday
];typescript
// 基础定时任务
{
cron: "0 */6 * * *";
} // 每6小时执行一次
// 带时区的定时任务
{
cron: "TZ=Europe/Paris 0 12 * * 5";
} // 巴黎时间每周五中午执行
// 与事件触发器组合
[
{ event: "manual/report.requested" },
{ cron: "0 0 * * 0" } // 每周日执行
];Function Invocation
函数调用
typescript
// Invoke another function as a step
const result = await step.invoke("generate-report", {
function: generateReportFunction,
data: { userId: event.data.userId }
});
// Use returned data
await step.run("process-report", () => {
return processReport(result);
});typescript
// 在步骤中调用另一个函数
const result = await step.invoke("generate-report", {
function: generateReportFunction,
data: { userId: event.data.userId }
});
// 使用返回的数据
await step.run("process-report", () => {
return processReport(result);
});Idempotency Strategies
幂等性策略
Event-Level Idempotency (Producer Side)
事件级幂等性(生产者端)
typescript
// Prevent duplicate events with custom ID
await inngest.send({
id: `checkout-completed-${cartId}`, // 24-hour deduplication
name: "cart/checkout.completed",
data: { cartId, email: "user@example.com" }
});typescript
// 使用自定义ID防止重复事件
await inngest.send({
id: `checkout-completed-${cartId}`, // 24小时去重
name: "cart/checkout.completed",
data: { cartId, email: "user@example.com" }
});Function-Level Idempotency (Consumer Side)
函数级幂等性(消费者端)
typescript
const sendEmail = inngest.createFunction(
{
id: "send-checkout-email",
// Only run once per cartId per 24 hours
idempotency: "event.data.cartId"
},
{ event: "cart/checkout.completed" },
async ({ event, step }) => {
// This function won't run twice for same cartId
}
);
// Complex idempotency keys
const processUserAction = inngest.createFunction(
{
id: "process-user-action",
// Unique per user + organization combination
idempotency: 'event.data.userId + "-" + event.data.organizationId'
},
{ event: "user/action.performed" },
async ({ event, step }) => {
/* ... */
}
);typescript
const sendEmail = inngest.createFunction(
{
id: "send-checkout-email",
// 每个cartId在24小时内仅执行一次
idempotency: "event.data.cartId"
},
{ event: "cart/checkout.completed" },
async ({ event, step }) => {
// 相同cartId不会触发多次执行
}
);
// 复杂幂等键
const processUserAction = inngest.createFunction(
{
id: "process-user-action",
// 按用户+组织的组合生成唯一键
idempotency: 'event.data.userId + "-" + event.data.organizationId'
},
{ event: "user/action.performed" },
async ({ event, step }) => {
/* ... */
}
);Cancellation Patterns
取消模式
Event-Based Cancellation
基于事件的取消
In expressions, = the original triggering event, = the new event being matched. See Expression Syntax Reference for full details.
eventasynctypescript
const processOrder = inngest.createFunction(
{
id: "process-order",
cancelOn: [
{
event: "order/cancelled",
if: "event.data.orderId == async.data.orderId"
}
]
},
{ event: "order/created" },
async ({ event, step }) => {
await step.sleepUntil("wait-for-payment", event.data.paymentDue);
// Will be cancelled if order/cancelled event received
await step.run("charge-payment", () => processPayment(event.data));
}
);在表达式中,指原始触发事件,指匹配到的新事件。详情请参考表达式语法参考。
eventasynctypescript
const processOrder = inngest.createFunction(
{
id: "process-order",
cancelOn: [
{
event: "order/cancelled",
if: "event.data.orderId == async.data.orderId"
}
]
},
{ event: "order/created" },
async ({ event, step }) => {
await step.sleepUntil("wait-for-payment", event.data.paymentDue);
// 若收到order/cancelled事件,将被取消
await step.run("charge-payment", () => processPayment(event.data));
}
);Timeout Cancellation
超时取消
typescript
const processWithTimeout = inngest.createFunction(
{
id: "process-with-timeout",
timeouts: {
start: "5m", // Cancel if not started within 5 minutes
run: "30m" // Cancel if running longer than 30 minutes
}
},
{ event: "long/process.requested" },
async ({ event, step }) => {
/* ... */
}
);typescript
const processWithTimeout = inngest.createFunction(
{
id: "process-with-timeout",
timeouts: {
start: "5m", // 若5分钟内未启动则取消
run: "30m" // 若运行超过30分钟则取消
}
},
{ event: "long/process.requested" },
async ({ event, step }) => {
/* ... */
}
);Handling Cancellation Cleanup
处理取消后的清理工作
typescript
// Listen for cancellation events
const cleanupCancelled = inngest.createFunction(
{ id: "cleanup-cancelled-process" },
{ event: "inngest/function.cancelled" },
async ({ event, step }) => {
if (event.data.function_id === "process-order") {
await step.run("cleanup-resources", () => {
return cleanupOrderResources(event.data.run_id);
});
}
}
);typescript
// 监听取消事件
const cleanupCancelled = inngest.createFunction(
{ id: "cleanup-cancelled-process" },
{ event: "inngest/function.cancelled" },
async ({ event, step }) => {
if (event.data.function_id === "process-order") {
await step.run("cleanup-resources", () => {
return cleanupOrderResources(event.data.run_id);
});
}
}
);Error Handling and Retries
错误处理与重试
Default Retry Behavior
默认重试行为
- 5 total attempts (1 initial + 4 retries) per step
- Exponential backoff with jitter
- Independent retry counters per step
- 每个步骤最多5次尝试(1次初始执行+4次重试)
- 带抖动的指数退避策略
- 每个步骤有独立的重试计数器
Custom Retry Configuration
自定义重试配置
typescript
const reliableFunction = inngest.createFunction(
{
id: "reliable-function",
retries: 10 // Up to 10 retries per step
},
{ event: "critical/task" },
async ({ event, step, attempt }) => {
// Access attempt number (0-indexed)
if (attempt > 5) {
// Different logic for later attempts
}
}
);typescript
const reliableFunction = inngest.createFunction(
{
id: "reliable-function",
retries: 10 // 每个步骤最多重试10次
},
{ event: "critical/task" },
async ({ event, step, attempt }) => {
// 获取尝试次数(从0开始计数)
if (attempt > 5) {
// 针对后续尝试的特殊逻辑
}
}
);Non-Retriable Errors
不可重试错误
Prevent retries for code that won't succeed upon retry.
typescript
import { NonRetriableError } from "inngest";
const processUser = inngest.createFunction(
{ id: "process-user" },
{ event: "user/process.requested" },
async ({ event, step }) => {
const user = await step.run("fetch-user", async () => {
const user = await db.users.findOne(event.data.userId);
if (!user) {
// Don't retry - user doesn't exist
throw new NonRetriableError("User not found, stopping execution");
}
return user;
});
// Continue processing...
}
);针对重试也无法成功的代码,禁止重试。
typescript
import { NonRetriableError } from "inngest";
const processUser = inngest.createFunction(
{ id: "process-user" },
{ event: "user/process.requested" },
async ({ event, step }) => {
const user = await step.run("fetch-user", async () => {
const user = await db.users.findOne(event.data.userId);
if (!user) {
// 无需重试 - 用户不存在
throw new NonRetriableError("用户不存在,终止执行");
}
return user;
});
// 继续处理...
}
);Custom Retry Timing
自定义重试时机
typescript
import { RetryAfterError } from "inngest";
const respectRateLimit = inngest.createFunction(
{ id: "api-call" },
{ event: "api/call.requested" },
async ({ event, step }) => {
await step.run("call-api", async () => {
const response = await externalAPI.call(event.data);
if (response.status === 429) {
// Retry after specific time from API
const retryAfter = response.headers["retry-after"];
throw new RetryAfterError("Rate limited", `${retryAfter}s`);
}
return response.data;
});
}
);typescript
import { RetryAfterError } from "inngest";
const respectRateLimit = inngest.createFunction(
{ id: "api-call" },
{ event: "api/call.requested" },
async ({ event, step }) => {
await step.run("call-api", async () => {
const response = await externalAPI.call(event.data);
if (response.status === 429) {
// 根据API指定的时间重试
const retryAfter = response.headers["retry-after"];
throw new RetryAfterError("请求频率受限", `${retryAfter}s`);
}
return response.data;
});
}
);Logging Best Practices
日志记录最佳实践
Proper Logging Setup
正确配置日志
typescript
import winston from "winston";
// Configure logger
const logger = winston.createLogger({
level: "info",
format: winston.format.json(),
transports: [new winston.transports.Console()]
});
const inngest = new Inngest({
id: "my-app",
logger // Pass logger to client
});typescript
import winston from "winston";
// 配置日志器
const logger = winston.createLogger({
level: "info",
format: winston.format.json(),
transports: [new winston.transports.Console()]
});
const inngest = new Inngest({
id: "my-app",
logger // 将日志器传递给客户端
});Function Logging Patterns
函数日志记录模式
typescript
const processData = inngest.createFunction(
{ id: "process-data" },
{ event: "data/process.requested" },
async ({ event, step, logger }) => {
// ✅ GOOD: Log inside steps to avoid duplicates
const result = await step.run("fetch-data", async () => {
logger.info("Fetching data for user", { userId: event.data.userId });
return await fetchUserData(event.data.userId);
});
// ❌ AVOID: Logging outside steps can duplicate
// logger.info("Processing complete"); // This could run multiple times!
await step.run("log-completion", async () => {
logger.info("Processing complete", { resultCount: result.length });
});
}
);typescript
const processData = inngest.createFunction(
{ id: "process-data" },
{ event: "data/process.requested" },
async ({ event, step, logger }) => {
// ✅ 最佳实践:在步骤内记录日志以避免重复
const result = await step.run("fetch-data", async () => {
logger.info("为用户获取数据", { userId: event.data.userId });
return await fetchUserData(event.data.userId);
});
// ❌ 避免:在步骤外记录日志会导致重复
// logger.info("处理完成"); // 这段代码可能会执行多次!
await step.run("log-completion", async () => {
logger.info("处理完成", { resultCount: result.length });
});
}
);Performance Optimization
性能优化
Checkpointing
检查点机制
typescript
// Enable checkpointing for lower latency
const realTimeFunction = inngest.createFunction(
{
id: "real-time-function",
checkpointing: {
maxRuntime: "300s", // Max continuous execution time
bufferedSteps: 2, // Buffer 2 steps before checkpointing
maxInterval: "10s" // Max wait before checkpoint
}
},
{ event: "realtime/process" },
async ({ event, step }) => {
// Steps execute immediately with periodic checkpointing
const result1 = await step.run("step-1", () => process1(event.data));
const result2 = await step.run("step-2", () => process2(result1));
return { result2 };
}
);typescript
// 启用检查点以降低延迟
const realTimeFunction = inngest.createFunction(
{
id: "real-time-function",
checkpointing: {
maxRuntime: "300s", // 最大连续执行时间
bufferedSteps: 2, // 检查点前缓存2个步骤
maxInterval: "10s" // 检查点的最大间隔时间
}
},
{ event: "realtime/process" },
async ({ event, step }) => {
// 步骤会立即执行,同时定期创建检查点
const result1 = await step.run("step-1", () => process1(event.data));
const result2 = await step.run("step-2", () => process2(result1));
return { result2 };
}
);Advanced Patterns
高级模式
Conditional Step Execution
条件步骤执行
typescript
const conditionalProcess = inngest.createFunction(
{ id: "conditional-process" },
{ event: "process/conditional" },
async ({ event, step }) => {
const userData = await step.run("fetch-user", () => {
return getUserData(event.data.userId);
});
// Conditional step execution
if (userData.isPremium) {
await step.run("premium-processing", () => {
return processPremiumFeatures(userData);
});
}
// Always runs
await step.run("standard-processing", () => {
return processStandardFeatures(userData);
});
}
);typescript
const conditionalProcess = inngest.createFunction(
{ id: "conditional-process" },
{ event: "process/conditional" },
async ({ event, step }) => {
const userData = await step.run("fetch-user", () => {
return getUserData(event.data.userId);
});
// 条件性执行步骤
if (userData.isPremium) {
await step.run("premium-processing", () => {
return processPremiumFeatures(userData);
});
}
// 始终执行的步骤
await step.run("standard-processing", () => {
return processStandardFeatures(userData);
});
}
);Error Recovery Patterns
错误恢复模式
typescript
const robustProcess = inngest.createFunction(
{ id: "robust-process" },
{ event: "process/robust" },
async ({ event, step }) => {
let primaryResult;
try {
primaryResult = await step.run("primary-service", () => {
return callPrimaryService(event.data);
});
} catch (error) {
// Fallback to secondary service
primaryResult = await step.run("fallback-service", () => {
return callSecondaryService(event.data);
});
}
return { result: primaryResult };
}
);typescript
const robustProcess = inngest.createFunction(
{ id: "robust-process" },
{ event: "process/robust" },
async ({ event, step }) => {
let primaryResult;
try {
primaryResult = await step.run("primary-service", () => {
return callPrimaryService(event.data);
});
} catch (error) {
// 回退到备用服务
primaryResult = await step.run("fallback-service", () => {
return callSecondaryService(event.data);
});
}
return { result: primaryResult };
}
);Common Mistakes to Avoid
需要避免的常见错误
- ❌ Non-deterministic code outside steps
- ❌ Database calls outside steps
- ❌ Logging outside steps (causes duplicates)
- ❌ Changing step IDs after deployment
- ❌ Not handling NonRetriableError cases
- ❌ Ignoring idempotency for critical functions
- ❌ 步骤外存在非确定性代码
- ❌ 数据库操作放在步骤外
- ❌ 日志记录放在步骤外(会导致重复)
- ❌ 部署后修改步骤ID
- ❌ 未处理NonRetriableError场景
- ❌ 关键函数未考虑幂等性
Next Steps
后续步骤
- See inngest-steps for detailed step method reference
- See references/step-execution.md for detailed step patterns
- See references/error-handling.md for comprehensive error strategies
- See references/observability.md for monitoring and tracing setup
- See references/checkpointing.md for performance optimization details
This skill covers Inngest's durable function patterns. For event sending and webhook handling, see the skill.
inngest-events- 查看inngest-steps获取详细的步骤方法参考
- 查看references/step-execution.md获取详细的步骤模式
- 查看references/error-handling.md获取全面的错误处理策略
- 查看references/observability.md获取监控与追踪配置方法
- 查看references/checkpointing.md获取性能优化细节
本技能涵盖Inngest的持久化函数模式。关于事件发送和Webhook处理,请查看技能。
inngest-events