pekko-cqrs-es-implementation

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Pekko CQRS/ES 実装ガイド

Pekko CQRS/ES 实现指南

Apache Pekko + Scala 3でCQRS/Event Sourcingを実装する際の具体的なパターン集。
Apache Pekko + Scala 3实现CQRS/Event Sourcing时的具体模式集。

適用条件

适用条件

このスキルは以下のすべてを満たす場合にのみ使用する:
  1. プログラミング言語がScala(3.x推奨)であること
  2. CQRS/Event Sourcingアーキテクチャを採用していること
  3. Apache Pekko(またはAkka)をアクターフレームワークとして使用していること
上記を満たさない場合は、
cqrs-tradeoffs
cqrs-to-event-sourcing
cqrs-aggregate-modeling
など 言語非依存のスキルを使用すること。
本技能仅在满足以下所有条件时使用:
  1. 编程语言为Scala(推荐3.x版本)
  2. 采用CQRS/Event Sourcing架构
  3. 使用Apache Pekko(或Akka)作为Actor框架
不满足上述条件时,请使用
cqrs-tradeoffs
cqrs-to-event-sourcing
cqrs-aggregate-modeling
等语言无关的技能。

アーキテクチャ概要

架构概要

システム全体のデータフロー

系统整体数据流

クライアント
コマンドAPI(GraphQL Mutation)
ユースケース層(ZIO)
集約アクター(Pekko Typed)
ドメインモデル(純粋Scala)→ イベント生成
イベントストア(DynamoDB)
DynamoDB Streams
リードモデルアップデータ(AWS Lambda)
リードモデル(PostgreSQL)
クエリAPI(GraphQL Query)
客户端
命令API(GraphQL Mutation)
用例层(ZIO)
聚合Actor(Pekko Typed)
领域模型(纯Scala)→ 生成事件
事件存储(DynamoDB)
DynamoDB Streams
读模型更新器(AWS Lambda)
读模型(PostgreSQL)
查询API(GraphQL Query)

コマンド側とクエリ側の分離

命令端与查询端分离

コマンド側クエリ側
目的ビジネスルール実行効率的なデータ取得
データストアDynamoDB(イベントストア)PostgreSQL(リードモデル)
APIGraphQL MutationGraphQL Query
レイヤードメイン → ユースケース → インターフェースアダプタインターフェースアダプタのみ
整合性強い整合性(集約内)結果整合性
クエリ側にユースケース層はない。 GraphQL自体がユースケースに相当する。
命令端查询端
目的执行业务规则高效获取数据
数据存储DynamoDB(事件存储)PostgreSQL(读模型)
APIGraphQL MutationGraphQL Query
层级领域 → 用例 → 接口适配器仅接口适配器
一致性强一致性(聚合内)最终一致性
查询端没有用例层,GraphQL本身就相当于用例层。

モジュール構成

模块结构

modules/
├── command/
│   ├── domain/                          # ドメイン層(純粋Scala)
│   ├── use-case/                        # ユースケース層(ZIO)
│   ├── interface-adapter/               # アクター、GraphQL、シリアライザ
│   ├── interface-adapter-contract/      # コマンド/リプライのプロトコル定義
│   └── interface-adapter-event-serializer/  # Protocol Buffersシリアライザ
├── query/
│   ├── interface-adapter/               # DAO(Slick)、GraphQL
│   └── flyway-migration/               # DBマイグレーション
└── infrastructure/                      # 共有ユーティリティ

apps/
├── command-api/                         # コマンド側HTTPサーバー
├── query-api/                           # クエリ側HTTPサーバー
└── read-model-updater/                  # AWS Lambda
modules/
├── command/
│   ├── domain/                          # 领域层(纯Scala)
│   ├── use-case/                        # 用例层(ZIO)
│   ├── interface-adapter/               # Actor、GraphQL、序列化器
│   ├── interface-adapter-contract/      # 命令/响应协议定义
│   └── interface-adapter-event-serializer/  # Protocol Buffers序列化器
├── query/
│   ├── interface-adapter/               # DAO(Slick)、GraphQL
│   └── flyway-migration/               # 数据库迁移
└── infrastructure/                      # 公共工具

apps/
├── command-api/                         # 命令端HTTP服务
├── query-api/                           # 查询端HTTP服务
└── read-model-updater/                  # AWS Lambda

依存方向

依赖方向

domain ← use-case ← interface-adapter ← apps
  │                       ↑
  │               interface-adapter-contract
  │               interface-adapter-event-serializer
  └── Scalaのみに依存。フレームワーク非依存。
domain ← use-case ← interface-adapter ← apps
  │                       ↑
  │               interface-adapter-contract
  │               interface-adapter-event-serializer
  └── 仅依赖Scala,与框架无关

コマンド側実装パターン

命令端实现模式

1. ドメインモデル

1. 领域模型

原則: ドメインモデルは純粋なScalaコードで、Pekkoに一切依存しない。
原则:领域模型为纯Scala代码,完全不依赖Pekko。

集約(trait + private case class)

聚合(trait + private case class)

scala
trait UserAccount extends Entity {
  override type IdType = UserAccountId
  def id: UserAccountId
  def name: UserAccountName
  def emailAddress: EmailAddress
  def createdAt: DateTime
  def updatedAt: DateTime

  def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]
  def delete: Either[DeleteError, (UserAccount, UserAccountEvent)]
}

object UserAccount {
  // ファクトリ: 新しい状態とイベントのペアを返す
  def apply(
    id: UserAccountId,
    name: UserAccountName,
    emailAddress: EmailAddress,
    createdAt: DateTime = DateTime.now(),
    updatedAt: DateTime = DateTime.now()
  ): (UserAccount, UserAccountEvent) =
    (
      UserAccountImpl(id, false, name, emailAddress, createdAt, updatedAt),
      UserAccountEvent.Created_V1(
        id = DomainEventId.generate(),
        entityId = id,
        name = name,
        emailAddress = emailAddress,
        occurredAt = DateTime.now()
      ))

  // 実装はprivateで隠蔽
  private final case class UserAccountImpl(
    id: UserAccountId,
    deleted: Boolean,
    name: UserAccountName,
    emailAddress: EmailAddress,
    createdAt: DateTime,
    updatedAt: DateTime
  ) extends UserAccount {

    override def rename(
      newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)] =
      if (name == newName) {
        Left(RenameError.FamilyNameSame)
      } else {
        val updated = this.copy(name = newName, updatedAt = DateTime.now())
        val event = UserAccountEvent.Renamed_V1(
          id = DomainEventId.generate(),
          entityId = id,
          oldName = name,
          newName = newName,
          occurredAt = DateTime.now()
        )
        Right((updated, event))
      }

    override def delete: Either[DeleteError, (UserAccount, UserAccountEvent)] =
      if (deleted) {
        Left(DeleteError.AlreadyDeleted)
      } else {
        val updated = copy(deleted = true, updatedAt = DateTime.now())
        val event = UserAccountEvent.Deleted_V1(
          id = DomainEventId.generate(),
          entityId = id,
          occurredAt = DateTime.now()
        )
        Right((updated, event))
      }
  }
}
重要なパターン:
  • 状態変更メソッドは
    Either[Error, (NewState, Event)]
    を返す
  • ファクトリも
    (State, Event)
    のペアを返す
  • 実装クラスは
    private
    で外部から直接構築できない
  • ドメインモデル内でPekkoのimportは一切ない
scala
trait UserAccount extends Entity {
  override type IdType = UserAccountId
  def id: UserAccountId
  def name: UserAccountName
  def emailAddress: EmailAddress
  def createdAt: DateTime
  def updatedAt: DateTime

  def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]
  def delete: Either[DeleteError, (UserAccount, UserAccountEvent)]
}

object UserAccount {
  // 工厂方法:返回新状态与事件的对
  def apply(
    id: UserAccountId,
    name: UserAccountName,
    emailAddress: EmailAddress,
    createdAt: DateTime = DateTime.now(),
    updatedAt: DateTime = DateTime.now()
  ): (UserAccount, UserAccountEvent) =
    (
      UserAccountImpl(id, false, name, emailAddress, createdAt, updatedAt),
      UserAccountEvent.Created_V1(
        id = DomainEventId.generate(),
        entityId = id,
        name = name,
        emailAddress = emailAddress,
        occurredAt = DateTime.now()
      ))

  // 实现类用private隐藏,禁止外部直接构造
  private final case class UserAccountImpl(
    id: UserAccountId,
    deleted: Boolean,
    name: UserAccountName,
    emailAddress: EmailAddress,
    createdAt: DateTime,
    updatedAt: DateTime
  ) extends UserAccount {

    override def rename(
      newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)] =
      if (name == newName) {
        Left(RenameError.FamilyNameSame)
      } else {
        val updated = this.copy(name = newName, updatedAt = DateTime.now())
        val event = UserAccountEvent.Renamed_V1(
          id = DomainEventId.generate(),
          entityId = id,
          oldName = name,
          newName = newName,
          occurredAt = DateTime.now()
        )
        Right((updated, event))
      }

    override def delete: Either[DeleteError, (UserAccount, UserAccountEvent)] =
      if (deleted) {
        Left(DeleteError.AlreadyDeleted)
      } else {
        val updated = copy(deleted = true, updatedAt = DateTime.now())
        val event = UserAccountEvent.Deleted_V1(
          id = DomainEventId.generate(),
          entityId = id,
          occurredAt = DateTime.now()
        )
        Right((updated, event))
      }
  }
}
核心模式:
  • 状态变更方法返回
    Either[Error, (NewState, Event)]
  • 工厂方法也返回
    (State, Event)
  • 实现类为
    private
    ,禁止外部直接构造
  • 领域模型内完全不引入Pekko相关依赖

イベント定義(enum + バージョニング)

事件定义(enum + 版本控制)

scala
enum UserAccountEvent extends DomainEvent {
  override type EntityIdType = UserAccountId

  case Created_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    name: UserAccountName,
    emailAddress: EmailAddress,
    occurredAt: DateTime
  )

  case Renamed_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    oldName: UserAccountName,
    newName: UserAccountName,
    occurredAt: DateTime
  )

  case Deleted_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    occurredAt: DateTime
  )
}
イベント設計ルール:
ルール説明
過去形で命名「何が起きたか」を表す
Created
,
Renamed
,
Deleted
_V1
サフィックス
スキーマ進化に対応
Renamed_V1
,
Renamed_V2
不変case classで自動的に保証
自己完結変更前後の値を含む
oldName
,
newName
必須フィールド
id
,
entityId
,
occurredAt
すべてのイベントに共通
scala
enum UserAccountEvent extends DomainEvent {
  override type EntityIdType = UserAccountId

  case Created_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    name: UserAccountName,
    emailAddress: EmailAddress,
    occurredAt: DateTime
  )

  case Renamed_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    oldName: UserAccountName,
    newName: UserAccountName,
    occurredAt: DateTime
  )

  case Deleted_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    occurredAt: DateTime
  )
}
事件设计规则:
规则说明示例
过去式命名表达「已发生的事件」
Created
,
Renamed
,
Deleted
_V1
后缀
适配Schema演进
Renamed_V1
,
Renamed_V2
不可变由case class自动保证
自包含包含变更前后的所有值
oldName
,
newName
必填字段
id
,
entityId
,
occurredAt
所有事件通用

2. 集約の状態(enum)

2. 聚合状态(enum)

scala
enum UserAccountAggregateState {
  case NotCreated(id: UserAccountId)
  case Created(user: UserAccount)
  case Deleted(user: UserAccount)

  def applyEvent(event: UserAccountEvent): UserAccountAggregateState = (this, event) match {
    case (NotCreated(id), UserAccountEvent.Created_V1(_, entityId, name, emailAddress, _))
        if id == entityId =>
      Created(UserAccount(entityId, name, emailAddress)._1)

    case (Created(user), UserAccountEvent.Renamed_V1(_, entityId, _, newName, _))
        if user.id == entityId =>
      Created(user.rename(newName) match {
        case Right((u, _)) => u
        case Left(error) =>
          throw new IllegalStateException(s"Failed to rename user: $error")
      })

    case (Created(user), UserAccountEvent.Deleted_V1(_, entityId, _))
        if user.id == entityId =>
      Deleted(user.delete match {
        case Right((deletedUser, _)) => deletedUser
        case Left(error) =>
          throw new IllegalStateException(s"Failed to delete user: $error")
      })

    case _ =>
      throw new IllegalStateException(s"Cannot apply event $event to state $this")
  }
}
状態遷移の型安全性:
  • NotCreated
    Created
    (Created_V1イベントのみ)
  • Created
    Created
    (Renamed_V1)/
    Deleted
    (Deleted_V1)
  • Deleted
    → 遷移なし(どのイベントも受け付けない)
scala
enum UserAccountAggregateState {
  case NotCreated(id: UserAccountId)
  case Created(user: UserAccount)
  case Deleted(user: UserAccount)

  def applyEvent(event: UserAccountEvent): UserAccountAggregateState = (this, event) match {
    case (NotCreated(id), UserAccountEvent.Created_V1(_, entityId, name, emailAddress, _))
        if id == entityId =>
      Created(UserAccount(entityId, name, emailAddress)._1)

    case (Created(user), UserAccountEvent.Renamed_V1(_, entityId, _, newName, _))
        if user.id == entityId =>
      Created(user.rename(newName) match {
        case Right((u, _)) => u
        case Left(error) =>
          throw new IllegalStateException(s"Failed to rename user: $error")
      })

    case (Created(user), UserAccountEvent.Deleted_V1(_, entityId, _))
        if user.id == entityId =>
      Deleted(user.delete match {
        case Right((deletedUser, _)) => deletedUser
        case Left(error) =>
          throw new IllegalStateException(s"Failed to delete user: $error")
      })

    case _ =>
      throw new IllegalStateException(s"Cannot apply event $event to state $this")
  }
}
状态迁移的类型安全:
  • NotCreated
    Created
    (仅接受Created_V1事件)
  • Created
    Created
    (Renamed_V1)/
    Deleted
    (Deleted_V1)
  • Deleted
    → 无迁移(不接受任何事件)

3. プロトコル定義(コマンド/リプライ)

3. 协议定义(命令/响应)

scala
object UserAccountProtocol {
  // コマンド: すべてidを持つ
  sealed trait Command { def id: UserAccountId }
  final case class Create(
    id: UserAccountId, name: UserAccountName,
    emailAddress: EmailAddress, replyTo: ActorRef[CreateReply]) extends Command
  final case class Rename(
    id: UserAccountId, newName: UserAccountName,
    replyTo: ActorRef[RenameReply]) extends Command
  final case class Delete(
    id: UserAccountId, replyTo: ActorRef[DeleteReply]) extends Command
  final case class Get(
    id: UserAccountId, replyTo: ActorRef[GetReply]) extends Command

  // リプライ: コマンドごとに専用の型
  sealed trait CreateReply
  final case class CreateSucceeded(id: UserAccountId) extends CreateReply

  sealed trait RenameReply
  final case class RenameSucceeded(id: UserAccountId) extends RenameReply
  final case class RenameFailed(id: UserAccountId, reason: RenameError) extends RenameReply

  sealed trait DeleteReply
  final case class DeleteSucceeded(id: UserAccountId) extends DeleteReply
  final case class DeleteFailed(id: UserAccountId, reason: DeleteError) extends DeleteReply

  sealed trait GetReply
  final case class GetSucceeded(value: UserAccount) extends GetReply
  final case class GetNotFoundFailed(id: UserAccountId) extends GetReply
}
プロトコル設計ルール:
  • コマンドはすべて
    sealed trait Command
    を継承し、
    id
    を持つ
  • リプライはコマンドごとに専用の
    sealed trait
    を定義する(
    CreateReply
    ,
    RenameReply
    等)
  • replyTo: ActorRef[XxxReply]
    で型安全な応答を保証
  • 成功/失敗をcase classで表現し、パターンマッチで網羅性チェック
scala
object UserAccountProtocol {
  // 命令:所有命令都携带id
  sealed trait Command { def id: UserAccountId }
  final case class Create(
    id: UserAccountId, name: UserAccountName,
    emailAddress: EmailAddress, replyTo: ActorRef[CreateReply]) extends Command
  final case class Rename(
    id: UserAccountId, newName: UserAccountName,
    replyTo: ActorRef[RenameReply]) extends Command
  final case class Delete(
    id: UserAccountId, replyTo: ActorRef[DeleteReply]) extends Command
  final case class Get(
    id: UserAccountId, replyTo: ActorRef[GetReply]) extends Command

  // 响应:每个命令对应专用的响应类型
  sealed trait CreateReply
  final case class CreateSucceeded(id: UserAccountId) extends CreateReply

  sealed trait RenameReply
  final case class RenameSucceeded(id: UserAccountId) extends RenameReply
  final case class RenameFailed(id: UserAccountId, reason: RenameError) extends RenameReply

  sealed trait DeleteReply
  final case class DeleteSucceeded(id: UserAccountId) extends DeleteReply
  final case class DeleteFailed(id: UserAccountId, reason: DeleteError) extends DeleteReply

  sealed trait GetReply
  final case class GetSucceeded(value: UserAccount) extends GetReply
  final case class GetNotFoundFailed(id: UserAccountId) extends GetReply
}
协议设计规则:
  • 所有命令继承
    sealed trait Command
    ,且携带
    id
    字段
  • 每个命令对应独立的
    sealed trait
    响应类型(如
    CreateReply
    RenameReply
  • 通过
    replyTo: ActorRef[XxxReply]
    保证类型安全的响应
  • 用case class表达成功/失败状态,通过模式匹配做完整性检查

4. 集約アクター(PersistenceEffector)

4. 聚合Actor(PersistenceEffector)

scala
object UserAccountAggregate {

  // 状態ごとにハンドラ関数を分離
  private def handleNotCreated(
    state: UserAccountAggregateState.NotCreated,
    effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
  ): Behavior[Command] = Behaviors.receiveMessagePartial {
    case Create(id, name, emailAddress, replyTo) if state.id == id =>
      val (newState, event) = UserAccount(id, name, emailAddress)
      effector.persistEvent(event) { _ =>
        replyTo ! CreateSucceeded(id)
        handleCreated(UserAccountAggregateState.Created(newState), effector)
      }
    case Get(id, replyTo) if state.id == id =>
      replyTo ! GetNotFoundFailed(id)
      Behaviors.same
  }

  private def handleCreated(
    state: UserAccountAggregateState.Created,
    effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
  ): Behavior[Command] = Behaviors.receiveMessagePartial {
    case Rename(id, newName, replyTo) if state.user.id == id =>
      // ドメインモデルに委譲
      state.user.rename(newName) match {
        case Left(reason) =>
          replyTo ! RenameFailed(id, reason)
          Behaviors.same
        case Right((newUser, event)) =>
          effector.persistEvent(event) { _ =>
            replyTo ! RenameSucceeded(id)
            handleCreated(state.copy(user = newUser), effector)
          }
      }
    // ... Delete, Get も同様
  }

  // エントリポイント
  def apply(id: UserAccountId): Behavior[Command] = {
    val config = PersistenceEffectorConfig
      .create[UserAccountAggregateState, UserAccountEvent, Command](
        persistenceId = s"${id.entityTypeName}-${id.asString}",
        initialState = UserAccountAggregateState.NotCreated(id),
        applyEvent = (state, event) => state.applyEvent(event)
      )
      .withPersistenceMode(PersistenceMode.Persisted)
      .withSnapshotCriteria(SnapshotCriteria.every(1000))
      .withRetentionCriteria(RetentionCriteria.snapshotEvery(2))

    Behaviors.setup[Command] { implicit ctx =>
      Behaviors
        .supervise(
          PersistenceEffector.fromConfig(config) {
            case (state: UserAccountAggregateState.NotCreated, effector) =>
              handleNotCreated(state, effector)
            case (state: UserAccountAggregateState.Created, effector) =>
              handleCreated(state, effector)
            case (state: UserAccountAggregateState.Deleted, effector) =>
              handleDeleted(state, effector)
          })
        .onFailure[IllegalArgumentException](SupervisorStrategy.restart)
    }
  }
}
アクター実装ルール:
  • ドメインロジックをアクターに書かない。 アクターは永続化とライフサイクル管理に徹する
  • 状態ごとにハンドラ関数を分離し、受け付けるコマンドを限定する
  • effector.persistEvent(event) { _ => ... }
    でイベント永続化後にリプライ
  • ドメインモデルが
    Left
    を返したらリプライのみ(永続化しない)
  • ドメインモデルが
    Right
    を返したらイベントを永続化してからリプライ
scala
object UserAccountAggregate {

  // 按状态拆分处理函数
  private def handleNotCreated(
    state: UserAccountAggregateState.NotCreated,
    effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
  ): Behavior[Command] = Behaviors.receiveMessagePartial {
    case Create(id, name, emailAddress, replyTo) if state.id == id =>
      val (newState, event) = UserAccount(id, name, emailAddress)
      effector.persistEvent(event) { _ =>
        replyTo ! CreateSucceeded(id)
        handleCreated(UserAccountAggregateState.Created(newState), effector)
      }
    case Get(id, replyTo) if state.id == id =>
      replyTo ! GetNotFoundFailed(id)
      Behaviors.same
  }

  private def handleCreated(
    state: UserAccountAggregateState.Created,
    effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
  ): Behavior[Command] = Behaviors.receiveMessagePartial {
    case Rename(id, newName, replyTo) if state.user.id == id =>
      // 逻辑委托给领域模型
      state.user.rename(newName) match {
        case Left(reason) =>
          replyTo ! RenameFailed(id, reason)
          Behaviors.same
        case Right((newUser, event)) =>
          effector.persistEvent(event) { _ =>
            replyTo ! RenameSucceeded(id)
            handleCreated(state.copy(user = newUser), effector)
          }
      }
    // ... Delete、Get逻辑同理
  }

  // 入口方法
  def apply(id: UserAccountId): Behavior[Command] = {
    val config = PersistenceEffectorConfig
      .create[UserAccountAggregateState, UserAccountEvent, Command](
        persistenceId = s"${id.entityTypeName}-${id.asString}",
        initialState = UserAccountAggregateState.NotCreated(id),
        applyEvent = (state, event) => state.applyEvent(event)
      )
      .withPersistenceMode(PersistenceMode.Persisted)
      .withSnapshotCriteria(SnapshotCriteria.every(1000))
      .withRetentionCriteria(RetentionCriteria.snapshotEvery(2))

    Behaviors.setup[Command] { implicit ctx =>
      Behaviors
        .supervise(
          PersistenceEffector.fromConfig(config) {
            case (state: UserAccountAggregateState.NotCreated, effector) =>
              handleNotCreated(state, effector)
            case (state: UserAccountAggregateState.Created, effector) =>
              handleCreated(state, effector)
            case (state: UserAccountAggregateState.Deleted, effector) =>
              handleDeleted(state, effector)
          })
        .onFailure[IllegalArgumentException](SupervisorStrategy.restart)
    }
  }
}
Actor实现规则:
  • 不要在Actor中编写领域逻辑,Actor仅负责持久化和生命周期管理
  • 按状态拆分处理函数,限定可接收的命令范围
  • 通过
    effector.persistEvent(event) { _ => ... }
    实现事件持久化后再返回响应
  • 领域模型返回
    Left
    时仅返回错误响应,不做持久化
  • 领域模型返回
    Right
    时先持久化事件再返回成功响应

5. ユースケース層(ZIO)

5. 用例层(ZIO)

scala
private[users] final class UserAccountUseCaseImpl(
  userAccountAggregateRef: ActorRef[UserAccountProtocol.Command]
)(implicit
  timeout: Timeout,
  scheduler: Scheduler,
  ec: ExecutionContext
) extends UserAccountUseCase {

  override def createUserAccount(
    userAccountName: UserAccountName,
    emailAddress: EmailAddress
  ): IO[UserAccountUseCaseError, UserAccountId] =
    for {
      userAccountId <- ZIO.succeed(UserAccountId.generate())
      reply <- askActor[UserAccountProtocol.CreateReply] { replyTo =>
        UserAccountProtocol.Create(
          id = userAccountId,
          name = userAccountName,
          emailAddress = emailAddress,
          replyTo = replyTo
        )
      }.mapError(e => UserAccountUseCaseError.UnexpectedError(e.getMessage, Some(e)))
      result <- reply match {
        case UserAccountProtocol.CreateSucceeded(id) => ZIO.succeed(id)
      }
    } yield result

  private def askActor[R](
    createMessage: ActorRef[R] => UserAccountProtocol.Command
  ): Task[R] =
    PekkoInterop.fromFuture { userAccountAggregateRef.ask(createMessage) }
}
ユースケース層のルール:
  • ビジネスロジックを書く場所ではない。 処理ステップの調整役に徹する
  • ZIOのfor式でアクターとの通信を型安全に記述
  • askActor
    ヘルパーでPekko Ask → ZIO Task変換を共通化
  • エラーは
    UserAccountUseCaseError
    にマッピング
scala
private[users] final class UserAccountUseCaseImpl(
  userAccountAggregateRef: ActorRef[UserAccountProtocol.Command]
)(implicit
  timeout: Timeout,
  scheduler: Scheduler,
  ec: ExecutionContext
) extends UserAccountUseCase {

  override def createUserAccount(
    userAccountName: UserAccountName,
    emailAddress: EmailAddress
  ): IO[UserAccountUseCaseError, UserAccountId] =
    for {
      userAccountId <- ZIO.succeed(UserAccountId.generate())
      reply <- askActor[UserAccountProtocol.CreateReply] { replyTo =>
        UserAccountProtocol.Create(
          id = userAccountId,
          name = userAccountName,
          emailAddress = emailAddress,
          replyTo = replyTo
        )
      }.mapError(e => UserAccountUseCaseError.UnexpectedError(e.getMessage, Some(e)))
      result <- reply match {
        case UserAccountProtocol.CreateSucceeded(id) => ZIO.succeed(id)
      }
    } yield result

  private def askActor[R](
    createMessage: ActorRef[R] => UserAccountProtocol.Command
  ): Task[R] =
    PekkoInterop.fromFuture { userAccountAggregateRef.ask(createMessage) }
}
用例层规则:
  • 不要在此处编写业务逻辑,仅承担处理流程的协调角色
  • 通过ZIO的for表达式类型安全地编写与Actor的通信逻辑
  • 通过
    askActor
    工具方法统一实现Pekko Ask到ZIO Task的转换
  • 所有错误映射为
    UserAccountUseCaseError
    类型

シリアライズ

序列化

Protocol Buffersによるイベントシリアライズ

基于Protocol Buffers的事件序列化

protobuf
// event.proto
syntax = "proto3";

message UserAccountCreatedV1 {
  string id = 1;
  string entity_id = 2;
  string name = 3;
  string email_address = 4;
  string occurred_at = 5;
}

message UserAccountRenamedV1 {
  string id = 1;
  string entity_id = 2;
  string old_name = 3;
  string new_name = 4;
  string occurred_at = 5;
}
シリアライズ方針:
  • イベントとスナップショットの両方をProtocol Buffersで定義
  • ScalaPBでScalaコードを自動生成
  • カスタムシリアライザでドメインイベント ↔ Protobufメッセージを変換
  • バージョニングはProtobufのフィールド番号で後方互換性を確保
protobuf
// event.proto
syntax = "proto3";

message UserAccountCreatedV1 {
  string id = 1;
  string entity_id = 2;
  string name = 3;
  string email_address = 4;
  string occurred_at = 5;
}

message UserAccountRenamedV1 {
  string id = 1;
  string entity_id = 2;
  string old_name = 3;
  string new_name = 4;
  string occurred_at = 5;
}
序列化策略:
  • 事件和快照都通过Protocol Buffers定义
  • 使用ScalaPB自动生成Scala代码
  • 通过自定义序列化器实现领域事件 ↔ Protobuf消息的转换
  • 利用Protobuf的字段编号保证版本向后兼容

スナップショット戦略

快照策略

設定理由
保存頻度1000イベントごと起動時間とストレージのバランス
保持数最新2つリカバリ安全性の確保
配置原因
保存频率每1000个事件平衡启动时间与存储成本
保留数量最新2个保证恢复安全性

テスト戦略

测试策略

ドメインモデルテスト(Pekko非依存)

领域模型测试(无Pekko依赖)

scala
test("ユーザー名を変更できる") {
  val (user, _) = UserAccount(id, name, email)
  val result = user.rename(newName)
  result match {
    case Right((updated, event)) =>
      updated.name shouldBe newName
      event shouldBe a[UserAccountEvent.Renamed_V1]
    case Left(error) => fail(s"Unexpected error: $error")
  }
}
scala
test("可修改用户名") {
  val (user, _) = UserAccount(id, name, email)
  val result = user.rename(newName)
  result match {
    case Right((updated, event)) =>
      updated.name shouldBe newName
      event shouldBe a[UserAccountEvent.Renamed_V1]
    case Left(error) => fail(s"Unexpected error: $error")
  }
}

アクターテスト(ActorTestKit)

Actor测试(ActorTestKit)

scala
// PersistenceEffectorのテスト
val probe = testKit.createTestProbe[CreateReply]()
aggregateRef ! Create(id, name, email, probe.ref)
probe.expectMessage(CreateSucceeded(id))
scala
// PersistenceEffector测试
val probe = testKit.createTestProbe[CreateReply]()
aggregateRef ! Create(id, name, email, probe.ref)
probe.expectMessage(CreateSucceeded(id))

テストの分離

测试分层

レベル対象ツールPekko依存
ドメインビジネスロジックScalaTestなし
アクターメッセージング、永続化ActorTestKitあり
シリアライザイベント/スナップショットの変換ScalaTestなし
統合エンドツーエンドLocalStackあり
层级测试对象工具是否依赖Pekko
领域层业务逻辑ScalaTest
Actor层消息处理、持久化逻辑ActorTestKit
序列化层事件/快照转换逻辑ScalaTest
集成测试端到端流程LocalStack

関連スキルとの使い分け

与相关技能的区分使用

スキルフォーカス使うタイミング
本スキルPekko + Scala 3での具体的実装CQRS/ESをScalaで実装するとき
cqrs-tradeoffsCQRS採用判断のトレードオフ分析CQRS導入の是非を検討するとき
cqrs-to-event-sourcingなぜESが必要になるかの論理的説明ESの必然性を理解したいとき
cqrs-aggregate-modelingCQRS導入時の集約境界再定義集約の粒度を見直すとき
aggregate-design集約設計ルール全般集約の新規設計やレビューのとき
domain-building-blocksVO/Entity/Aggregate等の設計ドメインモデル全体を設計するとき
技能侧重点使用场景
本技能Pekko + Scala 3的具体实现用Scala实现CQRS/ES时
cqrs-tradeoffsCQRS选型的利弊分析评估是否引入CQRS时
cqrs-to-event-sourcing事件溯源的必要性说明需要理解ES的适用场景时
cqrs-aggregate-modelingCQRS落地时的聚合边界重定义调整聚合粒度时
aggregate-design通用聚合设计规则新建或评审聚合设计时
domain-building-blocksVO/Entity/Aggregate等基础构件设计设计整体领域模型时

参考文献

参考文献

関連スキル(併読推奨)

相关技能(推荐搭配阅读)

このスキルを使用する際は、以下のスキルも併せて参照すること:
  • cqrs-tradeoffs
    : CQRS/ES採用判断のトレードオフ分析
  • cqrs-to-event-sourcing
    : CQRSからイベントソーシングへの必然性
  • aggregate-design
    : ドメインモデルとしての集約設計ルール
使用本技能时,建议同时参考以下技能:
  • cqrs-tradeoffs
    : CQRS/ES选型的利弊分析
  • cqrs-to-event-sourcing
    : 从CQRS到事件溯源的必要性
  • aggregate-design
    : 作为领域模型的聚合设计规则