ddd-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Domain-Driven Design (DDD): Advanced Patterns

领域驱动设计(DDD):高级模式

Advanced integration patterns for handling complex distributed system interactions, data consistency, and cross-boundary communication. Use these patterns when a single Bounded Context's tactical patterns are insufficient.
"Distributed systems are different. If we don't know that, we are building them wrong." — Udi Dahan

用于处理复杂分布式系统交互、数据一致性和跨边界通信的高级集成模式。当单一限界上下文的战术模式不足以满足需求时,可使用这些模式。
"分布式系统是不同的。如果我们不了解这一点,就会构建出错误的系统。" —— Udi Dahan

🧭 When to Use Advanced Patterns

🧭 何时使用高级模式

SituationPattern(s)Complexity
Read model needs different shape than write modelCQRSMedium
Need full audit trail of all state changesEvent SourcingHigh
Need atomic DB + message broker consistencyOutboxMedium
Long-running business process across contextsSagaHigh
External system has incompatible modelACLMedium
High read load, simple queriesCQRS + Read ModelMedium
Warning: These patterns add significant complexity. Don't use them unless you have a clear problem they solve.

场景适用模式复杂度
读模型与写模型结构需求不同CQRS中等
需要所有状态变更的完整审计追踪Event Sourcing
需要数据库与消息代理的原子一致性Outbox中等
跨上下文的长期业务流程Saga
外部系统存在不兼容模型ACL中等
高读负载、简单查询场景CQRS + 读模型中等
警告: 这些模式会显著增加系统复杂度。除非有明确的问题需要解决,否则不要使用。

1️⃣ CQRS (Command Query Responsibility Segregation)

1️⃣ CQRS(命令查询职责分离)

Definition: Separate the read (Query) model from the write (Command) model. Each is optimized for its purpose.
定义: 将读(Query)模型与写(Command)模型分离,各自针对其用途进行优化。

Why CQRS?

为什么选择CQRS?

AspectTraditional CRUDCQRS
Read modelSame as write modelOptimized for queries (denormalized)
Write modelSame as read modelOptimized for business rules (normalized)
SchemaSingle schemaSeparate schemas per model
ScalingScale togetherScale independently
维度传统CRUDCQRS
读模型与写模型一致针对查询优化(非规范化)
写模型与读模型一致针对业务规则优化(规范化)
数据库 schema单一 schema每个模型使用独立 schema
扩容方式读写一起扩容读写独立扩容

Architecture

架构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│   Command   │────▶│  Command    │
│             │     │   API       │     │  Handler    │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                        ┌─────────────┐
                                        │  Write DB   │
                                        │ (Normalized)│
                                        │ PostgreSQL  │
                                        └──────┬──────┘
                                               │ Events
                                        ┌─────────────┐
                                        │  Event Bus  │
                                        │   (Kafka)   │
                                        └──────┬──────┘
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │◀────│   Query     │◀────│  Projector  │
│             │     │   API       │     │  (Consumer) │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                        ┌─────────────┐
                                        │  Read DB    │
                                        │(Denormalized)│
                                        │Elasticsearch│
                                        └─────────────┘
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│   Command   │────▶│  Command    │
│             │     │   API       │     │  Handler    │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                        ┌─────────────┐
                                        │  Write DB   │
                                        │ (Normalized)│
                                        │ PostgreSQL  │
                                        └──────┬──────┘
                                               │ Events
                                        ┌─────────────┐
                                        │  Event Bus  │
                                        │   (Kafka)   │
                                        └──────┬──────┘
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │◀────│   Query     │◀────│  Projector  │
│             │     │   API       │     │  (Consumer) │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                        ┌─────────────┐
                                        │  Read DB    │
                                        │(Denormalized)│
                                        │Elasticsearch│
                                        └─────────────┘

Implementation Example (Go)

实现示例(Go)

go
// ============ COMMAND SIDE ============

// Command
type PlaceOrderCommand struct {
    CustomerID uuid.UUID
    Items      []OrderItemDTO
    CouponCode string
}

// Command Handler
type PlaceOrderHandler struct {
    orderRepo   order.Repository
    pricingSvc  *PricingService
    eventBus    EventBus
}

func (h *PlaceOrderHandler) Handle(ctx context.Context, cmd PlaceOrderCommand) error {
    // 1. Load/build aggregate
    items := convertItems(cmd.Items)
    total := h.pricingSvc.CalculateTotal(items, cmd.CouponCode)

    order, err := order.New(cmd.CustomerID, items, total)
    if err != nil {
        return err
    }

    // 2. Save aggregate (generates events)
    if err := h.orderRepo.Save(ctx, order); err != nil {
        return err
    }

    // 3. Events published by repository
    return nil
}

// ============ QUERY SIDE ============

// Read Model (denormalized for specific query)
type OrderSummary struct {
    OrderID      uuid.UUID
    CustomerName string
    Total        string
    Status       string
    ItemCount    int
    CreatedAt    time.Time
}

// Query
type GetCustomerOrdersQuery struct {
    CustomerID uuid.UUID
    Page       int
    PageSize   int
}

// Query Handler
type OrderQueryHandler struct {
    readDB *sql.DB // separate read-optimized DB
}

func (h *OrderQueryHandler) Handle(
    ctx context.Context,
    q GetCustomerOrdersQuery,
) ([]OrderSummary, error) {
    // Optimized query — no joins needed
    rows, err := h.readDB.QueryContext(ctx, `
        SELECT order_id, customer_name, total, status, item_count, created_at
        FROM order_summaries
        WHERE customer_id = $1
        ORDER BY created_at DESC
        LIMIT $2 OFFSET $3
    `, q.CustomerID, q.PageSize, (q.Page-1)*q.PageSize)
    // ... scan rows
}

// ============ PROJECTOR (syncs read model) ============

type OrderProjector struct {
    readDB *sql.DB
}

func (p *OrderProjector) OnOrderPlaced(event OrderPlacedEvent) error {
    _, err := p.readDB.Exec(`
        INSERT INTO order_summaries (order_id, customer_id, total, status, item_count, created_at)
        VALUES ($1, $2, $3, 'PENDING', $4, $5)
    `, event.OrderID, event.CustomerID, event.Total.String(),
        len(event.Items), event.CreatedAt)
    return err
}

func (p *OrderProjector) OnOrderPaid(event OrderPaidEvent) error {
    _, err := p.readDB.Exec(`
        UPDATE order_summaries SET status = 'PAID' WHERE order_id = $1
    `, event.OrderID)
    return err
}
go
// ============ COMMAND SIDE ============

// Command
type PlaceOrderCommand struct {
    CustomerID uuid.UUID
    Items      []OrderItemDTO
    CouponCode string
}

// Command Handler
type PlaceOrderHandler struct {
    orderRepo   order.Repository
    pricingSvc  *PricingService
    eventBus    EventBus
}

func (h *PlaceOrderHandler) Handle(ctx context.Context, cmd PlaceOrderCommand) error {
    // 1. Load/build aggregate
    items := convertItems(cmd.Items)
    total := h.pricingSvc.CalculateTotal(items, cmd.CouponCode)

    order, err := order.New(cmd.CustomerID, items, total)
    if err != nil {
        return err
    }

    // 2. Save aggregate (generates events)
    if err := h.orderRepo.Save(ctx, order); err != nil {
        return err
    }

    // 3. Events published by repository
    return nil
}

// ============ QUERY SIDE ============

// Read Model (denormalized for specific query)
type OrderSummary struct {
    OrderID      uuid.UUID
    CustomerName string
    Total        string
    Status       string
    ItemCount    int
    CreatedAt    time.Time
}

// Query
type GetCustomerOrdersQuery struct {
    CustomerID uuid.UUID
    Page       int
    PageSize   int
}

// Query Handler
type OrderQueryHandler struct {
    readDB *sql.DB // separate read-optimized DB
}

func (h *OrderQueryHandler) Handle(
    ctx context.Context,
    q GetCustomerOrdersQuery,
) ([]OrderSummary, error) {
    // Optimized query — no joins needed
    rows, err := h.readDB.QueryContext(ctx, `
        SELECT order_id, customer_name, total, status, item_count, created_at
        FROM order_summaries
        WHERE customer_id = $1
        ORDER BY created_at DESC
        LIMIT $2 OFFSET $3
    `, q.CustomerID, q.PageSize, (q.Page-1)*q.PageSize)
    // ... scan rows
}

// ============ PROJECTOR (syncs read model) ============

type OrderProjector struct {
    readDB *sql.DB
}

func (p *OrderProjector) OnOrderPlaced(event OrderPlacedEvent) error {
    _, err := p.readDB.Exec(`
        INSERT INTO order_summaries (order_id, customer_id, total, status, item_count, created_at)
        VALUES ($1, $2, $3, 'PENDING', $4, $5)
    `, event.OrderID, event.CustomerID, event.Total.String(),
        len(event.Items), event.CreatedAt)
    return err
}

func (p *OrderProjector) OnOrderPaid(event OrderPaidEvent) error {
    _, err := p.readDB.Exec(`
        UPDATE order_summaries SET status = 'PAID' WHERE order_id = $1
    `, event.OrderID)
    return err
}

CQRS Anti-Patterns

CQRS反模式

Anti-PatternSymptomFix
Premature CQRSSimple CRUD app with CQRSRemove CQRS. Use simple CRUD
Shared DatabaseCommand and Query use same DBSeparate schemas or physical DBs
Eventual Consistency ConfusionUsers expect immediate read-after-writeShow "processing" state, or use synchronous projection
Over-ProjectionProjecting every field to read modelOnly project fields needed for queries

反模式症状修复方案
过早使用CQRS简单CRUD应用却使用CQRS移除CQRS,使用简单CRUD
共享数据库命令与查询使用同一数据库分离schema或使用独立物理数据库
最终一致性混淆用户期望写入后立即读取到结果显示"处理中"状态,或使用同步投影
过度投影将所有字段都投影到读模型仅投影查询所需的字段

2️⃣ Event Sourcing

2️⃣ Event Sourcing(事件溯源)

Definition: Store the state of an application as a sequence of events. The current state is derived by replaying events.
定义: 将应用状态存储为一系列事件的序列,当前状态通过重放事件推导得出。

Why Event Sourcing?

为什么选择Event Sourcing?

BenefitDescription
Full Audit TrailEvery change is recorded with timestamp and reason
Temporal Queries"What was the state at time T?"
Event ReplayRebuild read models, debug issues by replaying
Natural CQRSEvents feed read model projections
优势描述
完整审计追踪所有变更都记录有时间戳和原因
时态查询支持查询"时间T点的状态是什么?"
事件重放重建读模型,通过重放事件调试问题
天然适配CQRS事件为读模型投影提供数据源

Architecture

架构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Command   │────▶│  Aggregate  │────▶│  Event      │
│   Handler   │     │  (applies   │     │  Store      │
│             │     │   events)   │     │ (Append-only)│
└─────────────┘     └──────┬──────┘     └─────────────┘
                           │ generates
                    ┌─────────────┐
                    │  Domain     │
                    │  Events     │
                    └──────┬──────┘
              ┌────────────┼────────────┐
              ▼            ▼            ▼
        ┌─────────┐  ┌─────────┐  ┌─────────┐
        │ Read    │  │ Read    │  │ Read    │
        │ Model 1 │  │ Model 2 │  │ Model 3 │
        └─────────┘  └─────────┘  └─────────┘
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Command   │────▶│  Aggregate  │────▶│  Event      │
│   Handler   │     │  (applies   │     │  Store      │
│             │     │   events)   │     │ (Append-only)│
└─────────────┘     └──────┬──────┘     └─────────────┘
                           │ generates
                    ┌─────────────┐
                    │  Domain     │
                    │  Events     │
                    └──────┬──────┘
              ┌────────────┼────────────┐
              ▼            ▼            ▼
        ┌─────────┐  ┌─────────┐  ┌─────────┐
        │ Read    │  │ Read    │  │ Read    │
        │ Model 1 │  │ Model 2 │  │ Model 3 │
        └─────────┘  └─────────┘  └─────────┘

Implementation Example (Go)

实现示例(Go)

go
// Event Store interface
type EventStore interface {
    Append(ctx context.Context, streamID string, events []DomainEvent, expectedVersion int) error
    Read(ctx context.Context, streamID string) ([]StoredEvent, error)
    ReadAll(ctx context.Context, afterPosition int64) ([]StoredEvent, error)
}

// Stored event in DB
type StoredEvent struct {
    StreamID    string
    StreamType  string
    Position    int64
    EventType   string
    EventData   json.RawMessage
    OccurredAt  time.Time
}

// Aggregate with event sourcing
type Order struct {
    ID      uuid.UUID
    Version int // optimistic concurrency
    // state derived from events
    CustomerID uuid.UUID
    Items      []OrderItem
    Status     OrderStatus
    Total      Money
    // uncommitted events
    pendingEvents []DomainEvent
}

// Apply event to mutate state
func (o *Order) apply(event DomainEvent) {
    switch e := event.(type) {
    case OrderCreatedEvent:
        o.ID = e.OrderID
        o.CustomerID = e.CustomerID
        o.Status = OrderStatusPending
        o.Total = e.Total
    case OrderItemAddedEvent:
        o.Items = append(o.Items, OrderItem{
            ProductID: e.ProductID,
            Quantity:  e.Quantity,
            UnitPrice: e.UnitPrice,
        })
    case OrderPaidEvent:
        o.Status = OrderStatusPaid
    }
    o.Version++
}

// Reconstruct from events
func OrderFromEvents(events []DomainEvent) *Order {
    order := &Order{}
    for _, event := range events {
        order.apply(event)
    }
    return order
}

// Command creates event
func (o *Order) Pay(payment Money) error {
    if o.Status != OrderStatusPending {
        return errors.New("order not pending")
    }
    if !o.Total.Equals(payment) {
        return errors.New("payment mismatch")
    }
    o.pendingEvents = append(o.pendingEvents, OrderPaidEvent{
        OrderID: o.ID,
        Amount:  payment,
        PaidAt:  time.Now(),
    })
    o.apply(o.pendingEvents[len(o.pendingEvents)-1])
    return nil
}

// Repository loads and saves
func (r *EventSourcedOrderRepository) FindByID(
    ctx context.Context,
    id uuid.UUID,
) (*Order, error) {
    events, err := r.eventStore.Read(ctx, "order-"+id.String())
    if err != nil {
        return nil, err
    }
    domainEvents := make([]DomainEvent, len(events))
    for i, e := range events {
        domainEvents[i] = deserialize(e.EventType, e.EventData)
    }
    return OrderFromEvents(domainEvents), nil
}

func (r *EventSourcedOrderRepository) Save(
    ctx context.Context,
    order *Order,
) error {
    if len(order.pendingEvents) == 0 {
        return nil
    }
    err := r.eventStore.Append(
        ctx,
        "order-"+order.ID.String(),
        order.pendingEvents,
        order.Version-len(order.pendingEvents), // expected version
    )
    if err != nil {
        return err // optimistic concurrency conflict
    }
    order.pendingEvents = nil
    return nil
}
go
// Event Store interface
type EventStore interface {
    Append(ctx context.Context, streamID string, events []DomainEvent, expectedVersion int) error
    Read(ctx context.Context, streamID string) ([]StoredEvent, error)
    ReadAll(ctx context.Context, afterPosition int64) ([]StoredEvent, error)
}

// Stored event in DB
type StoredEvent struct {
    StreamID    string
    StreamType  string
    Position    int64
    EventType   string
    EventData   json.RawMessage
    OccurredAt  time.Time
}

// Aggregate with event sourcing
type Order struct {
    ID      uuid.UUID
    Version int // optimistic concurrency
    // state derived from events
    CustomerID uuid.UUID
    Items      []OrderItem
    Status     OrderStatus
    Total      Money
    // uncommitted events
    pendingEvents []DomainEvent
}

// Apply event to mutate state
func (o *Order) apply(event DomainEvent) {
    switch e := event.(type) {
    case OrderCreatedEvent:
        o.ID = e.OrderID
        o.CustomerID = e.CustomerID
        o.Status = OrderStatusPending
        o.Total = e.Total
    case OrderItemAddedEvent:
        o.Items = append(o.Items, OrderItem{
            ProductID: e.ProductID,
            Quantity:  e.Quantity,
            UnitPrice: e.UnitPrice,
        })
    case OrderPaidEvent:
        o.Status = OrderStatusPaid
    }
    o.Version++
}

// Reconstruct from events
func OrderFromEvents(events []DomainEvent) *Order {
    order := &Order{}
    for _, event := range events {
        order.apply(event)
    }
    return order
}

// Command creates event
func (o *Order) Pay(payment Money) error {
    if o.Status != OrderStatusPending {
        return errors.New("order not pending")
    }
    if !o.Total.Equals(payment) {
        return errors.New("payment mismatch")
    }
    o.pendingEvents = append(o.pendingEvents, OrderPaidEvent{
        OrderID: o.ID,
        Amount:  payment,
        PaidAt:  time.Now(),
    })
    o.apply(o.pendingEvents[len(o.pendingEvents)-1])
    return nil
}

// Repository loads and saves
func (r *EventSourcedOrderRepository) FindByID(
    ctx context.Context,
    id uuid.UUID,
) (*Order, error) {
    events, err := r.eventStore.Read(ctx, "order-"+id.String())
    if err != nil {
        return nil, err
    }
    domainEvents := make([]DomainEvent, len(events))
    for i, e := range events {
        domainEvents[i] = deserialize(e.EventType, e.EventData)
    }
    return OrderFromEvents(domainEvents), nil
}

func (r *EventSourcedOrderRepository) Save(
    ctx context.Context,
    order *Order,
) error {
    if len(order.pendingEvents) == 0 {
        return nil
    }
    err := r.eventStore.Append(
        ctx,
        "order-"+order.ID.String(),
        order.pendingEvents,
        order.Version-len(order.pendingEvents), // expected version
    )
    if err != nil {
        return err // optimistic concurrency conflict
    }
    order.pendingEvents = nil
    return nil
}

Event Sourcing Anti-Patterns

Event Sourcing反模式

Anti-PatternSymptomFix
Giant EventOne event captures entire aggregate stateSplit into granular business facts
No Schema VersioningCan't evolve event schemaAdd version field, support upcasting
Missing SnapshotReplay 10,000 events to load aggregateSnapshot every N events
Event as APIExternal systems consume raw eventsUse published language / integration events

反模式症状修复方案
巨型事件单个事件包含整个聚合根状态拆分为细粒度的业务事实事件
无事件Schema版本控制无法演进事件Schema添加版本字段,支持向上转换
缺少快照加载聚合根需要重放10000个事件每N个事件生成一次快照
将事件作为API外部系统直接消费原始事件使用发布语言/集成事件

3️⃣ Outbox Pattern

3️⃣ Outbox模式

Definition: Ensure atomic consistency between database state and message publication by writing messages to an "outbox" table in the same transaction.
定义: 通过在同一事务中将消息写入"outbox"表,确保数据库状态与消息发布的原子一致性。

The Problem

问题场景

Without Outbox:
  1. Save order to DB ✅
  2. Publish OrderPlaced event ❌ (network failure)
  → DB has order, but no event. Inconsistent.

With Outbox:
  1. Save order to DB ✅
  2. Save event to outbox table (same transaction) ✅
  3. Background process reads outbox and publishes ✅
  → Atomic. Either both succeed or both fail.
不使用Outbox:
  1. 将订单保存到数据库 ✅
  2. 发布OrderPlaced事件 ❌(网络故障)
  → 数据库中有订单,但无事件,数据不一致。

使用Outbox:
  1. 将订单保存到数据库 ✅
  2. 将事件保存到outbox表(同一事务) ✅
  3. 后台进程读取outbox并发布事件 ✅
  → 原子性操作,要么全部成功,要么全部失败。

Implementation (Go + PostgreSQL)

实现(Go + PostgreSQL)

go
// Outbox table schema:
// CREATE TABLE outbox (
//     id SERIAL PRIMARY KEY,
//     aggregate_type VARCHAR(255) NOT NULL,
//     aggregate_id VARCHAR(255) NOT NULL,
//     event_type VARCHAR(255) NOT NULL,
//     payload JSONB NOT NULL,
//     created_at TIMESTAMP NOT NULL DEFAULT NOW(),
//     published_at TIMESTAMP,
//     UNIQUE(aggregate_type, aggregate_id, event_type, payload)
// );

// Repository saves aggregate + outbox in one transaction
func (r *OrderRepository) Save(ctx context.Context, order *Order) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 1. Save aggregate state
    if err := r.saveOrder(tx, order); err != nil {
        return err
    }

    // 2. Save events to outbox (same transaction!)
    events := order.PullEvents()
    for _, event := range events {
        payload, _ := json.Marshal(event)
        _, err := tx.ExecContext(ctx, `
            INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT DO NOTHING
        `, "order", order.ID.String(), event.EventName(), payload)
        if err != nil {
            return err
        }
    }

    return tx.Commit()
}

// Background relay process
type OutboxRelay struct {
    db       *sql.DB
    eventBus EventBus
    pollInterval time.Duration
}

func (r *OutboxRelay) Start(ctx context.Context) {
    ticker := time.NewTicker(r.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            r.processOutbox(ctx)
        }
    }
}

func (r *OutboxRelay) processOutbox(ctx context.Context) error {
    rows, err := r.db.QueryContext(ctx, `
        SELECT id, aggregate_type, aggregate_id, event_type, payload
        FROM outbox
        WHERE published_at IS NULL
        ORDER BY id
        LIMIT 100
        FOR UPDATE SKIP LOCKED
    `)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var id int
        var aggType, aggID, eventType string
        var payload []byte
        rows.Scan(&id, &aggType, &aggID, &eventType, &payload)

        // Publish to event bus
        event := deserialize(eventType, payload)
        if err := r.eventBus.Publish(ctx, event); err != nil {
            log.Printf("failed to publish outbox event %d: %v", id, err)
            continue // will retry on next poll
        }

        // Mark as published
        _, err = r.db.ExecContext(ctx, `
            UPDATE outbox SET published_at = NOW() WHERE id = $1
        `, id)
        if err != nil {
            log.Printf("failed to mark outbox event %d as published: %v", id, err)
        }
    }
    return rows.Err()
}

go
// Outbox table schema:
// CREATE TABLE outbox (
//     id SERIAL PRIMARY KEY,
//     aggregate_type VARCHAR(255) NOT NULL,
//     aggregate_id VARCHAR(255) NOT NULL,
//     event_type VARCHAR(255) NOT NULL,
//     payload JSONB NOT NULL,
//     created_at TIMESTAMP NOT NULL DEFAULT NOW(),
//     published_at TIMESTAMP,
//     UNIQUE(aggregate_type, aggregate_id, event_type, payload)
// );

// Repository saves aggregate + outbox in one transaction
func (r *OrderRepository) Save(ctx context.Context, order *Order) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 1. Save aggregate state
    if err := r.saveOrder(tx, order); err != nil {
        return err
    }

    // 2. Save events to outbox (same transaction!)
    events := order.PullEvents()
    for _, event := range events {
        payload, _ := json.Marshal(event)
        _, err := tx.ExecContext(ctx, `
            INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT DO NOTHING
        `, "order", order.ID.String(), event.EventName(), payload)
        if err != nil {
            return err
        }
    }

    return tx.Commit()
}

// Background relay process
type OutboxRelay struct {
    db       *sql.DB
    eventBus EventBus
    pollInterval time.Duration
}

func (r *OutboxRelay) Start(ctx context.Context) {
    ticker := time.NewTicker(r.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            r.processOutbox(ctx)
        }
    }
}

func (r *OutboxRelay) processOutbox(ctx context.Context) error {
    rows, err := r.db.QueryContext(ctx, `
        SELECT id, aggregate_type, aggregate_id, event_type, payload
        FROM outbox
        WHERE published_at IS NULL
        ORDER BY id
        LIMIT 100
        FOR UPDATE SKIP LOCKED
    `)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var id int
        var aggType, aggID, eventType string
        var payload []byte
        rows.Scan(&id, &aggType, &aggID, &eventType, &payload)

        // Publish to event bus
        event := deserialize(eventType, payload)
        if err := r.eventBus.Publish(ctx, event); err != nil {
            log.Printf("failed to publish outbox event %d: %v", id, err)
            continue // will retry on next poll
        }

        // Mark as published
        _, err = r.db.ExecContext(ctx, `
            UPDATE outbox SET published_at = NOW() WHERE id = $1
        `, id)
        if err != nil {
            log.Printf("failed to mark outbox event %d as published: %v", id, err)
        }
    }
    return rows.Err()
}

4️⃣ Saga Pattern

4️⃣ Saga模式

Definition: A sequence of local transactions where each transaction updates data and publishes an event/message to trigger the next transaction. If a step fails, compensating transactions undo previous steps.
定义: 一系列本地事务的序列,每个事务更新数据并发布事件/消息以触发下一个事务。如果某一步失败,补偿事务会撤销之前的步骤。

Types

类型

TypeCoordinationWhen to Use
OrchestrationCentral coordinator sagaComplex flows, need visibility
ChoreographyEvent-driven, no central coordinatorSimple flows, loose coupling
类型协调方式适用场景
编排型中央协调器管理Saga复杂流程,需要可见性
** choreography( choreography型)**事件驱动,无中央协调器简单流程,松耦合场景

Orchestration Saga Example: Order Processing

编排型Saga示例:订单处理

┌─────────────┐
│  Saga       │
│ Coordinator │
└──────┬──────┘
       │ 1. ReserveInventory
┌─────────────┐     2. InventoryReserved
│  Inventory  │────────────────▶
│  Service    │
└─────────────┘
       │ 3. ProcessPayment
┌─────────────┐     4. PaymentConfirmed
│  Payment    │────────────────▶
│  Service    │
└─────────────┘
       │ 5. CreateShipment
┌─────────────┐     6. ShipmentCreated
│  Shipping   │────────────────▶
│  Service    │
└─────────────┘
       │ (if any step fails)
┌─────────────┐
│ Compensate: │
│ - ReleaseInventory
│ - RefundPayment
│ - CancelShipment
└─────────────┘
┌─────────────┐
│  Saga       │
│ Coordinator │
└──────┬──────┘
       │ 1. ReserveInventory
┌─────────────┐     2. InventoryReserved
│  Inventory  │────────────────▶
│  Service    │
└─────────────┘
       │ 3. ProcessPayment
┌─────────────┐     4. PaymentConfirmed
│  Payment    │────────────────▶
│  Service    │
└─────────────┘
       │ 5. CreateShipment
┌─────────────┐     6. ShipmentCreated
│  Shipping   │────────────────▶
│  Service    │
└─────────────┘
       │ (if any step fails)
┌─────────────┐
│ Compensate: │
│ - ReleaseInventory
│ - RefundPayment
│ - CancelShipment
└─────────────┘

Go Implementation (Orchestration)

Go实现(编排型)

go
type OrderSaga struct {
    inventorySvc InventoryClient
    paymentSvc   PaymentClient
    shippingSvc  ShippingClient
    sagaRepo     SagaRepository
}

func (s *OrderSaga) Start(ctx context.Context, order Order) error {
    saga := SagaState{
        ID:      uuid.New(),
        OrderID: order.ID,
        Status:  SagaStatusStarted,
        Steps: []SagaStep{
            {Name: "reserve_inventory", Status: StepStatusPending},
            {Name: "process_payment", Status: StepStatusPending},
            {Name: "create_shipment", Status: StepStatusPending},
        },
    }

    if err := s.sagaRepo.Save(ctx, saga); err != nil {
        return err
    }

    // Step 1: Reserve Inventory
    reservation, err := s.inventorySvc.Reserve(ctx, order.Items)
    if err != nil {
        return s.compensate(ctx, saga) // nothing to undo yet
    }
    saga.Steps[0].Status = StepStatusCompleted
    saga.Steps[0].Result = reservation.ID
    s.sagaRepo.Save(ctx, saga)

    // Step 2: Process Payment
    payment, err := s.paymentSvc.Charge(ctx, order.Total, order.CustomerID)
    if err != nil {
        return s.compensate(ctx, saga) // undo inventory reservation
    }
    saga.Steps[1].Status = StepStatusCompleted
    saga.Steps[1].Result = payment.ID
    s.sagaRepo.Save(ctx, saga)

    // Step 3: Create Shipment
    shipment, err := s.shippingSvc.Create(ctx, order.ID, order.ShippingAddress)
    if err != nil {
        return s.compensate(ctx, saga) // undo payment + inventory
    }
    saga.Steps[2].Status = StepStatusCompleted
    saga.Steps[2].Result = shipment.ID
    saga.Status = SagaStatusCompleted
    s.sagaRepo.Save(ctx, saga)

    return nil
}

func (s *OrderSaga) compensate(ctx context.Context, saga SagaState) error {
    saga.Status = SagaStatusCompensating
    s.sagaRepo.Save(ctx, saga)

    // Compensate in reverse order
    for i := len(saga.Steps) - 1; i >= 0; i-- {
        step := saga.Steps[i]
        if step.Status != StepStatusCompleted {
            continue
        }

        switch step.Name {
        case "create_shipment":
            s.shippingSvc.Cancel(ctx, step.Result)
        case "process_payment":
            s.paymentSvc.Refund(ctx, step.Result)
        case "reserve_inventory":
            s.inventorySvc.Release(ctx, step.Result)
        }

        step.Status = StepStatusCompensated
        s.sagaRepo.Save(ctx, saga)
    }

    saga.Status = SagaStatusCompensated
    return s.sagaRepo.Save(ctx, saga)
}

go
type OrderSaga struct {
    inventorySvc InventoryClient
    paymentSvc   PaymentClient
    shippingSvc  ShippingClient
    sagaRepo     SagaRepository
}

func (s *OrderSaga) Start(ctx context.Context, order Order) error {
    saga := SagaState{
        ID:      uuid.New(),
        OrderID: order.ID,
        Status:  SagaStatusStarted,
        Steps: []SagaStep{
            {Name: "reserve_inventory", Status: StepStatusPending},
            {Name: "process_payment", Status: StepStatusPending},
            {Name: "create_shipment", Status: StepStatusPending},
        },
    }

    if err := s.sagaRepo.Save(ctx, saga); err != nil {
        return err
    }

    // Step 1: Reserve Inventory
    reservation, err := s.inventorySvc.Reserve(ctx, order.Items)
    if err != nil {
        return s.compensate(ctx, saga) // nothing to undo yet
    }
    saga.Steps[0].Status = StepStatusCompleted
    saga.Steps[0].Result = reservation.ID
    s.sagaRepo.Save(ctx, saga)

    // Step 2: Process Payment
    payment, err := s.paymentSvc.Charge(ctx, order.Total, order.CustomerID)
    if err != nil {
        return s.compensate(ctx, saga) // undo inventory reservation
    }
    saga.Steps[1].Status = StepStatusCompleted
    saga.Steps[1].Result = payment.ID
    s.sagaRepo.Save(ctx, saga)

    // Step 3: Create Shipment
    shipment, err := s.shippingSvc.Create(ctx, order.ID, order.ShippingAddress)
    if err != nil {
        return s.compensate(ctx, saga) // undo payment + inventory
    }
    saga.Steps[2].Status = StepStatusCompleted
    saga.Steps[2].Result = shipment.ID
    saga.Status = SagaStatusCompleted
    s.sagaRepo.Save(ctx, saga)

    return nil
}

func (s *OrderSaga) compensate(ctx context.Context, saga SagaState) error {
    saga.Status = SagaStatusCompensating
    s.sagaRepo.Save(ctx, saga)

    // Compensate in reverse order
    for i := len(saga.Steps) - 1; i >= 0; i-- {
        step := saga.Steps[i]
        if step.Status != StepStatusCompleted {
            continue
        }

        switch step.Name {
        case "create_shipment":
            s.shippingSvc.Cancel(ctx, step.Result)
        case "process_payment":
            s.paymentSvc.Refund(ctx, step.Result)
        case "reserve_inventory":
            s.inventorySvc.Release(ctx, step.Result)
        }

        step.Status = StepStatusCompensated
        s.sagaRepo.Save(ctx, saga)
    }

    saga.Status = SagaStatusCompensated
    return s.sagaRepo.Save(ctx, saga)
}

5️⃣ Anti-Corruption Layer (ACL)

5️⃣ Anti-Corruption Layer(ACL,防腐层)

Definition: A translation layer that isolates your domain model from external systems' incompatible models.
定义: 一个转换层,将你的领域模型与外部系统的不兼容模型隔离开。

Why ACL?

为什么使用ACL?

Without ACL:
  Your Domain ──direct uses──▶ External Model (leaks into your domain)

With ACL:
  Your Domain ──uses──▶ ACL ──translates──▶ External Model
不使用ACL:
  你的领域模型 ──直接调用──▶ 外部模型(外部模型渗透到你的领域)

使用ACL:
  你的领域模型 ──调用──▶ ACL ──转换──▶ 外部模型

Implementation (Go)

实现(Go)

go
// Your domain model (clean, no external dependencies)
package domain

type Order struct {
    ID     uuid.UUID
    Total  Money
    Status OrderStatus
}

// External API model (messy, you don't control it)
package external

type StripeCharge struct {
    ID            string  `json:"id"`
    Amount        int     `json:"amount"` // in cents!
    Currency      string  `json:"currency"`
    Status        string  `json:"status"`
    ReceiptURL    string  `json:"receipt_url"`
    FailureCode   string  `json:"failure_code"`
    FailureMessage string `json:"failure_message"`
}

// ACL: translates between your domain and external model
package acl

type StripePaymentAdapter struct {
    client *stripe.Client
}

func (a *StripePaymentAdapter) Charge(
    ctx context.Context,
    amount domain.Money,
    customerID uuid.UUID,
) (domain.PaymentResult, error) {
    // Translate domain → external
    stripeAmount := amount.Amount.Mul(decimal.NewFromInt(100)).IntPart() // cents

    charge, err := a.client.CreateCharge(ctx, external.StripeChargeRequest{
        Amount:   stripeAmount,
        Currency: strings.ToLower(amount.Currency),
        Customer: customerID.String(),
    })
    if err != nil {
        return domain.PaymentResult{}, fmt.Errorf("stripe charge failed: %w", err)
    }

    // Translate external → domain
    return a.toDomainResult(charge), nil
}

func (a *StripePaymentAdapter) toDomainResult(
    charge external.StripeCharge,
) domain.PaymentResult {
    status := domain.PaymentStatusPending
    switch charge.Status {
    case "succeeded":
        status = domain.PaymentStatusSucceeded
    case "failed":
        status = domain.PaymentStatusFailed
    }

    amount := decimal.NewFromInt(int64(charge.Amount)).Div(decimal.NewFromInt(100))

    return domain.PaymentResult{
        TransactionID: charge.ID,
        Amount:        domain.MustNewMoney(amount, strings.ToUpper(charge.Currency)),
        Status:        status,
        ReceiptURL:    charge.ReceiptURL,
        FailureReason: charge.FailureMessage,
    }
}

go
// Your domain model (clean, no external dependencies)
package domain

type Order struct {
    ID     uuid.UUID
    Total  Money
    Status OrderStatus
}

// External API model (messy, you don't control it)
package external

type StripeCharge struct {
    ID            string  `json:"id"`
    Amount        int     `json:"amount"` // in cents!
    Currency      string  `json:"currency"`
    Status        string  `json:"status"`
    ReceiptURL    string  `json:"receipt_url"`
    FailureCode   string  `json:"failure_code"`
    FailureMessage string `json:"failure_message"`
}

// ACL: translates between your domain and external model
package acl

type StripePaymentAdapter struct {
    client *stripe.Client
}

func (a *StripePaymentAdapter) Charge(
    ctx context.Context,
    amount domain.Money,
    customerID uuid.UUID,
) (domain.PaymentResult, error) {
    // Translate domain → external
    stripeAmount := amount.Amount.Mul(decimal.NewFromInt(100)).IntPart() // cents

    charge, err := a.client.CreateCharge(ctx, external.StripeChargeRequest{
        Amount:   stripeAmount,
        Currency: strings.ToLower(amount.Currency),
        Customer: customerID.String(),
    })
    if err != nil {
        return domain.PaymentResult{}, fmt.Errorf("stripe charge failed: %w", err)
    }

    // Translate external → domain
    return a.toDomainResult(charge), nil
}

func (a *StripePaymentAdapter) toDomainResult(
    charge external.StripeCharge,
) domain.PaymentResult {
    status := domain.PaymentStatusPending
    switch charge.Status {
    case "succeeded":
        status = domain.PaymentStatusSucceeded
    case "failed":
        status = domain.PaymentStatusFailed
    }

    amount := decimal.NewFromInt(int64(charge.Amount)).Div(decimal.NewFromInt(100))

    return domain.PaymentResult{
        TransactionID: charge.ID,
        Amount:        domain.MustNewMoney(amount, strings.ToUpper(charge.Currency)),
        Status:        status,
        ReceiptURL:    charge.ReceiptURL,
        FailureReason: charge.FailureMessage,
    }
}

📋 Pattern Selection Flowchart

📋 模式选择流程图

Do you need to separate read/write models?
├── YES → Do you need full audit trail?
│         ├── YES → Event Sourcing + CQRS
│         └── NO  → CQRS only
├── NO → Do you need cross-service consistency?
│        ├── YES → Do you need to undo on failure?
│        │        ├── YES → Saga Pattern
│        │        └── NO  → Outbox Pattern
│        │
│        └── NO → Is there an external system with different model?
│                 ├── YES → Anti-Corruption Layer
│                 └── NO  → Standard DDD Tactical patterns

是否需要分离读/写模型?
├── 是 → 是否需要完整审计追踪?
│         ├── 是 → Event Sourcing + CQRS
│         └── 否  → 仅使用CQRS
├── 否 → 是否需要跨服务一致性?
│        ├── 是 → 是否需要在失败时回滚?
│        │        ├── 是 → Saga模式
│        │        └── 否  → Outbox模式
│        │
│        └── 否 → 是否存在模型不兼容的外部系统?
│                 ├── 是 → Anti-Corruption Layer
│                 └── 否  → 标准DDD战术模式

📚 References

📚 参考资料