effect-ts-concurrency

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Effect-TS Concurrency

Effect-TS 并发

Overview

概述

Effect-TS provides lightweight fibers for high-performance concurrency. The core principle is explicit control: always bound parallelism to prevent resource exhaustion.
Effect-TS 提供轻量级 fibers 以实现高性能并发。核心原则是显式控制:始终限制并行度以避免资源耗尽。

When to Use

适用场景

  • Processing large arrays of effects (e.g.,
    Effect.all
    ,
    Effect.forEach
    )
  • Rate limiting external API calls or database connections
  • Coordinating work between background processes (fibers)
  • Signaling completion or state changes across different parts of the app
When NOT to use:
  • Simple sequential operations
  • When standard
    Promise.all
    is sufficient (though Effect is usually preferred for consistency)
  • 处理大量 effect 数组(例如
    Effect.all
    Effect.forEach
  • 对外部API调用或数据库连接进行限流
  • 协调后台进程(fibers)之间的工作
  • 在应用不同模块之间传递完成信号或状态变更信号
不适用场景:
  • 简单的顺序操作
  • 标准
    Promise.all
    已足够满足需求的场景(不过为了代码一致性通常更推荐使用Effect)

Core Pattern: Bounded Parallelism

核心模式:有界并行

Unbounded parallelism is the most common source of "Too many open files" or "Connection timeout" errors.
PatternImplementationResult
BAD
Effect.all(effects)
Unbounded - crashes on large inputs
GOOD
Effect.all(effects, { concurrency: 10 })
Bounded - safe and predictable
无界并行是引发“打开文件过多”或“连接超时”错误的最常见原因。
模式实现方式结果
错误示例
Effect.all(effects)
无界并行 - 输入数据量过大时会崩溃
正确示例
Effect.all(effects, { concurrency: 10 })
有界并行 - 安全且行为可预测

Quick Reference

快速参考

ToolPurposeKey Method
FiberBackground execution
Effect.fork
/
Fiber.join
SemaphoreBounded concurrency / Rate limiting
semaphore.withPermits(n)
DeferredOne-shot signaling / Promises
Deferred.await
/
Deferred.succeed
concurrencyOption for
all
,
forEach
,
mapEffect
`{ concurrency: number
工具用途核心方法
Fiber后台执行
Effect.fork
/
Fiber.join
Semaphore有界并发/限流
semaphore.withPermits(n)
Deferred单次信号传递/类Promise功能
Deferred.await
/
Deferred.succeed
concurrency
all
forEach
mapEffect
的配置项
`{ concurrency: number

Implementation

实现示例

1. Bounded Parallelism (Critical)

1. 有界并行(关键用法)

Always specify concurrency when processing collections.
typescript
import { Effect } from 'effect';

// Process 1000 items, max 10 concurrent
const results = yield* Effect.all(
  items.map(processItem),
  { concurrency: 10 }
);
处理集合数据时始终指定并行度。
typescript
import { Effect } from 'effect';

// 处理1000条数据,最多同时执行10个并发任务
const results = yield* Effect.all(
  items.map(processItem),
  { concurrency: 10 }
);

2. Semaphore for Rate Limiting

2. 用Semaphore实现限流

Use a Semaphore when multiple independent operations must share a global limit.
typescript
import { Effect } from 'effect';

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(5); // Max 5 concurrent
  
  yield* Effect.all(
    requests.map((req) => 
      semaphore.withPermits(1)(handleRequest(req))
    ),
    { concurrency: 'unbounded' } // Semaphore controls actual concurrency
  );
});
当多个独立操作需要共享全局并发限制时使用Semaphore。
typescript
import { Effect } from 'effect';

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(5); // 最多5个并发
  
  yield* Effect.all(
    requests.map((req) => 
      semaphore.withPermits(1)(handleRequest(req))
    ),
    { concurrency: 'unbounded' } // 由Semaphore控制实际并发数
  );
});

3. Deferred for Signaling

3. 用Deferred实现信号传递

Use Deferred to wait for a specific event or value from another fiber.
typescript
import { Deferred, Effect, Fiber } from 'effect';

const program = Effect.gen(function* () {
  const signal = yield* Deferred.make<void>();
  
  const worker = yield* Effect.fork(
    Effect.gen(function* () {
      yield* Deferred.await(signal); // Wait for signal
      yield* doWork();
    })
  );
  
  yield* setup();
  yield* Deferred.succeed(signal, undefined); // Trigger worker
  yield* Fiber.join(worker);
});
使用Deferred等待来自其他fiber的特定事件或值。
typescript
import { Deferred, Effect, Fiber } from 'effect';

const program = Effect.gen(function* () {
  const signal = yield* Deferred.make<void>();
  
  const worker = yield* Effect.fork(
    Effect.gen(function* () {
      yield* Deferred.await(signal); // 等待信号
      yield* doWork();
    })
  );
  
  yield* setup();
  yield* Deferred.succeed(signal, undefined); // 触发worker执行
  yield* Fiber.join(worker);
});

Common Mistakes

常见错误

  • Unbounded parallelism: Forgetting
    { concurrency: n }
    in
    Effect.all
    .
  • Leaking Fibers: Forking fibers without joining or interrupting them (use
    Effect.scoped
    for safety).
  • Deadlocks: Circular dependencies between semaphores or deferreds.
  • 无界并行:
    Effect.all
    中忘记添加
    { concurrency: n }
    配置
  • Fiber泄漏: 派生fiber后未进行join或中断操作(可使用
    Effect.scoped
    保证安全)
  • 死锁: 信号量或Deferred之间存在循环依赖

Red Flags - STOP and Start Over

红色预警 - 请立即停止并修正

  • Using
    Effect.all
    on a large array without
    { concurrency: n }
    .
  • Using
    Promise.all
    inside an Effect-TS codebase.
  • Manual
    setTimeout
    for rate limiting instead of
    Effect.makeRateLimiter
    or
    Semaphore
    .
  • 对大型数组使用
    Effect.all
    时未配置
    { concurrency: n }
  • 在Effect-TS代码库中使用
    Promise.all
  • 手动使用
    setTimeout
    实现限流,而非使用
    Effect.makeRateLimiter
    或Semaphore

Rationalization Table

合理化对照表

ExcuseReality
"It's only 100 items"100 items today, 10,000 tomorrow. Bound it now.
"The API is fast"Network latency and server load are unpredictable.
"I'll add concurrency later"Unbounded parallelism is a ticking time bomb.
REQUIRED BACKGROUND: See effect-ts-anti-patterns for more on unbounded parallelism.
借口现实
“反正只有100条数据”今天只有100条,明天可能就有10000条,现在就加上并行限制。
“API响应很快”网络延迟和服务端负载都是不可预测的。
“我后面再加并行限制”无界并行就是一颗定时炸弹。
必备背景知识: 查看effect-ts-anti-patterns文档了解更多无界并行相关的反模式。