effect-ts-concurrency
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseEffect-TS Concurrency
Effect-TS 并发
Overview
概述
Effect-TS provides lightweight fibers for high-performance concurrency. The core principle is explicit control: always bound parallelism to prevent resource exhaustion.
Effect-TS 提供轻量级 fibers 以实现高性能并发。核心原则是显式控制:始终限制并行度以避免资源耗尽。
When to Use
适用场景
- Processing large arrays of effects (e.g., ,
Effect.all)Effect.forEach - Rate limiting external API calls or database connections
- Coordinating work between background processes (fibers)
- Signaling completion or state changes across different parts of the app
When NOT to use:
- Simple sequential operations
- When standard is sufficient (though Effect is usually preferred for consistency)
Promise.all
- 处理大量 effect 数组(例如 、
Effect.all)Effect.forEach - 对外部API调用或数据库连接进行限流
- 协调后台进程(fibers)之间的工作
- 在应用不同模块之间传递完成信号或状态变更信号
不适用场景:
- 简单的顺序操作
- 标准已足够满足需求的场景(不过为了代码一致性通常更推荐使用Effect)
Promise.all
Core Pattern: Bounded Parallelism
核心模式:有界并行
Unbounded parallelism is the most common source of "Too many open files" or "Connection timeout" errors.
| Pattern | Implementation | Result |
|---|---|---|
| BAD | | Unbounded - crashes on large inputs |
| GOOD | | Bounded - safe and predictable |
无界并行是引发“打开文件过多”或“连接超时”错误的最常见原因。
| 模式 | 实现方式 | 结果 |
|---|---|---|
| 错误示例 | | 无界并行 - 输入数据量过大时会崩溃 |
| 正确示例 | | 有界并行 - 安全且行为可预测 |
Quick Reference
快速参考
| Tool | Purpose | Key Method |
|---|---|---|
| Fiber | Background execution | |
| Semaphore | Bounded concurrency / Rate limiting | |
| Deferred | One-shot signaling / Promises | |
| concurrency | Option for | `{ concurrency: number |
| 工具 | 用途 | 核心方法 |
|---|---|---|
| Fiber | 后台执行 | |
| Semaphore | 有界并发/限流 | |
| Deferred | 单次信号传递/类Promise功能 | |
| concurrency | | `{ concurrency: number |
Implementation
实现示例
1. Bounded Parallelism (Critical)
1. 有界并行(关键用法)
Always specify concurrency when processing collections.
typescript
import { Effect } from 'effect';
// Process 1000 items, max 10 concurrent
const results = yield* Effect.all(
items.map(processItem),
{ concurrency: 10 }
);处理集合数据时始终指定并行度。
typescript
import { Effect } from 'effect';
// 处理1000条数据,最多同时执行10个并发任务
const results = yield* Effect.all(
items.map(processItem),
{ concurrency: 10 }
);2. Semaphore for Rate Limiting
2. 用Semaphore实现限流
Use a Semaphore when multiple independent operations must share a global limit.
typescript
import { Effect } from 'effect';
const program = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(5); // Max 5 concurrent
yield* Effect.all(
requests.map((req) =>
semaphore.withPermits(1)(handleRequest(req))
),
{ concurrency: 'unbounded' } // Semaphore controls actual concurrency
);
});当多个独立操作需要共享全局并发限制时使用Semaphore。
typescript
import { Effect } from 'effect';
const program = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(5); // 最多5个并发
yield* Effect.all(
requests.map((req) =>
semaphore.withPermits(1)(handleRequest(req))
),
{ concurrency: 'unbounded' } // 由Semaphore控制实际并发数
);
});3. Deferred for Signaling
3. 用Deferred实现信号传递
Use Deferred to wait for a specific event or value from another fiber.
typescript
import { Deferred, Effect, Fiber } from 'effect';
const program = Effect.gen(function* () {
const signal = yield* Deferred.make<void>();
const worker = yield* Effect.fork(
Effect.gen(function* () {
yield* Deferred.await(signal); // Wait for signal
yield* doWork();
})
);
yield* setup();
yield* Deferred.succeed(signal, undefined); // Trigger worker
yield* Fiber.join(worker);
});使用Deferred等待来自其他fiber的特定事件或值。
typescript
import { Deferred, Effect, Fiber } from 'effect';
const program = Effect.gen(function* () {
const signal = yield* Deferred.make<void>();
const worker = yield* Effect.fork(
Effect.gen(function* () {
yield* Deferred.await(signal); // 等待信号
yield* doWork();
})
);
yield* setup();
yield* Deferred.succeed(signal, undefined); // 触发worker执行
yield* Fiber.join(worker);
});Common Mistakes
常见错误
- Unbounded parallelism: Forgetting in
{ concurrency: n }.Effect.all - Leaking Fibers: Forking fibers without joining or interrupting them (use for safety).
Effect.scoped - Deadlocks: Circular dependencies between semaphores or deferreds.
- 无界并行: 在中忘记添加
Effect.all配置{ concurrency: n } - Fiber泄漏: 派生fiber后未进行join或中断操作(可使用保证安全)
Effect.scoped - 死锁: 信号量或Deferred之间存在循环依赖
Red Flags - STOP and Start Over
红色预警 - 请立即停止并修正
- Using on a large array without
Effect.all.{ concurrency: n } - Using inside an Effect-TS codebase.
Promise.all - Manual for rate limiting instead of
setTimeoutorEffect.makeRateLimiter.Semaphore
- 对大型数组使用时未配置
Effect.all{ concurrency: n } - 在Effect-TS代码库中使用
Promise.all - 手动使用实现限流,而非使用
setTimeout或SemaphoreEffect.makeRateLimiter
Rationalization Table
合理化对照表
| Excuse | Reality |
|---|---|
| "It's only 100 items" | 100 items today, 10,000 tomorrow. Bound it now. |
| "The API is fast" | Network latency and server load are unpredictable. |
| "I'll add concurrency later" | Unbounded parallelism is a ticking time bomb. |
REQUIRED BACKGROUND: See effect-ts-anti-patterns for more on unbounded parallelism.
| 借口 | 现实 |
|---|---|
| “反正只有100条数据” | 今天只有100条,明天可能就有10000条,现在就加上并行限制。 |
| “API响应很快” | 网络延迟和服务端负载都是不可预测的。 |
| “我后面再加并行限制” | 无界并行就是一颗定时炸弹。 |
必备背景知识: 查看effect-ts-anti-patterns文档了解更多无界并行相关的反模式。