akka-net-best-practices
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAkka.NET Best Practices
Akka.NET 最佳实践
When to Use This Skill
何时使用本技能
Use this skill when:
- Designing actor communication patterns
- Deciding between EventStream and DistributedPubSub
- Implementing error handling in actors
- Understanding supervision strategies
- Choosing between Props patterns and DependencyResolver
- Designing work distribution across nodes
- Creating testable actor systems that can run with or without cluster infrastructure
- Abstracting over Cluster Sharding for local testing scenarios
在以下场景中使用本技能:
- 设计Actor通信模式
- 选择EventStream还是DistributedPubSub
- 在Actor中实现错误处理
- 理解监督策略
- 选择Props模式还是DependencyResolver
- 设计跨节点的工作分配
- 创建可测试的Actor系统,支持有无集群基础设施的运行环境
- 为本地测试场景抽象Cluster Sharding
1. EventStream vs DistributedPubSub
1. EventStream 与 DistributedPubSub 对比
Critical: EventStream is LOCAL ONLY
重点:EventStream 仅支持本地使用
Context.System.EventStreamcsharp
// BAD: This only works on a single server
// When you add a second server, subscribers on server 2 won't receive events from server 1
Context.System.EventStream.Subscribe(Self, typeof(PostCreated));
Context.System.EventStream.Publish(new PostCreated(postId, authorId));When EventStream is appropriate:
- Logging and diagnostics within a single process
- Local event bus for truly single-process applications
- Development/testing scenarios
Context.System.EventStreamcsharp
// BAD: This only works on a single server
// When you add a second server, subscribers on server 2 won't receive events from server 1
Context.System.EventStream.Subscribe(Self, typeof(PostCreated));
Context.System.EventStream.Publish(new PostCreated(postId, authorId));适合使用EventStream的场景:
- 单进程内的日志记录与诊断
- 纯单进程应用的本地事件总线
- 开发/测试场景
Use DistributedPubSub for Multi-Node
多节点场景使用DistributedPubSub
For events that must reach actors across multiple cluster nodes, use :
Akka.Cluster.Tools.PublishSubscribecsharp
using Akka.Cluster.Tools.PublishSubscribe;
public class TimelineUpdatePublisher : ReceiveActor
{
private readonly IActorRef _mediator;
public TimelineUpdatePublisher()
{
// Get the DistributedPubSub mediator
_mediator = DistributedPubSub.Get(Context.System).Mediator;
Receive<PublishTimelineUpdate>(msg =>
{
// Publish to a topic - reaches all subscribers across all nodes
_mediator.Tell(new Publish($"timeline:{msg.UserId}", msg.Update));
});
}
}
public class TimelineSubscriber : ReceiveActor
{
public TimelineSubscriber(UserId userId)
{
var mediator = DistributedPubSub.Get(Context.System).Mediator;
// Subscribe to user-specific topic
mediator.Tell(new Subscribe($"timeline:{userId}", Self));
Receive<TimelineUpdate>(update =>
{
// Handle the update - this works across cluster nodes
});
Receive<SubscribeAck>(ack =>
{
// Subscription confirmed
});
}
}对于需要跨多个集群节点传递给Actor的事件,请使用:
Akka.Cluster.Tools.PublishSubscribecsharp
using Akka.Cluster.Tools.PublishSubscribe;
public class TimelineUpdatePublisher : ReceiveActor
{
private readonly IActorRef _mediator;
public TimelineUpdatePublisher()
{
// Get the DistributedPubSub mediator
_mediator = DistributedPubSub.Get(Context.System).Mediator;
Receive<PublishTimelineUpdate>(msg =>
{
// Publish to a topic - reaches all subscribers across all nodes
_mediator.Tell(new Publish($"timeline:{msg.UserId}", msg.Update));
});
}
}
public class TimelineSubscriber : ReceiveActor
{
public TimelineSubscriber(UserId userId)
{
var mediator = DistributedPubSub.Get(Context.System).Mediator;
// Subscribe to user-specific topic
mediator.Tell(new Subscribe($"timeline:{userId}", Self));
Receive<TimelineUpdate>(update =>
{
// Handle the update - this works across cluster nodes
});
Receive<SubscribeAck>(ack =>
{
// Subscription confirmed
});
}
}Akka.Hosting Configuration for DistributedPubSub
DistributedPubSub 的 Akka.Hosting 配置
csharp
builder.WithDistributedPubSub(role: null); // Available on all roles, or specify a rolecsharp
builder.WithDistributedPubSub(role: null); // Available on all roles, or specify a roleTopic Design Patterns
主题设计模式
| Pattern | Topic Format | Use Case |
|---|---|---|
| Per-user | | Timeline updates, notifications |
| Per-entity | | Post engagement updates |
| Broadcast | | System-wide notifications |
| Role-based | | Work distribution |
| 模式 | 主题格式 | 适用场景 |
|---|---|---|
| 按用户划分 | | 时间线更新、通知 |
| 按实体划分 | | 帖子互动更新 |
| 广播 | | 系统级通知 |
| 按角色划分 | | 工作分配 |
2. Supervision Strategies
2. 监督策略
Key Clarification: Supervision is for CHILDREN
关键说明:监督仅针对子Actor
A supervision strategy defined on an actor dictates how that actor supervises its children, NOT how the actor itself is supervised.
csharp
public class ParentActor : ReceiveActor
{
// This strategy applies to children of ParentActor, NOT to ParentActor itself
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: 10,
withinTimeRange: TimeSpan.FromSeconds(30),
decider: ex => ex switch
{
ArithmeticException => Directive.Resume,
NullReferenceException => Directive.Restart,
ArgumentException => Directive.Stop,
_ => Directive.Escalate
});
}
}Actor上定义的监督策略规定了该Actor如何监督其子Actor,而非该Actor自身如何被监督。
csharp
public class ParentActor : ReceiveActor
{
// This strategy applies to children of ParentActor, NOT to ParentActor itself
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: 10,
withinTimeRange: TimeSpan.FromSeconds(30),
decider: ex => ex switch
{
ArithmeticException => Directive.Resume,
NullReferenceException => Directive.Restart,
ArgumentException => Directive.Stop,
_ => Directive.Escalate
});
}
}Default Supervision Strategy
默认监督策略
The default already includes rate limiting:
OneForOneStrategy- 10 restarts within 1 second = actor is permanently stopped
- This prevents infinite restart loops
You rarely need a custom strategy unless you have specific requirements.
默认的已包含速率限制:
OneForOneStrategy- 1秒内重启10次 → Actor被永久停止
- 这可以防止无限重启循环
除非有特定需求,否则很少需要自定义策略。
When to Define Custom Supervision
何时定义自定义监督策略
Good reasons:
- Actor throws exceptions indicating irrecoverable state corruption → Restart
- Actor throws exceptions that should NOT cause restart (expected failures) → Resume
- Child failures should affect siblings → Use
AllForOneStrategy - Need different retry limits than the default
Bad reasons:
- "Just to be safe" - the default is already safe
- Don't understand what the actor does - understand it first
合理理由:
- Actor抛出的异常表明状态已损坏且无法恢复 → 重启
- Actor抛出的异常不应导致重启(预期内的失败) → 恢复
- 子Actor的失败应影响同级Actor → 使用
AllForOneStrategy - 需要与默认值不同的重试限制
不合理理由:
- “只是为了安全” → 默认策略已经足够安全
- 不理解Actor的功能 → 先理解其功能
Example: When Custom Supervision Makes Sense
示例:自定义监督策略的适用场景
csharp
public class RssFeedCoordinator : ReceiveActor
{
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: -1, // Unlimited retries
withinTimeRange: TimeSpan.FromMinutes(1),
decider: ex => ex switch
{
// HTTP timeout - transient, resume and let the actor retry via its own timer
HttpRequestException => Directive.Resume,
// Feed URL permanently invalid - stop this child, don't restart forever
InvalidFeedUrlException => Directive.Stop,
// Unknown error - restart to clear potentially corrupt state
_ => Directive.Restart
});
}
}csharp
public class RssFeedCoordinator : ReceiveActor
{
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: -1, // Unlimited retries
withinTimeRange: TimeSpan.FromMinutes(1),
decider: ex => ex switch
{
// HTTP timeout - transient, resume and let the actor retry via its own timer
HttpRequestException => Directive.Resume,
// Feed URL permanently invalid - stop this child, don't restart forever
InvalidFeedUrlException => Directive.Stop,
// Unknown error - restart to clear potentially corrupt state
_ => Directive.Restart
});
}
}3. Error Handling: Supervision vs Try-Catch
3. 错误处理:监督 vs Try-Catch
When to Use Try-Catch (Most Cases)
何时使用Try-Catch(大多数场景)
Use try-catch when:
- The failure is expected (network timeout, invalid input, external service down)
- You know exactly why the exception occurred
- You can handle it gracefully (retry, return error response, log and continue)
- Restarting would not help (same error would occur again)
csharp
public class RssFeedPollerActor : ReceiveActor
{
public RssFeedPollerActor()
{
ReceiveAsync<PollFeed>(async msg =>
{
try
{
var feed = await _httpClient.GetStringAsync(msg.FeedUrl);
var items = ParseFeed(feed);
// Process items...
}
catch (HttpRequestException ex)
{
// Expected failure - log and schedule retry
_log.Warning("Feed {Url} unavailable: {Error}", msg.FeedUrl, ex.Message);
Context.System.Scheduler.ScheduleTellOnce(
TimeSpan.FromMinutes(5),
Self,
msg,
Self);
}
catch (XmlException ex)
{
// Invalid feed format - log and mark as bad
_log.Error("Feed {Url} has invalid format: {Error}", msg.FeedUrl, ex.Message);
Sender.Tell(new FeedPollResult.InvalidFormat(msg.FeedUrl));
}
});
}
}在以下情况使用try-catch:
- 失败是预期内的(网络超时、无效输入、外部服务宕机)
- 你完全清楚异常发生的原因
- 你可以优雅地处理它(重试、返回错误响应、记录日志并继续)
- 重启无济于事(错误会再次发生)
csharp
public class RssFeedPollerActor : ReceiveActor
{
public RssFeedPollerActor()
{
ReceiveAsync<PollFeed>(async msg =>
{
try
{
var feed = await _httpClient.GetStringAsync(msg.FeedUrl);
var items = ParseFeed(feed);
// Process items...
}
catch (HttpRequestException ex)
{
// Expected failure - log and schedule retry
_log.Warning("Feed {Url} unavailable: {Error}", msg.FeedUrl, ex.Message);
Context.System.Scheduler.ScheduleTellOnce(
TimeSpan.FromMinutes(5),
Self,
msg,
Self);
}
catch (XmlException ex)
{
// Invalid feed format - log and mark as bad
_log.Error("Feed {Url} has invalid format: {Error}", msg.FeedUrl, ex.Message);
Sender.Tell(new FeedPollResult.InvalidFormat(msg.FeedUrl));
}
});
}
}When to Let Supervision Handle It
何时让监督策略处理异常
Let exceptions propagate (trigger supervision) when:
- You have no idea why the exception occurred
- The actor's state might be corrupt
- A restart would help (fresh state, reconnect resources)
- It's a programming error (NullReferenceException, InvalidOperationException from bad logic)
csharp
public class OrderActor : ReceiveActor
{
private OrderState _state;
public OrderActor()
{
Receive<ProcessPayment>(msg =>
{
// If this throws, we have no idea why - let supervision restart us
// A restart will reload state from persistence and might fix the issue
var result = _state.ApplyPayment(msg.Amount);
Persist(new PaymentApplied(msg.Amount), evt =>
{
_state = _state.With(evt);
});
});
}
}在以下情况让异常传播(触发监督):
- 你完全不清楚异常发生的原因
- Actor的状态可能已损坏
- 重启会有帮助(刷新状态、重新连接资源)
- 这是编程错误(NullReferenceException、由错误逻辑导致的InvalidOperationException)
csharp
public class OrderActor : ReceiveActor
{
private OrderState _state;
public OrderActor()
{
Receive<ProcessPayment>(msg =>
{
// If this throws, we have no idea why - let supervision restart us
// A restart will reload state from persistence and might fix the issue
var result = _state.ApplyPayment(msg.Amount);
Persist(new PaymentApplied(msg.Amount), evt =>
{
_state = _state.With(evt);
});
});
}
}Anti-Pattern: Swallowing Unknown Exceptions
反模式:吞掉未知异常
csharp
// BAD: Swallowing exceptions hides problems
public class BadActor : ReceiveActor
{
public BadActor()
{
ReceiveAsync<DoWork>(async msg =>
{
try
{
await ProcessWork(msg);
}
catch (Exception ex)
{
// This hides all errors - you'll never know something is broken
_log.Error(ex, "Error processing work");
// Actor continues with potentially corrupt state
}
});
}
}
// GOOD: Handle known exceptions, let unknown ones propagate
public class GoodActor : ReceiveActor
{
public GoodActor()
{
ReceiveAsync<DoWork>(async msg =>
{
try
{
await ProcessWork(msg);
}
catch (HttpRequestException ex)
{
// Known, expected failure - handle gracefully
_log.Warning("HTTP request failed: {Error}", ex.Message);
Sender.Tell(new WorkResult.TransientFailure());
}
// Unknown exceptions propagate to supervision
});
}
}csharp
// BAD: Swallowing exceptions hides problems
public class BadActor : ReceiveActor
{
public BadActor()
{
ReceiveAsync<DoWork>(async msg =>
{
try
{
await ProcessWork(msg);
}
catch (Exception ex)
{
// This hides all errors - you'll never know something is broken
_log.Error(ex, "Error processing work");
// Actor continues with potentially corrupt state
}
});
}
}
// GOOD: Handle known exceptions, let unknown ones propagate
public class GoodActor : ReceiveActor
{
public GoodActor()
{
ReceiveAsync<DoWork>(async msg =>
{
try
{
await ProcessWork(msg);
}
catch (HttpRequestException ex)
{
// Known, expected failure - handle gracefully
_log.Warning("HTTP request failed: {Error}", ex.Message);
Sender.Tell(new WorkResult.TransientFailure());
}
// Unknown exceptions propagate to supervision
});
}
}4. Props vs DependencyResolver
4. Props vs DependencyResolver
When to Use Plain Props
何时使用普通Props
Use when:
Props.Create()- Actor doesn't need or
IServiceProviderIRequiredActor<T> - All dependencies can be passed via constructor
- Actor is simple and self-contained
csharp
// Simple actor with no DI needs
public static Props Props(PostId postId, IPostWriteStore store)
=> Akka.Actor.Props.Create(() => new PostEngagementActor(postId, store));
// Usage
var actor = Context.ActorOf(PostEngagementActor.Props(postId, store), postId.ToString());在以下情况使用:
Props.Create()- Actor不需要或
IServiceProviderIRequiredActor<T> - 所有依赖都可以通过构造函数传递
- Actor简单且独立
csharp
// Simple actor with no DI needs
public static Props Props(PostId postId, IPostWriteStore store)
=> Akka.Actor.Props.Create(() => new PostEngagementActor(postId, store));
// Usage
var actor = Context.ActorOf(PostEngagementActor.Props(postId, store), postId.ToString());When to Use DependencyResolver
何时使用DependencyResolver
Use when:
resolver.Props<T>()- Actor needs to create scoped services
IServiceProvider - Actor uses to get references to other actors
IRequiredActor<T> - Actor has many dependencies that are already in DI container
csharp
// Actor that needs scoped database connections
public class OrderProcessorActor : ReceiveActor
{
public OrderProcessorActor(IServiceProvider serviceProvider)
{
ReceiveAsync<ProcessOrder>(async msg =>
{
// Create a scope for this operation
using var scope = serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
// Process order...
});
}
}
// Registration with DI
builder.WithActors((system, registry, resolver) =>
{
var actor = system.ActorOf(resolver.Props<OrderProcessorActor>(), "order-processor");
registry.Register<OrderProcessorActor>(actor);
});在以下情况使用:
resolver.Props<T>()- Actor需要来创建范围服务
IServiceProvider - Actor使用获取其他Actor的引用
IRequiredActor<T> - Actor有多个已在DI容器中的依赖
csharp
// Actor that needs scoped database connections
public class OrderProcessorActor : ReceiveActor
{
public OrderProcessorActor(IServiceProvider serviceProvider)
{
ReceiveAsync<ProcessOrder>(async msg =>
{
// Create a scope for this operation
using var scope = serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
// Process order...
});
}
}
// Registration with DI
builder.WithActors((system, registry, resolver) =>
{
var actor = system.ActorOf(resolver.Props<OrderProcessorActor>(), "order-processor");
registry.Register<OrderProcessorActor>(actor);
});Remote Deployment Considerations
远程部署注意事项
You almost never need remote deployment. Remote deployment means deploying an actor to run on a different node than the one creating it.
If you're not doing remote deployment (and you probably aren't):
- with closures is fine
Props.Create(() => new Actor(...)) - The "serialization issue" warning doesn't apply
When you would use remote deployment:
- Distributing compute-intensive work to specific nodes
- Running actors on nodes with specific hardware (GPU, etc.)
For most applications, use cluster sharding instead of remote deployment - it handles distribution automatically.
几乎不需要远程部署。远程部署意味着将Actor部署到与创建它的节点不同的节点上运行。
如果你不进行远程部署(通常确实不需要):
- 使用闭包的是没问题的
Props.Create(() => new Actor(...)) - “序列化问题”的警告不适用
何时需要远程部署:
- 将计算密集型工作分配到特定节点
- 在具有特定硬件(如GPU)的节点上运行Actor
对于大多数应用,请使用集群分片代替远程部署——它会自动处理分配。
5. Work Distribution Patterns
5. 工作分配模式
Problem: Thundering Herd
问题:惊群效应
When you have many background jobs (RSS feeds, email sending, etc.), don't process them all at once:
csharp
// BAD: Polls all feeds simultaneously on startup
public class BadRssCoordinator : ReceiveActor
{
public BadRssCoordinator(IRssFeedRepository repo)
{
ReceiveAsync<StartPolling>(async _ =>
{
var feeds = await repo.GetAllFeedsAsync();
foreach (var feed in feeds) // 2000 feeds = 2000 simultaneous HTTP requests
{
Context.ActorOf(RssFeedPollerActor.Props(feed.Url));
}
});
}
}当你有许多后台任务(如RSS订阅、邮件发送等)时,不要同时处理所有任务:
csharp
// BAD: Polls all feeds simultaneously on startup
public class BadRssCoordinator : ReceiveActor
{
public BadRssCoordinator(IRssFeedRepository repo)
{
ReceiveAsync<StartPolling>(async _ =>
{
var feeds = await repo.GetAllFeedsAsync();
foreach (var feed in feeds) // 2000 feeds = 2000 simultaneous HTTP requests
{
Context.ActorOf(RssFeedPollerActor.Props(feed.Url));
}
});
}
}Pattern 1: Database-Driven Work Queue
模式1:数据库驱动的工作队列
Use the database as a work queue with :
FOR UPDATE SKIP LOCKEDcsharp
public class RssPollerWorker : ReceiveActor
{
public RssPollerWorker(IRssFeedRepository repo)
{
ReceiveAsync<PollBatch>(async _ =>
{
// Each worker claims a batch - naturally distributes across nodes
var feeds = await repo.ClaimFeedsForPollingAsync(
batchSize: 10,
staleAfter: TimeSpan.FromMinutes(10));
foreach (var feed in feeds)
{
try
{
await PollFeed(feed);
await repo.MarkPolledAsync(feed.Id, success: true);
}
catch (Exception ex)
{
await repo.MarkPolledAsync(feed.Id, success: false, error: ex.Message);
}
}
// Schedule next batch
Context.System.Scheduler.ScheduleTellOnce(
TimeSpan.FromSeconds(5),
Self,
PollBatch.Instance,
Self);
});
}
}sql
-- ClaimFeedsForPollingAsync implementation
UPDATE rss_feeds
SET status = 'processing',
processing_started_at = NOW()
WHERE id IN (
SELECT id FROM rss_feeds
WHERE status = 'pending'
AND (next_poll_at IS NULL OR next_poll_at <= NOW())
ORDER BY next_poll_at NULLS FIRST
LIMIT @batchSize
FOR UPDATE SKIP LOCKED
)
RETURNING *;Benefits:
- Naturally distributes work across multiple server nodes
- No coordination needed - database handles locking
- Easy to monitor (query the table)
- Survives server restarts
使用数据库作为工作队列,配合:
FOR UPDATE SKIP LOCKEDcsharp
public class RssPollerWorker : ReceiveActor
{
public RssPollerWorker(IRssFeedRepository repo)
{
ReceiveAsync<PollBatch>(async _ =>
{
// Each worker claims a batch - naturally distributes across nodes
var feeds = await repo.ClaimFeedsForPollingAsync(
batchSize: 10,
staleAfter: TimeSpan.FromMinutes(10));
foreach (var feed in feeds)
{
try
{
await PollFeed(feed);
await repo.MarkPolledAsync(feed.Id, success: true);
}
catch (Exception ex)
{
await repo.MarkPolledAsync(feed.Id, success: false, error: ex.Message);
}
}
// Schedule next batch
Context.System.Scheduler.ScheduleTellOnce(
TimeSpan.FromSeconds(5),
Self,
PollBatch.Instance,
Self);
});
}
}sql
-- ClaimFeedsForPollingAsync implementation
UPDATE rss_feeds
SET status = 'processing',
processing_started_at = NOW()
WHERE id IN (
SELECT id FROM rss_feeds
WHERE status = 'pending'
AND (next_poll_at IS NULL OR next_poll_at <= NOW())
ORDER BY next_poll_at NULLS FIRST
LIMIT @batchSize
FOR UPDATE SKIP LOCKED
)
RETURNING *;优势:
- 自然地跨多个服务器节点分配工作
- 无需协调——数据库处理锁定
- 易于监控(查询表即可)
- 服务器重启后仍能继续
Pattern 2: Akka.Streams for Rate Limiting
模式2:使用Akka.Streams进行速率限制
Use Akka.Streams to throttle processing within a single node:
csharp
public class ThrottledRssProcessor : ReceiveActor
{
public ThrottledRssProcessor(IRssFeedRepository repo)
{
var materializer = Context.System.Materializer();
ReceiveAsync<StartProcessing>(async _ =>
{
await Source.From(await repo.GetPendingFeedsAsync())
.Throttle(10, TimeSpan.FromSeconds(1)) // Max 10 per second
.SelectAsync(4, async feed => // Max 4 concurrent
{
await PollFeed(feed);
return feed;
})
.RunWith(Sink.Ignore<RssFeed>(), materializer);
});
}
}使用Akka.Streams在单个节点内限制处理速率:
csharp
public class ThrottledRssProcessor : ReceiveActor
{
public ThrottledRssProcessor(IRssFeedRepository repo)
{
var materializer = Context.System.Materializer();
ReceiveAsync<StartProcessing>(async _ =>
{
await Source.From(await repo.GetPendingFeedsAsync())
.Throttle(10, TimeSpan.FromSeconds(1)) // Max 10 per second
.SelectAsync(4, async feed => // Max 4 concurrent
{
await PollFeed(feed);
return feed;
})
.RunWith(Sink.Ignore<RssFeed>(), materializer);
});
}
}Pattern 3: Durable Queue (Email Outbox Pattern)
模式3:持久化队列(邮件发件箱模式)
For work that must be reliably processed, use a database-backed outbox:
csharp
// Enqueue work transactionally with business operation
public async Task CreatePostAsync(Post post)
{
await using var transaction = await _db.BeginTransactionAsync();
await _postStore.CreateAsync(post);
// Enqueue notification emails in same transaction
foreach (var follower in await _followStore.GetFollowersAsync(post.AuthorId))
{
await _emailOutbox.EnqueueAsync(new EmailJob
{
To = follower.Email,
Template = "new-post",
Data = JsonSerializer.Serialize(new { PostId = post.Id })
});
}
await transaction.CommitAsync();
}
// Worker processes outbox
public class EmailOutboxWorker : ReceiveActor
{
public EmailOutboxWorker(IEmailOutboxStore outbox, IEmailSender sender)
{
ReceiveAsync<ProcessBatch>(async _ =>
{
var batch = await outbox.ClaimBatchAsync(10);
foreach (var job in batch)
{
try
{
await sender.SendAsync(job);
await outbox.MarkSentAsync(job.Id);
}
catch (Exception ex)
{
await outbox.MarkFailedAsync(job.Id, ex.Message);
}
}
});
}
}对于必须可靠处理的工作,请使用数据库支持的发件箱:
csharp
// Enqueue work transactionally with business operation
public async Task CreatePostAsync(Post post)
{
await using var transaction = await _db.BeginTransactionAsync();
await _postStore.CreateAsync(post);
// Enqueue notification emails in same transaction
foreach (var follower in await _followStore.GetFollowersAsync(post.AuthorId))
{
await _emailOutbox.EnqueueAsync(new EmailJob
{
To = follower.Email,
Template = "new-post",
Data = JsonSerializer.Serialize(new { PostId = post.Id })
});
}
await transaction.CommitAsync();
}
// Worker processes outbox
public class EmailOutboxWorker : ReceiveActor
{
public EmailOutboxWorker(IEmailOutboxStore outbox, IEmailSender sender)
{
ReceiveAsync<ProcessBatch>(async _ =>
{
var batch = await outbox.ClaimBatchAsync(10);
foreach (var job in batch)
{
try
{
await sender.SendAsync(job);
await outbox.MarkSentAsync(job.Id);
}
catch (Exception ex)
{
await outbox.MarkFailedAsync(job.Id, ex.Message);
}
}
});
}
}6. Common Mistakes Summary
6. 常见错误总结
| Mistake | Why It's Wrong | Fix |
|---|---|---|
| Using EventStream for cross-node pub/sub | EventStream is local only | Use DistributedPubSub |
| Defining supervision to "protect" an actor | Supervision protects children | Understand the hierarchy |
| Catching all exceptions | Hides bugs, corrupts state | Only catch expected errors |
| Always using DependencyResolver | Adds unnecessary complexity | Use plain Props when possible |
| Processing all background jobs at once | Thundering herd, resource exhaustion | Use database queue + rate limiting |
| Throwing exceptions for expected failures | Triggers unnecessary restarts | Return result types, use messaging |
| 错误 | 原因 | 修复方案 |
|---|---|---|
| 使用EventStream进行跨节点发布/订阅 | EventStream仅支持本地 | 使用DistributedPubSub |
| 定义监督策略以“保护”Actor自身 | 监督策略保护的是子Actor | 理解Actor层级结构 |
| 捕获所有异常 | 隐藏bug、导致状态损坏 | 仅捕获预期内的错误 |
| 始终使用DependencyResolver | 增加不必要的复杂度 | 可能的话使用普通Props |
| 同时处理所有后台任务 | 惊群效应、资源耗尽 | 使用数据库队列+速率限制 |
| 为预期内的失败抛出异常 | 触发不必要的重启 | 返回结果类型、使用消息机制 |
7. Quick Reference
7. 快速参考
Communication Pattern Decision Tree
通信模式决策树
Need to communicate between actors?
├── Same process only? → EventStream is fine
├── Across cluster nodes?
│ ├── Point-to-point? → Use ActorSelection or known IActorRef
│ └── Pub/sub? → Use DistributedPubSub
└── Fire-and-forget to external system? → Consider outbox pattern需要在Actor之间通信?
├── 仅在同一进程内? → 可以使用EventStream
├── 跨集群节点?
│ ├── 点对点? → 使用ActorSelection或已知的IActorRef
│ └── 发布/订阅? → 使用DistributedPubSub
└── 向外部系统发送即发即弃的消息? → 考虑发件箱模式Error Handling Decision Tree
错误处理决策树
Exception occurred in actor?
├── Expected failure (HTTP timeout, invalid input)?
│ └── Try-catch, handle gracefully, continue
├── State might be corrupt?
│ └── Let supervision restart
├── Unknown cause?
│ └── Let supervision restart
└── Programming error (null ref, bad logic)?
└── Let supervision restart, fix the bugActor中发生异常?
├── 预期内的失败(HTTP超时、无效输入)?
│ └── 使用try-catch优雅处理,继续运行
├── 状态可能已损坏?
│ └── 让监督策略重启Actor
├── 未知原因?
│ └── 让监督策略重启Actor
└── 编程错误(空引用、逻辑错误)?
└── 让监督策略重启Actor,修复bugProps Decision Tree
Props决策树
Creating actor Props?
├── Actor needs IServiceProvider?
│ └── Use resolver.Props<T>()
├── Actor needs IRequiredActor<T>?
│ └── Use resolver.Props<T>()
├── Simple actor with constructor params?
│ └── Use Props.Create(() => new Actor(...))
└── Remote deployment needed?
└── Probably not - use cluster sharding instead创建Actor的Props?
├── Actor需要IServiceProvider?
│ └── 使用resolver.Props<T>()
├── Actor需要IRequiredActor<T>?
│ └── 使用resolver.Props<T>()
├── 简单Actor,仅需构造函数参数?
│ └── 使用Props.Create(() => new Actor(...))
└── 需要远程部署?
└── 通常不需要——使用集群分片代替8. Cluster/Local Mode Abstractions
8. 集群/本地模式抽象
For applications that need to run both in clustered production environments and local/test environments without cluster infrastructure, use abstraction patterns to toggle between implementations.
对于需要同时在集群生产环境和无集群基础设施的本地/测试环境中运行的应用,请使用抽象模式在不同实现之间切换。
AkkaExecutionMode Enum
AkkaExecutionMode 枚举
Define an execution mode that controls which implementations are used:
csharp
/// <summary>
/// Determines how Akka.NET infrastructure features are configured.
/// </summary>
public enum AkkaExecutionMode
{
/// <summary>
/// Local test mode - no cluster infrastructure.
/// Uses in-memory implementations for pub/sub and local parent actors
/// instead of cluster sharding.
/// </summary>
LocalTest,
/// <summary>
/// Full cluster mode with sharding, singletons, and distributed pub/sub.
/// </summary>
Clustered
}定义一个执行模式,控制使用哪种实现:
csharp
/// <summary>
/// Determines how Akka.NET infrastructure features are configured.
/// </summary>
public enum AkkaExecutionMode
{
/// <summary>
/// Local test mode - no cluster infrastructure.
/// Uses in-memory implementations for pub/sub and local parent actors
/// instead of cluster sharding.
/// </summary>
LocalTest,
/// <summary>
/// Full cluster mode with sharding, singletons, and distributed pub/sub.
/// </summary>
Clustered
}GenericChildPerEntityParent - Local Sharding Alternative
GenericChildPerEntityParent - 本地分片替代方案
When testing locally, you can't use Cluster Sharding. This actor mimics sharding behavior by creating child actors per entity ID using the same interface:
IMessageExtractorcsharp
/// <summary>
/// A local parent actor that mimics Cluster Sharding behavior.
/// Creates and manages child actors per entity ID using the same IMessageExtractor
/// that would be used with real sharding, enabling seamless switching between modes.
/// </summary>
public sealed class GenericChildPerEntityParent : ReceiveActor
{
private readonly IMessageExtractor _extractor;
private readonly Func<string, Props> _propsFactory;
private readonly Dictionary<string, IActorRef> _children = new();
private readonly ILoggingAdapter _log = Context.GetLogger();
public GenericChildPerEntityParent(
IMessageExtractor extractor,
Func<string, Props> propsFactory)
{
_extractor = extractor;
_propsFactory = propsFactory;
ReceiveAny(msg =>
{
var entityId = _extractor.EntityId(msg);
if (string.IsNullOrEmpty(entityId))
{
_log.Warning("Could not extract entity ID from message {0}", msg.GetType().Name);
Unhandled(msg);
return;
}
var child = GetOrCreateChild(entityId);
// Unwrap the message if it's a ShardingEnvelope
var unwrapped = _extractor.EntityMessage(msg);
child.Forward(unwrapped);
});
}
private IActorRef GetOrCreateChild(string entityId)
{
if (_children.TryGetValue(entityId, out var existing))
return existing;
var props = _propsFactory(entityId);
var child = Context.ActorOf(props, entityId);
Context.Watch(child);
_children[entityId] = child;
_log.Debug("Created child actor for entity {0}", entityId);
return child;
}
protected override void PreRestart(Exception reason, object message)
{
// Don't stop children on restart
}
public static Props CreateProps(
IMessageExtractor extractor,
Func<string, Props> propsFactory)
{
return Props.Create(() => new GenericChildPerEntityParent(extractor, propsFactory));
}
}本地测试时无法使用Cluster Sharding。这个Actor通过使用相同的接口,为每个实体ID创建子Actor,模拟分片行为:
IMessageExtractorcsharp
/// <summary>
/// A local parent actor that mimics Cluster Sharding behavior.
/// Creates and manages child actors per entity ID using the same IMessageExtractor
/// that would be used with real sharding, enabling seamless switching between modes.
/// </summary>
public sealed class GenericChildPerEntityParent : ReceiveActor
{
private readonly IMessageExtractor _extractor;
private readonly Func<string, Props> _propsFactory;
private readonly Dictionary<string, IActorRef> _children = new();
private readonly ILoggingAdapter _log = Context.GetLogger();
public GenericChildPerEntityParent(
IMessageExtractor extractor,
Func<string, Props> propsFactory)
{
_extractor = extractor;
_propsFactory = propsFactory;
ReceiveAny(msg =>
{
var entityId = _extractor.EntityId(msg);
if (string.IsNullOrEmpty(entityId))
{
_log.Warning("Could not extract entity ID from message {0}", msg.GetType().Name);
Unhandled(msg);
return;
}
var child = GetOrCreateChild(entityId);
// Unwrap the message if it's a ShardingEnvelope
var unwrapped = _extractor.EntityMessage(msg);
child.Forward(unwrapped);
});
}
private IActorRef GetOrCreateChild(string entityId)
{
if (_children.TryGetValue(entityId, out var existing))
return existing;
var props = _propsFactory(entityId);
var child = Context.ActorOf(props, entityId);
Context.Watch(child);
_children[entityId] = child;
_log.Debug("Created child actor for entity {0}", entityId);
return child;
}
protected override void PreRestart(Exception reason, object message)
{
// Don't stop children on restart
}
public static Props CreateProps(
IMessageExtractor extractor,
Func<string, Props> propsFactory)
{
return Props.Create(() => new GenericChildPerEntityParent(extractor, propsFactory));
}
}IPubSubMediator - Abstracting DistributedPubSub
IPubSubMediator - 抽象DistributedPubSub
Create an interface to abstract over pub/sub so tests can use a local implementation:
csharp
/// <summary>
/// Abstraction over pub/sub messaging that allows swapping between
/// DistributedPubSub (clustered) and local implementations (testing).
/// </summary>
public interface IPubSubMediator
{
/// <summary>
/// Subscribe an actor to a topic.
/// </summary>
void Subscribe(string topic, IActorRef subscriber);
/// <summary>
/// Unsubscribe an actor from a topic.
/// </summary>
void Unsubscribe(string topic, IActorRef subscriber);
/// <summary>
/// Publish a message to all subscribers of a topic.
/// </summary>
void Publish(string topic, object message);
/// <summary>
/// Send a message to one subscriber of a topic (load balanced).
/// </summary>
void Send(string topic, object message);
}创建一个接口来抽象发布/订阅机制,以便测试时可以使用本地实现:
csharp
/// <summary>
/// Abstraction over pub/sub messaging that allows swapping between
/// DistributedPubSub (clustered) and local implementations (testing).
/// </summary>
public interface IPubSubMediator
{
/// <summary>
/// Subscribe an actor to a topic.
/// </summary>
void Subscribe(string topic, IActorRef subscriber);
/// <summary>
/// Unsubscribe an actor from a topic.
/// </summary>
void Unsubscribe(string topic, IActorRef subscriber);
/// <summary>
/// Publish a message to all subscribers of a topic.
/// </summary>
void Publish(string topic, object message);
/// <summary>
/// Send a message to one subscriber of a topic (load balanced).
/// </summary>
void Send(string topic, object message);
}LocalPubSubMediator - In-Memory Implementation
LocalPubSubMediator - 内存实现
csharp
/// <summary>
/// In-memory pub/sub implementation for local testing without cluster.
/// Uses the EventStream internally for simplicity.
/// </summary>
public sealed class LocalPubSubMediator : IPubSubMediator
{
private readonly ActorSystem _system;
private readonly ConcurrentDictionary<string, HashSet<IActorRef>> _subscriptions = new();
private readonly object _lock = new();
public LocalPubSubMediator(ActorSystem system)
{
_system = system;
}
public void Subscribe(string topic, IActorRef subscriber)
{
lock (_lock)
{
var subs = _subscriptions.GetOrAdd(topic, _ => new HashSet<IActorRef>());
subs.Add(subscriber);
}
// Send acknowledgement like real DistributedPubSub does
subscriber.Tell(new SubscribeAck(new Subscribe(topic, subscriber)));
}
public void Unsubscribe(string topic, IActorRef subscriber)
{
lock (_lock)
{
if (_subscriptions.TryGetValue(topic, out var subs))
{
subs.Remove(subscriber);
}
}
subscriber.Tell(new UnsubscribeAck(new Unsubscribe(topic, subscriber)));
}
public void Publish(string topic, object message)
{
HashSet<IActorRef> subscribers;
lock (_lock)
{
if (!_subscriptions.TryGetValue(topic, out var subs))
return;
subscribers = new HashSet<IActorRef>(subs);
}
foreach (var subscriber in subscribers)
{
subscriber.Tell(message);
}
}
public void Send(string topic, object message)
{
IActorRef? target = null;
lock (_lock)
{
if (_subscriptions.TryGetValue(topic, out var subs) && subs.Count > 0)
{
// Simple round-robin - pick first available
target = subs.FirstOrDefault();
}
}
target?.Tell(message);
}
}csharp
/// <summary>
/// In-memory pub/sub implementation for local testing without cluster.
/// Uses the EventStream internally for simplicity.
/// </summary>
public sealed class LocalPubSubMediator : IPubSubMediator
{
private readonly ActorSystem _system;
private readonly ConcurrentDictionary<string, HashSet<IActorRef>> _subscriptions = new();
private readonly object _lock = new();
public LocalPubSubMediator(ActorSystem system)
{
_system = system;
}
public void Subscribe(string topic, IActorRef subscriber)
{
lock (_lock)
{
var subs = _subscriptions.GetOrAdd(topic, _ => new HashSet<IActorRef>());
subs.Add(subscriber);
}
// Send acknowledgement like real DistributedPubSub does
subscriber.Tell(new SubscribeAck(new Subscribe(topic, subscriber)));
}
public void Unsubscribe(string topic, IActorRef subscriber)
{
lock (_lock)
{
if (_subscriptions.TryGetValue(topic, out var subs))
{
subs.Remove(subscriber);
}
}
subscriber.Tell(new UnsubscribeAck(new Unsubscribe(topic, subscriber)));
}
public void Publish(string topic, object message)
{
HashSet<IActorRef> subscribers;
lock (_lock)
{
if (!_subscriptions.TryGetValue(topic, out var subs))
return;
subscribers = new HashSet<IActorRef>(subs);
}
foreach (var subscriber in subscribers)
{
subscriber.Tell(message);
}
}
public void Send(string topic, object message)
{
IActorRef? target = null;
lock (_lock)
{
if (_subscriptions.TryGetValue(topic, out var subs) && subs.Count > 0)
{
// Simple round-robin - pick first available
target = subs.FirstOrDefault();
}
}
target?.Tell(message);
}
}ClusterPubSubMediator - Production Implementation
ClusterPubSubMediator - 生产环境实现
csharp
/// <summary>
/// Production implementation wrapping Akka.Cluster.Tools.PublishSubscribe.
/// </summary>
public sealed class ClusterPubSubMediator : IPubSubMediator
{
private readonly IActorRef _mediator;
public ClusterPubSubMediator(ActorSystem system)
{
_mediator = DistributedPubSub.Get(system).Mediator;
}
public void Subscribe(string topic, IActorRef subscriber)
{
_mediator.Tell(new Subscribe(topic, subscriber));
}
public void Unsubscribe(string topic, IActorRef subscriber)
{
_mediator.Tell(new Unsubscribe(topic, subscriber));
}
public void Publish(string topic, object message)
{
_mediator.Tell(new Publish(topic, message));
}
public void Send(string topic, object message)
{
_mediator.Tell(new Send(topic, message, localAffinity: true));
}
}csharp
/// <summary>
/// Production implementation wrapping Akka.Cluster.Tools.PublishSubscribe.
/// </summary>
public sealed class ClusterPubSubMediator : IPubSubMediator
{
private readonly IActorRef _mediator;
public ClusterPubSubMediator(ActorSystem system)
{
_mediator = DistributedPubSub.Get(system).Mediator;
}
public void Subscribe(string topic, IActorRef subscriber)
{
_mediator.Tell(new Subscribe(topic, subscriber));
}
public void Unsubscribe(string topic, IActorRef subscriber)
{
_mediator.Tell(new Unsubscribe(topic, subscriber));
}
public void Publish(string topic, object message)
{
_mediator.Tell(new Publish(topic, message));
}
public void Send(string topic, object message)
{
_mediator.Tell(new Send(topic, message, localAffinity: true));
}
}Wiring It All Together
整合所有组件
Configure your ActorSystem based on execution mode:
csharp
public static class AkkaHostingExtensions
{
public static AkkaConfigurationBuilder ConfigureActorSystem(
this AkkaConfigurationBuilder builder,
AkkaExecutionMode mode,
IServiceCollection services)
{
if (mode == AkkaExecutionMode.Clustered)
{
builder
.WithClustering()
.WithShardRegion<MyEntity>(
"my-entity",
(system, registry, resolver) => entityId =>
resolver.Props<MyEntityActor>(entityId),
new MyEntityMessageExtractor(),
new ShardOptions())
.WithDistributedPubSub();
// Register cluster pub/sub mediator
services.AddSingleton<IPubSubMediator>(sp =>
{
var system = sp.GetRequiredService<ActorSystem>();
return new ClusterPubSubMediator(system);
});
}
else // LocalTest mode
{
// Register local pub/sub mediator
services.AddSingleton<IPubSubMediator>(sp =>
{
var system = sp.GetRequiredService<ActorSystem>();
return new LocalPubSubMediator(system);
});
// Use GenericChildPerEntityParent instead of sharding
builder.WithActors((system, registry, resolver) =>
{
var parent = system.ActorOf(
GenericChildPerEntityParent.CreateProps(
new MyEntityMessageExtractor(),
entityId => resolver.Props<MyEntityActor>(entityId)),
"my-entity");
registry.Register<MyEntityParent>(parent);
});
}
return builder;
}
}根据执行模式配置ActorSystem:
csharp
public static class AkkaHostingExtensions
{
public static AkkaConfigurationBuilder ConfigureActorSystem(
this AkkaConfigurationBuilder builder,
AkkaExecutionMode mode,
IServiceCollection services)
{
if (mode == AkkaExecutionMode.Clustered)
{
builder
.WithClustering()
.WithShardRegion<MyEntity>(
"my-entity",
(system, registry, resolver) => entityId =>
resolver.Props<MyEntityActor>(entityId),
new MyEntityMessageExtractor(),
new ShardOptions())
.WithDistributedPubSub();
// Register cluster pub/sub mediator
services.AddSingleton<IPubSubMediator>(sp =>
{
var system = sp.GetRequiredService<ActorSystem>();
return new ClusterPubSubMediator(system);
});
}
else // LocalTest mode
{
// Register local pub/sub mediator
services.AddSingleton<IPubSubMediator>(sp =>
{
var system = sp.GetRequiredService<ActorSystem>();
return new LocalPubSubMediator(system);
});
// Use GenericChildPerEntityParent instead of sharding
builder.WithActors((system, registry, resolver) =>
{
var parent = system.ActorOf(
GenericChildPerEntityParent.CreateProps(
new MyEntityMessageExtractor(),
entityId => resolver.Props<MyEntityActor>(entityId)),
"my-entity");
registry.Register<MyEntityParent>(parent);
});
}
return builder;
}
}Usage in Application Code
在应用代码中使用
Application code uses the abstractions and doesn't need to know which mode is active:
csharp
public class MyService
{
private readonly IPubSubMediator _pubSub;
private readonly IRequiredActor<MyEntityParent> _entityParent;
public MyService(
IPubSubMediator pubSub,
IRequiredActor<MyEntityParent> entityParent)
{
_pubSub = pubSub;
_entityParent = entityParent;
}
public async Task ProcessAsync(string entityId, MyCommand command)
{
// Works identically in both modes
var parent = await _entityParent.GetAsync();
parent.Tell(new ShardingEnvelope(entityId, command));
// Publish event - works with both local and distributed pub/sub
_pubSub.Publish($"entity:{entityId}", new EntityUpdated(entityId));
}
}应用代码使用抽象接口,无需知道当前处于哪种模式:
csharp
public class MyService
{
private readonly IPubSubMediator _pubSub;
private readonly IRequiredActor<MyEntityParent> _entityParent;
public MyService(
IPubSubMediator pubSub,
IRequiredActor<MyEntityParent> entityParent)
{
_pubSub = pubSub;
_entityParent = entityParent;
}
public async Task ProcessAsync(string entityId, MyCommand command)
{
// Works identically in both modes
var parent = await _entityParent.GetAsync();
parent.Tell(new ShardingEnvelope(entityId, command));
// Publish event - works with both local and distributed pub/sub
_pubSub.Publish($"entity:{entityId}", new EntityUpdated(entityId));
}
}Benefits of This Pattern
该模式的优势
| Benefit | Description |
|---|---|
| Fast unit tests | No cluster startup overhead, tests run in milliseconds |
| Identical message flow | Same |
| Easy debugging | Local mode is simpler to step through |
| Integration test flexibility | Choose mode per test scenario |
| Production confidence | Abstractions are thin wrappers over real implementations |
| 优势 | 说明 |
|---|---|
| 快速单元测试 | 无需集群启动开销,测试在毫秒级完成 |
| 相同的消息流 | 使用相同的 |
| 易于调试 | 本地模式更易于单步调试 |
| 集成测试灵活性 | 可为每个测试场景选择模式 |
| 生产环境信心 | 抽象层是真实实现的轻量级包装 |
When to Use Each Mode
何时使用每种模式
| Scenario | Recommended Mode |
|---|---|
| Unit tests | LocalTest |
| Integration tests (single node) | LocalTest |
| Integration tests (multi-node) | Clustered |
| Local development | LocalTest or Clustered (your choice) |
| Production | Clustered |
| 场景 | 推荐模式 |
|---|---|
| 单元测试 | LocalTest |
| 集成测试(单节点) | LocalTest |
| 集成测试(多节点) | Clustered |
| 本地开发 | LocalTest或Clustered(按需选择) |
| 生产环境 | Clustered |
9. Actor Logging
9. Actor 日志
Use ILoggingAdapter, Not ILogger<T>
使用ILoggingAdapter,而非ILogger<T>
In actors, use from instead of DI-injected :
ILoggingAdapterContext.GetLogger()ILogger<T>csharp
public class MyActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
public MyActor()
{
Receive<MyMessage>(msg =>
{
// ✅ Akka.NET ILoggingAdapter with semantic logging (v1.5.57+)
_log.Info("Processing message for user {UserId}", msg.UserId);
_log.Error(ex, "Failed to process {MessageType}", msg.GetType().Name);
});
}
}Why ILoggingAdapter:
- Integrates with Akka's logging pipeline and supervision
- Supports semantic/structured logging as of v1.5.57
- Method names: ,
Info(),Debug(),Warning()(notError()variants)Log* - No DI required - obtained directly from actor context
Don't inject ILogger<T>:
csharp
// ❌ Don't inject ILogger<T> into actors
public class MyActor : ReceiveActor
{
private readonly ILogger<MyActor> _logger; // Wrong!
public MyActor(ILogger<MyActor> logger)
{
_logger = logger;
}
}在Actor中,请使用获取的,而非通过DI注入的:
Context.GetLogger()ILoggingAdapterILogger<T>csharp
public class MyActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
public MyActor()
{
Receive<MyMessage>(msg =>
{
// ✅ Akka.NET ILoggingAdapter with semantic logging (v1.5.57+)
_log.Info("Processing message for user {UserId}", msg.UserId);
_log.Error(ex, "Failed to process {MessageType}", msg.GetType().Name);
});
}
}使用ILoggingAdapter的原因:
- 与Akka的日志管道和监督策略集成
- 从v1.5.57版本开始支持语义/结构化日志
- 方法名称:、
Info()、Debug()、Warning()(而非Error()变体)Log* - 无需DI——直接从Actor上下文获取
不要注入ILogger<T>:
csharp
// ❌ Don't inject ILogger<T> into actors
public class MyActor : ReceiveActor
{
private readonly ILogger<MyActor> _logger; // Wrong!
public MyActor(ILogger<MyActor> logger)
{
_logger = logger;
}
}Semantic Logging (v1.5.57+)
语义日志(v1.5.57+)
As of Akka.NET v1.5.57, supports semantic/structured logging with named placeholders:
ILoggingAdaptercsharp
// Named placeholders for better log aggregation and querying
_log.Info("Order {OrderId} processed for customer {CustomerId}", order.Id, order.CustomerId);
// Prefer named placeholders over positional
// ✅ Good: {OrderId}, {CustomerId}
// ❌ Avoid: {0}, {1}从Akka.NET v1.5.57开始,支持带命名占位符的语义/结构化日志:
ILoggingAdaptercsharp
// Named placeholders for better log aggregation and querying
_log.Info("Order {OrderId} processed for customer {CustomerId}", order.Id, order.CustomerId);
// Prefer named placeholders over positional
// ✅ Good: {OrderId}, {CustomerId}
// ❌ Avoid: {0}, {1}10. Managing Async Operations with CancellationToken
10. 使用CancellationToken管理异步操作
When actors launch async operations via , those operations can outlive the actor if not properly managed. Use tied to the actor lifecycle.
PipeToCancellationToken当Actor通过启动异步操作时,如果管理不当,这些操作可能会比Actor存活更久。请使用与Actor生命周期绑定的。
PipeToCancellationTokenActor-Scoped CancellationTokenSource
Actor范围的CancellationTokenSource
Cancel in-flight async work when the actor stops:
csharp
public class DataSyncActor : ReceiveActor
{
private CancellationTokenSource? _operationCts;
public DataSyncActor()
{
ReceiveAsync<StartSync>(HandleStartSyncAsync);
}
protected override void PostStop()
{
// Cancel any in-flight async work when actor stops
_operationCts?.Cancel();
_operationCts?.Dispose();
_operationCts = null;
base.PostStop();
}
private Task HandleStartSyncAsync(StartSync cmd)
{
// Cancel any previous operation, create new CTS
_operationCts?.Cancel();
_operationCts?.Dispose();
_operationCts = new CancellationTokenSource();
var ct = _operationCts.Token;
async Task<SyncResult> PerformSyncAsync()
{
try
{
ct.ThrowIfCancellationRequested();
// Pass token to all async operations
var data = await _repository.GetDataAsync(ct);
await _service.ProcessAsync(data, ct);
return new SyncResult(Success: true);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Actor is stopping - graceful exit
return new SyncResult(Success: false, "Cancelled");
}
}
PerformSyncAsync().PipeTo(Self);
return Task.CompletedTask;
}
}当Actor停止时取消正在进行的异步工作:
csharp
public class DataSyncActor : ReceiveActor
{
private CancellationTokenSource? _operationCts;
public DataSyncActor()
{
ReceiveAsync<StartSync>(HandleStartSyncAsync);
}
protected override void PostStop()
{
// Cancel any in-flight async work when actor stops
_operationCts?.Cancel();
_operationCts?.Dispose();
_operationCts = null;
base.PostStop();
}
private Task HandleStartSyncAsync(StartSync cmd)
{
// Cancel any previous operation, create new CTS
_operationCts?.Cancel();
_operationCts?.Dispose();
_operationCts = new CancellationTokenSource();
var ct = _operationCts.Token;
async Task<SyncResult> PerformSyncAsync()
{
try
{
ct.ThrowIfCancellationRequested();
// Pass token to all async operations
var data = await _repository.GetDataAsync(ct);
await _service.ProcessAsync(data, ct);
return new SyncResult(Success: true);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Actor is stopping - graceful exit
return new SyncResult(Success: false, "Cancelled");
}
}
PerformSyncAsync().PipeTo(Self);
return Task.CompletedTask;
}
}Linked CTS for Per-Operation Timeouts
用于每个操作超时的链接CTS
For external API calls that might hang, use linked CTS with short timeouts:
csharp
private static readonly TimeSpan ApiTimeout = TimeSpan.FromSeconds(30);
async Task<SyncResult> PerformSyncAsync()
{
// Check actor-level cancellation
ct.ThrowIfCancellationRequested();
// Per-operation timeout linked to actor's CTS
SomeResult result;
using (var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
{
opCts.CancelAfter(ApiTimeout);
result = await _externalApi.FetchDataAsync(opCts.Token);
}
// Process result...
}How linked CTS works:
- Inherits cancellation from parent (actor stop → cancels immediately)
- Adds its own timeout via (hung API → cancels after timeout)
CancelAfter - Whichever fires first wins
- Disposed after each operation (short-lived)
对于可能挂起的外部API调用,请使用带短超时的链接CTS:
csharp
private static readonly TimeSpan ApiTimeout = TimeSpan.FromSeconds(30);
async Task<SyncResult> PerformSyncAsync()
{
// Check actor-level cancellation
ct.ThrowIfCancellationRequested();
// Per-operation timeout linked to actor's CTS
SomeResult result;
using (var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
{
opCts.CancelAfter(ApiTimeout);
result = await _externalApi.FetchDataAsync(opCts.Token);
}
// Process result...
}链接CTS的工作方式:
- 继承父级的取消信号(Actor停止→立即取消)
- 通过添加自身的超时(API挂起→超时后取消)
CancelAfter - 哪个信号先触发就生效哪个
- 每个操作后释放(生命周期短)
Graceful Timeout vs Shutdown Handling
优雅超时与关闭处理
Distinguish between actor shutdown and operation timeout:
csharp
try
{
using var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
opCts.CancelAfter(ApiTimeout);
await _api.CallAsync(opCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
// Timeout (not actor death) - can retry or handle gracefully
_log.Warning("API call timed out, skipping item");
}
// If ct.IsCancellationRequested is true, let it propagate up区分Actor关闭和操作超时:
csharp
try
{
using var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
opCts.CancelAfter(ApiTimeout);
await _api.CallAsync(opCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
// Timeout (not actor death) - can retry or handle gracefully
_log.Warning("API call timed out, skipping item");
}
// If ct.IsCancellationRequested is true, let it propagate upKey Points
关键点
| Practice | Description |
|---|---|
| Actor CTS in PostStop | Always cancel and dispose in |
| New CTS per operation | Cancel previous before starting new work |
| Pass token everywhere | EF Core queries, HTTP calls, etc. all accept |
| Linked CTS for timeouts | External calls get short timeouts to prevent hanging |
| Check in loops | Call |
| Graceful handling | Distinguish timeout vs shutdown in catch blocks |
| 实践 | 说明 |
|---|---|
| 在PostStop中处理Actor CTS | 始终在 |
| 每个操作新建CTS | 启动新工作前取消之前的操作 |
| 将令牌传递到所有地方 | EF Core查询、HTTP调用等都接受 |
| 使用链接CTS实现超时 | 外部调用使用短超时防止挂起 |
| 在循环中检查 | 在迭代之间调用 |
| 优雅处理 | 在catch块中区分超时和关闭 |
When to Use
适用场景
- Any actor that launches async work via
PipeTo - Long-running operations (sync jobs, batch processing)
- External API calls that might hang
- Database operations in loops
- 任何通过启动异步工作的Actor
PipeTo - 长时间运行的操作(同步任务、批处理)
- 可能挂起的外部API调用
- 循环中的数据库操作