inngest-flow-control
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseInngest Flow Control
Inngest 流控制
Master Inngest flow control mechanisms to manage resources, prevent overloading systems, and ensure application reliability. This skill covers all flow control options with prescriptive guidance on when and how to use each.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
掌握Inngest流控制机制,以管理资源、防止系统过载并确保应用可靠性。本技能涵盖所有流控制选项,并针对每种选项的使用场景与方法提供指导性说明。
本技能聚焦于 TypeScript。若使用Python或Go,请参考Inngest官方文档获取语言专属指引。核心概念适用于所有语言。
Quick Decision Guide
快速决策指南
- "Limit how many run at once" → Concurrency
- "Spread runs over time" → Throttling
- "Block after N runs in a period" → Rate Limiting
- "Wait for activity to stop, then run once" → Debounce
- "Only one run at a time for this key" → Singleton
- "Process events in groups" → Batching
- "Some runs are more important" → Priority
- 「限制同时运行的数量」 → 并发控制
- 「分散运行时间」 → 节流
- 「在一段时间内运行N次后阻止执行」 → 速率限制
- 「等待活动停止后再执行一次」 → 防抖
- 「针对该键仅允许一个实例同时运行」 → 单例模式
- 「批量处理事件」 → 批处理
- 「部分运行任务优先级更高」 → 优先级设置
Concurrency
并发控制
When to use: Limit the number of executing steps (not function runs) to manage computing resources and prevent system overwhelm.
Key insight: Concurrency limits active code execution, not function runs. A function waiting on or doesn't count against the limit.
step.sleep()step.waitForEvent()适用场景: 限制正在执行的步骤数量(而非函数运行次数),以计算资源并防止系统过载。
核心要点: 并发控制限制的是活跃代码的执行,而非函数运行次数。处于或等待状态的函数不计入限制。
step.sleep()step.waitForEvent()Basic Concurrency
基础并发控制
typescript
inngest.createFunction(
{
id: "process-images",
concurrency: 5
},
{ event: "media/image.uploaded" },
async ({ event, step }) => {
// Only 5 steps can execute simultaneously
await step.run("resize", () => resizeImage(event.data.imageUrl));
}
);typescript
inngest.createFunction(
{
id: "process-images",
concurrency: 5
},
{ event: "media/image.uploaded" },
async ({ event, step }) => {
// Only 5 steps can execute simultaneously
await step.run("resize", () => resizeImage(event.data.imageUrl));
}
);Concurrency with Keys (Multi-tenant)
带键的并发控制(多租户场景)
Use parameter to apply limit per unique value of the key.
keytypescript
inngest.createFunction(
{
id: "user-sync",
concurrency: [
{
key: "event.data.user_id",
limit: 1
}
]
},
{ event: "user/profile.updated" },
async ({ event, step }) => {
// Only 1 step per user can execute at once
// Prevents race conditions in user-specific operations
}
);使用参数为键的每个唯一值应用限制。
keytypescript
inngest.createFunction(
{
id: "user-sync",
concurrency: [
{
key: "event.data.user_id",
limit: 1
}
]
},
{ event: "user/profile.updated" },
async ({ event, step }) => {
// Only 1 step per user can execute at once
// Prevents race conditions in user-specific operations
}
);Account-level Shared Limits
账户级共享限制
typescript
inngest.createFunction(
{
id: "ai-summary",
concurrency: [
{
scope: "account",
key: `"openai"`,
limit: 60
}
]
},
{ event: "ai/summary.requested" },
async ({ event, step }) => {
// Share 60 concurrent OpenAI calls across all functions
}
);When to use each:
- Basic: Protect databases or limit general capacity
- Keyed: Multi-tenant fairness, prevent "noisy neighbor" issues
- Account-level: Share quotas across multiple functions (API limits)
typescript
inngest.createFunction(
{
id: "ai-summary",
concurrency: [
{
scope: "account",
key: `"openai"`,
limit: 60
}
]
},
{ event: "ai/summary.requested" },
async ({ event, step }) => {
// Share 60 concurrent OpenAI calls across all functions
}
);各模式适用场景:
- 基础模式:保护数据库或限制通用容量
- 带键模式:多租户公平性,避免「噪音邻居」问题
- 账户级模式:跨多个函数共享配额(如API限制)
Throttling
节流
When to use: Control the rate of function starts over time to work around API rate limits or smooth traffic spikes.
Key difference from concurrency: Throttling limits function run starts; concurrency limits step execution.
typescript
inngest.createFunction(
{
id: "sync-crm-data",
throttle: {
limit: 10, // 10 function starts
period: "60s", // per minute
burst: 5, // plus 5 immediate bursts
key: "event.data.customer_id" // per customer
}
},
{ event: "crm/contact.updated" },
async ({ event, step }) => {
// Respects CRM API rate limits: 10 calls/min per customer
await step.run("sync", () => crmApi.updateContact(event.data));
}
);Configuration:
- : Functions that can start per period
limit - : Time window (1s to 7d)
period - : Extra immediate starts allowed
burst - : Apply limits per unique key value
key
适用场景: 控制函数启动的速率,以规避API速率限制或平滑流量峰值。
与并发控制的核心区别: 节流限制的是函数启动次数;并发控制限制的是步骤执行数量。
typescript
inngest.createFunction(
{
id: "sync-crm-data",
throttle: {
limit: 10, // 10 function starts
period: "60s", // per minute
burst: 5, // plus 5 immediate bursts
key: "event.data.customer_id" // per customer
}
},
{ event: "crm/contact.updated" },
async ({ event, step }) => {
// Respects CRM API rate limits: 10 calls/min per customer
await step.run("sync", () => crmApi.updateContact(event.data));
}
);配置参数:
- : 每个周期内可启动的函数数量
limit - : 时间窗口(1秒至7天)
period - : 允许的额外即时启动次数
burst - : 为键的每个唯一值应用限制
key
Rate Limiting
速率限制
When to use: Hard limit to prevent abuse or skip excessive duplicate events.
Key difference from throttling: Rate limiting discards events; throttling delays them.
typescript
inngest.createFunction(
{
id: "webhook-processor",
rateLimit: {
limit: 1,
period: "4h",
key: "event.data.webhook_id"
}
},
{ event: "webhook/data.received" },
async ({ event, step }) => {
// Process each webhook only once per 4 hours
// Prevents duplicate webhook spam
}
);Use cases:
- Prevent webhook duplicates
- Limit expensive operations per user
- Protection against abuse
适用场景: 设置硬限制以防止滥用或跳过过多重复事件。
与节流的核心区别: 速率限制会丢弃事件;节流会延迟事件执行。
typescript
inngest.createFunction(
{
id: "webhook-processor",
rateLimit: {
limit: 1,
period: "4h",
key: "event.data.webhook_id"
}
},
{ event: "webhook/data.received" },
async ({ event, step }) => {
// Process each webhook only once per 4 hours
// Prevents duplicate webhook spam
}
);适用场景:
- 防止重复Webhook事件
- 限制每个用户的高成本操作
- 防滥用保护
Debounce
防抖
When to use: Wait for a series of events to stop arriving before processing the latest one.
typescript
inngest.createFunction(
{
id: "save-document",
debounce: {
period: "5m", // Wait 5min after last edit
key: "event.data.document_id",
timeout: "30m" // Force save after 30min max
}
},
{ event: "document/content.changed" },
async ({ event, step }) => {
// Saves document only after user stops editing
// Uses the LAST event received
await step.run("save", () => saveDocument(event.data));
}
);Perfect for:
- User input that changes rapidly (search, document editing)
- Noisy webhook events
- Ensuring latest data is processed
适用场景: 等待一系列事件停止触发后,再处理最新的一次事件。
typescript
inngest.createFunction(
{
id: "save-document",
debounce: {
period: "5m", // Wait 5min after last edit
key: "event.data.document_id",
timeout: "30m" // Force save after 30min max
}
},
{ event: "document/content.changed" },
async ({ event, step }) => {
// Saves document only after user stops editing
// Uses the LAST event received
await step.run("save", () => saveDocument(event.data));
}
);理想适用场景:
- 用户快速输入的场景(如搜索、文档编辑)
- 频繁触发的Webhook事件
- 确保处理最新数据
Priority
优先级设置
When to use: Execute some function runs ahead of others based on dynamic data.
typescript
inngest.createFunction(
{
id: "process-order",
priority: {
// VIP users get priority up to 120 seconds ahead
run: "event.data.user_tier == 'vip' ? 120 : 0"
}
},
{ event: "order/placed" },
async ({ event, step }) => {
// VIP orders jump ahead in the queue
}
);Advanced example:
typescript
inngest.createFunction(
{
id: "support-ticket",
priority: {
run: `
event.data.severity == 'critical' ? 300 :
event.data.severity == 'high' ? 120 :
event.data.user_plan == 'enterprise' ? 60 : 0
`
}
},
{ event: "support/ticket.created" },
async ({ event, step }) => {
// Critical tickets get highest priority (300s ahead)
// High severity: 120s ahead
// Enterprise users: 60s ahead
// Everyone else: normal priority
}
);适用场景: 根据动态数据让部分函数运行任务优先于其他任务执行。
typescript
inngest.createFunction(
{
id: "process-order",
priority: {
// VIP users get priority up to 120 seconds ahead
run: "event.data.user_tier == 'vip' ? 120 : 0"
}
},
{ event: "order/placed" },
async ({ event, step }) => {
// VIP orders jump ahead in the queue
}
);进阶示例:
typescript
inngest.createFunction(
{
id: "support-ticket",
priority: {
run: `
event.data.severity == 'critical' ? 300 :
event.data.severity == 'high' ? 120 :
event.data.user_plan == 'enterprise' ? 60 : 0
`
}
},
{ event: "support/ticket.created" },
async ({ event, step }) => {
// Critical tickets get highest priority (300s ahead)
// High severity: 120s ahead
// Enterprise users: 60s ahead
// Everyone else: normal priority
}
);Singleton
单例模式
When to use: Ensure only one instance of a function runs at a time.
适用场景: 确保同一时间仅存在一个函数实例运行。
Skip Mode (Preserve Current Run)
跳过模式(保留当前运行实例)
typescript
inngest.createFunction(
{
id: "data-backup",
singleton: {
key: "event.data.database_id",
mode: "skip"
}
},
{ event: "backup/requested" },
async ({ event, step }) => {
// Skip new backups if one is already running for this database
await step.run("backup", () => performBackup(event.data.database_id));
}
);typescript
inngest.createFunction(
{
id: "data-backup",
singleton: {
key: "event.data.database_id",
mode: "skip"
}
},
{ event: "backup/requested" },
async ({ event, step }) => {
// Skip new backups if one is already running for this database
await step.run("backup", () => performBackup(event.data.database_id));
}
);Cancel Mode (Use Latest Event)
取消模式(使用最新事件)
typescript
inngest.createFunction(
{
id: "realtime-sync",
singleton: {
key: "event.data.user_id",
mode: "cancel"
}
},
{ event: "user/data.changed" },
async ({ event, step }) => {
// Cancel previous sync and start with latest data
await step.run("sync", () => syncUserData(event.data));
}
);typescript
inngest.createFunction(
{
id: "realtime-sync",
singleton: {
key: "event.data.user_id",
mode: "cancel"
}
},
{ event: "user/data.changed" },
async ({ event, step }) => {
// Cancel previous sync and start with latest data
await step.run("sync", () => syncUserData(event.data));
}
);Batching
批处理
When to use: Process multiple events together for efficiency.
typescript
inngest.createFunction(
{
id: "bulk-email-send",
batchEvents: {
maxSize: 100, // Up to 100 events
timeout: "30s", // Or 30 seconds, whichever first
key: "event.data.campaign_id" // Batch per campaign
}
},
{ event: "email/send.queued" },
async ({ events, step }) => {
// Process array of events together
const emails = events.map((evt) => ({
to: evt.data.email,
subject: evt.data.subject,
body: evt.data.body
}));
await step.run("send-batch", () => emailService.sendBulk(emails));
}
);适用场景: 批量处理多个事件以提升效率。
typescript
inngest.createFunction(
{
id: "bulk-email-send",
batchEvents: {
maxSize: 100, // Up to 100 events
timeout: "30s", // Or 30 seconds, whichever first
key: "event.data.campaign_id" // Batch per campaign
}
},
{ event: "email/send.queued" },
async ({ events, step }) => {
// Process array of events together
const emails = events.map((evt) => ({
to: evt.data.email,
subject: evt.data.subject,
body: evt.data.body
}));
await step.run("send-batch", () => emailService.sendBulk(emails));
}
);Combining Flow Control
流控制组合使用
Example: Fair AI Processing
示例:公平AI处理
typescript
inngest.createFunction(
{
id: "ai-image-processing",
// Global throttling for API limits
throttle: {
limit: 50,
period: "60s",
key: `"gpu-cluster"`
},
// Per-user concurrency for fairness
concurrency: [
{
key: "event.data.user_id",
limit: 3
}
],
// VIP users get priority
priority: {
run: "event.data.plan == 'pro' ? 60 : 0"
}
},
{ event: "ai/image.generate" },
async ({ event, step }) => {
// Combines multiple flow controls for optimal resource usage
}
);Pro tip: Most production functions benefit from combining 1-3 flow control mechanisms for optimal reliability and performance.
typescript
inngest.createFunction(
{
id: "ai-image-processing",
// Global throttling for API limits
throttle: {
limit: 50,
period: "60s",
key: `"gpu-cluster"`
},
// Per-user concurrency for fairness
concurrency: [
{
key: "event.data.user_id",
limit: 3
}
],
// VIP users get priority
priority: {
run: "event.data.plan == 'pro' ? 60 : 0"
}
},
{ event: "ai/image.generate" },
async ({ event, step }) => {
// Combines multiple flow controls for optimal resource usage
}
);专业提示: 大多数生产环境中的函数通过组合1-3种流控制机制,可实现最佳的可靠性与性能。