event-sourcing

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Event Sourcing Patterns

事件溯源模式

Store application state as immutable events rather than current state snapshots.
将应用程序状态存储为不可变事件,而非当前状态快照。

Overview

概述

  • Full audit trail requirements (compliance, finance)
  • Temporal queries ("what was state at time X?")
  • CQRS implementations with separate read/write models
  • Systems requiring event replay and debugging
  • Microservices with eventual consistency
  • 完整审计追踪需求(合规、金融领域)
  • 时态查询("X时间点的状态是什么?")
  • 采用独立读写模型的CQRS实现
  • 需要事件重放和调试的系统
  • 具有最终一致性的微服务

Quick Reference

快速参考

Domain Event Base

领域事件基类

python
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from uuid import UUID, uuid4

class DomainEvent(BaseModel):
    event_id: UUID = Field(default_factory=uuid4)
    aggregate_id: UUID
    event_type: str
    version: int
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    class Config:
        frozen = True  # Events are immutable
python
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from uuid import UUID, uuid4

class DomainEvent(BaseModel):
    event_id: UUID = Field(default_factory=uuid4)
    aggregate_id: UUID
    event_type: str
    version: int
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    class Config:
        frozen = True  # Events are immutable

Event-Sourced Aggregate

事件溯源聚合根

python
class Account:
    def __init__(self):
        self._changes, self._version, self.balance = [], 0, 0.0

    def deposit(self, amount: float):
        self._raise_event(MoneyDeposited(aggregate_id=self.id, amount=amount, version=self._version + 1))

    def _apply(self, event):
        match event:
            case MoneyDeposited(): self.balance += event.amount
            case MoneyWithdrawn(): self.balance -= event.amount

    def load_from_history(self, events):
        for e in events: self._apply(e); self._version = e.version
python
class Account:
    def __init__(self):
        self._changes, self._version, self.balance = [], 0, 0.0

    def deposit(self, amount: float):
        self._raise_event(MoneyDeposited(aggregate_id=self.id, amount=amount, version=self._version + 1))

    def _apply(self, event):
        match event:
            case MoneyDeposited(): self.balance += event.amount
            case MoneyWithdrawn(): self.balance -= event.amount

    def load_from_history(self, events):
        for e in events: self._apply(e); self._version = e.version

Event Store Append

事件存储追加操作

python
async def append_events(self, aggregate_id: UUID, events: list, expected_version: int):
    current = await self.get_version(aggregate_id)
    if current != expected_version:
        raise ConcurrencyError(f"Expected {expected_version}, got {current}")
    for event in events:
        await self.session.execute(insert(event_store).values(
            event_id=event.event_id, aggregate_id=aggregate_id,
            event_type=event.event_type, version=event.version, data=event.model_dump()
        ))
python
async def append_events(self, aggregate_id: UUID, events: list, expected_version: int):
    current = await self.get_version(aggregate_id)
    if current != expected_version:
        raise ConcurrencyError(f"Expected {expected_version}, got {current}")
    for event in events:
        await self.session.execute(insert(event_store).values(
            event_id=event.event_id, aggregate_id=aggregate_id,
            event_type=event.event_type, version=event.version, data=event.model_dump()
        ))

Key Decisions

关键决策

DecisionRecommendation
Event namingPast tense (
OrderPlaced
, not
PlaceOrder
)
ConcurrencyOptimistic locking with version check
SnapshotsEvery 100-500 events for large aggregates
Event schemaVersion events, support upcasting
ProjectionsAsync handlers, idempotent updates
StoragePostgreSQL + JSONB or dedicated event store
决策事项推荐方案
事件命名使用过去式(
OrderPlaced
,而非
PlaceOrder
并发处理带版本检查的乐观锁
快照针对大型聚合根每100-500个事件创建一次快照
事件 schema对事件进行版本化,支持向上转换
投影异步处理器,幂等更新
存储PostgreSQL + JSONB 或专用事件存储

Anti-Patterns (FORBIDDEN)

反模式(禁止)

python
undefined
python
undefined

NEVER modify stored events

NEVER modify stored events

await event_store.update(event_id, new_data) # Destroys audit trail
await event_store.update(event_id, new_data) # Destroys audit trail

NEVER include computed data in events

NEVER include computed data in events

class OrderPlaced(DomainEvent): total: float # WRONG - compute from line items
class OrderPlaced(DomainEvent): total: float # WRONG - compute from line items

NEVER ignore event ordering

NEVER ignore event ordering

async for event in events: # May arrive out of order await handle(event) # Must check version/sequence
async for event in events: # May arrive out of order await handle(event) # Must check version/sequence

ALWAYS use immutable events

ALWAYS use immutable events

class Event(BaseModel): class Config: frozen = True # Correct
class Event(BaseModel): class Config: frozen = True # Correct

ALWAYS version your events

ALWAYS version your events

event_schema_version: int = 1 # Support schema evolution
undefined
event_schema_version: int = 1 # Support schema evolution
undefined

Related Skills

相关技能

  • message-queues
    - Distributed event delivery
  • database-schema-designer
    - Event store schema design
  • integration-testing
    - Testing event-sourced systems
  • message-queues
    - 分布式事件传递
  • database-schema-designer
    - 事件存储 schema 设计
  • integration-testing
    - 事件溯源系统测试

Capability Details

能力详情

event-store

event-store

Keywords: event store, append-only, event persistence, event log Solves:
  • Store events with optimistic concurrency
  • Query events by aggregate ID
  • Implement event versioning
关键词: 事件存储、追加写入、事件持久化、事件日志 解决问题:
  • 采用乐观并发机制存储事件
  • 按聚合根ID查询事件
  • 实现事件版本化

aggregate-pattern

aggregate-pattern

Keywords: aggregate, domain model, event sourcing aggregate, DDD Solves:
  • Model aggregates with event sourcing
  • Apply events to rebuild state
  • Handle commands and raise events
关键词: 聚合根、领域模型、事件溯源聚合根、DDD 解决问题:
  • 采用事件溯源建模聚合根
  • 应用事件重建状态
  • 处理命令并触发事件

projections

projections

Keywords: projection, read model, CQRS read side, denormalization Solves:
  • Create optimized read models from events
  • Implement async event handlers
  • Build materialized views
关键词: 投影、读模型、CQRS读端、反规范化 解决问题:
  • 从事件创建优化的读模型
  • 实现异步事件处理器
  • 构建物化视图

snapshots

snapshots

Keywords: snapshot, performance, aggregate loading, checkpoint Solves:
  • Speed up aggregate loading with snapshots
  • Implement snapshot strategies
  • Balance snapshot frequency vs storage
关键词: 快照、性能、聚合根加载、检查点 解决问题:
  • 通过快照加速聚合根加载
  • 实现快照策略
  • 平衡快照频率与存储成本