combine-reactive
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCombine Reactive — Expert Decisions
Combine响应式编程——专家决策指南
Expert decision frameworks for Combine choices. Claude knows Combine syntax — this skill provides judgment calls for when Combine adds value vs async/await and how to avoid common pitfalls.
Combine框架选型的专家决策框架。Claude熟悉Combine语法——本技能提供判断依据,帮助你确定何时使用Combine更有价值,何时选择async/await,以及如何避开常见陷阱。
Decision Trees
决策树
Combine vs Async/Await
Combine与Async/Await的选择
What's your data flow pattern?
├─ Single async operation (fetch once)
│ └─ async/await
│ Simpler, built into language
│
├─ Stream of values over time
│ └─ Is it UI-driven (user events)?
│ ├─ YES → Combine (debounce, throttle shine here)
│ └─ NO → AsyncSequence may suffice
│
├─ Combining multiple data sources
│ └─ How many sources?
│ ├─ 2-3 → Combine's CombineLatest, Zip
│ └─ Many → Consider structured concurrency (TaskGroup)
│
└─ Existing codebase uses Combine?
└─ Maintain consistency unless migratingThe trap: Using Combine for simple one-shot async. is cleaner and doesn't need cancellable management.
async/await你的数据流模式是什么?
├─ 单次异步操作(仅获取一次数据)
│ └─ async/await
│ 更简洁,语言原生支持
│
├─ 随时间变化的数据流
│ └─ 是否为UI驱动(用户事件)?
│ ├─ 是 → Combine(debounce、throttle在此场景表现出色)
│ └─ 否 → AsyncSequence可能足够
│
├─ 组合多个数据源
│ └─ 数据源数量有多少?
│ ├─ 2-3个 → Combine的CombineLatest、Zip操作符
│ └─ 多个 → 考虑结构化并发(TaskGroup)
│
└─ 现有代码库已使用Combine?
└─ 保持一致性,除非计划迁移误区:用Combine处理简单的单次异步操作。更简洁,无需管理可取消对象。
async/awaitSubject Type Selection
Subject类型选择
Do you need to access current value?
├─ YES → CurrentValueSubject
│ Can read .value synchronously
│ New subscribers get current value immediately
│
└─ NO → Is it event-based (discrete occurrences)?
├─ YES → PassthroughSubject
│ No stored value, subscribers only get new emissions
│
└─ NO (need replay of past values) →
Consider external state management
Combine has no built-in ReplaySubject你需要访问当前值吗?
├─ 是 → CurrentValueSubject
│ 可同步读取.value属性
│ 新订阅者会立即收到当前值
│
└─ 否 → 是否为事件驱动(离散事件)?
├─ 是 → PassthroughSubject
│ 不存储值,订阅者仅接收新发送的事件
│
└─ 否(需要重放过往值)→
考虑外部状态管理方案
Combine没有内置的ReplaySubjectOperator Chain Design
操作符链设计
What transformation is needed?
├─ Value transformation
│ └─ map (simple), compactMap (filter nils), flatMap (nested publishers)
│
├─ Filtering
│ └─ filter, removeDuplicates, first, dropFirst
│
├─ Timing
│ └─ User input → debounce (wait for pause)
│ Scrolling → throttle (rate limit)
│
├─ Error handling
│ └─ Can provide fallback? → catch, replaceError
│ Need retry? → retry(n)
│
└─ Combining streams
└─ Need all values paired? → zip
Need latest from each? → combineLatest
Merge into one stream? → merge你需要什么样的转换?
├─ 值转换
│ └─ map(简单转换)、compactMap(过滤nil值)、flatMap(嵌套发布者)
│
├─ 过滤
│ └─ filter、removeDuplicates、first、dropFirst
│
├─ 时序控制
│ └─ 用户输入 → debounce(等待输入暂停)
│ 滚动事件 → throttle(速率限制)
│
├─ 错误处理
│ └─ 能否提供回退方案?→ catch、replaceError
│ 需要重试?→ retry(n)
│
└─ 数据流组合
└─ 需要将所有值配对?→ zip
需要获取每个流的最新值?→ combineLatest
合并为单个流?→ mergeNEVER Do
绝对禁止的操作
Subscription Management
订阅管理
NEVER forget to store subscriptions:
swift
// ❌ Subscription cancelled immediately
func loadData() {
publisher.sink { value in
self.data = value // Never called!
}
// sink returns AnyCancellable that's immediately deallocated
}
// ✅ Store in Set<AnyCancellable>
private var cancellables = Set<AnyCancellable>()
func loadData() {
publisher.sink { value in
self.data = value
}
.store(in: &cancellables)
}NEVER capture self strongly in sink closures:
swift
// ❌ Retain cycle — ViewModel never deallocates
publisher
.sink { value in
self.updateUI(value) // Strong capture!
}
.store(in: &cancellables)
// ✅ Use [weak self]
publisher
.sink { [weak self] value in
self?.updateUI(value)
}
.store(in: &cancellables)NEVER use assign(to:on:) with classes:
swift
// ❌ Strong reference to self — retain cycle
publisher
.assign(to: \.text, on: self) // Retains self!
.store(in: &cancellables)
// ✅ Use sink with weak self
publisher
.sink { [weak self] value in
self?.text = value
}
.store(in: &cancellables)
// Or use assign(to:) with @Published (no retain cycle)
publisher
.assign(to: &$text) // Safe — doesn't return cancellable绝对不要忘记存储订阅:
swift
// ❌ 订阅会立即被取消
func loadData() {
publisher.sink { value in
self.data = value // 永远不会被调用!
}
// sink返回的AnyCancellable会立即被释放
}
// ✅ 存储在Set<AnyCancellable>中
private var cancellables = Set<AnyCancellable>()
func loadData() {
publisher.sink { value in
self.data = value
}
.store(in: &cancellables)
}绝对不要在sink闭包中强引用self:
swift
// ❌ 循环引用——ViewModel永远不会被释放
publisher
.sink { value in
self.updateUI(value) // 强引用!
}
.store(in: &cancellables)
// ✅ 使用[weak self]
publisher
.sink { [weak self] value in
self?.updateUI(value)
}
.store(in: &cancellables)绝对不要对类使用assign(to:on:):
swift
// ❌ 强引用self——循环引用
publisher
.assign(to: \.text, on: self) // 持有self的强引用!
.store(in: &cancellables)
// ✅ 使用sink配合weak self
publisher
.sink { [weak self] value in
self?.text = value
}
.store(in: &cancellables)
// 或者配合@Published使用assign(to:)(无循环引用)
publisher
.assign(to: &$text) // 安全——不返回可取消对象Subject Misuse
Subject误用
NEVER expose subjects directly:
swift
// ❌ External code can send values
class ViewModel {
let users = PassthroughSubject<[User], Never>() // Public subject!
}
// External code:
viewModel.users.send([]) // Breaks encapsulation
// ✅ Expose as Publisher
class ViewModel {
private let usersSubject = PassthroughSubject<[User], Never>()
var users: AnyPublisher<[User], Never> {
usersSubject.eraseToAnyPublisher()
}
}NEVER send on subject from multiple threads without care:
swift
// ❌ Race condition — undefined behavior
DispatchQueue.global().async {
subject.send(value1)
}
DispatchQueue.global().async {
subject.send(value2)
}
// ✅ Serialize access
let subject = PassthroughSubject<Value, Never>()
let serialQueue = DispatchQueue(label: "subject.serial")
serialQueue.async {
subject.send(value)
}
// Or use .receive(on:) to ensure delivery on specific scheduler绝对不要直接暴露Subject:
swift
// ❌ 外部代码可以发送值
class ViewModel {
let users = PassthroughSubject<[User], Never>() // 公开的Subject!
}
// 外部代码:
viewModel.users.send([]) // 破坏封装性
// ✅ 以Publisher形式暴露
class ViewModel {
private let usersSubject = PassthroughSubject<[User], Never>()
var users: AnyPublisher<[User], Never> {
usersSubject.eraseToAnyPublisher()
}
}绝对不要不加处理地在多线程中向Subject发送值:
swift
// ❌ 竞态条件——行为未定义
DispatchQueue.global().async {
subject.send(value1)
}
DispatchQueue.global().async {
subject.send(value2)
}
// ✅ 序列化访问
let subject = PassthroughSubject<Value, Never>()
let serialQueue = DispatchQueue(label: "subject.serial")
serialQueue.async {
subject.send(value)
}
// 或者使用.receive(on:)确保在指定调度器上交付Operator Mistakes
操作符错误
NEVER use flatMap when you mean map:
swift
// ❌ Confusing — flatMap for non-publisher transformation
publisher
.flatMap { user -> AnyPublisher<String, Never> in
Just(user.name).eraseToAnyPublisher() // Overkill!
}
// ✅ Use map for simple transformation
publisher
.map { user in user.name }NEVER forget to handle errors in sink:
swift
// ❌ Compiler allows this but errors are ignored
publisher // Has Error type
.sink { value in
// Only handles values, error terminates silently
}
// ✅ Handle both completion and value
publisher
.sink(
receiveCompletion: { completion in
if case .failure(let error) = completion {
handleError(error)
}
},
receiveValue: { value in
handleValue(value)
}
)NEVER debounce without scheduler on main:
swift
// ❌ Debouncing on background scheduler, updating UI
searchText
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.global())
.sink { [weak self] text in
self?.results = search(text) // UI update on background thread!
}
// ✅ Receive on main for UI updates
searchText
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
.sink { [weak self] text in
self?.results = search(text)
}
// Or explicitly receive on main
searchText
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.global())
.receive(on: DispatchQueue.main)
.sink { ... }绝对不要在应该用map的时候用flatMap:
swift
// ❌ 混淆用法——用flatMap处理非发布者转换
publisher
.flatMap { user -> AnyPublisher<String, Never> in
Just(user.name).eraseToAnyPublisher() // 小题大做!
}
// ✅ 用map进行简单转换
publisher
.map { user in user.name }绝对不要在sink中忽略错误处理:
swift
// ❌ 编译器允许,但错误会被静默忽略
publisher // 包含Error类型
.sink { value in
// 仅处理值,错误会静默终止流
}
// ✅ 同时处理完成事件和值
publisher
.sink(
receiveCompletion: { completion in
if case .failure(let error) = completion {
handleError(error)
}
},
receiveValue: { value in
handleValue(value)
}
)绝对不要在后台调度器上执行debounce,然后在sink中更新UI:
swift
// ❌ 在后台调度器上防抖,更新UI
searchText
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.global())
.sink { [weak self] text in
self?.results = search(text) // 在后台线程更新UI!
}
// ✅ 在主线程接收事件以更新UI
searchText
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
.sink { [weak self] text in
self?.results = search(text)
}
// 或者显式指定在主线程接收
searchText
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.global())
.receive(on: DispatchQueue.main)
.sink { ... }Essential Patterns
核心模式
ViewModel with Combine
结合Combine的ViewModel
swift
@MainActor
final class SearchViewModel: ObservableObject {
@Published var searchText = ""
@Published private(set) var results: [SearchResult] = []
@Published private(set) var isLoading = false
@Published private(set) var error: Error?
private let searchService: SearchServiceProtocol
private var cancellables = Set<AnyCancellable>()
init(searchService: SearchServiceProtocol) {
self.searchService = searchService
setupSearch()
}
private func setupSearch() {
$searchText
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
.removeDuplicates()
.filter { !$0.isEmpty }
.handleEvents(receiveOutput: { [weak self] _ in
self?.isLoading = true
self?.error = nil
})
.flatMap { [weak self] query -> AnyPublisher<[SearchResult], Never> in
guard let self = self else {
return Just([]).eraseToAnyPublisher()
}
return self.searchService.search(query: query)
.catch { [weak self] error -> Just<[SearchResult]> in
self?.error = error
return Just([])
}
.eraseToAnyPublisher()
}
.receive(on: DispatchQueue.main)
.sink { [weak self] results in
self?.results = results
self?.isLoading = false
}
.store(in: &cancellables)
}
}swift
@MainActor
final class SearchViewModel: ObservableObject {
@Published var searchText = ""
@Published private(set) var results: [SearchResult] = []
@Published private(set) var isLoading = false
@Published private(set) var error: Error?
private let searchService: SearchServiceProtocol
private var cancellables = Set<AnyCancellable>()
init(searchService: SearchServiceProtocol) {
self.searchService = searchService
setupSearch()
}
private func setupSearch() {
$searchText
.debounce(for: .milliseconds(300), scheduler: DispatchQueue.main)
.removeDuplicates()
.filter { !$0.isEmpty }
.handleEvents(receiveOutput: { [weak self] _ in
self?.isLoading = true
self?.error = nil
})
.flatMap { [weak self] query -> AnyPublisher<[SearchResult], Never> in
guard let self = self else {
return Just([]).eraseToAnyPublisher()
}
return self.searchService.search(query: query)
.catch { [weak self] error -> Just<[SearchResult]> in
self?.error = error
return Just([])
}
.eraseToAnyPublisher()
}
.receive(on: DispatchQueue.main)
.sink { [weak self] results in
self?.results = results
self?.isLoading = false
}
.store(in: &cancellables)
}
}Combine to Async Bridge
Combine与Async的桥接
swift
extension Publisher where Failure == Error {
func async() async throws -> Output {
try await withCheckedThrowingContinuation { continuation in
var cancellable: AnyCancellable?
var didResume = false
cancellable = first()
.sink(
receiveCompletion: { completion in
guard !didResume else { return }
didResume = true
switch completion {
case .finished:
break // Value handled in receiveValue
case .failure(let error):
continuation.resume(throwing: error)
}
cancellable?.cancel()
},
receiveValue: { value in
guard !didResume else { return }
didResume = true
continuation.resume(returning: value)
}
)
}
}
}
// Usage
Task {
let user = try await fetchUserPublisher.async()
}swift
extension Publisher where Failure == Error {
func async() async throws -> Output {
try await withCheckedThrowingContinuation { continuation in
var cancellable: AnyCancellable?
var didResume = false
cancellable = first()
.sink(
receiveCompletion: { completion in
guard !didResume else { return }
didResume = true
switch completion {
case .finished:
break // 值在receiveValue中处理
case .failure(let error):
continuation.resume(throwing: error)
}
cancellable?.cancel()
},
receiveValue: { value in
guard !didResume else { return }
didResume = true
continuation.resume(returning: value)
}
)
}
}
}
// 使用示例
Task {
let user = try await fetchUserPublisher.async()
}Event Bus Pattern
事件总线模式
swift
final class EventBus {
static let shared = EventBus()
// Private subjects
private let userLoggedInSubject = PassthroughSubject<User, Never>()
private let userLoggedOutSubject = PassthroughSubject<Void, Never>()
private let cartUpdatedSubject = PassthroughSubject<Cart, Never>()
// Public publishers (read-only)
var userLoggedIn: AnyPublisher<User, Never> {
userLoggedInSubject.eraseToAnyPublisher()
}
var userLoggedOut: AnyPublisher<Void, Never> {
userLoggedOutSubject.eraseToAnyPublisher()
}
var cartUpdated: AnyPublisher<Cart, Never> {
cartUpdatedSubject.eraseToAnyPublisher()
}
// Send methods
func send(userLoggedIn user: User) {
userLoggedInSubject.send(user)
}
func sendUserLoggedOut() {
userLoggedOutSubject.send()
}
func send(cartUpdated cart: Cart) {
cartUpdatedSubject.send(cart)
}
}swift
final class EventBus {
static let shared = EventBus()
// 私有Subject
private let userLoggedInSubject = PassthroughSubject<User, Never>()
private let userLoggedOutSubject = PassthroughSubject<Void, Never>()
private let cartUpdatedSubject = PassthroughSubject<Cart, Never>()
// 公开的Publisher(只读)
var userLoggedIn: AnyPublisher<User, Never> {
userLoggedInSubject.eraseToAnyPublisher()
}
var userLoggedOut: AnyPublisher<Void, Never> {
userLoggedOutSubject.eraseToAnyPublisher()
}
var cartUpdated: AnyPublisher<Cart, Never> {
cartUpdatedSubject.eraseToAnyPublisher()
}
// 发送方法
func send(userLoggedIn user: User) {
userLoggedInSubject.send(user)
}
func sendUserLoggedOut() {
userLoggedOutSubject.send()
}
func send(cartUpdated cart: Cart) {
cartUpdatedSubject.send(cart)
}
}Quick Reference
快速参考
Combine vs Async/Await Decision
Combine与Async/Await决策对比
| Scenario | Prefer |
|---|---|
| Single async call | async/await |
| Stream of values | Combine |
| User input debouncing | Combine |
| Combining multiple API calls | Either (async let or CombineLatest) |
| Existing Combine codebase | Combine for consistency |
| New project, simple needs | async/await |
| 场景 | 首选方案 |
|---|---|
| 单次异步调用 | async/await |
| 随时间变化的数据流 | Combine |
| 用户输入防抖 | Combine |
| 组合多个API调用 | 两者均可(async let或CombineLatest) |
| 现有代码库已用Combine | 保持一致性,继续使用Combine |
| 新项目,需求简单 | async/await |
Subject Comparison
Subject对比
| Subject | Stores Value | New Subscribers Get |
|---|---|---|
| PassthroughSubject | No | Only new values |
| CurrentValueSubject | Yes | Current + new values |
| Subject | 是否存储值 | 新订阅者会收到 |
|---|---|---|
| PassthroughSubject | 否 | 仅新值 |
| CurrentValueSubject | 是 | 当前值 + 新值 |
Common Operators
常用操作符
| Category | Operators |
|---|---|
| Transform | map, flatMap, compactMap, scan |
| Filter | filter, removeDuplicates, first, dropFirst |
| Combine | combineLatest, zip, merge |
| Timing | debounce, throttle, delay |
| Error | catch, retry, replaceError |
| 分类 | 操作符 |
|---|---|
| 转换 | map, flatMap, compactMap, scan |
| 过滤 | filter, removeDuplicates, first, dropFirst |
| 组合 | combineLatest, zip, merge |
| 时序 | debounce, throttle, delay |
| 错误处理 | catch, retry, replaceError |
Red Flags
危险信号
| Smell | Problem | Fix |
|---|---|---|
| Subscription not stored | Immediate cancellation | .store(in: &cancellables) |
| Strong self in sink | Retain cycle | [weak self] |
| assign(to:on:self) | Retain cycle | Use sink or assign(to:&$property) |
| Public Subject | Encapsulation broken | Expose as AnyPublisher |
| flatMap for simple transform | Overkill | Use map |
| Debounce on background, sink updates UI | Threading bug | receive(on: .main) |
| Empty receiveCompletion | Errors ignored | Handle .failure case |
| 代码异味 | 问题 | 修复方案 |
|---|---|---|
| 订阅未存储 | 订阅立即被取消 | .store(in: &cancellables) |
| sink中强引用self | 循环引用 | 使用[weak self] |
| assign(to:on:self) | 循环引用 | 使用sink或assign(to:&$property) |
| 公开Subject | 封装性被破坏 | 以AnyPublisher形式暴露 |
| 用flatMap做简单转换 | 小题大做 | 使用map |
| 在后台防抖后直接更新UI | 线程问题 | 使用receive(on: .main) |
| 空的receiveCompletion | 错误被忽略 | 处理.failure情况 |