effect-concurrency-testing

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Effect Concurrency Testing Skill

Effect并发测试技能

This skill provides patterns for testing Effect's concurrency primitives: fibers, latches, deferreds, PubSub, SubscriptionRef, and streams.
本技能提供了测试Effect并发原语的模式:fibers、latches、deferreds、PubSub、SubscriptionRef和streams。

Core Principles

核心原则

CRITICAL: Choose the correct coordination primitive based on what you need to synchronize.
NeedUse
Simple fiber yield
Effect.yieldNow
Forked PubSub subscriber ready
yieldNow
after fork,
yieldNow
after each publish
Wait for subscriber ready
Deferred.make()
+
Deferred.await
Wait for stream element
Effect.makeLatch()
+
Stream.tap(() => latch.open)
Time-dependent behavior
TestClock.adjust
Verify events published
PubSub.subscribe
+
PubSub.takeAll
Check fiber status
fiber.unsafePoll()
关键提示:根据需要同步的内容选择正确的协调原语。
需求使用工具
简单Fiber让出执行权
Effect.yieldNow
已分叉的PubSub订阅者就绪分叉后调用
yieldNow
,每次发布后调用
yieldNow
等待订阅者就绪
Deferred.make()
+
Deferred.await
等待流元素
Effect.makeLatch()
+
Stream.tap(() => latch.open)
时间相关行为
TestClock.adjust
验证已发布的事件
PubSub.subscribe
+
PubSub.takeAll
检查Fiber状态
fiber.unsafePoll()

Fiber Coordination Patterns

Fiber协调模式

Effect.yieldNow - Simple Fiber Scheduling

Effect.yieldNow - 简单Fiber调度

Use
Effect.yieldNow
when you need to allow other fibers to execute. This is preferred over
TestClock.adjust
for non-time-dependent code.
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber } from "effect"

it.effect("fiber polling with yieldNow", () =>
  Effect.gen(function* () {
    const latch = yield* Effect.makeLatch()

    const fiber = yield* latch.await.pipe(Effect.fork)

    yield* Effect.yieldNow()

    expect(fiber.unsafePoll()).toBeNull()

    yield* latch.open

    expect(yield* fiber.await).toEqual(Exit.void)
  })
)
当你需要允许其他Fiber执行时,使用
Effect.yieldNow
。对于非时间相关的代码,这比
TestClock.adjust
更合适。
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber } from "effect"

it.effect("fiber polling with yieldNow", () =>
  Effect.gen(function* () {
    const latch = yield* Effect.makeLatch()

    const fiber = yield* latch.await.pipe(Effect.fork)

    yield* Effect.yieldNow()

    expect(fiber.unsafePoll()).toBeNull()

    yield* latch.open

    expect(yield* fiber.await).toEqual(Exit.void)
  })
)

Latch - Explicit Coordination

Latch - 显式协调

Effect.makeLatch()
creates a gate that blocks fibers until opened:
typescript
import { it } from "@effect/vitest"
import { Effect, Fiber } from "effect"

it.effect("latch coordination", () =>
  Effect.gen(function* () {
    const latch = yield* Effect.makeLatch()

    const fiber = yield* Effect.gen(function* () {
      yield* latch.await
      return "completed"
    }).pipe(Effect.fork)

    yield* Effect.yieldNow()
    expect(fiber.unsafePoll()).toBeNull()

    yield* latch.open

    const result = yield* Fiber.join(fiber)
    expect(result).toBe("completed")
  })
)
Effect.makeLatch()
会创建一个闸门,在打开前会阻塞Fiber:
typescript
import { it } from "@effect/vitest"
import { Effect, Fiber } from "effect"

it.effect("latch coordination", () =>
  Effect.gen(function* () {
    const latch = yield* Effect.makeLatch()

    const fiber = yield* Effect.gen(function* () {
      yield* latch.await
      return "completed"
    }).pipe(Effect.fork)

    yield* Effect.yieldNow()
    expect(fiber.unsafePoll()).toBeNull()

    yield* latch.open

    const result = yield* Fiber.join(fiber)
    expect(result).toBe("completed")
  })
)

Latch Operations

Latch操作

typescript
import { Effect } from "effect"

declare const latch: Effect.Effect.Success<ReturnType<typeof Effect.makeLatch>>

latch.await       // Wait until latch is open
latch.open        // Open the latch (allows waiters through)
latch.close       // Close the latch (blocks future waiters)
latch.release     // Open once, then close
latch.whenOpen    // Run effect only when latch is open
typescript
import { Effect } from "effect"

declare const latch: Effect.Effect.Success<ReturnType<typeof Effect.makeLatch>>

latch.await       // 等待latch打开
latch.open        // 打开latch(允许等待的Fiber通过)
latch.close       // 关闭latch(阻塞后续等待的Fiber)
latch.release     // 打开一次后关闭
latch.whenOpen    // 仅当latch打开时运行effect

Deferred - Signal Readiness Between Fibers

Deferred - Fiber间的就绪信号

Use
Deferred
when one fiber needs to signal another with a value:
typescript
import { it } from "@effect/vitest"
import { Effect, Deferred, Fiber } from "effect"

it.effect("deferred signaling", () =>
  Effect.gen(function* () {
    const signal = yield* Deferred.make<number>()

    const consumer = yield* Effect.gen(function* () {
      const value = yield* Deferred.await(signal)
      return value * 2
    }).pipe(Effect.fork)

    yield* Deferred.succeed(signal, 21)

    const result = yield* Fiber.join(consumer)
    expect(result).toBe(42)
  })
)
当一个Fiber需要向另一个Fiber发送值信号时,使用
Deferred
typescript
import { it } from "@effect/vitest"
import { Effect, Deferred, Fiber } from "effect"

it.effect("deferred signaling", () =>
  Effect.gen(function* () {
    const signal = yield* Deferred.make<number>()

    const consumer = yield* Effect.gen(function* () {
      const value = yield* Deferred.await(signal)
      return value * 2
    }).pipe(Effect.fork)

    yield* Deferred.succeed(signal, 21)

    const result = yield* Fiber.join(consumer)
    expect(result).toBe(42)
  })
)

fiber.unsafePoll() - Check Completion Without Blocking

fiber.unsafePoll() - 无阻塞检查完成状态

typescript
import { Effect, Exit, Fiber } from "effect"

declare const fiber: Fiber.RuntimeFiber<string>

fiber.unsafePoll()
// Returns null if running
// Returns Exit<A, E> if completed (success, failure, or interrupted)

// Check if still running
expect(fiber.unsafePoll()).toBeNull()

// Check if completed
expect(fiber.unsafePoll()).toBeDefined()

// Check specific completion
expect(fiber.unsafePoll()).toEqual(Exit.succeed("result"))
typescript
import { Effect, Exit, Fiber } from "effect"

declare const fiber: Fiber.RuntimeFiber<string>

fiber.unsafePoll()
// 若Fiber正在运行则返回null
// 若已完成(成功、失败或中断)则返回Exit<A, E>

// 检查是否仍在运行
expect(fiber.unsafePoll()).toBeNull()

// 检查是否已完成
expect(fiber.unsafePoll()).toBeDefined()

// 检查具体的完成状态
expect(fiber.unsafePoll()).toEqual(Exit.succeed("result"))

PubSub Event Testing

PubSub事件测试

Direct Event Verification

直接事件验证

Use
Effect.scoped
to manage PubSub subscription lifecycle:
typescript
import { it } from "@effect/vitest"
import { Effect, PubSub } from "effect"

it.effect("verify published events", () =>
  Effect.gen(function* () {
    const pubsub = yield* PubSub.unbounded<string>()

    yield* Effect.scoped(
      Effect.gen(function* () {
        const sub = yield* PubSub.subscribe(pubsub)

        yield* PubSub.publish(pubsub, "event-1")
        yield* PubSub.publish(pubsub, "event-2")

        const events = yield* PubSub.takeAll(sub)

        expect(events).toEqual(["event-1", "event-2"])
      })
    )
  })
)
使用
Effect.scoped
管理PubSub订阅的生命周期:
typescript
import { it } from "@effect/vitest"
import { Effect, PubSub } from "effect"

it.effect("verify published events", () =>
  Effect.gen(function* () {
    const pubsub = yield* PubSub.unbounded<string>()

    yield* Effect.scoped(
      Effect.gen(function* () {
        const sub = yield* PubSub.subscribe(pubsub)

        yield* PubSub.publish(pubsub, "event-1")
        yield* PubSub.publish(pubsub, "event-2")

        const events = yield* PubSub.takeAll(sub)

        expect(events).toEqual(["event-1", "event-2"])
      })
    )
  })
)

Testing Event Publishers

测试事件发布者

When testing a service that publishes events:
typescript
import { it } from "@effect/vitest"
import { Effect, PubSub, Context, Layer } from "effect"

interface UserEvent {
  readonly type: "created" | "deleted"
  readonly userId: string
}

class EventBus extends Context.Tag("EventBus")<
  EventBus,
  PubSub.PubSub<UserEvent>
>() {}

class UserService extends Context.Tag("UserService")<
  UserService,
  { readonly createUser: (id: string) => Effect.Effect<void> }
>() {}

declare const UserServiceLive: Layer.Layer<UserService, never, EventBus>

it.effect("should publish user created event", () =>
  Effect.gen(function* () {
    const pubsub = yield* PubSub.unbounded<UserEvent>()

    yield* Effect.scoped(
      Effect.gen(function* () {
        const sub = yield* PubSub.subscribe(pubsub)

        const service = yield* UserService
        yield* service.createUser("user-123")

        const events = yield* PubSub.takeAll(sub)

        expect(events).toHaveLength(1)
        expect(events[0]).toEqual({
          type: "created",
          userId: "user-123"
        })
      })
    )
  }).pipe(
    Effect.provide(UserServiceLive),
    Effect.provide(Layer.succeed(EventBus, pubsub))
  )
)
测试发布事件的服务时:
typescript
import { it } from "@effect/vitest"
import { Effect, PubSub, Context, Layer } from "effect"

interface UserEvent {
  readonly type: "created" | "deleted"
  readonly userId: string
}

class EventBus extends Context.Tag("EventBus")<
  EventBus,
  PubSub.PubSub<UserEvent>
>() {}

class UserService extends Context.Tag("UserService")<
  UserService,
  { readonly createUser: (id: string) => Effect.Effect<void> }
>() {}

declare const UserServiceLive: Layer.Layer<UserService, never, EventBus>

it.effect("should publish user created event", () =>
  Effect.gen(function* () {
    const pubsub = yield* PubSub.unbounded<UserEvent>()

    yield* Effect.scoped(
      Effect.gen(function* () {
        const sub = yield* PubSub.subscribe(pubsub)

        const service = yield* UserService
        yield* service.createUser("user-123")

        const events = yield* PubSub.takeAll(sub)

        expect(events).toHaveLength(1)
        expect(events[0]).toEqual({
          type: "created",
          userId: "user-123"
        })
      })
    )
  }).pipe(
    Effect.provide(UserServiceLive),
    Effect.provide(Layer.succeed(EventBus, pubsub))
  )
)

Concurrent Publisher/Subscriber Testing

并发发布者/订阅者测试

typescript
import { it } from "@effect/vitest"
import { Effect, PubSub, Fiber, Array as A } from "effect"

it.effect("concurrent publishers and subscribers", () =>
  Effect.gen(function* () {
    const values = A.range(0, 9)
    const latch = yield* Effect.makeLatch()
    const pubsub = yield* PubSub.bounded<number>(10)

    const subscriber = yield* PubSub.subscribe(pubsub).pipe(
      Effect.flatMap((sub) =>
        latch.await.pipe(
          Effect.andThen(
            Effect.forEach(values, () => PubSub.take(sub))
          )
        )
      ),
      Effect.scoped,
      Effect.forkScoped
    )

    yield* PubSub.publishAll(pubsub, values)
    yield* latch.open

    const result = yield* Fiber.join(subscriber)
    expect(result).toEqual(values)
  })
)
typescript
import { it } from "@effect/vitest"
import { Effect, PubSub, Fiber, Array as A } from "effect"

it.effect("concurrent publishers and subscribers", () =>
  Effect.gen(function* () {
    const values = A.range(0, 9)
    const latch = yield* Effect.makeLatch()
    const pubsub = yield* PubSub.bounded<number>(10)

    const subscriber = yield* PubSub.subscribe(pubsub).pipe(
      Effect.flatMap((sub) =>
        latch.await.pipe(
          Effect.andThen(
            Effect.forEach(values, () => PubSub.take(sub))
          )
        )
      ),
      Effect.scoped,
      Effect.forkScoped
    )

    yield* PubSub.publishAll(pubsub, values)
    yield* latch.open

    const result = yield* Fiber.join(subscriber)
    expect(result).toEqual(values)
  })
)

Forked Fiber PubSub Subscriptions

已分叉Fiber的PubSub订阅

When testing forked fibers that subscribe to a PubSub, proper yield ordering is critical to avoid losing events.
测试订阅PubSub的已分叉Fiber时,正确的让出执行顺序对于避免丢失事件至关重要。

Correct Order: yieldNow After Subscribe, Then After Each Publish

正确顺序:订阅后调用yieldNow,每次发布后调用yieldNow

typescript
import { it, expect } from "@effect/vitest"
import { Effect, PubSub, Ref, Array as A } from "effect"

it.effect("forked subscriber receives all events", () =>
  Effect.gen(function* () {
    const pubsub = yield* PubSub.unbounded<string>()
    const received = yield* Ref.make<string[]>([])

    yield* Effect.scoped(
      Effect.gen(function* () {
        const sub = yield* PubSub.subscribe(pubsub)

        yield* Effect.fork(
          Effect.forever(
            Effect.gen(function* () {
              const msg = yield* PubSub.take(sub)
              yield* Ref.update(received, A.append(msg))
            })
          )
        )

        yield* Effect.yieldNow()  // Let forked fiber start and become ready

        yield* PubSub.publish(pubsub, "event-1")
        yield* Effect.yieldNow()  // Let fiber process event-1

        yield* PubSub.publish(pubsub, "event-2")
        yield* Effect.yieldNow()  // Let fiber process event-2

        const events = yield* Ref.get(received)
        expect(events).toEqual(["event-1", "event-2"])
      })
    )
  })
)
typescript
import { it, expect } from "@effect/vitest"
import { Effect, PubSub, Ref, Array as A } from "effect"

it.effect("forked subscriber receives all events", () =>
  Effect.gen(function* () {
    const pubsub = yield* PubSub.unbounded<string>()
    const received = yield* Ref.make<string[]>([])

    yield* Effect.scoped(
      Effect.gen(function* () {
        const sub = yield* PubSub.subscribe(pubsub)

        yield* Effect.fork(
          Effect.forever(
            Effect.gen(function* () {
              const msg = yield* PubSub.take(sub)
              yield* Ref.update(received, A.append(msg))
            })
          )
        )

        yield* Effect.yieldNow()  // 让已分叉的Fiber启动并就绪

        yield* PubSub.publish(pubsub, "event-1")
        yield* Effect.yieldNow()  // 让Fiber处理event-1

        yield* PubSub.publish(pubsub, "event-2")
        yield* Effect.yieldNow()  // 让Fiber处理event-2

        const events = yield* Ref.get(received)
        expect(events).toEqual(["event-1", "event-2"])
      })
    )
  })
)

Why This Order Matters

为何此顺序至关重要

The fiber scheduling model requires explicit yields at specific points:
  1. yieldNow after subscribe/fork: The forked fiber needs a chance to execute its first instruction (the
    PubSub.take
    ) before any events are published. Without this yield, the fiber hasn't started yet.
  2. yieldNow after each publish: After publishing, the subscriber fiber needs a turn to process the event. Without yielding, you may publish multiple events before the fiber processes any.
Fiber调度模型需要在特定点显式让出执行权:
  1. 订阅/分叉后调用yieldNow:已分叉的Fiber需要机会执行其第一条指令(
    PubSub.take
    ),然后才能发布任何事件。如果没有这个让出操作,Fiber还未启动。
  2. 每次发布后调用yieldNow:发布后,订阅者Fiber需要机会处理事件。如果不让出执行权,你可能会在Fiber处理任何事件前发布多个事件。

Common Mistake: Events Lost

常见错误:事件丢失

typescript
import { Effect, PubSub, Ref } from "effect"

// BAD - Events are lost because fiber hasn't started
Effect.gen(function* () {
  const pubsub = yield* PubSub.unbounded<string>()
  const received = yield* Ref.make<string[]>([])

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* PubSub.subscribe(pubsub)

      yield* Effect.fork(/* subscriber logic */)

      // WRONG: Publishing immediately - fiber not ready yet!
      yield* PubSub.publish(pubsub, "event-1")
      yield* PubSub.publish(pubsub, "event-2")

      yield* Effect.yieldNow()  // Too late - events already missed

      const events = yield* Ref.get(received)
      // events may be [] or incomplete!
    })
  )
})
typescript
import { Effect, PubSub, Ref } from "effect"

// 错误示例 - 事件丢失,因为Fiber尚未就绪
Effect.gen(function* () {
  const pubsub = yield* PubSub.unbounded<string>()
  const received = yield* Ref.make<string[]>([])

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* PubSub.subscribe(pubsub)

      yield* Effect.fork(/* 订阅者逻辑 */)

      // 错误:立即发布事件 - Fiber尚未就绪!
      yield* PubSub.publish(pubsub, "event-1")
      yield* PubSub.publish(pubsub, "event-2")

      yield* Effect.yieldNow()  // 太晚了 - 事件已丢失

      const events = yield* Ref.get(received)
      // events可能为空或不完整!
    })
  )
})

Single yieldNow Is Sufficient

单次yieldNow已足够

Unlike
sleep(0)
patterns in other runtimes, Effect's
yieldNow
is deterministic within the fiber scheduler. A single
yieldNow
is sufficient at each synchronization point - no need for multiple yields or retry loops.
typescript
import { Effect, PubSub } from "effect"

// GOOD - Single yield at each point
yield* Effect.fork(subscriber)
yield* Effect.yieldNow()  // One yield is enough

yield* PubSub.publish(pubsub, "event")
yield* Effect.yieldNow()  // One yield is enough

// BAD - Unnecessary multiple yields
yield* Effect.fork(subscriber)
yield* Effect.yieldNow()
yield* Effect.yieldNow()  // Redundant
yield* Effect.yieldNow()  // Redundant
与其他运行时中的
sleep(0)
模式不同,Effect的
yieldNow
在Fiber调度器中是确定性的。每个同步点只需调用一次
yieldNow
即可 - 无需多次调用或重试循环。
typescript
import { Effect, PubSub } from "effect"

// 正确示例 - 每个点只需一次让出
yield* Effect.fork(subscriber)
yield* Effect.yieldNow()  // 一次让出足够

yield* PubSub.publish(pubsub, "event")
yield* Effect.yieldNow()  // 一次让出足够

// 错误示例 - 不必要的多次让出
yield* Effect.fork(subscriber)
yield* Effect.yieldNow()
yield* Effect.yieldNow()  // 冗余
yield* Effect.yieldNow()  // 冗余

Testing Observer Pattern with Session

使用Session测试观察者模式

This pattern applies to any forked subscriber, including observer patterns:
typescript
import { it, expect } from "@effect/vitest"
import { Effect } from "effect"

declare const Observer: {
  attach: (
    session: unknown,
    observer: unknown,
    args: unknown
  ) => Effect.Effect<void>
}

declare const Session: {
  publish: (session: unknown, event: unknown) => Effect.Effect<void>
}

declare const session: unknown
declare const observer: unknown
declare const args: unknown
declare const event1: unknown
declare const event2: unknown
declare const getResults: () => Effect.Effect<unknown[]>

it.effect("observer receives session events", () =>
  Effect.gen(function* () {
    yield* Observer.attach(session, observer, args)  // Forks subscriber
    yield* Effect.yieldNow()                         // Let fiber start

    yield* Session.publish(session, event1)
    yield* Effect.yieldNow()                         // Let event process

    yield* Session.publish(session, event2)
    yield* Effect.yieldNow()                         // Let event process

    const results = yield* getResults()
    expect(results).toHaveLength(2)
  })
)
此模式适用于任何已分叉的订阅者,包括观察者模式:
typescript
import { it, expect } from "@effect/vitest"
import { Effect } from "effect"

declare const Observer: {
  attach: (
    session: unknown,
    observer: unknown,
    args: unknown
  ) => Effect.Effect<void>
}

declare const Session: {
  publish: (session: unknown, event: unknown) => Effect.Effect<void>
}

declare const session: unknown
declare const observer: unknown
declare const args: unknown
declare const event1: unknown
declare const event2: unknown
declare const getResults: () => Effect.Effect<unknown[]>

it.effect("observer receives session events", () =>
  Effect.gen(function* () {
    yield* Observer.attach(session, observer, args)  // 分叉订阅者
    yield* Effect.yieldNow()                         // 让Fiber启动

    yield* Session.publish(session, event1)
    yield* Effect.yieldNow()                         // 让事件处理完成

    yield* Session.publish(session, event2)
    yield* Effect.yieldNow()                         // 让事件处理完成

    const results = yield* getResults()
    expect(results).toHaveLength(2)
  })
)

SubscriptionRef Testing

SubscriptionRef测试

Testing Stream Changes with Latches

使用Latch测试流变更

The latch pattern ensures the stream subscription is ready before mutations:
typescript
import { it } from "@effect/vitest"
import { Effect, Fiber, Number } from "effect"
import { Stream, SubscriptionRef } from "effect/stream"

it.effect("multiple subscribers can receive changes", () =>
  Effect.gen(function* () {
    const ref = yield* SubscriptionRef.make(0)
    const latch1 = yield* Effect.makeLatch()
    const latch2 = yield* Effect.makeLatch()

    const fiber1 = yield* SubscriptionRef.changes(ref).pipe(
      Stream.tap(() => latch1.open),
      Stream.take(3),
      Stream.runCollect,
      Effect.forkScoped
    )

    yield* latch1.await
    yield* SubscriptionRef.update(ref, Number.increment)

    const fiber2 = yield* SubscriptionRef.changes(ref).pipe(
      Stream.tap(() => latch2.open),
      Stream.take(2),
      Stream.runCollect,
      Effect.forkScoped
    )

    yield* latch2.await
    yield* SubscriptionRef.update(ref, Number.increment)

    const result1 = yield* Fiber.join(fiber1)
    const result2 = yield* Fiber.join(fiber2)

    expect(result1).toEqual([0, 1, 2])
    expect(result2).toEqual([1, 2])
  })
)
Latch模式可确保在变更前流订阅已就绪:
typescript
import { it } from "@effect/vitest"
import { Effect, Fiber, Number } from "effect"
import { Stream, SubscriptionRef } from "effect/stream"

it.effect("multiple subscribers can receive changes", () =>
  Effect.gen(function* () {
    const ref = yield* SubscriptionRef.make(0)
    const latch1 = yield* Effect.makeLatch()
    const latch2 = yield* Effect.makeLatch()

    const fiber1 = yield* SubscriptionRef.changes(ref).pipe(
      Stream.tap(() => latch1.open),
      Stream.take(3),
      Stream.runCollect,
      Effect.forkScoped
    )

    yield* latch1.await
    yield* SubscriptionRef.update(ref, Number.increment)

    const fiber2 = yield* SubscriptionRef.changes(ref).pipe(
      Stream.tap(() => latch2.open),
      Stream.take(2),
      Stream.runCollect,
      Effect.forkScoped
    )

    yield* latch2.await
    yield* SubscriptionRef.update(ref, Number.increment)

    const result1 = yield* Fiber.join(fiber1)
    const result2 = yield* Fiber.join(fiber2)

    expect(result1).toEqual([0, 1, 2])
    expect(result2).toEqual([1, 2])
  })
)

Testing Subscription Interruption

测试订阅中断

typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Number, Cause } from "effect"
import { Pull, Stream, SubscriptionRef } from "effect/stream"

it.effect("subscriptions are interruptible", () =>
  Effect.gen(function* () {
    const ref = yield* SubscriptionRef.make(0)
    const latch = yield* Effect.makeLatch()

    const fiber = yield* SubscriptionRef.changes(ref).pipe(
      Stream.tap(() => latch.open),
      Stream.take(10),
      Stream.runCollect,
      Effect.forkScoped
    )

    yield* latch.await
    yield* SubscriptionRef.update(ref, Number.increment)
    yield* Fiber.interrupt(fiber)

    const result = yield* Fiber.await(fiber)

    expect(
      Exit.isFailure(result) && Pull.isHaltCause(result.cause)
    ).toBe(true)
  })
)
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Number, Cause } from "effect"
import { Pull, Stream, SubscriptionRef } from "effect/stream"

it.effect("subscriptions are interruptible", () =>
  Effect.gen(function* () {
    const ref = yield* SubscriptionRef.make(0)
    const latch = yield* Effect.makeLatch()

    const fiber = yield* SubscriptionRef.changes(ref).pipe(
      Stream.tap(() => latch.open),
      Stream.take(10),
      Stream.runCollect,
      Effect.forkScoped
    )

    yield* latch.await
    yield* SubscriptionRef.update(ref, Number.increment)
    yield* Fiber.interrupt(fiber)

    const result = yield* Fiber.await(fiber)

    expect(
      Exit.isFailure(result) && Pull.isHaltCause(result.cause)
    ).toBe(true)
  })
)

Stream Testing

Stream测试

Collecting Stream Results

收集Stream结果

typescript
import { it } from "@effect/vitest"
import { Effect } from "effect"
import { Stream } from "effect/stream"

it.effect("should collect stream elements", () =>
  Effect.gen(function* () {
    const result = yield* Stream.make(1, 2, 3, 4, 5).pipe(
      Stream.filter((n) => n % 2 === 0),
      Stream.runCollect
    )

    expect(result).toEqual([2, 4])
  })
)
typescript
import { it } from "@effect/vitest"
import { Effect } from "effect"
import { Stream } from "effect/stream"

it.effect("should collect stream elements", () =>
  Effect.gen(function* () {
    const result = yield* Stream.make(1, 2, 3, 4, 5).pipe(
      Stream.filter((n) => n % 2 === 0),
      Stream.runCollect
    )

    expect(result).toEqual([2, 4])
  })
)

Testing Stream Side Effects

测试Stream副作用

typescript
import { it } from "@effect/vitest"
import { Effect, Ref } from "effect"
import { Stream } from "effect/stream"

it.effect("should track side effects", () =>
  Effect.gen(function* () {
    const log = yield* Ref.make<string[]>([])

    yield* Stream.make("a", "b", "c").pipe(
      Stream.tap((item) => Ref.update(log, (items) => [...items, item])),
      Stream.runDrain
    )

    const logged = yield* Ref.get(log)
    expect(logged).toEqual(["a", "b", "c"])
  })
)
typescript
import { it } from "@effect/vitest"
import { Effect, Ref } from "effect"
import { Stream } from "effect/stream"

it.effect("should track side effects", () =>
  Effect.gen(function* () {
    const log = yield* Ref.make<string[]>([])

    yield* Stream.make("a", "b", "c").pipe(
      Stream.tap((item) => Ref.update(log, (items) => [...items, item])),
      Stream.runDrain
    )

    const logged = yield* Ref.get(log)
    expect(logged).toEqual(["a", "b", "c"])
  })
)

Testing Stream Errors

测试Stream错误

typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Data } from "effect"
import { Stream } from "effect/stream"

class StreamError extends Data.TaggedError("StreamError")<{
  readonly message: string
}> {}

it.effect("should handle stream errors", () =>
  Effect.gen(function* () {
    const result = yield* Stream.make(1, 2, 3).pipe(
      Stream.mapEffect((n) =>
        n === 2
          ? Effect.fail(new StreamError({ message: "boom" }))
          : Effect.succeed(n)
      ),
      Stream.runCollect,
      Effect.exit
    )

    expect(Exit.isFailure(result)).toBe(true)
  })
)
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Data } from "effect"
import { Stream } from "effect/stream"

class StreamError extends Data.TaggedError("StreamError")<{
  readonly message: string
}> {}

it.effect("should handle stream errors", () =>
  Effect.gen(function* () {
    const result = yield* Stream.make(1, 2, 3).pipe(
      Stream.mapEffect((n) =>
        n === 2
          ? Effect.fail(new StreamError({ message: "boom" }))
          : Effect.succeed(n)
      ),
      Stream.runCollect,
      Effect.exit
    )

    expect(Exit.isFailure(result)).toBe(true)
  })
)

Testing Stream Finalization

测试Stream终结操作

typescript
import { it } from "@effect/vitest"
import { Effect, Ref } from "effect"
import { Stream } from "effect/stream"

it.effect("should run finalizers", () =>
  Effect.gen(function* () {
    const finalized = yield* Ref.make(false)

    yield* Stream.make(1, 2, 3).pipe(
      Stream.ensuring(Ref.set(finalized, true)),
      Stream.take(1),
      Stream.runDrain
    )

    expect(yield* Ref.get(finalized)).toBe(true)
  })
)
typescript
import { it } from "@effect/vitest"
import { Effect, Ref } from "effect"
import { Stream } from "effect/stream"

it.effect("should run finalizers", () =>
  Effect.gen(function* () {
    const finalized = yield* Ref.make(false)

    yield* Stream.make(1, 2, 3).pipe(
      Stream.ensuring(Ref.set(finalized, true)),
      Stream.take(1),
      Stream.runDrain
    )

    expect(yield* Ref.get(finalized)).toBe(true)
  })
)

Interruption Testing

中断测试

Testing Fiber Interruption

测试Fiber中断

typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Cause } from "effect"

it.effect("should handle interruption", () =>
  Effect.gen(function* () {
    const fiber = yield* Effect.never.pipe(Effect.fork)

    yield* Fiber.interrupt(fiber)

    const result = yield* Fiber.await(fiber)

    expect(Exit.isInterrupted(result)).toBe(true)
  })
)
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Cause } from "effect"

it.effect("should handle interruption", () =>
  Effect.gen(function* () {
    const fiber = yield* Effect.never.pipe(Effect.fork)

    yield* Fiber.interrupt(fiber)

    const result = yield* Fiber.await(fiber)

    expect(Exit.isInterrupted(result)).toBe(true)
  })
)

Testing Interrupted-Only Cause

测试仅中断原因

typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Cause } from "effect"

it.effect("should have interrupted-only cause", () =>
  Effect.gen(function* () {
    const fiber = yield* Effect.never.pipe(Effect.fork)

    yield* Fiber.interrupt(fiber)

    const result = yield* Fiber.await(fiber)

    expect(
      Exit.isFailure(result) && Cause.isInterruptedOnly(result.cause)
    ).toBe(true)
  })
)
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Cause } from "effect"

it.effect("should have interrupted-only cause", () =>
  Effect.gen(function* () {
    const fiber = yield* Effect.never.pipe(Effect.fork)

    yield* Fiber.interrupt(fiber)

    const result = yield* Fiber.await(fiber)

    expect(
      Exit.isFailure(result) && Cause.isInterruptedOnly(result.cause)
    ).toBe(true)
  })
)

Time-Dependent Concurrency Testing

时间相关并发测试

Use
TestClock
only when testing time-dependent behavior like delays, timeouts, or schedules.
typescript
import { it } from "@effect/vitest"
import { Effect, Fiber, TestClock, Duration } from "effect"

it.effect("should handle delayed concurrent operations", () =>
  Effect.gen(function* () {
    const fiber = yield* Effect.gen(function* () {
      yield* Effect.sleep(Duration.seconds(5))
      return "done"
    }).pipe(Effect.fork)

    yield* TestClock.adjust(Duration.seconds(5))

    const result = yield* Fiber.join(fiber)
    expect(result).toBe("done")
  })
)
仅在测试时间相关行为(如延迟、超时或调度)时使用
TestClock
typescript
import { it } from "@effect/vitest"
import { Effect, Fiber, TestClock, Duration } from "effect"

it.effect("should handle delayed concurrent operations", () =>
  Effect.gen(function* () {
    const fiber = yield* Effect.gen(function* () {
      yield* Effect.sleep(Duration.seconds(5))
      return "done"
    }).pipe(Effect.fork)

    yield* TestClock.adjust(Duration.seconds(5))

    const result = yield* Fiber.join(fiber)
    expect(result).toBe("done")
  })
)

Anti-Patterns

反模式

DON'T use TestClock for non-time-dependent code

不要对非时间相关代码使用TestClock

typescript
import { Effect, TestClock, Duration } from "effect"

// BAD - Using TestClock when not needed
Effect.gen(function* () {
  const fiber = yield* someEffect.pipe(Effect.fork)
  yield* TestClock.adjust(Duration.millis(100))
  yield* Fiber.join(fiber)
})

// GOOD - Use yieldNow for simple yielding
Effect.gen(function* () {
  const fiber = yield* someEffect.pipe(Effect.fork)
  yield* Effect.yieldNow()
  yield* Fiber.join(fiber)
})
typescript
import { Effect, TestClock, Duration } from "effect"

// 错误示例 - 不必要时使用TestClock
Effect.gen(function* () {
  const fiber = yield* someEffect.pipe(Effect.fork)
  yield* TestClock.adjust(Duration.millis(100))
  yield* Fiber.join(fiber)
})

// 正确示例 - 对简单让出使用yieldNow
Effect.gen(function* () {
  const fiber = yield* someEffect.pipe(Effect.fork)
  yield* Effect.yieldNow()
  yield* Fiber.join(fiber)
})

DON'T poll in a loop without yieldNow

不要在无yieldNow的循环中轮询

typescript
import { Effect, Fiber } from "effect"

declare const fiber: Fiber.RuntimeFiber<void>

// BAD - Busy loop
while (fiber.unsafePoll() === null) {
  // Spins forever!
}

// GOOD - Yield between polls or use Fiber.await
Effect.gen(function* () {
  while (fiber.unsafePoll() === null) {
    yield* Effect.yieldNow()
  }
})

// BETTER - Just await the fiber
Effect.gen(function* () {
  yield* Fiber.await(fiber)
})
typescript
import { Effect, Fiber } from "effect"

declare const fiber: Fiber.RuntimeFiber<void>

// 错误示例 - 忙循环
while (fiber.unsafePoll() === null) {
  // 永远自旋!
}

// 正确示例 - 在轮询间让出或使用Fiber.await
Effect.gen(function* () {
  while (fiber.unsafePoll() === null) {
    yield* Effect.yieldNow()
  }
})

// 更优示例 - 直接等待Fiber
Effect.gen(function* () {
  yield* Fiber.await(fiber)
})

DON'T forget Effect.scoped for PubSub subscriptions

不要忘记为PubSub订阅使用Effect.scoped

typescript
import { Effect, PubSub } from "effect"

declare const pubsub: PubSub.PubSub<string>

// BAD - Subscription leaks
Effect.gen(function* () {
  const sub = yield* PubSub.subscribe(pubsub)
  // Sub is never cleaned up!
})

// GOOD - Scoped subscription
Effect.gen(function* () {
  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* PubSub.subscribe(pubsub)
      const events = yield* PubSub.takeAll(sub)
      // Sub cleaned up when scope closes
    })
  )
})
typescript
import { Effect, PubSub } from "effect"

declare const pubsub: PubSub.PubSub<string>

// 错误示例 - 订阅泄漏
Effect.gen(function* () {
  const sub = yield* PubSub.subscribe(pubsub)
  // 订阅永远不会被清理!
})

// 正确示例 - 作用域内的订阅
Effect.gen(function* () {
  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* PubSub.subscribe(pubsub)
      const events = yield* PubSub.takeAll(sub)
      // 作用域关闭时订阅会被清理
    })
  )
})

DON'T start subscriptions after mutations

不要在变更后才启动订阅

typescript
import { Effect } from "effect"
import { Stream, SubscriptionRef } from "effect/stream"

declare const ref: SubscriptionRef.SubscriptionRef<number>

// BAD - May miss events
Effect.gen(function* () {
  yield* SubscriptionRef.update(ref, (n) => n + 1)
  const fiber = yield* SubscriptionRef.changes(ref).pipe(
    Stream.take(1),
    Stream.runCollect,
    Effect.fork
  )
  // Subscription started after mutation - may miss it!
})

// GOOD - Use latch to ensure subscription is ready
Effect.gen(function* () {
  const latch = yield* Effect.makeLatch()

  const fiber = yield* SubscriptionRef.changes(ref).pipe(
    Stream.tap(() => latch.open),
    Stream.take(2),
    Stream.runCollect,
    Effect.forkScoped
  )

  yield* latch.await
  yield* SubscriptionRef.update(ref, (n) => n + 1)

  const result = yield* Fiber.join(fiber)
})
typescript
import { Effect } from "effect"
import { Stream, SubscriptionRef } from "effect/stream"

declare const ref: SubscriptionRef.SubscriptionRef<number>

// 错误示例 - 可能丢失事件
Effect.gen(function* () {
  yield* SubscriptionRef.update(ref, (n) => n + 1)
  const fiber = yield* SubscriptionRef.changes(ref).pipe(
    Stream.take(1),
    Stream.runCollect,
    Effect.fork
  )
  // 订阅在变更后启动 - 可能会错过该事件!
})

// 正确示例 - 使用Latch确保订阅就绪
Effect.gen(function* () {
  const latch = yield* Effect.makeLatch()

  const fiber = yield* SubscriptionRef.changes(ref).pipe(
    Stream.tap(() => latch.open),
    Stream.take(2),
    Stream.runCollect,
    Effect.forkScoped
  )

  yield* latch.await
  yield* SubscriptionRef.update(ref, (n) => n + 1)

  const result = yield* Fiber.join(fiber)
})

Quality Checklist

质量检查清单

  • Using correct coordination primitive for the use case
  • Effect.scoped
    wraps PubSub subscriptions
  • Latches ensure stream subscriptions are ready before mutations
  • Effect.yieldNow
    after fork to let subscriber fiber start
  • Effect.yieldNow
    after each publish to let fiber process event
  • Effect.yieldNow
    used instead of TestClock for non-time-dependent code
  • Fiber interruption tested with
    Exit.isInterrupted
    or
    Cause.isInterruptedOnly
  • Stream finalizers verified with
    Stream.ensuring
  • No busy polling without yields
  • Test is deterministic (no race conditions)
  • 根据使用场景选择了正确的协调原语
  • Effect.scoped
    包裹了PubSub订阅
  • 使用Latch确保流订阅在变更前已就绪
  • 分叉后调用
    Effect.yieldNow
    让订阅者Fiber启动
  • 每次发布后调用
    Effect.yieldNow
    让Fiber处理事件
  • 对非时间相关代码使用
    Effect.yieldNow
    而非TestClock
  • 使用
    Exit.isInterrupted
    Cause.isInterruptedOnly
    测试Fiber中断
  • 使用
    Stream.ensuring
    验证流终结操作
  • 没有无让出操作的忙轮询
  • 测试是确定性的(无竞态条件)