concurrency-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Concurrency Patterns

并发模式

Overview

概述

Implement safe concurrent code using proper synchronization primitives and patterns for parallel execution.
使用合适的同步原语和模式实现安全的并发代码,以支持并行执行。

When to Use

适用场景

  • Multi-threaded applications
  • Parallel data processing
  • Race condition prevention
  • Resource pooling
  • Task coordination
  • High-performance systems
  • Async operations
  • Worker pools
  • 多线程应用
  • 并行数据处理
  • 竞争条件预防
  • 资源池化
  • 任务协调
  • 高性能系统
  • 异步操作
  • 工作线程池

Implementation Examples

实现示例

1. Promise Pool (TypeScript)

1. Promise Pool (TypeScript)

typescript
class PromisePool {
  private queue: Array<() => Promise<any>> = [];
  private active = 0;

  constructor(private concurrency: number) {}

  async add<T>(fn: () => Promise<T>): Promise<T> {
    while (this.active >= this.concurrency) {
      await this.waitForSlot();
    }

    this.active++;

    try {
      return await fn();
    } finally {
      this.active--;
    }
  }

  private async waitForSlot(): Promise<void> {
    return new Promise(resolve => {
      const checkSlot = () => {
        if (this.active < this.concurrency) {
          resolve();
        } else {
          setTimeout(checkSlot, 10);
        }
      };
      checkSlot();
    });
  }

  async map<T, R>(
    items: T[],
    fn: (item: T) => Promise<R>
  ): Promise<R[]> {
    return Promise.all(
      items.map(item => this.add(() => fn(item)))
    );
  }
}

// Usage
const pool = new PromisePool(5);

const urls = Array.from({ length: 100 }, (_, i) =>
  `https://api.example.com/item/${i}`
);

const results = await pool.map(urls, async (url) => {
  const response = await fetch(url);
  return response.json();
});
typescript
class PromisePool {
  private queue: Array<() => Promise<any>> = [];
  private active = 0;

  constructor(private concurrency: number) {}

  async add<T>(fn: () => Promise<T>): Promise<T> {
    while (this.active >= this.concurrency) {
      await this.waitForSlot();
    }

    this.active++;

    try {
      return await fn();
    } finally {
      this.active--;
    }
  }

  private async waitForSlot(): Promise<void> {
    return new Promise(resolve => {
      const checkSlot = () => {
        if (this.active < this.concurrency) {
          resolve();
        } else {
          setTimeout(checkSlot, 10);
        }
      };
      checkSlot();
    });
  }

  async map<T, R>(
    items: T[],
    fn: (item: T) => Promise<R>
  ): Promise<R[]> {
    return Promise.all(
      items.map(item => this.add(() => fn(item)))
    );
  }
}

// Usage
const pool = new PromisePool(5);

const urls = Array.from({ length: 100 }, (_, i) =>
  `https://api.example.com/item/${i}`
);

const results = await pool.map(urls, async (url) => {
  const response = await fetch(url);
  return response.json();
});

2. Mutex and Semaphore (TypeScript)

2. Mutex and Semaphore (TypeScript)

typescript
class Mutex {
  private locked = false;
  private queue: Array<() => void> = [];

  async acquire(): Promise<void> {
    if (!this.locked) {
      this.locked = true;
      return;
    }

    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.locked = false;
    }
  }

  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

class Semaphore {
  private available: number;
  private queue: Array<() => void> = [];

  constructor(private max: number) {
    this.available = max;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }

    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.available++;
    }
  }

  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

// Usage
const mutex = new Mutex();
let counter = 0;

async function incrementCounter() {
  await mutex.runExclusive(async () => {
    const current = counter;
    await new Promise(resolve => setTimeout(resolve, 10));
    counter = current + 1;
  });
}

// Database connection pool with semaphore
const dbSemaphore = new Semaphore(10); // Max 10 concurrent connections

async function queryDatabase(query: string) {
  return dbSemaphore.runExclusive(async () => {
    // Execute query
    return executeQuery(query);
  });
}

async function executeQuery(query: string) {
  // Query logic
}
typescript
class Mutex {
  private locked = false;
  private queue: Array<() => void> = [];

  async acquire(): Promise<void> {
    if (!this.locked) {
      this.locked = true;
      return;
    }

    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.locked = false;
    }
  }

  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

class Semaphore {
  private available: number;
  private queue: Array<() => void> = [];

  constructor(private max: number) {
    this.available = max;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }

    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.available++;
    }
  }

  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

// Usage
const mutex = new Mutex();
let counter = 0;

async function incrementCounter() {
  await mutex.runExclusive(async () => {
    const current = counter;
    await new Promise(resolve => setTimeout(resolve, 10));
    counter = current + 1;
  });
}

// Database connection pool with semaphore
const dbSemaphore = new Semaphore(10); // Max 10 concurrent connections

async function queryDatabase(query: string) {
  return dbSemaphore.runExclusive(async () => {
    // Execute query
    return executeQuery(query);
  });
}

async function executeQuery(query: string) {
  // Query logic
}

3. Worker Pool (Node.js)

3. Worker Pool (Node.js)

typescript
import { Worker } from 'worker_threads';

interface Task<T> {
  id: string;
  data: any;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

class WorkerPool {
  private workers: Worker[] = [];
  private availableWorkers: Worker[] = [];
  private taskQueue: Task<any>[] = [];

  constructor(
    private workerScript: string,
    private poolSize: number
  ) {
    this.initializeWorkers();
  }

  private initializeWorkers(): void {
    for (let i = 0; i < this.poolSize; i++) {
      const worker = new Worker(this.workerScript);

      worker.on('message', (result) => {
        this.handleWorkerMessage(worker, result);
      });

      worker.on('error', (error) => {
        console.error('Worker error:', error);
      });

      this.workers.push(worker);
      this.availableWorkers.push(worker);
    }
  }

  async execute<T>(data: any): Promise<T> {
    return new Promise((resolve, reject) => {
      const task: Task<T> = {
        id: Math.random().toString(36),
        data,
        resolve,
        reject
      };

      this.taskQueue.push(task);
      this.processQueue();
    });
  }

  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const task = this.taskQueue.shift()!;
      const worker = this.availableWorkers.shift()!;

      worker.postMessage({
        taskId: task.id,
        data: task.data
      });

      (worker as any).currentTask = task;
    }
  }

  private handleWorkerMessage(worker: Worker, result: any): void {
    const task = (worker as any).currentTask as Task<any>;

    if (!task) return;

    if (result.error) {
      task.reject(new Error(result.error));
    } else {
      task.resolve(result.data);
    }

    delete (worker as any).currentTask;
    this.availableWorkers.push(worker);
    this.processQueue();
  }

  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}

// worker.js
// const { parentPort } = require('worker_threads');
//
// parentPort.on('message', async ({ taskId, data }) => {
//   try {
//     const result = await processData(data);
//     parentPort.postMessage({ taskId, data: result });
//   } catch (error) {
//     parentPort.postMessage({ taskId, error: error.message });
//   }
// });
typescript
import { Worker } from 'worker_threads';

interface Task<T> {
  id: string;
  data: any;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

class WorkerPool {
  private workers: Worker[] = [];
  private availableWorkers: Worker[] = [];
  private taskQueue: Task<any>[] = [];

  constructor(
    private workerScript: string,
    private poolSize: number
  ) {
    this.initializeWorkers();
  }

  private initializeWorkers(): void {
    for (let i = 0; i < this.poolSize; i++) {
      const worker = new Worker(this.workerScript);

      worker.on('message', (result) => {
        this.handleWorkerMessage(worker, result);
      });

      worker.on('error', (error) => {
        console.error('Worker error:', error);
      });

      this.workers.push(worker);
      this.availableWorkers.push(worker);
    }
  }

  async execute<T>(data: any): Promise<T> {
    return new Promise((resolve, reject) => {
      const task: Task<T> = {
        id: Math.random().toString(36),
        data,
        resolve,
        reject
      };

      this.taskQueue.push(task);
      this.processQueue();
    });
  }

  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const task = this.taskQueue.shift()!;
      const worker = this.availableWorkers.shift()!;

      worker.postMessage({
        taskId: task.id,
        data: task.data
      });

      (worker as any).currentTask = task;
    }
  }

  private handleWorkerMessage(worker: Worker, result: any): void {
    const task = (worker as any).currentTask as Task<any>;

    if (!task) return;

    if (result.error) {
      task.reject(new Error(result.error));
    } else {
      task.resolve(result.data);
    }

    delete (worker as any).currentTask;
    this.availableWorkers.push(worker);
    this.processQueue();
  }

  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}

// worker.js
// const { parentPort } = require('worker_threads');
//
// parentPort.on('message', async ({ taskId, data }) => {
//   try {
//     const result = await processData(data);
//     parentPort.postMessage({ taskId, data: result });
//   } catch (error) {
//     parentPort.postMessage({ taskId, error: error.message });
//   }
// });

4. Python Threading Patterns

4. Python Threading Patterns

python
import threading
from queue import Queue
from typing import Callable, List, TypeVar, Generic
import time

T = TypeVar('T')
R = TypeVar('R')

class ThreadPool(Generic[T, R]):
    def __init__(self, num_threads: int):
        self.num_threads = num_threads
        self.tasks: Queue = Queue()
        self.results: List[R] = []
        self.lock = threading.Lock()
        self.workers: List[threading.Thread] = []

    def map(self, func: Callable[[T], R], items: List[T]) -> List[R]:
        """Map function over items using thread pool."""
        # Add tasks to queue
        for item in items:
            self.tasks.put(item)

        # Start workers
        for _ in range(self.num_threads):
            worker = threading.Thread(
                target=self._worker,
                args=(func,)
            )
            worker.start()
            self.workers.append(worker)

        # Wait for completion
        self.tasks.join()

        # Stop workers
        for _ in range(self.num_threads):
            self.tasks.put(None)

        for worker in self.workers:
            worker.join()

        return self.results

    def _worker(self, func: Callable[[T], R]):
        """Worker thread."""
        while True:
            item = self.tasks.get()

            if item is None:
                self.tasks.task_done()
                break

            try:
                result = func(item)

                with self.lock:
                    self.results.append(result)
            finally:
                self.tasks.task_done()


class Mutex:
    def __init__(self):
        self._lock = threading.Lock()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()


class Semaphore:
    def __init__(self, max_count: int):
        self._semaphore = threading.Semaphore(max_count)

    def __enter__(self):
        self._semaphore.acquire()
        return self

    def __exit__(self, *args):
        self._semaphore.release()
python
import threading
from queue import Queue
from typing import Callable, List, TypeVar, Generic
import time

T = TypeVar('T')
R = TypeVar('R')

class ThreadPool(Generic[T, R]):
    def __init__(self, num_threads: int):
        self.num_threads = num_threads
        self.tasks: Queue = Queue()
        self.results: List[R] = []
        self.lock = threading.Lock()
        self.workers: List[threading.Thread] = []

    def map(self, func: Callable[[T], R], items: List[T]) -> List[R]:
        """Map function over items using thread pool."""
        # Add tasks to queue
        for item in items:
            self.tasks.put(item)

        # Start workers
        for _ in range(self.num_threads):
            worker = threading.Thread(
                target=self._worker,
                args=(func,)
            )
            worker.start()
            self.workers.append(worker)

        # Wait for completion
        self.tasks.join()

        # Stop workers
        for _ in range(self.num_threads):
            self.tasks.put(None)

        for worker in self.workers:
            worker.join()

        return self.results

    def _worker(self, func: Callable[[T], R]):
        """Worker thread."""
        while True:
            item = self.tasks.get()

            if item is None:
                self.tasks.task_done()
                break

            try:
                result = func(item)

                with self.lock:
                    self.results.append(result)
            finally:
                self.tasks.task_done()


class Mutex:
    def __init__(self):
        self._lock = threading.Lock()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()


class Semaphore:
    def __init__(self, max_count: int):
        self._semaphore = threading.Semaphore(max_count)

    def __enter__(self):
        self._semaphore.acquire()
        return self

    def __exit__(self, *args):
        self._semaphore.release()

Usage

Usage

def process_item(item: int) -> int: time.sleep(0.1) return item * 2
pool = ThreadPool(num_threads=5) items = list(range(100)) results = pool.map(process_item, items) print(f"Processed {len(results)} items")
def process_item(item: int) -> int: time.sleep(0.1) return item * 2
pool = ThreadPool(num_threads=5) items = list(range(100)) results = pool.map(process_item, items) print(f"Processed {len(results)} items")

Mutex example

Mutex example

counter = 0 mutex = Mutex()
def increment(): global counter with mutex: current = counter time.sleep(0.001) counter = current + 1
threads = [threading.Thread(target=increment) for _ in range(100)] for t in threads: t.start() for t in threads: t.join()
print(f"Counter: {counter}") # Should be 100
counter = 0 mutex = Mutex()
def increment(): global counter with mutex: current = counter time.sleep(0.001) counter = current + 1
threads = [threading.Thread(target=increment) for _ in range(100)] for t in threads: t.start() for t in threads: t.join()
print(f"Counter: {counter}") # Should be 100

Semaphore example

Semaphore example

db_connections = Semaphore(max_count=10)
def query_database(query: str): with db_connections: # Execute query with limited connections time.sleep(0.1) print(f"Executing: {query}")
undefined
db_connections = Semaphore(max_count=10)
def query_database(query: str): with db_connections: # Execute query with limited connections time.sleep(0.1) print(f"Executing: {query}")
undefined

5. Async Patterns (Python asyncio)

5. Async Patterns (Python asyncio)

python
import asyncio
from typing import Callable, List, TypeVar, Awaitable

T = TypeVar('T')
R = TypeVar('R')

class AsyncPool:
    def __init__(self, concurrency: int):
        self.semaphore = asyncio.Semaphore(concurrency)

    async def map(
        self,
        func: Callable[[T], Awaitable[R]],
        items: List[T]
    ) -> List[R]:
        """Map async function over items with concurrency limit."""
        async def bounded_func(item: T) -> R:
            async with self.semaphore:
                return await func(item)

        return await asyncio.gather(*[
            bounded_func(item) for item in items
        ])


class AsyncQueue:
    def __init__(self, max_size: int = 0):
        self.queue = asyncio.Queue(maxsize=max_size)

    async def put(self, item):
        await self.queue.put(item)

    async def get(self):
        return await self.queue.get()

    def task_done(self):
        self.queue.task_done()

    async def join(self):
        await self.queue.join()
python
import asyncio
from typing import Callable, List, TypeVar, Awaitable

T = TypeVar('T')
R = TypeVar('R')

class AsyncPool:
    def __init__(self, concurrency: int):
        self.semaphore = asyncio.Semaphore(concurrency)

    async def map(
        self,
        func: Callable[[T], Awaitable[R]],
        items: List[T]
    ) -> List[R]:
        """Map async function over items with concurrency limit."""
        async def bounded_func(item: T) -> R:
            async with self.semaphore:
                return await func(item)

        return await asyncio.gather(*[
            bounded_func(item) for item in items
        ])


class AsyncQueue:
    def __init__(self, max_size: int = 0):
        self.queue = asyncio.Queue(maxsize=max_size)

    async def put(self, item):
        await self.queue.put(item)

    async def get(self):
        return await self.queue.get()

    def task_done(self):
        self.queue.task_done()

    async def join(self):
        await self.queue.join()

Producer-Consumer pattern

Producer-Consumer pattern

async def producer(queue: AsyncQueue, items: List[int]): """Produce items.""" for item in items: await queue.put(item) print(f"Produced: {item}") await asyncio.sleep(0.1)
async def consumer(queue: AsyncQueue, name: str): """Consume items.""" while True: item = await queue.get()
    if item is None:
        queue.task_done()
        break

    print(f"{name} consuming: {item}")
    await asyncio.sleep(0.2)
    queue.task_done()
async def main(): queue = AsyncQueue(max_size=10)
# Start consumers
consumers = [
    asyncio.create_task(consumer(queue, f"Consumer-{i}"))
    for i in range(3)
]

# Start producer
await producer(queue, list(range(20)))

# Wait for all items to be processed
await queue.join()

# Stop consumers
for _ in range(3):
    await queue.put(None)

await asyncio.gather(*consumers)
asyncio.run(main())
undefined
async def producer(queue: AsyncQueue, items: List[int]): """Produce items.""" for item in items: await queue.put(item) print(f"Produced: {item}") await asyncio.sleep(0.1)
async def consumer(queue: AsyncQueue, name: str): """Consume items.""" while True: item = await queue.get()
    if item is None:
        queue.task_done()
        break

    print(f"{name} consuming: {item}")
    await asyncio.sleep(0.2)
    queue.task_done()
async def main(): queue = AsyncQueue(max_size=10)
# Start consumers
consumers = [
    asyncio.create_task(consumer(queue, f"Consumer-{i}"))
    for i in range(3)
]

# Start producer
await producer(queue, list(range(20)))

# Wait for all items to be processed
await queue.join()

# Stop consumers
for _ in range(3):
    await queue.put(None)

await asyncio.gather(*consumers)
asyncio.run(main())
undefined

6. Go-Style Channels (Simulation)

6. Go-Style Channels (Simulation)

typescript
class Channel<T> {
  private buffer: T[] = [];
  private senders: Array<{ value: T; resolve: () => void }> = [];
  private receivers: Array<(value: T) => void> = [];
  private closed = false;

  constructor(private bufferSize: number = 0) {}

  async send(value: T): Promise<void> {
    if (this.closed) {
      throw new Error('Channel is closed');
    }

    if (this.receivers.length > 0) {
      const receiver = this.receivers.shift()!;
      receiver(value);
      return;
    }

    if (this.buffer.length < this.bufferSize) {
      this.buffer.push(value);
      return;
    }

    return new Promise(resolve => {
      this.senders.push({ value, resolve });
    });
  }

  async receive(): Promise<T | undefined> {
    if (this.buffer.length > 0) {
      const value = this.buffer.shift()!;

      if (this.senders.length > 0) {
        const sender = this.senders.shift()!;
        this.buffer.push(sender.value);
        sender.resolve();
      }

      return value;
    }

    if (this.senders.length > 0) {
      const sender = this.senders.shift()!;
      sender.resolve();
      return sender.value;
    }

    if (this.closed) {
      return undefined;
    }

    return new Promise(resolve => {
      this.receivers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    this.receivers.forEach(receiver => receiver(undefined as any));
    this.receivers = [];
  }
}

// Usage
async function example() {
  const channel = new Channel<number>(5);

  // Producer
  async function producer() {
    for (let i = 0; i < 10; i++) {
      await channel.send(i);
      console.log(`Sent: ${i}`);
    }
    channel.close();
  }

  // Consumer
  async function consumer() {
    while (true) {
      const value = await channel.receive();
      if (value === undefined) break;
      console.log(`Received: ${value}`);
    }
  }

  await Promise.all([
    producer(),
    consumer()
  ]);
}
typescript
class Channel<T> {
  private buffer: T[] = [];
  private senders: Array<{ value: T; resolve: () => void }> = [];
  private receivers: Array<(value: T) => void> = [];
  private closed = false;

  constructor(private bufferSize: number = 0) {}

  async send(value: T): Promise<void> {
    if (this.closed) {
      throw new Error('Channel is closed');
    }

    if (this.receivers.length > 0) {
      const receiver = this.receivers.shift()!;
      receiver(value);
      return;
    }

    if (this.buffer.length < this.bufferSize) {
      this.buffer.push(value);
      return;
    }

    return new Promise(resolve => {
      this.senders.push({ value, resolve });
    });
  }

  async receive(): Promise<T | undefined> {
    if (this.buffer.length > 0) {
      const value = this.buffer.shift()!;

      if (this.senders.length > 0) {
        const sender = this.senders.shift()!;
        this.buffer.push(sender.value);
        sender.resolve();
      }

      return value;
    }

    if (this.senders.length > 0) {
      const sender = this.senders.shift()!;
      sender.resolve();
      return sender.value;
    }

    if (this.closed) {
      return undefined;
    }

    return new Promise(resolve => {
      this.receivers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    this.receivers.forEach(receiver => receiver(undefined as any));
    this.receivers = [];
  }
}

// Usage
async function example() {
  const channel = new Channel<number>(5);

  // Producer
  async function producer() {
    for (let i = 0; i < 10; i++) {
      await channel.send(i);
      console.log(`Sent: ${i}`);
    }
    channel.close();
  }

  // Consumer
  async function consumer() {
    while (true) {
      const value = await channel.receive();
      if (value === undefined) break;
      console.log(`Received: ${value}`);
    }
  }

  await Promise.all([
    producer(),
    consumer()
  ]);
}

Best Practices

最佳实践

✅ DO

✅ 推荐做法

  • Use proper synchronization primitives
  • Limit concurrency to avoid resource exhaustion
  • Handle errors in concurrent operations
  • Use immutable data when possible
  • Test concurrent code thoroughly
  • Profile concurrent performance
  • Document thread-safety guarantees
  • 使用合适的同步原语
  • 限制并发数以避免资源耗尽
  • 处理并发操作中的错误
  • 尽可能使用不可变数据
  • 全面测试并发代码
  • 分析并发性能
  • 记录线程安全保证

❌ DON'T

❌ 不推荐做法

  • Share mutable state without synchronization
  • Use sleep/polling for coordination
  • Create unlimited threads/workers
  • Ignore race conditions
  • Block event loops in async code
  • Forget to clean up resources
  • 在未同步的情况下共享可变状态
  • 使用睡眠/轮询进行协调
  • 创建无限制的线程/工作进程
  • 忽略竞争条件
  • 在异步代码中阻塞事件循环
  • 忘记清理资源

Resources

参考资源