Cloudflare Queues

Status: Production Ready ✅
Last Updated: 2026-01-09
Dependencies: cloudflare-worker-base (for Worker setup)
Latest Versions: wrangler@4.58.0, @cloudflare/workers-types@4.20260109.0
Recent Updates (2025):
  • April 2025: Pull consumers increased limits (5,000 msg/s per queue, up from 1,200 requests/5min)
  • March 2025: Pause & Purge APIs (wrangler queues pause-delivery, queues purge)
  • 2025: Customizable retention (60s to 14 days, previously fixed at 4 days)
  • 2025: Increased queue limits (10,000 queues per account, up from 10)


Quick Start (5 Minutes)


1. Create queue:

```bash
npx wrangler queues create my-queue
```

2. Add producer binding to wrangler.jsonc:

```jsonc
{ "queues": { "producers": [{ "binding": "MY_QUEUE", "queue": "my-queue" }] } }
```

3. Send a message from a Worker:

```typescript
await env.MY_QUEUE.send({ userId: '123', action: 'process-order' });
```

Or publish via HTTP (May 2025+) from any service:

```bash
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -d '{"messages": [{"body": {"userId": "123"}}]}'
```

4. Add consumer binding to wrangler.jsonc:

```jsonc
{ "queues": { "consumers": [{ "queue": "my-queue", "max_batch_size": 10 }] } }
```

5. Process messages:

```typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      await processMessage(message.body);
      message.ack(); // Explicit acknowledgement
    }
  }
};
```

6. Deploy and test:

```bash
npx wrangler deploy
npx wrangler tail my-consumer
```

---

Producer API


```typescript
// Send single message
await env.MY_QUEUE.send({ userId: '123', action: 'send-email' });

// Send with delay (max 12 hours)
await env.MY_QUEUE.send({ action: 'reminder' }, { delaySeconds: 600 });

// Send batch (max 100 messages or 256 KB)
await env.MY_QUEUE.sendBatch([
  { body: { userId: '1' } },
  { body: { userId: '2' } },
]);
```
Critical Limits:
  • Message size: 128 KB max (including ~100 bytes metadata)
  • Messages >128 KB will fail - store in R2 and send a reference instead
  • Batch size: 100 messages or 256 KB total
  • Delay: 0-43200 seconds (12 hours max)
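When a producer fans out large numbers of messages, it can help to centralize these limits in one place. A minimal sketch of a batching helper, assuming the documented limits above; the helper name and chunking policy are illustrative, not part of the Queues API:

```typescript
// Chunk an arbitrary list of message bodies into sendBatch() calls that
// respect the documented limits: 100 messages or 256 KB per batch.
const MAX_BATCH_COUNT = 100;
const MAX_BATCH_BYTES = 256 * 1024;

async function sendAllInBatches(queue: Queue, bodies: unknown[]): Promise<void> {
  let batch: { body: unknown }[] = [];
  let batchBytes = 0;
  for (const body of bodies) {
    const size = new TextEncoder().encode(JSON.stringify(body)).length;
    // Flush the current batch before it would exceed either limit
    if (batch.length >= MAX_BATCH_COUNT ||
        (batch.length > 0 && batchBytes + size > MAX_BATCH_BYTES)) {
      await queue.sendBatch(batch);
      batch = [];
      batchBytes = 0;
    }
    batch.push({ body });
    batchBytes += size;
  }
  if (batch.length > 0) await queue.sendBatch(batch);
}
```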


HTTP Publishing (May 2025+)


New in May 2025: Publish messages to queues via HTTP from any service or programming language.
Authentication: Requires a Cloudflare API token with Queues Edit permissions.

Single message:

```bash
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "messages": [ {"body": {"userId": "123", "action": "process-order"}} ] }'
```

Batch messages:

```bash
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{ "messages": [ {"body": {"userId": "1"}}, {"body": {"userId": "2"}}, {"body": {"userId": "3"}} ] }'
```

Use Cases:
- Publishing from external microservices (Node.js, Python, Go, etc.)
- Cron jobs running outside Cloudflare
- Webhook receivers
- Legacy systems integration
- Services without the Cloudflare Workers SDK
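The same endpoint works from any HTTP client. A minimal TypeScript sketch for services that can't shell out to curl; ACCOUNT_ID, QUEUE_NAME, and API_TOKEN are assumed environment variables, and the error handling is illustrative:

```typescript
// Publish a batch of messages to a queue over HTTP from any service.
async function publishMessages(bodies: unknown[]): Promise<void> {
  const url = `https://api.cloudflare.com/client/v4/accounts/${process.env.ACCOUNT_ID}` +
    `/queues/${process.env.QUEUE_NAME}/messages`;
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.API_TOKEN}`,
      'Content-Type': 'application/json',
    },
    // The API expects { messages: [{ body: ... }, ...] }
    body: JSON.stringify({ messages: bodies.map((body) => ({ body })) }),
  });
  if (!res.ok) {
    throw new Error(`Queue publish failed: ${res.status} ${await res.text()}`);
  }
}
```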

---

Event Subscriptions (August 2025+)


New in August 2025: Subscribe to events from Cloudflare services and consume via Queues.
Supported Event Sources:
  • R2 (bucket.created, object.uploaded, object.deleted, etc.)
  • Workers KV
  • Workers AI
  • Vectorize
  • Workflows
  • Super Slurper
  • Workers Builds
Create Subscription:
```bash
npx wrangler queues subscription create my-queue \
  --source r2 \
  --events bucket.created,object.uploaded
```
Event Structure:
```typescript
interface CloudflareEvent {
  type: string;           // 'r2.bucket.created', 'kv.namespace.created'
  source: string;         // 'r2', 'kv', 'ai', etc.
  payload: any;           // Event-specific data
  metadata: {
    accountId: string;
    timestamp: string;
  };
}
```
Consumer Example:
```typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      const event = message.body as CloudflareEvent;

      switch (event.type) {
        case 'r2.bucket.created':
          console.log('New R2 bucket:', event.payload.bucketName);
          await notifyAdmin(event.payload);
          break;

        case 'r2.object.uploaded':
          console.log('File uploaded:', event.payload.key);
          await processNewFile(event.payload.key);
          break;

        case 'kv.namespace.created':
          console.log('New KV namespace:', event.payload.namespaceId);
          break;

        case 'ai.inference.completed':
          console.log('AI inference done:', event.payload.modelId);
          break;
      }

      message.ack();
    }
  }
};
```
Use Cases:
  • Build custom workflows triggered by R2 uploads
  • Monitor infrastructure changes (new KV namespaces, buckets)
  • Track AI inference jobs
  • Audit account activity
  • Event-driven architectures without custom webhooks


Consumer API


```typescript
export default {
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise<void> {
    for (const message of batch.messages) {
      // message.id - unique UUID
      // message.timestamp - Date when sent
      // message.body - your content
      // message.attempts - retry count (starts at 1)

      await processMessage(message.body);
      message.ack(); // Explicit ack (critical for non-idempotent ops)
    }
  }
};

// Retry with exponential backoff
message.retry({ delaySeconds: Math.min(60 * Math.pow(2, message.attempts - 1), 3600) });

// Batch methods
batch.ackAll();   // Ack all messages
batch.retryAll(); // Retry all messages
```
Critical:
  • message.ack() - marks success and prevents retry, even if the handler fails later
  • Use explicit ack for non-idempotent operations (DB writes, API calls, payments)
  • Implicit ack - if the handler returns successfully without calling ack(), all messages are auto-acknowledged
  • Ordering not guaranteed - don't assume FIFO message order
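Because a handler can crash after a side effect but before ack(), redelivery is always possible, so an idempotency check makes reprocessing safe. A sketch assuming a KV namespace bound as IDEMPOTENCY; the binding name and TTL are illustrative:

```typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      // message.id is a unique UUID, so it works as an idempotency key.
      const seen = await env.IDEMPOTENCY.get(message.id);
      if (seen) {
        message.ack(); // Already processed on a previous attempt - just ack
        continue;
      }
      await processMessage(message.body);
      // Record completion before ack; let the key expire after the max
      // retention window (14 days) so the namespace doesn't grow forever.
      await env.IDEMPOTENCY.put(message.id, '1', { expirationTtl: 14 * 24 * 3600 });
      message.ack();
    }
  }
};
```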


Critical Consumer Patterns


Explicit Acknowledgement (Non-Idempotent Operations)


ALWAYS use explicit ack() for: Database writes, API calls, financial transactions
```typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      try {
        await env.DB.prepare('INSERT INTO orders (id, amount) VALUES (?, ?)')
          .bind(message.body.orderId, message.body.amount).run();
        message.ack(); // Only ack on success
      } catch (error) {
        console.error(`Failed ${message.id}:`, error);
        // Don't ack - will retry
      }
    }
  }
};
```
Why? Prevents duplicate writes if one message in a batch fails. Failed messages retry independently.


Exponential Backoff for Rate-Limited APIs


```typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      try {
        const res = await fetch('https://api.example.com/process', {
          method: 'POST',
          body: JSON.stringify(message.body),
        });
        if (res.status === 429) {
          // Rate-limited: back off exponentially, capped at 1 hour
          const delaySeconds = Math.min(60 * Math.pow(2, message.attempts - 1), 3600);
          message.retry({ delaySeconds });
        } else if (!res.ok) {
          message.retry(); // Other upstream error: retry with default delay
        } else {
          message.ack();
        }
      } catch {
        message.retry(); // Network failure: retry with default delay
      }
    }
  }
};
```

Note: fetch() doesn't throw on HTTP error statuses, so the 429 check belongs on the response; the catch block only sees network failures.

Dead Letter Queue (DLQ) - CRITICAL for Production


⚠️ Without DLQ, failed messages are DELETED PERMANENTLY after max_retries
```bash
npx wrangler queues create my-dlq
```
wrangler.jsonc:
```jsonc
{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      "max_retries": 3,
      "dead_letter_queue": "my-dlq"  // Messages go here after 3 failed retries
    }]
  }
}
```
DLQ Consumer:
```typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const message of batch.messages) {
      console.error('PERMANENTLY FAILED:', message.id, message.body);
      await env.DB.prepare('INSERT INTO failed_messages (id, body) VALUES (?, ?)')
        .bind(message.id, JSON.stringify(message.body)).run();
      message.ack(); // Remove from DLQ
    }
  }
};
```
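Once the root cause is fixed, messages parked this way usually need replaying. A sketch that assumes the failed_messages table above gained a replayed flag, and that the Worker also has a producer binding MY_QUEUE for the original queue; both are assumptions, not part of the DLQ feature:

```typescript
// Replay stored failures back onto the main queue, then mark them replayed.
// Adjust table columns and binding names to your setup.
async function replayFailedMessages(env: Env): Promise<void> {
  const { results } = await env.DB
    .prepare('SELECT id, body FROM failed_messages WHERE replayed = 0 LIMIT 100')
    .all<{ id: string; body: string }>();
  for (const row of results) {
    await env.MY_QUEUE.send(JSON.parse(row.body));
    await env.DB.prepare('UPDATE failed_messages SET replayed = 1 WHERE id = ?')
      .bind(row.id).run();
  }
}
```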


Known Issues Prevention


This skill prevents 13 documented issues:

Issue #1: Multiple Dev Commands - Queues Don't Flow Between Processes


Error: Queue messages sent in one wrangler dev process don't appear in another wrangler dev consumer process
Source: GitHub Issue #9795
Why It Happens: The virtual queue used by wrangler is in-process memory. Separate dev processes cannot share queue state.
Prevention:

```bash
# ❌ Don't run producer and consumer as separate processes
# Terminal 1: wrangler dev (producer)
# Terminal 2: wrangler dev (consumer)  # Won't receive messages!

# ✅ Option 1: Run both in a single dev command
wrangler dev -c producer/wrangler.jsonc -c consumer/wrangler.jsonc
```

✅ Option 2: Use the Vite plugin with auxiliaryWorkers in vite.config.ts:

```typescript
import { defineConfig } from 'vite';
import { cloudflare } from '@cloudflare/vite-plugin';

export default defineConfig({
  plugins: [
    cloudflare({ auxiliaryWorkers: ['./consumer/wrangler.jsonc'] }),
  ],
});
```

---

Issue #2: Queue Producer Binding Causes 500 Errors with Remote Dev


Error: All routes return 500 Internal Server Error when using wrangler dev --remote with queue bindings
Source: GitHub Issue #9642
Why It Happens: Queues are not yet supported in wrangler dev --remote mode. Even routes that don't use the queue binding fail.
Prevention:

```jsonc
// When using remote dev, temporarily comment out queue bindings
{
  "queues": {
    // "producers": [{ "queue": "my-queue", "binding": "MY_QUEUE" }]
  }
}

// Or use local dev instead
// wrangler dev (without --remote)
```


Issue #3: D1 Remote Breaks When Queue Remote is Set


Error: D1 remote binding stops working when remote: true is set on the queue producer binding
Source: GitHub Issue #11106
Why It Happens: A binding conflict affecting mixed local/remote development.
Prevention:

```jsonc
// ❌ Don't mix D1 remote with queue remote
{
  "d1_databases": [{
    "binding": "DB",
    "database_id": "...",
    "remote": true
  }],
  "queues": {
    "producers": [{
      "binding": "QUEUE",
      "queue": "my-queue",
      "remote": true  // ❌ Breaks D1 remote
    }]
  }
}

// ✅ Avoid remote on queues when using D1 remote
{
  "d1_databases": [{ "binding": "DB", "remote": true }],
  "queues": {
    "producers": [{ "binding": "QUEUE", "queue": "my-queue" }]
  }
}
```
Status: No workaround yet. Track the issue for updates.


Issue #4: Mixed Local/Remote Bindings - Queue Consumer Missing


Error: Queue consumer binding does not appear when mixing local queues with remote AI/Vectorize bindings
Source: GitHub Issue #9887
Why It Happens: Wrangler doesn't support mixed local/remote bindings in the same Worker.
Prevention:

```jsonc
// ❌ Don't mix local queues with remote AI
{
  "queues": {
    "consumers": [{ "queue": "my-queue" }]
  },
  "ai": {
    "binding": "AI",
    "experimental_remote": true  // ❌ Breaks queue consumer
  }
}

// ✅ Option 1: All local (no remote bindings)
wrangler dev

// ✅ Option 2: Separate workers for queues vs AI
// Worker 1: Queue processing (local)
// Worker 2: AI operations (remote)
```


Issue #5: http_pull Type Prevents Worker Consumer Execution


Error: A queue consumer with type: "http_pull" doesn't execute in production
Source: GitHub Issue #6619
Why It Happens: http_pull is for external HTTP-based consumers, not Worker-based consumers.
Prevention:

```jsonc
// ❌ Don't use type: "http_pull" for Worker consumers
{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      "type": "http_pull",  // ❌ Wrong for Workers
      "max_batch_size": 10
    }]
  }
}

// ✅ Omit type field for push-based Worker consumers
{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      "max_batch_size": 10
      // No "type" field - defaults to Worker consumer
    }]
  }
}
```


Breaking Changes & Deprecations


delivery_delay in Producer Config (Upcoming Removal)


Warning: The delivery_delay parameter in producer bindings will be removed in a future wrangler version.

```jsonc
// ❌ Will be removed - don't use
{
  "queues": {
    "producers": [{
      "binding": "MY_QUEUE",
      "queue": "my-queue",
      "delivery_delay": 300  // ❌ Don't use this
    }]
  }
}
```
Migration: Use per-message delay instead:
```typescript
// ✅ Correct approach - per-message delay
await env.MY_QUEUE.send({ data }, { delaySeconds: 300 });
```
Why: Workers should not affect queue-level settings. With multiple producers, the setting from the last-deployed producer wins, causing unpredictable behavior.


Community Tips


Note: These tips come from community discussions and GitHub issues. Verify against your wrangler version.

Tip: max_batch_timeout May Break Local Development


Source: GitHub Issue #6619
Confidence: MEDIUM
Applies to: Local development with wrangler dev
If your queue consumer doesn't execute locally, try removing max_batch_timeout:

```jsonc
{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      "max_batch_size": 10
      // Remove max_batch_timeout for local dev
    }]
  }
}
```
This appears to be version-specific and may not affect all setups.


Tip: Queue Name Not Available on Producer Bindings


Source: GitHub Issue #10131
Confidence: HIGH
Applies to: Multi-environment deployments (staging, PR previews, tenant-specific queues)
Queue names are only available via batch.queue in consumer handlers, not on producer bindings. This creates issues with environment-specific queue names like email-queue-staging or email-queue-pr-123.
Current Limitation:

```typescript
// ❌ Can't access queue name from producer binding
const queueName = env.MY_QUEUE.name; // Doesn't exist!

// ❌ Must hardcode or normalize in consumer
switch (batch.queue) {
  case 'email-queue':           // What about email-queue-staging?
  case 'email-queue-staging':   // Must handle all variants
  case 'email-queue-pr-123':    // Dynamic env names break this
}
```
Workaround:
```typescript
// In consumer: Normalize queue name
function normalizeQueueName(queueName: string): string {
  return queueName.replace(/-staging|-pr-\d+|-dev/g, '');
}

switch (normalizeQueueName(batch.queue)) {
  case 'email-queue':
    // Handle all email-queue-* variants
}
```
Status: Feature request tracked internally: MQ-923


Consumer Configuration


```jsonc
{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      "max_batch_size": 100,           // 1-100 (default: 10)
      "max_batch_timeout": 30,         // 0-60s (default: 5s)
      "max_retries": 5,                // 0-100 (default: 3)
      "retry_delay": 300,              // Seconds (default: 0)
      "max_concurrency": 10,           // 1-250 (default: auto-scale)
      "dead_letter_queue": "my-dlq"    // REQUIRED for production
    }]
  }
}
```
Critical Settings:
  • Batching - Consumer called when EITHER condition met (max_batch_size OR max_batch_timeout)
  • max_retries - After exhausted: with DLQ → sent to DLQ, without DLQ → DELETED PERMANENTLY
  • max_concurrency - Only set if upstream has rate limits or connection limits. Otherwise leave unset for auto-scaling (up to 250 concurrent invocations)
  • DLQ - Create separately: npx wrangler queues create my-dlq


Wrangler Commands


```bash
# Create queue
npx wrangler queues create my-queue
npx wrangler queues create my-queue --message-retention-period-secs 1209600  # 14 days

# Manage queues
npx wrangler queues list
npx wrangler queues info my-queue
npx wrangler queues delete my-queue  # ⚠️ Deletes ALL messages!

# Pause/Purge (March 2025 - NEW)
npx wrangler queues pause-delivery my-queue   # Pause processing, keep receiving
npx wrangler queues resume-delivery my-queue
npx wrangler queues purge my-queue            # ⚠️ Permanently deletes all messages!

# Consumer management
npx wrangler queues consumer add my-queue my-consumer-worker \
  --batch-size 50 --batch-timeout 10 --message-retries 5
npx wrangler queues consumer remove my-queue my-consumer-worker
```

---

Limits & Quotas


| Feature | Limit |
|---|---|
| Queues per account | 10,000 |
| Message size | 128 KB (includes ~100 bytes metadata) |
| Message retries | 100 max |
| Batch size | 1-100 messages |
| Batch timeout | 0-60 seconds |
| Messages per sendBatch | 100 (or 256 KB total) |
| Queue throughput | 5,000 messages/second per queue |
| Message retention | 4 days (default), 14 days (max) |
| Queue backlog size | 25 GB per queue |
| Concurrent consumers | 250 (push-based, auto-scale) |
| Consumer duration | 15 minutes (wall clock) |
| Consumer CPU time | 30 seconds (default), 5 minutes (max) |
| Visibility timeout | 12 hours (pull consumers) |
| Message delay | 12 hours (max) |
| API rate limit | 1,200 requests / 5 minutes |


Pricing


Requires Workers Paid plan ($5/month)
Operations Pricing:
  • First 1,000,000 operations/month: FREE
  • After that: $0.40 per million operations
What counts as an operation:
  • Each 64 KB chunk written, read, or deleted
  • Messages >64 KB count as multiple operations:
    • 65 KB message = 2 operations
    • 127 KB message = 2 operations
    • 128 KB message = 2 operations
Typical message lifecycle:
  • 1 write + 1 read + 1 delete = 3 operations
Retries:
  • Each retry = additional read operation
  • Message retried 3 times = 1 write + 4 reads + 1 delete = 6 operations
Dead Letter Queue:
  • Writing to DLQ = additional write operation
Cost examples:
  • 1M messages/month (no retries): ((1M × 3) - 1M) / 1M × $0.40 = $0.80
  • 10M messages/month: ((10M × 3) - 1M) / 1M × $0.40 = $11.60
  • 100M messages/month: ((100M × 3) - 1M) / 1M × $0.40 = $119.60
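The arithmetic above as a small helper; the constants mirror this section's pricing, and the 3-operation lifecycle assumes messages under 64 KB with no DLQ writes:

```typescript
// Estimate monthly Queues cost from message volume, assuming the typical
// write + read + delete lifecycle. Each retry adds one read per message.
const FREE_OPS = 1_000_000;
const PRICE_PER_MILLION = 0.40;

function estimateMonthlyCost(messagesPerMonth: number, avgRetries = 0): number {
  const opsPerMessage = 3 + avgRetries;
  const totalOps = messagesPerMonth * opsPerMessage;
  const billableOps = Math.max(0, totalOps - FREE_OPS);
  return (billableOps / 1_000_000) * PRICE_PER_MILLION;
}

console.log(estimateMonthlyCost(1_000_000));   // 0.80
console.log(estimateMonthlyCost(10_000_000));  // 11.60
console.log(estimateMonthlyCost(100_000_000)); // 119.60
```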


Error Handling


Common Errors


1. Message Too Large


```typescript
// ❌ Bad: Message >128 KB
await env.MY_QUEUE.send({
  data: largeArray, // >128 KB
});

// ✅ Good: Check size before sending
const message = { data: largeArray };
const size = new TextEncoder().encode(JSON.stringify(message)).length;

if (size > 128000) {
  // Store in R2, send reference
  const key = `messages/${crypto.randomUUID()}.json`;
  await env.MY_BUCKET.put(key, JSON.stringify(message));
  await env.MY_QUEUE.send({ type: 'large-message', r2Key: key });
} else {
  await env.MY_QUEUE.send(message);
}
```


2. Throughput Exceeded


```typescript
// ❌ Bad: Exceeding 5000 msg/s per queue
for (let i = 0; i < 10000; i++) {
  await env.MY_QUEUE.send({ id: i }); // Too fast!
}

// ✅ Good: Use sendBatch
const messages = Array.from({ length: 10000 }, (_, i) => ({
  body: { id: i },
}));

// Send in batches of 100
for (let i = 0; i < messages.length; i += 100) {
  await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
}

// ✅ Even better: Rate limit with delay
for (let i = 0; i < messages.length; i += 100) {
  await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
  if (i + 100 < messages.length) {
    await new Promise(resolve => setTimeout(resolve, 100)); // 100ms delay
  }
}
```


3. Consumer Timeout


```typescript
// ❌ Bad: Long processing without a CPU limit increase
export default {
  async queue(batch: MessageBatch): Promise<void> {
    for (const message of batch.messages) {
      await processForMinutes(message.body); // CPU timeout!
    }
  },
};
```

✅ Good: Increase the CPU limit in wrangler.jsonc:

```jsonc
{
  "limits": {
    "cpu_ms": 300000  // 5 minutes (max allowed)
  }
}
```


4. Backlog Growing


Issue: Consumer too slow, backlog growing.

✅ Solution 1: Increase batch size:

```jsonc
{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      "max_batch_size": 100  // Process more per invocation
    }]
  }
}
```

✅ Solution 2: Let concurrency auto-scale (don't set max_concurrency).

✅ Solution 3: Optimize consumer code:

```typescript
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    // Process messages in parallel
    await Promise.all(
      batch.messages.map(async (message) => {
        await process(message.body);
        message.ack();
      })
    );
  },
};
```


Critical Rules


Always:
  • ✅ Configure a DLQ for production (dead_letter_queue in consumer config)
  • ✅ Use explicit message.ack() for non-idempotent ops (DB writes, API calls)
  • ✅ Validate message size <128 KB before sending
  • ✅ Use sendBatch() for multiple messages (more efficient)
  • ✅ Implement exponential backoff: 60 * Math.pow(2, message.attempts - 1)
  • ✅ Let concurrency auto-scale (don't set max_concurrency unless upstream has rate limits)
Never:
  • ❌ Never assume FIFO ordering - not guaranteed
  • ❌ Never rely on implicit ack for non-idempotent ops - use explicit ack()
  • ❌ Never send messages >128 KB - they will fail (store in R2 instead)
  • ❌ Never skip the DLQ in production - without one, failed messages are DELETED PERMANENTLY
  • ❌ Never exceed 5,000 msg/s per queue (push consumers) or rate limits apply
  • ❌ Never process messages synchronously - use Promise.all() for parallelism


Troubleshooting


Issue: Messages not being delivered to consumer


Possible causes:
  1. Consumer not deployed
  2. Wrong queue name in wrangler.jsonc
  3. Delivery paused
  4. Consumer throwing errors
Solution:

```bash
# Check queue info
npx wrangler queues info my-queue

# Check if delivery is paused; resume it
npx wrangler queues resume-delivery my-queue

# Check consumer logs
npx wrangler tail my-consumer
```

---

Issue: Entire batch retried when one message fails


Cause: Using implicit acknowledgement with non-idempotent operations
Solution: Use explicit ack()
```typescript
// ✅ Explicit ack
for (const message of batch.messages) {
  try {
    await dbWrite(message.body);
    message.ack(); // Only ack on success
  } catch (error) {
    console.error(`Failed: ${message.id}`);
    // Don't ack - will retry
  }
}
```


Issue: Messages deleted without processing


Cause: No Dead Letter Queue configured
Solution:

```bash
# Create DLQ
npx wrangler queues create my-dlq
```

Add to consumer config:

```jsonc
{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      "dead_letter_queue": "my-dlq"
    }]
  }
}
```

Issue: Consumer not auto-scaling


Possible causes:
  1. max_concurrency set to 1
  2. Consumer returning errors (not processing)
  3. Batch processing too fast (no backlog)
Solution:

```jsonc
{
  "queues": {
    "consumers": [{
      "queue": "my-queue",
      // Don't set max_concurrency - let it auto-scale
      "max_batch_size": 50  // Increase batch size instead
    }]
  }
}
```


Related Documentation
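
- Cloudflare Queues: https://developers.cloudflare.com/queues/
- Wrangler commands: https://developers.cloudflare.com/workers/wrangler/commands/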



Last Updated: 2026-01-21
Version: 2.0.0
Changes: Added HTTP Publishing (May 2025), Event Subscriptions (Aug 2025), Known Issues Prevention (13 issues), Breaking Changes section, Community Tips. Error count: 0 → 13. Major feature additions and comprehensive issue documentation.
Maintainer: Jeremy Dawes | jeremy@jezweb.net
