combine-reactive

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Combine Reactive — Expert Decisions

Combine响应式编程——专家决策指南

Expert decision frameworks for Combine choices. Claude knows Combine syntax — this skill provides judgment calls for when Combine adds value vs async/await and how to avoid common pitfalls.

Combine框架选型的专家决策框架。Claude熟悉Combine语法——本技能提供判断依据,帮助你确定何时使用Combine更有价值,何时选择async/await,以及如何避开常见陷阱。

Decision Trees

决策树

Combine vs Async/Await

Combine与Async/Await的选择

What's your data flow pattern?
├─ Single async operation (fetch once)
│  └─ async/await
│     Simpler, built into language
├─ Stream of values over time
│  └─ Is it UI-driven (user events)?
│     ├─ YES → Combine (debounce, throttle shine here)
│     └─ NO → AsyncSequence may suffice
├─ Combining multiple data sources
│  └─ How many sources?
│     ├─ 2-3 → Combine's CombineLatest, Zip
│     └─ Many → Consider structured concurrency (TaskGroup)
└─ Existing codebase uses Combine?
   └─ Maintain consistency unless migrating
The trap: Using Combine for simple one-shot async.
async/await
is cleaner and doesn't need cancellable management.
你的数据流模式是什么?
├─ 单次异步操作(仅获取一次数据)
│  └─ async/await
│     更简洁,语言原生支持
├─ 随时间变化的数据流
│  └─ 是否为UI驱动(用户事件)?
│     ├─ 是 → Combine(debounce、throttle在此场景表现出色)
│     └─ 否 → AsyncSequence可能足够
├─ 组合多个数据源
│  └─ 数据源数量有多少?
│     ├─ 2-3个 → Combine的CombineLatest、Zip操作符
│     └─ 多个 → 考虑结构化并发(TaskGroup)
└─ 现有代码库已使用Combine?
   └─ 保持一致性,除非计划迁移
误区:用Combine处理简单的单次异步操作。
async/await
更简洁,无需管理可取消对象。

Subject Type Selection

Subject类型选择

Do you need to access current value?
├─ YES → CurrentValueSubject
│  Can read .value synchronously
│  New subscribers get current value immediately
└─ NO → Is it event-based (discrete occurrences)?
   ├─ YES → PassthroughSubject
   │  No stored value, subscribers only get new emissions
   └─ NO (need replay of past values) →
      Consider external state management
      Combine has no built-in ReplaySubject
你需要访问当前值吗?
├─ 是 → CurrentValueSubject
│  可同步读取.value属性
│  新订阅者会立即收到当前值
└─ 否 → 是否为事件驱动(离散事件)?
   ├─ 是 → PassthroughSubject
   │  不存储值,订阅者仅接收新发送的事件
   └─ 否(需要重放过往值)→
      考虑外部状态管理方案
      Combine没有内置的ReplaySubject

Operator Chain Design

操作符链设计

What transformation is needed?
├─ Value transformation
│  └─ map (simple), compactMap (filter nils), flatMap (nested publishers)
├─ Filtering
│  └─ filter, removeDuplicates, first, dropFirst
├─ Timing
│  └─ User input → debounce (wait for pause)
│     Scrolling → throttle (rate limit)
├─ Error handling
│  └─ Can provide fallback? → catch, replaceError
│     Need retry? → retry(n)
└─ Combining streams
   └─ Need all values paired? → zip
      Need latest from each? → combineLatest
      Merge into one stream? → merge

你需要什么样的转换?
├─ 值转换
│  └─ map(简单转换)、compactMap(过滤nil值)、flatMap(嵌套发布者)
├─ 过滤
│  └─ filter、removeDuplicates、first、dropFirst
├─ 时序控制
│  └─ 用户输入 → debounce(等待输入暂停)
│     滚动事件 → throttle(速率限制)
├─ 错误处理
│  └─ 能否提供回退方案?→ catch、replaceError
│     需要重试?→ retry(n)
└─ 数据流组合
   └─ 需要将所有值配对?→ zip
      需要获取每个流的最新值?→ combineLatest
      合并为单个流?→ merge

NEVER Do

绝对禁止的操作

Subscription Management

订阅管理

NEVER forget to store subscriptions:
swift
// ❌ Subscription cancelled immediately
func loadData() {
    publisher.sink { value in
        self.data = value  // Never called!
    }
    // sink returns AnyCancellable that's immediately deallocated
}

// ✅ Store in Set<AnyCancellable>
private var cancellables = Set<AnyCancellable>()

func loadData() {
    publisher.sink { value in
        self.data = value
    }
    .store(in: &cancellables)
}
NEVER capture self strongly in sink closures:
swift
// ❌ Retain cycle — ViewModel never deallocates
publisher
    .sink { value in
        self.updateUI(value)  // Strong capture!
    }
    .store(in: &cancellables)

// ✅ Use [weak self]
publisher
    .sink { [weak self] value in
        self?.updateUI(value)
    }
    .store(in: &cancellables)
NEVER use assign(to:on:) with classes:
swift
// ❌ Strong reference to self — retain cycle
publisher
    .assign(to: \.text, on: self)  // Retains self!
    .store(in: &cancellables)

// ✅ Use sink with weak self
publisher
    .sink { [weak self] value in
        self?.text = value
    }
    .store(in: &cancellables)

// Or use assign(to:) with @Published (no retain cycle)
publisher
    .assign(to: &$text)  // Safe — doesn't return cancellable
绝对不要忘记存储订阅:
swift
// ❌ 订阅会立即被取消
func loadData() {
    publisher.sink { value in
        self.data = value  // 永远不会被调用!
    }
    // sink返回的AnyCancellable会立即被释放
}

// ✅ 存储在Set<AnyCancellable>中
private var cancellables = Set<AnyCancellable>()

func loadData() {
    publisher.sink { value in
        self.data = value
    }
    .store(in: &cancellables)
}
绝对不要在sink闭包中强引用self:
swift
// ❌ 循环引用——ViewModel永远不会被释放
publisher
    .sink { value in
        self.updateUI(value)  // 强引用!
    }
    .store(in: &cancellables)

// ✅ 使用[weak self]
publisher
    .sink { [weak self] value in
        self?.updateUI(value)
    }
    .store(in: &cancellables)
绝对不要对类使用assign(to:on:):
swift
// ❌ 强引用self——循环引用
publisher
    .assign(to: \.text, on: self)  // 持有self的强引用!
    .store(in: &cancellables)

// ✅ 使用sink配合weak self
publisher
    .sink { [weak self] value in
        self?.text = value
    }
    .store(in: &cancellables)

// 或者配合@Published使用assign(to:)(无循环引用)
publisher
    .assign(to: &$text)  // 安全——不返回可取消对象

Subject Misuse

Subject误用

NEVER expose subjects directly:
swift
// ❌ External code can send values
class ViewModel {
    let users = PassthroughSubject<[User], Never>()  // Public subject!
}

// External code:
viewModel.users.send([])  // Breaks encapsulation

// ✅ Expose as Publisher
class ViewModel {
    private let usersSubject = PassthroughSubject<[User], Never>()

    var users: AnyPublisher<[User], Never> {
        usersSubject.eraseToAnyPublisher()
    }
}
NEVER send on subject from multiple threads without care:
swift
// ❌ Race condition — undefined behavior
DispatchQueue.global().async {
    subject.send(value1)
}
DispatchQueue.global().async {
    subject.send(value2)
}

// ✅ Serialize access
let subject = PassthroughSubject<Value, Never>()
let serialQueue = DispatchQueue(label: "subject.serial")

serialQueue.async {
    subject.send(value)
}

// Or use .receive(on:) to ensure delivery on specific scheduler
绝对不要直接暴露Subject:
swift
// ❌ 外部代码可以发送值
class ViewModel {
    let users = PassthroughSubject<[User], Never>()  // 公开的Subject!
}

// 外部代码:
viewModel.users.send([])  // 破坏封装性

// ✅ 以Publisher形式暴露
class ViewModel {
    private let usersSubject = PassthroughSubject<[User], Never>()

    var users: AnyPublisher<[User], Never> {
        usersSubject.eraseToAnyPublisher()
    }
}
绝对不要不加处理地在多线程中向Subject发送值:
swift
// ❌ 竞态条件——行为未定义
DispatchQueue.global().async {
    subject.send(value1)
}
DispatchQueue.global().async {
    subject.send(value2)
}

// ✅ 序列化访问
let subject = PassthroughSubject<Value, Never>()
let serialQueue = DispatchQueue(label: "subject.serial")

serialQueue.async {
    subject.send(value)
}

// 或者使用.receive(on:)确保在指定调度器上交付

Operator Mistakes

操作符错误

NEVER use flatMap when you mean map:
swift
// ❌ Confusing — flatMap for non-publisher transformation
publisher
    .flatMap { user -> AnyPublisher<String, Never> in
        Just(user.name).eraseToAnyPublisher()  // Overkill!
    }

// ✅ Use map for simple transformation
publisher
    .map { user in user.name }
NEVER forget to handle errors in sink:
swift
// ❌ Compiler allows this but errors are ignored
publisher  // Has Error type
    .sink { value in
        // Only handles values, error terminates silently
    }

// ✅ Handle both completion and value
publisher
    .sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion {
                handleError(error)
            }
        },
        receiveValue: { value in
            handleValue(value)
        }
    )
NEVER debounce without scheduler on main:
swift
// ❌ Debouncing on background scheduler, updating UI
searchText
    .debounce(for: .milliseconds(300), scheduler: DispatchQueue.global())
    .sink { [weak self] text in
        self?.results = search(text)  // UI update on background thread!
    }

// ✅ Receive on main for UI updates
searchText
    .debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
    .sink { [weak self] text in
        self?.results = search(text)
    }

// Or explicitly receive on main
searchText
    .debounce(for: .milliseconds(300), scheduler: DispatchQueue.global())
    .receive(on: DispatchQueue.main)
    .sink { ... }

绝对不要在应该用map的时候用flatMap:
swift
// ❌ 混淆用法——用flatMap处理非发布者转换
publisher
    .flatMap { user -> AnyPublisher<String, Never> in
        Just(user.name).eraseToAnyPublisher()  // 小题大做!
    }

// ✅ 用map进行简单转换
publisher
    .map { user in user.name }
绝对不要在sink中忽略错误处理:
swift
// ❌ 编译器允许,但错误会被静默忽略
publisher  // 包含Error类型
    .sink { value in
        // 仅处理值,错误会静默终止流
    }

// ✅ 同时处理完成事件和值
publisher
    .sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion {
                handleError(error)
            }
        },
        receiveValue: { value in
            handleValue(value)
        }
    )
绝对不要在后台调度器上执行debounce,然后在sink中更新UI:
swift
// ❌ 在后台调度器上防抖,更新UI
searchText
    .debounce(for: .milliseconds(300), scheduler: DispatchQueue.global())
    .sink { [weak self] text in
        self?.results = search(text)  // 在后台线程更新UI!
    }

// ✅ 在主线程接收事件以更新UI
searchText
    .debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
    .sink { [weak self] text in
        self?.results = search(text)
    }

// 或者显式指定在主线程接收
searchText
    .debounce(for: .milliseconds(300), scheduler: DispatchQueue.global())
    .receive(on: DispatchQueue.main)
    .sink { ... }

Essential Patterns

核心模式

ViewModel with Combine

结合Combine的ViewModel

swift
@MainActor
final class SearchViewModel: ObservableObject {
    @Published var searchText = ""
    @Published private(set) var results: [SearchResult] = []
    @Published private(set) var isLoading = false
    @Published private(set) var error: Error?

    private let searchService: SearchServiceProtocol
    private var cancellables = Set<AnyCancellable>()

    init(searchService: SearchServiceProtocol) {
        self.searchService = searchService
        setupSearch()
    }

    private func setupSearch() {
        $searchText
            .debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
            .removeDuplicates()
            .filter { !$0.isEmpty }
            .handleEvents(receiveOutput: { [weak self] _ in
                self?.isLoading = true
                self?.error = nil
            })
            .flatMap { [weak self] query -> AnyPublisher<[SearchResult], Never> in
                guard let self = self else {
                    return Just([]).eraseToAnyPublisher()
                }
                return self.searchService.search(query: query)
                    .catch { [weak self] error -> Just<[SearchResult]> in
                        self?.error = error
                        return Just([])
                    }
                    .eraseToAnyPublisher()
            }
            .receive(on: DispatchQueue.main)
            .sink { [weak self] results in
                self?.results = results
                self?.isLoading = false
            }
            .store(in: &cancellables)
    }
}
swift
@MainActor
final class SearchViewModel: ObservableObject {
    @Published var searchText = ""
    @Published private(set) var results: [SearchResult] = []
    @Published private(set) var isLoading = false
    @Published private(set) var error: Error?

    private let searchService: SearchServiceProtocol
    private var cancellables = Set<AnyCancellable>()

    init(searchService: SearchServiceProtocol) {
        self.searchService = searchService
        setupSearch()
    }

    private func setupSearch() {
        $searchText
            .debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
            .removeDuplicates()
            .filter { !$0.isEmpty }
            .handleEvents(receiveOutput: { [weak self] _ in
                self?.isLoading = true
                self?.error = nil
            })
            .flatMap { [weak self] query -> AnyPublisher<[SearchResult], Never> in
                guard let self = self else {
                    return Just([]).eraseToAnyPublisher()
                }
                return self.searchService.search(query: query)
                    .catch { [weak self] error -> Just<[SearchResult]> in
                        self?.error = error
                        return Just([])
                    }
                    .eraseToAnyPublisher()
            }
            .receive(on: DispatchQueue.main)
            .sink { [weak self] results in
                self?.results = results
                self?.isLoading = false
            }
            .store(in: &cancellables)
    }
}

Combine to Async Bridge

Combine与Async的桥接

swift
extension Publisher where Failure == Error {
    func async() async throws -> Output {
        try await withCheckedThrowingContinuation { continuation in
            var cancellable: AnyCancellable?
            var didResume = false

            cancellable = first()
                .sink(
                    receiveCompletion: { completion in
                        guard !didResume else { return }
                        didResume = true

                        switch completion {
                        case .finished:
                            break  // Value handled in receiveValue
                        case .failure(let error):
                            continuation.resume(throwing: error)
                        }
                        cancellable?.cancel()
                    },
                    receiveValue: { value in
                        guard !didResume else { return }
                        didResume = true
                        continuation.resume(returning: value)
                    }
                )
        }
    }
}

// Usage
Task {
    let user = try await fetchUserPublisher.async()
}
swift
extension Publisher where Failure == Error {
    func async() async throws -> Output {
        try await withCheckedThrowingContinuation { continuation in
            var cancellable: AnyCancellable?
            var didResume = false

            cancellable = first()
                .sink(
                    receiveCompletion: { completion in
                        guard !didResume else { return }
                        didResume = true

                        switch completion {
                        case .finished:
                            break  // 值在receiveValue中处理
                        case .failure(let error):
                            continuation.resume(throwing: error)
                        }
                        cancellable?.cancel()
                    },
                    receiveValue: { value in
                        guard !didResume else { return }
                        didResume = true
                        continuation.resume(returning: value)
                    }
                )
        }
    }
}

// 使用示例
Task {
    let user = try await fetchUserPublisher.async()
}

Event Bus Pattern

事件总线模式

swift
final class EventBus {
    static let shared = EventBus()

    // Private subjects
    private let userLoggedInSubject = PassthroughSubject<User, Never>()
    private let userLoggedOutSubject = PassthroughSubject<Void, Never>()
    private let cartUpdatedSubject = PassthroughSubject<Cart, Never>()

    // Public publishers (read-only)
    var userLoggedIn: AnyPublisher<User, Never> {
        userLoggedInSubject.eraseToAnyPublisher()
    }

    var userLoggedOut: AnyPublisher<Void, Never> {
        userLoggedOutSubject.eraseToAnyPublisher()
    }

    var cartUpdated: AnyPublisher<Cart, Never> {
        cartUpdatedSubject.eraseToAnyPublisher()
    }

    // Send methods
    func send(userLoggedIn user: User) {
        userLoggedInSubject.send(user)
    }

    func sendUserLoggedOut() {
        userLoggedOutSubject.send()
    }

    func send(cartUpdated cart: Cart) {
        cartUpdatedSubject.send(cart)
    }
}

swift
final class EventBus {
    static let shared = EventBus()

    // 私有Subject
    private let userLoggedInSubject = PassthroughSubject<User, Never>()
    private let userLoggedOutSubject = PassthroughSubject<Void, Never>()
    private let cartUpdatedSubject = PassthroughSubject<Cart, Never>()

    // 公开的Publisher(只读)
    var userLoggedIn: AnyPublisher<User, Never> {
        userLoggedInSubject.eraseToAnyPublisher()
    }

    var userLoggedOut: AnyPublisher<Void, Never> {
        userLoggedOutSubject.eraseToAnyPublisher()
    }

    var cartUpdated: AnyPublisher<Cart, Never> {
        cartUpdatedSubject.eraseToAnyPublisher()
    }

    // 发送方法
    func send(userLoggedIn user: User) {
        userLoggedInSubject.send(user)
    }

    func sendUserLoggedOut() {
        userLoggedOutSubject.send()
    }

    func send(cartUpdated cart: Cart) {
        cartUpdatedSubject.send(cart)
    }
}

Quick Reference

快速参考

Combine vs Async/Await Decision

Combine与Async/Await决策对比

ScenarioPrefer
Single async callasync/await
Stream of valuesCombine
User input debouncingCombine
Combining multiple API callsEither (async let or CombineLatest)
Existing Combine codebaseCombine for consistency
New project, simple needsasync/await
场景首选方案
单次异步调用async/await
随时间变化的数据流Combine
用户输入防抖Combine
组合多个API调用两者均可(async let或CombineLatest)
现有代码库已用Combine保持一致性,继续使用Combine
新项目,需求简单async/await

Subject Comparison

Subject对比

SubjectStores ValueNew Subscribers Get
PassthroughSubjectNoOnly new values
CurrentValueSubjectYesCurrent + new values
Subject是否存储值新订阅者会收到
PassthroughSubject仅新值
CurrentValueSubject当前值 + 新值

Common Operators

常用操作符

CategoryOperators
Transformmap, flatMap, compactMap, scan
Filterfilter, removeDuplicates, first, dropFirst
CombinecombineLatest, zip, merge
Timingdebounce, throttle, delay
Errorcatch, retry, replaceError
分类操作符
转换map, flatMap, compactMap, scan
过滤filter, removeDuplicates, first, dropFirst
组合combineLatest, zip, merge
时序debounce, throttle, delay
错误处理catch, retry, replaceError

Red Flags

危险信号

SmellProblemFix
Subscription not storedImmediate cancellation.store(in: &cancellables)
Strong self in sinkRetain cycle[weak self]
assign(to:on:self)Retain cycleUse sink or assign(to:&$property)
Public SubjectEncapsulation brokenExpose as AnyPublisher
flatMap for simple transformOverkillUse map
Debounce on background, sink updates UIThreading bugreceive(on: .main)
Empty receiveCompletionErrors ignoredHandle .failure case
代码异味问题修复方案
订阅未存储订阅立即被取消.store(in: &cancellables)
sink中强引用self循环引用使用[weak self]
assign(to:on:self)循环引用使用sink或assign(to:&$property)
公开Subject封装性被破坏以AnyPublisher形式暴露
用flatMap做简单转换小题大做使用map
在后台防抖后直接更新UI线程问题使用receive(on: .main)
空的receiveCompletion错误被忽略处理.failure情况