effect-concurrency-testing
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseEffect Concurrency Testing Skill
Effect并发测试技能
This skill provides patterns for testing Effect's concurrency primitives: fibers, latches, deferreds, PubSub, SubscriptionRef, and streams.
本技能提供了测试Effect并发原语的模式:fibers、latches、deferreds、PubSub、SubscriptionRef和streams。
Core Principles
核心原则
CRITICAL: Choose the correct coordination primitive based on what you need to synchronize.
| Need | Use |
|---|---|
| Simple fiber yield | |
| Forked PubSub subscriber ready | |
| Wait for subscriber ready | |
| Wait for stream element | |
| Time-dependent behavior | |
| Verify events published | |
| Check fiber status | |
关键提示:根据需要同步的内容选择正确的协调原语。
| 需求 | 使用工具 |
|---|---|
| 简单Fiber让出执行权 | |
| 已分叉的PubSub订阅者就绪 | 分叉后调用 |
| 等待订阅者就绪 | |
| 等待流元素 | |
| 时间相关行为 | |
| 验证已发布的事件 | |
| 检查Fiber状态 | |
Fiber Coordination Patterns
Fiber协调模式
Effect.yieldNow - Simple Fiber Scheduling
Effect.yieldNow - 简单Fiber调度
Use when you need to allow other fibers to execute. This is preferred over for non-time-dependent code.
Effect.yieldNowTestClock.adjusttypescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber } from "effect"
it.effect("fiber polling with yieldNow", () =>
Effect.gen(function* () {
const latch = yield* Effect.makeLatch()
const fiber = yield* latch.await.pipe(Effect.fork)
yield* Effect.yieldNow()
expect(fiber.unsafePoll()).toBeNull()
yield* latch.open
expect(yield* fiber.await).toEqual(Exit.void)
})
)当你需要允许其他Fiber执行时,使用。对于非时间相关的代码,这比更合适。
Effect.yieldNowTestClock.adjusttypescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber } from "effect"
it.effect("fiber polling with yieldNow", () =>
Effect.gen(function* () {
const latch = yield* Effect.makeLatch()
const fiber = yield* latch.await.pipe(Effect.fork)
yield* Effect.yieldNow()
expect(fiber.unsafePoll()).toBeNull()
yield* latch.open
expect(yield* fiber.await).toEqual(Exit.void)
})
)Latch - Explicit Coordination
Latch - 显式协调
Effect.makeLatch()typescript
import { it } from "@effect/vitest"
import { Effect, Fiber } from "effect"
it.effect("latch coordination", () =>
Effect.gen(function* () {
const latch = yield* Effect.makeLatch()
const fiber = yield* Effect.gen(function* () {
yield* latch.await
return "completed"
}).pipe(Effect.fork)
yield* Effect.yieldNow()
expect(fiber.unsafePoll()).toBeNull()
yield* latch.open
const result = yield* Fiber.join(fiber)
expect(result).toBe("completed")
})
)Effect.makeLatch()typescript
import { it } from "@effect/vitest"
import { Effect, Fiber } from "effect"
it.effect("latch coordination", () =>
Effect.gen(function* () {
const latch = yield* Effect.makeLatch()
const fiber = yield* Effect.gen(function* () {
yield* latch.await
return "completed"
}).pipe(Effect.fork)
yield* Effect.yieldNow()
expect(fiber.unsafePoll()).toBeNull()
yield* latch.open
const result = yield* Fiber.join(fiber)
expect(result).toBe("completed")
})
)Latch Operations
Latch操作
typescript
import { Effect } from "effect"
declare const latch: Effect.Effect.Success<ReturnType<typeof Effect.makeLatch>>
latch.await // Wait until latch is open
latch.open // Open the latch (allows waiters through)
latch.close // Close the latch (blocks future waiters)
latch.release // Open once, then close
latch.whenOpen // Run effect only when latch is opentypescript
import { Effect } from "effect"
declare const latch: Effect.Effect.Success<ReturnType<typeof Effect.makeLatch>>
latch.await // 等待latch打开
latch.open // 打开latch(允许等待的Fiber通过)
latch.close // 关闭latch(阻塞后续等待的Fiber)
latch.release // 打开一次后关闭
latch.whenOpen // 仅当latch打开时运行effectDeferred - Signal Readiness Between Fibers
Deferred - Fiber间的就绪信号
Use when one fiber needs to signal another with a value:
Deferredtypescript
import { it } from "@effect/vitest"
import { Effect, Deferred, Fiber } from "effect"
it.effect("deferred signaling", () =>
Effect.gen(function* () {
const signal = yield* Deferred.make<number>()
const consumer = yield* Effect.gen(function* () {
const value = yield* Deferred.await(signal)
return value * 2
}).pipe(Effect.fork)
yield* Deferred.succeed(signal, 21)
const result = yield* Fiber.join(consumer)
expect(result).toBe(42)
})
)当一个Fiber需要向另一个Fiber发送值信号时,使用:
Deferredtypescript
import { it } from "@effect/vitest"
import { Effect, Deferred, Fiber } from "effect"
it.effect("deferred signaling", () =>
Effect.gen(function* () {
const signal = yield* Deferred.make<number>()
const consumer = yield* Effect.gen(function* () {
const value = yield* Deferred.await(signal)
return value * 2
}).pipe(Effect.fork)
yield* Deferred.succeed(signal, 21)
const result = yield* Fiber.join(consumer)
expect(result).toBe(42)
})
)fiber.unsafePoll() - Check Completion Without Blocking
fiber.unsafePoll() - 无阻塞检查完成状态
typescript
import { Effect, Exit, Fiber } from "effect"
declare const fiber: Fiber.RuntimeFiber<string>
fiber.unsafePoll()
// Returns null if running
// Returns Exit<A, E> if completed (success, failure, or interrupted)
// Check if still running
expect(fiber.unsafePoll()).toBeNull()
// Check if completed
expect(fiber.unsafePoll()).toBeDefined()
// Check specific completion
expect(fiber.unsafePoll()).toEqual(Exit.succeed("result"))typescript
import { Effect, Exit, Fiber } from "effect"
declare const fiber: Fiber.RuntimeFiber<string>
fiber.unsafePoll()
// 若Fiber正在运行则返回null
// 若已完成(成功、失败或中断)则返回Exit<A, E>
// 检查是否仍在运行
expect(fiber.unsafePoll()).toBeNull()
// 检查是否已完成
expect(fiber.unsafePoll()).toBeDefined()
// 检查具体的完成状态
expect(fiber.unsafePoll()).toEqual(Exit.succeed("result"))PubSub Event Testing
PubSub事件测试
Direct Event Verification
直接事件验证
Use to manage PubSub subscription lifecycle:
Effect.scopedtypescript
import { it } from "@effect/vitest"
import { Effect, PubSub } from "effect"
it.effect("verify published events", () =>
Effect.gen(function* () {
const pubsub = yield* PubSub.unbounded<string>()
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
yield* PubSub.publish(pubsub, "event-1")
yield* PubSub.publish(pubsub, "event-2")
const events = yield* PubSub.takeAll(sub)
expect(events).toEqual(["event-1", "event-2"])
})
)
})
)使用管理PubSub订阅的生命周期:
Effect.scopedtypescript
import { it } from "@effect/vitest"
import { Effect, PubSub } from "effect"
it.effect("verify published events", () =>
Effect.gen(function* () {
const pubsub = yield* PubSub.unbounded<string>()
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
yield* PubSub.publish(pubsub, "event-1")
yield* PubSub.publish(pubsub, "event-2")
const events = yield* PubSub.takeAll(sub)
expect(events).toEqual(["event-1", "event-2"])
})
)
})
)Testing Event Publishers
测试事件发布者
When testing a service that publishes events:
typescript
import { it } from "@effect/vitest"
import { Effect, PubSub, Context, Layer } from "effect"
interface UserEvent {
readonly type: "created" | "deleted"
readonly userId: string
}
class EventBus extends Context.Tag("EventBus")<
EventBus,
PubSub.PubSub<UserEvent>
>() {}
class UserService extends Context.Tag("UserService")<
UserService,
{ readonly createUser: (id: string) => Effect.Effect<void> }
>() {}
declare const UserServiceLive: Layer.Layer<UserService, never, EventBus>
it.effect("should publish user created event", () =>
Effect.gen(function* () {
const pubsub = yield* PubSub.unbounded<UserEvent>()
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
const service = yield* UserService
yield* service.createUser("user-123")
const events = yield* PubSub.takeAll(sub)
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "created",
userId: "user-123"
})
})
)
}).pipe(
Effect.provide(UserServiceLive),
Effect.provide(Layer.succeed(EventBus, pubsub))
)
)测试发布事件的服务时:
typescript
import { it } from "@effect/vitest"
import { Effect, PubSub, Context, Layer } from "effect"
interface UserEvent {
readonly type: "created" | "deleted"
readonly userId: string
}
class EventBus extends Context.Tag("EventBus")<
EventBus,
PubSub.PubSub<UserEvent>
>() {}
class UserService extends Context.Tag("UserService")<
UserService,
{ readonly createUser: (id: string) => Effect.Effect<void> }
>() {}
declare const UserServiceLive: Layer.Layer<UserService, never, EventBus>
it.effect("should publish user created event", () =>
Effect.gen(function* () {
const pubsub = yield* PubSub.unbounded<UserEvent>()
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
const service = yield* UserService
yield* service.createUser("user-123")
const events = yield* PubSub.takeAll(sub)
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "created",
userId: "user-123"
})
})
)
}).pipe(
Effect.provide(UserServiceLive),
Effect.provide(Layer.succeed(EventBus, pubsub))
)
)Concurrent Publisher/Subscriber Testing
并发发布者/订阅者测试
typescript
import { it } from "@effect/vitest"
import { Effect, PubSub, Fiber, Array as A } from "effect"
it.effect("concurrent publishers and subscribers", () =>
Effect.gen(function* () {
const values = A.range(0, 9)
const latch = yield* Effect.makeLatch()
const pubsub = yield* PubSub.bounded<number>(10)
const subscriber = yield* PubSub.subscribe(pubsub).pipe(
Effect.flatMap((sub) =>
latch.await.pipe(
Effect.andThen(
Effect.forEach(values, () => PubSub.take(sub))
)
)
),
Effect.scoped,
Effect.forkScoped
)
yield* PubSub.publishAll(pubsub, values)
yield* latch.open
const result = yield* Fiber.join(subscriber)
expect(result).toEqual(values)
})
)typescript
import { it } from "@effect/vitest"
import { Effect, PubSub, Fiber, Array as A } from "effect"
it.effect("concurrent publishers and subscribers", () =>
Effect.gen(function* () {
const values = A.range(0, 9)
const latch = yield* Effect.makeLatch()
const pubsub = yield* PubSub.bounded<number>(10)
const subscriber = yield* PubSub.subscribe(pubsub).pipe(
Effect.flatMap((sub) =>
latch.await.pipe(
Effect.andThen(
Effect.forEach(values, () => PubSub.take(sub))
)
)
),
Effect.scoped,
Effect.forkScoped
)
yield* PubSub.publishAll(pubsub, values)
yield* latch.open
const result = yield* Fiber.join(subscriber)
expect(result).toEqual(values)
})
)Forked Fiber PubSub Subscriptions
已分叉Fiber的PubSub订阅
When testing forked fibers that subscribe to a PubSub, proper yield ordering is critical to avoid losing events.
测试订阅PubSub的已分叉Fiber时,正确的让出执行顺序对于避免丢失事件至关重要。
Correct Order: yieldNow After Subscribe, Then After Each Publish
正确顺序:订阅后调用yieldNow,每次发布后调用yieldNow
typescript
import { it, expect } from "@effect/vitest"
import { Effect, PubSub, Ref, Array as A } from "effect"
it.effect("forked subscriber receives all events", () =>
Effect.gen(function* () {
const pubsub = yield* PubSub.unbounded<string>()
const received = yield* Ref.make<string[]>([])
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
yield* Effect.fork(
Effect.forever(
Effect.gen(function* () {
const msg = yield* PubSub.take(sub)
yield* Ref.update(received, A.append(msg))
})
)
)
yield* Effect.yieldNow() // Let forked fiber start and become ready
yield* PubSub.publish(pubsub, "event-1")
yield* Effect.yieldNow() // Let fiber process event-1
yield* PubSub.publish(pubsub, "event-2")
yield* Effect.yieldNow() // Let fiber process event-2
const events = yield* Ref.get(received)
expect(events).toEqual(["event-1", "event-2"])
})
)
})
)typescript
import { it, expect } from "@effect/vitest"
import { Effect, PubSub, Ref, Array as A } from "effect"
it.effect("forked subscriber receives all events", () =>
Effect.gen(function* () {
const pubsub = yield* PubSub.unbounded<string>()
const received = yield* Ref.make<string[]>([])
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
yield* Effect.fork(
Effect.forever(
Effect.gen(function* () {
const msg = yield* PubSub.take(sub)
yield* Ref.update(received, A.append(msg))
})
)
)
yield* Effect.yieldNow() // 让已分叉的Fiber启动并就绪
yield* PubSub.publish(pubsub, "event-1")
yield* Effect.yieldNow() // 让Fiber处理event-1
yield* PubSub.publish(pubsub, "event-2")
yield* Effect.yieldNow() // 让Fiber处理event-2
const events = yield* Ref.get(received)
expect(events).toEqual(["event-1", "event-2"])
})
)
})
)Why This Order Matters
为何此顺序至关重要
The fiber scheduling model requires explicit yields at specific points:
-
yieldNow after subscribe/fork: The forked fiber needs a chance to execute its first instruction (the) before any events are published. Without this yield, the fiber hasn't started yet.
PubSub.take -
yieldNow after each publish: After publishing, the subscriber fiber needs a turn to process the event. Without yielding, you may publish multiple events before the fiber processes any.
Fiber调度模型需要在特定点显式让出执行权:
-
订阅/分叉后调用yieldNow:已分叉的Fiber需要机会执行其第一条指令(),然后才能发布任何事件。如果没有这个让出操作,Fiber还未启动。
PubSub.take -
每次发布后调用yieldNow:发布后,订阅者Fiber需要机会处理事件。如果不让出执行权,你可能会在Fiber处理任何事件前发布多个事件。
Common Mistake: Events Lost
常见错误:事件丢失
typescript
import { Effect, PubSub, Ref } from "effect"
// BAD - Events are lost because fiber hasn't started
Effect.gen(function* () {
const pubsub = yield* PubSub.unbounded<string>()
const received = yield* Ref.make<string[]>([])
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
yield* Effect.fork(/* subscriber logic */)
// WRONG: Publishing immediately - fiber not ready yet!
yield* PubSub.publish(pubsub, "event-1")
yield* PubSub.publish(pubsub, "event-2")
yield* Effect.yieldNow() // Too late - events already missed
const events = yield* Ref.get(received)
// events may be [] or incomplete!
})
)
})typescript
import { Effect, PubSub, Ref } from "effect"
// 错误示例 - 事件丢失,因为Fiber尚未就绪
Effect.gen(function* () {
const pubsub = yield* PubSub.unbounded<string>()
const received = yield* Ref.make<string[]>([])
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
yield* Effect.fork(/* 订阅者逻辑 */)
// 错误:立即发布事件 - Fiber尚未就绪!
yield* PubSub.publish(pubsub, "event-1")
yield* PubSub.publish(pubsub, "event-2")
yield* Effect.yieldNow() // 太晚了 - 事件已丢失
const events = yield* Ref.get(received)
// events可能为空或不完整!
})
)
})Single yieldNow Is Sufficient
单次yieldNow已足够
Unlike patterns in other runtimes, Effect's is deterministic within the fiber scheduler. A single is sufficient at each synchronization point - no need for multiple yields or retry loops.
sleep(0)yieldNowyieldNowtypescript
import { Effect, PubSub } from "effect"
// GOOD - Single yield at each point
yield* Effect.fork(subscriber)
yield* Effect.yieldNow() // One yield is enough
yield* PubSub.publish(pubsub, "event")
yield* Effect.yieldNow() // One yield is enough
// BAD - Unnecessary multiple yields
yield* Effect.fork(subscriber)
yield* Effect.yieldNow()
yield* Effect.yieldNow() // Redundant
yield* Effect.yieldNow() // Redundant与其他运行时中的模式不同,Effect的在Fiber调度器中是确定性的。每个同步点只需调用一次即可 - 无需多次调用或重试循环。
sleep(0)yieldNowyieldNowtypescript
import { Effect, PubSub } from "effect"
// 正确示例 - 每个点只需一次让出
yield* Effect.fork(subscriber)
yield* Effect.yieldNow() // 一次让出足够
yield* PubSub.publish(pubsub, "event")
yield* Effect.yieldNow() // 一次让出足够
// 错误示例 - 不必要的多次让出
yield* Effect.fork(subscriber)
yield* Effect.yieldNow()
yield* Effect.yieldNow() // 冗余
yield* Effect.yieldNow() // 冗余Testing Observer Pattern with Session
使用Session测试观察者模式
This pattern applies to any forked subscriber, including observer patterns:
typescript
import { it, expect } from "@effect/vitest"
import { Effect } from "effect"
declare const Observer: {
attach: (
session: unknown,
observer: unknown,
args: unknown
) => Effect.Effect<void>
}
declare const Session: {
publish: (session: unknown, event: unknown) => Effect.Effect<void>
}
declare const session: unknown
declare const observer: unknown
declare const args: unknown
declare const event1: unknown
declare const event2: unknown
declare const getResults: () => Effect.Effect<unknown[]>
it.effect("observer receives session events", () =>
Effect.gen(function* () {
yield* Observer.attach(session, observer, args) // Forks subscriber
yield* Effect.yieldNow() // Let fiber start
yield* Session.publish(session, event1)
yield* Effect.yieldNow() // Let event process
yield* Session.publish(session, event2)
yield* Effect.yieldNow() // Let event process
const results = yield* getResults()
expect(results).toHaveLength(2)
})
)此模式适用于任何已分叉的订阅者,包括观察者模式:
typescript
import { it, expect } from "@effect/vitest"
import { Effect } from "effect"
declare const Observer: {
attach: (
session: unknown,
observer: unknown,
args: unknown
) => Effect.Effect<void>
}
declare const Session: {
publish: (session: unknown, event: unknown) => Effect.Effect<void>
}
declare const session: unknown
declare const observer: unknown
declare const args: unknown
declare const event1: unknown
declare const event2: unknown
declare const getResults: () => Effect.Effect<unknown[]>
it.effect("observer receives session events", () =>
Effect.gen(function* () {
yield* Observer.attach(session, observer, args) // 分叉订阅者
yield* Effect.yieldNow() // 让Fiber启动
yield* Session.publish(session, event1)
yield* Effect.yieldNow() // 让事件处理完成
yield* Session.publish(session, event2)
yield* Effect.yieldNow() // 让事件处理完成
const results = yield* getResults()
expect(results).toHaveLength(2)
})
)SubscriptionRef Testing
SubscriptionRef测试
Testing Stream Changes with Latches
使用Latch测试流变更
The latch pattern ensures the stream subscription is ready before mutations:
typescript
import { it } from "@effect/vitest"
import { Effect, Fiber, Number } from "effect"
import { Stream, SubscriptionRef } from "effect/stream"
it.effect("multiple subscribers can receive changes", () =>
Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
const latch1 = yield* Effect.makeLatch()
const latch2 = yield* Effect.makeLatch()
const fiber1 = yield* SubscriptionRef.changes(ref).pipe(
Stream.tap(() => latch1.open),
Stream.take(3),
Stream.runCollect,
Effect.forkScoped
)
yield* latch1.await
yield* SubscriptionRef.update(ref, Number.increment)
const fiber2 = yield* SubscriptionRef.changes(ref).pipe(
Stream.tap(() => latch2.open),
Stream.take(2),
Stream.runCollect,
Effect.forkScoped
)
yield* latch2.await
yield* SubscriptionRef.update(ref, Number.increment)
const result1 = yield* Fiber.join(fiber1)
const result2 = yield* Fiber.join(fiber2)
expect(result1).toEqual([0, 1, 2])
expect(result2).toEqual([1, 2])
})
)Latch模式可确保在变更前流订阅已就绪:
typescript
import { it } from "@effect/vitest"
import { Effect, Fiber, Number } from "effect"
import { Stream, SubscriptionRef } from "effect/stream"
it.effect("multiple subscribers can receive changes", () =>
Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
const latch1 = yield* Effect.makeLatch()
const latch2 = yield* Effect.makeLatch()
const fiber1 = yield* SubscriptionRef.changes(ref).pipe(
Stream.tap(() => latch1.open),
Stream.take(3),
Stream.runCollect,
Effect.forkScoped
)
yield* latch1.await
yield* SubscriptionRef.update(ref, Number.increment)
const fiber2 = yield* SubscriptionRef.changes(ref).pipe(
Stream.tap(() => latch2.open),
Stream.take(2),
Stream.runCollect,
Effect.forkScoped
)
yield* latch2.await
yield* SubscriptionRef.update(ref, Number.increment)
const result1 = yield* Fiber.join(fiber1)
const result2 = yield* Fiber.join(fiber2)
expect(result1).toEqual([0, 1, 2])
expect(result2).toEqual([1, 2])
})
)Testing Subscription Interruption
测试订阅中断
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Number, Cause } from "effect"
import { Pull, Stream, SubscriptionRef } from "effect/stream"
it.effect("subscriptions are interruptible", () =>
Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
const latch = yield* Effect.makeLatch()
const fiber = yield* SubscriptionRef.changes(ref).pipe(
Stream.tap(() => latch.open),
Stream.take(10),
Stream.runCollect,
Effect.forkScoped
)
yield* latch.await
yield* SubscriptionRef.update(ref, Number.increment)
yield* Fiber.interrupt(fiber)
const result = yield* Fiber.await(fiber)
expect(
Exit.isFailure(result) && Pull.isHaltCause(result.cause)
).toBe(true)
})
)typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Number, Cause } from "effect"
import { Pull, Stream, SubscriptionRef } from "effect/stream"
it.effect("subscriptions are interruptible", () =>
Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
const latch = yield* Effect.makeLatch()
const fiber = yield* SubscriptionRef.changes(ref).pipe(
Stream.tap(() => latch.open),
Stream.take(10),
Stream.runCollect,
Effect.forkScoped
)
yield* latch.await
yield* SubscriptionRef.update(ref, Number.increment)
yield* Fiber.interrupt(fiber)
const result = yield* Fiber.await(fiber)
expect(
Exit.isFailure(result) && Pull.isHaltCause(result.cause)
).toBe(true)
})
)Stream Testing
Stream测试
Collecting Stream Results
收集Stream结果
typescript
import { it } from "@effect/vitest"
import { Effect } from "effect"
import { Stream } from "effect/stream"
it.effect("should collect stream elements", () =>
Effect.gen(function* () {
const result = yield* Stream.make(1, 2, 3, 4, 5).pipe(
Stream.filter((n) => n % 2 === 0),
Stream.runCollect
)
expect(result).toEqual([2, 4])
})
)typescript
import { it } from "@effect/vitest"
import { Effect } from "effect"
import { Stream } from "effect/stream"
it.effect("should collect stream elements", () =>
Effect.gen(function* () {
const result = yield* Stream.make(1, 2, 3, 4, 5).pipe(
Stream.filter((n) => n % 2 === 0),
Stream.runCollect
)
expect(result).toEqual([2, 4])
})
)Testing Stream Side Effects
测试Stream副作用
typescript
import { it } from "@effect/vitest"
import { Effect, Ref } from "effect"
import { Stream } from "effect/stream"
it.effect("should track side effects", () =>
Effect.gen(function* () {
const log = yield* Ref.make<string[]>([])
yield* Stream.make("a", "b", "c").pipe(
Stream.tap((item) => Ref.update(log, (items) => [...items, item])),
Stream.runDrain
)
const logged = yield* Ref.get(log)
expect(logged).toEqual(["a", "b", "c"])
})
)typescript
import { it } from "@effect/vitest"
import { Effect, Ref } from "effect"
import { Stream } from "effect/stream"
it.effect("should track side effects", () =>
Effect.gen(function* () {
const log = yield* Ref.make<string[]>([])
yield* Stream.make("a", "b", "c").pipe(
Stream.tap((item) => Ref.update(log, (items) => [...items, item])),
Stream.runDrain
)
const logged = yield* Ref.get(log)
expect(logged).toEqual(["a", "b", "c"])
})
)Testing Stream Errors
测试Stream错误
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Data } from "effect"
import { Stream } from "effect/stream"
class StreamError extends Data.TaggedError("StreamError")<{
readonly message: string
}> {}
it.effect("should handle stream errors", () =>
Effect.gen(function* () {
const result = yield* Stream.make(1, 2, 3).pipe(
Stream.mapEffect((n) =>
n === 2
? Effect.fail(new StreamError({ message: "boom" }))
: Effect.succeed(n)
),
Stream.runCollect,
Effect.exit
)
expect(Exit.isFailure(result)).toBe(true)
})
)typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Data } from "effect"
import { Stream } from "effect/stream"
class StreamError extends Data.TaggedError("StreamError")<{
readonly message: string
}> {}
it.effect("should handle stream errors", () =>
Effect.gen(function* () {
const result = yield* Stream.make(1, 2, 3).pipe(
Stream.mapEffect((n) =>
n === 2
? Effect.fail(new StreamError({ message: "boom" }))
: Effect.succeed(n)
),
Stream.runCollect,
Effect.exit
)
expect(Exit.isFailure(result)).toBe(true)
})
)Testing Stream Finalization
测试Stream终结操作
typescript
import { it } from "@effect/vitest"
import { Effect, Ref } from "effect"
import { Stream } from "effect/stream"
it.effect("should run finalizers", () =>
Effect.gen(function* () {
const finalized = yield* Ref.make(false)
yield* Stream.make(1, 2, 3).pipe(
Stream.ensuring(Ref.set(finalized, true)),
Stream.take(1),
Stream.runDrain
)
expect(yield* Ref.get(finalized)).toBe(true)
})
)typescript
import { it } from "@effect/vitest"
import { Effect, Ref } from "effect"
import { Stream } from "effect/stream"
it.effect("should run finalizers", () =>
Effect.gen(function* () {
const finalized = yield* Ref.make(false)
yield* Stream.make(1, 2, 3).pipe(
Stream.ensuring(Ref.set(finalized, true)),
Stream.take(1),
Stream.runDrain
)
expect(yield* Ref.get(finalized)).toBe(true)
})
)Interruption Testing
中断测试
Testing Fiber Interruption
测试Fiber中断
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Cause } from "effect"
it.effect("should handle interruption", () =>
Effect.gen(function* () {
const fiber = yield* Effect.never.pipe(Effect.fork)
yield* Fiber.interrupt(fiber)
const result = yield* Fiber.await(fiber)
expect(Exit.isInterrupted(result)).toBe(true)
})
)typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Cause } from "effect"
it.effect("should handle interruption", () =>
Effect.gen(function* () {
const fiber = yield* Effect.never.pipe(Effect.fork)
yield* Fiber.interrupt(fiber)
const result = yield* Fiber.await(fiber)
expect(Exit.isInterrupted(result)).toBe(true)
})
)Testing Interrupted-Only Cause
测试仅中断原因
typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Cause } from "effect"
it.effect("should have interrupted-only cause", () =>
Effect.gen(function* () {
const fiber = yield* Effect.never.pipe(Effect.fork)
yield* Fiber.interrupt(fiber)
const result = yield* Fiber.await(fiber)
expect(
Exit.isFailure(result) && Cause.isInterruptedOnly(result.cause)
).toBe(true)
})
)typescript
import { it } from "@effect/vitest"
import { Effect, Exit, Fiber, Cause } from "effect"
it.effect("should have interrupted-only cause", () =>
Effect.gen(function* () {
const fiber = yield* Effect.never.pipe(Effect.fork)
yield* Fiber.interrupt(fiber)
const result = yield* Fiber.await(fiber)
expect(
Exit.isFailure(result) && Cause.isInterruptedOnly(result.cause)
).toBe(true)
})
)Time-Dependent Concurrency Testing
时间相关并发测试
Use only when testing time-dependent behavior like delays, timeouts, or schedules.
TestClocktypescript
import { it } from "@effect/vitest"
import { Effect, Fiber, TestClock, Duration } from "effect"
it.effect("should handle delayed concurrent operations", () =>
Effect.gen(function* () {
const fiber = yield* Effect.gen(function* () {
yield* Effect.sleep(Duration.seconds(5))
return "done"
}).pipe(Effect.fork)
yield* TestClock.adjust(Duration.seconds(5))
const result = yield* Fiber.join(fiber)
expect(result).toBe("done")
})
)仅在测试时间相关行为(如延迟、超时或调度)时使用。
TestClocktypescript
import { it } from "@effect/vitest"
import { Effect, Fiber, TestClock, Duration } from "effect"
it.effect("should handle delayed concurrent operations", () =>
Effect.gen(function* () {
const fiber = yield* Effect.gen(function* () {
yield* Effect.sleep(Duration.seconds(5))
return "done"
}).pipe(Effect.fork)
yield* TestClock.adjust(Duration.seconds(5))
const result = yield* Fiber.join(fiber)
expect(result).toBe("done")
})
)Anti-Patterns
反模式
DON'T use TestClock for non-time-dependent code
不要对非时间相关代码使用TestClock
typescript
import { Effect, TestClock, Duration } from "effect"
// BAD - Using TestClock when not needed
Effect.gen(function* () {
const fiber = yield* someEffect.pipe(Effect.fork)
yield* TestClock.adjust(Duration.millis(100))
yield* Fiber.join(fiber)
})
// GOOD - Use yieldNow for simple yielding
Effect.gen(function* () {
const fiber = yield* someEffect.pipe(Effect.fork)
yield* Effect.yieldNow()
yield* Fiber.join(fiber)
})typescript
import { Effect, TestClock, Duration } from "effect"
// 错误示例 - 不必要时使用TestClock
Effect.gen(function* () {
const fiber = yield* someEffect.pipe(Effect.fork)
yield* TestClock.adjust(Duration.millis(100))
yield* Fiber.join(fiber)
})
// 正确示例 - 对简单让出使用yieldNow
Effect.gen(function* () {
const fiber = yield* someEffect.pipe(Effect.fork)
yield* Effect.yieldNow()
yield* Fiber.join(fiber)
})DON'T poll in a loop without yieldNow
不要在无yieldNow的循环中轮询
typescript
import { Effect, Fiber } from "effect"
declare const fiber: Fiber.RuntimeFiber<void>
// BAD - Busy loop
while (fiber.unsafePoll() === null) {
// Spins forever!
}
// GOOD - Yield between polls or use Fiber.await
Effect.gen(function* () {
while (fiber.unsafePoll() === null) {
yield* Effect.yieldNow()
}
})
// BETTER - Just await the fiber
Effect.gen(function* () {
yield* Fiber.await(fiber)
})typescript
import { Effect, Fiber } from "effect"
declare const fiber: Fiber.RuntimeFiber<void>
// 错误示例 - 忙循环
while (fiber.unsafePoll() === null) {
// 永远自旋!
}
// 正确示例 - 在轮询间让出或使用Fiber.await
Effect.gen(function* () {
while (fiber.unsafePoll() === null) {
yield* Effect.yieldNow()
}
})
// 更优示例 - 直接等待Fiber
Effect.gen(function* () {
yield* Fiber.await(fiber)
})DON'T forget Effect.scoped for PubSub subscriptions
不要忘记为PubSub订阅使用Effect.scoped
typescript
import { Effect, PubSub } from "effect"
declare const pubsub: PubSub.PubSub<string>
// BAD - Subscription leaks
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
// Sub is never cleaned up!
})
// GOOD - Scoped subscription
Effect.gen(function* () {
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
const events = yield* PubSub.takeAll(sub)
// Sub cleaned up when scope closes
})
)
})typescript
import { Effect, PubSub } from "effect"
declare const pubsub: PubSub.PubSub<string>
// 错误示例 - 订阅泄漏
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
// 订阅永远不会被清理!
})
// 正确示例 - 作用域内的订阅
Effect.gen(function* () {
yield* Effect.scoped(
Effect.gen(function* () {
const sub = yield* PubSub.subscribe(pubsub)
const events = yield* PubSub.takeAll(sub)
// 作用域关闭时订阅会被清理
})
)
})DON'T start subscriptions after mutations
不要在变更后才启动订阅
typescript
import { Effect } from "effect"
import { Stream, SubscriptionRef } from "effect/stream"
declare const ref: SubscriptionRef.SubscriptionRef<number>
// BAD - May miss events
Effect.gen(function* () {
yield* SubscriptionRef.update(ref, (n) => n + 1)
const fiber = yield* SubscriptionRef.changes(ref).pipe(
Stream.take(1),
Stream.runCollect,
Effect.fork
)
// Subscription started after mutation - may miss it!
})
// GOOD - Use latch to ensure subscription is ready
Effect.gen(function* () {
const latch = yield* Effect.makeLatch()
const fiber = yield* SubscriptionRef.changes(ref).pipe(
Stream.tap(() => latch.open),
Stream.take(2),
Stream.runCollect,
Effect.forkScoped
)
yield* latch.await
yield* SubscriptionRef.update(ref, (n) => n + 1)
const result = yield* Fiber.join(fiber)
})typescript
import { Effect } from "effect"
import { Stream, SubscriptionRef } from "effect/stream"
declare const ref: SubscriptionRef.SubscriptionRef<number>
// 错误示例 - 可能丢失事件
Effect.gen(function* () {
yield* SubscriptionRef.update(ref, (n) => n + 1)
const fiber = yield* SubscriptionRef.changes(ref).pipe(
Stream.take(1),
Stream.runCollect,
Effect.fork
)
// 订阅在变更后启动 - 可能会错过该事件!
})
// 正确示例 - 使用Latch确保订阅就绪
Effect.gen(function* () {
const latch = yield* Effect.makeLatch()
const fiber = yield* SubscriptionRef.changes(ref).pipe(
Stream.tap(() => latch.open),
Stream.take(2),
Stream.runCollect,
Effect.forkScoped
)
yield* latch.await
yield* SubscriptionRef.update(ref, (n) => n + 1)
const result = yield* Fiber.join(fiber)
})Quality Checklist
质量检查清单
- Using correct coordination primitive for the use case
- wraps PubSub subscriptions
Effect.scoped - Latches ensure stream subscriptions are ready before mutations
- after fork to let subscriber fiber start
Effect.yieldNow - after each publish to let fiber process event
Effect.yieldNow - used instead of TestClock for non-time-dependent code
Effect.yieldNow - Fiber interruption tested with or
Exit.isInterruptedCause.isInterruptedOnly - Stream finalizers verified with
Stream.ensuring - No busy polling without yields
- Test is deterministic (no race conditions)
- 根据使用场景选择了正确的协调原语
- 包裹了PubSub订阅
Effect.scoped - 使用Latch确保流订阅在变更前已就绪
- 分叉后调用让订阅者Fiber启动
Effect.yieldNow - 每次发布后调用让Fiber处理事件
Effect.yieldNow - 对非时间相关代码使用而非TestClock
Effect.yieldNow - 使用或
Exit.isInterrupted测试Fiber中断Cause.isInterruptedOnly - 使用验证流终结操作
Stream.ensuring - 没有无让出操作的忙轮询
- 测试是确定性的(无竞态条件)