event-sourcing
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseEvent Sourcing Patterns
事件溯源模式
Store application state as immutable events rather than current state snapshots.
将应用程序状态存储为不可变事件,而非当前状态快照。
Overview
概述
- Full audit trail requirements (compliance, finance)
- Temporal queries ("what was state at time X?")
- CQRS implementations with separate read/write models
- Systems requiring event replay and debugging
- Microservices with eventual consistency
- 完整审计追踪需求(合规、金融领域)
- 时态查询("X时间点的状态是什么?")
- 采用独立读写模型的CQRS实现
- 需要事件重放和调试的系统
- 具有最终一致性的微服务
Quick Reference
快速参考
Domain Event Base
领域事件基类
python
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from uuid import UUID, uuid4
class DomainEvent(BaseModel):
event_id: UUID = Field(default_factory=uuid4)
aggregate_id: UUID
event_type: str
version: int
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class Config:
frozen = True # Events are immutablepython
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from uuid import UUID, uuid4
class DomainEvent(BaseModel):
event_id: UUID = Field(default_factory=uuid4)
aggregate_id: UUID
event_type: str
version: int
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class Config:
frozen = True # Events are immutableEvent-Sourced Aggregate
事件溯源聚合根
python
class Account:
def __init__(self):
self._changes, self._version, self.balance = [], 0, 0.0
def deposit(self, amount: float):
self._raise_event(MoneyDeposited(aggregate_id=self.id, amount=amount, version=self._version + 1))
def _apply(self, event):
match event:
case MoneyDeposited(): self.balance += event.amount
case MoneyWithdrawn(): self.balance -= event.amount
def load_from_history(self, events):
for e in events: self._apply(e); self._version = e.versionpython
class Account:
def __init__(self):
self._changes, self._version, self.balance = [], 0, 0.0
def deposit(self, amount: float):
self._raise_event(MoneyDeposited(aggregate_id=self.id, amount=amount, version=self._version + 1))
def _apply(self, event):
match event:
case MoneyDeposited(): self.balance += event.amount
case MoneyWithdrawn(): self.balance -= event.amount
def load_from_history(self, events):
for e in events: self._apply(e); self._version = e.versionEvent Store Append
事件存储追加操作
python
async def append_events(self, aggregate_id: UUID, events: list, expected_version: int):
current = await self.get_version(aggregate_id)
if current != expected_version:
raise ConcurrencyError(f"Expected {expected_version}, got {current}")
for event in events:
await self.session.execute(insert(event_store).values(
event_id=event.event_id, aggregate_id=aggregate_id,
event_type=event.event_type, version=event.version, data=event.model_dump()
))python
async def append_events(self, aggregate_id: UUID, events: list, expected_version: int):
current = await self.get_version(aggregate_id)
if current != expected_version:
raise ConcurrencyError(f"Expected {expected_version}, got {current}")
for event in events:
await self.session.execute(insert(event_store).values(
event_id=event.event_id, aggregate_id=aggregate_id,
event_type=event.event_type, version=event.version, data=event.model_dump()
))Key Decisions
关键决策
| Decision | Recommendation |
|---|---|
| Event naming | Past tense ( |
| Concurrency | Optimistic locking with version check |
| Snapshots | Every 100-500 events for large aggregates |
| Event schema | Version events, support upcasting |
| Projections | Async handlers, idempotent updates |
| Storage | PostgreSQL + JSONB or dedicated event store |
| 决策事项 | 推荐方案 |
|---|---|
| 事件命名 | 使用过去式( |
| 并发处理 | 带版本检查的乐观锁 |
| 快照 | 针对大型聚合根每100-500个事件创建一次快照 |
| 事件 schema | 对事件进行版本化,支持向上转换 |
| 投影 | 异步处理器,幂等更新 |
| 存储 | PostgreSQL + JSONB 或专用事件存储 |
Anti-Patterns (FORBIDDEN)
反模式(禁止)
python
undefinedpython
undefinedNEVER modify stored events
NEVER modify stored events
await event_store.update(event_id, new_data) # Destroys audit trail
await event_store.update(event_id, new_data) # Destroys audit trail
NEVER include computed data in events
NEVER include computed data in events
class OrderPlaced(DomainEvent):
total: float # WRONG - compute from line items
class OrderPlaced(DomainEvent):
total: float # WRONG - compute from line items
NEVER ignore event ordering
NEVER ignore event ordering
async for event in events: # May arrive out of order
await handle(event) # Must check version/sequence
async for event in events: # May arrive out of order
await handle(event) # Must check version/sequence
ALWAYS use immutable events
ALWAYS use immutable events
class Event(BaseModel):
class Config:
frozen = True # Correct
class Event(BaseModel):
class Config:
frozen = True # Correct
ALWAYS version your events
ALWAYS version your events
event_schema_version: int = 1 # Support schema evolution
undefinedevent_schema_version: int = 1 # Support schema evolution
undefinedRelated Skills
相关技能
- - Distributed event delivery
message-queues - - Event store schema design
database-schema-designer - - Testing event-sourced systems
integration-testing
- - 分布式事件传递
message-queues - - 事件存储 schema 设计
database-schema-designer - - 事件溯源系统测试
integration-testing
Capability Details
能力详情
event-store
event-store
Keywords: event store, append-only, event persistence, event log
Solves:
- Store events with optimistic concurrency
- Query events by aggregate ID
- Implement event versioning
关键词: 事件存储、追加写入、事件持久化、事件日志
解决问题:
- 采用乐观并发机制存储事件
- 按聚合根ID查询事件
- 实现事件版本化
aggregate-pattern
aggregate-pattern
Keywords: aggregate, domain model, event sourcing aggregate, DDD
Solves:
- Model aggregates with event sourcing
- Apply events to rebuild state
- Handle commands and raise events
关键词: 聚合根、领域模型、事件溯源聚合根、DDD
解决问题:
- 采用事件溯源建模聚合根
- 应用事件重建状态
- 处理命令并触发事件
projections
projections
Keywords: projection, read model, CQRS read side, denormalization
Solves:
- Create optimized read models from events
- Implement async event handlers
- Build materialized views
关键词: 投影、读模型、CQRS读端、反规范化
解决问题:
- 从事件创建优化的读模型
- 实现异步事件处理器
- 构建物化视图
snapshots
snapshots
Keywords: snapshot, performance, aggregate loading, checkpoint
Solves:
- Speed up aggregate loading with snapshots
- Implement snapshot strategies
- Balance snapshot frequency vs storage
关键词: 快照、性能、聚合根加载、检查点
解决问题:
- 通过快照加速聚合根加载
- 实现快照策略
- 平衡快照频率与存储成本