Message Queues
Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.
When to Use This Skill
Use message queues when:
- Long-running operations block HTTP requests (report generation, video processing)
- Service decoupling required (microservices, event-driven architecture)
- Guaranteed delivery needed (payment processing, order fulfillment)
- Event streaming for analytics (log aggregation, metrics pipelines)
- Workflow orchestration for complex processes (multi-step sagas, human-in-the-loop)
- Background job processing (email sending, image resizing)
Broker Selection Decision Tree
Choose a message broker based on your primary need:
Event Streaming / Log Aggregation
→ Apache Kafka
- Throughput: 500K-1M msg/s
- Replay events (event sourcing)
- Exactly-once semantics
- Long-term retention
- Use: Analytics pipelines, CQRS, event sourcing
Simple Background Jobs
→ Task Queues
- Python → Celery + Redis
- TypeScript → BullMQ + Redis
- Go → Asynq + Redis
- Use: Email sending, report generation, webhooks
Complex Workflows / Sagas
→ Temporal
- Durable execution (survives restarts)
- Saga pattern support
- Human-in-the-loop workflows
- Use: Order processing, AI agent orchestration
Request-Reply / RPC Patterns
→ NATS
- Built-in request-reply
- Sub-millisecond latency
- Cloud-native, simple operations
- Use: Microservices RPC, IoT command/control
Complex Message Routing
→ RabbitMQ
- Exchanges (direct, topic, fanout, headers)
- Dead letter exchanges
- Message TTL, priorities
- Use: Multi-consumer patterns, pub/sub
Already Using Redis
→ Redis Streams
- No new infrastructure
- Simple consumer groups
- Moderate throughput (100K+ msg/s)
- Use: Notification queues, simple job queues
Performance Comparison
| Broker | Throughput | Latency (p99) | Best For |
|---|---|---|---|
| Kafka | 500K-1M msg/s | 10-50ms | Event streaming |
| NATS JetStream | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| RabbitMQ | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| Redis Streams | 100K+ msg/s | Sub-ms | Simple queues, caching |
Quick Start Examples
Kafka Producer/Consumer (Python)
See examples/kafka-python/ for working code.

```python
from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        process_order(msg.value())
```
Celery Background Jobs (Python)

See examples/celery-image-processing/ for full implementation.

```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)
```

BullMQ Job Processing (TypeScript)
See examples/bullmq-webhook-processor/ for full implementation.

```typescript
import { Queue, Worker } from 'bullmq'

const queue = new Queue('webhooks', {
  connection: { host: 'localhost', port: 6379 }
})

// Enqueue job
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// Process jobs
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })
```

Temporal Workflow Orchestration
See examples/temporal-order-saga/ for saga pattern implementation.

```python
from temporalio import workflow, activity
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )
        # Step 2: Charge payment
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return f"Order {order_id} completed"
```

Core Patterns
Event Naming Convention
Use: Domain.Entity.Action.Version

Examples:
- order.created.v1
- user.profile.updated.v2
- payment.failed.v1
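The naming convention can be enforced mechanically in producers or CI. A minimal sketch (the regex and helper name are ours, not part of the skill):

```python
import re

# Matches lowercase Domain.Entity.Action.Version names such as order.created.v1
EVENT_NAME = re.compile(r"^[a-z]+(\.[a-z_]+)+\.v\d+$")

def is_valid_event_type(name: str) -> bool:
    return EVENT_NAME.match(name) is not None

print(is_valid_event_type("order.created.v1"))  # → True
print(is_valid_event_type("OrderCreated"))      # → False
```

Rejecting malformed event types at publish time is far cheaper than cleaning them out of consumers later.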
Event Schema Structure
```json
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
```
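A producer-side helper can fill in the envelope fields consistently. A sketch under our own naming (the `make_event` helper is illustrative, not part of the skill):

```python
import uuid
from datetime import datetime, timezone

# Hypothetical helper: wraps a payload in the event envelope shown above
def make_event(event_type: str, data: dict, producer: str,
               trace_id: str, correlation_id: str) -> dict:
    # Derive "2.0" from the ".v2" suffix of the event type
    major = event_type.rsplit(".v", 1)[-1]
    return {
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "version": f"{major}.0",
        "data": data,
        "metadata": {
            "producer": producer,
            "trace_id": trace_id,
            "correlation_id": correlation_id,
        },
    }

event = make_event("order.created.v2", {"order_id": "ord_123"},
                   "order-service", "abc123", "xyz789")
print(event["version"])  # → 2.0
```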
Dead Letter Queue Pattern
Route failed messages to a dead letter queue (DLQ) after max retries:
```python
from celery.exceptions import Reject

@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)
```
Idempotency for Exactly-Once Processing
```python
@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # Check if already processed
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}
    result = process_payment_logic(payment_data)
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}
```
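When the client does not supply an idempotency key, one common fallback (an assumption on our part, not prescribed by this skill) is to derive the key from a canonical hash of the payload:

```python
import hashlib
import json

# Hypothetical helper: equal payloads always hash to the same key
def derive_idempotency_key(payload: dict) -> str:
    # Canonical JSON: sorted keys, no whitespace, so key order doesn't matter
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

print(derive_idempotency_key({"a": 1, "b": 2}) ==
      derive_idempotency_key({"b": 2, "a": 1}))  # → True
```

Note the trade-off: payload-derived keys deduplicate identical retries, but cannot distinguish two genuinely separate requests that happen to carry the same payload.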
Frontend Integration
Job Status Updates via SSE
```python
# FastAPI endpoint for real-time job status
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)
            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": task.info.get('progress', 0)}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": task.result}
                break
            await asyncio.sleep(0.5)
    return EventSourceResponse(event_generator())
```

React Component
```typescript
export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)
  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)
    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })
    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: JSON.parse(e.data) })
      eventSource.close()
    })
    return () => eventSource.close()
  }, [jobId])
  return <ProgressBar value={progress} />
}
```

Detailed Guides
For comprehensive documentation, see reference files:
Broker-Specific Guides
- Kafka: See references/kafka.md for partitioning, consumer groups, exactly-once semantics
- RabbitMQ: See references/rabbitmq.md for exchanges, bindings, routing patterns
- NATS: See references/nats.md for JetStream, request-reply patterns
- Redis Streams: See references/redis-streams.md for consumer groups, acknowledgments
Task Queue Guides
- Celery: See references/celery.md for periodic tasks, canvas (workflows), monitoring
- BullMQ: See references/bullmq.md for job prioritization, flows, Bull Board monitoring
- Temporal: See references/temporal-workflows.md for saga patterns, signals, queries
Pattern Guides
- Event Patterns: See references/event-patterns.md for event sourcing, CQRS, outbox pattern
Common Anti-Patterns to Avoid
1. Synchronous API for Long Operations
```python
# ❌ BAD: Blocks request thread
@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5 minutes!
    return report

# ✅ GOOD: Enqueue background job
@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}
```
2. Non-Idempotent Consumers

```python
# ❌ BAD: Processes duplicates
@app.task
def send_email(email: str):
    send_email_service(email)  # Sends twice if retried!

# ✅ GOOD: Idempotent with deduplication
@app.task
def send_email(email: str, idempotency_key: str):
    if redis.exists(f"sent:{idempotency_key}"):
        return "already_sent"
    send_email_service(email)
    redis.setex(f"sent:{idempotency_key}", 86400, "1")
```
3. Ignoring Dead Letter Queues

```python
# ❌ BAD: Failed messages lost forever
@app.task(max_retries=3)
def risky_task(data):
    process(data)  # If all retries fail, data disappears

# ✅ GOOD: DLQ for manual inspection (bind=True gives access to self.request)
@app.task(bind=True, max_retries=3)
def risky_task(self, data):
    try:
        process(data)
    except Exception as e:
        if self.request.retries >= 3:
            send_to_dlq(data, str(e))
        raise
```
4. Using Kafka for Request-Reply

```python
# ❌ BAD: Kafka is not designed for RPC
def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # How to correlate the response? Kafka is asynchronous!

# ✅ GOOD: Use NATS request-reply or HTTP/gRPC
response = await nats.request("user.profile", user_id.encode())
```
Library Recommendations

Context7 Research

Confluent Kafka (Python)
- Context7 ID: /confluentinc/confluent-kafka-python
- Trust Score: 68.8/100
- Code Snippets: 192+
- Production-ready Python Kafka client

Temporal
- Context7 ID: /websites/temporal_io
- Trust Score: 80.9/100
- Code Snippets: 3,769+
- Workflow orchestration for durable execution
Installation
Python:

```bash
pip install confluent-kafka celery[redis] temporalio aio-pika redis
```

TypeScript/Node.js:

```bash
npm install kafkajs bullmq @temporalio/client amqplib ioredis
```

Rust:

```bash
cargo add rdkafka lapin async-nats redis
```

Go:

```bash
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
```

Utilities
Use scripts for setup automation:
- Kafka setup: Run python scripts/kafka_producer_consumer.py for test utilities
- Schema validation: Run python scripts/validate_message_schema.py to validate event schemas
Related Skills
- api-patterns: API design for async job submission
- realtime-sync: WebSocket/SSE for job status updates
- feedback: Toast notifications for job completion
- databases-*: Persistent storage for event logs

Message Queues v1.1 - Enhanced
🔄 Workflow
Phase 1: Design Phase
- Pattern Selection: Decide between Point-to-Point (Queue) and Pub-Sub (Topic).
- Schema Registry: Define the message format (Avro/Protobuf) and versioning up front.
- Partitioning: Plan data distribution (key selection for ordering guarantees).
Phase 2: Implementation Checklist
- Idempotency: Establish an "Exactly-Once" or "At-Least-Once" strategy on the consumer side.
- DLQ: Set up a Dead Letter Queue and alerting for unprocessable messages.
- Backpressure: Plan a mechanism that slows the producer when consumers fall behind.
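The backpressure point can be illustrated with nothing more than a bounded in-process queue; brokers expose analogous knobs (consumer prefetch limits, producer buffer settings):

```python
import queue

# A bounded queue: when consumers fall behind, the producer sees Full
# and must slow down, buffer, or shed load
q = queue.Queue(maxsize=2)
q.put("job-1")
q.put("job-2")
try:
    q.put_nowait("job-3")  # queue full -> immediate backpressure signal
    saw_backpressure = False
except queue.Full:
    saw_backpressure = True

print(saw_backpressure)  # → True
```

The blocking variant (`q.put(item)` without `_nowait`) slows the producer implicitly instead of raising; either way, the bound is what prevents unbounded memory growth.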
Phase 3: Operations
- Lag Monitoring: Track consumer lag (production rate vs consumption rate).
- Retention: Configure a retention policy (by time or size) to prevent disk exhaustion.
Checkpoints

| Phase | Verification |
|---|---|
| 1 | Does out-of-order delivery break the business logic? |
| 2 | Can the system survive 24 hours of log loss (durability)? |
| 3 | Does a poison message (malformed message) lock up the system? |
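Consumer lag, the key Phase 3 metric, is simply the newest offset minus the committed offset, summed across partitions. A toy illustration (the offset numbers are made up):

```python
# Lag per partition = end (newest) offset minus the consumer's committed offset;
# a partition with no committed offset counts as fully unread
def consumer_lag(end_offsets: dict, committed_offsets: dict) -> int:
    return sum(end - committed_offsets.get(p, 0) for p, end in end_offsets.items())

print(consumer_lag({0: 1000, 1: 500}, {0: 900, 1: 500}))  # → 100
```

Alert on lag that grows monotonically, not on its absolute value: a steady lag means the consumer is keeping pace, a growing one means it never will.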