cqrs-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCQRS Patterns
CQRS 模式
Separate read and write concerns for optimized data access.
分离读写关注点,优化数据访问。
Overview
概述
- Read-heavy workloads with complex queries
- Different scaling requirements for reads vs writes
- Event sourcing implementations
- Multiple read model representations of same data
- Complex domain models with simple read requirements
- 带有复杂查询的读密集型工作负载
- 读操作与写操作有不同的扩展需求
- 事件溯源的实现场景
- 同一数据的多种读模型表示
- 领域模型复杂但读需求简单的场景
When NOT to Use
不适用于以下场景
- Simple CRUD applications
- Strong consistency requirements everywhere
- Small datasets with simple queries
- 简单的CRUD应用
- 所有场景都要求强一致性
- 数据集小且查询简单的系统
Architecture Overview
架构概述
┌─────────────────┐ ┌─────────────────┐
│ Write Side │ │ Read Side │
├─────────────────┤ ├─────────────────┤
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ Commands │ │ │ │ Queries │ │
│ └─────┬─────┘ │ │ └─────┬─────┘ │
│ ┌─────▼─────┐ │ │ ┌─────▼─────┐ │
│ │ Aggregate │ │ │ │Read Model │ │
│ └─────┬─────┘ │ │ └───────────┘ │
│ ┌─────▼─────┐ │ │ ▲ │
│ │ Events │──┼─────────┼────────┘ │
│ └───────────┘ │ Publish │ Project │
└─────────────────┘ └─────────────────┘┌─────────────────┐ ┌─────────────────┐
│ Write Side │ │ Read Side │
├─────────────────┤ ├─────────────────┤
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ Commands │ │ │ │ Queries │ │
│ └─────┬─────┘ │ │ └─────┬─────┘ │
│ ┌─────▼─────┐ │ │ ┌─────▼─────┐ │
│ │ Aggregate │ │ │ │Read Model │ │
│ └─────┬─────┘ │ │ └───────────┘ │
│ ┌─────▼─────┐ │ │ ▲ │
│ │ Events │──┼─────────┼────────┘ │
│ └───────────┘ │ Publish │ Project │
└─────────────────┘ └─────────────────┘Command Side (Write Model)
命令端(写模型)
Command and Handler
命令与处理器
python
from pydantic import BaseModel, Field
from uuid import UUID, uuid4
from datetime import datetime, timezone
from abc import ABC, abstractmethod
class Command(BaseModel):
"""Base command with metadata."""
command_id: UUID = Field(default_factory=uuid4)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
user_id: UUID | None = None
class CreateOrder(Command):
customer_id: UUID
items: list[OrderItem]
shipping_address: Address
class CommandHandler(ABC):
@abstractmethod
async def handle(self, command: Command) -> list["DomainEvent"]:
pass
class CreateOrderHandler(CommandHandler):
def __init__(self, order_repo, inventory_service):
self.order_repo = order_repo
self.inventory = inventory_service
async def handle(self, command: CreateOrder) -> list[DomainEvent]:
for item in command.items:
if not await self.inventory.check_availability(item.product_id, item.quantity):
raise InsufficientInventoryError(item.product_id)
order = Order.create(
customer_id=command.customer_id,
items=command.items,
shipping_address=command.shipping_address,
)
await self.order_repo.save(order)
return order.pending_events
class CommandBus:
def __init__(self):
self._handlers: dict[type, CommandHandler] = {}
def register(self, command_type: type, handler: CommandHandler):
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> list[DomainEvent]:
handler = self._handlers.get(type(command))
if not handler:
raise NoHandlerFoundError(type(command))
events = await handler.handle(command)
for event in events:
await self.event_publisher.publish(event)
return eventspython
from pydantic import BaseModel, Field
from uuid import UUID, uuid4
from datetime import datetime, timezone
from abc import ABC, abstractmethod
class Command(BaseModel):
"""Base command with metadata."""
command_id: UUID = Field(default_factory=uuid4)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
user_id: UUID | None = None
class CreateOrder(Command):
customer_id: UUID
items: list[OrderItem]
shipping_address: Address
class CommandHandler(ABC):
@abstractmethod
async def handle(self, command: Command) -> list["DomainEvent"]:
pass
class CreateOrderHandler(CommandHandler):
def __init__(self, order_repo, inventory_service):
self.order_repo = order_repo
self.inventory = inventory_service
async def handle(self, command: CreateOrder) -> list[DomainEvent]:
for item in command.items:
if not await self.inventory.check_availability(item.product_id, item.quantity):
raise InsufficientInventoryError(item.product_id)
order = Order.create(
customer_id=command.customer_id,
items=command.items,
shipping_address=command.shipping_address,
)
await self.order_repo.save(order)
return order.pending_events
class CommandBus:
def __init__(self):
self._handlers: dict[type, CommandHandler] = {}
def register(self, command_type: type, handler: CommandHandler):
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> list[DomainEvent]:
handler = self._handlers.get(type(command))
if not handler:
raise NoHandlerFoundError(type(command))
events = await handler.handle(command)
for event in events:
await self.event_publisher.publish(event)
return eventsWrite Model (Aggregate)
写模型(聚合根)
python
from dataclasses import dataclass, field
@dataclass
class Order:
id: UUID
customer_id: UUID
items: list[OrderItem]
status: OrderStatus
_pending_events: list[DomainEvent] = field(default_factory=list)
@classmethod
def create(cls, customer_id: UUID, items: list, shipping_address: Address) -> "Order":
order = cls(id=uuid4(), customer_id=customer_id, items=[], status=OrderStatus.PENDING)
for item in items:
order.items.append(item)
order._raise_event(OrderItemAdded(order_id=order.id, product_id=item.product_id))
order._raise_event(OrderCreated(order_id=order.id, customer_id=customer_id))
return order
def cancel(self, reason: str):
if self.status == OrderStatus.SHIPPED:
raise InvalidOperationError("Cannot cancel shipped order")
self.status = OrderStatus.CANCELLED
self._raise_event(OrderCancelled(order_id=self.id, reason=reason))
def _raise_event(self, event: DomainEvent):
self._pending_events.append(event)
@property
def pending_events(self) -> list[DomainEvent]:
events = self._pending_events.copy()
self._pending_events.clear()
return eventspython
from dataclasses import dataclass, field
@dataclass
class Order:
id: UUID
customer_id: UUID
items: list[OrderItem]
status: OrderStatus
_pending_events: list[DomainEvent] = field(default_factory=list)
@classmethod
def create(cls, customer_id: UUID, items: list, shipping_address: Address) -> "Order":
order = cls(id=uuid4(), customer_id=customer_id, items=[], status=OrderStatus.PENDING)
for item in items:
order.items.append(item)
order._raise_event(OrderItemAdded(order_id=order.id, product_id=item.product_id))
order._raise_event(OrderCreated(order_id=order.id, customer_id=customer_id))
return order
def cancel(self, reason: str):
if self.status == OrderStatus.SHIPPED:
raise InvalidOperationError("Cannot cancel shipped order")
self.status = OrderStatus.CANCELLED
self._raise_event(OrderCancelled(order_id=self.id, reason=reason))
def _raise_event(self, event: DomainEvent):
self._pending_events.append(event)
@property
def pending_events(self) -> list[DomainEvent]:
events = self._pending_events.copy()
self._pending_events.clear()
return eventsQuery Side (Read Model)
查询端(读模型)
Query Handler
查询处理器
python
class Query(BaseModel):
pass
class GetOrderById(Query):
order_id: UUID
class GetOrdersByCustomer(Query):
customer_id: UUID
status: OrderStatus | None = None
page: int = 1
page_size: int = 20
class GetOrderByIdHandler:
def __init__(self, read_db):
self.db = read_db
async def handle(self, query: GetOrderById) -> OrderView | None:
row = await self.db.fetchrow(
"SELECT * FROM order_summary WHERE id = $1", query.order_id
)
return OrderView(**row) if row else None
class OrderView(BaseModel):
"""Denormalized read model for orders."""
id: UUID
customer_id: UUID
customer_name: str # Denormalized
status: str
total_amount: float
item_count: int
created_at: datetimepython
class Query(BaseModel):
pass
class GetOrderById(Query):
order_id: UUID
class GetOrdersByCustomer(Query):
customer_id: UUID
status: OrderStatus | None = None
page: int = 1
page_size: int = 20
class GetOrderByIdHandler:
def __init__(self, read_db):
self.db = read_db
async def handle(self, query: GetOrderById) -> OrderView | None:
row = await self.db.fetchrow(
"SELECT * FROM order_summary WHERE id = $1", query.order_id
)
return OrderView(**row) if row else None
class OrderView(BaseModel):
"""Denormalized read model for orders."""
id: UUID
customer_id: UUID
customer_name: str # Denormalized
status: str
total_amount: float
item_count: int
created_at: datetimeProjections
投影
python
class OrderProjection:
"""Projects events to read models."""
def __init__(self, read_db, customer_service):
self.db = read_db
self.customers = customer_service
async def handle(self, event: DomainEvent):
match event:
case OrderCreated():
await self._on_order_created(event)
case OrderItemAdded():
await self._on_item_added(event)
case OrderCancelled():
await self._on_order_cancelled(event)
async def _on_order_created(self, event: OrderCreated):
customer = await self.customers.get(event.customer_id)
await self.db.execute(
"""INSERT INTO order_summary (id, customer_id, customer_name, status, total_amount, item_count, created_at)
VALUES ($1, $2, $3, 'pending', 0.0, 0, $4)
ON CONFLICT (id) DO UPDATE SET customer_name = $3""",
event.order_id, event.customer_id, customer.name, event.timestamp,
)
async def _on_item_added(self, event: OrderItemAdded):
subtotal = event.quantity * event.unit_price
await self.db.execute(
"UPDATE order_summary SET total_amount = total_amount + $1, item_count = item_count + 1 WHERE id = $2",
subtotal, event.order_id,
)
async def _on_order_cancelled(self, event: OrderCancelled):
await self.db.execute(
"UPDATE order_summary SET status = 'cancelled' WHERE id = $1", event.order_id
)python
class OrderProjection:
"""Projects events to read models."""
def __init__(self, read_db, customer_service):
self.db = read_db
self.customers = customer_service
async def handle(self, event: DomainEvent):
match event:
case OrderCreated():
await self._on_order_created(event)
case OrderItemAdded():
await self._on_item_added(event)
case OrderCancelled():
await self._on_order_cancelled(event)
async def _on_order_created(self, event: OrderCreated):
customer = await self.customers.get(event.customer_id)
await self.db.execute(
"""INSERT INTO order_summary (id, customer_id, customer_name, status, total_amount, item_count, created_at)
VALUES ($1, $2, $3, 'pending', 0.0, 0, $4)
ON CONFLICT (id) DO UPDATE SET customer_name = $3""",
event.order_id, event.customer_id, customer.name, event.timestamp,
)
async def _on_item_added(self, event: OrderItemAdded):
subtotal = event.quantity * event.unit_price
await self.db.execute(
"UPDATE order_summary SET total_amount = total_amount + $1, item_count = item_count + 1 WHERE id = $2",
subtotal, event.order_id,
)
async def _on_order_cancelled(self, event: OrderCancelled):
await self.db.execute(
"UPDATE order_summary SET status = 'cancelled' WHERE id = $1", event.order_id
)FastAPI Integration
FastAPI 集成
python
from fastapi import FastAPI, Depends, HTTPException
app = FastAPI()
@app.post("/api/v1/orders", status_code=201)
async def create_order(request: CreateOrderRequest, bus: CommandBus = Depends(get_command_bus)):
command = CreateOrder(
customer_id=request.customer_id,
items=request.items,
shipping_address=request.shipping_address,
)
try:
events = await bus.dispatch(command)
return {"order_id": events[0].order_id}
except InsufficientInventoryError as e:
raise HTTPException(400, f"Insufficient inventory: {e}")
@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: UUID, bus: QueryBus = Depends(get_query_bus)):
order = await bus.dispatch(GetOrderById(order_id=order_id))
if not order:
raise HTTPException(404, "Order not found")
return orderpython
from fastapi import FastAPI, Depends, HTTPException
app = FastAPI()
@app.post("/api/v1/orders", status_code=201)
async def create_order(request: CreateOrderRequest, bus: CommandBus = Depends(get_command_bus)):
command = CreateOrder(
customer_id=request.customer_id,
items=request.items,
shipping_address=request.shipping_address,
)
try:
events = await bus.dispatch(command)
return {"order_id": events[0].order_id}
except InsufficientInventoryError as e:
raise HTTPException(400, f"Insufficient inventory: {e}")
@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: UUID, bus: QueryBus = Depends(get_query_bus)):
order = await bus.dispatch(GetOrderById(order_id=order_id))
if not order:
raise HTTPException(404, "Order not found")
return orderKey Decisions
关键决策
| Decision | Recommendation |
|---|---|
| Consistency | Eventual consistency between write and read models |
| Event storage | Event store for write side, denormalized tables for read |
| Projection lag | Monitor and alert on projection delay |
| Read model count | Start with one, add more for specific query needs |
| Rebuild strategy | Ability to rebuild projections from events |
| 决策 | 建议 |
|---|---|
| 一致性 | 写模型与读模型之间采用最终一致性 |
| 事件存储 | 写端使用事件存储,读端使用非规范化表 |
| 投影延迟 | 监控并告警投影延迟情况 |
| 读模型数量 | 从一个读模型开始,根据特定查询需求逐步添加 |
| 重建策略 | 具备从事件重建投影的能力 |
Anti-Patterns (FORBIDDEN)
反模式(禁止使用)
python
undefinedpython
undefinedNEVER query write model for reads
绝不要查询写模型来获取读数据
order = await aggregate_repo.get(order_id) # WRONG
order = await aggregate_repo.get(order_id) # 错误做法
CORRECT: Use read model
正确做法:使用读模型
order = await query_bus.dispatch(GetOrderById(order_id=order_id))
order = await query_bus.dispatch(GetOrderById(order_id=order_id))
NEVER modify read model directly
绝不要直接修改读模型
await read_db.execute("UPDATE orders SET status = $1", status) # WRONG
await read_db.execute("UPDATE orders SET status = $1", status) # 错误做法
CORRECT: Dispatch command, let projection update
正确做法:分发命令,由投影完成更新
await bus.dispatch(UpdateOrderStatus(order_id=order_id, status=status))
await bus.dispatch(UpdateOrderStatus(order_id=order_id, status=status))
NEVER skip projection idempotency - use UPSERT
绝不要忽略投影的幂等性 - 使用UPSERT
await self.db.execute("INSERT INTO ... ON CONFLICT (id) DO UPDATE SET ...")
undefinedawait self.db.execute("INSERT INTO ... ON CONFLICT (id) DO UPDATE SET ...")
undefinedRelated Skills
相关技能
- - Event-sourced write models
event-sourcing - - Cross-aggregate transactions
saga-patterns - - Read model schema design
database-schema-designer
- - 基于事件溯源的写模型
event-sourcing - - 跨聚合根事务
saga-patterns - - 读模型 schema 设计
database-schema-designer