Loading...
Loading...
Compare original and translation side by side
┌────────────┴────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Commands │ │ Queries │
│ API │ │ API │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Write │─────────►│ Read │
│ Model │ Events │ Model │
└─────────────┘ └─────────────┘ ┌────────────┴────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Commands │ │ Queries │
│ API │ │ API │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Write │─────────►│ Read │
│ Model │ Events │ Model │
└─────────────┘ └─────────────┘@dataclass
class Command:
command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class CreateOrder(Command):
customer_id: str
items: list
shipping_address: dict
class CommandHandler(ABC, Generic[T]):
@abstractmethod
async def handle(self, command: T) -> Any:
pass
class CommandBus:
def __init__(self):
self._handlers: Dict[Type[Command], CommandHandler] = {}
def register(self, command_type, handler):
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> Any:
handler = self._handlers.get(type(command))
return await handler.handle(command)@dataclass
class Command:
command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class CreateOrder(Command):
customer_id: str
items: list
shipping_address: dict
class CommandHandler(ABC, Generic[T]):
@abstractmethod
async def handle(self, command: T) -> Any:
pass
class CommandBus:
def __init__(self):
self._handlers: Dict[Type[Command], CommandHandler] = {}
def register(self, command_type, handler):
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> Any:
handler = self._handlers.get(type(command))
return await handler.handle(command)@dataclass
class GetOrderById(Query):
order_id: str
@dataclass
class OrderView:
order_id: str
customer_id: str
status: str
total_amount: float
created_at: datetime
class GetOrderByIdHandler(QueryHandler[GetOrderById, OrderView]):
async def handle(self, query: GetOrderById) -> Optional[OrderView]:
row = await self.read_db.fetchrow(
"SELECT * FROM order_views WHERE order_id = $1",
query.order_id
)
return OrderView(**dict(row)) if row else None@dataclass
class GetOrderById(Query):
order_id: str
@dataclass
class OrderView:
order_id: str
customer_id: str
status: str
total_amount: float
created_at: datetime
class GetOrderByIdHandler(QueryHandler[GetOrderById, OrderView]):
async def handle(self, query: GetOrderById) -> Optional[OrderView]:
row = await self.read_db.fetchrow(
"SELECT * FROM order_views WHERE order_id = $1",
query.order_id
)
return OrderView(**dict(row)) if row else Noneundefinedundefinedundefinedundefinedclass ReadModelSynchronizer:
async def sync_projection(self, projection: Projection):
checkpoint = await self._get_checkpoint(projection.name)
events = await self.event_store.read_all(from_position=checkpoint)
for event in events:
if event.event_type in projection.handles():
await projection.apply(event)
await self._save_checkpoint(projection.name, event.position)
async def rebuild_projection(self, projection_name: str):
projection = self.projections[projection_name]
await projection.clear()
await self._save_checkpoint(projection_name, 0)
# Rebuild from beginningclass ReadModelSynchronizer:
async def sync_projection(self, projection: Projection):
checkpoint = await self._get_checkpoint(projection.name)
events = await self.event_store.read_all(from_position=checkpoint)
for event in events:
if event.event_type in projection.handles():
await projection.apply(event)
await self._save_checkpoint(projection.name, event.position)
async def rebuild_projection(self, projection_name: str):
projection = self.projections[projection_name]
await projection.clear()
await self._save_checkpoint(projection_name, 0)
# Rebuild from beginningasync def query_after_command(self, query, expected_version, stream_id, timeout=5.0):
"""Read-your-writes consistency."""
start = time.time()
while time.time() - start < timeout:
projection_version = await self._get_projection_version(stream_id)
if projection_version >= expected_version:
return await self.execute_query(query)
await asyncio.sleep(0.1)
return {"data": await self.execute_query(query), "_warning": "May be stale"}async def query_after_command(self, query, expected_version, stream_id, timeout=5.0):
"""Read-your-writes consistency."""
start = time.time()
while time.time() - start < timeout:
projection_version = await self._get_projection_version(stream_id)
if projection_version >= expected_version:
return await self.execute_query(query)
await asyncio.sleep(0.1)
return {"data": await self.execute_query(query), "_warning": "May be stale"}