csharp-concurrency-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

.NET Concurrency: Choosing the Right Tool

.NET 并发:选择合适的工具

When to Use This Skill

何时使用本技能

Use this skill when:
  • Deciding how to handle concurrent operations in .NET
  • Evaluating whether to use async/await, Channels, Akka.NET, or other abstractions
  • Tempted to use locks, semaphores, or other synchronization primitives
  • Need to process streams of data with backpressure, batching, or debouncing
  • Managing state across multiple concurrent entities
在以下场景中使用本技能:
  • 决定如何在.NET中处理并发操作
  • 评估是否使用async/await、Channels、Akka.NET或其他抽象机制
  • 想要使用锁、信号量或其他同步原语时
  • 需要处理带有背压、批处理或防抖的数据流
  • 管理多个并发实体的状态

The Philosophy

核心理念

Start simple, escalate only when needed.
Most concurrency problems can be solved with
async/await
. Only reach for more sophisticated tools when you have a specific need that async/await can't address cleanly.
Try to avoid shared mutable state. The best way to handle concurrency is to design it away. Immutable data, message passing, and isolated state (like actors) eliminate entire categories of bugs.
Locks should be the exception, not the rule. When you can't avoid shared mutable state, using a lock occasionally isn't the end of the world. But if you find yourself reaching for
lock
,
SemaphoreSlim
, or other synchronization primitives regularly, step back and reconsider your design.
When you truly need shared mutable state:
  1. First choice: Redesign to avoid it (immutability, message passing, actor isolation)
  2. Second choice: Use
    System.Collections.Concurrent
    (ConcurrentDictionary, ConcurrentQueue, etc.)
  3. Third choice: Use
    Channel<T>
    to serialize access through message passing
  4. Last resort: Use
    lock
    for simple, short-lived critical sections
Locks are appropriate when building low-level infrastructure or concurrent data structures. But for business logic, there's almost always a better abstraction.

从简单开始,仅在必要时升级方案。
大多数并发问题都可以用
async/await
解决。只有当async/await无法干净地满足特定需求时,才考虑使用更复杂的工具。
尽量避免共享可变状态。 处理并发的最佳方式是从设计上规避它。不可变数据、消息传递和隔离状态(如Actor)可以消除整类bug。
锁应该是例外,而非常规操作。 当你无法避免共享可变状态时,偶尔使用锁并非世界末日。但如果你发现自己频繁使用
lock
SemaphoreSlim
或其他同步原语,请退一步重新考虑你的设计。
当你确实需要共享可变状态时:
  1. 首选方案: 重新设计以避免它(不可变性、消息传递、Actor隔离)
  2. 次选方案: 使用
    System.Collections.Concurrent
    (ConcurrentDictionary、ConcurrentQueue等)
  3. 第三选择: 使用
    Channel<T>
    通过消息传递序列化访问
  4. 最后手段: 对简单、短期的临界区使用
    lock
锁适用于构建底层基础设施或并发数据结构。但对于业务逻辑,几乎总有更好的抽象方案。

Decision Tree

决策树

What are you trying to do?
├─► Wait for I/O (HTTP, database, file)?
│   └─► Use async/await
├─► Process a collection in parallel (CPU-bound)?
│   └─► Use Parallel.ForEachAsync
├─► Producer/consumer pattern (work queue)?
│   └─► Use System.Threading.Channels
├─► UI event handling (debounce, throttle, combine)?
│   └─► Use Reactive Extensions (Rx)
├─► Server-side stream processing (backpressure, batching)?
│   └─► Use Akka.NET Streams
├─► State machines with complex transitions?
│   └─► Use Akka.NET Actors (Become pattern)
├─► Manage state for many independent entities?
│   └─► Use Akka.NET Actors (entity-per-actor)
├─► Coordinate multiple async operations?
│   └─► Use Task.WhenAll / Task.WhenAny
└─► None of the above fits?
    └─► Ask yourself: "Do I really need shared mutable state?"
        ├─► Yes → Consider redesigning to avoid it
        └─► Truly unavoidable → Use Channels or Actors to serialize access

你想要实现什么?
├─► 等待I/O操作(HTTP、数据库、文件)?
│   └─► 使用async/await
├─► 并行处理集合(CPU密集型)?
│   └─► 使用Parallel.ForEachAsync
├─► 生产者/消费者模式(工作队列)?
│   └─► 使用System.Threading.Channels
├─► UI事件处理(防抖、节流、组合)?
│   └─► 使用Reactive Extensions (Rx)
├─► 服务端流处理(背压、批处理)?
│   └─► 使用Akka.NET Streams
├─► 带有复杂状态转换的状态机?
│   └─► 使用Akka.NET Actors(Become模式)
├─► 管理多个独立实体的状态?
│   └─► 使用Akka.NET Actors(每个实体一个Actor)
├─► 协调多个异步操作?
│   └─► 使用Task.WhenAll / Task.WhenAny
└─► 以上都不适用?
    └─► 问自己:"我真的需要共享可变状态吗?"
        ├─► 是 → 考虑重新设计以避免它
        └─► 确实无法避免 → 使用Channels或Actors序列化访问

Level 1: async/await (Default Choice)

级别1:async/await(默认选择)

Use for: I/O-bound operations, non-blocking waits, most everyday concurrency.
csharp
// Simple async I/O
public async Task<Order> GetOrderAsync(string orderId, CancellationToken ct)
{
    var order = await _database.GetAsync(orderId, ct);
    var customer = await _customerService.GetAsync(order.CustomerId, ct);
    return order with { Customer = customer };
}

// Parallel async operations (when independent)
public async Task<Dashboard> LoadDashboardAsync(string userId, CancellationToken ct)
{
    var ordersTask = _orderService.GetRecentOrdersAsync(userId, ct);
    var notificationsTask = _notificationService.GetUnreadAsync(userId, ct);
    var statsTask = _statsService.GetUserStatsAsync(userId, ct);

    await Task.WhenAll(ordersTask, notificationsTask, statsTask);

    return new Dashboard(
        Orders: await ordersTask,
        Notifications: await notificationsTask,
        Stats: await statsTask);
}
Key principles:
  • Always accept
    CancellationToken
  • Use
    ConfigureAwait(false)
    in library code
  • Don't block on async code (no
    .Result
    or
    .Wait()
    )

适用场景: I/O密集型操作、非阻塞等待、大多数日常并发场景。
csharp
// 简单异步I/O
public async Task<Order> GetOrderAsync(string orderId, CancellationToken ct)
{
    var order = await _database.GetAsync(orderId, ct);
    var customer = await _customerService.GetAsync(order.CustomerId, ct);
    return order with { Customer = customer };
}

// 并行异步操作(独立任务)
public async Task<Dashboard> LoadDashboardAsync(string userId, CancellationToken ct)
{
    var ordersTask = _orderService.GetRecentOrdersAsync(userId, ct);
    var notificationsTask = _notificationService.GetUnreadAsync(userId, ct);
    var statsTask = _statsService.GetUserStatsAsync(userId, ct);

    await Task.WhenAll(ordersTask, notificationsTask, statsTask);

    return new Dashboard(
        Orders: await ordersTask,
        Notifications: await notificationsTask,
        Stats: await statsTask);
}
核心原则:
  • 始终接受
    CancellationToken
    参数
  • 在类库代码中使用
    ConfigureAwait(false)
  • 不要阻塞异步代码(避免使用
    .Result
    .Wait()

Level 2: Parallel.ForEachAsync (CPU-Bound Parallelism)

级别2:Parallel.ForEachAsync(CPU密集型并行处理)

Use for: Processing collections in parallel when work is CPU-bound or you need controlled concurrency.
csharp
// Process items with controlled parallelism
public async Task ProcessOrdersAsync(
    IEnumerable<Order> orders,
    CancellationToken ct)
{
    await Parallel.ForEachAsync(
        orders,
        new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            CancellationToken = ct
        },
        async (order, token) =>
        {
            await ProcessOrderAsync(order, token);
        });
}

// CPU-bound work with I/O
public async Task<IReadOnlyList<ProcessedImage>> ProcessImagesAsync(
    IEnumerable<string> imagePaths,
    CancellationToken ct)
{
    var results = new ConcurrentBag<ProcessedImage>();

    await Parallel.ForEachAsync(
        imagePaths,
        new ParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = ct },
        async (path, token) =>
        {
            var image = await File.ReadAllBytesAsync(path, token);
            var processed = ProcessImage(image); // CPU-bound
            results.Add(processed);
        });

    return results.ToList();
}
When NOT to use:
  • Pure I/O operations (async/await is sufficient)
  • When order matters (Parallel doesn't preserve order)
  • When you need backpressure or flow control

适用场景: 当工作为CPU密集型或需要控制并发度时,并行处理集合。
csharp
// 控制并发度处理项
public async Task ProcessOrdersAsync(
    IEnumerable<Order> orders,
    CancellationToken ct)
{
    await Parallel.ForEachAsync(
        orders,
        new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            CancellationToken = ct
        },
        async (order, token) =>
        {
            await ProcessOrderAsync(order, token);
        });
}

// 包含I/O的CPU密集型工作
public async Task<IReadOnlyList<ProcessedImage>> ProcessImagesAsync(
    IEnumerable<string> imagePaths,
    CancellationToken ct)
{
    var results = new ConcurrentBag<ProcessedImage>();

    await Parallel.ForEachAsync(
        imagePaths,
        new ParallelOptions { MaxDegreeOfParallelism = 4, CancellationToken = ct },
        async (path, token) =>
        {
            var image = await File.ReadAllBytesAsync(path, token);
            var processed = ProcessImage(image); // CPU密集型操作
            results.Add(processed);
        });

    return results.ToList();
}
不适用场景:
  • 纯I/O操作(async/await已足够)
  • 对顺序有要求的场景(Parallel不保证顺序)
  • 需要背压或流控制的场景

Level 3: System.Threading.Channels (Producer/Consumer)

级别3:System.Threading.Channels(生产者/消费者模式)

Use for: Work queues, producer/consumer patterns, decoupling producers from consumers, simple stream-like processing.
csharp
// Basic producer/consumer
public class OrderProcessor
{
    private readonly Channel<Order> _channel;

    public OrderProcessor()
    {
        // Bounded channel provides backpressure
        _channel = Channel.CreateBounded<Order>(new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    // Producer
    public async Task EnqueueOrderAsync(Order order, CancellationToken ct)
    {
        await _channel.Writer.WriteAsync(order, ct);
    }

    // Consumer (run as background task)
    public async Task ProcessOrdersAsync(CancellationToken ct)
    {
        await foreach (var order in _channel.Reader.ReadAllAsync(ct))
        {
            await ProcessOrderAsync(order, ct);
        }
    }

    // Signal no more items
    public void Complete() => _channel.Writer.Complete();
}
csharp
// Multiple consumers (work-stealing pattern)
public class WorkerPool
{
    private readonly Channel<WorkItem> _channel;
    private readonly List<Task> _workers = new();

    public WorkerPool(int workerCount)
    {
        _channel = Channel.CreateUnbounded<WorkItem>();

        // Start multiple consumers
        for (int i = 0; i < workerCount; i++)
        {
            _workers.Add(Task.Run(() => ConsumeAsync()));
        }
    }

    private async Task ConsumeAsync()
    {
        await foreach (var item in _channel.Reader.ReadAllAsync())
        {
            await ProcessAsync(item);
        }
    }

    public ValueTask EnqueueAsync(WorkItem item)
        => _channel.Writer.WriteAsync(item);
}
Channels are good for:
  • Decoupling producer speed from consumer speed
  • Buffering work with backpressure
  • Simple fan-out to multiple workers
  • Background processing queues
Channels are NOT good for:
  • Complex stream operations (batching, windowing, merging)
  • Stateful processing per entity
  • When you need sophisticated error handling/supervision

适用场景: 工作队列、生产者/消费者模式、解耦生产者与消费者、简单的类流处理。
csharp
// 基础生产者/消费者
public class OrderProcessor
{
    private readonly Channel<Order> _channel;

    public OrderProcessor()
    {
        // 有界通道提供背压机制
        _channel = Channel.CreateBounded<Order>(new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait
        });
    }

    // 生产者
    public async Task EnqueueOrderAsync(Order order, CancellationToken ct)
    {
        await _channel.Writer.WriteAsync(order, ct);
    }

    // 消费者(作为后台任务运行)
    public async Task ProcessOrdersAsync(CancellationToken ct)
    {
        await foreach (var order in _channel.Reader.ReadAllAsync(ct))
        {
            await ProcessOrderAsync(order, ct);
        }
    }

    // 标记无更多项
    public void Complete() => _channel.Writer.Complete();
}
csharp
// 多消费者(工作窃取模式)
public class WorkerPool
{
    private readonly Channel<WorkItem> _channel;
    private readonly List<Task> _workers = new();

    public WorkerPool(int workerCount)
    {
        _channel = Channel.CreateUnbounded<WorkItem>();

        // 启动多个消费者
        for (int i = 0; i < workerCount; i++)
        {
            _workers.Add(Task.Run(() => ConsumeAsync()));
        }
    }

    private async Task ConsumeAsync()
    {
        await foreach (var item in _channel.Reader.ReadAllAsync())
        {
            await ProcessAsync(item);
        }
    }

    public ValueTask EnqueueAsync(WorkItem item)
        => _channel.Writer.WriteAsync(item);
}
Channels适合场景:
  • 解耦生产者与消费者的速度
  • 带背压的工作缓冲
  • 简单的多工作者分发
  • 后台处理队列
Channels不适合场景:
  • 复杂的流操作(批处理、窗口化、合并)
  • 每个实体的有状态处理
  • 需要复杂错误处理/监管的场景

Level 4: Akka.NET Streams (Complex Stream Processing)

级别4:Akka.NET Streams(复杂流处理)

Use for: Backpressure, batching, debouncing, throttling, merging streams, complex transformations.
csharp
using Akka.Streams;
using Akka.Streams.Dsl;

// Batching with timeout
public Source<IReadOnlyList<Event>, NotUsed> BatchEvents(
    Source<Event, NotUsed> events)
{
    return events
        .GroupedWithin(100, TimeSpan.FromSeconds(1)) // Batch up to 100 or 1 second
        .Select(batch => batch.ToList() as IReadOnlyList<Event>);
}

// Throttling
public Source<Request, NotUsed> ThrottleRequests(
    Source<Request, NotUsed> requests)
{
    return requests
        .Throttle(10, TimeSpan.FromSeconds(1), 5, ThrottleMode.Shaping);
}

// Parallel processing with ordered results
public Source<ProcessedItem, NotUsed> ProcessWithParallelism(
    Source<Item, NotUsed> items)
{
    return items
        .SelectAsync(4, async item => await ProcessAsync(item)); // 4 parallel
}

// Complex pipeline
public IRunnableGraph<Task<Done>> CreatePipeline(
    Source<RawEvent, NotUsed> events,
    Sink<ProcessedEvent, Task<Done>> sink)
{
    return events
        .Where(e => e.IsValid)
        .GroupedWithin(50, TimeSpan.FromMilliseconds(500))
        .SelectAsync(4, batch => ProcessBatchAsync(batch))
        .SelectMany(results => results)
        .ToMaterialized(sink, Keep.Right);
}
Akka.NET Streams excel at:
  • Batching with size AND time limits
  • Throttling and rate limiting
  • Backpressure that propagates through the entire pipeline
  • Merging/splitting streams
  • Parallel processing with ordering guarantees
  • Error handling with supervision

适用场景: 背压、批处理、防抖、节流、合并流、复杂转换。
csharp
using Akka.Streams;
using Akka.Streams.Dsl;

// 带超时的批处理
public Source<IReadOnlyList<Event>, NotUsed> BatchEvents(
    Source<Event, NotUsed> events)
{
    return events
        .GroupedWithin(100, TimeSpan.FromSeconds(1)) // 最多100个或1秒超时
        .Select(batch => batch.ToList() as IReadOnlyList<Event>);
}

// 节流处理
public Source<Request, NotUsed> ThrottleRequests(
    Source<Request, NotUsed> requests)
{
    return requests
        .Throttle(10, TimeSpan.FromSeconds(1), 5, ThrottleMode.Shaping);
}

// 带有序结果的并行处理
public Source<ProcessedItem, NotUsed> ProcessWithParallelism(
    Source<Item, NotUsed> items)
{
    return items
        .SelectAsync(4, async item => await ProcessAsync(item)); // 4个并行任务
}

// 复杂流水线
public IRunnableGraph<Task<Done>> CreatePipeline(
    Source<RawEvent, NotUsed> events,
    Sink<ProcessedEvent, Task<Done>> sink)
{
    return events
        .Where(e => e.IsValid)
        .GroupedWithin(50, TimeSpan.FromMilliseconds(500))
        .SelectAsync(4, batch => ProcessBatchAsync(batch))
        .SelectMany(results => results)
        .ToMaterialized(sink, Keep.Right);
}
Akka.NET Streams擅长场景:
  • 基于大小和时间限制的批处理
  • 节流和速率限制
  • 贯穿整个流水线的背压机制
  • 流的合并/拆分
  • 带顺序保证的并行处理
  • 带监管的错误处理

Level 4b: Reactive Extensions (UI and Event Composition)

级别4b:Reactive Extensions(UI与事件组合)

Use for: UI event handling, composing event streams, time-based operations in client applications.
Rx shines in UI scenarios where you need to react to user events with debouncing, throttling, or combining multiple event sources.
csharp
using System.Reactive.Linq;

// Search-as-you-type with debouncing
public class SearchViewModel
{
    public SearchViewModel(ISearchService searchService)
    {
        // React to text changes with debouncing
        SearchResults = SearchText
            .Throttle(TimeSpan.FromMilliseconds(300))  // Wait for typing to pause
            .DistinctUntilChanged()                     // Ignore if same text
            .Where(text => text.Length >= 3)           // Minimum length
            .SelectMany(text => searchService.SearchAsync(text).ToObservable())
            .ObserveOn(RxApp.MainThreadScheduler);     // Back to UI thread
    }

    public IObservable<string> SearchText { get; }
    public IObservable<IList<SearchResult>> SearchResults { get; }
}

// Combining multiple UI events
public IObservable<bool> CanSubmit =>
    Observable.CombineLatest(
        UsernameValid,
        PasswordValid,
        EmailValid,
        (user, pass, email) => user && pass && email);

// Double-click detection
public IObservable<Point> DoubleClicks =>
    MouseClicks
        .Buffer(TimeSpan.FromMilliseconds(300))
        .Where(clicks => clicks.Count >= 2)
        .Select(clicks => clicks.Last());

// Auto-save with debouncing
public IDisposable AutoSave =>
    DocumentChanges
        .Throttle(TimeSpan.FromSeconds(2))
        .Subscribe(async doc => await SaveAsync(doc));
Rx is ideal for:
  • UI event composition (WPF, WinForms, MAUI, Blazor)
  • Search-as-you-type with debouncing
  • Combining multiple event sources
  • Time-windowed operations in UI
  • Drag-and-drop gesture detection
  • Real-time data visualization
Rx vs Akka.NET Streams:
ScenarioRxAkka.NET Streams
UI events✅ Best choiceOverkill
Client-side composition✅ Best choiceOverkill
Server-side pipelinesWorks but limited✅ Better backpressure
Distributed processing❌ Not designed for✅ Built for this
Hot observables✅ Native supportRequires more setup
Rule of thumb: Rx for UI/client, Akka.NET Streams for server-side pipelines.

适用场景: UI事件处理、事件流组合、客户端应用中的基于时间的操作。
Rx在UI场景中表现出色,你可以通过防抖、节流或组合多个事件源来响应用户操作。
csharp
using System.Reactive.Linq;

// 带防抖的即时搜索
public class SearchViewModel
{
    public SearchViewModel(ISearchService searchService)
    {
        // 响应文本变化并添加防抖
        SearchResults = SearchText
            .Throttle(TimeSpan.FromMilliseconds(300))  // 等待输入暂停
            .DistinctUntilChanged()                     // 忽略重复文本
            .Where(text => text.Length >= 3)           // 最小长度限制
            .SelectMany(text => searchService.SearchAsync(text).ToObservable())
            .ObserveOn(RxApp.MainThreadScheduler);     // 切换回UI线程
    }

    public IObservable<string> SearchText { get; }
    public IObservable<IList<SearchResult>> SearchResults { get; }
}

// 组合多个UI事件
public IObservable<bool> CanSubmit =>
    Observable.CombineLatest(
        UsernameValid,
        PasswordValid,
        EmailValid,
        (user, pass, email) => user && pass && email);

// 双击检测
public IObservable<Point> DoubleClicks =>
    MouseClicks
        .Buffer(TimeSpan.FromMilliseconds(300))
        .Where(clicks => clicks.Count >= 2)
        .Select(clicks => clicks.Last());

// 带防抖的自动保存
public IDisposable AutoSave =>
    DocumentChanges
        .Throttle(TimeSpan.FromSeconds(2))
        .Subscribe(async doc => await SaveAsync(doc));
Rx理想场景:
  • UI事件组合(WPF、WinForms、MAUI、Blazor)
  • 带防抖的即时搜索
  • 组合多个事件源
  • UI中的时间窗口操作
  • 拖放手势检测
  • 实时数据可视化
Rx vs Akka.NET Streams:
场景RxAkka.NET Streams
UI事件✅ 最佳选择大材小用
客户端组合✅ 最佳选择大材小用
服务端流水线可用但受限✅ 背压机制更完善
分布式处理❌ 非设计目标✅ 原生支持
热可观察对象✅ 原生支持需要更多配置
经验法则: Rx用于UI/客户端,Akka.NET Streams用于服务端流水线。

Level 5: Akka.NET Actors (Stateful Concurrency)

级别5:Akka.NET Actors(有状态并发)

Use for: Managing state for multiple entities, state machines, push-based updates, complex coordination, supervision and fault tolerance.
适用场景: 管理多个实体的状态、状态机、推送式更新、复杂协调、监管与容错。

Entity-Per-Actor Pattern

每个实体一个Actor模式

csharp
// Actor per entity - each order has isolated state
public class OrderActor : ReceiveActor
{
    private OrderState _state;

    public OrderActor(string orderId)
    {
        _state = new OrderState(orderId);

        Receive<AddItem>(msg =>
        {
            _state = _state.AddItem(msg.Item);
            Sender.Tell(new ItemAdded(msg.Item));
        });

        Receive<Checkout>(msg =>
        {
            if (_state.CanCheckout)
            {
                _state = _state.Checkout();
                Sender.Tell(new CheckoutSucceeded(_state.Total));
            }
            else
            {
                Sender.Tell(new CheckoutFailed("Cart is empty"));
            }
        });

        Receive<GetState>(_ => Sender.Tell(_state));
    }
}
csharp
// 每个实体一个Actor - 每个订单拥有独立状态
public class OrderActor : ReceiveActor
{
    private OrderState _state;

    public OrderActor(string orderId)
    {
        _state = new OrderState(orderId);

        Receive<AddItem>(msg =>
        {
            _state = _state.AddItem(msg.Item);
            Sender.Tell(new ItemAdded(msg.Item));
        });

        Receive<Checkout>(msg =>
        {
            if (_state.CanCheckout)
            {
                _state = _state.Checkout();
                Sender.Tell(new CheckoutSucceeded(_state.Total));
            }
            else
            {
                Sender.Tell(new CheckoutFailed("购物车为空"));
            }
        });

        Receive<GetState>(_ => Sender.Tell(_state));
    }
}

State Machines with Become

使用Become实现状态机

Actors excel at implementing state machines using
Become()
to switch message handlers:
csharp
public class PaymentActor : ReceiveActor
{
    private PaymentData _payment;

    public PaymentActor(string paymentId)
    {
        _payment = new PaymentData(paymentId);

        // Start in Pending state
        Pending();
    }

    private void Pending()
    {
        Receive<AuthorizePayment>(msg =>
        {
            _payment = _payment with { Amount = msg.Amount };
            // Transition to Authorizing state
            Become(Authorizing);
            Self.Tell(new ProcessAuthorization());
        });

        Receive<CancelPayment>(_ =>
        {
            Become(Cancelled);
            Sender.Tell(new PaymentCancelled(_payment.Id));
        });
    }

    private void Authorizing()
    {
        Receive<ProcessAuthorization>(async _ =>
        {
            var result = await _gateway.AuthorizeAsync(_payment);
            if (result.Success)
            {
                _payment = _payment with { AuthCode = result.AuthCode };
                Become(Authorized);
            }
            else
            {
                Become(Failed);
            }
        });

        // Can't cancel while authorizing - stash for later or reject
        Receive<CancelPayment>(_ =>
        {
            Sender.Tell(new PaymentError("Cannot cancel during authorization"));
        });
    }

    private void Authorized()
    {
        Receive<CapturePayment>(_ =>
        {
            Become(Capturing);
            Self.Tell(new ProcessCapture());
        });

        Receive<VoidPayment>(_ =>
        {
            Become(Voiding);
            Self.Tell(new ProcessVoid());
        });
    }

    private void Capturing() { /* ... */ }
    private void Voiding() { /* ... */ }
    private void Cancelled() { /* Only responds to GetState */ }
    private void Failed() { /* Only responds to GetState, Retry */ }
}
Actor通过
Become()
切换消息处理器,可以优雅地实现状态机:
csharp
public class PaymentActor : ReceiveActor
{
    private PaymentData _payment;

    public PaymentActor(string paymentId)
    {
        _payment = new PaymentData(paymentId);

        // 初始状态为Pending
        Pending();
    }

    private void Pending()
    {
        Receive<AuthorizePayment>(msg =>
        {
            _payment = _payment with { Amount = msg.Amount };
            // 转换到Authorizing状态
            Become(Authorizing);
            Self.Tell(new ProcessAuthorization());
        });

        Receive<CancelPayment>(_ =>
        {
            Become(Cancelled);
            Sender.Tell(new PaymentCancelled(_payment.Id));
        });
    }

    private void Authorizing()
    {
        Receive<ProcessAuthorization>(async _ =>
        {
            var result = await _gateway.AuthorizeAsync(_payment);
            if (result.Success)
            {
                _payment = _payment with { AuthCode = result.AuthCode };
                Become(Authorized);
            }
            else
            {
                Become(Failed);
            }
        });

        // 授权过程中无法取消 - 暂存或拒绝
        Receive<CancelPayment>(_ =>
        {
            Sender.Tell(new PaymentError("授权过程中无法取消"));
        });
    }

    private void Authorized()
    {
        Receive<CapturePayment>(_ =>
        {
            Become(Capturing);
            Self.Tell(new ProcessCapture());
        });

        Receive<VoidPayment>(_ =>
        {
            Become(Voiding);
            Self.Tell(new ProcessVoid());
        });
    }

    private void Capturing() { /* ... */ }
    private void Voiding() { /* ... */ }
    private void Cancelled() { /* 仅响应GetState */ }
    private void Failed() { /* 仅响应GetState、Retry */ }
}

Distributed Entities with Cluster Sharding

使用Cluster Sharding实现分布式实体

csharp
// Using Cluster Sharding for distributed entities
builder.WithShardRegion<OrderActor>(
    typeName: "orders",
    entityPropsFactory: (_, _, resolver) =>
        orderId => Props.Create(() => new OrderActor(orderId)),
    messageExtractor: new OrderMessageExtractor(),
    shardOptions: new ShardOptions());

// Send message to any order - sharding routes to correct node
var orderRegion = registry.Get<OrderActor>();
orderRegion.Tell(new ShardingEnvelope("order-123", new AddItem(item)));
csharp
// 使用Cluster Sharding实现分布式实体
builder.WithShardRegion<OrderActor>(
    typeName: "orders",
    entityPropsFactory: (_, _, resolver) =>
        orderId => Props.Create(() => new OrderActor(orderId)),
    messageExtractor: new OrderMessageExtractor(),
    shardOptions: new ShardOptions());

// 向任意订单发送消息 - Sharding会路由到正确节点
var orderRegion = registry.Get<OrderActor>();
orderRegion.Tell(new ShardingEnvelope("order-123", new AddItem(item)));

When to Use Akka.NET

何时使用Akka.NET

Use Akka.NET Actors when you have:
ScenarioWhy Actors?
Many entities with independent stateEach entity gets its own actor - no locks, natural isolation
State machines
Become()
elegantly models state transitions
Push-based/reactive updatesActors naturally support tell-don't-ask
Supervision requirementsParent actors supervise children, automatic restart on failure
Distributed systemsCluster Sharding distributes entities across nodes
Long-running workflowsActors + persistence = durable workflows
Real-time systemsMessage-driven, non-blocking by design
IoT / device managementEach device = one actor, scales to millions
Don't use Akka.NET when:
ScenarioBetter Alternative
Simple work queue
Channel<T>
Request/response API
async/await
Batch processing
Parallel.ForEachAsync
or Akka.NET Streams
UI event handlingReactive Extensions
Shared state (single instance)Service with
Channel
for serialization
CRUD operationsStandard async services
当你遇到以下场景时使用Akka.NET Actors:
场景为何选择Actor?
大量带有独立状态的实体每个实体对应一个Actor - 无需锁,天然隔离
状态机
Become()
优雅地实现状态转换
推送式/响应式更新Actor天然支持"告知而非询问"模式
监管需求父Actor监管子Actor,失败时自动重启
分布式系统Cluster Sharding将实体分布到多个节点
长期运行的工作流Actor + 持久化 = 可持久化工作流
实时系统消息驱动,原生非阻塞设计
IoT / 设备管理每个设备对应一个Actor,可扩展至数百万级
不要使用Akka.NET的场景:
场景更好的替代方案
简单工作队列
Channel<T>
请求/响应API
async/await
批处理
Parallel.ForEachAsync
或 Akka.NET Streams
UI事件处理Reactive Extensions
共享状态(单实例)结合
Channel
实现序列化访问的服务
CRUD操作标准异步服务

The Actor Mindset

Actor思维模式

Think of actors when your problem looks like:
  • "I have thousands of [orders/users/devices/sessions] that need independent state"
  • "Each [entity] goes through a lifecycle with different behaviors at each stage"
  • "I need to push updates to interested parties when something changes"
  • "If processing fails, I want to restart just that entity, not the whole system"
  • "This needs to work across multiple servers"
If none of these apply, you probably don't need actors.

当你的问题符合以下描述时,考虑使用Actor:
  • "我有数千个[订单/用户/设备/会话]需要独立管理状态"
  • "每个[实体]都有生命周期,不同阶段有不同行为"
  • "我需要在状态变化时推送更新给相关方"
  • "如果处理失败,我希望仅重启该实体,而非整个系统"
  • "这需要在多台服务器上运行"
如果以上都不适用,你可能不需要Actor。

Anti-Patterns: What to Avoid

反模式:需要避免的做法

❌ Locks for Business Logic

❌ 在业务逻辑中使用锁

csharp
// BAD: Using locks to protect shared state
private readonly object _lock = new();
private Dictionary<string, Order> _orders = new();

public void UpdateOrder(string id, Action<Order> update)
{
    lock (_lock)
    {
        if (_orders.TryGetValue(id, out var order))
        {
            update(order);
        }
    }
}

// GOOD: Use an actor or Channel to serialize access
// Each order gets its own actor - no locks needed
csharp
// 错误:使用锁保护共享状态
private readonly object _lock = new();
private Dictionary<string, Order> _orders = new();

public void UpdateOrder(string id, Action<Order> update)
{
    lock (_lock)
    {
        if (_orders.TryGetValue(id, out var order))
        {
            update(order);
        }
    }
}

// 正确:使用Actor或Channel序列化访问
// 每个订单对应一个Actor - 无需锁

❌ Manual Thread Management

❌ 手动线程管理

csharp
// BAD: Creating threads manually
var thread = new Thread(() => ProcessOrders());
thread.Start();

// GOOD: Use Task.Run or better abstractions
_ = Task.Run(() => ProcessOrdersAsync(cancellationToken));
csharp
// 错误:手动创建线程
var thread = new Thread(() => ProcessOrders());
thread.Start();

// 正确:使用Task.Run或更好的抽象
_ = Task.Run(() => ProcessOrdersAsync(cancellationToken));

❌ Blocking in Async Code

❌ 在异步代码中阻塞

csharp
// BAD: Blocking on async
var result = GetDataAsync().Result; // Deadlock risk!
GetDataAsync().Wait();              // Also bad

// GOOD: Async all the way
var result = await GetDataAsync();
csharp
// 错误:阻塞异步代码
var result = GetDataAsync().Result; // 有死锁风险!
GetDataAsync().Wait();              // 同样错误

// 正确:全程异步
var result = await GetDataAsync();

❌ Shared Mutable State Without Protection

❌ 无保护的共享可变状态

csharp
// BAD: Multiple tasks mutating shared state
var results = new List<Result>();
await Parallel.ForEachAsync(items, async (item, ct) =>
{
    var result = await ProcessAsync(item, ct);
    results.Add(result); // Race condition!
});

// GOOD: Use ConcurrentBag or collect results differently
var results = new ConcurrentBag<Result>();
// Or better: return from the lambda and collect

csharp
// 错误:多个任务修改共享状态
var results = new List<Result>();
await Parallel.ForEachAsync(items, async (item, ct) =>
{
    var result = await ProcessAsync(item, ct);
    results.Add(result); // 竞态条件!
});

// 正确:使用ConcurrentBag或其他方式收集结果
var results = new ConcurrentBag<Result>();
// 更好的方式:从lambda返回结果并收集

Prefer Async Local Functions

优先使用异步本地函数

Use async local functions instead of
Task.Run(async () => ...)
or
ContinueWith()
:
使用异步本地函数替代
Task.Run(async () => ...)
ContinueWith()

Don't: Anonymous Async Lambda

不推荐:匿名异步Lambda

csharp
private void HandleCommand(MyCommand cmd)
{
    var self = Self;

    _ = Task.Run(async () =>
    {
        // Lots of async work here...
        var result = await DoWorkAsync();
        return new WorkCompleted(result);
    }).PipeTo(self);
}
csharp
private void HandleCommand(MyCommand cmd)
{
    var self = Self;

    _ = Task.Run(async () =>
    {
        // 大量异步工作...
        var result = await DoWorkAsync();
        return new WorkCompleted(result);
    }).PipeTo(self);
}

Do: Async Local Function

推荐:异步本地函数

csharp
private void HandleCommand(MyCommand cmd)
{
    async Task<WorkCompleted> ExecuteAsync()
    {
        // Lots of async work here...
        var result = await DoWorkAsync();
        return new WorkCompleted(result);
    }

    ExecuteAsync().PipeTo(Self);
}
csharp
private void HandleCommand(MyCommand cmd)
{
    async Task<WorkCompleted> ExecuteAsync()
    {
        // 大量异步工作...
        var result = await DoWorkAsync();
        return new WorkCompleted(result);
    }

    ExecuteAsync().PipeTo(Self);
}

Avoid ContinueWith for Sequencing

避免使用ContinueWith进行任务排序

Don't:
csharp
someTask
    .ContinueWith(t => ProcessResult(t.Result))
    .ContinueWith(t => SendNotification(t.Result));
Do:
csharp
async Task ProcessAndNotifyAsync()
{
    var result = await someTask;
    var processed = await ProcessResult(result);
    await SendNotification(processed);
}

ProcessAndNotifyAsync();
不推荐:
csharp
someTask
    .ContinueWith(t => ProcessResult(t.Result))
    .ContinueWith(t => SendNotification(t.Result));
推荐:
csharp
async Task ProcessAndNotifyAsync()
{
    var result = await someTask;
    var processed = await ProcessResult(result);
    await SendNotification(processed);
}

ProcessAndNotifyAsync();

Why This Matters

为何这很重要

BenefitDescription
ReadabilityNamed functions are self-documenting; anonymous lambdas obscure intent
DebuggingStack traces show meaningful function names instead of
<>c__DisplayClass
Exception handlingCleaner try/catch structure without
AggregateException
unwrapping
Scope clarityLocal functions make captured variables explicit
TestabilityEasier to extract and unit test the async logic
优势描述
可读性命名函数自文档化;匿名Lambda会模糊意图
调试堆栈跟踪显示有意义的函数名,而非
<>c__DisplayClass
异常处理更清晰的try/catch结构,无需解包
AggregateException
作用域清晰本地函数使捕获的变量更明确
可测试性更容易提取异步逻辑进行单元测试

Akka.NET Example

Akka.NET示例

When using
PipeTo
in actors, async local functions keep the pattern clean:
csharp
private void HandleSync(StartSync cmd)
{
    async Task<SyncResult> PerformSyncAsync()
    {
        await using var scope = _scopeFactory.CreateAsyncScope();
        var service = scope.ServiceProvider.GetRequiredService<ISyncService>();

        var count = await service.SyncAsync(cmd.EntityId);
        return new SyncResult(cmd.EntityId, count);
    }

    PerformSyncAsync().PipeTo(Self);
}
This is cleaner than wrapping everything in
Task.Run(async () => ...)
.

在Actor中使用
PipeTo
时,异步本地函数让代码更简洁:
csharp
private void HandleSync(StartSync cmd)
{
    async Task<SyncResult> PerformSyncAsync()
    {
        await using var scope = _scopeFactory.CreateAsyncScope();
        var service = scope.ServiceProvider.GetRequiredService<ISyncService>();

        var count = await service.SyncAsync(cmd.EntityId);
        return new SyncResult(cmd.EntityId, count);
    }

    PerformSyncAsync().PipeTo(Self);
}
这比将所有代码包裹在
Task.Run(async () => ...)
中更简洁。

Quick Reference: Which Tool When?

快速参考:何时使用哪种工具?

NeedToolExample
Wait for I/O
async/await
HTTP calls, database queries
Parallel CPU work
Parallel.ForEachAsync
Image processing, calculations
Work queue
Channel<T>
Background job processing
UI events with debounce/throttleReactive ExtensionsSearch-as-you-type, auto-save
Server-side batching/throttlingAkka.NET StreamsEvent aggregation, rate limiting
State machinesAkka.NET ActorsPayment flows, order lifecycles
Entity state managementAkka.NET ActorsOrder management, user sessions
Fire multiple async ops
Task.WhenAll
Loading dashboard data
Race multiple async ops
Task.WhenAny
Timeout with fallback
Periodic work
PeriodicTimer
Health checks, polling

需求工具示例
等待I/O
async/await
HTTP调用、数据库查询
并行CPU工作
Parallel.ForEachAsync
图片处理、计算任务
工作队列
Channel<T>
后台任务处理
带防抖/节流的UI事件Reactive Extensions即时搜索、自动保存
服务端批处理/节流Akka.NET Streams事件聚合、速率限制
状态机Akka.NET Actors支付流程、订单生命周期
实体状态管理Akka.NET Actors订单管理、用户会话
触发多个异步操作
Task.WhenAll
加载仪表盘数据
异步操作竞速
Task.WhenAny
带降级的超时处理
周期性任务
PeriodicTimer
健康检查、轮询

The Escalation Path

升级路径

async/await (start here)
    ├─► Need parallelism? → Parallel.ForEachAsync
    ├─► Need producer/consumer? → Channel<T>
    ├─► Need UI event composition? → Reactive Extensions
    ├─► Need server-side stream processing? → Akka.NET Streams
    └─► Need state machines or entity management? → Akka.NET Actors
Only escalate when you have a concrete need. Don't reach for actors or streams "just in case" - start with async/await and move up only when the simpler approach doesn't fit.
async/await(从这里开始)
    ├─► 需要并行处理? → Parallel.ForEachAsync
    ├─► 需要生产者/消费者模式? → Channel<T>
    ├─► 需要UI事件组合? → Reactive Extensions
    ├─► 需要服务端流处理? → Akka.NET Streams
    └─► 需要状态机或实体管理? → Akka.NET Actors
仅在有具体需求时才升级方案。 不要为了"以防万一"就使用Actor或流处理——从async/await开始,只有当简单方案不适用时再升级。