cqrs-implementation

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

CQRS Implementation

CQRS 实现方案

Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.
这是一份关于实现命令查询职责分离(CQRS)模式的综合指南。

When to Use This Skill

何时使用该方案

  • Separating read and write concerns
  • Scaling reads independently from writes
  • Building event-sourced systems
  • Optimizing complex query scenarios
  • Different read/write data models needed
  • High-performance reporting requirements
  • 分离读写关注点
  • 独立扩展读操作与写操作
  • 构建事件溯源系统
  • 优化复杂查询场景
  • 需要不同的读写数据模型
  • 高性能报表需求

Core Concepts

核心概念

1. CQRS Architecture

1. CQRS 架构

                    ┌─────────────┐
                    │   Client    │
                    └──────┬──────┘
              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Commands   │          │   Queries   │
       │    API      │          │    API      │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Command    │          │   Query     │
       │  Handlers   │          │  Handlers   │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │   Write     │─────────►│    Read     │
       │   Model     │  Events  │   Model     │
       └─────────────┘          └─────────────┘
                    ┌─────────────┐
                    │   Client    │
                    └──────┬──────┘
              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Commands   │          │   Queries   │
       │    API      │          │    API      │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Command    │          │   Query     │
       │  Handlers   │          │  Handlers   │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │   Write     │─────────►│    Read     │
       │   Model     │  Events  │   Model     │
       └─────────────┘          └─────────────┘

2. Key Components

2. 核心组件

ComponentResponsibility
CommandIntent to change state
Command HandlerValidates and executes commands
EventRecord of state change
QueryRequest for data
Query HandlerRetrieves data from read model
ProjectorUpdates read model from events
Component职责说明
Command表示更改状态的意图
Command Handler验证并执行命令
Event记录状态变更
Query数据请求
Query Handler从读模型中检索数据
Projector根据事件更新读模型

Templates

实现模板

Template 1: Command Infrastructure

模板1:命令基础设施

python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, Dict, Any, Type
from datetime import datetime
import uuid
python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, Dict, Any, Type
from datetime import datetime
import uuid

Command base

Command base

@dataclass class Command: command_id: str = None timestamp: datetime = None
def __post_init__(self):
    self.command_id = self.command_id or str(uuid.uuid4())
    self.timestamp = self.timestamp or datetime.utcnow()
@dataclass class Command: command_id: str = None timestamp: datetime = None
def __post_init__(self):
    self.command_id = self.command_id or str(uuid.uuid4())
    self.timestamp = self.timestamp or datetime.utcnow()

Concrete commands

Concrete commands

@dataclass class CreateOrder(Command): customer_id: str items: list shipping_address: dict
@dataclass class AddOrderItem(Command): order_id: str product_id: str quantity: int price: float
@dataclass class CancelOrder(Command): order_id: str reason: str
@dataclass class CreateOrder(Command): customer_id: str items: list shipping_address: dict
@dataclass class AddOrderItem(Command): order_id: str product_id: str quantity: int price: float
@dataclass class CancelOrder(Command): order_id: str reason: str

Command handler base

Command handler base

T = TypeVar('T', bound=Command)
class CommandHandler(ABC, Generic[T]): @abstractmethod async def handle(self, command: T) -> Any: pass
T = TypeVar('T', bound=Command)
class CommandHandler(ABC, Generic[T]): @abstractmethod async def handle(self, command: T) -> Any: pass

Command bus

Command bus

class CommandBus: def init(self): self._handlers: Dict[Type[Command], CommandHandler] = {}
def register(self, command_type: Type[Command], handler: CommandHandler):
    self._handlers[command_type] = handler

async def dispatch(self, command: Command) -> Any:
    handler = self._handlers.get(type(command))
    if not handler:
        raise ValueError(f"No handler for {type(command).__name__}")
    return await handler.handle(command)
class CommandBus: def init(self): self._handlers: Dict[Type[Command], CommandHandler] = {}
def register(self, command_type: Type[Command], handler: CommandHandler):
    self._handlers[command_type] = handler

async def dispatch(self, command: Command) -> Any:
    handler = self._handlers.get(type(command))
    if not handler:
        raise ValueError(f"No handler for {type(command).__name__}")
    return await handler.handle(command)

Command handler implementation

Command handler implementation

class CreateOrderHandler(CommandHandler[CreateOrder]): def init(self, order_repository, event_store): self.order_repository = order_repository self.event_store = event_store
async def handle(self, command: CreateOrder) -> str:
    # Validate
    if not command.items:
        raise ValueError("Order must have at least one item")

    # Create aggregate
    order = Order.create(
        customer_id=command.customer_id,
        items=command.items,
        shipping_address=command.shipping_address
    )

    # Persist events
    await self.event_store.append_events(
        stream_id=f"Order-{order.id}",
        stream_type="Order",
        events=order.uncommitted_events
    )

    return order.id
undefined
class CreateOrderHandler(CommandHandler[CreateOrder]): def init(self, order_repository, event_store): self.order_repository = order_repository self.event_store = event_store
async def handle(self, command: CreateOrder) -> str:
    # Validate
    if not command.items:
        raise ValueError("Order must have at least one item")

    # Create aggregate
    order = Order.create(
        customer_id=command.customer_id,
        items=command.items,
        shipping_address=command.shipping_address
    )

    # Persist events
    await self.event_store.append_events(
        stream_id=f"Order-{order.id}",
        stream_type="Order",
        events=order.uncommitted_events
    )

    return order.id
undefined

Template 2: Query Infrastructure

模板2:查询基础设施

python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, List, Optional
python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, List, Optional

Query base

Query base

@dataclass class Query: pass
@dataclass class Query: pass

Concrete queries

Concrete queries

@dataclass class GetOrderById(Query): order_id: str
@dataclass class GetCustomerOrders(Query): customer_id: str status: Optional[str] = None page: int = 1 page_size: int = 20
@dataclass class SearchOrders(Query): query: str filters: dict = None sort_by: str = "created_at" sort_order: str = "desc"
@dataclass class GetOrderById(Query): order_id: str
@dataclass class GetCustomerOrders(Query): customer_id: str status: Optional[str] = None page: int = 1 page_size: int = 20
@dataclass class SearchOrders(Query): query: str filters: dict = None sort_by: str = "created_at" sort_order: str = "desc"

Query result types

Query result types

@dataclass class OrderView: order_id: str customer_id: str status: str total_amount: float item_count: int created_at: datetime shipped_at: Optional[datetime] = None
@dataclass class PaginatedResult(Generic[T]): items: List[T] total: int page: int page_size: int
@property
def total_pages(self) -> int:
    return (self.total + self.page_size - 1) // self.page_size
@dataclass class OrderView: order_id: str customer_id: str status: str total_amount: float item_count: int created_at: datetime shipped_at: Optional[datetime] = None
@dataclass class PaginatedResult(Generic[T]): items: List[T] total: int page: int page_size: int
@property
def total_pages(self) -> int:
    return (self.total + self.page_size - 1) // self.page_size

Query handler base

Query handler base

T = TypeVar('T', bound=Query) R = TypeVar('R')
class QueryHandler(ABC, Generic[T, R]): @abstractmethod async def handle(self, query: T) -> R: pass
T = TypeVar('T', bound=Query) R = TypeVar('R')
class QueryHandler(ABC, Generic[T, R]): @abstractmethod async def handle(self, query: T) -> R: pass

Query bus

Query bus

class QueryBus: def init(self): self._handlers: Dict[Type[Query], QueryHandler] = {}
def register(self, query_type: Type[Query], handler: QueryHandler):
    self._handlers[query_type] = handler

async def dispatch(self, query: Query) -> Any:
    handler = self._handlers.get(type(query))
    if not handler:
        raise ValueError(f"No handler for {type(query).__name__}")
    return await handler.handle(query)
class QueryBus: def init(self): self._handlers: Dict[Type[Query], QueryHandler] = {}
def register(self, query_type: Type[Query], handler: QueryHandler):
    self._handlers[query_type] = handler

async def dispatch(self, query: Query) -> Any:
    handler = self._handlers.get(type(query))
    if not handler:
        raise ValueError(f"No handler for {type(query).__name__}")
    return await handler.handle(query)

Query handler implementation

Query handler implementation

class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]): def init(self, read_db): self.read_db = read_db
async def handle(self, query: GetOrderById) -> Optional[OrderView]:
    async with self.read_db.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT order_id, customer_id, status, total_amount,
                   item_count, created_at, shipped_at
            FROM order_views
            WHERE order_id = $1
            """,
            query.order_id
        )
        if row:
            return OrderView(**dict(row))
        return None
class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]): def init(self, read_db): self.read_db = read_db
async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
    async with self.read_db.acquire() as conn:
        # Build query with optional status filter
        where_clause = "customer_id = $1"
        params = [query.customer_id]

        if query.status:
            where_clause += " AND status = $2"
            params.append(query.status)

        # Get total count
        total = await conn.fetchval(
            f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
            *params
        )

        # Get paginated results
        offset = (query.page - 1) * query.page_size
        rows = await conn.fetch(
            f"""
            SELECT order_id, customer_id, status, total_amount,
                   item_count, created_at, shipped_at
            FROM order_views
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
            """,
            *params, query.page_size, offset
        )

        return PaginatedResult(
            items=[OrderView(**dict(row)) for row in rows],
            total=total,
            page=query.page,
            page_size=query.page_size
        )
undefined
class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]): def init(self, read_db): self.read_db = read_db
async def handle(self, query: GetOrderById) -> Optional[OrderView]:
    async with self.read_db.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT order_id, customer_id, status, total_amount,
                   item_count, created_at, shipped_at
            FROM order_views
            WHERE order_id = $1
            """,
            query.order_id
        )
        if row:
            return OrderView(**dict(row))
        return None
class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]): def init(self, read_db): self.read_db = read_db
async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
    async with self.read_db.acquire() as conn:
        # Build query with optional status filter
        where_clause = "customer_id = $1"
        params = [query.customer_id]

        if query.status:
            where_clause += " AND status = $2"
            params.append(query.status)

        # Get total count
        total = await conn.fetchval(
            f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
            *params
        )

        # Get paginated results
        offset = (query.page - 1) * query.page_size
        rows = await conn.fetch(
            f"""
            SELECT order_id, customer_id, status, total_amount,
                   item_count, created_at, shipped_at
            FROM order_views
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
            """,
            *params, query.page_size, offset
        )

        return PaginatedResult(
            items=[OrderView(**dict(row)) for row in rows],
            total=total,
            page=query.page,
            page_size=query.page_size
        )
undefined

Template 3: FastAPI CQRS Application

模板3:FastAPI CQRS 应用

python
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI()
python
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI()

Request/Response models

Request/Response models

class CreateOrderRequest(BaseModel): customer_id: str items: List[dict] shipping_address: dict
class OrderResponse(BaseModel): order_id: str customer_id: str status: str total_amount: float item_count: int created_at: datetime
class CreateOrderRequest(BaseModel): customer_id: str items: List[dict] shipping_address: dict
class OrderResponse(BaseModel): order_id: str customer_id: str status: str total_amount: float item_count: int created_at: datetime

Dependency injection

Dependency injection

def get_command_bus() -> CommandBus: return app.state.command_bus
def get_query_bus() -> QueryBus: return app.state.query_bus
def get_command_bus() -> CommandBus: return app.state.command_bus
def get_query_bus() -> QueryBus: return app.state.query_bus

Command endpoints (POST, PUT, DELETE)

Command endpoints (POST, PUT, DELETE)

@app.post("/orders", response_model=dict) async def create_order( request: CreateOrderRequest, command_bus: CommandBus = Depends(get_command_bus) ): command = CreateOrder( customer_id=request.customer_id, items=request.items, shipping_address=request.shipping_address ) order_id = await command_bus.dispatch(command) return {"order_id": order_id}
@app.post("/orders/{order_id}/items") async def add_item( order_id: str, product_id: str, quantity: int, price: float, command_bus: CommandBus = Depends(get_command_bus) ): command = AddOrderItem( order_id=order_id, product_id=product_id, quantity=quantity, price=price ) await command_bus.dispatch(command) return {"status": "item_added"}
@app.delete("/orders/{order_id}") async def cancel_order( order_id: str, reason: str, command_bus: CommandBus = Depends(get_command_bus) ): command = CancelOrder(order_id=order_id, reason=reason) await command_bus.dispatch(command) return {"status": "cancelled"}
@app.post("/orders", response_model=dict) async def create_order( request: CreateOrderRequest, command_bus: CommandBus = Depends(get_command_bus) ): command = CreateOrder( customer_id=request.customer_id, items=request.items, shipping_address=request.shipping_address ) order_id = await command_bus.dispatch(command) return {"order_id": order_id}
@app.post("/orders/{order_id}/items") async def add_item( order_id: str, product_id: str, quantity: int, price: float, command_bus: CommandBus = Depends(get_command_bus) ): command = AddOrderItem( order_id=order_id, product_id=product_id, quantity=quantity, price=price ) await command_bus.dispatch(command) return {"status": "item_added"}
@app.delete("/orders/{order_id}") async def cancel_order( order_id: str, reason: str, command_bus: CommandBus = Depends(get_command_bus) ): command = CancelOrder(order_id=order_id, reason=reason) await command_bus.dispatch(command) return {"status": "cancelled"}

Query endpoints (GET)

Query endpoints (GET)

@app.get("/orders/{order_id}", response_model=OrderResponse) async def get_order( order_id: str, query_bus: QueryBus = Depends(get_query_bus) ): query = GetOrderById(order_id=order_id) result = await query_bus.dispatch(query) if not result: raise HTTPException(status_code=404, detail="Order not found") return result
@app.get("/customers/{customer_id}/orders") async def get_customer_orders( customer_id: str, status: Optional[str] = None, page: int = 1, page_size: int = 20, query_bus: QueryBus = Depends(get_query_bus) ): query = GetCustomerOrders( customer_id=customer_id, status=status, page=page, page_size=page_size ) return await query_bus.dispatch(query)
@app.get("/orders/search") async def search_orders( q: str, sort_by: str = "created_at", query_bus: QueryBus = Depends(get_query_bus) ): query = SearchOrders(query=q, sort_by=sort_by) return await query_bus.dispatch(query)
undefined
@app.get("/orders/{order_id}", response_model=OrderResponse) async def get_order( order_id: str, query_bus: QueryBus = Depends(get_query_bus) ): query = GetOrderById(order_id=order_id) result = await query_bus.dispatch(query) if not result: raise HTTPException(status_code=404, detail="Order not found") return result
@app.get("/customers/{customer_id}/orders") async def get_customer_orders( customer_id: str, status: Optional[str] = None, page: int = 1, page_size: int = 20, query_bus: QueryBus = Depends(get_query_bus) ): query = GetCustomerOrders( customer_id=customer_id, status=status, page=page, page_size=page_size ) return await query_bus.dispatch(query)
@app.get("/orders/search") async def search_orders( q: str, sort_by: str = "created_at", query_bus: QueryBus = Depends(get_query_bus) ): query = SearchOrders(query=q, sort_by=sort_by) return await query_bus.dispatch(query)
undefined

Template 4: Read Model Synchronization

模板4:读模型同步

python
class ReadModelSynchronizer:
    """Keeps read models in sync with events."""

    def __init__(self, event_store, read_db, projections: List[Projection]):
        self.event_store = event_store
        self.read_db = read_db
        self.projections = {p.name: p for p in projections}

    async def run(self):
        """Continuously sync read models."""
        while True:
            for name, projection in self.projections.items():
                await self._sync_projection(projection)
            await asyncio.sleep(0.1)

    async def _sync_projection(self, projection: Projection):
        checkpoint = await self._get_checkpoint(projection.name)

        events = await self.event_store.read_all(
            from_position=checkpoint,
            limit=100
        )

        for event in events:
            if event.event_type in projection.handles():
                try:
                    await projection.apply(event)
                except Exception as e:
                    # Log error, possibly retry or skip
                    logger.error(f"Projection error: {e}")
                    continue

            await self._save_checkpoint(projection.name, event.global_position)

    async def rebuild_projection(self, projection_name: str):
        """Rebuild a projection from scratch."""
        projection = self.projections[projection_name]

        # Clear existing data
        await projection.clear()

        # Reset checkpoint
        await self._save_checkpoint(projection_name, 0)

        # Rebuild
        while True:
            checkpoint = await self._get_checkpoint(projection_name)
            events = await self.event_store.read_all(checkpoint, 1000)

            if not events:
                break

            for event in events:
                if event.event_type in projection.handles():
                    await projection.apply(event)

            await self._save_checkpoint(
                projection_name,
                events[-1].global_position
            )
python
class ReadModelSynchronizer:
    """Keeps read models in sync with events."""

    def __init__(self, event_store, read_db, projections: List[Projection]):
        self.event_store = event_store
        self.read_db = read_db
        self.projections = {p.name: p for p in projections}

    async def run(self):
        """Continuously sync read models."""
        while True:
            for name, projection in self.projections.items():
                await self._sync_projection(projection)
            await asyncio.sleep(0.1)

    async def _sync_projection(self, projection: Projection):
        checkpoint = await self._get_checkpoint(projection.name)

        events = await self.event_store.read_all(
            from_position=checkpoint,
            limit=100
        )

        for event in events:
            if event.event_type in projection.handles():
                try:
                    await projection.apply(event)
                except Exception as e:
                    # Log error, possibly retry or skip
                    logger.error(f"Projection error: {e}")
                    continue

            await self._save_checkpoint(projection.name, event.global_position)

    async def rebuild_projection(self, projection_name: str):
        """Rebuild a projection from scratch."""
        projection = self.projections[projection_name]

        # Clear existing data
        await projection.clear()

        # Reset checkpoint
        await self._save_checkpoint(projection_name, 0)

        # Rebuild
        while True:
            checkpoint = await self._get_checkpoint(projection_name)
            events = await self.event_store.read_all(checkpoint, 1000)

            if not events:
                break

            for event in events:
                if event.event_type in projection.handles():
                    await projection.apply(event)

            await self._save_checkpoint(
                projection_name,
                events[-1].global_position
            )

Template 5: Eventual Consistency Handling

模板5:最终一致性处理

python
class ConsistentQueryHandler:
    """Query handler that can wait for consistency."""

    def __init__(self, read_db, event_store):
        self.read_db = read_db
        self.event_store = event_store

    async def query_after_command(
        self,
        query: Query,
        expected_version: int,
        stream_id: str,
        timeout: float = 5.0
    ):
        """
        Execute query, ensuring read model is at expected version.
        Used for read-your-writes consistency.
        """
        start_time = time.time()

        while time.time() - start_time < timeout:
            # Check if read model is caught up
            projection_version = await self._get_projection_version(stream_id)

            if projection_version >= expected_version:
                return await self.execute_query(query)

            # Wait a bit and retry
            await asyncio.sleep(0.1)

        # Timeout - return stale data with warning
        return {
            "data": await self.execute_query(query),
            "_warning": "Data may be stale"
        }

    async def _get_projection_version(self, stream_id: str) -> int:
        """Get the last processed event version for a stream."""
        async with self.read_db.acquire() as conn:
            return await conn.fetchval(
                "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
                stream_id
            ) or 0
python
class ConsistentQueryHandler:
    """Query handler that can wait for consistency."""

    def __init__(self, read_db, event_store):
        self.read_db = read_db
        self.event_store = event_store

    async def query_after_command(
        self,
        query: Query,
        expected_version: int,
        stream_id: str,
        timeout: float = 5.0
    ):
        """
        Execute query, ensuring read model is at expected version.
        Used for read-your-writes consistency.
        """
        start_time = time.time()

        while time.time() - start_time < timeout:
            # Check if read model is caught up
            projection_version = await self._get_projection_version(stream_id)

            if projection_version >= expected_version:
                return await self.execute_query(query)

            # Wait a bit and retry
            await asyncio.sleep(0.1)

        # Timeout - return stale data with warning
        return {
            "data": await self.execute_query(query),
            "_warning": "Data may be stale"
        }

    async def _get_projection_version(self, stream_id: str) -> int:
        """Get the last processed event version for a stream."""
        async with self.read_db.acquire() as conn:
            return await conn.fetchval(
                "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
                stream_id
            ) or 0

Best Practices

最佳实践

Do's

推荐做法

  • Separate command and query models - Different needs
  • Use eventual consistency - Accept propagation delay
  • Validate in command handlers - Before state change
  • Denormalize read models - Optimize for queries
  • Version your events - For schema evolution
  • 分离命令与查询模型 - 满足不同需求
  • 使用最终一致性 - 接受传播延迟
  • 在命令处理器中验证 - 状态变更前执行
  • 对读模型进行反规范化 - 优化查询效率
  • 为事件添加版本 - 支持 schema 演进

Don'ts

禁忌事项

  • Don't query in commands - Use only for writes
  • Don't couple read/write schemas - Independent evolution
  • Don't over-engineer - Start simple
  • Don't ignore consistency SLAs - Define acceptable lag
  • 不要在命令中执行查询 - 命令仅用于写操作
  • 不要耦合读写 schema - 独立演进
  • 不要过度设计 - 从简单方案开始
  • 不要忽略一致性SLA - 定义可接受的延迟

Resources

参考资源