kotlin-coroutines-flows
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseKotlin Coroutines & Flows
Kotlin Coroutines & Flows
Patterns for structured concurrency, Flow-based reactive streams, and coroutine testing in Android and Kotlin Multiplatform projects.
适用于Android和Kotlin Multiplatform项目的结构化并发、基于Flow的响应式流及协程测试模式。
When to Activate
适用场景
- Writing async code with Kotlin coroutines
- Using Flow, StateFlow, or SharedFlow for reactive data
- Handling concurrent operations (parallel loading, debounce, retry)
- Testing coroutines and Flows
- Managing coroutine scopes and cancellation
- 使用Kotlin协程编写异步代码
- 使用Flow、StateFlow或SharedFlow处理响应式数据
- 处理并发操作(并行加载、防抖、重试)
- 测试协程与Flow
- 管理协程作用域与取消
Structured Concurrency
结构化并发
Scope Hierarchy
作用域层级
Application
└── viewModelScope (ViewModel)
└── coroutineScope { } (structured child)
├── async { } (concurrent task)
└── async { } (concurrent task)Always use structured concurrency — never :
GlobalScopekotlin
// BAD
GlobalScope.launch { fetchData() }
// GOOD — scoped to ViewModel lifecycle
viewModelScope.launch { fetchData() }
// GOOD — scoped to composable lifecycle
LaunchedEffect(key) { fetchData() }Application
└── viewModelScope (ViewModel)
└── coroutineScope { } (结构化子作用域)
├── async { } (并发任务)
└── async { } (并发任务)始终使用结构化并发——切勿使用:
GlobalScopekotlin
// 错误示例
GlobalScope.launch { fetchData() }
// 正确示例 — 作用域绑定至ViewModel生命周期
viewModelScope.launch { fetchData() }
// 正确示例 — 作用域绑定至Composable生命周期
LaunchedEffect(key) { fetchData() }Parallel Decomposition
并行分解
Use + for parallel work:
coroutineScopeasynckotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
val items = async { itemRepository.getRecent() }
val stats = async { statsRepository.getToday() }
val profile = async { userRepository.getCurrent() }
Dashboard(
items = items.await(),
stats = stats.await(),
profile = profile.await()
)
}使用 + 处理并行任务:
coroutineScopeasynckotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
val items = async { itemRepository.getRecent() }
val stats = async { statsRepository.getToday() }
val profile = async { userRepository.getCurrent() }
Dashboard(
items = items.await(),
stats = stats.await(),
profile = profile.await()
)
}SupervisorScope
SupervisorScope
Use when child failures should not cancel siblings:
supervisorScopekotlin
suspend fun syncAll() = supervisorScope {
launch { syncItems() } // failure here won't cancel syncStats
launch { syncStats() }
launch { syncSettings() }
}当子任务失败不应取消其他兄弟任务时,使用:
supervisorScopekotlin
suspend fun syncAll() = supervisorScope {
launch { syncItems() } // 此处失败不会取消syncStats
launch { syncStats() }
launch { syncSettings() }
}Flow Patterns
Flow模式
Cold Flow — One-Shot to Stream Conversion
冷流——单次操作转流
kotlin
fun observeItems(): Flow<List<Item>> = flow {
// Re-emits whenever the database changes
itemDao.observeAll()
.map { entities -> entities.map { it.toDomain() } }
.collect { emit(it) }
}kotlin
fun observeItems(): Flow<List<Item>> = flow {
// 数据库变更时重新发射数据
itemDao.observeAll()
.map { entities -> entities.map { it.toDomain() } }
.collect { emit(it) }
}StateFlow for UI State
用于UI状态的StateFlow
kotlin
class DashboardViewModel(
observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
val progress: StateFlow<UserProgress> = observeProgress()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = UserProgress.EMPTY
)
}WhileSubscribed(5_000)kotlin
class DashboardViewModel(
observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
val progress: StateFlow<UserProgress> = observeProgress()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = UserProgress.EMPTY
)
}WhileSubscribed(5_000)Combining Multiple Flows
多Flow组合
kotlin
val uiState: StateFlow<HomeState> = combine(
itemRepository.observeItems(),
settingsRepository.observeTheme(),
userRepository.observeProfile()
) { items, theme, profile ->
HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())kotlin
val uiState: StateFlow<HomeState> = combine(
itemRepository.observeItems(),
settingsRepository.observeTheme(),
userRepository.observeProfile()
) { items, theme, profile ->
HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())Flow Operators
Flow操作符
kotlin
// Debounce search input
searchQuery
.debounce(300)
.distinctUntilChanged()
.flatMapLatest { query -> repository.search(query) }
.catch { emit(emptyList()) }
.collect { results -> _state.update { it.copy(results = results) } }
// Retry with exponential backoff
fun fetchWithRetry(): Flow<Data> = flow { emit(api.fetch()) }
.retryWhen { cause, attempt ->
if (cause is IOException && attempt < 3) {
delay(1000L * (1 shl attempt.toInt()))
true
} else {
false
}
}kotlin
// 搜索输入防抖
searchQuery
.debounce(300)
.distinctUntilChanged()
.flatMapLatest { query -> repository.search(query) }
.catch { emit(emptyList()) }
.collect { results -> _state.update { it.copy(results = results) } }
// 指数退避重试
fun fetchWithRetry(): Flow<Data> = flow { emit(api.fetch()) }
.retryWhen { cause, attempt ->
if (cause is IOException && attempt < 3) {
delay(1000L * (1 shl attempt.toInt()))
true
} else {
false
}
}SharedFlow for One-Time Events
用于一次性事件的SharedFlow
kotlin
class ItemListViewModel : ViewModel() {
private val _effects = MutableSharedFlow<Effect>()
val effects: SharedFlow<Effect> = _effects.asSharedFlow()
sealed interface Effect {
data class ShowSnackbar(val message: String) : Effect
data class NavigateTo(val route: String) : Effect
}
private fun deleteItem(id: String) {
viewModelScope.launch {
repository.delete(id)
_effects.emit(Effect.ShowSnackbar("Item deleted"))
}
}
}
// Collect in Composable
LaunchedEffect(Unit) {
viewModel.effects.collect { effect ->
when (effect) {
is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
is Effect.NavigateTo -> navController.navigate(effect.route)
}
}
}kotlin
class ItemListViewModel : ViewModel() {
private val _effects = MutableSharedFlow<Effect>()
val effects: SharedFlow<Effect> = _effects.asSharedFlow()
sealed interface Effect {
data class ShowSnackbar(val message: String) : Effect
data class NavigateTo(val route: String) : Effect
}
private fun deleteItem(id: String) {
viewModelScope.launch {
repository.delete(id)
_effects.emit(Effect.ShowSnackbar("Item deleted"))
}
}
}
// 在Composable中收集
LaunchedEffect(Unit) {
viewModel.effects.collect { effect ->
when (effect) {
is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
is Effect.NavigateTo -> navController.navigate(effect.route)
}
}
}Dispatchers
调度器
kotlin
// CPU-intensive work
withContext(Dispatchers.Default) { parseJson(largePayload) }
// IO-bound work
withContext(Dispatchers.IO) { database.query() }
// Main thread (UI) — default in viewModelScope
withContext(Dispatchers.Main) { updateUi() }In KMP, use and (available on all platforms). is JVM/Android only — use on other platforms or provide via DI.
Dispatchers.DefaultDispatchers.MainDispatchers.IODispatchers.Defaultkotlin
// CPU密集型任务
withContext(Dispatchers.Default) { parseJson(largePayload) }
// IO密集型任务
withContext(Dispatchers.IO) { database.query() }
// 主线程(UI)——viewModelScope默认使用
withContext(Dispatchers.Main) { updateUi() }在KMP中,使用和(全平台可用)。仅适用于JVM/Android平台——在其他平台使用或通过依赖注入提供。
Dispatchers.DefaultDispatchers.MainDispatchers.IODispatchers.DefaultCancellation
取消机制
Cooperative Cancellation
协作式取消
Long-running loops must check for cancellation:
kotlin
suspend fun processItems(items: List<Item>) = coroutineScope {
for (item in items) {
ensureActive() // throws CancellationException if cancelled
process(item)
}
}长时间运行的循环必须检查取消状态:
kotlin
suspend fun processItems(items: List<Item>) = coroutineScope {
for (item in items) {
ensureActive() // 若已取消则抛出CancellationException
process(item)
}
}Cleanup with try/finally
使用try/finally清理资源
kotlin
viewModelScope.launch {
try {
_state.update { it.copy(isLoading = true) }
val data = repository.fetch()
_state.update { it.copy(data = data) }
} finally {
_state.update { it.copy(isLoading = false) } // always runs, even on cancellation
}
}kotlin
viewModelScope.launch {
try {
_state.update { it.copy(isLoading = true) }
val data = repository.fetch()
_state.update { it.copy(data = data) }
} finally {
_state.update { it.copy(isLoading = false) } // 即使取消也会执行
}
}Testing
测试
Testing StateFlow with Turbine
使用Turbine测试StateFlow
kotlin
@Test
fun `search updates item list`() = runTest {
val fakeRepository = FakeItemRepository().apply { emit(testItems) }
val viewModel = ItemListViewModel(GetItemsUseCase(fakeRepository))
viewModel.state.test {
assertEquals(ItemListState(), awaitItem()) // initial
viewModel.onSearch("query")
val loading = awaitItem()
assertTrue(loading.isLoading)
val loaded = awaitItem()
assertFalse(loaded.isLoading)
assertEquals(1, loaded.items.size)
}
}kotlin
@Test
fun `search updates item list`() = runTest {
val fakeRepository = FakeItemRepository().apply { emit(testItems) }
val viewModel = ItemListViewModel(GetItemsUseCase(fakeRepository))
viewModel.state.test {
assertEquals(ItemListState(), awaitItem()) // 初始状态
viewModel.onSearch("query")
val loading = awaitItem()
assertTrue(loading.isLoading)
val loaded = awaitItem()
assertFalse(loaded.isLoading)
assertEquals(1, loaded.items.size)
}
}Testing with TestDispatcher
使用TestDispatcher测试
kotlin
@Test
fun `parallel load completes correctly`() = runTest {
val viewModel = DashboardViewModel(
itemRepo = FakeItemRepo(),
statsRepo = FakeStatsRepo()
)
viewModel.load()
advanceUntilIdle()
val state = viewModel.state.value
assertNotNull(state.items)
assertNotNull(state.stats)
}kotlin
@Test
fun `parallel load completes correctly`() = runTest {
val viewModel = DashboardViewModel(
itemRepo = FakeItemRepo(),
statsRepo = FakeStatsRepo()
)
viewModel.load()
advanceUntilIdle()
val state = viewModel.state.value
assertNotNull(state.items)
assertNotNull(state.stats)
}Faking Flows
模拟Flow
kotlin
class FakeItemRepository : ItemRepository {
private val _items = MutableStateFlow<List<Item>>(emptyList())
override fun observeItems(): Flow<List<Item>> = _items
fun emit(items: List<Item>) { _items.value = items }
override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
return Result.success(_items.value.filter { it.category == category })
}
}kotlin
class FakeItemRepository : ItemRepository {
private val _items = MutableStateFlow<List<Item>>(emptyList())
override fun observeItems(): Flow<List<Item>> = _items
fun emit(items: List<Item>) { _items.value = items }
override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
return Result.success(_items.value.filter { it.category == category })
}
}Anti-Patterns to Avoid
需避免的反模式
- Using — leaks coroutines, no structured cancellation
GlobalScope - Collecting Flows in without a scope — use
init {}viewModelScope.launch - Using with mutable collections — always use immutable copies:
MutableStateFlow_state.update { it.copy(list = it.list + newItem) } - Catching — let it propagate for proper cancellation
CancellationException - Using to collect — collection dispatcher is the caller's dispatcher
flowOn(Dispatchers.Main) - Creating in
Flowwithout@Composable— recreates the flow every recompositionremember
- 使用——会导致协程泄漏,无结构化取消
GlobalScope - 在中无作用域地收集Flow——应使用
init {}viewModelScope.launch - 使用搭配可变集合——始终使用不可变副本:
MutableStateFlow_state.update { it.copy(list = it.list + newItem) } - 捕获——应让其传播以实现正确取消
CancellationException - 使用进行收集——收集调度器由调用方决定
flowOn(Dispatchers.Main) - 在中不使用
@Composable创建Flow——重组时会重新创建Flowremember
References
参考资料
See skill: for UI consumption of Flows.
See skill: for where coroutines fit in layers.
compose-multiplatform-patternsandroid-clean-architecture查看技能:以了解Flow在UI中的使用方式。
查看技能:以了解协程在各层级中的定位。
compose-multiplatform-patternsandroid-clean-architecture