
Message Queues

Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.

When to Use This Skill

Use message queues when:
  • Long-running operations block HTTP requests (report generation, video processing)
  • Service decoupling required (microservices, event-driven architecture)
  • Guaranteed delivery needed (payment processing, order fulfillment)
  • Event streaming for analytics (log aggregation, metrics pipelines)
  • Workflow orchestration for complex processes (multi-step sagas, human-in-the-loop)
  • Background job processing (email sending, image resizing)

Broker Selection Decision Tree

Choose message broker based on primary need:

Event Streaming / Log Aggregation

→ Apache Kafka
  • Throughput: 500K-1M msg/s
  • Replay events (event sourcing)
  • Exactly-once semantics
  • Long-term retention
  • Use: Analytics pipelines, CQRS, event sourcing

Simple Background Jobs

→ Task Queues
  • Python → Celery + Redis
  • TypeScript → BullMQ + Redis
  • Go → Asynq + Redis
  • Use: Email sending, report generation, webhooks

Complex Workflows / Sagas

→ Temporal
  • Durable execution (survives restarts)
  • Saga pattern support
  • Human-in-the-loop workflows
  • Use: Order processing, AI agent orchestration

Request-Reply / RPC Patterns

→ NATS
  • Built-in request-reply
  • Sub-millisecond latency
  • Cloud-native, simple operations
  • Use: Microservices RPC, IoT command/control
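
A minimal request-reply round trip with the nats-py client (a sketch; the `user.profile` subject and canned reply are illustrative, not part of this skill's examples):

```python
import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")

    # Responder: reply to any request arriving on the subject
    async def handle(msg):
        await msg.respond(b'{"name": "Ada"}')

    await nc.subscribe("user.profile", cb=handle)

    # Requester: waits for the reply or raises after the timeout
    response = await nc.request("user.profile", b"user_123", timeout=1.0)
    print(response.data)

    await nc.drain()

asyncio.run(main())
```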

Complex Message Routing

→ RabbitMQ
  • Exchanges (direct, topic, fanout, headers)
  • Dead letter exchanges
  • Message TTL, priorities
  • Use: Multi-consumer patterns, pub/sub

Already Using Redis

→ Redis Streams
  • No new infrastructure
  • Simple consumer groups
  • Moderate throughput (100K+ msg/s)
  • Use: Notification queues, simple job queues
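
A consumer-group sketch with redis-py, assuming a local Redis; the stream, group, and consumer names are illustrative:

```python
import redis

r = redis.Redis(host="localhost", port=6379)

# Producer: append an entry to the stream
r.xadd("notifications", {"user_id": "u_123", "body": "Order shipped"})

# One-time setup: create a consumer group that starts at new entries ($)
try:
    r.xgroup_create("notifications", "notifiers", id="$", mkstream=True)
except redis.ResponseError:
    pass  # group already exists

# Consumer: read entries assigned to this group member, then acknowledge
entries = r.xreadgroup("notifiers", "worker-1", {"notifications": ">"},
                       count=10, block=5000)
for stream, messages in entries:
    for message_id, fields in messages:
        # ... process fields ...
        r.xack("notifications", "notifiers", message_id)
```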

Performance Comparison

| Broker | Throughput | Latency (p99) | Best For |
|--------|------------|---------------|----------|
| Kafka | 500K-1M msg/s | 10-50ms | Event streaming |
| NATS JetStream | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| RabbitMQ | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| Redis Streams | 100K+ msg/s | Sub-ms | Simple queues, caching |

Quick Start Examples

Kafka Producer/Consumer (Python)

See examples/kafka-python/ for working code.

```python
from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is not None:
        process_order(msg.value())
```
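
The producer above fires and forgets. If you need per-message delivery confirmation, confluent-kafka accepts a callback on produce(); a minimal sketch:

```python
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    # Invoked from poll()/flush(), once per produced message
    if err is not None:
        print(f"Delivery failed for {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

producer.produce('orders', key='order_123', value='{"status": "created"}',
                 callback=delivery_report)
producer.poll(0)   # serve pending delivery callbacks
producer.flush()   # block until all outstanding messages are delivered
```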

Celery Background Jobs (Python)

See examples/celery-image-processing/ for full implementation.

```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)
```

BullMQ Job Processing (TypeScript)

See examples/bullmq-webhook-processor/ for full implementation.

```typescript
import { Queue, Worker } from 'bullmq'

const queue = new Queue('webhooks', {
  connection: { host: 'localhost', port: 6379 }
})

// Enqueue job
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// Process jobs
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })
```

Temporal Workflow Orchestration

See examples/temporal-order-saga/ for saga pattern implementation.

```python
from temporalio import workflow, activity
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Step 2: Charge payment
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Order {order_id} completed"
```

Core Patterns

Event Naming Convention

Use: Domain.Entity.Action.Version

Examples:
  • order.created.v1
  • user.profile.updated.v2
  • payment.failed.v1
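
To enforce the convention in tests or CI, a simple regex check works; the exact pattern below is an assumption, since the format isn't formally specified here:

```python
import re

# Assumed shape: lowercase dot-separated segments ending in a version tag (v1, v2, ...)
EVENT_NAME = re.compile(r"^[a-z]+(\.[a-z_]+)+\.v\d+$")

def is_valid_event_name(name: str) -> bool:
    return EVENT_NAME.match(name) is not None

assert is_valid_event_name("order.created.v1")
assert is_valid_event_name("user.profile.updated.v2")
assert not is_valid_event_name("OrderCreated")
```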

Event Schema Structure

```json
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
```
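
To keep producers consistent, the envelope can be built by one helper; a stdlib-only sketch (the function name and signature are illustrative):

```python
import json
import uuid
from datetime import datetime, timezone

def build_event(event_type: str, version: str, data: dict,
                producer: str, trace_id: str, correlation_id: str) -> str:
    """Serialize an event envelope matching the schema above."""
    return json.dumps({
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": version,
        "data": data,
        "metadata": {
            "producer": producer,
            "trace_id": trace_id,
            "correlation_id": correlation_id,
        },
    })
```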

Dead Letter Queue Pattern

Route failed messages to a dead letter queue (DLQ) after max retries:

```python
from celery.exceptions import Reject

@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)
```
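
`send_to_dlq` is referenced but not defined here. One plausible implementation (an assumption, since the skill doesn't specify a DLQ transport) parks the payload in a Redis list that an operator can inspect and replay:

```python
import json
import time
import redis

redis_client = redis.Redis(host="localhost", port=6379)

def send_to_dlq(order_id: str, error: str) -> None:
    # Keep the failed message plus failure context for later inspection
    redis_client.rpush("dlq:orders", json.dumps({
        "order_id": order_id,
        "error": error,
        "failed_at": time.time(),
    }))
```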

Idempotency for Exactly-Once Processing

```python
from fastapi import FastAPI, Header
import redis

app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6379)

@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # Check if already processed
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}

    result = process_payment_logic(payment_data)
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}
```

Frontend Integration

Job Status Updates via SSE

```python
import asyncio
from sse_starlette.sse import EventSourceResponse

# FastAPI endpoint for real-time job status
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)

            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": task.info.get('progress', 0)}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": task.result}
                break

            await asyncio.sleep(0.5)

    return EventSourceResponse(event_generator())
```
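
Where SSE isn't an option for a client, a plain polling endpoint over the same Celery result backend is a reasonable fallback; the path below is illustrative:

```python
@app.get("/status/{task_id}/poll")
async def task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    progress = task.info.get('progress', 0) if task.state == 'PROGRESS' else None
    return {"state": task.state, "progress": progress}
```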

React Component

```typescript
export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)

    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })

    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: JSON.parse(e.data) })
      eventSource.close()
    })

    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}
```

Detailed Guides

For comprehensive documentation, see reference files:

Broker-Specific Guides

  • Kafka: See references/kafka.md for partitioning, consumer groups, exactly-once semantics
  • RabbitMQ: See references/rabbitmq.md for exchanges, bindings, routing patterns
  • NATS: See references/nats.md for JetStream, request-reply patterns
  • Redis Streams: See references/redis-streams.md for consumer groups, acknowledgments

Task Queue Guides

  • Celery: See references/celery.md for periodic tasks, canvas (workflows), monitoring
  • BullMQ: See references/bullmq.md for job prioritization, flows, Bull Board monitoring
  • Temporal: See references/temporal-workflows.md for saga patterns, signals, queries

Pattern Guides

  • Event Patterns: See references/event-patterns.md for event sourcing, CQRS, outbox pattern

Common Anti-Patterns to Avoid

1. Synchronous API for Long Operations

❌ BAD: Blocks request thread

```python
@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5 minutes!
    return report
```

✅ GOOD: Enqueue background job

```python
@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}
```

2. Non-Idempotent Consumers

❌ BAD: Processes duplicates

```python
@app.task
def send_email(email: str):
    send_email_service(email)  # Sends twice if retried!
```

✅ GOOD: Idempotent with deduplication

```python
@app.task
def send_email(email: str, idempotency_key: str):
    if redis.exists(f"sent:{idempotency_key}"):
        return "already_sent"
    send_email_service(email)
    redis.setex(f"sent:{idempotency_key}", 86400, "1")
```

3. Ignoring Dead Letter Queues

❌ BAD: Failed messages lost forever

```python
@app.task(max_retries=3)
def risky_task(data):
    process(data)  # If all retries fail, data disappears
```

✅ GOOD: DLQ for manual inspection

```python
@app.task(bind=True, max_retries=3)
def risky_task(self, data):
    try:
        process(data)
    except Exception as e:
        if self.request.retries >= 3:
            send_to_dlq(data, str(e))
        raise
```

4. Using Kafka for Request-Reply

❌ BAD: Kafka is not designed for RPC

```python
def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # How to correlate the response? Kafka is asynchronous!
```

✅ GOOD: Use NATS request-reply or HTTP/gRPC

```python
response = await nats.request("user.profile", user_id.encode())
```

Library Recommendations

Context7 Research

Confluent Kafka (Python)
  • Context7 ID:
    /confluentinc/confluent-kafka-python
  • Trust Score: 68.8/100
  • Code Snippets: 192+
  • Production-ready Python Kafka client
Temporal
  • Context7 ID:
    /websites/temporal_io
  • Trust Score: 80.9/100
  • Code Snippets: 3,769+
  • Workflow orchestration for durable execution

Installation

Python:

```bash
pip install confluent-kafka celery[redis] temporalio aio-pika redis
```

TypeScript/Node.js:

```bash
npm install kafkajs bullmq @temporalio/client amqplib ioredis
```

Rust:

```bash
cargo add rdkafka lapin async-nats redis
```

Go:

```bash
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
```

Utilities

Use scripts for setup automation:
  • Kafka setup: Run python scripts/kafka_producer_consumer.py for test utilities
  • Schema validation: Run python scripts/validate_message_schema.py to validate event schemas

Related Skills

  • api-patterns: API design for async job submission
  • realtime-sync: WebSocket/SSE for job status updates
  • feedback: Toast notifications for job completion
  • databases-*: Persistent storage for event logs
  • observability: Tracing and metrics for queue operations