event-store-design
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseEvent Store Design
事件存储设计
Comprehensive guide to designing event stores for event-sourced applications.
为事件溯源应用设计事件存储的综合指南。
When to Use This Skill
何时使用本技能
- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling
- 设计事件溯源基础设施
- 选择事件存储技术
- 实现自定义事件存储
- 优化事件存储与检索
- 搭建事件存储模式
- 规划事件存储扩容
Core Concepts
核心概念
1. Event Store Architecture
1. 事件存储架构
┌─────────────────────────────────────────────────────┐
│ Event Store │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │
│ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Event 1 │ │ Event 1 │ │ Event 1 │ │
│ │ Event 2 │ │ Event 2 │ │ Event 2 │ │
│ │ Event 3 │ │ ... │ │ Event 3 │ │
│ │ ... │ │ │ │ Event 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘┌─────────────────────────────────────────────────────┐
│ Event Store │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │
│ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Event 1 │ │ Event 1 │ │ Event 1 │ │
│ │ Event 2 │ │ Event 2 │ │ Event 2 │ │
│ │ Event 3 │ │ ... │ │ Event 3 │ │
│ │ ... │ │ │ │ Event 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘2. Event Store Requirements
2. 事件存储要求
| Requirement | Description |
|---|---|
| Append-only | Events are immutable, only appends |
| Ordered | Per-stream and global ordering |
| Versioned | Optimistic concurrency control |
| Subscriptions | Real-time event notifications |
| Idempotent | Handle duplicate writes safely |
| 要求项 | 描述内容 |
|---|---|
| Append-only | 事件不可变,仅允许追加写入 |
| Ordered | 支持流内与全局排序 |
| Versioned | 乐观并发控制 |
| Subscriptions | 实时事件通知 |
| Idempotent | 安全处理重复写入 |
Technology Comparison
技术对比
| Technology | Best For | Limitations |
|---|---|---|
| EventStoreDB | Pure event sourcing | Single-purpose |
| PostgreSQL | Existing Postgres stack | Manual implementation |
| Kafka | High-throughput streaming | Not ideal for per-stream queries |
| DynamoDB | Serverless, AWS-native | Query limitations |
| Marten | .NET ecosystems | .NET specific |
| 技术名称 | 适用场景 | 局限性 |
|---|---|---|
| EventStoreDB | 纯事件溯源场景 | 单一用途 |
| PostgreSQL | 已有PostgreSQL技术栈 | 需手动实现相关功能 |
| Kafka | 高吞吐量流处理 | 不适用于流内查询 |
| DynamoDB | 无服务器、AWS原生环境 | 查询能力有限 |
| Marten | .NET生态系统 | 仅支持.NET环境 |
Templates
模板
Template 1: PostgreSQL Event Store Schema
模板1:PostgreSQL Event Store Schema
sql
-- Events table
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version BIGINT NOT NULL,
global_position BIGSERIAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);
-- Index for stream queries
CREATE INDEX idx_events_stream_id ON events(stream_id, version);
-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);
-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);
-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);
-- Snapshots table
CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
stream_type VARCHAR(255) NOT NULL,
snapshot_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Subscriptions checkpoint table
CREATE TABLE subscription_checkpoints (
subscription_id VARCHAR(255) PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT NOW()
);sql
-- Events table
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version BIGINT NOT NULL,
global_position BIGSERIAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);
-- Index for stream queries
CREATE INDEX idx_events_stream_id ON events(stream_id, version);
-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);
-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);
-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);
-- Snapshots table
CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
stream_type VARCHAR(255) NOT NULL,
snapshot_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Subscriptions checkpoint table
CREATE TABLE subscription_checkpoints (
subscription_id VARCHAR(255) PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT NOW()
);Template 2: Python Event Store Implementation
模板2:Python事件存储实现
python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional, List
from uuid import UUID, uuid4
import json
import asyncpg
@dataclass
class Event:
stream_id: str
event_type: str
data: dict
metadata: dict = field(default_factory=dict)
event_id: UUID = field(default_factory=uuid4)
version: Optional[int] = None
global_position: Optional[int] = None
created_at: datetime = field(default_factory=datetime.utcnow)
class EventStore:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def append_events(
self,
stream_id: str,
stream_type: str,
events: List[Event],
expected_version: Optional[int] = None
) -> List[Event]:
"""Append events to a stream with optimistic concurrency."""
async with self.pool.acquire() as conn:
async with conn.transaction():
# Check expected version
if expected_version is not None:
current = await conn.fetchval(
"SELECT MAX(version) FROM events WHERE stream_id = $1",
stream_id
)
current = current or 0
if current != expected_version:
raise ConcurrencyError(
f"Expected version {expected_version}, got {current}"
)
# Get starting version
start_version = await conn.fetchval(
"SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
stream_id
)
# Insert events
saved_events = []
for i, event in enumerate(events):
event.version = start_version + i
row = await conn.fetchrow(
"""
INSERT INTO events (id, stream_id, stream_type, event_type,
event_data, metadata, version, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING global_position
""",
event.event_id,
stream_id,
stream_type,
event.event_type,
json.dumps(event.data),
json.dumps(event.metadata),
event.version,
event.created_at
)
event.global_position = row['global_position']
saved_events.append(event)
return saved_events
async def read_stream(
self,
stream_id: str,
from_version: int = 0,
limit: int = 1000
) -> List[Event]:
"""Read events from a stream."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, stream_id, event_type, event_data, metadata,
version, global_position, created_at
FROM events
WHERE stream_id = $1 AND version >= $2
ORDER BY version
LIMIT $3
""",
stream_id, from_version, limit
)
return [self._row_to_event(row) for row in rows]
async def read_all(
self,
from_position: int = 0,
limit: int = 1000
) -> List[Event]:
"""Read all events globally."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, stream_id, event_type, event_data, metadata,
version, global_position, created_at
FROM events
WHERE global_position > $1
ORDER BY global_position
LIMIT $2
""",
from_position, limit
)
return [self._row_to_event(row) for row in rows]
async def subscribe(
self,
subscription_id: str,
handler,
from_position: int = 0,
batch_size: int = 100
):
"""Subscribe to all events from a position."""
# Get checkpoint
async with self.pool.acquire() as conn:
checkpoint = await conn.fetchval(
"""
SELECT last_position FROM subscription_checkpoints
WHERE subscription_id = $1
""",
subscription_id
)
position = checkpoint or from_position
while True:
events = await self.read_all(position, batch_size)
if not events:
await asyncio.sleep(1) # Poll interval
continue
for event in events:
await handler(event)
position = event.global_position
# Save checkpoint
async with self.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO subscription_checkpoints (subscription_id, last_position)
VALUES ($1, $2)
ON CONFLICT (subscription_id)
DO UPDATE SET last_position = $2, updated_at = NOW()
""",
subscription_id, position
)
def _row_to_event(self, row) -> Event:
return Event(
event_id=row['id'],
stream_id=row['stream_id'],
event_type=row['event_type'],
data=json.loads(row['event_data']),
metadata=json.loads(row['metadata']),
version=row['version'],
global_position=row['global_position'],
created_at=row['created_at']
)
class ConcurrencyError(Exception):
"""Raised when optimistic concurrency check fails."""
passpython
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional, List
from uuid import UUID, uuid4
import json
import asyncpg
@dataclass
class Event:
stream_id: str
event_type: str
data: dict
metadata: dict = field(default_factory=dict)
event_id: UUID = field(default_factory=uuid4)
version: Optional[int] = None
global_position: Optional[int] = None
created_at: datetime = field(default_factory=datetime.utcnow)
class EventStore:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def append_events(
self,
stream_id: str,
stream_type: str,
events: List[Event],
expected_version: Optional[int] = None
) -> List[Event]:
"""Append events to a stream with optimistic concurrency."""
async with self.pool.acquire() as conn:
async with conn.transaction():
# Check expected version
if expected_version is not None:
current = await conn.fetchval(
"SELECT MAX(version) FROM events WHERE stream_id = $1",
stream_id
)
current = current or 0
if current != expected_version:
raise ConcurrencyError(
f"Expected version {expected_version}, got {current}"
)
# Get starting version
start_version = await conn.fetchval(
"SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
stream_id
)
# Insert events
saved_events = []
for i, event in enumerate(events):
event.version = start_version + i
row = await conn.fetchrow(
"""
INSERT INTO events (id, stream_id, stream_type, event_type,
event_data, metadata, version, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING global_position
""",
event.event_id,
stream_id,
stream_type,
event.event_type,
json.dumps(event.data),
json.dumps(event.metadata),
event.version,
event.created_at
)
event.global_position = row['global_position']
saved_events.append(event)
return saved_events
async def read_stream(
self,
stream_id: str,
from_version: int = 0,
limit: int = 1000
) -> List[Event]:
"""Read events from a stream."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, stream_id, event_type, event_data, metadata,
version, global_position, created_at
FROM events
WHERE stream_id = $1 AND version >= $2
ORDER BY version
LIMIT $3
""",
stream_id, from_version, limit
)
return [self._row_to_event(row) for row in rows]
async def read_all(
self,
from_position: int = 0,
limit: int = 1000
) -> List[Event]:
"""Read all events globally."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, stream_id, event_type, event_data, metadata,
version, global_position, created_at
FROM events
WHERE global_position > $1
ORDER BY global_position
LIMIT $2
""",
from_position, limit
)
return [self._row_to_event(row) for row in rows]
async def subscribe(
self,
subscription_id: str,
handler,
from_position: int = 0,
batch_size: int = 100
):
"""Subscribe to all events from a position."""
# Get checkpoint
async with self.pool.acquire() as conn:
checkpoint = await conn.fetchval(
"""
SELECT last_position FROM subscription_checkpoints
WHERE subscription_id = $1
""",
subscription_id
)
position = checkpoint or from_position
while True:
events = await self.read_all(position, batch_size)
if not events:
await asyncio.sleep(1) # Poll interval
continue
for event in events:
await handler(event)
position = event.global_position
# Save checkpoint
async with self.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO subscription_checkpoints (subscription_id, last_position)
VALUES ($1, $2)
ON CONFLICT (subscription_id)
DO UPDATE SET last_position = $2, updated_at = NOW()
""",
subscription_id, position
)
def _row_to_event(self, row) -> Event:
return Event(
event_id=row['id'],
stream_id=row['stream_id'],
event_type=row['event_type'],
data=json.loads(row['event_data']),
metadata=json.loads(row['metadata']),
version=row['version'],
global_position=row['global_position'],
created_at=row['created_at']
)
class ConcurrencyError(Exception):
"""Raised when optimistic concurrency check fails."""
passTemplate 3: EventStoreDB Usage
模板3:EventStoreDB 使用示例
python
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import jsonpython
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import jsonConnect
Connect
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")
Append events
Append events
def append_events(stream_name: str, events: list, expected_revision=None):
new_events = [
NewEvent(
type=event['type'],
data=json.dumps(event['data']).encode(),
metadata=json.dumps(event.get('metadata', {})).encode()
)
for event in events
]
if expected_revision is None:
state = StreamState.ANY
elif expected_revision == -1:
state = StreamState.NO_STREAM
else:
state = expected_revision
return client.append_to_stream(
stream_name=stream_name,
events=new_events,
current_version=state
)def append_events(stream_name: str, events: list, expected_revision=None):
new_events = [
NewEvent(
type=event['type'],
data=json.dumps(event['data']).encode(),
metadata=json.dumps(event.get('metadata', {})).encode()
)
for event in events
]
if expected_revision is None:
state = StreamState.ANY
elif expected_revision == -1:
state = StreamState.NO_STREAM
else:
state = expected_revision
return client.append_to_stream(
stream_name=stream_name,
events=new_events,
current_version=state
)Read stream
Read stream
def read_stream(stream_name: str, from_revision: int = 0):
events = client.get_stream(
stream_name=stream_name,
stream_position=from_revision
)
return [
{
'type': event.type,
'data': json.loads(event.data),
'metadata': json.loads(event.metadata) if event.metadata else {},
'stream_position': event.stream_position,
'commit_position': event.commit_position
}
for event in events
]
def read_stream(stream_name: str, from_revision: int = 0):
events = client.get_stream(
stream_name=stream_name,
stream_position=from_revision
)
return [
{
'type': event.type,
'data': json.loads(event.data),
'metadata': json.loads(event.metadata) if event.metadata else {},
'stream_position': event.stream_position,
'commit_position': event.commit_position
}
for event in events
]
Subscribe to all
Subscribe to all
async def subscribe_to_all(handler, from_position: int = 0):
subscription = client.subscribe_to_all(commit_position=from_position)
async for event in subscription:
await handler({
'type': event.type,
'data': json.loads(event.data),
'stream_id': event.stream_name,
'position': event.commit_position
})
async def subscribe_to_all(handler, from_position: int = 0):
subscription = client.subscribe_to_all(commit_position=from_position)
async for event in subscription:
await handler({
'type': event.type,
'data': json.loads(event.data),
'stream_id': event.stream_name,
'position': event.commit_position
})
Category projection ($ce-Category)
Category projection ($ce-Category)
def read_category(category: str):
"""Read all events for a category using system projection."""
return read_stream(f"$ce-{category}")
undefineddef read_category(category: str):
"""Read all events for a category using system projection."""
return read_stream(f"$ce-{category}")
undefinedTemplate 4: DynamoDB Event Store
模板4:DynamoDB 事件存储
python
import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime
import json
import uuid
class DynamoEventStore:
def __init__(self, table_name: str):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
def append_events(self, stream_id: str, events: list, expected_version: int = None):
"""Append events with conditional write for concurrency."""
with self.table.batch_writer() as batch:
for i, event in enumerate(events):
version = (expected_version or 0) + i + 1
item = {
'PK': f"STREAM#{stream_id}",
'SK': f"VERSION#{version:020d}",
'GSI1PK': 'EVENTS',
'GSI1SK': datetime.utcnow().isoformat(),
'event_id': str(uuid.uuid4()),
'stream_id': stream_id,
'event_type': event['type'],
'event_data': json.dumps(event['data']),
'version': version,
'created_at': datetime.utcnow().isoformat()
}
batch.put_item(Item=item)
return events
def read_stream(self, stream_id: str, from_version: int = 0):
"""Read events from a stream."""
response = self.table.query(
KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
Key('SK').gte(f"VERSION#{from_version:020d}")
)
return [
{
'event_type': item['event_type'],
'data': json.loads(item['event_data']),
'version': item['version']
}
for item in response['Items']
]python
import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime
import json
import uuid
class DynamoEventStore:
def __init__(self, table_name: str):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
def append_events(self, stream_id: str, events: list, expected_version: int = None):
"""Append events with conditional write for concurrency."""
with self.table.batch_writer() as batch:
for i, event in enumerate(events):
version = (expected_version or 0) + i + 1
item = {
'PK': f"STREAM#{stream_id}",
'SK': f"VERSION#{version:020d}",
'GSI1PK': 'EVENTS',
'GSI1SK': datetime.utcnow().isoformat(),
'event_id': str(uuid.uuid4()),
'stream_id': stream_id,
'event_type': event['type'],
'event_data': json.dumps(event['data']),
'version': version,
'created_at': datetime.utcnow().isoformat()
}
batch.put_item(Item=item)
return events
def read_stream(self, stream_id: str, from_version: int = 0):
"""Read events from a stream."""
response = self.table.query(
KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
Key('SK').gte(f"VERSION#{from_version:020d}")
)
return [
{
'event_type': item['event_type'],
'data': json.loads(item['event_data']),
'version': item['version']
}
for item in response['Items']
]Table definition (CloudFormation/Terraform)
Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
- PK (Partition Key): String
- SK (Sort Key): String
- GSI1PK, GSI1SK for global ordering
Capacity: On-demand or provisioned based on throughput needs
"""
undefined"""
DynamoDB Table:
- PK (Partition Key): String
- SK (Sort Key): String
- GSI1PK, GSI1SK for global ordering
Capacity: On-demand or provisioned based on throughput needs
"""
undefinedBest Practices
最佳实践
Do's
建议做法
- Use stream IDs that include aggregate type -
Order-{uuid} - Include correlation/causation IDs - For tracing
- Version events from day one - Plan for schema evolution
- Implement idempotency - Use event IDs for deduplication
- Index appropriately - For your query patterns
- 使用包含聚合类型的流ID -
Order-{uuid} - 添加关联/因果ID - 用于链路追踪
- 从第一天开始为事件版本化 - 为 schema 演进做规划
- 实现幂等性 - 使用事件ID进行去重
- 合理创建索引 - 匹配你的查询模式
Don'ts
禁忌做法
- Don't update or delete events - They're immutable facts
- Don't store large payloads - Keep events small
- Don't skip optimistic concurrency - Prevents data corruption
- Don't ignore backpressure - Handle slow consumers
- 不要更新或删除事件 - 事件是不可变的事实
- 不要存储大负载 - 保持事件轻量化
- 不要跳过乐观并发控制 - 防止数据损坏
- 不要忽视背压 - 处理慢消费者