swift-async-stream-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAsyncStream & AsyncSequence Patterns
AsyncStream & AsyncSequence 模式
When to use
适用场景
- Building custom types that produce values over time
AsyncSequence - Bridging synchronous/callback-based APIs to async/await
- Implementing channels, buffers, or multi-consumer broadcasting
- Handling backpressure in producer/consumer scenarios
- Creating "CurrentValue"-like semantics where late subscribers receive buffered values
- 构建可随时间生成值的自定义类型
AsyncSequence - 将同步/基于回调的API桥接到async/await
- 实现通道、缓冲区或多消费者广播功能
- 在生产者/消费者场景中处理背压问题
- 实现类“CurrentValue”语义,让晚订阅者能接收缓冲值
Core Patterns from swift-async-algorithms
来自swift-async-algorithms的核心模式
1. State Machine Pattern
1. 状态机模式
Use explicit state machines with enums to model complex async behavior. State machines return actions rather than performing side effects directly. This separates state logic from async operations.
swift
struct ChannelStateMachine<Element: Sendable> {
private enum State: Sendable {
case idle
case buffered(Element)
case streaming(continuation: Continuation)
case finished
}
private var state: State = .idle
// Each mutation returns an Action describing what to do
enum SendAction {
case yield(continuation: Continuation, element: Element)
case buffer(element: Element)
case ignore
}
mutating func send(_ element: Element) -> SendAction {
switch state {
case .idle:
state = .buffered(element)
return .buffer(element: element)
case .buffered:
state = .buffered(element)
return .buffer(element: element)
case .streaming(let continuation):
return .yield(continuation: continuation, element: element)
case .finished:
return .ignore
}
}
}Key insight: Compute state transitions inside locks, execute side effects (like resuming continuations) OUTSIDE locks.
使用带枚举的显式状态机来建模复杂的异步行为。状态机返回动作而非直接执行副作用,以此将状态逻辑与异步操作分离。
swift
struct ChannelStateMachine<Element: Sendable> {
private enum State: Sendable {
case idle
case buffered(Element)
case streaming(continuation: Continuation)
case finished
}
private var state: State = .idle
// Each mutation returns an Action describing what to do
enum SendAction {
case yield(continuation: Continuation, element: Element)
case buffer(element: Element)
case ignore
}
mutating func send(_ element: Element) -> SendAction {
switch state {
case .idle:
state = .buffered(element)
return .buffer(element: element)
case .buffered:
state = .buffered(element)
return .buffer(element: element)
case .streaming(let continuation):
return .yield(continuation: continuation, element: element)
case .finished:
return .ignore
}
}
}核心要点:在锁内计算状态转换,在锁外执行副作用(如恢复continuation)。
2. Thread-Safe State with Mutex
2. 基于Mutex的线程安全状态管理
Use from the Synchronization framework (iOS 18+, macOS 15+). The pattern from swift-async-algorithms:
Mutexswift
import Synchronization
@available(macOS 15.0, iOS 18.0, *)
final class Channel<Element: Sendable>: Sendable {
// State is a simple Sendable struct (NOT ~Copyable)
private struct State: Sendable {
var bufferedElement: Element?
var continuation: AsyncStream<Element>.Continuation?
var isFinished: Bool = false
}
// Mutex is stored in the class (class can hold ~Copyable types)
private let state: Mutex<State>
init() {
self.state = Mutex(State())
}
func send(_ element: Element) {
// 1. Determine action INSIDE lock
let continuation = state.withLock { state -> AsyncStream<Element>.Continuation? in
guard !state.isFinished else { return nil }
if let cont = state.continuation {
return cont
} else {
state.bufferedElement = element
return nil
}
}
// 2. Execute side effect OUTSIDE lock
continuation?.yield(element)
}
}Critical rules:
- Use to hold
final class(Mutex isMutex)~Copyable - State struct should be , not
Sendable~Copyable - Extract continuations inside the lock, resume OUTSIDE to prevent deadlocks
- Keep lock durations minimal - no async operations while holding a lock
使用Synchronization框架中的(iOS 18+、macOS 15+)。以下是来自swift-async-algorithms的模式:
Mutexswift
import Synchronization
@available(macOS 15.0, iOS 18.0, *)
final class Channel<Element: Sendable>: Sendable {
// State是简单的Sendable结构体(非~Copyable)
private struct State: Sendable {
var bufferedElement: Element?
var continuation: AsyncStream<Element>.Continuation?
var isFinished: Bool = false
}
// Mutex存储在类中(类可持有~Copyable类型)
private let state: Mutex<State>
init() {
self.state = Mutex(State())
}
func send(_ element: Element) {
// 1. 在锁内确定要执行的动作
let continuation = state.withLock { state -> AsyncStream<Element>.Continuation? in
guard !state.isFinished else { return nil }
if let cont = state.continuation {
return cont
} else {
state.bufferedElement = element
return nil
}
}
// 2. 在锁外执行副作用
continuation?.yield(element)
}
}关键规则:
- 使用来持有
final class(Mutex为Mutex类型)~Copyable - 状态结构体应遵循协议,而非
Sendable~Copyable - 在锁内提取continuation,在锁外恢复以避免死锁
- 尽量缩短持有锁的时间——持有锁时不要执行异步操作
3. Continuation Safety Patterns
3. Continuation安全模式
Always handle cancellation properly with continuations:
swift
func next() async -> Element? {
await withTaskCancellationHandler {
await withUnsafeContinuation { continuation in
// Determine action inside lock
let immediateResult = state.withLock { state -> Element?? in
if let element = state.buffer.popFirst() {
return .some(element)
}
state.waitingContinuation = continuation
return nil // Will be resumed later
}
// Handle immediate result OUTSIDE lock
if let result = immediateResult {
continuation.resume(returning: result)
}
}
} onCancel: {
// Called concurrently - must be thread-safe
let continuation = state.withLock { state -> UnsafeContinuation<Element?, Never>? in
let cont = state.waitingContinuation
state.waitingContinuation = nil
return cont
}
continuation?.resume(returning: nil)
}
}Critical rules:
- runs concurrently with the main operation - use locks
onCancel - Never call user code or resume continuations while holding a lock
- Always ensure continuations are eventually resumed (success, nil, or cancellation)
始终正确处理continuation的取消逻辑:
swift
func next() async -> Element? {
await withTaskCancellationHandler {
await withUnsafeContinuation { continuation in
// 在锁内确定动作
let immediateResult = state.withLock { state -> Element?? in
if let element = state.buffer.popFirst() {
return .some(element)
}
state.waitingContinuation = continuation
return nil // 后续会恢复
}
// 在锁外处理即时结果
if let result = immediateResult {
continuation.resume(returning: result)
}
}
} onCancel: {
// 并发调用——必须保证线程安全
let continuation = state.withLock { state -> UnsafeContinuation<Element?, Never>? in
let cont = state.waitingContinuation
state.waitingContinuation = nil
return cont
}
continuation?.resume(returning: nil)
}
}关键规则:
- 会与主操作并发执行——需使用锁保证安全
onCancel - 持有锁时不要调用用户代码或恢复continuation
- 必须确保continuation最终被恢复(成功返回、返回nil或处理取消)
4. Buffering Strategies
4. 缓冲区策略
Model buffering policies explicitly:
swift
enum BufferPolicy: Sendable {
/// Buffer up to N elements, then suspend producers
case bounded(Int)
/// Buffer without limit (use with caution)
case unbounded
/// Keep newest N elements, drop oldest when full
case bufferingNewest(Int)
/// Keep oldest N elements, drop newest when full
case bufferingOldest(Int)
}显式定义缓冲区策略:
swift
enum BufferPolicy: Sendable {
/// 最多缓冲N个元素,超出时挂起生产者
case bounded(Int)
/// 无限制缓冲(谨慎使用)
case unbounded
/// 保留最新的N个元素,缓冲区满时丢弃最旧元素
case bufferingNewest(Int)
/// 保留最旧的N个元素,缓冲区满时丢弃最新元素
case bufferingOldest(Int)
}5. Single-Value Buffering (CurrentValue Pattern)
5. 单值缓冲(CurrentValue模式)
For scenarios where late subscribers should receive the most recent value. Uses explicit state machine with enum states to make invalid states unrepresentable:
swift
@available(macOS 15.0, iOS 18.0, *)
public final class SingleValueBufferedStream<Element: Sendable>: Sendable {
private struct StateMachine: Sendable {
private enum State: Sendable {
case idle
case buffered(Element)
case streaming(AsyncStream<Element>.Continuation, generation: UInt64)
case finished
}
private var state: State = .idle
private var nextGeneration: UInt64 = 0
enum SendAction: Sendable {
case yield(AsyncStream<Element>.Continuation, Element)
case buffer
case ignore
}
mutating func send(_ element: Element) -> SendAction {
switch state {
case .idle:
state = .buffered(element)
return .buffer
case .buffered:
state = .buffered(element)
return .buffer
case .streaming(let continuation, let generation):
state = .streaming(continuation, generation: generation)
return .yield(continuation, element)
case .finished:
return .ignore
}
}
enum SubscribeAction: Sendable {
case streamActive(buffered: Element?, generation: UInt64)
case streamFinished
case replaceSubscriber(old: AsyncStream<Element>.Continuation, buffered: Element?, generation: UInt64)
}
mutating func subscribe(_ continuation: AsyncStream<Element>.Continuation) -> SubscribeAction {
nextGeneration &+= 1
let generation = nextGeneration
switch state {
case .idle:
state = .streaming(continuation, generation: generation)
return .streamActive(buffered: nil, generation: generation)
case .buffered(let element):
state = .streaming(continuation, generation: generation)
return .streamActive(buffered: element, generation: generation)
case .streaming(let oldContinuation, _):
state = .streaming(continuation, generation: generation)
return .replaceSubscriber(old: oldContinuation, buffered: nil, generation: generation)
case .finished:
return .streamFinished
}
}
}
private let stateMachine: Mutex<StateMachine>
public func send(_ element: Element) {
let action = stateMachine.withLock { $0.send(element) }
switch action {
case .yield(let continuation, let element):
continuation.yield(element)
case .buffer, .ignore:
break
}
}
}Key benefits of enum state machine:
- Invalid states are unrepresentable (can't have buffered element AND streaming simultaneously)
- State transitions are explicit and documented via switch cases
- Actions returned describe side effects, executed outside the lock
适用于晚订阅者需要接收最新值的场景。使用带枚举的显式状态机,避免出现无效状态:
swift
@available(macOS 15.0, iOS 18.0, *)
public final class SingleValueBufferedStream<Element: Sendable>: Sendable {
private struct StateMachine: Sendable {
private enum State: Sendable {
case idle
case buffered(Element)
case streaming(AsyncStream<Element>.Continuation, generation: UInt64)
case finished
}
private var state: State = .idle
private var nextGeneration: UInt64 = 0
enum SendAction: Sendable {
case yield(AsyncStream<Element>.Continuation, Element)
case buffer
case ignore
}
mutating func send(_ element: Element) -> SendAction {
switch state {
case .idle:
state = .buffered(element)
return .buffer
case .buffered:
state = .buffered(element)
return .buffer
case .streaming(let continuation, let generation):
state = .streaming(continuation, generation: generation)
return .yield(continuation, element)
case .finished:
return .ignore
}
}
enum SubscribeAction: Sendable {
case streamActive(buffered: Element?, generation: UInt64)
case streamFinished
case replaceSubscriber(old: AsyncStream<Element>.Continuation, buffered: Element?, generation: UInt64)
}
mutating func subscribe(_ continuation: AsyncStream<Element>.Continuation) -> SubscribeAction {
nextGeneration &+= 1
let generation = nextGeneration
switch state {
case .idle:
state = .streaming(continuation, generation: generation)
return .streamActive(buffered: nil, generation: generation)
case .buffered(let element):
state = .streaming(continuation, generation: generation)
return .streamActive(buffered: element, generation: generation)
case .streaming(let oldContinuation, _):
state = .streaming(continuation, generation: generation)
return .replaceSubscriber(old: oldContinuation, buffered: nil, generation: generation)
case .finished:
return .streamFinished
}
}
}
private let stateMachine: Mutex<StateMachine>
public func send(_ element: Element) {
let action = stateMachine.withLock { $0.send(element) }
switch action {
case .yield(let continuation, let element):
continuation.yield(element)
case .buffer, .ignore:
break
}
}
}枚举状态机的核心优势:
- 避免无效状态(不会同时存在缓冲元素和流传输状态)
- 通过switch分支明确记录状态转换逻辑
- 返回的动作定义了需执行的副作用,且在锁外执行
6. Lifecycle Management with Reference Types
6. 基于引用类型的生命周期管理
Use wrappers for cleanup on deinit:
final classswift
struct AsyncShareSequence<Base: AsyncSequence> {
// Extent manages lifetime - cancels iteration on deinit
final class Extent: Sendable {
let iteration: Iteration
deinit {
iteration.cancel()
}
}
let extent: Extent
}使用包装器在deinit时执行清理操作:
final classswift
struct AsyncShareSequence<Base: AsyncSequence> {
// Extent管理生命周期——在deinit时取消迭代
final class Extent: Sendable {
let iteration: Iteration
deinit {
iteration.cancel()
}
}
let extent: Extent
}7. Testing Patterns
7. 测试模式
Use gates for deterministic async testing. Gate is a synchronization primitive from swift-async-algorithms tests:
swift
import Synchronization
@available(macOS 15.0, iOS 18.0, *)
struct Gate: Sendable {
private enum State {
case closed
case open
case pending(UnsafeContinuation<Void, Never>)
}
private let state: Mutex<State>
init() {
self.state = Mutex(.closed)
}
func open() {
let continuation = state.withLock { state -> UnsafeContinuation<Void, Never>? in
switch state {
case .closed:
state = .open
return nil
case .pending(let continuation):
state = .closed
return continuation
case .open:
return nil
}
}
continuation?.resume()
}
func enter() async {
await withUnsafeContinuation { continuation in
let resume = state.withLock { state -> UnsafeContinuation<Void, Never>? in
switch state {
case .closed:
state = .pending(continuation)
return nil
case .open:
state = .closed
return continuation
case .pending:
fatalError("Only one waiter supported")
}
}
resume?.resume()
}
}
}Testing best practices from swift-async-algorithms:
- Use sparingly and only for timing-sensitive tests
Task.sleep - Use for synchronization between producer and consumer tasks
Gate - Always call or ensure the stream terminates to avoid hanging tests
finish() - Test edge cases: empty sequences, cancellation, errors, late subscribers
使用门控(Gate)实现确定性异步测试。Gate是swift-async-algorithms测试用例中的同步原语:
swift
import Synchronization
@available(macOS 15.0, iOS 18.0, *)
struct Gate: Sendable {
private enum State {
case closed
case open
case pending(UnsafeContinuation<Void, Never>)
}
private let state: Mutex<State>
init() {
self.state = Mutex(.closed)
}
func open() {
let continuation = state.withLock { state -> UnsafeContinuation<Void, Never>? in
switch state {
case .closed:
state = .open
return nil
case .pending(let continuation):
state = .closed
return continuation
case .open:
return nil
}
}
continuation?.resume()
}
func enter() async {
await withUnsafeContinuation { continuation in
let resume = state.withLock { state -> UnsafeContinuation<Void, Never>? in
switch state {
case .closed:
state = .pending(continuation)
return nil
case .open:
state = .closed
return continuation
case .pending:
fatalError("Only one waiter supported")
}
}
resume?.resume()
}
}
}来自swift-async-algorithms的测试最佳实践:
- 谨慎使用,仅用于对时间敏感的测试
Task.sleep - 使用实现生产者与消费者任务之间的同步
Gate - 始终调用或确保流终止,避免测试挂起
finish() - 测试边缘场景:空序列、取消、错误、晚订阅者
Anti-patterns to Avoid
需避免的反模式
❌ Using @unchecked Sendable without synchronization
❌ 未加同步的情况下使用@unchecked Sendable
swift
// BAD: No actual thread safety
final class Storage: @unchecked Sendable {
var state: State = .idle // Data race!
}
// GOOD: Use Mutex for thread-safe state
private let state: Mutex<State>swift
// 错误:无实际线程安全保障
final class Storage: @unchecked Sendable {
var state: State = .idle // 数据竞争!
}
// 正确:使用Mutex实现线程安全状态管理
private let state: Mutex<State>❌ Making State struct ~Copyable
❌ 将状态结构体设为~Copyable
swift
// BAD: ~Copyable makes Mutex<State> non-copyable, unusable in classes
private struct State: ~Copyable { ... }
// GOOD: State should be Sendable, not ~Copyable
private struct State: Sendable { ... }swift
// 错误:~Copyable会导致Mutex<State>不可复制,无法在类中使用
private struct State: ~Copyable { ... }
// 正确:状态结构体应遵循Sendable协议,而非~Copyable
private struct State: Sendable { ... }❌ Resuming continuations inside locks
❌ 在锁内恢复continuation
swift
// BAD: Resume inside critical region - can deadlock with Swift runtime
state.withLock { state in
continuation.resume(returning: value)
}
// GOOD: Extract continuation, resume outside
let cont = state.withLock { $0.takeContinuation() }
cont?.resume(returning: value)swift
// 错误:在临界区内恢复——可能与Swift运行时产生死锁
state.withLock { state in
continuation.resume(returning: value)
}
// 正确:提取continuation后在锁外恢复
let cont = state.withLock { $0.takeContinuation() }
cont?.resume(returning: value)❌ Ignoring stream termination in tests
❌ 测试中忽略流终止
swift
// BAD: Test hangs forever if finish() not called
let stream = source.makeStream()
for await value in stream { // Never terminates!
collected.append(value)
}
// GOOD: Always ensure stream terminates
source.finish()
for await value in stream {
collected.append(value)
}swift
// 错误:若未调用finish(),测试会永久挂起
let stream = source.makeStream()
for await value in stream { // 永远不会终止!
collected.append(value)
}
// 正确:始终确保流终止
source.finish()
for await value in stream {
collected.append(value)
}❌ Race between subscription and first event
❌ 订阅与首个事件之间的竞争
swift
// BAD: Event can fire before stream is subscribed
let stream = AsyncStream { continuation in
self.continuation = continuation // Race window here!
}
// GOOD: Buffer for late subscribers using SingleValueBufferedStream
let source = SingleValueBufferedStream<Event>()
source.send(event) // Safe even if no subscriber yet
let stream = source.makeStream() // Gets buffered eventswift
// 错误:事件可能在流订阅前触发
let stream = AsyncStream { continuation in
self.continuation = continuation // 存在竞争窗口!
}
// 正确:使用SingleValueBufferedStream为晚订阅者缓冲数据
let source = SingleValueBufferedStream<Event>()
source.send(event) // 即使无订阅者也安全
let stream = source.makeStream() // 会收到缓冲的事件Implementation Checklist
实现检查清单
- Use to hold
final class(Mutex is ~Copyable)Mutex - Use State struct (not ~Copyable)
Sendable - Extract continuations inside lock, resume OUTSIDE
- Handle task cancellation with
withTaskCancellationHandler - Consider buffering strategy for producer/consumer mismatch
- Add lifecycle cleanup via or
deinitonTermination - Ensure streams terminate in tests (call )
finish() - Never use without actual synchronization
@unchecked Sendable
- 使用来持有
final class(Mutex为~Copyable类型)Mutex - 状态结构体遵循协议(非~Copyable)
Sendable - 在锁内提取continuation,在锁外恢复
- 使用处理任务取消
withTaskCancellationHandler - 针对生产者/消费者不匹配的情况,考虑合适的缓冲策略
- 通过或
deinit添加生命周期清理逻辑onTermination - 测试中确保流终止(调用)
finish() - 不要在无实际同步保障的情况下使用
@unchecked Sendable
References
参考资料
- swift-async-algorithms: https://github.com/apple/swift-async-algorithms
- swift-async-algorithms: https://github.com/apple/swift-async-algorithms