ddd-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseDomain-Driven Design (DDD): Advanced Patterns
领域驱动设计(DDD):高级模式
Advanced integration patterns for handling complex distributed system interactions, data consistency, and cross-boundary communication. Use these patterns when a single Bounded Context's tactical patterns are insufficient.
"Distributed systems are different. If we don't know that, we are building them wrong." — Udi Dahan
用于处理复杂分布式系统交互、数据一致性和跨边界通信的高级集成模式。当单一限界上下文的战术模式不足以满足需求时,可使用这些模式。
"分布式系统是不同的。如果我们不了解这一点,就会构建出错误的系统。" —— Udi Dahan
🧭 When to Use Advanced Patterns
🧭 何时使用高级模式
| Situation | Pattern(s) | Complexity |
|---|---|---|
| Read model needs different shape than write model | CQRS | Medium |
| Need full audit trail of all state changes | Event Sourcing | High |
| Need atomic DB + message broker consistency | Outbox | Medium |
| Long-running business process across contexts | Saga | High |
| External system has incompatible model | ACL | Medium |
| High read load, simple queries | CQRS + Read Model | Medium |
Warning: These patterns add significant complexity. Don't use them unless you have a clear problem they solve.
| 场景 | 适用模式 | 复杂度 |
|---|---|---|
| 读模型与写模型结构需求不同 | CQRS | 中等 |
| 需要所有状态变更的完整审计追踪 | Event Sourcing | 高 |
| 需要数据库与消息代理的原子一致性 | Outbox | 中等 |
| 跨上下文的长期业务流程 | Saga | 高 |
| 外部系统存在不兼容模型 | ACL | 中等 |
| 高读负载、简单查询场景 | CQRS + 读模型 | 中等 |
警告: 这些模式会显著增加系统复杂度。除非有明确的问题需要解决,否则不要使用。
1️⃣ CQRS (Command Query Responsibility Segregation)
1️⃣ CQRS(命令查询职责分离)
Definition: Separate the read (Query) model from the write (Command) model. Each is optimized for its purpose.
定义: 将读(Query)模型与写(Command)模型分离,各自针对其用途进行优化。
Why CQRS?
为什么选择CQRS?
| Aspect | Traditional CRUD | CQRS |
|---|---|---|
| Read model | Same as write model | Optimized for queries (denormalized) |
| Write model | Same as read model | Optimized for business rules (normalized) |
| Schema | Single schema | Separate schemas per model |
| Scaling | Scale together | Scale independently |
| 维度 | 传统CRUD | CQRS |
|---|---|---|
| 读模型 | 与写模型一致 | 针对查询优化(非规范化) |
| 写模型 | 与读模型一致 | 针对业务规则优化(规范化) |
| 数据库 schema | 单一 schema | 每个模型使用独立 schema |
| 扩容方式 | 读写一起扩容 | 读写独立扩容 |
Architecture
架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │────▶│ Command │────▶│ Command │
│ │ │ API │ │ Handler │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
▼
┌─────────────┐
│ Write DB │
│ (Normalized)│
│ PostgreSQL │
└──────┬──────┘
│
│ Events
▼
┌─────────────┐
│ Event Bus │
│ (Kafka) │
└──────┬──────┘
│
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │◀────│ Query │◀────│ Projector │
│ │ │ API │ │ (Consumer) │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
▼
┌─────────────┐
│ Read DB │
│(Denormalized)│
│Elasticsearch│
└─────────────┘┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │────▶│ Command │────▶│ Command │
│ │ │ API │ │ Handler │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
▼
┌─────────────┐
│ Write DB │
│ (Normalized)│
│ PostgreSQL │
└──────┬──────┘
│
│ Events
▼
┌─────────────┐
│ Event Bus │
│ (Kafka) │
└──────┬──────┘
│
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │◀────│ Query │◀────│ Projector │
│ │ │ API │ │ (Consumer) │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
▼
┌─────────────┐
│ Read DB │
│(Denormalized)│
│Elasticsearch│
└─────────────┘Implementation Example (Go)
实现示例(Go)
go
// ============ COMMAND SIDE ============
// Command
type PlaceOrderCommand struct {
CustomerID uuid.UUID
Items []OrderItemDTO
CouponCode string
}
// Command Handler
type PlaceOrderHandler struct {
orderRepo order.Repository
pricingSvc *PricingService
eventBus EventBus
}
func (h *PlaceOrderHandler) Handle(ctx context.Context, cmd PlaceOrderCommand) error {
// 1. Load/build aggregate
items := convertItems(cmd.Items)
total := h.pricingSvc.CalculateTotal(items, cmd.CouponCode)
order, err := order.New(cmd.CustomerID, items, total)
if err != nil {
return err
}
// 2. Save aggregate (generates events)
if err := h.orderRepo.Save(ctx, order); err != nil {
return err
}
// 3. Events published by repository
return nil
}
// ============ QUERY SIDE ============
// Read Model (denormalized for specific query)
type OrderSummary struct {
OrderID uuid.UUID
CustomerName string
Total string
Status string
ItemCount int
CreatedAt time.Time
}
// Query
type GetCustomerOrdersQuery struct {
CustomerID uuid.UUID
Page int
PageSize int
}
// Query Handler
type OrderQueryHandler struct {
readDB *sql.DB // separate read-optimized DB
}
func (h *OrderQueryHandler) Handle(
ctx context.Context,
q GetCustomerOrdersQuery,
) ([]OrderSummary, error) {
// Optimized query — no joins needed
rows, err := h.readDB.QueryContext(ctx, `
SELECT order_id, customer_name, total, status, item_count, created_at
FROM order_summaries
WHERE customer_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
`, q.CustomerID, q.PageSize, (q.Page-1)*q.PageSize)
// ... scan rows
}
// ============ PROJECTOR (syncs read model) ============
type OrderProjector struct {
readDB *sql.DB
}
func (p *OrderProjector) OnOrderPlaced(event OrderPlacedEvent) error {
_, err := p.readDB.Exec(`
INSERT INTO order_summaries (order_id, customer_id, total, status, item_count, created_at)
VALUES ($1, $2, $3, 'PENDING', $4, $5)
`, event.OrderID, event.CustomerID, event.Total.String(),
len(event.Items), event.CreatedAt)
return err
}
func (p *OrderProjector) OnOrderPaid(event OrderPaidEvent) error {
_, err := p.readDB.Exec(`
UPDATE order_summaries SET status = 'PAID' WHERE order_id = $1
`, event.OrderID)
return err
}go
// ============ COMMAND SIDE ============
// Command
type PlaceOrderCommand struct {
CustomerID uuid.UUID
Items []OrderItemDTO
CouponCode string
}
// Command Handler
type PlaceOrderHandler struct {
orderRepo order.Repository
pricingSvc *PricingService
eventBus EventBus
}
func (h *PlaceOrderHandler) Handle(ctx context.Context, cmd PlaceOrderCommand) error {
// 1. Load/build aggregate
items := convertItems(cmd.Items)
total := h.pricingSvc.CalculateTotal(items, cmd.CouponCode)
order, err := order.New(cmd.CustomerID, items, total)
if err != nil {
return err
}
// 2. Save aggregate (generates events)
if err := h.orderRepo.Save(ctx, order); err != nil {
return err
}
// 3. Events published by repository
return nil
}
// ============ QUERY SIDE ============
// Read Model (denormalized for specific query)
type OrderSummary struct {
OrderID uuid.UUID
CustomerName string
Total string
Status string
ItemCount int
CreatedAt time.Time
}
// Query
type GetCustomerOrdersQuery struct {
CustomerID uuid.UUID
Page int
PageSize int
}
// Query Handler
type OrderQueryHandler struct {
readDB *sql.DB // separate read-optimized DB
}
func (h *OrderQueryHandler) Handle(
ctx context.Context,
q GetCustomerOrdersQuery,
) ([]OrderSummary, error) {
// Optimized query — no joins needed
rows, err := h.readDB.QueryContext(ctx, `
SELECT order_id, customer_name, total, status, item_count, created_at
FROM order_summaries
WHERE customer_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
`, q.CustomerID, q.PageSize, (q.Page-1)*q.PageSize)
// ... scan rows
}
// ============ PROJECTOR (syncs read model) ============
type OrderProjector struct {
readDB *sql.DB
}
func (p *OrderProjector) OnOrderPlaced(event OrderPlacedEvent) error {
_, err := p.readDB.Exec(`
INSERT INTO order_summaries (order_id, customer_id, total, status, item_count, created_at)
VALUES ($1, $2, $3, 'PENDING', $4, $5)
`, event.OrderID, event.CustomerID, event.Total.String(),
len(event.Items), event.CreatedAt)
return err
}
func (p *OrderProjector) OnOrderPaid(event OrderPaidEvent) error {
_, err := p.readDB.Exec(`
UPDATE order_summaries SET status = 'PAID' WHERE order_id = $1
`, event.OrderID)
return err
}CQRS Anti-Patterns
CQRS反模式
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Premature CQRS | Simple CRUD app with CQRS | Remove CQRS. Use simple CRUD |
| Shared Database | Command and Query use same DB | Separate schemas or physical DBs |
| Eventual Consistency Confusion | Users expect immediate read-after-write | Show "processing" state, or use synchronous projection |
| Over-Projection | Projecting every field to read model | Only project fields needed for queries |
| 反模式 | 症状 | 修复方案 |
|---|---|---|
| 过早使用CQRS | 简单CRUD应用却使用CQRS | 移除CQRS,使用简单CRUD |
| 共享数据库 | 命令与查询使用同一数据库 | 分离schema或使用独立物理数据库 |
| 最终一致性混淆 | 用户期望写入后立即读取到结果 | 显示"处理中"状态,或使用同步投影 |
| 过度投影 | 将所有字段都投影到读模型 | 仅投影查询所需的字段 |
2️⃣ Event Sourcing
2️⃣ Event Sourcing(事件溯源)
Definition: Store the state of an application as a sequence of events. The current state is derived by replaying events.
定义: 将应用状态存储为一系列事件的序列,当前状态通过重放事件推导得出。
Why Event Sourcing?
为什么选择Event Sourcing?
| Benefit | Description |
|---|---|
| Full Audit Trail | Every change is recorded with timestamp and reason |
| Temporal Queries | "What was the state at time T?" |
| Event Replay | Rebuild read models, debug issues by replaying |
| Natural CQRS | Events feed read model projections |
| 优势 | 描述 |
|---|---|
| 完整审计追踪 | 所有变更都记录有时间戳和原因 |
| 时态查询 | 支持查询"时间T点的状态是什么?" |
| 事件重放 | 重建读模型,通过重放事件调试问题 |
| 天然适配CQRS | 事件为读模型投影提供数据源 |
Architecture
架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Command │────▶│ Aggregate │────▶│ Event │
│ Handler │ │ (applies │ │ Store │
│ │ │ events) │ │ (Append-only)│
└─────────────┘ └──────┬──────┘ └─────────────┘
│
│ generates
▼
┌─────────────┐
│ Domain │
│ Events │
└──────┬──────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Read │ │ Read │ │ Read │
│ Model 1 │ │ Model 2 │ │ Model 3 │
└─────────┘ └─────────┘ └─────────┘┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Command │────▶│ Aggregate │────▶│ Event │
│ Handler │ │ (applies │ │ Store │
│ │ │ events) │ │ (Append-only)│
└─────────────┘ └──────┬──────┘ └─────────────┘
│
│ generates
▼
┌─────────────┐
│ Domain │
│ Events │
└──────┬──────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Read │ │ Read │ │ Read │
│ Model 1 │ │ Model 2 │ │ Model 3 │
└─────────┘ └─────────┘ └─────────┘Implementation Example (Go)
实现示例(Go)
go
// Event Store interface
type EventStore interface {
Append(ctx context.Context, streamID string, events []DomainEvent, expectedVersion int) error
Read(ctx context.Context, streamID string) ([]StoredEvent, error)
ReadAll(ctx context.Context, afterPosition int64) ([]StoredEvent, error)
}
// Stored event in DB
type StoredEvent struct {
StreamID string
StreamType string
Position int64
EventType string
EventData json.RawMessage
OccurredAt time.Time
}
// Aggregate with event sourcing
type Order struct {
ID uuid.UUID
Version int // optimistic concurrency
// state derived from events
CustomerID uuid.UUID
Items []OrderItem
Status OrderStatus
Total Money
// uncommitted events
pendingEvents []DomainEvent
}
// Apply event to mutate state
func (o *Order) apply(event DomainEvent) {
switch e := event.(type) {
case OrderCreatedEvent:
o.ID = e.OrderID
o.CustomerID = e.CustomerID
o.Status = OrderStatusPending
o.Total = e.Total
case OrderItemAddedEvent:
o.Items = append(o.Items, OrderItem{
ProductID: e.ProductID,
Quantity: e.Quantity,
UnitPrice: e.UnitPrice,
})
case OrderPaidEvent:
o.Status = OrderStatusPaid
}
o.Version++
}
// Reconstruct from events
func OrderFromEvents(events []DomainEvent) *Order {
order := &Order{}
for _, event := range events {
order.apply(event)
}
return order
}
// Command creates event
func (o *Order) Pay(payment Money) error {
if o.Status != OrderStatusPending {
return errors.New("order not pending")
}
if !o.Total.Equals(payment) {
return errors.New("payment mismatch")
}
o.pendingEvents = append(o.pendingEvents, OrderPaidEvent{
OrderID: o.ID,
Amount: payment,
PaidAt: time.Now(),
})
o.apply(o.pendingEvents[len(o.pendingEvents)-1])
return nil
}
// Repository loads and saves
func (r *EventSourcedOrderRepository) FindByID(
ctx context.Context,
id uuid.UUID,
) (*Order, error) {
events, err := r.eventStore.Read(ctx, "order-"+id.String())
if err != nil {
return nil, err
}
domainEvents := make([]DomainEvent, len(events))
for i, e := range events {
domainEvents[i] = deserialize(e.EventType, e.EventData)
}
return OrderFromEvents(domainEvents), nil
}
func (r *EventSourcedOrderRepository) Save(
ctx context.Context,
order *Order,
) error {
if len(order.pendingEvents) == 0 {
return nil
}
err := r.eventStore.Append(
ctx,
"order-"+order.ID.String(),
order.pendingEvents,
order.Version-len(order.pendingEvents), // expected version
)
if err != nil {
return err // optimistic concurrency conflict
}
order.pendingEvents = nil
return nil
}go
// Event Store interface
type EventStore interface {
Append(ctx context.Context, streamID string, events []DomainEvent, expectedVersion int) error
Read(ctx context.Context, streamID string) ([]StoredEvent, error)
ReadAll(ctx context.Context, afterPosition int64) ([]StoredEvent, error)
}
// Stored event in DB
type StoredEvent struct {
StreamID string
StreamType string
Position int64
EventType string
EventData json.RawMessage
OccurredAt time.Time
}
// Aggregate with event sourcing
type Order struct {
ID uuid.UUID
Version int // optimistic concurrency
// state derived from events
CustomerID uuid.UUID
Items []OrderItem
Status OrderStatus
Total Money
// uncommitted events
pendingEvents []DomainEvent
}
// Apply event to mutate state
func (o *Order) apply(event DomainEvent) {
switch e := event.(type) {
case OrderCreatedEvent:
o.ID = e.OrderID
o.CustomerID = e.CustomerID
o.Status = OrderStatusPending
o.Total = e.Total
case OrderItemAddedEvent:
o.Items = append(o.Items, OrderItem{
ProductID: e.ProductID,
Quantity: e.Quantity,
UnitPrice: e.UnitPrice,
})
case OrderPaidEvent:
o.Status = OrderStatusPaid
}
o.Version++
}
// Reconstruct from events
func OrderFromEvents(events []DomainEvent) *Order {
order := &Order{}
for _, event := range events {
order.apply(event)
}
return order
}
// Command creates event
func (o *Order) Pay(payment Money) error {
if o.Status != OrderStatusPending {
return errors.New("order not pending")
}
if !o.Total.Equals(payment) {
return errors.New("payment mismatch")
}
o.pendingEvents = append(o.pendingEvents, OrderPaidEvent{
OrderID: o.ID,
Amount: payment,
PaidAt: time.Now(),
})
o.apply(o.pendingEvents[len(o.pendingEvents)-1])
return nil
}
// Repository loads and saves
func (r *EventSourcedOrderRepository) FindByID(
ctx context.Context,
id uuid.UUID,
) (*Order, error) {
events, err := r.eventStore.Read(ctx, "order-"+id.String())
if err != nil {
return nil, err
}
domainEvents := make([]DomainEvent, len(events))
for i, e := range events {
domainEvents[i] = deserialize(e.EventType, e.EventData)
}
return OrderFromEvents(domainEvents), nil
}
func (r *EventSourcedOrderRepository) Save(
ctx context.Context,
order *Order,
) error {
if len(order.pendingEvents) == 0 {
return nil
}
err := r.eventStore.Append(
ctx,
"order-"+order.ID.String(),
order.pendingEvents,
order.Version-len(order.pendingEvents), // expected version
)
if err != nil {
return err // optimistic concurrency conflict
}
order.pendingEvents = nil
return nil
}Event Sourcing Anti-Patterns
Event Sourcing反模式
| Anti-Pattern | Symptom | Fix |
|---|---|---|
| Giant Event | One event captures entire aggregate state | Split into granular business facts |
| No Schema Versioning | Can't evolve event schema | Add version field, support upcasting |
| Missing Snapshot | Replay 10,000 events to load aggregate | Snapshot every N events |
| Event as API | External systems consume raw events | Use published language / integration events |
| 反模式 | 症状 | 修复方案 |
|---|---|---|
| 巨型事件 | 单个事件包含整个聚合根状态 | 拆分为细粒度的业务事实事件 |
| 无事件Schema版本控制 | 无法演进事件Schema | 添加版本字段,支持向上转换 |
| 缺少快照 | 加载聚合根需要重放10000个事件 | 每N个事件生成一次快照 |
| 将事件作为API | 外部系统直接消费原始事件 | 使用发布语言/集成事件 |
3️⃣ Outbox Pattern
3️⃣ Outbox模式
Definition: Ensure atomic consistency between database state and message publication by writing messages to an "outbox" table in the same transaction.
定义: 通过在同一事务中将消息写入"outbox"表,确保数据库状态与消息发布的原子一致性。
The Problem
问题场景
Without Outbox:
1. Save order to DB ✅
2. Publish OrderPlaced event ❌ (network failure)
→ DB has order, but no event. Inconsistent.
With Outbox:
1. Save order to DB ✅
2. Save event to outbox table (same transaction) ✅
3. Background process reads outbox and publishes ✅
→ Atomic. Either both succeed or both fail.不使用Outbox:
1. 将订单保存到数据库 ✅
2. 发布OrderPlaced事件 ❌(网络故障)
→ 数据库中有订单,但无事件,数据不一致。
使用Outbox:
1. 将订单保存到数据库 ✅
2. 将事件保存到outbox表(同一事务) ✅
3. 后台进程读取outbox并发布事件 ✅
→ 原子性操作,要么全部成功,要么全部失败。Implementation (Go + PostgreSQL)
实现(Go + PostgreSQL)
go
// Outbox table schema:
// CREATE TABLE outbox (
// id SERIAL PRIMARY KEY,
// aggregate_type VARCHAR(255) NOT NULL,
// aggregate_id VARCHAR(255) NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// payload JSONB NOT NULL,
// created_at TIMESTAMP NOT NULL DEFAULT NOW(),
// published_at TIMESTAMP,
// UNIQUE(aggregate_type, aggregate_id, event_type, payload)
// );
// Repository saves aggregate + outbox in one transaction
func (r *OrderRepository) Save(ctx context.Context, order *Order) error {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 1. Save aggregate state
if err := r.saveOrder(tx, order); err != nil {
return err
}
// 2. Save events to outbox (same transaction!)
events := order.PullEvents()
for _, event := range events {
payload, _ := json.Marshal(event)
_, err := tx.ExecContext(ctx, `
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
`, "order", order.ID.String(), event.EventName(), payload)
if err != nil {
return err
}
}
return tx.Commit()
}
// Background relay process
type OutboxRelay struct {
db *sql.DB
eventBus EventBus
pollInterval time.Duration
}
func (r *OutboxRelay) Start(ctx context.Context) {
ticker := time.NewTicker(r.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.processOutbox(ctx)
}
}
}
func (r *OutboxRelay) processOutbox(ctx context.Context) error {
rows, err := r.db.QueryContext(ctx, `
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var id int
var aggType, aggID, eventType string
var payload []byte
rows.Scan(&id, &aggType, &aggID, &eventType, &payload)
// Publish to event bus
event := deserialize(eventType, payload)
if err := r.eventBus.Publish(ctx, event); err != nil {
log.Printf("failed to publish outbox event %d: %v", id, err)
continue // will retry on next poll
}
// Mark as published
_, err = r.db.ExecContext(ctx, `
UPDATE outbox SET published_at = NOW() WHERE id = $1
`, id)
if err != nil {
log.Printf("failed to mark outbox event %d as published: %v", id, err)
}
}
return rows.Err()
}go
// Outbox table schema:
// CREATE TABLE outbox (
// id SERIAL PRIMARY KEY,
// aggregate_type VARCHAR(255) NOT NULL,
// aggregate_id VARCHAR(255) NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// payload JSONB NOT NULL,
// created_at TIMESTAMP NOT NULL DEFAULT NOW(),
// published_at TIMESTAMP,
// UNIQUE(aggregate_type, aggregate_id, event_type, payload)
// );
// Repository saves aggregate + outbox in one transaction
func (r *OrderRepository) Save(ctx context.Context, order *Order) error {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 1. Save aggregate state
if err := r.saveOrder(tx, order); err != nil {
return err
}
// 2. Save events to outbox (same transaction!)
events := order.PullEvents()
for _, event := range events {
payload, _ := json.Marshal(event)
_, err := tx.ExecContext(ctx, `
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
`, "order", order.ID.String(), event.EventName(), payload)
if err != nil {
return err
}
}
return tx.Commit()
}
// Background relay process
type OutboxRelay struct {
db *sql.DB
eventBus EventBus
pollInterval time.Duration
}
func (r *OutboxRelay) Start(ctx context.Context) {
ticker := time.NewTicker(r.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.processOutbox(ctx)
}
}
}
func (r *OutboxRelay) processOutbox(ctx context.Context) error {
rows, err := r.db.QueryContext(ctx, `
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var id int
var aggType, aggID, eventType string
var payload []byte
rows.Scan(&id, &aggType, &aggID, &eventType, &payload)
// Publish to event bus
event := deserialize(eventType, payload)
if err := r.eventBus.Publish(ctx, event); err != nil {
log.Printf("failed to publish outbox event %d: %v", id, err)
continue // will retry on next poll
}
// Mark as published
_, err = r.db.ExecContext(ctx, `
UPDATE outbox SET published_at = NOW() WHERE id = $1
`, id)
if err != nil {
log.Printf("failed to mark outbox event %d as published: %v", id, err)
}
}
return rows.Err()
}4️⃣ Saga Pattern
4️⃣ Saga模式
Definition: A sequence of local transactions where each transaction updates data and publishes an event/message to trigger the next transaction. If a step fails, compensating transactions undo previous steps.
定义: 一系列本地事务的序列,每个事务更新数据并发布事件/消息以触发下一个事务。如果某一步失败,补偿事务会撤销之前的步骤。
Types
类型
| Type | Coordination | When to Use |
|---|---|---|
| Orchestration | Central coordinator saga | Complex flows, need visibility |
| Choreography | Event-driven, no central coordinator | Simple flows, loose coupling |
| 类型 | 协调方式 | 适用场景 |
|---|---|---|
| 编排型 | 中央协调器管理Saga | 复杂流程,需要可见性 |
| ** choreography( choreography型)** | 事件驱动,无中央协调器 | 简单流程,松耦合场景 |
Orchestration Saga Example: Order Processing
编排型Saga示例:订单处理
┌─────────────┐
│ Saga │
│ Coordinator │
└──────┬──────┘
│ 1. ReserveInventory
▼
┌─────────────┐ 2. InventoryReserved
│ Inventory │────────────────▶
│ Service │
└─────────────┘
▲
│ 3. ProcessPayment
▼
┌─────────────┐ 4. PaymentConfirmed
│ Payment │────────────────▶
│ Service │
└─────────────┘
▲
│ 5. CreateShipment
▼
┌─────────────┐ 6. ShipmentCreated
│ Shipping │────────────────▶
│ Service │
└─────────────┘
│
│ (if any step fails)
▼
┌─────────────┐
│ Compensate: │
│ - ReleaseInventory
│ - RefundPayment
│ - CancelShipment
└─────────────┘┌─────────────┐
│ Saga │
│ Coordinator │
└──────┬──────┘
│ 1. ReserveInventory
▼
┌─────────────┐ 2. InventoryReserved
│ Inventory │────────────────▶
│ Service │
└─────────────┘
▲
│ 3. ProcessPayment
▼
┌─────────────┐ 4. PaymentConfirmed
│ Payment │────────────────▶
│ Service │
└─────────────┘
▲
│ 5. CreateShipment
▼
┌─────────────┐ 6. ShipmentCreated
│ Shipping │────────────────▶
│ Service │
└─────────────┘
│
│ (if any step fails)
▼
┌─────────────┐
│ Compensate: │
│ - ReleaseInventory
│ - RefundPayment
│ - CancelShipment
└─────────────┘Go Implementation (Orchestration)
Go实现(编排型)
go
type OrderSaga struct {
inventorySvc InventoryClient
paymentSvc PaymentClient
shippingSvc ShippingClient
sagaRepo SagaRepository
}
func (s *OrderSaga) Start(ctx context.Context, order Order) error {
saga := SagaState{
ID: uuid.New(),
OrderID: order.ID,
Status: SagaStatusStarted,
Steps: []SagaStep{
{Name: "reserve_inventory", Status: StepStatusPending},
{Name: "process_payment", Status: StepStatusPending},
{Name: "create_shipment", Status: StepStatusPending},
},
}
if err := s.sagaRepo.Save(ctx, saga); err != nil {
return err
}
// Step 1: Reserve Inventory
reservation, err := s.inventorySvc.Reserve(ctx, order.Items)
if err != nil {
return s.compensate(ctx, saga) // nothing to undo yet
}
saga.Steps[0].Status = StepStatusCompleted
saga.Steps[0].Result = reservation.ID
s.sagaRepo.Save(ctx, saga)
// Step 2: Process Payment
payment, err := s.paymentSvc.Charge(ctx, order.Total, order.CustomerID)
if err != nil {
return s.compensate(ctx, saga) // undo inventory reservation
}
saga.Steps[1].Status = StepStatusCompleted
saga.Steps[1].Result = payment.ID
s.sagaRepo.Save(ctx, saga)
// Step 3: Create Shipment
shipment, err := s.shippingSvc.Create(ctx, order.ID, order.ShippingAddress)
if err != nil {
return s.compensate(ctx, saga) // undo payment + inventory
}
saga.Steps[2].Status = StepStatusCompleted
saga.Steps[2].Result = shipment.ID
saga.Status = SagaStatusCompleted
s.sagaRepo.Save(ctx, saga)
return nil
}
func (s *OrderSaga) compensate(ctx context.Context, saga SagaState) error {
saga.Status = SagaStatusCompensating
s.sagaRepo.Save(ctx, saga)
// Compensate in reverse order
for i := len(saga.Steps) - 1; i >= 0; i-- {
step := saga.Steps[i]
if step.Status != StepStatusCompleted {
continue
}
switch step.Name {
case "create_shipment":
s.shippingSvc.Cancel(ctx, step.Result)
case "process_payment":
s.paymentSvc.Refund(ctx, step.Result)
case "reserve_inventory":
s.inventorySvc.Release(ctx, step.Result)
}
step.Status = StepStatusCompensated
s.sagaRepo.Save(ctx, saga)
}
saga.Status = SagaStatusCompensated
return s.sagaRepo.Save(ctx, saga)
}go
type OrderSaga struct {
inventorySvc InventoryClient
paymentSvc PaymentClient
shippingSvc ShippingClient
sagaRepo SagaRepository
}
func (s *OrderSaga) Start(ctx context.Context, order Order) error {
saga := SagaState{
ID: uuid.New(),
OrderID: order.ID,
Status: SagaStatusStarted,
Steps: []SagaStep{
{Name: "reserve_inventory", Status: StepStatusPending},
{Name: "process_payment", Status: StepStatusPending},
{Name: "create_shipment", Status: StepStatusPending},
},
}
if err := s.sagaRepo.Save(ctx, saga); err != nil {
return err
}
// Step 1: Reserve Inventory
reservation, err := s.inventorySvc.Reserve(ctx, order.Items)
if err != nil {
return s.compensate(ctx, saga) // nothing to undo yet
}
saga.Steps[0].Status = StepStatusCompleted
saga.Steps[0].Result = reservation.ID
s.sagaRepo.Save(ctx, saga)
// Step 2: Process Payment
payment, err := s.paymentSvc.Charge(ctx, order.Total, order.CustomerID)
if err != nil {
return s.compensate(ctx, saga) // undo inventory reservation
}
saga.Steps[1].Status = StepStatusCompleted
saga.Steps[1].Result = payment.ID
s.sagaRepo.Save(ctx, saga)
// Step 3: Create Shipment
shipment, err := s.shippingSvc.Create(ctx, order.ID, order.ShippingAddress)
if err != nil {
return s.compensate(ctx, saga) // undo payment + inventory
}
saga.Steps[2].Status = StepStatusCompleted
saga.Steps[2].Result = shipment.ID
saga.Status = SagaStatusCompleted
s.sagaRepo.Save(ctx, saga)
return nil
}
func (s *OrderSaga) compensate(ctx context.Context, saga SagaState) error {
saga.Status = SagaStatusCompensating
s.sagaRepo.Save(ctx, saga)
// Compensate in reverse order
for i := len(saga.Steps) - 1; i >= 0; i-- {
step := saga.Steps[i]
if step.Status != StepStatusCompleted {
continue
}
switch step.Name {
case "create_shipment":
s.shippingSvc.Cancel(ctx, step.Result)
case "process_payment":
s.paymentSvc.Refund(ctx, step.Result)
case "reserve_inventory":
s.inventorySvc.Release(ctx, step.Result)
}
step.Status = StepStatusCompensated
s.sagaRepo.Save(ctx, saga)
}
saga.Status = SagaStatusCompensated
return s.sagaRepo.Save(ctx, saga)
}5️⃣ Anti-Corruption Layer (ACL)
5️⃣ Anti-Corruption Layer(ACL,防腐层)
Definition: A translation layer that isolates your domain model from external systems' incompatible models.
定义: 一个转换层,将你的领域模型与外部系统的不兼容模型隔离开。
Why ACL?
为什么使用ACL?
Without ACL:
Your Domain ──direct uses──▶ External Model (leaks into your domain)
With ACL:
Your Domain ──uses──▶ ACL ──translates──▶ External Model不使用ACL:
你的领域模型 ──直接调用──▶ 外部模型(外部模型渗透到你的领域)
使用ACL:
你的领域模型 ──调用──▶ ACL ──转换──▶ 外部模型Implementation (Go)
实现(Go)
go
// Your domain model (clean, no external dependencies)
package domain
type Order struct {
ID uuid.UUID
Total Money
Status OrderStatus
}
// External API model (messy, you don't control it)
package external
type StripeCharge struct {
ID string `json:"id"`
Amount int `json:"amount"` // in cents!
Currency string `json:"currency"`
Status string `json:"status"`
ReceiptURL string `json:"receipt_url"`
FailureCode string `json:"failure_code"`
FailureMessage string `json:"failure_message"`
}
// ACL: translates between your domain and external model
package acl
type StripePaymentAdapter struct {
client *stripe.Client
}
func (a *StripePaymentAdapter) Charge(
ctx context.Context,
amount domain.Money,
customerID uuid.UUID,
) (domain.PaymentResult, error) {
// Translate domain → external
stripeAmount := amount.Amount.Mul(decimal.NewFromInt(100)).IntPart() // cents
charge, err := a.client.CreateCharge(ctx, external.StripeChargeRequest{
Amount: stripeAmount,
Currency: strings.ToLower(amount.Currency),
Customer: customerID.String(),
})
if err != nil {
return domain.PaymentResult{}, fmt.Errorf("stripe charge failed: %w", err)
}
// Translate external → domain
return a.toDomainResult(charge), nil
}
func (a *StripePaymentAdapter) toDomainResult(
charge external.StripeCharge,
) domain.PaymentResult {
status := domain.PaymentStatusPending
switch charge.Status {
case "succeeded":
status = domain.PaymentStatusSucceeded
case "failed":
status = domain.PaymentStatusFailed
}
amount := decimal.NewFromInt(int64(charge.Amount)).Div(decimal.NewFromInt(100))
return domain.PaymentResult{
TransactionID: charge.ID,
Amount: domain.MustNewMoney(amount, strings.ToUpper(charge.Currency)),
Status: status,
ReceiptURL: charge.ReceiptURL,
FailureReason: charge.FailureMessage,
}
}go
// Your domain model (clean, no external dependencies)
package domain
type Order struct {
ID uuid.UUID
Total Money
Status OrderStatus
}
// External API model (messy, you don't control it)
package external
type StripeCharge struct {
ID string `json:"id"`
Amount int `json:"amount"` // in cents!
Currency string `json:"currency"`
Status string `json:"status"`
ReceiptURL string `json:"receipt_url"`
FailureCode string `json:"failure_code"`
FailureMessage string `json:"failure_message"`
}
// ACL: translates between your domain and external model
package acl
type StripePaymentAdapter struct {
client *stripe.Client
}
func (a *StripePaymentAdapter) Charge(
ctx context.Context,
amount domain.Money,
customerID uuid.UUID,
) (domain.PaymentResult, error) {
// Translate domain → external
stripeAmount := amount.Amount.Mul(decimal.NewFromInt(100)).IntPart() // cents
charge, err := a.client.CreateCharge(ctx, external.StripeChargeRequest{
Amount: stripeAmount,
Currency: strings.ToLower(amount.Currency),
Customer: customerID.String(),
})
if err != nil {
return domain.PaymentResult{}, fmt.Errorf("stripe charge failed: %w", err)
}
// Translate external → domain
return a.toDomainResult(charge), nil
}
func (a *StripePaymentAdapter) toDomainResult(
charge external.StripeCharge,
) domain.PaymentResult {
status := domain.PaymentStatusPending
switch charge.Status {
case "succeeded":
status = domain.PaymentStatusSucceeded
case "failed":
status = domain.PaymentStatusFailed
}
amount := decimal.NewFromInt(int64(charge.Amount)).Div(decimal.NewFromInt(100))
return domain.PaymentResult{
TransactionID: charge.ID,
Amount: domain.MustNewMoney(amount, strings.ToUpper(charge.Currency)),
Status: status,
ReceiptURL: charge.ReceiptURL,
FailureReason: charge.FailureMessage,
}
}📋 Pattern Selection Flowchart
📋 模式选择流程图
Do you need to separate read/write models?
├── YES → Do you need full audit trail?
│ ├── YES → Event Sourcing + CQRS
│ └── NO → CQRS only
│
├── NO → Do you need cross-service consistency?
│ ├── YES → Do you need to undo on failure?
│ │ ├── YES → Saga Pattern
│ │ └── NO → Outbox Pattern
│ │
│ └── NO → Is there an external system with different model?
│ ├── YES → Anti-Corruption Layer
│ └── NO → Standard DDD Tactical patterns是否需要分离读/写模型?
├── 是 → 是否需要完整审计追踪?
│ ├── 是 → Event Sourcing + CQRS
│ └── 否 → 仅使用CQRS
│
├── 否 → 是否需要跨服务一致性?
│ ├── 是 → 是否需要在失败时回滚?
│ │ ├── 是 → Saga模式
│ │ └── 否 → Outbox模式
│ │
│ └── 否 → 是否存在模型不兼容的外部系统?
│ ├── 是 → Anti-Corruption Layer
│ └── 否 → 标准DDD战术模式📚 References
📚 参考资料
- CQRS, Task Based UIs, Event Sourcing — Greg Young
- Exploring CQRS and Event Sourcing — Microsoft patterns & practices
- The Outbox Pattern — Chris Richardson
- Saga Pattern — Chris Richardson
- Anti-Corruption Layer — Azure Architecture Center
- Domain-Driven Design: Tackling Complexity — Eric Evans
- CQRS, Task Based UIs, Event Sourcing — Greg Young
- Exploring CQRS and Event Sourcing — Microsoft patterns & practices
- The Outbox Pattern — Chris Richardson
- Saga Pattern — Chris Richardson
- Anti-Corruption Layer — Azure Architecture Center
- Domain-Driven Design: Tackling Complexity — Eric Evans