Loading...
Loading...
Apache Pekko + Scala 3によるCQRS/Event Sourcing実装ガイド。 PersistenceEffectorを用いた集約アクター、ドメインモデルとアクターの分離、 状態遷移の型安全な表現、イベント設計、Protocol Buffersシリアライズ、 ZIOベースのユースケース層、リードモデルアップデータの実装パターンを提供する。 対象言語: Scala 3限定。CQRS/Event Sourcingアーキテクチャが前提の場合のみ使用。 トリガー条件: 「Scala」かつ「CQRS」または「Event Sourcing」または「Pekko」が リクエストに含まれる場合のみ起動。Scala以外の言語やCQRS/ES以外のアーキテクチャでは このスキルを使用してはならない。 トリガー:「PekkoでCQRS/ESを実装したい」「Scalaで集約アクターを書きたい」 「PersistenceEffectorの使い方」「Pekkoのイベントソーシング」 「ScalaでCQRSのコマンド側を実装」「Pekkoで状態遷移を管理」 といったPekko + Scala + CQRS/ES実装リクエストで起動。 非トリガー:「CQRSのトレードオフ」「イベントソーシングとは」「JavaでCQRS」 「GoでEvent Sourcing」「CQRSの概念を教えて」など、Scala/Pekko以外や概念的な質問では 起動してはならない。
npx skill4agent add j5ik2o/okite-ai pekko-cqrs-es-implementationcqrs-tradeoffscqrs-to-event-sourcingcqrs-aggregate-modelingクライアント
│
▼
コマンドAPI(GraphQL Mutation)
│
▼
ユースケース層(ZIO)
│
▼
集約アクター(Pekko Typed)
│
▼
ドメインモデル(純粋Scala)→ イベント生成
│
▼
イベントストア(DynamoDB)
│
▼
DynamoDB Streams
│
▼
リードモデルアップデータ(AWS Lambda)
│
▼
リードモデル(PostgreSQL)
│
▼
クエリAPI(GraphQL Query)| コマンド側 | クエリ側 | |
|---|---|---|
| 目的 | ビジネスルール実行 | 効率的なデータ取得 |
| データストア | DynamoDB(イベントストア) | PostgreSQL(リードモデル) |
| API | GraphQL Mutation | GraphQL Query |
| レイヤー | ドメイン → ユースケース → インターフェースアダプタ | インターフェースアダプタのみ |
| 整合性 | 強い整合性(集約内) | 結果整合性 |
modules/
├── command/
│ ├── domain/ # ドメイン層(純粋Scala)
│ ├── use-case/ # ユースケース層(ZIO)
│ ├── interface-adapter/ # アクター、GraphQL、シリアライザ
│ ├── interface-adapter-contract/ # コマンド/リプライのプロトコル定義
│ └── interface-adapter-event-serializer/ # Protocol Buffersシリアライザ
├── query/
│ ├── interface-adapter/ # DAO(Slick)、GraphQL
│ └── flyway-migration/ # DBマイグレーション
└── infrastructure/ # 共有ユーティリティ
apps/
├── command-api/ # コマンド側HTTPサーバー
├── query-api/ # クエリ側HTTPサーバー
└── read-model-updater/ # AWS Lambdadomain ← use-case ← interface-adapter ← apps
│ ↑
│ interface-adapter-contract
│ interface-adapter-event-serializer
└── Scalaのみに依存。フレームワーク非依存。trait UserAccount extends Entity {
override type IdType = UserAccountId
def id: UserAccountId
def name: UserAccountName
def emailAddress: EmailAddress
def createdAt: DateTime
def updatedAt: DateTime
def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]
def delete: Either[DeleteError, (UserAccount, UserAccountEvent)]
}
object UserAccount {
// ファクトリ: 新しい状態とイベントのペアを返す
def apply(
id: UserAccountId,
name: UserAccountName,
emailAddress: EmailAddress,
createdAt: DateTime = DateTime.now(),
updatedAt: DateTime = DateTime.now()
): (UserAccount, UserAccountEvent) =
(
UserAccountImpl(id, false, name, emailAddress, createdAt, updatedAt),
UserAccountEvent.Created_V1(
id = DomainEventId.generate(),
entityId = id,
name = name,
emailAddress = emailAddress,
occurredAt = DateTime.now()
))
// 実装はprivateで隠蔽
private final case class UserAccountImpl(
id: UserAccountId,
deleted: Boolean,
name: UserAccountName,
emailAddress: EmailAddress,
createdAt: DateTime,
updatedAt: DateTime
) extends UserAccount {
override def rename(
newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)] =
if (name == newName) {
Left(RenameError.FamilyNameSame)
} else {
val updated = this.copy(name = newName, updatedAt = DateTime.now())
val event = UserAccountEvent.Renamed_V1(
id = DomainEventId.generate(),
entityId = id,
oldName = name,
newName = newName,
occurredAt = DateTime.now()
)
Right((updated, event))
}
override def delete: Either[DeleteError, (UserAccount, UserAccountEvent)] =
if (deleted) {
Left(DeleteError.AlreadyDeleted)
} else {
val updated = copy(deleted = true, updatedAt = DateTime.now())
val event = UserAccountEvent.Deleted_V1(
id = DomainEventId.generate(),
entityId = id,
occurredAt = DateTime.now()
)
Right((updated, event))
}
}
}Either[Error, (NewState, Event)](State, Event)privateenum UserAccountEvent extends DomainEvent {
override type EntityIdType = UserAccountId
case Created_V1(
id: DomainEventId,
entityId: UserAccountId,
name: UserAccountName,
emailAddress: EmailAddress,
occurredAt: DateTime
)
case Renamed_V1(
id: DomainEventId,
entityId: UserAccountId,
oldName: UserAccountName,
newName: UserAccountName,
occurredAt: DateTime
)
case Deleted_V1(
id: DomainEventId,
entityId: UserAccountId,
occurredAt: DateTime
)
}| ルール | 説明 | 例 |
|---|---|---|
| 過去形で命名 | 「何が起きたか」を表す | |
| スキーマ進化に対応 | |
| 不変 | case classで自動的に保証 | |
| 自己完結 | 変更前後の値を含む | |
| 必須フィールド | | すべてのイベントに共通 |
enum UserAccountAggregateState {
case NotCreated(id: UserAccountId)
case Created(user: UserAccount)
case Deleted(user: UserAccount)
def applyEvent(event: UserAccountEvent): UserAccountAggregateState = (this, event) match {
case (NotCreated(id), UserAccountEvent.Created_V1(_, entityId, name, emailAddress, _))
if id == entityId =>
Created(UserAccount(entityId, name, emailAddress)._1)
case (Created(user), UserAccountEvent.Renamed_V1(_, entityId, _, newName, _))
if user.id == entityId =>
Created(user.rename(newName) match {
case Right((u, _)) => u
case Left(error) =>
throw new IllegalStateException(s"Failed to rename user: $error")
})
case (Created(user), UserAccountEvent.Deleted_V1(_, entityId, _))
if user.id == entityId =>
Deleted(user.delete match {
case Right((deletedUser, _)) => deletedUser
case Left(error) =>
throw new IllegalStateException(s"Failed to delete user: $error")
})
case _ =>
throw new IllegalStateException(s"Cannot apply event $event to state $this")
}
}NotCreatedCreatedCreatedCreatedDeletedDeletedobject UserAccountProtocol {
// コマンド: すべてidを持つ
sealed trait Command { def id: UserAccountId }
final case class Create(
id: UserAccountId, name: UserAccountName,
emailAddress: EmailAddress, replyTo: ActorRef[CreateReply]) extends Command
final case class Rename(
id: UserAccountId, newName: UserAccountName,
replyTo: ActorRef[RenameReply]) extends Command
final case class Delete(
id: UserAccountId, replyTo: ActorRef[DeleteReply]) extends Command
final case class Get(
id: UserAccountId, replyTo: ActorRef[GetReply]) extends Command
// リプライ: コマンドごとに専用の型
sealed trait CreateReply
final case class CreateSucceeded(id: UserAccountId) extends CreateReply
sealed trait RenameReply
final case class RenameSucceeded(id: UserAccountId) extends RenameReply
final case class RenameFailed(id: UserAccountId, reason: RenameError) extends RenameReply
sealed trait DeleteReply
final case class DeleteSucceeded(id: UserAccountId) extends DeleteReply
final case class DeleteFailed(id: UserAccountId, reason: DeleteError) extends DeleteReply
sealed trait GetReply
final case class GetSucceeded(value: UserAccount) extends GetReply
final case class GetNotFoundFailed(id: UserAccountId) extends GetReply
}sealed trait Commandidsealed traitCreateReplyRenameReplyreplyTo: ActorRef[XxxReply]object UserAccountAggregate {
// 状態ごとにハンドラ関数を分離
private def handleNotCreated(
state: UserAccountAggregateState.NotCreated,
effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
): Behavior[Command] = Behaviors.receiveMessagePartial {
case Create(id, name, emailAddress, replyTo) if state.id == id =>
val (newState, event) = UserAccount(id, name, emailAddress)
effector.persistEvent(event) { _ =>
replyTo ! CreateSucceeded(id)
handleCreated(UserAccountAggregateState.Created(newState), effector)
}
case Get(id, replyTo) if state.id == id =>
replyTo ! GetNotFoundFailed(id)
Behaviors.same
}
private def handleCreated(
state: UserAccountAggregateState.Created,
effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
): Behavior[Command] = Behaviors.receiveMessagePartial {
case Rename(id, newName, replyTo) if state.user.id == id =>
// ドメインモデルに委譲
state.user.rename(newName) match {
case Left(reason) =>
replyTo ! RenameFailed(id, reason)
Behaviors.same
case Right((newUser, event)) =>
effector.persistEvent(event) { _ =>
replyTo ! RenameSucceeded(id)
handleCreated(state.copy(user = newUser), effector)
}
}
// ... Delete, Get も同様
}
// エントリポイント
def apply(id: UserAccountId): Behavior[Command] = {
val config = PersistenceEffectorConfig
.create[UserAccountAggregateState, UserAccountEvent, Command](
persistenceId = s"${id.entityTypeName}-${id.asString}",
initialState = UserAccountAggregateState.NotCreated(id),
applyEvent = (state, event) => state.applyEvent(event)
)
.withPersistenceMode(PersistenceMode.Persisted)
.withSnapshotCriteria(SnapshotCriteria.every(1000))
.withRetentionCriteria(RetentionCriteria.snapshotEvery(2))
Behaviors.setup[Command] { implicit ctx =>
Behaviors
.supervise(
PersistenceEffector.fromConfig(config) {
case (state: UserAccountAggregateState.NotCreated, effector) =>
handleNotCreated(state, effector)
case (state: UserAccountAggregateState.Created, effector) =>
handleCreated(state, effector)
case (state: UserAccountAggregateState.Deleted, effector) =>
handleDeleted(state, effector)
})
.onFailure[IllegalArgumentException](SupervisorStrategy.restart)
}
}
}effector.persistEvent(event) { _ => ... }LeftRightprivate[users] final class UserAccountUseCaseImpl(
userAccountAggregateRef: ActorRef[UserAccountProtocol.Command]
)(implicit
timeout: Timeout,
scheduler: Scheduler,
ec: ExecutionContext
) extends UserAccountUseCase {
override def createUserAccount(
userAccountName: UserAccountName,
emailAddress: EmailAddress
): IO[UserAccountUseCaseError, UserAccountId] =
for {
userAccountId <- ZIO.succeed(UserAccountId.generate())
reply <- askActor[UserAccountProtocol.CreateReply] { replyTo =>
UserAccountProtocol.Create(
id = userAccountId,
name = userAccountName,
emailAddress = emailAddress,
replyTo = replyTo
)
}.mapError(e => UserAccountUseCaseError.UnexpectedError(e.getMessage, Some(e)))
result <- reply match {
case UserAccountProtocol.CreateSucceeded(id) => ZIO.succeed(id)
}
} yield result
private def askActor[R](
createMessage: ActorRef[R] => UserAccountProtocol.Command
): Task[R] =
PekkoInterop.fromFuture { userAccountAggregateRef.ask(createMessage) }
}askActorUserAccountUseCaseError// event.proto
syntax = "proto3";
message UserAccountCreatedV1 {
string id = 1;
string entity_id = 2;
string name = 3;
string email_address = 4;
string occurred_at = 5;
}
message UserAccountRenamedV1 {
string id = 1;
string entity_id = 2;
string old_name = 3;
string new_name = 4;
string occurred_at = 5;
}| 設定 | 値 | 理由 |
|---|---|---|
| 保存頻度 | 1000イベントごと | 起動時間とストレージのバランス |
| 保持数 | 最新2つ | リカバリ安全性の確保 |
test("ユーザー名を変更できる") {
val (user, _) = UserAccount(id, name, email)
val result = user.rename(newName)
result match {
case Right((updated, event)) =>
updated.name shouldBe newName
event shouldBe a[UserAccountEvent.Renamed_V1]
case Left(error) => fail(s"Unexpected error: $error")
}
}// PersistenceEffectorのテスト
val probe = testKit.createTestProbe[CreateReply]()
aggregateRef ! Create(id, name, email, probe.ref)
probe.expectMessage(CreateSucceeded(id))| レベル | 対象 | ツール | Pekko依存 |
|---|---|---|---|
| ドメイン | ビジネスロジック | ScalaTest | なし |
| アクター | メッセージング、永続化 | ActorTestKit | あり |
| シリアライザ | イベント/スナップショットの変換 | ScalaTest | なし |
| 統合 | エンドツーエンド | LocalStack | あり |
| スキル | フォーカス | 使うタイミング |
|---|---|---|
| 本スキル | Pekko + Scala 3での具体的実装 | CQRS/ESをScalaで実装するとき |
| cqrs-tradeoffs | CQRS採用判断のトレードオフ分析 | CQRS導入の是非を検討するとき |
| cqrs-to-event-sourcing | なぜESが必要になるかの論理的説明 | ESの必然性を理解したいとき |
| cqrs-aggregate-modeling | CQRS導入時の集約境界再定義 | 集約の粒度を見直すとき |
| aggregate-design | 集約設計ルール全般 | 集約の新規設計やレビューのとき |
| domain-building-blocks | VO/Entity/Aggregate等の設計 | ドメインモデル全体を設計するとき |
cqrs-tradeoffscqrs-to-event-sourcingaggregate-design