cloudflare-queues
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCloudflare Queues
Cloudflare Queues
Status: Production Ready ✅
Last Updated: 2026-01-09
Dependencies: cloudflare-worker-base (for Worker setup)
Latest Versions: wrangler@4.58.0, @cloudflare/workers-types@4.20260109.0
Recent Updates (2025):
- April 2025: Pull consumers increased limits (5,000 msg/s per queue, up from 1,200 requests/5min)
- March 2025: Pause & Purge APIs (wrangler queues pause-delivery, queues purge)
- 2025: Customizable retention (60s to 14 days, previously fixed at 4 days)
- 2025: Increased queue limits (10,000 queues per account, up from 10)
状态:已就绪可用于生产 ✅
最后更新:2026-01-09
依赖项:cloudflare-worker-base(用于Worker设置)
最新版本:wrangler@4.58.0, @cloudflare/workers-types@4.20260109.0
2025年近期更新:
- 2025年4月:拉取式消费者提升了限制(每个队列每秒5000条消息,从之前的每5分钟1200次请求提升)
- 2025年3月:新增暂停与清空API(wrangler queues pause-delivery, queues purge)
- 2025年:可自定义消息保留时长(60秒至14天,此前固定为4天)
- 2025年:提升了队列数量限制(每个账户10000个队列,从之前的10个提升)
Quick Start (5 Minutes)
快速开始(5分钟)
bash
undefinedbash
undefined1. Create queue
1. Create queue
npx wrangler queues create my-queue
npx wrangler queues create my-queue
2. Add producer binding to wrangler.jsonc
2. Add producer binding to wrangler.jsonc
{ "queues": { "producers": [{ "binding": "MY_QUEUE", "queue": "my-queue" }] } }
{ "queues": { "producers": [{ "binding": "MY_QUEUE", "queue": "my-queue" }] } }
3. Send message from Worker
3. Send message from Worker
await env.MY_QUEUE.send({ userId: '123', action: 'process-order' });
await env.MY_QUEUE.send({ userId: '123', action: 'process-order' });
Or publish via HTTP (May 2025+) from any service
Or publish via HTTP (May 2025+) from any service
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages"
-H "Authorization: Bearer YOUR_API_TOKEN"
-d '{"messages": [{"body": {"userId": "123"}}]}'
-H "Authorization: Bearer YOUR_API_TOKEN"
-d '{"messages": [{"body": {"userId": "123"}}]}'
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages"
-H "Authorization: Bearer YOUR_API_TOKEN"
-d '{"messages": [{"body": {"userId": "123"}}]}'
-H "Authorization: Bearer YOUR_API_TOKEN"
-d '{"messages": [{"body": {"userId": "123"}}]}'
4. Add consumer binding to wrangler.jsonc
4. Add consumer binding to wrangler.jsonc
{ "queues": { "consumers": [{ "queue": "my-queue", "max_batch_size": 10 }] } }
{ "queues": { "consumers": [{ "queue": "my-queue", "max_batch_size": 10 }] } }
5. Process messages
5. Process messages
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
await processMessage(message.body);
message.ack(); // Explicit acknowledgement
}
}
};
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
await processMessage(message.body);
message.ack(); // Explicit acknowledgement
}
}
};
6. Deploy and test
6. Deploy and test
npx wrangler deploy
npx wrangler tail my-consumer
---npx wrangler deploy
npx wrangler tail my-consumer
---Producer API
生产者API
typescript
// Send single message
await env.MY_QUEUE.send({ userId: '123', action: 'send-email' });
// Send with delay (max 12 hours)
await env.MY_QUEUE.send({ action: 'reminder' }, { delaySeconds: 600 });
// Send batch (max 100 messages or 256 KB)
await env.MY_QUEUE.sendBatch([
{ body: { userId: '1' } },
{ body: { userId: '2' } },
]);Critical Limits:
- Message size: 128 KB max (including ~100 bytes metadata)
- Messages >128 KB will fail - store in R2 and send reference instead
- Batch size: 100 messages or 256 KB total
- Delay: 0-43200 seconds (12 hours max)
typescript
// Send single message
await env.MY_QUEUE.send({ userId: '123', action: 'send-email' });
// Send with delay (max 12 hours)
await env.MY_QUEUE.send({ action: 'reminder' }, { delaySeconds: 600 });
// Send batch (max 100 messages or 256 KB)
await env.MY_QUEUE.sendBatch([
{ body: { userId: '1' } },
{ body: { userId: '2' } },
]);关键限制:
- 消息大小:最大128 KB(包含约100字节元数据)
- 超过128 KB的消息会发送失败 - 请将内容存储在R2中并发送引用
- 批量大小:最多100条消息或总大小256 KB
- 延迟时长:0-43200秒(最长12小时)
HTTP Publishing (May 2025+)
HTTP发布(2025年5月起可用)
New in May 2025: Publish messages to queues via HTTP from any service or programming language.
Source: Cloudflare Changelog
Authentication: Requires Cloudflare API token with permissions.
Queues Editbash
undefined2025年5月新增:可通过HTTP从任意服务或编程语言向队列发布消息。
认证:需要拥有权限的Cloudflare API令牌。
Queues Editbash
undefinedSingle message
Single message
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages"
-H "Authorization: Bearer YOUR_API_TOKEN"
-H "Content-Type: application/json"
-d '{ "messages": [ {"body": {"userId": "123", "action": "process-order"}} ] }'
-H "Authorization: Bearer YOUR_API_TOKEN"
-H "Content-Type: application/json"
-d '{ "messages": [ {"body": {"userId": "123", "action": "process-order"}} ] }'
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages"
-H "Authorization: Bearer YOUR_API_TOKEN"
-H "Content-Type: application/json"
-d '{ "messages": [ {"body": {"userId": "123", "action": "process-order"}} ] }'
-H "Authorization: Bearer YOUR_API_TOKEN"
-H "Content-Type: application/json"
-d '{ "messages": [ {"body": {"userId": "123", "action": "process-order"}} ] }'
Batch messages
Batch messages
curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages"
-H "Authorization: Bearer YOUR_API_TOKEN"
-H "Content-Type: application/json"
-d '{ "messages": [ {"body": {"userId": "1"}}, {"body": {"userId": "2"}}, {"body": {"userId": "3"}} ] }'
-H "Authorization: Bearer YOUR_API_TOKEN"
-H "Content-Type: application/json"
-d '{ "messages": [ {"body": {"userId": "1"}}, {"body": {"userId": "2"}}, {"body": {"userId": "3"}} ] }'
**Use Cases**:
- Publishing from external microservices (Node.js, Python, Go, etc.)
- Cron jobs running outside Cloudflare
- Webhook receivers
- Legacy systems integration
- Services without Cloudflare Workers SDK
---curl -X POST "https://api.cloudflare.com/client/v4/accounts/{account_id}/queues/my-queue/messages"
-H "Authorization: Bearer YOUR_API_TOKEN"
-H "Content-Type: application/json"
-d '{ "messages": [ {"body": {"userId": "1"}}, {"body": {"userId": "2"}}, {"body": {"userId": "3"}} ] }'
-H "Authorization: Bearer YOUR_API_TOKEN"
-H "Content-Type: application/json"
-d '{ "messages": [ {"body": {"userId": "1"}}, {"body": {"userId": "2"}}, {"body": {"userId": "3"}} ] }'
**适用场景**:
- 从外部微服务(Node.js、Python、Go等)发布消息
- 在Cloudflare外部运行的定时任务
- Webhook接收器
- 遗留系统集成
- 没有Cloudflare Workers SDK的服务
---Event Subscriptions (August 2025+)
事件订阅(2025年8月起可用)
New in August 2025: Subscribe to events from Cloudflare services and consume via Queues.
Source: Cloudflare Changelog
Supported Event Sources:
- R2 (bucket.created, object.uploaded, object.deleted, etc.)
- Workers KV
- Workers AI
- Vectorize
- Workflows
- Super Slurper
- Workers Builds
Create Subscription:
bash
npx wrangler queues subscription create my-queue \
--source r2 \
--events bucket.created,object.uploadedEvent Structure:
typescript
interface CloudflareEvent {
type: string; // 'r2.bucket.created', 'kv.namespace.created'
source: string; // 'r2', 'kv', 'ai', etc.
payload: any; // Event-specific data
metadata: {
accountId: string;
timestamp: string;
};
}Consumer Example:
typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
const event = message.body as CloudflareEvent;
switch (event.type) {
case 'r2.bucket.created':
console.log('New R2 bucket:', event.payload.bucketName);
await notifyAdmin(event.payload);
break;
case 'r2.object.uploaded':
console.log('File uploaded:', event.payload.key);
await processNewFile(event.payload.key);
break;
case 'kv.namespace.created':
console.log('New KV namespace:', event.payload.namespaceId);
break;
case 'ai.inference.completed':
console.log('AI inference done:', event.payload.modelId);
break;
}
message.ack();
}
}
};Use Cases:
- Build custom workflows triggered by R2 uploads
- Monitor infrastructure changes (new KV namespaces, buckets)
- Track AI inference jobs
- Audit account activity
- Event-driven architectures without custom webhooks
2025年8月新增:订阅Cloudflare服务的事件并通过队列消费。
支持的事件源:
- R2(bucket.created、object.uploaded、object.deleted等)
- Workers KV
- Workers AI
- Vectorize
- Workflows
- Super Slurper
- Workers Builds
创建订阅:
bash
npx wrangler queues subscription create my-queue \
--source r2 \
--events bucket.created,object.uploaded事件结构:
typescript
interface CloudflareEvent {
type: string; // 'r2.bucket.created', 'kv.namespace.created'
source: string; // 'r2', 'kv', 'ai', etc.
payload: any; // Event-specific data
metadata: {
accountId: string;
timestamp: string;
};
}消费者示例:
typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
const event = message.body as CloudflareEvent;
switch (event.type) {
case 'r2.bucket.created':
console.log('New R2 bucket:', event.payload.bucketName);
await notifyAdmin(event.payload);
break;
case 'r2.object.uploaded':
console.log('File uploaded:', event.payload.key);
await processNewFile(event.payload.key);
break;
case 'kv.namespace.created':
console.log('New KV namespace:', event.payload.namespaceId);
break;
case 'ai.inference.completed':
console.log('AI inference done:', event.payload.modelId);
break;
}
message.ack();
}
}
};适用场景:
- 构建由R2上传触发的自定义工作流
- 监控基础设施变更(新的KV命名空间、存储桶)
- 跟踪AI推理任务
- 审计账户活动
- 无需自定义Webhook的事件驱动架构
Consumer API
消费者API
typescript
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise<void> {
for (const message of batch.messages) {
// message.id - unique UUID
// message.timestamp - Date when sent
// message.body - your content
// message.attempts - retry count (starts at 1)
await processMessage(message.body);
message.ack(); // Explicit ack (critical for non-idempotent ops)
}
}
};
// Retry with exponential backoff
message.retry({ delaySeconds: Math.min(60 * Math.pow(2, message.attempts - 1), 3600) });
// Batch methods
batch.ackAll(); // Ack all messages
batch.retryAll(); // Retry all messagesCritical:
- - Mark success, prevents retry even if handler fails later
message.ack() - Use explicit ack for non-idempotent operations (DB writes, API calls, payments)
- Implicit ack - If handler returns successfully without calling ack(), all messages auto-acknowledged
- Ordering not guaranteed - Don't assume FIFO message order
typescript
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise<void> {
for (const message of batch.messages) {
// message.id - unique UUID
// message.timestamp - Date when sent
// message.body - your content
// message.attempts - retry count (starts at 1)
await processMessage(message.body);
message.ack(); // Explicit ack (critical for non-idempotent ops)
}
}
};
// Retry with exponential backoff
message.retry({ delaySeconds: Math.min(60 * Math.pow(2, message.attempts - 1), 3600) });
// Batch methods
batch.ackAll(); // Ack all messages
batch.retryAll(); // Retry all messages关键注意事项:
- - 标记消息处理成功,即使后续处理程序失败也会阻止重试
message.ack() - 对非幂等操作使用显式确认(数据库写入、API调用、支付操作)
- 隐式确认 - 如果处理程序成功返回但未调用ack(),所有消息会自动被确认
- 不保证消息顺序 - 不要假设消息按FIFO顺序处理
Critical Consumer Patterns
关键消费者模式
Explicit Acknowledgement (Non-Idempotent Operations)
显式确认(非幂等操作)
ALWAYS use explicit ack() for: Database writes, API calls, financial transactions
typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
try {
await env.DB.prepare('INSERT INTO orders (id, amount) VALUES (?, ?)')
.bind(message.body.orderId, message.body.amount).run();
message.ack(); // Only ack on success
} catch (error) {
console.error(`Failed ${message.id}:`, error);
// Don't ack - will retry
}
}
}
};Why? Prevents duplicate writes if one message in batch fails. Failed messages retry independently.
对于以下操作务必使用显式ack():数据库写入、API调用、金融交易
typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
try {
await env.DB.prepare('INSERT INTO orders (id, amount) VALUES (?, ?)')
.bind(message.body.orderId, message.body.amount).run();
message.ack(); // Only ack on success
} catch (error) {
console.error(`Failed ${message.id}:`, error);
// Don't ack - will retry
}
}
}
};原因:避免批量处理中某条消息失败时导致重复写入。失败的消息会独立重试。
Exponential Backoff for Rate-Limited APIs
针对速率限制API的指数退避
typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
try {
await fetch('https://api.example.com/process', {
method: 'POST',
body: JSON.stringify(message.body),
});
message.ack();
} catch (error) {
if (error.status === 429) {
const delaySeconds = Math.min(60 * Math.pow(2, message.attempts - 1), 3600);
message.retry({ delaySeconds });
} else {
message.retry();
}
}
}
}
};typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
try {
await fetch('https://api.example.com/process', {
method: 'POST',
body: JSON.stringify(message.body),
});
message.ack();
} catch (error) {
if (error.status === 429) {
const delaySeconds = Math.min(60 * Math.pow(2, message.attempts - 1), 3600);
message.retry({ delaySeconds });
} else {
message.retry();
}
}
}
}
};Dead Letter Queue (DLQ) - CRITICAL for Production
死信队列(DLQ)- 生产环境必备
⚠️ Without DLQ, failed messages are DELETED PERMANENTLY after max_retries
bash
npx wrangler queues create my-dlqwrangler.jsonc:
jsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_retries": 3,
"dead_letter_queue": "my-dlq" // Messages go here after 3 failed retries
}]
}
}DLQ Consumer:
typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
console.error('PERMANENTLY FAILED:', message.id, message.body);
await env.DB.prepare('INSERT INTO failed_messages (id, body) VALUES (?, ?)')
.bind(message.id, JSON.stringify(message.body)).run();
message.ack(); // Remove from DLQ
}
}
};⚠️ 若未配置DLQ,失败消息在达到最大重试次数后会被永久删除
bash
npx wrangler queues create my-dlqwrangler.jsonc:
jsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_retries": 3,
"dead_letter_queue": "my-dlq" // Messages go here after 3 failed retries
}]
}
}DLQ消费者:
typescript
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
console.error('PERMANENTLY FAILED:', message.id, message.body);
await env.DB.prepare('INSERT INTO failed_messages (id, body) VALUES (?, ?)')
.bind(message.id, JSON.stringify(message.body)).run();
message.ack(); // Remove from DLQ
}
}
};Known Issues Prevention
已知问题预防
This skill prevents 13 documented issues:
本技能可预防13种已记录的问题:
Issue #1: Multiple Dev Commands - Queues Don't Flow Between Processes
问题#1:多开发命令 - 队列消息无法在进程间流转
Error: Queue messages sent in one process don't appear in another consumer process
Source: GitHub Issue #9795
wrangler devwrangler devWhy It Happens: The virtual queue used by wrangler is in-process memory. Separate dev processes cannot share the queue state.
Prevention:
bash
undefined原因:wrangler使用的虚拟队列存储在进程内存中。独立的开发进程无法共享队列状态。
预防方案:
bash
undefined❌ Don't run producer and consumer as separate processes
❌ Don't run producer and consumer as separate processes
Terminal 1: wrangler dev (producer)
Terminal 1: wrangler dev (producer)
Terminal 2: wrangler dev (consumer) # Won't receive messages!
Terminal 2: wrangler dev (consumer) # Won't receive messages!
✅ Option 1: Run both in single dev command
✅ Option 1: Run both in single dev command
wrangler dev -c producer/wrangler.jsonc -c consumer/wrangler.jsonc
wrangler dev -c producer/wrangler.jsonc -c consumer/wrangler.jsonc
✅ Option 2: Use Vite plugin with auxiliaryWorkers
✅ Option 2: Use Vite plugin with auxiliaryWorkers
vite.config.ts:
vite.config.ts:
export default defineConfig({
plugins: [
cloudflare({
auxiliaryWorkers: ['./consumer/wrangler.jsonc']
})
]
})
---export default defineConfig({
plugins: [
cloudflare({
auxiliaryWorkers: ['./consumer/wrangler.jsonc']
})
]
})
---Issue #2: Queue Producer Binding Causes 500 Errors with Remote Dev
问题#2:队列生产者绑定导致远程开发模式下出现500错误
Error: All routes return 500 Internal Server Error when using with queue bindings
Source: GitHub Issue #9642
wrangler dev --remoteWhy It Happens: Queues are not yet supported in mode. Even routes that don't use the queue binding fail.
wrangler dev --remotePrevention:
jsonc
// When using remote dev, temporarily comment out queue bindings
{
"queues": {
// "producers": [{ "queue": "my-queue", "binding": "MY_QUEUE" }]
}
}
// Or use local dev instead
// wrangler dev (without --remote)错误:当使用并配置队列绑定时,所有路由都返回500内部服务器错误
来源:GitHub Issue #9642
wrangler dev --remote原因:队列目前不支持模式。即使是不使用队列绑定的路由也会失败。
wrangler dev --remote预防方案:
jsonc
// When using remote dev, temporarily comment out queue bindings
{
"queues": {
// "producers": [{ "queue": "my-queue", "binding": "MY_QUEUE" }]
}
}
// Or use local dev instead
// wrangler dev (without --remote)Issue #3: D1 Remote Breaks When Queue Remote is Set
问题#3:配置队列远程绑定后D1远程绑定失效
Error: D1 remote binding stops working when is set on queue producer binding
Source: GitHub Issue #11106
remote: trueWhy It Happens: Binding conflict issue affecting mixed local/remote development.
Prevention:
jsonc
// ❌ Don't mix D1 remote with queue remote
{
"d1_databases": [{
"binding": "DB",
"database_id": "...",
"remote": true
}],
"queues": {
"producers": [{
"binding": "QUEUE",
"queue": "my-queue",
"remote": true // ❌ Breaks D1 remote
}]
}
}
// ✅ Avoid remote on queues when using D1 remote
{
"d1_databases": [{ "binding": "DB", "remote": true }],
"queues": {
"producers": [{ "binding": "QUEUE", "queue": "my-queue" }]
}
}Status: No workaround yet. Track issue for updates.
错误:当队列生产者绑定设置时,D1远程绑定停止工作
来源:GitHub Issue #11106
remote: true原因:混合本地/远程开发时的绑定冲突问题。
预防方案:
jsonc
// ❌ Don't mix D1 remote with queue remote
{
"d1_databases": [{
"binding": "DB",
"database_id": "...",
"remote": true
}],
"queues": {
"producers": [{
"binding": "QUEUE",
"queue": "my-queue",
"remote": true // ❌ Breaks D1 remote
}]
}
}
// ✅ Avoid remote on queues when using D1 remote
{
"d1_databases": [{ "binding": "DB", "remote": true }],
"queues": {
"producers": [{ "binding": "QUEUE", "queue": "my-queue" }]
}
}状态:暂无解决方案,跟踪该问题获取更新。
Issue #4: Mixed Local/Remote Bindings - Queue Consumer Missing
问题#4:混合本地/远程绑定 - 队列消费者缺失
Error: Queue consumer binding does not appear when mixing local queues with remote AI/Vectorize bindings
Source: GitHub Issue #9887
Why It Happens: Wrangler doesn't support mixed local/remote bindings in the same worker.
Prevention:
jsonc
// ❌ Don't mix local queues with remote AI
{
"queues": {
"consumers": [{ "queue": "my-queue" }]
},
"ai": {
"binding": "AI",
"experimental_remote": true // ❌ Breaks queue consumer
}
}
// ✅ Option 1: All local (no remote bindings)
wrangler dev
// ✅ Option 2: Separate workers for queues vs AI
// Worker 1: Queue processing (local)
// Worker 2: AI operations (remote)错误:当混合本地队列与远程AI/Vectorize绑定时,队列消费者绑定不显示
来源:GitHub Issue #9887
原因:Wrangler不支持在同一个Worker中混合本地/远程绑定。
预防方案:
jsonc
// ❌ Don't mix local queues with remote AI
{
"queues": {
"consumers": [{ "queue": "my-queue" }]
},
"ai": {
"binding": "AI",
"experimental_remote": true // ❌ Breaks queue consumer
}
}
// ✅ Option 1: All local (no remote bindings)
wrangler dev
// ✅ Option 2: Separate workers for queues vs AI
// Worker 1: Queue processing (local)
// Worker 2: AI operations (remote)Issue #5: http_pull Type Prevents Worker Consumer Execution
问题#5:http_pull类型阻止Worker消费者执行
Error: Queue consumer with doesn't execute in production
Source: GitHub Issue #6619
type: "http_pull"Why It Happens: is for external HTTP-based consumers, not Worker-based consumers.
http_pullPrevention:
jsonc
// ❌ Don't use type: "http_pull" for Worker consumers
{
"queues": {
"consumers": [{
"queue": "my-queue",
"type": "http_pull", // ❌ Wrong for Workers
"max_batch_size": 10
}]
}
}
// ✅ Omit type field for push-based Worker consumers
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 10
// No "type" field - defaults to Worker consumer
}]
}
}错误:配置的队列消费者在生产环境中无法执行
来源:GitHub Issue #6619
type: "http_pull"原因:适用于外部基于HTTP的消费者,而非基于Worker的消费者。
http_pull预防方案:
jsonc
// ❌ Don't use type: "http_pull" for Worker consumers
{
"queues": {
"consumers": [{
"queue": "my-queue",
"type": "http_pull", // ❌ Wrong for Workers
"max_batch_size": 10
}]
}
}
// ✅ Omit type field for push-based Worker consumers
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 10
// No "type" field - defaults to Worker consumer
}]
}
}Breaking Changes & Deprecations
重大变更与废弃
delivery_delay in Producer Config (Upcoming Removal)
生产者配置中的delivery_delay(即将移除)
Warning: The parameter in producer bindings will be removed in a future wrangler version.
delivery_delaySource: GitHub Issue #10286
jsonc
// ❌ Will be removed - don't use
{
"queues": {
"producers": [{
"binding": "MY_QUEUE",
"queue": "my-queue",
"delivery_delay": 300 // ❌ Don't use this
}]
}
}Migration: Use per-message delay instead:
typescript
// ✅ Correct approach - per-message delay
await env.MY_QUEUE.send({ data }, { delaySeconds: 300 });Why: Workers should not affect queue-level settings. With multiple producers, the setting from the last-deployed producer wins, causing unpredictable behavior.
警告:生产者绑定中的参数将在未来的wrangler版本中移除。
delivery_delayjsonc
// ❌ Will be removed - don't use
{
"queues": {
"producers": [{
"binding": "MY_QUEUE",
"queue": "my-queue",
"delivery_delay": 300 // ❌ Don't use this
}]
}
}迁移方案:改为使用每条消息的延迟设置:
typescript
// ✅ Correct approach - per-message delay
await env.MY_QUEUE.send({ data }, { delaySeconds: 300 });原因:Worker不应影响队列级别的设置。当存在多个生产者时,最后部署的生产者的设置会生效,导致不可预测的行为。
Community Tips
社区技巧
Note: These tips come from community discussions and GitHub issues. Verify against your wrangler version.
注意:这些技巧来自社区讨论和GitHub问题。请根据你的wrangler版本进行验证。
Tip: max_batch_timeout May Break Local Development
技巧:max_batch_timeout可能会破坏本地开发
Source: GitHub Issue #6619
Confidence: MEDIUM
Applies to: Local development with
wrangler devIf your queue consumer doesn't execute locally, try removing :
max_batch_timeoutjsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 10
// Remove max_batch_timeout for local dev
}]
}
}This appears to be version-specific and may not affect all setups.
来源:GitHub Issue #6619
可信度:中等
适用范围:使用的本地开发
wrangler dev如果你的队列消费者在本地无法执行,尝试移除:
max_batch_timeoutjsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 10
// Remove max_batch_timeout for local dev
}]
}
}这似乎是版本特定的问题,可能不会影响所有设置。
Tip: Queue Name Not Available on Producer Bindings
技巧:生产者绑定无法获取队列名称
Source: GitHub Issue #10131
Confidence: HIGH
Applies to: Multi-environment deployments (staging, PR previews, tenant-specific queues)
Queue names are only available via in consumer handlers, not on producer bindings. This creates issues with environment-specific queue names like or .
batch.queueemail-queue-stagingemail-queue-pr-123Current Limitation:
typescript
// ❌ Can't access queue name from producer binding
const queueName = env.MY_QUEUE.name; // Doesn't exist!
// ❌ Must hardcode or normalize in consumer
switch (batch.queue) {
case 'email-queue': // What about email-queue-staging?
case 'email-queue-staging': // Must handle all variants
case 'email-queue-pr-123': // Dynamic env names break this
}Workaround:
typescript
// In consumer: Normalize queue name
function normalizeQueueName(queueName: string): string {
return queueName.replace(/-staging|-pr-\d+|-dev/g, '');
}
switch (normalizeQueueName(batch.queue)) {
case 'email-queue':
// Handle all email-queue-* variants
}Status: Feature request tracked internally: MQ-923
来源:GitHub Issue #10131
可信度:高
适用范围:多环境部署( staging、PR预览、租户特定队列)
队列名称仅能在消费者处理程序中通过获取,无法从生产者绑定中获取。这会给环境特定的队列名称(如或)带来问题。
batch.queueemail-queue-stagingemail-queue-pr-123当前限制:
typescript
// ❌ Can't access queue name from producer binding
const queueName = env.MY_QUEUE.name; // Doesn't exist!
// ❌ Must hardcode or normalize in consumer
switch (batch.queue) {
case 'email-queue': // What about email-queue-staging?
case 'email-queue-staging': // Must handle all variants
case 'email-queue-pr-123': // Dynamic env names break this
}解决方案:
typescript
// In consumer: Normalize queue name
function normalizeQueueName(queueName: string): string {
return queueName.replace(/-staging|-pr-\d+|-dev/g, '');
}
switch (normalizeQueueName(batch.queue)) {
case 'email-queue':
// Handle all email-queue-* variants
}状态:功能请求已在内部跟踪:MQ-923
Consumer Configuration
消费者配置
jsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100, // 1-100 (default: 10)
"max_batch_timeout": 30, // 0-60s (default: 5s)
"max_retries": 5, // 0-100 (default: 3)
"retry_delay": 300, // Seconds (default: 0)
"max_concurrency": 10, // 1-250 (default: auto-scale)
"dead_letter_queue": "my-dlq" // REQUIRED for production
}]
}
}Critical Settings:
- Batching - Consumer called when EITHER condition met (max_batch_size OR max_batch_timeout)
- max_retries - After exhausted: with DLQ → sent to DLQ, without DLQ → DELETED PERMANENTLY
- max_concurrency - Only set if upstream has rate limits or connection limits. Otherwise leave unset for auto-scaling (up to 250 concurrent invocations)
- DLQ - Create separately:
npx wrangler queues create my-dlq
jsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100, // 1-100 (default: 10)
"max_batch_timeout": 30, // 0-60s (default: 5s)
"max_retries": 5, // 0-100 (default: 3)
"retry_delay": 300, // Seconds (default: 0)
"max_concurrency": 10, // 1-250 (default: auto-scale)
"dead_letter_queue": "my-dlq" // REQUIRED for production
}]
}
}关键设置:
- 批量处理 - 当满足以下任一条件时调用消费者(达到max_batch_size或max_batch_timeout)
- max_retries - 达到最大重试次数后:配置了DLQ则发送到DLQ,未配置则永久删除
- max_concurrency - 仅当上游存在速率限制或连接限制时设置。否则留空以启用自动扩展(最多250个并发调用)
- DLQ - 需要单独创建:
npx wrangler queues create my-dlq
Wrangler Commands
Wrangler命令
bash
undefinedbash
undefinedCreate queue
Create queue
npx wrangler queues create my-queue
npx wrangler queues create my-queue --message-retention-period-secs 1209600 # 14 days
npx wrangler queues create my-queue
npx wrangler queues create my-queue --message-retention-period-secs 1209600 # 14 days
Manage queues
Manage queues
npx wrangler queues list
npx wrangler queues info my-queue
npx wrangler queues delete my-queue # ⚠️ Deletes ALL messages!
npx wrangler queues list
npx wrangler queues info my-queue
npx wrangler queues delete my-queue # ⚠️ Deletes ALL messages!
Pause/Purge (March 2025 - NEW)
Pause/Purge (March 2025 - NEW)
npx wrangler queues pause-delivery my-queue # Pause processing, keep receiving
npx wrangler queues resume-delivery my-queue
npx wrangler queues purge my-queue # ⚠️ Permanently deletes all messages!
npx wrangler queues pause-delivery my-queue # Pause processing, keep receiving
npx wrangler queues resume-delivery my-queue
npx wrangler queues purge my-queue # ⚠️ Permanently deletes all messages!
Consumer management
Consumer management
npx wrangler queues consumer add my-queue my-consumer-worker
--batch-size 50 --batch-timeout 10 --message-retries 5 npx wrangler queues consumer remove my-queue my-consumer-worker
--batch-size 50 --batch-timeout 10 --message-retries 5 npx wrangler queues consumer remove my-queue my-consumer-worker
---npx wrangler queues consumer add my-queue my-consumer-worker
--batch-size 50 --batch-timeout 10 --message-retries 5 npx wrangler queues consumer remove my-queue my-consumer-worker
--batch-size 50 --batch-timeout 10 --message-retries 5 npx wrangler queues consumer remove my-queue my-consumer-worker
---Limits & Quotas
限制与配额
| Feature | Limit |
|---|---|
| Queues per account | 10,000 |
| Message size | 128 KB (includes ~100 bytes metadata) |
| Message retries | 100 max |
| Batch size | 1-100 messages |
| Batch timeout | 0-60 seconds |
| Messages per sendBatch | 100 (or 256 KB total) |
| Queue throughput | 5,000 messages/second per queue |
| Message retention | 4 days (default), 14 days (max) |
| Queue backlog size | 25 GB per queue |
| Concurrent consumers | 250 (push-based, auto-scale) |
| Consumer duration | 15 minutes (wall clock) |
| Consumer CPU time | 30 seconds (default), 5 minutes (max) |
| Visibility timeout | 12 hours (pull consumers) |
| Message delay | 12 hours (max) |
| API rate limit | 1200 requests / 5 minutes |
| 功能 | 限制 |
|---|---|
| 每个账户的队列数量 | 10,000 |
| 消息大小 | 128 KB(包含约100字节元数据) |
| 消息重试次数 | 最多100次 |
| 批量大小 | 1-100条消息 |
| 批量超时 | 0-60秒 |
| sendBatch的消息数量 | 100条(或总大小256 KB) |
| 队列吞吐量 | 每个队列每秒5000条消息 |
| 消息保留时长 | 4天(默认),最长14天 |
| 队列积压大小 | 每个队列25 GB |
| 并发消费者数量 | 250个(推送式,自动扩展) |
| 消费者执行时长 | 15分钟(挂钟时间) |
| 消费者CPU时间 | 30秒(默认),最长5分钟 |
| 可见性超时 | 12小时(拉取式消费者) |
| 消息延迟 | 最长12小时 |
| API速率限制 | 每5分钟1200次请求 |
Pricing
定价
Requires Workers Paid plan ($5/month)
Operations Pricing:
- First 1,000,000 operations/month: FREE
- After that: $0.40 per million operations
What counts as an operation:
- Each 64 KB chunk written, read, or deleted
- Messages >64 KB count as multiple operations:
- 65 KB message = 2 operations
- 127 KB message = 2 operations
- 128 KB message = 2 operations
Typical message lifecycle:
- 1 write + 1 read + 1 delete = 3 operations
Retries:
- Each retry = additional read operation
- Message retried 3 times = 1 write + 4 reads + 1 delete = 6 operations
Dead Letter Queue:
- Writing to DLQ = additional write operation
Cost examples:
- 1M messages/month (no retries): ((1M × 3) - 1M) / 1M × $0.40 = $0.80
- 10M messages/month: ((10M × 3) - 1M) / 1M × $0.40 = $11.60
- 100M messages/month: ((100M × 3) - 1M) / 1M × $0.40 = $119.60
需要Workers付费计划(每月5美元)
操作定价:
- 每月前1,000,000次操作:免费
- 超出部分:每百万次操作0.40美元
计入操作的行为:
- 每64 KB的写入、读取或删除操作
- 超过64 KB的消息会被计为多次操作:
- 65 KB消息 = 2次操作
- 127 KB消息 = 2次操作
- 128 KB消息 = 2次操作
典型消息生命周期:
- 1次写入 + 1次读取 + 1次删除 = 3次操作
重试:
- 每次重试 = 额外的1次读取操作
- 消息被重试3次 = 1次写入 + 4次读取 + 1次删除 = 6次操作
死信队列:
- 写入DLQ = 额外的1次写入操作
成本示例:
- 每月100万条消息(无重试):((1M × 3) - 1M) / 1M × $0.40 = 0.80美元
- 每月1000万条消息:((10M × 3) - 1M) / 1M × $0.40 = 11.60美元
- 每月1亿条消息:((100M × 3) - 1M) / 1M × $0.40 = 119.60美元
Error Handling
错误处理
Common Errors
常见错误
1. Message Too Large
1. 消息过大
typescript
// ❌ Bad: Message >128 KB
await env.MY_QUEUE.send({
data: largeArray, // >128 KB
});
// ✅ Good: Check size before sending
const message = { data: largeArray };
const size = new TextEncoder().encode(JSON.stringify(message)).length;
if (size > 128000) {
// Store in R2, send reference
const key = `messages/${crypto.randomUUID()}.json`;
await env.MY_BUCKET.put(key, JSON.stringify(message));
await env.MY_QUEUE.send({ type: 'large-message', r2Key: key });
} else {
await env.MY_QUEUE.send(message);
}typescript
// ❌ Bad: Message >128 KB
await env.MY_QUEUE.send({
data: largeArray, // >128 KB
});
// ✅ Good: Check size before sending
const message = { data: largeArray };
const size = new TextEncoder().encode(JSON.stringify(message)).length;
if (size > 128000) {
// Store in R2, send reference
const key = `messages/${crypto.randomUUID()}.json`;
await env.MY_BUCKET.put(key, JSON.stringify(message));
await env.MY_QUEUE.send({ type: 'large-message', r2Key: key });
} else {
await env.MY_QUEUE.send(message);
}2. Throughput Exceeded
2. 吞吐量超限
typescript
// ❌ Bad: Exceeding 5000 msg/s per queue
for (let i = 0; i < 10000; i++) {
await env.MY_QUEUE.send({ id: i }); // Too fast!
}
// ✅ Good: Use sendBatch
const messages = Array.from({ length: 10000 }, (_, i) => ({
body: { id: i },
}));
// Send in batches of 100
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
}
// ✅ Even better: Rate limit with delay
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
if (i + 100 < messages.length) {
await new Promise(resolve => setTimeout(resolve, 100)); // 100ms delay
}
}typescript
// ❌ Bad: Exceeding 5000 msg/s per queue
for (let i = 0; i < 10000; i++) {
await env.MY_QUEUE.send({ id: i }); // Too fast!
}
// ✅ Good: Use sendBatch
const messages = Array.from({ length: 10000 }, (_, i) => ({
body: { id: i },
}));
// Send in batches of 100
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
}
// ✅ Even better: Rate limit with delay
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
if (i + 100 < messages.length) {
await new Promise(resolve => setTimeout(resolve, 100)); // 100ms delay
}
}3. Consumer Timeout
3. 消费者超时
typescript
// ❌ Bad: Long processing without CPU limit increase
export default {
async queue(batch: MessageBatch): Promise<void> {
for (const message of batch.messages) {
await processForMinutes(message.body); // CPU timeout!
}
},
};
// ✅ Good: Increase CPU limit in wrangler.jsoncwrangler.jsonc:
jsonc
{
"limits": {
"cpu_ms": 300000 // 5 minutes (max allowed)
}
}typescript
// ❌ Bad: Long processing without CPU limit increase
export default {
async queue(batch: MessageBatch): Promise<void> {
for (const message of batch.messages) {
await processForMinutes(message.body); // CPU timeout!
}
},
};
// ✅ Good: Increase CPU limit in wrangler.jsoncwrangler.jsonc:
jsonc
{
"limits": {
"cpu_ms": 300000 // 5 minutes (max allowed)
}
}4. Backlog Growing
4. 消息积压增长
typescript
// Issue: Consumer too slow, backlog growing
// ✅ Solution 1: Increase batch size
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100 // Process more per invocation
}]
}
}
// ✅ Solution 2: Let concurrency auto-scale (don't set max_concurrency)
// ✅ Solution 3: Optimize consumer code
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
// Process in parallel
await Promise.all(
batch.messages.map(async (message) => {
await process(message.body);
message.ack();
})
);
},
};typescript
// Issue: Consumer too slow, backlog growing
// ✅ Solution 1: Increase batch size
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100 // Process more per invocation
}]
}
}
// ✅ Solution 2: Let concurrency auto-scale (don't set max_concurrency)
// ✅ Solution 3: Optimize consumer code
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
// Process in parallel
await Promise.all(
batch.messages.map(async (message) => {
await process(message.body);
message.ack();
})
);
},
};Critical Rules
关键规则
Always:
- ✅ Configure DLQ for production (in consumer config)
dead_letter_queue - ✅ Use explicit for non-idempotent ops (DB writes, API calls)
message.ack() - ✅ Validate message size <128 KB before sending
- ✅ Use for multiple messages (more efficient)
sendBatch() - ✅ Implement exponential backoff:
60 * Math.pow(2, message.attempts - 1) - ✅ Let concurrency auto-scale (don't set unless upstream has rate limits)
max_concurrency
Never:
- ❌ Never assume FIFO ordering - not guaranteed
- ❌ Never rely on implicit ack for non-idempotent ops - use explicit
ack() - ❌ Never send messages >128 KB - will fail (store in R2 instead)
- ❌ Never skip DLQ in production - failed messages DELETED PERMANENTLY without DLQ
- ❌ Never exceed 5,000 msg/s per queue (push consumers) or rate limits apply
- ❌ Never process messages synchronously - use for parallelism
Promise.all()
务必遵守:
- ✅ 为生产环境配置DLQ(消费者配置中的)
dead_letter_queue - ✅ 对非幂等操作(数据库写入、API调用)使用显式
message.ack() - ✅ 在发送前验证消息大小小于128 KB
- ✅ 对多条消息使用(更高效)
sendBatch() - ✅ 实现指数退避:
60 * Math.pow(2, message.attempts - 1) - ✅ 启用自动扩展并发(除非上游有速率限制,否则不要设置)
max_concurrency
切勿:
- ❌ 不要假设消息按FIFO顺序处理 - 不保证顺序
- ❌ 对非幂等操作不要依赖隐式确认 - 使用显式
ack() - ❌ 不要发送超过128 KB的消息 - 会发送失败(改为存储在R2中)
- ❌ 生产环境中不要跳过DLQ配置 - 未配置DLQ的失败消息会被永久删除
- ❌ 不要超过每个队列每秒5000条消息的限制(推送式消费者),否则会触发速率限制
- ❌ 不要同步处理消息 - 使用实现并行处理
Promise.all()
Troubleshooting
故障排查
Issue: Messages not being delivered to consumer
问题:消息未投递到消费者
Possible causes:
- Consumer not deployed
- Wrong queue name in wrangler.jsonc
- Delivery paused
- Consumer throwing errors
Solution:
bash
undefined可能原因:
- 消费者未部署
- wrangler.jsonc中的队列名称错误
- 消息投递已暂停
- 消费者抛出错误
解决方案:
bash
undefinedCheck queue info
Check queue info
npx wrangler queues info my-queue
npx wrangler queues info my-queue
Check if delivery paused
Check if delivery paused
npx wrangler queues resume-delivery my-queue
npx wrangler queues resume-delivery my-queue
Check consumer logs
Check consumer logs
npx wrangler tail my-consumer
---npx wrangler tail my-consumer
---Issue: Entire batch retried when one message fails
问题:一条消息失败导致整个批次重试
Cause: Using implicit acknowledgement with non-idempotent operations
Solution: Use explicit ack()
typescript
// ✅ Explicit ack
for (const message of batch.messages) {
try {
await dbWrite(message.body);
message.ack(); // Only ack on success
} catch (error) {
console.error(`Failed: ${message.id}`);
// Don't ack - will retry
}
}原因:对非幂等操作使用了隐式确认
解决方案:使用显式ack()
typescript
// ✅ Explicit ack
for (const message of batch.messages) {
try {
await dbWrite(message.body);
message.ack(); // Only ack on success
} catch (error) {
console.error(`Failed: ${message.id}`);
// Don't ack - will retry
}
}Issue: Messages deleted without processing
问题:消息未被处理就被删除
Cause: No Dead Letter Queue configured
Solution:
bash
undefined原因:未配置死信队列
解决方案:
bash
undefinedCreate DLQ
Create DLQ
npx wrangler queues create my-dlq
npx wrangler queues create my-dlq
Add to consumer config
Add to consumer config
```jsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
"dead_letter_queue": "my-dlq"
}]
}
}
```jsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
"dead_letter_queue": "my-dlq"
}]
}
}Issue: Consumer not auto-scaling
问题:消费者未自动扩展
Possible causes:
- set to 1
max_concurrency - Consumer returning errors (not processing)
- Batch processing too fast (no backlog)
Solution:
jsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
// Don't set max_concurrency - let it auto-scale
"max_batch_size": 50 // Increase batch size instead
}]
}
}可能原因:
- 被设置为1
max_concurrency - 消费者持续返回错误(未处理消息)
- 批量处理速度过快(无积压)
解决方案:
jsonc
{
"queues": {
"consumers": [{
"queue": "my-queue",
// Don't set max_concurrency - let it auto-scale
"max_batch_size": 50 // Increase batch size instead
}]
}
}Related Documentation
相关文档
- Cloudflare Queues Docs
- How Queues Works
- JavaScript APIs
- Batching & Retries
- Consumer Concurrency
- Dead Letter Queues
- Wrangler Commands
- Limits
- Pricing
Last Updated: 2026-01-21
Version: 2.0.0
Changes: Added HTTP Publishing (May 2025), Event Subscriptions (Aug 2025), Known Issues Prevention (13 issues), Breaking Changes section, Community Tips. Error count: 0 → 13. Major feature additions and comprehensive issue documentation.
Maintainer: Jeremy Dawes | jeremy@jezweb.net
- Cloudflare Queues Docs
- How Queues Works
- JavaScript APIs
- Batching & Retries
- Consumer Concurrency
- Dead Letter Queues
- Wrangler Commands
- Limits
- Pricing
最后更新:2026-01-21
版本:2.0.0
变更:新增HTTP发布(2025年5月)、事件订阅(2025年8月)、已知问题预防(13个问题)、重大变更章节、社区技巧。错误数量:0 → 13。新增主要功能并完善了问题文档。
维护者:Jeremy Dawes | jeremy@jezweb.net