iii-queue-processing

Queue Processing

Comparable to: BullMQ, Celery, SQS

Key Concepts

Use the concepts below when they fit the task. Not every queue setup needs all of them.
  • Named queues are declared in iii-config.yaml under queue_configs
  • Standard queues process jobs concurrently; FIFO queues preserve ordering
  • TriggerAction.Enqueue({ queue }) dispatches a job to a named queue
  • Failed jobs auto-retry with exponential backoff up to max_retries
  • Jobs that exhaust retries land in a dead letter queue for inspection
  • Each consumer function receives the job payload and a messageReceiptId
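The retry concept above can be made concrete. A minimal sketch of the delay schedule, assuming a simple doubling curve starting from the queue's backoff_ms (the engine's exact curve may differ):

```javascript
// Retry delays under exponential backoff: backoff_ms, 2 * backoff_ms,
// 4 * backoff_ms, ... one delay per retry attempt up to max_retries.
// Assumes a doubling curve; the iii engine's exact schedule may differ.
function backoffSchedule(backoffMs, maxRetries) {
  return Array.from({ length: maxRetries }, (_, attempt) => backoffMs * 2 ** attempt);
}

console.log(backoffSchedule(500, 4)); // [ 500, 1000, 2000, 4000 ]
```

With backoff_ms: 500 and max_retries: 4, a job that keeps failing waits 500 ms, 1 s, 2 s, then 4 s before landing in the dead letter queue.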

Architecture

Producer function → TriggerAction.Enqueue({ queue: 'task-queue' }) → Named Queue (standard or FIFO) → Consumer registerFunction handler → success / retry with backoff → Dead Letter Queue (after max_retries)

iii Primitives Used

  • registerFunction — define the consumer that processes jobs
  • trigger({ ..., action: TriggerAction.Enqueue({ queue }) }) — dispatch a job to a named queue
  • messageReceiptId — acknowledge or track individual job processing
  • queue_configs in iii-config.yaml — declare queues with concurrency and retries

Reference Implementation

See ../references/queue-processing.js for the full working example — a producer that enqueues jobs and a consumer that processes them with retry logic.
Also available in Python: ../references/queue-processing.py
Also available in Rust: ../references/queue-processing.rs

Common Patterns

Code using this pattern commonly includes, when relevant:
  • registerWorker(url, { workerName }) — worker initialization
  • registerFunction(id, handler) — define the consumer
  • trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) }) — enqueue a job
  • payload.messageReceiptId — track or acknowledge the job
  • trigger({ function_id: 'state::set', payload }) — persist results after processing
  • const logger = new Logger() — structured logging per job
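The calls above fit together roughly as follows. This is a runnable sketch, not the real SDK: registerFunction, trigger, and TriggerAction.Enqueue are stubbed with an in-memory dispatcher so the producer → consumer → retry → dead-letter shape can be seen end to end. In real code these names come from the iii SDK, and the engine (not the caller) drives retries from queue_configs; see ../references/queue-processing.js for the actual wiring.

```javascript
// In-memory stand-ins for the iii SDK, for illustration only.
const handlers = new Map(); // function_id -> consumer handler

const TriggerAction = {
  Enqueue: ({ queue }) => ({ kind: 'enqueue', queue }),
};

// Register the consumer that will process jobs for a function_id.
function registerFunction(id, handler) {
  handlers.set(id, handler);
}

// Stub trigger: dispatches the payload to the registered consumer and
// simulates the engine's retry loop. Delays are collected rather than
// slept so the sketch runs instantly; `action` is accepted but unused here.
async function trigger({ function_id, payload, action }, { maxRetries = 3, backoffMs = 100 } = {}) {
  const handler = handlers.get(function_id);
  const delays = [];
  for (let attempt = 0; attempt <= maxRetries; attempt += 1) {
    try {
      // Each delivery carries the payload plus a messageReceiptId.
      const result = await handler({ ...payload, messageReceiptId: `receipt-${attempt}` });
      return { status: 'success', result, delays };
    } catch (err) {
      if (attempt === maxRetries) return { status: 'dead_letter', delays };
      delays.push(backoffMs * 2 ** attempt); // exponential backoff between attempts
    }
  }
}

// Consumer: fails twice to show the retry path, then succeeds.
let failures = 2;
registerFunction('process-task', async (payload) => {
  if (failures-- > 0) throw new Error('transient failure');
  return `done:${payload.taskId}:${payload.messageReceiptId}`;
});

// Producer: enqueue a job to a named queue.
trigger({
  function_id: 'process-task',
  payload: { taskId: 42 },
  action: TriggerAction.Enqueue({ queue: 'task-queue' }),
}).then((outcome) => console.log(outcome.status, outcome.delays));
```

The job succeeds on the third attempt, after two backoff delays; a job that never succeeds returns with status 'dead_letter' once max_retries is exhausted.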

Adapting This Pattern

Use the adaptations below when they apply to the task.
  • Choose FIFO queues when job ordering matters (e.g. sequential pipeline steps)
  • Set max_retries and concurrency in queue config to match your workload
  • Chain multiple queues for multi-stage pipelines (queue A consumer enqueues to queue B)
  • For idempotency, check state before processing to avoid duplicate work on retries
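The idempotency adaptation can be sketched as a check-before-work guard. Here stateGet and stateSet are hypothetical in-memory stand-ins for the engine's state triggers (only 'state::set' appears on this page); the point is the ordering, not the API:

```javascript
// Idempotent consumer: skip the work if a completion marker already exists,
// so a retried delivery of the same job causes no duplicate side effects.
// `stateGet` / `stateSet` are in-memory stand-ins for the engine's state
// triggers (hypothetical names, for illustration only).
const state = new Map();
const stateGet = (key) => state.get(key);
const stateSet = (key, value) => state.set(key, value);

let sideEffects = 0; // counts how many times the real work actually ran

async function handleJob(payload) {
  const doneKey = `job:${payload.jobId}:done`;
  if (stateGet(doneKey)) return 'skipped'; // an earlier attempt already finished
  sideEffects += 1;                        // the actual work goes here
  stateSet(doneKey, true);                 // record completion last
  return 'processed';
}
```

Recording completion after the work (not before) means a crash mid-job leaves no marker, so the retry runs the work again; the guard only suppresses re-runs of jobs that fully completed.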

Engine Configuration

Named queues are declared in iii-config.yaml under queue_configs with per-queue max_retries, concurrency, type, and backoff_ms. See ../references/iii-config.yaml for the full annotated config reference.
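A queue_configs entry might look like the sketch below. The field names (type, concurrency, max_retries, backoff_ms) are the ones listed above, but the surrounding structure is a guess; take the exact schema from ../references/iii-config.yaml, not from this sketch.

```yaml
# Hypothetical queue_configs sketch -- field names from this page,
# exact schema per ../references/iii-config.yaml.
queue_configs:
  task-queue:
    type: standard    # concurrent processing
    concurrency: 8    # jobs processed in parallel
    max_retries: 3    # after this, the job moves to the dead letter queue
    backoff_ms: 500   # base delay for exponential backoff
  pipeline-queue:
    type: fifo        # preserves job ordering
    concurrency: 1
    max_retries: 5
    backoff_ms: 1000
```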

Pattern Boundaries

  • If the task only needs fire-and-forget without retries or ordering, prefer iii-trigger-actions with TriggerAction.Void().
  • If failed jobs need special handling or alerting, prefer iii-dead-letter-queues for the DLQ consumer.
  • If the task is step-by-step orchestration with branching, prefer iii-workflow-orchestration.
  • Stay with iii-queue-processing when the primary need is reliable async job execution with retries.

When to Use

  • Use this skill when the task is primarily about iii-queue-processing in the iii engine.
  • Triggers when the request directly asks for this pattern or an equivalent implementation.

Boundaries

  • Never use this skill as a generic fallback for unrelated tasks.
  • You must not apply this skill when a more specific iii skill is a better fit.
  • Always verify environment and safety constraints before applying examples from this skill.