Akka.NET Best Practices

When to Use This Skill

Use this skill when:
  • Designing actor communication patterns
  • Deciding between EventStream and DistributedPubSub
  • Implementing error handling in actors
  • Understanding supervision strategies
  • Choosing between Props patterns and DependencyResolver
  • Designing work distribution across nodes
  • Creating testable actor systems that can run with or without cluster infrastructure
  • Abstracting over Cluster Sharding for local testing scenarios

1. EventStream vs DistributedPubSub

Critical: EventStream is LOCAL ONLY

Context.System.EventStream is local to a single ActorSystem process. It does NOT work across cluster nodes.
csharp
// BAD: This only works on a single server
// When you add a second server, subscribers on server 2 won't receive events from server 1
Context.System.EventStream.Subscribe(Self, typeof(PostCreated));
Context.System.EventStream.Publish(new PostCreated(postId, authorId));
When EventStream is appropriate:
  • Logging and diagnostics within a single process
  • Local event bus for truly single-process applications
  • Development/testing scenarios
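As a sketch of the first case, an actor can subscribe to the DeadLetter notifications that Akka.NET publishes on the local EventStream. The actor name DeadLetterMonitor is an assumption made for illustration; it only ever sees dead letters from its own process.

```csharp
using Akka.Actor;
using Akka.Event;

// Hypothetical diagnostics actor: logs every dead letter seen in this process.
public sealed class DeadLetterMonitor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public DeadLetterMonitor()
    {
        Receive<DeadLetter>(dl =>
            _log.Warning("Dead letter {0} from {1} to {2}", dl.Message, dl.Sender, dl.Recipient));
    }

    // Subscribe when the actor starts; this observes the local ActorSystem only.
    protected override void PreStart()
        => Context.System.EventStream.Subscribe(Self, typeof(DeadLetter));

    // Remove all of this actor's subscriptions when it stops.
    protected override void PostStop()
        => Context.System.EventStream.Unsubscribe(Self);
}
```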

Use DistributedPubSub for Multi-Node

For events that must reach actors across multiple cluster nodes, use Akka.Cluster.Tools.PublishSubscribe:
csharp
using Akka.Cluster.Tools.PublishSubscribe;

public class TimelineUpdatePublisher : ReceiveActor
{
    private readonly IActorRef _mediator;

    public TimelineUpdatePublisher()
    {
        // Get the DistributedPubSub mediator
        _mediator = DistributedPubSub.Get(Context.System).Mediator;

        Receive<PublishTimelineUpdate>(msg =>
        {
            // Publish to a topic - reaches all subscribers across all nodes
            _mediator.Tell(new Publish($"timeline:{msg.UserId}", msg.Update));
        });
    }
}

public class TimelineSubscriber : ReceiveActor
{
    public TimelineSubscriber(UserId userId)
    {
        var mediator = DistributedPubSub.Get(Context.System).Mediator;

        // Subscribe to user-specific topic
        mediator.Tell(new Subscribe($"timeline:{userId}", Self));

        Receive<TimelineUpdate>(update =>
        {
            // Handle the update - this works across cluster nodes
        });

        Receive<SubscribeAck>(ack =>
        {
            // Subscription confirmed
        });
    }
}

Akka.Hosting Configuration for DistributedPubSub

csharp
builder.WithDistributedPubSub(role: null); // Available on all roles, or specify a role

Topic Design Patterns

| Pattern | Topic Format | Use Case |
|---|---|---|
| Per-user | timeline:{userId} | Timeline updates, notifications |
| Per-entity | post:{postId} | Post engagement updates |
| Broadcast | system:announcements | System-wide notifications |
| Role-based | workers:rss-poller | Work distribution |
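One way to keep publishers and subscribers in agreement on these formats is to build topic names in a single place. The following is a minimal sketch; the Topics class and its method names are assumptions for illustration, not part of Akka.NET.

```csharp
using System;

// Hypothetical helper: the only place that knows the topic naming scheme.
public static class Topics
{
    public const string Announcements = "system:announcements";

    public static string Timeline(string userId) => $"timeline:{userId}";

    public static string Post(string postId) => $"post:{postId}";

    public static string Workers(string role) => $"workers:{role}";
}
```

A publisher would then write new Publish(Topics.Timeline(userId), update) and a subscriber new Subscribe(Topics.Timeline(userId), Self), so a typo in a format string cannot silently split the two onto different topics.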

2. Supervision Strategies

Key Clarification: Supervision is for CHILDREN

A supervision strategy defined on an actor dictates how that actor supervises its children, NOT how the actor itself is supervised.
csharp
public class ParentActor : ReceiveActor
{
    // This strategy applies to children of ParentActor, NOT to ParentActor itself
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: 10,
            withinTimeRange: TimeSpan.FromSeconds(30),
            localOnlyDecider: ex => ex switch // the lambda overload names this parameter localOnlyDecider
            {
                ArithmeticException => Directive.Resume,
                NullReferenceException => Directive.Restart,
                ArgumentException => Directive.Stop,
                _ => Directive.Escalate
            });
    }
}

Default Supervision Strategy

The default OneForOneStrategy already includes rate limiting:
  • 10 restarts within 1 second = actor is permanently stopped
  • This prevents infinite restart loops
You rarely need a custom strategy unless you have specific requirements.

When to Define Custom Supervision

Good reasons:
  • Actor throws exceptions indicating irrecoverable state corruption → Restart
  • Actor throws exceptions that should NOT cause restart (expected failures) → Resume
  • Child failures should affect siblings → Use AllForOneStrategy
  • Need different retry limits than the default
Bad reasons:
  • "Just to be safe" - the default is already safe
  • Don't understand what the actor does - understand it first
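For the sibling case listed under the good reasons, a minimal AllForOneStrategy sketch might look like the following. The ConnectionGroupActor name, the retry limits, and the choice of InvalidOperationException as the trigger are assumptions for illustration only.

```csharp
using System;
using Akka.Actor;

// Hypothetical parent whose children share one resource: if any child fails,
// all children are restarted so they pick up a fresh resource together.
public sealed class ConnectionGroupActor : ReceiveActor
{
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new AllForOneStrategy(
            3,                         // maxNrOfRetries
            TimeSpan.FromSeconds(30),  // withinTimeRange
            ex => ex switch
            {
                // The directive is applied to every child, not just the failing one
                InvalidOperationException => Directive.Restart,
                _ => Directive.Escalate
            });
    }
}
```

With OneForOneStrategy the same decider would restart only the child that threw; the strategy type is the only difference.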

Example: When Custom Supervision Makes Sense

csharp
public class RssFeedCoordinator : ReceiveActor
{
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1, // Unlimited retries
            withinTimeRange: TimeSpan.FromMinutes(1),
            localOnlyDecider: ex => ex switch // the lambda overload names this parameter localOnlyDecider
            {
                // HTTP timeout - transient, resume and let the actor retry via its own timer
                HttpRequestException => Directive.Resume,

                // Feed URL permanently invalid - stop this child, don't restart forever
                InvalidFeedUrlException => Directive.Stop,

                // Unknown error - restart to clear potentially corrupt state
                _ => Directive.Restart
            });
    }
}

3. Error Handling: Supervision vs Try-Catch

When to Use Try-Catch (Most Cases)

Use try-catch when:
  • The failure is expected (network timeout, invalid input, external service down)
  • You know exactly why the exception occurred
  • You can handle it gracefully (retry, return error response, log and continue)
  • Restarting would not help (same error would occur again)
csharp
public class RssFeedPollerActor : ReceiveActor
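{
    // The fields below were referenced but not declared in the original snippet;
    // injecting HttpClient through the constructor is an assumption.
    private readonly HttpClient _httpClient;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public RssFeedPollerActor(HttpClient httpClient)
    {
        _httpClient = httpClient;
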
        ReceiveAsync<PollFeed>(async msg =>
        {
            try
            {
                var feed = await _httpClient.GetStringAsync(msg.FeedUrl);
                var items = ParseFeed(feed);
                // Process items...
            }
            catch (HttpRequestException ex)
            {
                // Expected failure - log and schedule retry
                _log.Warning("Feed {Url} unavailable: {Error}", msg.FeedUrl, ex.Message);
                Context.System.Scheduler.ScheduleTellOnce(
                    TimeSpan.FromMinutes(5),
                    Self,
                    msg,
                    Self);
            }
            catch (XmlException ex)
            {
                // Invalid feed format - log and mark as bad
                _log.Error("Feed {Url} has invalid format: {Error}", msg.FeedUrl, ex.Message);
                Sender.Tell(new FeedPollResult.InvalidFormat(msg.FeedUrl));
            }
        });
    }
}

When to Let Supervision Handle It

Let exceptions propagate (trigger supervision) when:
  • You have no idea why the exception occurred
  • The actor's state might be corrupt
  • A restart would help (fresh state, reconnect resources)
  • It's a programming error (NullReferenceException, InvalidOperationException from bad logic)
csharp
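// Requires: using Akka.Persistence;
// Persist is only available on persistent actors, so the base class is ReceivePersistentActor.
public class OrderActor : ReceivePersistentActor
{
    private OrderState _state;

    // A persistent actor must expose a stable PersistenceId.
    // The orderId parameter and the "order-" prefix are illustrative assumptions.
    public override string PersistenceId { get; }

    public OrderActor(string orderId)
    {
        PersistenceId = $"order-{orderId}";

        // After a restart, state is rebuilt by replaying persisted events
        Recover<PaymentApplied>(evt => _state = _state.With(evt));

        Command<ProcessPayment>(msg =>
        {
            // If this throws, we have no idea why - let supervision restart us
            // A restart will reload state from persistence and might fix the issue
            var result = _state.ApplyPayment(msg.Amount);
            Persist(new PaymentApplied(msg.Amount), evt =>
            {
                _state = _state.With(evt);
            });
        });
    }
}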

Anti-Pattern: Swallowing Unknown Exceptions

csharp
// BAD: Swallowing exceptions hides problems
public class BadActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public BadActor()
    {
        ReceiveAsync<DoWork>(async msg =>
        {
            try
            {
                await ProcessWork(msg);
            }
            catch (Exception ex)
            {
                // This hides all errors - you'll never know something is broken
                _log.Error(ex, "Error processing work");
                // Actor continues with potentially corrupt state
            }
        });
    }
}

// GOOD: Handle known exceptions, let unknown ones propagate
public class GoodActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public GoodActor()
    {
        ReceiveAsync<DoWork>(async msg =>
        {
            try
            {
                await ProcessWork(msg);
            }
            catch (HttpRequestException ex)
            {
                // Known, expected failure - handle gracefully
                _log.Warning("HTTP request failed: {Error}", ex.Message);
                Sender.Tell(new WorkResult.TransientFailure());
            }
            // Unknown exceptions propagate to supervision
        });
    }
}

4. Props vs DependencyResolver

When to Use Plain Props

Use Props.Create() when:
  • Actor doesn't need IServiceProvider or IRequiredActor<T>
  • All dependencies can be passed via constructor
  • Actor is simple and self-contained
csharp
// Simple actor with no DI needs
public static Props Props(PostId postId, IPostWriteStore store)
    => Akka.Actor.Props.Create(() => new PostEngagementActor(postId, store));

// Usage
var actor = Context.ActorOf(PostEngagementActor.Props(postId, store), postId.ToString());

When to Use DependencyResolver

Use resolver.Props<T>() when:
  • Actor needs IServiceProvider to create scoped services
  • Actor uses IRequiredActor<T> to get references to other actors
  • Actor has many dependencies that are already in DI container
csharp
// Actor that needs scoped database connections
public class OrderProcessorActor : ReceiveActor
{
    public OrderProcessorActor(IServiceProvider serviceProvider)
    {
        ReceiveAsync<ProcessOrder>(async msg =>
        {
            // Create a scope for this operation
            using var scope = serviceProvider.CreateScope();
            var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
            // Process order...
        });
    }
}

// Registration with DI
builder.WithActors((system, registry, resolver) =>
{
    var actor = system.ActorOf(resolver.Props<OrderProcessorActor>(), "order-processor");
    registry.Register<OrderProcessorActor>(actor);
});
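Once an actor is registered this way, other components can ask the container for it through IRequiredActor<T>. The sketch below assumes a hypothetical non-actor service named OrderSubmissionService and reuses OrderProcessorActor and ProcessOrder from the example above.

```csharp
using Akka.Actor;
using Akka.Hosting;

// Hypothetical service (for example, used by a web endpoint) that sends work to the actor.
public sealed class OrderSubmissionService
{
    private readonly IActorRef _orderProcessor;

    // IRequiredActor<T> resolves the IActorRef that was registered under the key T
    public OrderSubmissionService(IRequiredActor<OrderProcessorActor> orderProcessor)
    {
        _orderProcessor = orderProcessor.ActorRef;
    }

    // Fire-and-forget; use Ask if the caller needs a reply
    public void Submit(ProcessOrder order) => _orderProcessor.Tell(order);
}
```

The type parameter is only a lookup key, so the service never needs to know how the actor was constructed.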

Remote Deployment Considerations

You almost never need remote deployment. Remote deployment means deploying an actor to run on a different node than the one creating it.
If you're not doing remote deployment (and you probably aren't):
  • Props.Create(() => new Actor(...)) with closures is fine
  • The "serialization issue" warning doesn't apply
When you would use remote deployment:
  • Distributing compute-intensive work to specific nodes
  • Running actors on nodes with specific hardware (GPU, etc.)
For most applications, use cluster sharding instead of remote deployment - it handles distribution automatically.

5. Work Distribution Patterns

Problem: Thundering Herd

When you have many background jobs (RSS feeds, email sending, etc.), don't process them all at once:
csharp
// BAD: Polls all feeds simultaneously on startup
public class BadRssCoordinator : ReceiveActor
{
    public BadRssCoordinator(IRssFeedRepository repo)
    {
        ReceiveAsync<StartPolling>(async _ =>
        {
            var feeds = await repo.GetAllFeedsAsync();
            foreach (var feed in feeds) // 2000 feeds = 2000 simultaneous HTTP requests
            {
                Context.ActorOf(RssFeedPollerActor.Props(feed.Url));
            }
        });
    }
}

Pattern 1: Database-Driven Work Queue

Use the database as a work queue with FOR UPDATE SKIP LOCKED:
csharp
public class RssPollerWorker : ReceiveActor
{
    public RssPollerWorker(IRssFeedRepository repo)
    {
        ReceiveAsync<PollBatch>(async _ =>
        {
            // Each worker claims a batch - naturally distributes across nodes
            var feeds = await repo.ClaimFeedsForPollingAsync(
                batchSize: 10,
                staleAfter: TimeSpan.FromMinutes(10));

            foreach (var feed in feeds)
            {
                try
                {
                    await PollFeed(feed);
                    await repo.MarkPolledAsync(feed.Id, success: true);
                }
                catch (Exception ex)
                {
                    await repo.MarkPolledAsync(feed.Id, success: false, error: ex.Message);
                }
            }

            // Schedule next batch
            Context.System.Scheduler.ScheduleTellOnce(
                TimeSpan.FromSeconds(5),
                Self,
                PollBatch.Instance,
                Self);
        });
    }
}
sql
-- ClaimFeedsForPollingAsync implementation
UPDATE rss_feeds
SET status = 'processing',
    processing_started_at = NOW()
WHERE id IN (
    SELECT id FROM rss_feeds
    WHERE status = 'pending'
      AND (next_poll_at IS NULL OR next_poll_at <= NOW())
    ORDER BY next_poll_at NULLS FIRST
    LIMIT @batchSize
    FOR UPDATE SKIP LOCKED
)
RETURNING *;
Benefits:
  • Naturally distributes work across multiple server nodes
  • No coordination needed - database handles locking
  • Easy to monitor (query the table)
  • Survives server restarts

Pattern 2: Akka.Streams for Rate Limiting

Use Akka.Streams to throttle processing within a single node:
csharp
public class ThrottledRssProcessor : ReceiveActor
{
    public ThrottledRssProcessor(IRssFeedRepository repo)
    {
        var materializer = Context.System.Materializer();

        ReceiveAsync<StartProcessing>(async _ =>
        {
            await Source.From(await repo.GetPendingFeedsAsync())
                // Max 10 per second; burst of 10; Shaping delays elements instead of failing the stream
                .Throttle(10, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping)
                .SelectAsync(4, async feed => // Max 4 concurrent
                {
                    await PollFeed(feed);
                    return feed;
                })
                .RunWith(Sink.Ignore<RssFeed>(), materializer);
        });
    }
}

Pattern 3: Durable Queue (Email Outbox Pattern)

For work that must be reliably processed, use a database-backed outbox:
csharp
// Enqueue work transactionally with business operation
public async Task CreatePostAsync(Post post)
{
    await using var transaction = await _db.BeginTransactionAsync();

    await _postStore.CreateAsync(post);

    // Enqueue notification emails in same transaction
    foreach (var follower in await _followStore.GetFollowersAsync(post.AuthorId))
    {
        await _emailOutbox.EnqueueAsync(new EmailJob
        {
            To = follower.Email,
            Template = "new-post",
            Data = JsonSerializer.Serialize(new { PostId = post.Id })
        });
    }

    await transaction.CommitAsync();
}

// Worker processes outbox
public class EmailOutboxWorker : ReceiveActor
{
    public EmailOutboxWorker(IEmailOutboxStore outbox, IEmailSender sender)
    {
        ReceiveAsync<ProcessBatch>(async _ =>
        {
            var batch = await outbox.ClaimBatchAsync(10);
            foreach (var job in batch)
            {
                try
                {
                    await sender.SendAsync(job);
                    await outbox.MarkSentAsync(job.Id);
                }
                catch (Exception ex)
                {
                    await outbox.MarkFailedAsync(job.Id, ex.Message);
                }
            }
        });
    }
}
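The worker above only reacts to ProcessBatch, so something has to send that message on a schedule. One possible sketch uses Akka.NET's actor timers (IWithTimers). The EmailOutboxTicker actor, the timer key, the 10 second interval, and the shape of ProcessBatch are all assumptions for illustration.

```csharp
using System;
using Akka.Actor;

// Hypothetical trigger message; the original example does not show its definition.
public sealed class ProcessBatch
{
    public static readonly ProcessBatch Instance = new();
    private ProcessBatch() { }
}

// Hypothetical actor that nudges the outbox worker at a fixed interval.
public sealed class EmailOutboxTicker : ReceiveActor, IWithTimers
{
    // Populated by Akka.NET because the actor implements IWithTimers
    public ITimerScheduler Timers { get; set; }

    public EmailOutboxTicker(IActorRef worker)
    {
        // Pass each tick on to the worker
        Receive<ProcessBatch>(tick => worker.Tell(tick));
    }

    protected override void PreStart()
    {
        // Timers are cancelled automatically when the actor stops or restarts
        Timers.StartPeriodicTimer("outbox-tick", ProcessBatch.Instance, TimeSpan.FromSeconds(10));
    }
}
```

If a batch can take longer than the interval, ticks will queue up in the worker's mailbox. In that case a single-shot timer (Timers.StartSingleTimer) restarted by the worker after each batch is probably the safer choice.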

6. Common Mistakes Summary

| Mistake | Why It's Wrong | Fix |
|---|---|---|
| Using EventStream for cross-node pub/sub | EventStream is local only | Use DistributedPubSub |
| Defining supervision to "protect" an actor | Supervision protects children | Understand the hierarchy |
| Catching all exceptions | Hides bugs, corrupts state | Only catch expected errors |
| Always using DependencyResolver | Adds unnecessary complexity | Use plain Props when possible |
| Processing all background jobs at once | Thundering herd, resource exhaustion | Use database queue + rate limiting |
| Throwing exceptions for expected failures | Triggers unnecessary restarts | Return result types, use messaging |
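The last fix, returning result types instead of throwing, can be sketched as a small closed hierarchy of reply messages. Only the name FeedPollResult.InvalidFormat appears in the earlier examples; the other cases, their properties, and the Describe helper are assumptions for illustration.

```csharp
using System;

// Hypothetical reply messages: each expected outcome is a value, not an exception.
public abstract record FeedPollResult(string FeedUrl)
{
    public sealed record Success(string FeedUrl, int ItemCount) : FeedPollResult(FeedUrl);
    public sealed record TransientFailure(string FeedUrl, string Reason) : FeedPollResult(FeedUrl);
    public sealed record InvalidFormat(string FeedUrl) : FeedPollResult(FeedUrl);
}

public static class FeedPollResultExtensions
{
    // The receiver decides what to do by matching on the case, with no try-catch involved.
    public static string Describe(this FeedPollResult result) => result switch
    {
        FeedPollResult.Success s => $"{s.FeedUrl}: {s.ItemCount} new items",
        FeedPollResult.TransientFailure t => $"{t.FeedUrl}: retry later ({t.Reason})",
        FeedPollResult.InvalidFormat f => $"{f.FeedUrl}: marked as invalid",
        _ => throw new ArgumentOutOfRangeException(nameof(result))
    };
}
```

An actor would reply with Sender.Tell(new FeedPollResult.TransientFailure(url, reason)) and keep running, which leaves supervision for failures that are genuinely unexpected.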

7. Quick Reference

Communication Pattern Decision Tree

Need to communicate between actors?
├── Same process only? → EventStream is fine
├── Across cluster nodes?
│   ├── Point-to-point? → Use ActorSelection or known IActorRef
│   └── Pub/sub? → Use DistributedPubSub
└── Fire-and-forget to external system? → Consider outbox pattern

Error Handling Decision Tree

Exception occurred in actor?
├── Expected failure (HTTP timeout, invalid input)?
│   └── Try-catch, handle gracefully, continue
├── State might be corrupt?
│   └── Let supervision restart
├── Unknown cause?
│   └── Let supervision restart
└── Programming error (null ref, bad logic)?
    └── Let supervision restart, fix the bug

Props Decision Tree

Creating actor Props?
├── Actor needs IServiceProvider?
│   └── Use resolver.Props<T>()
├── Actor needs IRequiredActor<T>?
│   └── Use resolver.Props<T>()
├── Simple actor with constructor params?
│   └── Use Props.Create(() => new Actor(...))
└── Remote deployment needed?
    └── Probably not - use cluster sharding instead

8. Cluster/Local Mode Abstractions

For applications that need to run both in clustered production environments and local/test environments without cluster infrastructure, use abstraction patterns to toggle between implementations.

AkkaExecutionMode Enum

Define an execution mode that controls which implementations are used:
csharp
/// <summary>
/// Determines how Akka.NET infrastructure features are configured.
/// </summary>
public enum AkkaExecutionMode
{
    /// <summary>
    /// Local test mode - no cluster infrastructure.
    /// Uses in-memory implementations for pub/sub and local parent actors
    /// instead of cluster sharding.
    /// </summary>
    LocalTest,

    /// <summary>
    /// Full cluster mode with sharding, singletons, and distributed pub/sub.
    /// </summary>
    Clustered
}
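How the mode gets chosen is up to the application. One possibility is to read it from a configuration string, as in the sketch below. The AkkaExecutionModeParser class and its fallback to Clustered are assumptions for illustration; the enum is repeated without its doc comments so the block compiles on its own.

```csharp
using System;

// Repeated here only so the sketch is self-contained.
public enum AkkaExecutionMode { LocalTest, Clustered }

// Hypothetical helper that turns a configuration value into a mode.
public static class AkkaExecutionModeParser
{
    // Falls back to Clustered so that a missing or misspelled setting
    // does not silently disable clustering in production.
    public static AkkaExecutionMode Parse(string value)
    {
        var parsed = Enum.TryParse<AkkaExecutionMode>(value, ignoreCase: true, out var mode);

        // TryParse also accepts numeric strings such as "7", so check the value is a named member
        return parsed && Enum.IsDefined(typeof(AkkaExecutionMode), mode)
            ? mode
            : AkkaExecutionMode.Clustered;
    }
}
```

Test projects would then set the value to LocalTest explicitly, and every other environment gets the clustered configuration by default.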

GenericChildPerEntityParent - Local Sharding Alternative

Local tests typically run without a cluster, so Cluster Sharding is not available. This actor mimics sharding behavior by creating one child actor per entity ID, using the same `IMessageExtractor` interface:
csharp
/// <summary>
/// A local parent actor that mimics Cluster Sharding behavior.
/// Creates and manages child actors per entity ID using the same IMessageExtractor
/// that would be used with real sharding, enabling seamless switching between modes.
/// </summary>
public sealed class GenericChildPerEntityParent : ReceiveActor
{
    private readonly IMessageExtractor _extractor;
    private readonly Func<string, Props> _propsFactory;
    private readonly Dictionary<string, IActorRef> _children = new();
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public GenericChildPerEntityParent(
        IMessageExtractor extractor,
        Func<string, Props> propsFactory)
    {
        _extractor = extractor;
        _propsFactory = propsFactory;

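        // Forget children that have stopped so the next message recreates them.
        // Registered before ReceiveAny so Terminated is not routed through the extractor.
        Receive<Terminated>(t => { _children.Remove(t.ActorRef.Path.Name); });

        ReceiveAny(msg =>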
        {
            var entityId = _extractor.EntityId(msg);
            if (string.IsNullOrEmpty(entityId))
            {
                _log.Warning("Could not extract entity ID from message {0}", msg.GetType().Name);
                Unhandled(msg);
                return;
            }

            var child = GetOrCreateChild(entityId);

            // Unwrap the message if it's a ShardingEnvelope
            var unwrapped = _extractor.EntityMessage(msg);
            child.Forward(unwrapped);
        });
    }

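    private IActorRef GetOrCreateChild(string entityId)
    {
        if (_children.TryGetValue(entityId, out var existing))
            return existing;

        // After a restart the dictionary is empty but the children survive (see PreRestart),
        // so look for a live child before creating one with the same name.
        var child = Context.Child(entityId);
        if (child.IsNobody())
        {
            child = Context.ActorOf(_propsFactory(entityId), entityId);
            _log.Debug("Created child actor for entity {0}", entityId);
        }

        Context.Watch(child);
        _children[entityId] = child;
        return child;
    }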

    protected override void PreRestart(Exception reason, object message)
    {
        // Don't stop children on restart
    }

    public static Props CreateProps(
        IMessageExtractor extractor,
        Func<string, Props> propsFactory)
    {
        return Props.Create(() => new GenericChildPerEntityParent(extractor, propsFactory));
    }
}
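The parent above relies on an `IMessageExtractor`, and later examples refer to a `MyEntityMessageExtractor` that is never shown. A minimal sketch of what it might look like follows, assuming messages always arrive wrapped in `ShardingEnvelope` and building on Akka.NET's `HashCodeMessageExtractor` base class. The shard count of 100 is an arbitrary illustrative value, and the real extractor in your application may recognise other message types.

```csharp
using Akka.Cluster.Sharding;

// Hypothetical extractor for the "my-entity" examples: it understands only ShardingEnvelope.
public sealed class MyEntityMessageExtractor : HashCodeMessageExtractor
{
    // 100 shards is an illustrative value, not a recommendation.
    public MyEntityMessageExtractor() : base(100) { }

    // Returns null for unrecognised messages; the local parent then logs and calls Unhandled.
    public override string EntityId(object message) =>
        message is ShardingEnvelope envelope ? envelope.EntityId : null;

    // Strips the envelope so the entity actor receives the payload itself.
    public override object EntityMessage(object message) =>
        message is ShardingEnvelope envelope ? envelope.Message : message;
}
```

Because the same extractor instance type is handed to both the shard region and the local parent, entity actors see the same unwrapped payloads in either mode.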

IPubSubMediator - Abstracting DistributedPubSub

Create an interface to abstract over pub/sub so tests can use a local implementation:
csharp
/// <summary>
/// Abstraction over pub/sub messaging that allows swapping between
/// DistributedPubSub (clustered) and local implementations (testing).
/// </summary>
public interface IPubSubMediator
{
    /// <summary>
    /// Subscribe an actor to a topic.
    /// </summary>
    void Subscribe(string topic, IActorRef subscriber);

    /// <summary>
    /// Unsubscribe an actor from a topic.
    /// </summary>
    void Unsubscribe(string topic, IActorRef subscriber);

    /// <summary>
    /// Publish a message to all subscribers of a topic.
    /// </summary>
    void Publish(string topic, object message);

    /// <summary>
    /// Send a message to one subscriber of a topic (load balanced).
    /// </summary>
    void Send(string topic, object message);
}

LocalPubSubMediator - In-Memory Implementation

csharp
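using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe; // Subscribe/Unsubscribe and their Ack message types

/// <summary>
/// In-memory pub/sub implementation for local testing without cluster.
/// Tracks subscribers per topic in a dictionary; it does not use the EventStream.
/// </summary>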
public sealed class LocalPubSubMediator : IPubSubMediator
{
    private readonly ActorSystem _system;
    private readonly ConcurrentDictionary<string, HashSet<IActorRef>> _subscriptions = new();
    private readonly object _lock = new();

    public LocalPubSubMediator(ActorSystem system)
    {
        _system = system;
    }

    public void Subscribe(string topic, IActorRef subscriber)
    {
        lock (_lock)
        {
            var subs = _subscriptions.GetOrAdd(topic, _ => new HashSet<IActorRef>());
            subs.Add(subscriber);
        }

        // Send acknowledgement like real DistributedPubSub does
        subscriber.Tell(new SubscribeAck(new Subscribe(topic, subscriber)));
    }

    public void Unsubscribe(string topic, IActorRef subscriber)
    {
        lock (_lock)
        {
            if (_subscriptions.TryGetValue(topic, out var subs))
            {
                subs.Remove(subscriber);
            }
        }

        subscriber.Tell(new UnsubscribeAck(new Unsubscribe(topic, subscriber)));
    }

    public void Publish(string topic, object message)
    {
        HashSet<IActorRef> subscribers;
        lock (_lock)
        {
            if (!_subscriptions.TryGetValue(topic, out var subs))
                return;
            subscribers = new HashSet<IActorRef>(subs);
        }

        foreach (var subscriber in subscribers)
        {
            subscriber.Tell(message);
        }
    }

    public void Send(string topic, object message)
    {
        IActorRef? target = null;
        lock (_lock)
        {
            if (_subscriptions.TryGetValue(topic, out var subs) && subs.Count > 0)
            {
                // Simplification: no real load balancing. Picks whichever subscriber
                // the set enumerates first (HashSet order is unspecified).
                target = subs.FirstOrDefault();
            }
        }

        target?.Tell(message);
    }
}
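`Send` above always delivers to a single subscriber taken from the set. If local tests need to observe messages being spread across subscribers, a rotating cursor is one option. The `RoundRobinPicker<T>` below is a hypothetical, framework-free helper (not part of Akka.NET or the original code) that a local mediator could keep per topic in place of the `HashSet`.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical helper: hands out registered items one after another, wrapping around.
public sealed class RoundRobinPicker<T>
{
    private readonly List<T> _items = new();
    private readonly object _lock = new();
    private int _next;

    // Adds an item once; duplicates are ignored.
    public void Add(T item)
    {
        lock (_lock)
        {
            if (!_items.Contains(item))
                _items.Add(item);
        }
    }

    public void Remove(T item)
    {
        lock (_lock)
        {
            _items.Remove(item);
        }
    }

    // Returns false when there is nothing to pick.
    public bool TryPick(out T? picked)
    {
        lock (_lock)
        {
            if (_items.Count == 0)
            {
                picked = default;
                return false;
            }

            // Wrap the cursor in case items were removed since the last pick.
            _next %= _items.Count;
            picked = _items[_next];
            _next = (_next + 1) % _items.Count;
            return true;
        }
    }
}
```

A `List<T>` is used instead of a `HashSet<T>` because rotation needs a stable order; the linear `Contains` check is acceptable for the handful of subscribers a test usually has.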

ClusterPubSubMediator - Production Implementation

csharp
/// <summary>
/// Production implementation wrapping Akka.Cluster.Tools.PublishSubscribe.
/// </summary>
public sealed class ClusterPubSubMediator : IPubSubMediator
{
    private readonly IActorRef _mediator;

    public ClusterPubSubMediator(ActorSystem system)
    {
        _mediator = DistributedPubSub.Get(system).Mediator;
    }

    public void Subscribe(string topic, IActorRef subscriber)
    {
        _mediator.Tell(new Subscribe(topic, subscriber));
    }

    public void Unsubscribe(string topic, IActorRef subscriber)
    {
        _mediator.Tell(new Unsubscribe(topic, subscriber));
    }

    public void Publish(string topic, object message)
    {
        _mediator.Tell(new Publish(topic, message));
    }

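    public void Send(string topic, object message)
    {
        // Caveat: the mediator's Send is path-based. It delivers to one actor registered
        // via Put under a matching path, not to actors that joined a topic via Subscribe.
        _mediator.Tell(new Send(topic, message, localAffinity: true));
    }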
}

Wiring It All Together

Configure your ActorSystem based on execution mode:
csharp
public static class AkkaHostingExtensions
{
    public static AkkaConfigurationBuilder ConfigureActorSystem(
        this AkkaConfigurationBuilder builder,
        AkkaExecutionMode mode,
        IServiceCollection services)
    {
        if (mode == AkkaExecutionMode.Clustered)
        {
            builder
                .WithClustering()
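                // Use the same registry key as local mode so IRequiredActor<MyEntityParent> resolves in both modes
                .WithShardRegion<MyEntityParent>(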
                    "my-entity",
                    (system, registry, resolver) => entityId =>
                        resolver.Props<MyEntityActor>(entityId),
                    new MyEntityMessageExtractor(),
                    new ShardOptions())
                .WithDistributedPubSub();

            // Register cluster pub/sub mediator
            services.AddSingleton<IPubSubMediator>(sp =>
            {
                var system = sp.GetRequiredService<ActorSystem>();
                return new ClusterPubSubMediator(system);
            });
        }
        else // LocalTest mode
        {
            // Register local pub/sub mediator
            services.AddSingleton<IPubSubMediator>(sp =>
            {
                var system = sp.GetRequiredService<ActorSystem>();
                return new LocalPubSubMediator(system);
            });

            // Use GenericChildPerEntityParent instead of sharding
            builder.WithActors((system, registry, resolver) =>
            {
                var parent = system.ActorOf(
                    GenericChildPerEntityParent.CreateProps(
                        new MyEntityMessageExtractor(),
                        entityId => resolver.Props<MyEntityActor>(entityId)),
                    "my-entity");

                registry.Register<MyEntityParent>(parent);
            });
        }

        return builder;
    }
}

Usage in Application Code

Application code uses the abstractions and doesn't need to know which mode is active:
csharp
public class MyService
{
    private readonly IPubSubMediator _pubSub;
    private readonly IRequiredActor<MyEntityParent> _entityParent;

    public MyService(
        IPubSubMediator pubSub,
        IRequiredActor<MyEntityParent> entityParent)
    {
        _pubSub = pubSub;
        _entityParent = entityParent;
    }

    public async Task ProcessAsync(string entityId, MyCommand command)
    {
        // Works identically in both modes
        var parent = await _entityParent.GetAsync();
        parent.Tell(new ShardingEnvelope(entityId, command));

        // Publish event - works with both local and distributed pub/sub
        _pubSub.Publish($"entity:{entityId}", new EntityUpdated(entityId));
    }
}

Benefits of This Pattern

| Benefit | Description |
| --- | --- |
| Fast unit tests | No cluster startup overhead, tests run in milliseconds |
| Identical message flow | Same `IMessageExtractor`, same message types |
| Easy debugging | Local mode is simpler to step through |
| Integration test flexibility | Choose mode per test scenario |
| Production confidence | Abstractions are thin wrappers over real implementations |

When to Use Each Mode

| Scenario | Recommended Mode |
| --- | --- |
| Unit tests | LocalTest |
| Integration tests (single node) | LocalTest |
| Integration tests (multi-node) | Clustered |
| Local development | LocalTest or Clustered (your choice) |
| Production | Clustered |

9. Actor Logging

Use ILoggingAdapter, Not ILogger<T>

In actors, use `ILoggingAdapter` from `Context.GetLogger()` instead of DI-injected `ILogger<T>`:
csharp
public class MyActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public MyActor()
    {
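        Receive<MyMessage>(msg =>
        {
            // ✅ Akka.NET ILoggingAdapter with semantic logging (v1.5.57+)
            _log.Info("Processing message for user {UserId}", msg.UserId);

            try
            {
                Process(msg); // Process is a hypothetical method standing in for the real work
            }
            catch (Exception ex)
            {
                _log.Error(ex, "Failed to process {MessageType}", msg.GetType().Name);
                throw; // rethrow so the supervisor still sees the failure
            }
        });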
    }
}
Why ILoggingAdapter:
  • Integrates with Akka's logging pipeline and supervision
  • Supports semantic/structured logging as of v1.5.57
  • Method names: `Info()`, `Debug()`, `Warning()`, `Error()` (not `Log*` variants)
  • No DI required - obtained directly from actor context
Don't inject ILogger<T>:
csharp
// ❌ Don't inject ILogger<T> into actors
public class MyActor : ReceiveActor
{
    private readonly ILogger<MyActor> _logger; // Wrong!

    public MyActor(ILogger<MyActor> logger)
    {
        _logger = logger;
    }
}

Semantic Logging (v1.5.57+)

As of Akka.NET v1.5.57, `ILoggingAdapter` supports semantic/structured logging with named placeholders:
csharp
// Named placeholders for better log aggregation and querying
_log.Info("Order {OrderId} processed for customer {CustomerId}", order.Id, order.CustomerId);

// Prefer named placeholders over positional
// ✅ Good: {OrderId}, {CustomerId}
// ❌ Avoid: {0}, {1}

10. Managing Async Operations with CancellationToken

When actors launch async operations via `PipeTo`, those operations can outlive the actor if not properly managed. Use a `CancellationToken` tied to the actor lifecycle.

Actor-Scoped CancellationTokenSource

Cancel in-flight async work when the actor stops:
csharp
public class DataSyncActor : ReceiveActor
{
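    // _repository and _service (used below) are assumed to be injected dependencies;
    // their declarations are omitted here for brevity.
    private CancellationTokenSource? _operationCts;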

    public DataSyncActor()
    {
        ReceiveAsync<StartSync>(HandleStartSyncAsync);
    }

    protected override void PostStop()
    {
        // Cancel any in-flight async work when actor stops
        _operationCts?.Cancel();
        _operationCts?.Dispose();
        _operationCts = null;
        base.PostStop();
    }

    private Task HandleStartSyncAsync(StartSync cmd)
    {
        // Cancel any previous operation, create new CTS
        _operationCts?.Cancel();
        _operationCts?.Dispose();
        _operationCts = new CancellationTokenSource();
        var ct = _operationCts.Token;

        async Task<SyncResult> PerformSyncAsync()
        {
            try
            {
                ct.ThrowIfCancellationRequested();

                // Pass token to all async operations
                var data = await _repository.GetDataAsync(ct);
                await _service.ProcessAsync(data, ct);

                return new SyncResult(Success: true);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Cancelled: the actor is stopping or a newer StartSync replaced this run
                return new SyncResult(Success: false, "Cancelled");
            }
        }

        PerformSyncAsync().PipeTo(Self);
        return Task.CompletedTask;
    }
}

Linked CTS for Per-Operation Timeouts

For external API calls that might hang, use linked CTS with short timeouts:
csharp
private static readonly TimeSpan ApiTimeout = TimeSpan.FromSeconds(30);

async Task<SyncResult> PerformSyncAsync()
{
    // Check actor-level cancellation
    ct.ThrowIfCancellationRequested();

    // Per-operation timeout linked to actor's CTS
    SomeResult result;
    using (var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
    {
        opCts.CancelAfter(ApiTimeout);
        result = await _externalApi.FetchDataAsync(opCts.Token);
    }

    // Process result...
    return new SyncResult(Success: true);
}
How linked CTS works:
  • Inherits cancellation from parent (actor stop → cancels immediately)
  • Adds its own timeout via `CancelAfter` (hung API → cancels after timeout)
  • Whichever fires first wins
  • Disposed after each operation (short-lived)
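The "whichever fires first wins" behavior can be observed without any actor infrastructure. The sketch below uses only the .NET base class library; `LinkedCtsDemo` and its string results are illustrative names, and the infinite delay stands in for a hung external call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedCtsDemo
{
    // Waits on a delay that can only end through cancellation and reports which source fired.
    public static async Task<string> WaitAsync(CancellationToken parent, TimeSpan timeout)
    {
        // The linked source is cancelled when the parent is cancelled OR when its own timer elapses.
        using var opCts = CancellationTokenSource.CreateLinkedTokenSource(parent);
        opCts.CancelAfter(timeout);

        try
        {
            await Task.Delay(Timeout.InfiniteTimeSpan, opCts.Token);
            return "completed"; // not reached in this demo: the delay only ends by cancellation
        }
        catch (OperationCanceledException)
        {
            // The parent token tells the two causes apart.
            return parent.IsCancellationRequested ? "parent-cancelled" : "timed-out";
        }
    }
}
```

Calling `WaitAsync` with a live parent token and a short timeout yields the timeout branch; cancelling the parent first yields the parent branch regardless of how long the timeout is.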

Graceful Timeout vs Shutdown Handling

Distinguish between actor shutdown and operation timeout:
csharp
try
{
    using var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    opCts.CancelAfter(ApiTimeout);
    await _api.CallAsync(opCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
    // Timeout (not actor death) - can retry or handle gracefully
    _log.Warning("API call timed out, skipping item");
}
// If ct.IsCancellationRequested is true, let it propagate up
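If a timeout should trigger a retry rather than a skip, the same `when` filter can drive a small retry loop. This is a sketch under assumptions: `TimeoutRetry.RetryOnTimeoutAsync` is a hypothetical helper, it retries immediately with no backoff, and it lets the final timeout and any shutdown cancellation propagate to the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutRetry
{
    // Runs `call` with a per-attempt timeout. Timeouts are retried up to maxAttempts;
    // cancellation of `ct` (actor shutdown) is never retried.
    public static async Task<T> RetryOnTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> call,
        TimeSpan timeout,
        int maxAttempts,
        CancellationToken ct)
    {
        for (var attempt = 1; ; attempt++)
        {
            // A fresh linked source per attempt, disposed at the end of each iteration.
            using var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            opCts.CancelAfter(timeout);

            try
            {
                return await call(opCts.Token);
            }
            catch (OperationCanceledException)
                when (!ct.IsCancellationRequested && attempt < maxAttempts)
            {
                // Timeout only, and attempts remain: loop and try again.
            }
        }
    }
}
```

Inside an actor, `ct` would be the token from the actor-scoped `CancellationTokenSource`, so stopping the actor ends the loop on the next cancellation check.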

Key Points

| Practice | Description |
| --- | --- |
| Actor CTS in PostStop | Always cancel and dispose in `PostStop()` |
| New CTS per operation | Cancel previous before starting new work |
| Pass token everywhere | EF Core queries, HTTP calls, etc. all accept `CancellationToken` |
| Linked CTS for timeouts | External calls get short timeouts to prevent hanging |
| Check in loops | Call `ct.ThrowIfCancellationRequested()` between iterations |
| Graceful handling | Distinguish timeout vs shutdown in catch blocks |
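The "check in loops" practice is the one the earlier examples do not show. A minimal sketch follows, assuming a hypothetical `BatchProcessor` helper and a caller-supplied `handle` delegate; neither appears in the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BatchProcessor
{
    // Processes items one by one and stops at the next item boundary once `ct` is cancelled.
    public static async Task<int> ProcessAllAsync<T>(
        IEnumerable<T> items,
        Func<T, CancellationToken, Task> handle,
        CancellationToken ct)
    {
        var processed = 0;
        foreach (var item in items)
        {
            // Checked between iterations so a stopped actor does not start new work.
            ct.ThrowIfCancellationRequested();

            // The token is also passed down so the handler itself can stop mid-item.
            await handle(item, ct);
            processed++;
        }

        return processed;
    }
}
```

The check costs almost nothing per iteration, and it guarantees a prompt exit even when `handle` ignores the token it is given.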

When to Use

  • Any actor that launches async work via `PipeTo`
  • Long-running operations (sync jobs, batch processing)
  • External API calls that might hang
  • Database operations in loops