目录
一、数据流基本介绍
基于协程的数据流处理框架
🎯 核心概念:异步数据序列
Flow 是一种可以异步发出多个值的数据流,这使它区别于仅返回单个值的挂起函数。它主要由三个实体构成:
- 提供方:负责生成并添加数据到流中。
- 中介:(可选)负责修改发送到流中的值或流本身。
- 使用方:负责接收和使用流中的值。
🏗️ Flow 的基石:冷流与热流
理解 冷流 和 热流 的区别至关重要:
- 冷流 (如
flow构建器):无状态,每次 当有使用方(调用collect)时,提供方的代码就会重新执行。它不存储数据。 - 热流 (如
StateFlow/SharedFlow):有状态,独立于收集操作而存在,数据会被缓存。多个使用方可以共享同一个热流实例并接收数据,且收集不会触发提供方代码。
主要 API 与用法全景图
1. 创建 Flow:从源头开始
flow { ... }构建器:最常用的方式。在代码块中通过emit(value)发出数据。flowOf(...):基于一组固定的值创建 Flow。.asFlow():将各种集合(如List、Set)或序列转换为 Flow。callbackFlow { ... }:关键工具。用于将基于回调的 API(如 Firebase Listener)转换为 Flow。需在awaitClose { }中清理资源。
2. 修饰 Flow:中间运算符(懒加载)
这些运算符应用于 Flow 后,会返回一个新的 Flow,并定义数据转换的逻辑,但不会立即执行,直到有终端运算符开始收集。
- 转换:
map、transform(可多次emit)。 - 过滤:
filter、take、drop、distinctUntilChanged。 - 组合:
zip:将两个 Flow 严格配对,以较短的 Flow 为准结束。combine:当 Flow 中任意一个有新值时,用各自的最新值进行组合。
- 处理背压与性能:
buffer():为 Flow 添加缓冲区,允许生产者与消费者并行运行,优化整体执行时间。conflate():仅保留最新的值,当消费者处理速度慢时,中间值会被丢弃。collectLatest:当有新值发出时,如果旧值还未处理完,则取消旧值的处理。
- 线程切换:
flowOn(Dispatcher):改变上游(flowOn之前的部分)执行的协程调度器。 - 错误处理:
catch { ... }:捕获上游异常,可emit备用值或重新抛出。 - 副作用:
onEach、onStart、onCompletion:用于监听 Flow 的生命周期事件,不影响数据本身。
3. 使用 Flow:终端运算符(触发执行)
终端运算符是挂起函数,调用后会启动 Flow 并开始接收数据。
collect { ... }:最基础的,逐个处理每个值。- 转换为其他类型:
toList()、toSet()。 - 获取特定值:
first()、single()。 - 规约操作:
reduce()、fold()。 - 判断与计数:
any()、count()。
🔥 状态与事件:StateFlow 和 SharedFlow
它们是热流,非常适合在 ViewModel 中向界面暴露状态或事件。
StateFlow:状态容器
- 特点:总是持有一个最新状态。新收集器会立即收到当前的最新状态。类似于 LiveData,但需要配合
repeatOnLifecycle在界面层安全收集。 - 使用模式:在 ViewModel 中,通常使用私有的
MutableStateFlow作为后备属性,对外暴露不可变的StateFlow。通过更新其value来发送新状态。 - 转换工具:
.stateIn可以将普通 Flow 或 SharedFlow 转换为 StateFlow,需要指定作用域、初始值和启动策略。
SharedFlow:事件总线
- 特点:是 StateFlow 的泛化版本,可高度配置。适合发送一次性事件(如导航、Snackbar 消息)或需要多个订阅者共享的数据流。
- 核心参数:
replay:为新订阅者重放之前发出的 N 个值。extraBufferCapacity:除了重放缓存外的额外缓冲区大小。onBufferOverflow:缓冲区满时的策略(挂起、丢弃最新、丢弃最旧)。
- 转换工具:
.shareIn可以将普通 Flow 转换为 SharedFlow,需要指定作用域、重放数量和启动策略。
📱 与 Android 架构组件集成
- Room:在 DAO 中,让查询函数返回
Flow<T>,即可在数据库变更时自动收到最新数据。 - ViewModel:
- 使用
viewModelScope启动协程来收集数据源或执行操作。 - 使用
StateFlow暴露界面状态,确保状态在配置变更后依然存活。 - 在界面层使用
repeatOnLifecycle(Lifecycle.State.STARTED)来安全地收集 Flow,避免资源浪费和内存泄漏。
- 使用
💡 核心要点与最佳实践
- 冷热流选择:对于每次收集都需要独立计算的场景,用冷流;对于需要共享状态或事件的场景,用热流(
StateFlow/SharedFlow)。 - 异常处理:始终在 Flow 链的适当位置使用
catch运算符来处理异常,并考虑是否要发出一个备用值。 - 线程管理:使用
flowOn将耗时操作(如网络请求、数据库读写)切换到后台调度器,保持界面线程流畅。 - 背压处理:当生产者速度可能快于消费者时,根据业务需求选择
buffer、conflate或collectLatest。 - 资源释放:在自定义的
callbackFlow中,务必在awaitClose { }中清理回调监听器,防止内存泄漏。
二、MutableStateFlow
好的,我们从最基础的 Flow 开始,一步步讲到 StateFlow,这样你就能理解它们之间的关系了。
1.Flow:最基本的数据流
什么是 Flow?
Flow 就像一个可以多次发送数据的“管道”,你可以往里面放数据,别人可以从另一头接收数据。
kotlin
// 创建一个 Flow(管道)
val numberFlow = flow {
emit(1) // 往管道里放 1
emit(2) // 放 2
emit(3) // 放 3
}
// 从管道里接收数据
numberFlow.collect { number ->
println(number) // 会打印 1, 2, 3
}
Flow 的重要特点:冷流
每次调用 collect,都会重新执行一遍 flow {} 里面的代码:
kotlin
val numberFlow = flow {
println("开始执行")
emit(1)
}
// 第一次收集
numberFlow.collect { println("收到: $it") }
// 输出:开始执行 → 收到: 1
// 第二次收集,又会重新执行
numberFlow.collect { println("收到: $it") }
// 输出:开始执行 → 收到: 1 (又一次"开始执行")
简单理解:Flow 就像一次性使用的发电机,每次有人需要电,它就重新发电。
2.问题:能不能有个“永久的”数据流?
上面 Flow 的问题是:每次收集都会重新运行。但有时候我们想要一个持久的、共享的数据流,比如:
- 用户登录状态(一个地方改了,所有地方都知道)
- UI 的状态(ViewModel 改了,多个页面都能收到)
这时候就需要 SharedFlow。
3.SharedFlow:共享的数据流
SharedFlow 的特点:热流
- 不管有没有人收集,它都可以存在
- 多个地方可以同时收集(共享)
- 新来的收集者可以收到之前发送的部分数据(通过 replay 设置)
kotlin
val sharedFlow = MutableSharedFlow<Int>(replay = 2) // 保留最近 2 条数据
// 先发送一些数据
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)
// 后开始的收集者,可以收到最近 2 条(2 和 3)
sharedFlow.collect { value ->
println("收到: $value") // 输出:2, 3
}
类比
- Flow(冷流):点播视频,每个人看都从头播放
- SharedFlow(热流):直播频道,新进来的观众只能看到最近几分钟的内容(replay)
4.StateFlow:特殊的 SharedFlow
StateFlow 是 SharedFlow 的一种特化版本,专门用来表示“状态”。
StateFlow 的特殊之处
| 特点 | 说明 |
|---|---|
| 必须有初始值 | 一开始就要有个默认状态 |
| replay = 1 | 永远只保留最新的一条数据 |
| 可以获取当前值 | 通过 .value 直接拿到当前状态 |
| 自动去重 | 如果新值和旧值相同(equals),不会通知收集者 |
代码示例
kotlin
// 创建一个 StateFlow(初始值是 0)
val stateFlow = MutableStateFlow(0)
// 修改值
stateFlow.value = 1
stateFlow.value = 2
stateFlow.value = 2 // 相同的值,不会触发更新
// 获取当前值
println(stateFlow.value) // 输出 2
// 收集变化
stateFlow.collect { value ->
println("状态变成: $value") // 会打印:状态变成: 0, 1, 2(不重复打印2)
}
为什么要有 StateFlow?
因为很多场景就是需要一个会变化的值,比如:
kotlin
// UI 状态
val isLoading = MutableStateFlow(false)
val userName = MutableStateFlow("")
val errorMessage = MutableStateFlow<String?>(null)
5.Mutable 前缀的含义
- StateFlow:只读,只能观察,不能修改
- MutableStateFlow:可读写,可以修改值
kotlin
// 典型的 ViewModel 写法
class MyViewModel : ViewModel() {
// 私有的,可修改的版本
private val _count = MutableStateFlow(0)
// 公开的,只读版本
val count: StateFlow<Int> = _count
fun increment() {
_count.value++ // 内部修改
}
}
// 外部使用
viewModel.count.collect { value ->
// 只能看到值,不能修改
}
6.关系总结图
text
Flow(冷流)
└── 每次收集都重新执行
└── 独立的数据流
↓
SharedFlow(热流)
└── 共享的数据流
└── 可以设置 replay(新收集者收到多少历史数据)
↓
StateFlow(特殊的 SharedFlow)
└── replay = 1(只保留最新)
└── 必须有初始值
└── 可以 .value 获取当前值
└── 自动去重
↓
MutableStateFlow(可修改的 StateFlow)
└── 可以修改 value
└── 通常私有化,对外暴露 StateFlow
7.什么时候用哪个?
| 需求 | 用什么 |
|---|---|
| 网络请求、数据库查询(一次性操作) | 普通 Flow |
| 事件(点击、弹窗、导航) | SharedFlow |
| 状态(加载中、用户信息、开关状态) | StateFlow |
8.一个完整的例子
kotlin
class UserViewModel : ViewModel() {
// 状态用 StateFlow
private val _userName = MutableStateFlow("未登录")
val userName: StateFlow<String> = _userName
// 事件用 SharedFlow
private val _toastEvent = MutableSharedFlow<String>()
val toastEvent: SharedFlow<String> = _toastEvent
fun login(name: String) {
_userName.value = name // 更新状态
viewModelScope.launch {
_toastEvent.emit("欢迎 $name") // 发送一次性事件
}
}
}
三、repeatOnLifecycle 或 shareIn
🆚 核心区别与联系
通过下表可以更清晰地理解两者的定位:
| 对比维度 | repeatOnLifecycle |
shareIn |
|---|---|---|
| 主要角色 | 生命周期安全的收集器 | 数据源共享转换器 |
| 使用位置 | Activity / Fragment (界面层) |
ViewModel / 数据层 |
| 核心目的 | 根据生命周期自动开始/停止收集,节省资源,避免崩溃。 | 将冷流变为热流,共享数据源,避免重复计算。 |
| 操作对象 | 任何 Flow |
通常是冷流 |
| 产出结果 | 无,它是收集行为本身 | 一个可以被多个收集者共享的热流 (SharedFlow) |
| 关键参数 | Lifecycle.State (如 STARTED) |
started (启动策略, 如 WhileSubscribed) 和 replay |
💡 协同工作模式
它们俩经常在架构中协同工作:
- 在
ViewModel中,您使用shareIn或stateIn将一个底层的冷流转换为热流(如StateFlow),并暴露给界面层。 - 在
Fragment中,您使用repeatOnLifecycle来安全地收集这个暴露出来的热流,确保界面在可见时才更新,不可见时自动停止收集。
这种模式既保证了数据源的共享和效率,又保证了界面层收集的安全性,是官方推荐的架构实践。
0 条评论