effect-queues-background

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Queues, PubSub & Background

队列、PubSub与后台任务

When to use

适用场景

  • Decoupling producers/consumers with backpressure
  • Broadcasting events to multiple subscribers
  • Running background loops with graceful shutdown
  • 借助背压机制解耦生产者与消费者
  • 向多个订阅者广播事件
  • 运行带优雅关闭的后台循环

Queue (bounded)

有界队列

ts
import { Queue } from "effect"
const q = yield* Queue.bounded<string>(32)
yield* Queue.offer(q, "job")
const job = yield* Queue.take(q)
ts
import { Queue } from "effect"
const q = yield* Queue.bounded<string>(32)
yield* Queue.offer(q, "job")
const job = yield* Queue.take(q)

PubSub (broadcast)

发布订阅(广播模式)

ts
import { PubSub } from "effect"
const ps = yield* PubSub.bounded<string>(32)
yield* PubSub.publish(ps, "evt")
ts
import { PubSub } from "effect"
const ps = yield* PubSub.bounded<string>(32)
yield* PubSub.publish(ps, "evt")

Background Fiber

后台纤程

ts
const fiber = yield* Effect.fork(loop)
yield* Fiber.interrupt(fiber)
ts
const fiber = yield* Effect.fork(loop)
yield* Fiber.interrupt(fiber)

Guidance

实践指南

  • Prefer bounded queues to apply natural backpressure
  • Use multiple workers by forking consumers
  • Ensure background fibers are interrupted during shutdown
  • 优先使用有界队列以实现天然的背压机制
  • 通过创建多个消费者进程来使用多worker模式
  • 确保后台纤程在shutdown时被中断

Pitfalls

注意事项

  • Unbounded queues lead to memory growth
  • Silent background failures → add logging/metrics
  • 无界队列会导致内存增长
  • 静默的后台任务故障→添加日志/监控指标

Cross-links

相关链接

  • Concurrency for pools and interruption
  • Time/Logging for observability of background tasks
  • 用于线程池与中断处理的并发编程
  • 用于后台任务可观测性的时间/日志模块

Local Source Reference

本地源码参考

CRITICAL: Search local Effect source before implementing
The full Effect source code is available at
docs/effect-source/
. Always search the actual implementation before writing Effect code.
重要提示:实现前请先查阅本地Effect源码
完整的Effect源码位于
docs/effect-source/
。编写Effect代码前,请务必先查看实际实现。

Key Source Files

核心源码文件

  • Queue:
    docs/effect-source/effect/src/Queue.ts
  • PubSub:
    docs/effect-source/effect/src/PubSub.ts
  • Fiber:
    docs/effect-source/effect/src/Fiber.ts
  • Queue:
    docs/effect-source/effect/src/Queue.ts
  • PubSub:
    docs/effect-source/effect/src/PubSub.ts
  • Fiber:
    docs/effect-source/effect/src/Fiber.ts

Example Searches

示例搜索命令

bash
undefined
bash
undefined

Find Queue patterns

查找队列相关模式

grep -F "bounded" docs/effect-source/effect/src/Queue.ts grep -F "offer" docs/effect-source/effect/src/Queue.ts grep -F "take" docs/effect-source/effect/src/Queue.ts
grep -F "bounded" docs/effect-source/effect/src/Queue.ts grep -F "offer" docs/effect-source/effect/src/Queue.ts grep -F "take" docs/effect-source/effect/src/Queue.ts

Study PubSub operations

研究PubSub操作

grep -F "publish" docs/effect-source/effect/src/PubSub.ts grep -F "subscribe" docs/effect-source/effect/src/PubSub.ts
grep -F "publish" docs/effect-source/effect/src/PubSub.ts grep -F "subscribe" docs/effect-source/effect/src/PubSub.ts

Find background fiber patterns

查找后台纤程相关模式

grep -F "fork" docs/effect-source/effect/src/Fiber.ts grep -F "interrupt" docs/effect-source/effect/src/Fiber.ts
grep -F "fork" docs/effect-source/effect/src/Fiber.ts grep -F "interrupt" docs/effect-source/effect/src/Fiber.ts

Look at Queue test examples

查看队列测试示例

grep -F "Queue." docs/effect-source/effect/test/Queue.test.ts
undefined
grep -F "Queue." docs/effect-source/effect/test/Queue.test.ts
undefined

Workflow

工作流程

  1. Identify the Queue or PubSub API you need
  2. Search
    docs/effect-source/effect/src/Queue.ts
    or
    PubSub.ts
    for the implementation
  3. Study the types and backpressure patterns
  4. Look at test files for usage examples
  5. Write your code based on real implementations
Real source code > documentation > assumptions
  1. 确定所需的Queue或PubSub API
  2. docs/effect-source/effect/src/Queue.ts
    PubSub.ts
    中查找对应实现
  3. 研究类型定义与背压模式
  4. 查看测试文件获取使用示例
  5. 基于实际实现编写代码
实际源码 > 文档 > 主观假设

References

参考资料