# Cloudflare Queues
Asynchronous message queues for reliable background processing. Load when offloading background tasks, batch processing messages, implementing retry logic with dead letter queues, rate limiting upstream APIs, or decoupling producers from consumers.
npx skill4agent add null-shot/cloudflare-skills queues

| Task | API |
|---|---|
| Send single message | `env.QUEUE.send(message)` |
| Send batch | `env.QUEUE.sendBatch(messages)` |
| Define consumer | `async queue(batch, env)` handler on the Worker export |
| Access message body | `message.body` |
| Acknowledge message | Messages auto-ack unless handler throws |
| Retry message | `message.retry()` |
| Get batch size | `batch.messages.length` |
{
"name": "request-logger-consumer",
"main": "src/index.ts",
"compatibility_date": "2025-02-11",
"queues": {
"producers": [{
"name": "request-queue",
"binding": "REQUEST_QUEUE"
}],
"consumers": [{
"name": "request-queue",
"dead_letter_queue": "request-queue-dlq",
"retry_delay": 300,
"max_batch_size": 100,
"max_batch_timeout": 30,
"max_retries": 3
}]
},
"vars": {
"UPSTREAM_API_URL": "https://api.example.com/batch-logs",
"UPSTREAM_API_KEY": ""
}
}

Key consumer options: `dead_letter_queue`, `retry_delay`, `max_batch_size`, `max_batch_timeout`, `max_retries`.

// src/index.ts
interface Env {
REQUEST_QUEUE: Queue;
UPSTREAM_API_URL: string;
UPSTREAM_API_KEY: string;
}
export default {
// Producer: Send messages to queue
async fetch(request: Request, env: Env) {
const info = {
timestamp: new Date().toISOString(),
method: request.method,
url: request.url,
headers: Object.fromEntries(request.headers),
};
await env.REQUEST_QUEUE.send(info);
return Response.json({
message: 'Request logged',
requestId: crypto.randomUUID()
});
},
// Consumer: Process messages in batches
async queue(batch: MessageBatch<any>, env: Env) {
const requests = batch.messages.map(msg => msg.body);
const response = await fetch(env.UPSTREAM_API_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${env.UPSTREAM_API_KEY}`
},
body: JSON.stringify({
timestamp: new Date().toISOString(),
batchSize: requests.length,
requests
})
});
if (!response.ok) {
// Throwing will retry the entire batch
throw new Error(`Upstream API error: ${response.status}`);
}
}
};// Send simple JSON payload
await env.QUEUE.send({ userId: 123, action: "login" });
// Send batch of messages
await env.QUEUE.sendBatch([
{ userId: 123, action: "login" },
{ userId: 456, action: "logout" },
{ userId: 789, action: "purchase" }
]);
// Send with typed body
interface UserEvent {
userId: number;
action: string;
timestamp: string;
}
await env.QUEUE.send<UserEvent>({
userId: 123,
action: "login",
timestamp: new Date().toISOString()
});{
"queues": {
"consumers": [{
"name": "main-queue",
"dead_letter_queue": "main-queue-dlq",
"retry_delay": 300, // 5 minutes
"max_retries": 3
}]
}
}

Key settings: `retry_delay` (seconds to wait between retries) and `max_retries` (attempts before the message goes to the DLQ).

export default {
// Main consumer
async queue(batch: MessageBatch<any>, env: Env) {
for (const message of batch.messages) {
try {
await processMessage(message.body);
} catch (error) {
console.error('Processing failed:', error);
throw error; // Trigger retry
}
}
}
};
// Separate worker for DLQ
export default {
async queue(batch: MessageBatch<any>, env: Env) {
// Log failed messages for debugging
for (const message of batch.messages) {
console.error('Dead letter message:', {
body: message.body,
attempts: message.attempts,
timestamp: message.timestamp
});
// Optionally store in KV/D1 for inspection
await env.FAILED_MESSAGES.put(
message.id,
JSON.stringify(message),
{ expirationTtl: 86400 * 7 } // 7 days
);
}
}
};async queue(batch: MessageBatch<any>, env: Env) {
// Throwing retries entire batch
const response = await fetch(env.UPSTREAM_API_URL, {
method: 'POST',
body: JSON.stringify(batch.messages.map(m => m.body))
});
if (!response.ok) {
throw new Error(`Batch failed: ${response.status}`);
}
}async queue(batch: MessageBatch<any>, env: Env) {
const results = await Promise.allSettled(
batch.messages.map(msg => processMessage(msg.body))
);
const failures = results.filter(r => r.status === 'rejected');
if (failures.length > 0) {
console.error(`${failures.length}/${batch.messages.length} messages failed`);
// Throwing here retries the entire batch
// Consider sending failed messages to a separate queue instead
}
}async queue(batch: MessageBatch<any>, env: Env) {
const failedMessages = [];
for (const message of batch.messages) {
try {
await processMessage(message.body);
} catch (error) {
failedMessages.push(message.body);
}
}
// Requeue only failures
if (failedMessages.length > 0) {
await env.RETRY_QUEUE.sendBatch(failedMessages);
}
// Don't throw - successfully processed messages won't be retried
}// Handle large payloads
async function sendLargePayload(data: any, env: Env) {
const serialized = JSON.stringify(data);
if (serialized.length > 100_000) { // ~100KB
// Option 1: Store in R2/KV, send reference
const key = crypto.randomUUID();
await env.LARGE_PAYLOADS.put(key, serialized);
await env.QUEUE.send({ type: 'large', key });
} else {
await env.QUEUE.send(data);
}
}interface Env {
// Producer bindings
REQUEST_QUEUE: Queue<RequestInfo>;
EMAIL_QUEUE: Queue<EmailPayload>;
// Environment variables
UPSTREAM_API_URL: string;
UPSTREAM_API_KEY: string;
// Other bindings
KV: KVNamespace;
DB: D1Database;
}
interface RequestInfo {
timestamp: string;
method: string;
url: string;
headers: Record<string, string>;
}
interface EmailPayload {
to: string;
subject: string;
body: string;
}max_batch_timeout