message_queues


Message Queues

消息队列

Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.
为事件驱动架构、后台任务处理和服务解耦实现异步通信模式。

When to Use This Skill

适用场景

Use message queues when:
  • Long-running operations block HTTP requests (report generation, video processing)
  • Service decoupling required (microservices, event-driven architecture)
  • Guaranteed delivery needed (payment processing, order fulfillment)
  • Event streaming for analytics (log aggregation, metrics pipelines)
  • Workflow orchestration for complex processes (multi-step sagas, human-in-the-loop)
  • Background job processing (email sending, image resizing)
在以下场景中使用消息队列:
  • 长时间运行的操作会阻塞HTTP请求(报表生成、视频处理)
  • 需要服务解耦(微服务、事件驱动架构)
  • 需要消息可靠投递(支付处理、订单履约)
  • 需要事件流用于分析(日志聚合、指标流水线)
  • 需要复杂流程的工作流编排(多阶段事务补偿、人工介入流程)
  • 后台任务处理(邮件发送、图片缩放)

Broker Selection Decision Tree

消息代理选择决策树

Choose message broker based on primary need:
根据核心需求选择消息代理:

Event Streaming / Log Aggregation

事件流 / 日志聚合

→ Apache Kafka
  • Throughput: 500K-1M msg/s
  • Replay events (event sourcing)
  • Exactly-once semantics
  • Long-term retention
  • Use: Analytics pipelines, CQRS, event sourcing
→ Apache Kafka
  • 吞吐量:50万-100万条消息/秒
  • 支持事件重放(事件溯源)
  • 精确一次语义
  • 长期数据保留
  • 适用场景:分析流水线、CQRS、事件溯源

Simple Background Jobs

简单后台任务

→ Task Queues
  • Python → Celery + Redis
  • TypeScript → BullMQ + Redis
  • Go → Asynq + Redis
  • Use: Email sending, report generation, webhooks
→ 任务队列
  • Python → Celery + Redis
  • TypeScript → BullMQ + Redis
  • Go → Asynq + Redis
  • 适用场景:邮件发送、报表生成、Webhook处理

Complex Workflows / Sagas

复杂工作流 / 事务补偿(Saga)

→ Temporal
  • Durable execution (survives restarts)
  • Saga pattern support
  • Human-in-the-loop workflows
  • Use: Order processing, AI agent orchestration
→ Temporal
  • 持久化执行(可重启恢复)
  • 支持Saga模式
  • 支持人工介入的工作流
  • 适用场景:订单处理、AI Agent编排

Request-Reply / RPC Patterns

请求-响应 / RPC模式

→ NATS
  • Built-in request-reply
  • Sub-millisecond latency
  • Cloud-native, simple operations
  • Use: Microservices RPC, IoT command/control
→ NATS
  • 内置请求-响应机制
  • 亚毫秒级延迟
  • 云原生、运维简单
  • 适用场景:微服务RPC、物联网指令控制
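The mechanics behind NATS's built-in request-reply can be sketched with plain asyncio: the requester subscribes to a unique reply inbox, publishes with that inbox as the reply subject, and awaits the correlated response. This is an illustrative in-memory model (the `TinyBroker` class is invented for the sketch); a real client would use the nats-py library.

```python
import asyncio
import uuid

class TinyBroker:
    """In-memory pub/sub used to sketch how request-reply is built on publish/subscribe."""
    def __init__(self):
        self.handlers = {}  # subject -> callback

    def subscribe(self, subject, cb):
        self.handlers[subject] = cb

    def publish(self, subject, data, reply=None):
        self.handlers[subject](data, reply)

    async def request(self, subject, data, timeout=1.0):
        # Correlate via a unique reply inbox, like NATS's "_INBOX.*" subjects
        inbox = f"_INBOX.{uuid.uuid4().hex}"
        fut = asyncio.get_running_loop().create_future()
        self.subscribe(inbox, lambda d, _r: fut.set_result(d))
        self.publish(subject, data, reply=inbox)
        return await asyncio.wait_for(fut, timeout)

async def main():
    broker = TinyBroker()
    # Responder: replies on the requester's inbox subject
    broker.subscribe("user.profile",
                     lambda data, reply: broker.publish(reply, b"profile:" + data))
    return await broker.request("user.profile", b"user_123")

print(asyncio.run(main()))  # b'profile:user_123'
```

With nats-py the same flow is a single `await nc.request(subject, payload)`; the client manages the inbox subscription for you.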

Complex Message Routing

复杂消息路由

→ RabbitMQ
  • Exchanges (direct, topic, fanout, headers)
  • Dead letter exchanges
  • Message TTL, priorities
  • Use: Multi-consumer patterns, pub/sub
→ RabbitMQ
  • 支持多种交换机(直连、主题、扇出、头部)
  • 死信交换机
  • 消息TTL、优先级设置
  • 适用场景:多消费者模式、发布/订阅
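RabbitMQ topic exchanges route by matching a message's routing key against binding keys, where `*` matches exactly one dot-separated word and `#` matches zero or more. A minimal sketch of that matching rule (illustrative only; the broker does this internally):

```python
import re

def topic_matches(binding: str, routing_key: str) -> bool:
    """Mimic topic-exchange matching: '*' = exactly one word, '#' = zero or more words."""
    parts = []
    for word in binding.split('.'):
        if word == '#':
            parts.append('#')
        elif word == '*':
            parts.append('[^.]+')          # one word: no dots
        else:
            parts.append(re.escape(word))  # literal word
    pattern = r'\.'.join(parts)
    # '#' may also match zero words, so absorb the adjacent dot
    pattern = pattern.replace(r'\.#', r'(?:\..+)?').replace(r'#\.', r'(?:.+\.)?').replace('#', '.*')
    return re.fullmatch(pattern, routing_key) is not None

print(topic_matches("order.*", "order.created"))       # True
print(topic_matches("order.*", "order.created.v1"))    # False: '*' is one word
print(topic_matches("order.#", "order.created.v1"))    # True: '#' spans many words
```

This is why a binding like `order.#` fans all order events to an audit queue while `order.*` only catches top-level ones.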

Already Using Redis

已在使用Redis

→ Redis Streams
  • No new infrastructure
  • Simple consumer groups
  • Moderate throughput (100K+ msg/s)
  • Use: Notification queues, simple job queues
→ Redis Streams
  • 无需新增基础设施
  • 简单的消费者组
  • 中等吞吐量(10万+条消息/秒)
  • 适用场景:通知队列、简单任务队列
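The consumer-group semantics behind Redis Streams' `XREADGROUP`/`XACK` can be sketched in memory: delivered-but-unacknowledged entries sit in a pending list (PEL) until acked, so a crashed consumer's work is not silently lost. This toy model is only illustrative; real code uses redis-py's `xadd`, `xreadgroup`, and `xack`.

```python
class TinyStream:
    """Sketch of a Redis-Streams-style consumer group with a pending entries list."""
    def __init__(self):
        self.entries = []   # (id, payload), append-only like XADD
        self.next_id = 0
        self.cursor = 0     # group's last-delivered position
        self.pending = {}   # id -> payload (the PEL)

    def add(self, payload):
        self.entries.append((self.next_id, payload))  # XADD
        self.next_id += 1

    def read_group(self, count=10):
        # XREADGROUP ... '>': deliver new entries and move them into the PEL
        batch = self.entries[self.cursor:self.cursor + count]
        self.cursor += len(batch)
        for eid, payload in batch:
            self.pending[eid] = payload
        return batch

    def ack(self, eid):
        self.pending.pop(eid, None)  # XACK removes the entry from the PEL

stream = TinyStream()
stream.add({"user": "u1"})
stream.add({"user": "u2"})
for eid, payload in stream.read_group():
    if payload["user"] == "u1":
        stream.ack(eid)  # processed successfully
# u2 was delivered but never acked, so it stays pending for re-delivery
print(stream.pending)  # {1: {'user': 'u2'}}
```

In Redis, another consumer would recover that pending entry with `XAUTOCLAIM` or `XCLAIM`.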

Performance Comparison

性能对比

Broker | Throughput | Latency (p99) | Best For
Kafka | 500K-1M msg/s | 10-50ms | Event streaming
NATS JetStream | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices
RabbitMQ | 50K-100K msg/s | 5-20ms | Task queues, complex routing
Redis Streams | 100K+ msg/s | Sub-ms | Simple queues, caching
代理 | 吞吐量 | P99延迟 | 最佳适用场景
Kafka | 50万-100万条消息/秒 | 10-50ms | 事件流
NATS JetStream | 20万-40万条消息/秒 | 亚毫秒至5ms | 云原生微服务
RabbitMQ | 5万-10万条消息/秒 | 5-20ms | 任务队列、复杂路由
Redis Streams | 10万+条消息/秒 | 亚毫秒 | 简单队列、缓存

Quick Start Examples

快速开始示例

Kafka Producer/Consumer (Python)

Kafka生产者/消费者(Python)

See
examples/kafka-python/
for working code.
完整代码请查看
examples/kafka-python/

Producer

Producer

python
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()

Consumer

Consumer

python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        process_order(msg.value())

Celery Background Jobs (Python)

Celery后台任务(Python)

See
examples/celery-image-processing/
for full implementation.
python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)
完整实现请查看
examples/celery-image-processing/
python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)

BullMQ Job Processing (TypeScript)

BullMQ任务处理(TypeScript)

See
examples/bullmq-webhook-processor/
for full implementation.
typescript
import { Queue, Worker } from 'bullmq'

const queue = new Queue('webhooks', {
  connection: { host: 'localhost', port: 6379 }
})

// Enqueue job
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// Process jobs
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })
完整实现请查看
examples/bullmq-webhook-processor/
typescript
import { Queue, Worker } from 'bullmq'

const queue = new Queue('webhooks', {
  connection: { host: 'localhost', port: 6379 }
})

// 入队任务
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// 处理任务
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })

Temporal Workflow Orchestration

Temporal工作流编排

See
examples/temporal-order-saga/
for saga pattern implementation.
python
from temporalio import workflow, activity
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Step 2: Charge payment
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Order {order_id} completed"
Saga模式实现请查看
examples/temporal-order-saga/
python
from temporalio import workflow, activity
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # 步骤1:预留库存
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # 步骤2:扣款
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Order {order_id} completed"
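What Temporal adds to the saga above is compensation: when a later step fails, earlier steps must be undone in reverse order. The control flow can be sketched in plain Python (the step names mirror the workflow above; Temporal would make each step a durable activity that survives restarts):

```python
class SagaAborted(Exception):
    pass

def run_saga(steps):
    """Run (action, compensation) pairs; if a step fails, undo completed steps in reverse."""
    done = []
    for action, compensate in steps:
        try:
            action()
        except Exception as exc:
            for undo in reversed(done):
                undo()  # compensate in LIFO order
            raise SagaAborted(str(exc)) from exc
        done.append(compensate)

log = []

def reserve_inventory():  log.append("reserve_inventory")
def release_inventory():  log.append("release_inventory")
def charge_payment():     raise RuntimeError("card declined")
def refund_payment():     log.append("refund_payment")

try:
    run_saga([(reserve_inventory, release_inventory),
              (charge_payment, refund_payment)])
except SagaAborted:
    pass

print(log)  # ['reserve_inventory', 'release_inventory']
```

Because payment failed, only the inventory reservation is compensated; the payment refund never runs since the charge never succeeded.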

Core Patterns

核心模式

Event Naming Convention

事件命名规范

Use:
Domain.Entity.Action.Version
Examples:
  • order.created.v1
  • user.profile.updated.v2
  • payment.failed.v1
使用格式:
领域.实体.动作.版本
示例:
  • order.created.v1
  • user.profile.updated.v2
  • payment.failed.v1
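The Domain.Entity.Action.Version convention is easiest to keep when enforced at publish time. A minimal validator sketch (the exact segment rules, lowercase words separated by dots ending in `vN`, are an assumption consistent with the examples above):

```python
import re

# one or more dot-separated lowercase segments, ending in a version like "v1"
EVENT_NAME = re.compile(r"^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)+\.v\d+$")

def is_valid_event_type(name: str) -> bool:
    return EVENT_NAME.fullmatch(name) is not None

print(is_valid_event_type("order.created.v1"))         # True
print(is_valid_event_type("user.profile.updated.v2"))  # True
print(is_valid_event_type("OrderCreated"))             # False: no dots, no version
```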

Event Schema Structure

事件Schema结构

json
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
json
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
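Producers typically build this envelope in one helper so `event_id`, the timestamp, and tracing metadata are never forgotten. A stdlib-only sketch (the version-derivation rule and the generated default IDs are assumptions for illustration):

```python
import json
import uuid
from datetime import datetime, timezone

def make_event(event_type, data, producer, trace_id=None, correlation_id=None):
    # "order.created.v2" -> "2.0" (assumed mapping from the name's version suffix)
    version = event_type.rsplit(".", 1)[-1].lstrip("v") + ".0"
    envelope = {
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": version,
        "data": data,
        "metadata": {
            "producer": producer,
            "trace_id": trace_id or str(uuid.uuid4()),
            "correlation_id": correlation_id or str(uuid.uuid4()),
        },
    }
    return json.dumps(envelope)

evt = json.loads(make_event("order.created.v2", {"order_id": "ord_123"}, "order-service"))
print(evt["version"])  # 2.0
```

Passing the inbound request's `trace_id`/`correlation_id` through instead of generating new ones is what links an event chain across services.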

Dead Letter Queue Pattern

死信队列(DLQ)模式

Route failed messages to dead letter queue (DLQ) after max retries:
python
from celery.exceptions import Reject

@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)
达到最大重试次数后,将失败消息路由到死信队列:
python
from celery.exceptions import Reject

@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)

Idempotency for Exactly-Once Processing

精确一次处理的幂等性实现

python
from fastapi import Header

@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # Check if already processed
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}

    result = process_payment_logic(payment_data)
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}
python
from fastapi import Header

@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # 检查是否已处理
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}

    result = process_payment_logic(payment_data)
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}

Frontend Integration

前端集成

Job Status Updates via SSE

通过SSE实现任务状态更新

FastAPI endpoint for real-time job status

FastAPI实时任务状态端点

python
import asyncio
from sse_starlette.sse import EventSourceResponse  # from the sse-starlette package

@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)

            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": task.info.get('progress', 0)}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": task.result}
                break

            await asyncio.sleep(0.5)

    return EventSourceResponse(event_generator())

React Component

React组件

typescript
export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)

    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })

    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: JSON.parse(e.data) })
      eventSource.close()
    })

    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}
typescript
export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)

    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })

    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: JSON.parse(e.data) })
      eventSource.close()
    })

    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}

Detailed Guides

详细指南

For comprehensive documentation, see reference files:
完整文档请参考以下文件:

Broker-Specific Guides

代理专属指南

  • Kafka: See
    references/kafka.md
    for partitioning, consumer groups, exactly-once semantics
  • RabbitMQ: See
    references/rabbitmq.md
    for exchanges, bindings, routing patterns
  • NATS: See
    references/nats.md
    for JetStream, request-reply patterns
  • Redis Streams: See
    references/redis-streams.md
    for consumer groups, acknowledgments
  • Kafka:查看
    references/kafka.md
    了解分区、消费者组、精确一次语义
  • RabbitMQ:查看
    references/rabbitmq.md
    了解交换机、绑定、路由模式
  • NATS:查看
    references/nats.md
    了解JetStream、请求-响应模式
  • Redis Streams:查看
    references/redis-streams.md
    了解消费者组、消息确认

Task Queue Guides

任务队列指南

  • Celery: See
    references/celery.md
    for periodic tasks, canvas (workflows), monitoring
  • BullMQ: See
    references/bullmq.md
    for job prioritization, flows, Bull Board monitoring
  • Temporal: See
    references/temporal-workflows.md
    for saga patterns, signals, queries
  • Celery:查看
    references/celery.md
    了解定时任务、Canvas(工作流)、监控
  • BullMQ:查看
    references/bullmq.md
    了解任务优先级、流程、Bull Board监控
  • Temporal:查看
    references/temporal-workflows.md
    了解Saga模式、信号、查询

Pattern Guides

模式指南

  • Event Patterns: See
    references/event-patterns.md
    for event sourcing, CQRS, outbox pattern
  • 事件模式:查看
    references/event-patterns.md
    了解事件溯源、CQRS、Outbox模式

Common Anti-Patterns to Avoid

需避免的常见反模式

1. Synchronous API for Long Operations

1. 长时间操作使用同步API

python

❌ BAD: Blocks request thread

❌ 错误:阻塞请求线程

@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5 minutes!
    return report

@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 耗时5分钟!
    return report

✅ GOOD: Enqueue background job

✅ 正确:入队后台任务

@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}

2. Non-Idempotent Consumers

2. 非幂等消费者

python

❌ BAD: Processes duplicates

❌ 错误:重复处理消息

@app.task
def send_email(email: str):
    send_email_service(email)  # Sends twice if retried!

@app.task
def send_email(email: str):
    send_email_service(email)  # 重试时会发送两次!

✅ GOOD: Idempotent with deduplication

✅ 正确:通过去重实现幂等

@app.task
def send_email(email: str, idempotency_key: str):
    if redis.exists(f"sent:{idempotency_key}"):
        return "already_sent"
    send_email_service(email)
    redis.setex(f"sent:{idempotency_key}", 86400, "1")

3. Ignoring Dead Letter Queues

3. 忽略死信队列

python

❌ BAD: Failed messages lost forever

❌ 错误:失败消息永久丢失

@app.task(max_retries=3)
def risky_task(data):
    process(data)  # If all retries fail, data disappears

@app.task(max_retries=3)
def risky_task(data):
    process(data)  # 若所有重试失败,数据将消失

✅ GOOD: DLQ for manual inspection

✅ 正确:使用DLQ进行人工排查

@app.task(bind=True, max_retries=3)
def risky_task(self, data):
    try:
        process(data)
    except Exception as e:
        if self.request.retries >= 3:
            send_to_dlq(data, str(e))
        raise

4. Using Kafka for Request-Reply

4. 使用Kafka实现请求-响应

python

❌ BAD: Kafka is not designed for RPC

❌ 错误:Kafka并非为RPC设计

def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # How to correlate response? Kafka is asynchronous!

def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # 如何关联响应?Kafka是异步的!

✅ GOOD: Use NATS request-reply or HTTP/gRPC

✅ 正确:使用NATS请求-响应或HTTP/gRPC

response = await nats.request("user.profile", user_id.encode())

Library Recommendations

库推荐

Context7 Research

Context7研究

Confluent Kafka (Python)
  • Context7 ID:
    /confluentinc/confluent-kafka-python
  • Trust Score: 68.8/100
  • Code Snippets: 192+
  • Production-ready Python Kafka client
Temporal
  • Context7 ID:
    /websites/temporal_io
  • Trust Score: 80.9/100
  • Code Snippets: 3,769+
  • Workflow orchestration for durable execution
Confluent Kafka (Python)
  • Context7 ID:
    /confluentinc/confluent-kafka-python
  • 信任评分:68.8/100
  • 代码片段:192+个
  • 生产就绪的Python Kafka客户端
Temporal
  • Context7 ID:
    /websites/temporal_io
  • 信任评分:80.9/100
  • 代码片段:3769+个
  • 用于持久化执行的工作流编排框架

Installation

安装命令

Python:
bash
pip install confluent-kafka celery[redis] temporalio aio-pika redis
TypeScript/Node.js:
bash
npm install kafkajs bullmq @temporalio/client amqplib ioredis
Rust:
bash
cargo add rdkafka lapin async-nats redis
Go:
bash
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
Python:
bash
pip install confluent-kafka celery[redis] temporalio aio-pika redis
TypeScript/Node.js:
bash
npm install kafkajs bullmq @temporalio/client amqplib ioredis
Rust:
bash
cargo add rdkafka lapin async-nats redis
Go:
bash
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk

Utilities

工具脚本

Use scripts for setup automation:
  • Kafka setup: Run
    python scripts/kafka_producer_consumer.py
    for test utilities
  • Schema validation: Run
    python scripts/validate_message_schema.py
    to validate event schemas
使用以下脚本实现自动化部署:
  • Kafka部署:运行
    python scripts/kafka_producer_consumer.py
    获取测试工具
  • Schema验证:运行
    python scripts/validate_message_schema.py
    验证事件Schema

Related Skills

相关技能

  • api-patterns: API design for async job submission
  • realtime-sync: WebSocket/SSE for job status updates
  • feedback: Toast notifications for job completion
  • databases-*: Persistent storage for event logs
  • api-patterns:异步任务提交的API设计
  • realtime-sync:通过WebSocket/SSE实现任务状态更新
  • feedback:任务完成的Toast通知
  • databases-*:事件日志的持久化存储

Message Queues v1.1 - Enhanced

Message Queues v1.1 - 增强版

🔄 Workflow

🔄 工作流

Phase 1: Design

阶段1:设计阶段

  • Pattern Selection: Decide between Point-to-Point (Queue) and Pub-Sub (Topic).
  • Schema Registry: Define the message format (Avro/Protobuf) and versioning up front.
  • Partitioning: Plan data distribution (key selection for ordering guarantees).
  • 模式选择:决定使用点对点(Queue)还是发布-订阅(Topic)模式。
  • Schema注册表:预先定义消息格式(Avro/Protobuf)及版本规则。
  • 分区规划:规划数据分布(为保证顺序性选择合适的Key)。
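The partitioning item above hinges on key choice: Kafka maps a message to a partition by hashing its key, so all events for one entity land on the same partition and stay ordered. The mapping can be sketched as follows (Kafka's Java-client default partitioner uses murmur2; `crc32` here is only for a deterministic illustration):

```python
import zlib

NUM_PARTITIONS = 6

def partition_for(key: str) -> int:
    # Deterministic hash of the key modulo partition count
    return zlib.crc32(key.encode()) % NUM_PARTITIONS

# Every event keyed by the same order_id maps to the same partition,
# so per-order ordering holds even with many consumers.
print(partition_for("order_123") == partition_for("order_123"))  # True
```

Choosing a low-cardinality key (e.g. a country code) concentrates traffic on few partitions; entity IDs usually spread load evenly.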

Phase 2: Implementation Checklist

阶段2:实施检查清单

  • Idempotency: Establish an "Exactly-Once" or "At-Least-Once" strategy on the consumer side.
  • DLQ: Set up a Dead Letter Queue and alerting for messages that cannot be processed.
  • Backpressure: Plan a mechanism that slows the producer when consumers fall behind.
  • 幂等性设置:在Consumer端配置“Exactly-Once”或“At-Least-Once”策略。
  • 死信队列(DLQ)配置:为无法处理的消息设置死信队列及告警机制。
  • 背压处理:考虑当Consumer处理缓慢时,减缓Producer发送速度的机制。
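The backpressure item above can be sketched with a bounded in-process queue: when the consumer falls behind, the producer's `put` blocks instead of letting memory grow without limit. Brokers expose the same idea as producer buffer limits and flow control.

```python
import queue
import threading
import time

jobs = queue.Queue(maxsize=2)  # bounded buffer: producer blocks when full

def consumer():
    while True:
        item = jobs.get()
        if item is None:
            break
        time.sleep(0.05)  # deliberately slow consumer
        jobs.task_done()

threading.Thread(target=consumer, daemon=True).start()

start = time.monotonic()
for i in range(6):
    jobs.put(i)       # blocks once the buffer is full -> backpressure
jobs.put(None)        # sentinel to stop the consumer
elapsed = time.monotonic() - start
print(elapsed > 0.1)  # True: the producer was throttled by the slow consumer
```

Without `maxsize`, the producer would finish instantly and the backlog would sit in memory, which is exactly the failure mode backpressure prevents.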

Phase 3: Operations

阶段3:运维阶段

  • Lag Monitoring: Track the consumer lag metric (production rate vs. consumption rate).
  • Retention: Set a retention policy (by time or size) to keep disks from filling up.
  • 延迟监控:监控Consumer Lag(生产速度与消费速度的差值)指标。
  • 数据保留策略:设置保留策略(按时间或大小)以避免磁盘耗尽。
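Consumer lag, as monitored above, is just the distance between the newest offset in a partition and the group's committed offset; alert when the sum grows. A sketch of the computation (the partition numbers are made up; real offsets come from the broker, e.g. confluent-kafka's `get_watermark_offsets` and `committed`):

```python
def consumer_lag(high_watermark: int, committed_offset: int) -> int:
    """Messages produced but not yet consumed for one partition."""
    return max(high_watermark - committed_offset, 0)

# One (high watermark, committed offset) pair per partition; total lag is the sum
partitions = {0: (1_250, 1_200), 1: (980, 980), 2: (2_000, 1_100)}
total = sum(consumer_lag(high, committed) for high, committed in partitions.values())
print(total)  # 950: partition 2 is the hot spot (lag 900)
```

A steadily rising total means consumers cannot keep up; a single hot partition usually points at a skewed partition key.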

Checkpoints

检查点

Phase | Verification
1 | Does out-of-order delivery break the business logic?
2 | Can the system survive 24 hours of log loss (durability)?
3 | Does a poison message (malformed message) lock up the system?
阶段 | 验证内容
1 | 消息顺序(ordering)是否会破坏业务逻辑?
2 | 系统是否能承受24小时的日志丢失(持久性)?
3 | 有毒消息(格式错误的消息)是否会导致系统阻塞?