iii-queue-processing
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseQueue Processing
队列处理
Comparable to: BullMQ, Celery, SQS
同类工具:BullMQ、Celery、SQS
Key Concepts
核心概念
Use the concepts below when they fit the task. Not every queue setup needs all of them.
- Named queues are declared in under
iii-config.yamlqueue_configs - Standard queues process jobs concurrently; FIFO queues preserve ordering
- dispatches a job to a named queue
TriggerAction.Enqueue({ queue }) - Failed jobs auto-retry with exponential backoff up to
max_retries - Jobs that exhaust retries land in a dead letter queue for inspection
- Each consumer function receives the job payload and a
messageReceiptId - Fan-out is achieved by having one producer trigger multiple distinct consumer functions via separate enqueue calls
当任务需要时使用以下概念,并非所有队列配置都需要用到全部概念。
- 命名队列在的
iii-config.yaml下声明queue_configs - 标准队列并发处理任务;FIFO队列保证消息顺序
- 将任务分发至指定命名队列
TriggerAction.Enqueue({ queue }) - 失败任务会自动重试,采用指数退避策略,直至达到次数
max_retries - 耗尽重试次数的任务会进入死信队列等待检查
- 每个消费者函数会接收任务负载和
messageReceiptId - 通过一次生产者调用多个独立的入队操作触发不同消费者函数,可实现**扇出(Fan-out)**效果
Architecture
架构
Producer function
→ TriggerAction.Enqueue({ queue: 'task-queue' })
→ Named Queue (standard or FIFO)
→ Consumer registerFunction handler
→ success / retry with backoff
→ Dead Letter Queue (after max_retries)
Producer function
→ TriggerAction.Enqueue({ queue: 'task-queue' })
→ Named Queue (standard or FIFO)
→ Consumer registerFunction handler
→ success / retry with backoff
→ Dead Letter Queue (after max_retries)
iii Primitives Used
使用的iii原语
| Primitive | Purpose |
|---|---|
| Define the consumer that processes jobs |
| Dispatch a job to a named queue |
| Acknowledge or track individual job processing |
| Declare queues with concurrency and retries |
| 原语 | 用途 |
|---|---|
| 定义处理任务的消费者函数 |
| 将任务分发至指定命名队列 |
| 确认或跟踪单个任务的处理状态 |
| 声明队列并配置并发数、重试次数等参数 |
Reference Implementation
参考实现
See ../references/queue-processing.js for the full working example — a producer that enqueues jobs and a consumer that processes them with retry logic.
Also available in Python: ../references/queue-processing.py
Also available in Rust: ../references/queue-processing.rs
完整可运行示例请查看 ../references/queue-processing.js —— 包含一个入队任务的生产者和带重试逻辑的任务消费者。
同时提供Python版本:../references/queue-processing.py
以及Rust版本:../references/queue-processing.rs
Common Patterns
常见模式
Code using this pattern commonly includes, when relevant:
- — worker initialization
registerWorker(url, { workerName }) - — define the consumer
registerFunction(id, handler) - — enqueue a job
trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) }) - — track or acknowledge the job
payload.messageReceiptId - — persist results after processing
trigger({ function_id: 'state::set', payload }) - — structured logging per job
const logger = new Logger()
使用该模式的代码通常会包含以下相关内容(按需使用):
- —— 初始化工作器
registerWorker(url, { workerName }) - —— 定义消费者函数
registerFunction(id, handler) - —— 将任务入队
trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) }) - —— 跟踪或确认任务
payload.messageReceiptId - —— 处理完成后持久化结果
trigger({ function_id: 'state::set', payload }) - —— 为每个任务添加结构化日志
const logger = new Logger()
Adapting This Pattern
模式适配
Use the adaptations below when they apply to the task.
- Choose FIFO queues when job ordering matters (e.g. sequential pipeline steps)
- Set and
max_retriesin queue config to match your workloadconcurrency - Chain multiple queues for multi-stage pipelines (queue A consumer enqueues to queue B)
- For idempotency, check state before processing to avoid duplicate work on retries
- Use fan-out by enqueuing to multiple consumer functions when a single event requires parallel processing (e.g. payment + notification + audit)
当任务符合以下场景时,可采用对应的适配方式:
- 当任务顺序至关重要时(如流水线的 sequential 步骤),选择FIFO队列
- 在队列配置中设置和
max_retries以匹配工作负载concurrency - 串联多个队列构建多阶段流水线(队列A的消费者将任务入队至队列B)
- 为保证幂等性,处理任务前先检查状态,避免重试时重复执行工作
- 当单个事件需要并行处理时(如支付+通知+审计),通过将任务入队至多个消费者函数实现扇出效果
Engine Configuration
引擎配置
Named queues are declared in iii-config.yaml under with per-queue , , , and . Fan-out is a pattern (one producer triggers multiple consumer functions), not a queue config key. See ../references/iii-config.yaml for the full annotated config reference.
queue_configsmax_retriesconcurrencytypebackoff_ms命名队列在的下声明,可配置每个队列的、、和。扇出是一种模式(一个生产者触发多个消费者函数),并非队列配置项。完整带注释的配置参考请查看 ../references/iii-config.yaml。
iii-config.yamlqueue_configsmax_retriesconcurrencytypebackoff_msPattern Boundaries
模式边界
- If the task only needs fire-and-forget without retries or ordering, prefer with
iii-trigger-actions.TriggerAction.Void() - If failed jobs need special handling or alerting, prefer for the DLQ consumer.
iii-dead-letter-queues - If the task is step-by-step orchestration with branching, prefer .
iii-workflow-orchestration - Stay with when the primary need is reliable async job execution with retries.
iii-queue-processing
- 如果任务仅需“触发即遗忘”,无需重试或顺序保证,优先使用的
iii-trigger-actions。TriggerAction.Void() - 如果失败任务需要特殊处理或告警,优先为死信队列消费者使用。
iii-dead-letter-queues - 如果任务是带分支的分步编排,优先使用。
iii-workflow-orchestration - 当核心需求是具备重试机制的可靠异步任务执行时,选择。
iii-queue-processing
When to Use
适用场景
- Use this skill when the task is primarily about in the iii engine.
iii-queue-processing - Triggers when the request directly asks for this pattern or an equivalent implementation.
- 当任务核心是在iii引擎中实现时,使用该技能。
iii-queue-processing - 当请求直接要求该模式或等效实现时触发。
Boundaries
使用边界
- Never use this skill as a generic fallback for unrelated tasks.
- You must not apply this skill when a more specific iii skill is a better fit.
- Always verify environment and safety constraints before applying examples from this skill.
- 切勿将该技能作为无关任务的通用 fallback。
- 当更特定的iii技能更合适时,不得使用该技能。
- 在应用该技能的示例前,务必验证环境和安全约束。