kotlin-coroutines-flows

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Kotlin Coroutines & Flows

Kotlin Coroutines & Flows

Patterns for structured concurrency, Flow-based reactive streams, and coroutine testing in Android and Kotlin Multiplatform projects.
适用于Android和Kotlin Multiplatform项目的结构化并发、基于Flow的响应式流及协程测试模式。

When to Activate

适用场景

  • Writing async code with Kotlin coroutines
  • Using Flow, StateFlow, or SharedFlow for reactive data
  • Handling concurrent operations (parallel loading, debounce, retry)
  • Testing coroutines and Flows
  • Managing coroutine scopes and cancellation
  • 使用Kotlin协程编写异步代码
  • 使用Flow、StateFlow或SharedFlow处理响应式数据
  • 处理并发操作(并行加载、防抖、重试)
  • 测试协程与Flow
  • 管理协程作用域与取消

Structured Concurrency

结构化并发

Scope Hierarchy

作用域层级

Application
  └── viewModelScope (ViewModel)
        └── coroutineScope { } (structured child)
              ├── async { } (concurrent task)
              └── async { } (concurrent task)
Always use structured concurrency — never
GlobalScope
:
kotlin
// BAD
GlobalScope.launch { fetchData() }

// GOOD — scoped to ViewModel lifecycle
viewModelScope.launch { fetchData() }

// GOOD — scoped to composable lifecycle
LaunchedEffect(key) { fetchData() }
Application
  └── viewModelScope (ViewModel)
        └── coroutineScope { } (结构化子作用域)
              ├── async { } (并发任务)
              └── async { } (并发任务)
始终使用结构化并发——切勿使用
GlobalScope
kotlin
// 错误示例
GlobalScope.launch { fetchData() }

// 正确示例 — 作用域绑定至ViewModel生命周期
viewModelScope.launch { fetchData() }

// 正确示例 — 作用域绑定至Composable生命周期
LaunchedEffect(key) { fetchData() }

Parallel Decomposition

并行分解

Use
coroutineScope
+
async
for parallel work:
kotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
    val items = async { itemRepository.getRecent() }
    val stats = async { statsRepository.getToday() }
    val profile = async { userRepository.getCurrent() }
    Dashboard(
        items = items.await(),
        stats = stats.await(),
        profile = profile.await()
    )
}
使用
coroutineScope
+
async
处理并行任务:
kotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
    val items = async { itemRepository.getRecent() }
    val stats = async { statsRepository.getToday() }
    val profile = async { userRepository.getCurrent() }
    Dashboard(
        items = items.await(),
        stats = stats.await(),
        profile = profile.await()
    )
}

SupervisorScope

SupervisorScope

Use
supervisorScope
when child failures should not cancel siblings:
kotlin
suspend fun syncAll() = supervisorScope {
    launch { syncItems() }       // failure here won't cancel syncStats
    launch { syncStats() }
    launch { syncSettings() }
}
当子任务失败不应取消其他兄弟任务时,使用
supervisorScope
kotlin
suspend fun syncAll() = supervisorScope {
    launch { syncItems() }       // 此处失败不会取消syncStats
    launch { syncStats() }
    launch { syncSettings() }
}

Flow Patterns

Flow模式

Cold Flow — One-Shot to Stream Conversion

冷流——单次操作转流

kotlin
fun observeItems(): Flow<List<Item>> = flow {
    // Re-emits whenever the database changes
    itemDao.observeAll()
        .map { entities -> entities.map { it.toDomain() } }
        .collect { emit(it) }
}
kotlin
fun observeItems(): Flow<List<Item>> = flow {
    // 数据库变更时重新发射数据
    itemDao.observeAll()
        .map { entities -> entities.map { it.toDomain() } }
        .collect { emit(it) }
}

StateFlow for UI State

用于UI状态的StateFlow

kotlin
class DashboardViewModel(
    observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
    val progress: StateFlow<UserProgress> = observeProgress()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5_000),
            initialValue = UserProgress.EMPTY
        )
}
WhileSubscribed(5_000)
keeps the upstream active for 5 seconds after the last subscriber leaves — survives configuration changes without restarting.
kotlin
class DashboardViewModel(
    observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
    val progress: StateFlow<UserProgress> = observeProgress()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5_000),
            initialValue = UserProgress.EMPTY
        )
}
WhileSubscribed(5_000)
表示最后一个订阅者离开后,保持上游活跃5秒——配置变更时无需重启即可恢复。

Combining Multiple Flows

多Flow组合

kotlin
val uiState: StateFlow<HomeState> = combine(
    itemRepository.observeItems(),
    settingsRepository.observeTheme(),
    userRepository.observeProfile()
) { items, theme, profile ->
    HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())
kotlin
val uiState: StateFlow<HomeState> = combine(
    itemRepository.observeItems(),
    settingsRepository.observeTheme(),
    userRepository.observeProfile()
) { items, theme, profile ->
    HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())

Flow Operators

Flow操作符

kotlin
// Debounce search input
searchQuery
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { query -> repository.search(query) }
    .catch { emit(emptyList()) }
    .collect { results -> _state.update { it.copy(results = results) } }

// Retry with exponential backoff
fun fetchWithRetry(): Flow<Data> = flow { emit(api.fetch()) }
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(1000L * (1 shl attempt.toInt()))
            true
        } else {
            false
        }
    }
kotlin
// 搜索输入防抖
searchQuery
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { query -> repository.search(query) }
    .catch { emit(emptyList()) }
    .collect { results -> _state.update { it.copy(results = results) } }

// 指数退避重试
fun fetchWithRetry(): Flow<Data> = flow { emit(api.fetch()) }
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(1000L * (1 shl attempt.toInt()))
            true
        } else {
            false
        }
    }

SharedFlow for One-Time Events

用于一次性事件的SharedFlow

kotlin
class ItemListViewModel : ViewModel() {
    private val _effects = MutableSharedFlow<Effect>()
    val effects: SharedFlow<Effect> = _effects.asSharedFlow()

    sealed interface Effect {
        data class ShowSnackbar(val message: String) : Effect
        data class NavigateTo(val route: String) : Effect
    }

    private fun deleteItem(id: String) {
        viewModelScope.launch {
            repository.delete(id)
            _effects.emit(Effect.ShowSnackbar("Item deleted"))
        }
    }
}

// Collect in Composable
LaunchedEffect(Unit) {
    viewModel.effects.collect { effect ->
        when (effect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
            is Effect.NavigateTo -> navController.navigate(effect.route)
        }
    }
}
kotlin
class ItemListViewModel : ViewModel() {
    private val _effects = MutableSharedFlow<Effect>()
    val effects: SharedFlow<Effect> = _effects.asSharedFlow()

    sealed interface Effect {
        data class ShowSnackbar(val message: String) : Effect
        data class NavigateTo(val route: String) : Effect
    }

    private fun deleteItem(id: String) {
        viewModelScope.launch {
            repository.delete(id)
            _effects.emit(Effect.ShowSnackbar("Item deleted"))
        }
    }
}

// 在Composable中收集
LaunchedEffect(Unit) {
    viewModel.effects.collect { effect ->
        when (effect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
            is Effect.NavigateTo -> navController.navigate(effect.route)
        }
    }
}

Dispatchers

调度器

kotlin
// CPU-intensive work
withContext(Dispatchers.Default) { parseJson(largePayload) }

// IO-bound work
withContext(Dispatchers.IO) { database.query() }

// Main thread (UI) — default in viewModelScope
withContext(Dispatchers.Main) { updateUi() }
In KMP, use
Dispatchers.Default
and
Dispatchers.Main
(available on all platforms).
Dispatchers.IO
is JVM/Android only — use
Dispatchers.Default
on other platforms or provide via DI.
kotlin
// CPU密集型任务
withContext(Dispatchers.Default) { parseJson(largePayload) }

// IO密集型任务
withContext(Dispatchers.IO) { database.query() }

// 主线程(UI)——viewModelScope默认使用
withContext(Dispatchers.Main) { updateUi() }
在KMP中,使用
Dispatchers.Default
Dispatchers.Main
(全平台可用)。
Dispatchers.IO
仅适用于JVM/Android平台——在其他平台使用
Dispatchers.Default
或通过依赖注入提供。

Cancellation

取消机制

Cooperative Cancellation

协作式取消

Long-running loops must check for cancellation:
kotlin
suspend fun processItems(items: List<Item>) = coroutineScope {
    for (item in items) {
        ensureActive()  // throws CancellationException if cancelled
        process(item)
    }
}
长时间运行的循环必须检查取消状态:
kotlin
suspend fun processItems(items: List<Item>) = coroutineScope {
    for (item in items) {
        ensureActive()  // 若已取消则抛出CancellationException
        process(item)
    }
}

Cleanup with try/finally

使用try/finally清理资源

kotlin
viewModelScope.launch {
    try {
        _state.update { it.copy(isLoading = true) }
        val data = repository.fetch()
        _state.update { it.copy(data = data) }
    } finally {
        _state.update { it.copy(isLoading = false) }  // always runs, even on cancellation
    }
}
kotlin
viewModelScope.launch {
    try {
        _state.update { it.copy(isLoading = true) }
        val data = repository.fetch()
        _state.update { it.copy(data = data) }
    } finally {
        _state.update { it.copy(isLoading = false) }  // 即使取消也会执行
    }
}

Testing

测试

Testing StateFlow with Turbine

使用Turbine测试StateFlow

kotlin
@Test
fun `search updates item list`() = runTest {
    val fakeRepository = FakeItemRepository().apply { emit(testItems) }
    val viewModel = ItemListViewModel(GetItemsUseCase(fakeRepository))

    viewModel.state.test {
        assertEquals(ItemListState(), awaitItem())  // initial

        viewModel.onSearch("query")
        val loading = awaitItem()
        assertTrue(loading.isLoading)

        val loaded = awaitItem()
        assertFalse(loaded.isLoading)
        assertEquals(1, loaded.items.size)
    }
}
kotlin
@Test
fun `search updates item list`() = runTest {
    val fakeRepository = FakeItemRepository().apply { emit(testItems) }
    val viewModel = ItemListViewModel(GetItemsUseCase(fakeRepository))

    viewModel.state.test {
        assertEquals(ItemListState(), awaitItem())  // 初始状态

        viewModel.onSearch("query")
        val loading = awaitItem()
        assertTrue(loading.isLoading)

        val loaded = awaitItem()
        assertFalse(loaded.isLoading)
        assertEquals(1, loaded.items.size)
    }
}

Testing with TestDispatcher

使用TestDispatcher测试

kotlin
@Test
fun `parallel load completes correctly`() = runTest {
    val viewModel = DashboardViewModel(
        itemRepo = FakeItemRepo(),
        statsRepo = FakeStatsRepo()
    )

    viewModel.load()
    advanceUntilIdle()

    val state = viewModel.state.value
    assertNotNull(state.items)
    assertNotNull(state.stats)
}
kotlin
@Test
fun `parallel load completes correctly`() = runTest {
    val viewModel = DashboardViewModel(
        itemRepo = FakeItemRepo(),
        statsRepo = FakeStatsRepo()
    )

    viewModel.load()
    advanceUntilIdle()

    val state = viewModel.state.value
    assertNotNull(state.items)
    assertNotNull(state.stats)
}

Faking Flows

模拟Flow

kotlin
class FakeItemRepository : ItemRepository {
    private val _items = MutableStateFlow<List<Item>>(emptyList())

    override fun observeItems(): Flow<List<Item>> = _items

    fun emit(items: List<Item>) { _items.value = items }

    override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
        return Result.success(_items.value.filter { it.category == category })
    }
}
kotlin
class FakeItemRepository : ItemRepository {
    private val _items = MutableStateFlow<List<Item>>(emptyList())

    override fun observeItems(): Flow<List<Item>> = _items

    fun emit(items: List<Item>) { _items.value = items }

    override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
        return Result.success(_items.value.filter { it.category == category })
    }
}

Anti-Patterns to Avoid

需避免的反模式

  • Using
    GlobalScope
    — leaks coroutines, no structured cancellation
  • Collecting Flows in
    init {}
    without a scope — use
    viewModelScope.launch
  • Using
    MutableStateFlow
    with mutable collections — always use immutable copies:
    _state.update { it.copy(list = it.list + newItem) }
  • Catching
    CancellationException
    — let it propagate for proper cancellation
  • Using
    flowOn(Dispatchers.Main)
    to collect — collection dispatcher is the caller's dispatcher
  • Creating
    Flow
    in
    @Composable
    without
    remember
    — recreates the flow every recomposition
  • 使用
    GlobalScope
    ——会导致协程泄漏,无结构化取消
  • init {}
    中无作用域地收集Flow——应使用
    viewModelScope.launch
  • 使用
    MutableStateFlow
    搭配可变集合——始终使用不可变副本:
    _state.update { it.copy(list = it.list + newItem) }
  • 捕获
    CancellationException
    ——应让其传播以实现正确取消
  • 使用
    flowOn(Dispatchers.Main)
    进行收集——收集调度器由调用方决定
  • @Composable
    中不使用
    remember
    创建Flow——重组时会重新创建Flow

References

参考资料

See skill:
compose-multiplatform-patterns
for UI consumption of Flows. See skill:
android-clean-architecture
for where coroutines fit in layers.
查看技能:
compose-multiplatform-patterns
以了解Flow在UI中的使用方式。 查看技能:
android-clean-architecture
以了解协程在各层级中的定位。