一、数据流基本介绍

基于协程的数据流处理框架

🎯 核心概念:异步数据序列

Flow 是一种可以异步发出多个值的数据流,这使它区别于仅返回单个值的挂起函数。它主要由三个实体构成:

  • 提供方:负责生成并添加数据到流中。
  • 中介:(可选)负责修改发送到流中的值或流本身。
  • 使用方:负责接收和使用流中的值。

🏗️ Flow 的基石:冷流与热流

理解 冷流热流 的区别至关重要:

  • 冷流 (如 flow 构建器)无状态每次 当有使用方(调用 collect)时,提供方的代码就会重新执行。它不存储数据。
  • 热流 (如 StateFlow/SharedFlow)有状态,独立于收集操作而存在,数据会被缓存。多个使用方可以共享同一个热流实例并接收数据,且收集不会触发提供方代码。

主要 API 与用法全景图

1. 创建 Flow:从源头开始

  • flow { ... } 构建器:最常用的方式。在代码块中通过 emit(value) 发出数据。
  • flowOf(...):基于一组固定的值创建 Flow。
  • .asFlow():将各种集合(如 ListSet)或序列转换为 Flow。
  • callbackFlow { ... }关键工具。用于将基于回调的 API(如 Firebase Listener)转换为 Flow。需在 awaitClose { } 中清理资源。

2. 修饰 Flow:中间运算符(懒加载)

这些运算符应用于 Flow 后,会返回一个新的 Flow,并定义数据转换的逻辑,但不会立即执行,直到有终端运算符开始收集。

  • 转换maptransform(可多次 emit)。
  • 过滤filtertakedropdistinctUntilChanged
  • 组合
    • zip:将两个 Flow 严格配对,以较短的 Flow 为准结束。
    • combine:当 Flow 中任意一个有新值时,用各自的最新值进行组合。
  • 处理背压与性能
    • buffer():为 Flow 添加缓冲区,允许生产者与消费者并行运行,优化整体执行时间。
    • conflate():仅保留最新的值,当消费者处理速度慢时,中间值会被丢弃。
    • collectLatest:当有新值发出时,如果旧值还未处理完,则取消旧值的处理。
  • 线程切换flowOn(Dispatcher):改变上游flowOn之前的部分)执行的协程调度器。
  • 错误处理catch { ... }:捕获上游异常,可 emit 备用值或重新抛出。
  • 副作用onEachonStartonCompletion:用于监听 Flow 的生命周期事件,不影响数据本身。

3. 使用 Flow:终端运算符(触发执行)

终端运算符是挂起函数,调用后会启动 Flow 并开始接收数据。

  • collect { ... }:最基础的,逐个处理每个值。
  • 转换为其他类型toList()toSet()
  • 获取特定值first()single()
  • 规约操作reduce()fold()
  • 判断与计数any()count()

🔥 状态与事件:StateFlow 和 SharedFlow

它们是热流,非常适合在 ViewModel 中向界面暴露状态或事件。

StateFlow:状态容器

  • 特点:总是持有一个最新状态。新收集器会立即收到当前的最新状态。类似于 LiveData,但需要配合 repeatOnLifecycle 在界面层安全收集。
  • 使用模式:在 ViewModel 中,通常使用私有的 MutableStateFlow 作为后备属性,对外暴露不可变的 StateFlow。通过更新其 value 来发送新状态。
  • 转换工具.stateIn 可以将普通 Flow 或 SharedFlow 转换为 StateFlow,需要指定作用域、初始值和启动策略。

SharedFlow:事件总线

  • 特点:是 StateFlow 的泛化版本,可高度配置。适合发送一次性事件(如导航、Snackbar 消息)或需要多个订阅者共享的数据流。
  • 核心参数
    • replay:为新订阅者重放之前发出的 N 个值。
    • extraBufferCapacity:除了重放缓存外的额外缓冲区大小。
    • onBufferOverflow:缓冲区满时的策略(挂起、丢弃最新、丢弃最旧)。
  • 转换工具.shareIn 可以将普通 Flow 转换为 SharedFlow,需要指定作用域、重放数量和启动策略。

📱 与 Android 架构组件集成

  • Room:在 DAO 中,让查询函数返回 Flow<T>,即可在数据库变更时自动收到最新数据。
  • ViewModel
    • 使用 viewModelScope 启动协程来收集数据源或执行操作。
    • 使用 StateFlow 暴露界面状态,确保状态在配置变更后依然存活。
    • 在界面层使用 repeatOnLifecycle(Lifecycle.State.STARTED) 来安全地收集 Flow,避免资源浪费和内存泄漏。

💡 核心要点与最佳实践

  1. 冷热流选择:对于每次收集都需要独立计算的场景,用冷流;对于需要共享状态或事件的场景,用热流StateFlow/SharedFlow)。
  2. 异常处理:始终在 Flow 链的适当位置使用 catch 运算符来处理异常,并考虑是否要发出一个备用值。
  3. 线程管理:使用 flowOn 将耗时操作(如网络请求、数据库读写)切换到后台调度器,保持界面线程流畅。
  4. 背压处理:当生产者速度可能快于消费者时,根据业务需求选择 bufferconflatecollectLatest
  5. 资源释放:在自定义的 callbackFlow 中,务必在 awaitClose { } 中清理回调监听器,防止内存泄漏。

二、MutableStateFlow

好的,我们从最基础的 Flow 开始,一步步讲到 StateFlow,这样你就能理解它们之间的关系了。

1.Flow:最基本的数据流

什么是 Flow?

Flow 就像一个可以多次发送数据的“管道”,你可以往里面放数据,别人可以从另一头接收数据。

kotlin

// 创建一个 Flow(管道)
val numberFlow = flow {
    emit(1)  // 往管道里放 1
    emit(2)  // 放 2
    emit(3)  // 放 3
}

// 从管道里接收数据
numberFlow.collect { number ->
    println(number)  // 会打印 1, 2, 3
}

Flow 的重要特点:冷流

每次调用 collect,都会重新执行一遍 flow {} 里面的代码:

kotlin

val numberFlow = flow {
    println("开始执行")
    emit(1)
}

// 第一次收集
numberFlow.collect { println("收到: $it") }  
// 输出:开始执行 → 收到: 1

// 第二次收集,又会重新执行
numberFlow.collect { println("收到: $it") }  
// 输出:开始执行 → 收到: 1  (又一次"开始执行")

简单理解:Flow 就像一次性使用的发电机,每次有人需要电,它就重新发电。


2.问题:能不能有个“永久的”数据流?

上面 Flow 的问题是:每次收集都会重新运行。但有时候我们想要一个持久的、共享的数据流,比如:

  • 用户登录状态(一个地方改了,所有地方都知道)
  • UI 的状态(ViewModel 改了,多个页面都能收到)

这时候就需要 SharedFlow

3.SharedFlow:共享的数据流

SharedFlow 的特点:热流

  • 不管有没有人收集,它都可以存在
  • 多个地方可以同时收集(共享)
  • 新来的收集者可以收到之前发送的部分数据(通过 replay 设置)

kotlin

val sharedFlow = MutableSharedFlow<Int>(replay = 2)  // 保留最近 2 条数据

// 先发送一些数据
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.emit(3)

// 后开始的收集者,可以收到最近 2 条(2 和 3)
sharedFlow.collect { value ->
    println("收到: $value")  // 输出:2, 3
}

类比

  • Flow(冷流):点播视频,每个人看都从头播放
  • SharedFlow(热流):直播频道,新进来的观众只能看到最近几分钟的内容(replay)

4.StateFlow:特殊的 SharedFlow

StateFlowSharedFlow 的一种特化版本,专门用来表示“状态”。

StateFlow 的特殊之处

特点 说明
必须有初始值 一开始就要有个默认状态
replay = 1 永远只保留最新的一条数据
可以获取当前值 通过 .value 直接拿到当前状态
自动去重 如果新值和旧值相同(equals),不会通知收集者

代码示例

kotlin

// 创建一个 StateFlow(初始值是 0)
val stateFlow = MutableStateFlow(0)

// 修改值
stateFlow.value = 1
stateFlow.value = 2
stateFlow.value = 2  // 相同的值,不会触发更新

// 获取当前值
println(stateFlow.value)  // 输出 2

// 收集变化
stateFlow.collect { value ->
    println("状态变成: $value")  // 会打印:状态变成: 0, 1, 2(不重复打印2)
}

为什么要有 StateFlow?

因为很多场景就是需要一个会变化的值,比如:

kotlin

// UI 状态
val isLoading = MutableStateFlow(false)
val userName = MutableStateFlow("")
val errorMessage = MutableStateFlow<String?>(null)

5.Mutable 前缀的含义

  • StateFlow:只读,只能观察,不能修改
  • MutableStateFlow:可读写,可以修改值

kotlin

// 典型的 ViewModel 写法
class MyViewModel : ViewModel() {
    // 私有的,可修改的版本
    private val _count = MutableStateFlow(0)

    // 公开的,只读版本
    val count: StateFlow<Int> = _count

    fun increment() {
        _count.value++  // 内部修改
    }
}

// 外部使用
viewModel.count.collect { value ->
    // 只能看到值,不能修改
}

6.关系总结图

text

Flow(冷流)
  └── 每次收集都重新执行
  └── 独立的数据流
        ↓
SharedFlow(热流)
  └── 共享的数据流
  └── 可以设置 replay(新收集者收到多少历史数据)
        ↓
StateFlow(特殊的 SharedFlow)
  └── replay = 1(只保留最新)
  └── 必须有初始值
  └── 可以 .value 获取当前值
  └── 自动去重
        ↓
MutableStateFlow(可修改的 StateFlow)
  └── 可以修改 value
  └── 通常私有化,对外暴露 StateFlow

7.什么时候用哪个?

需求 用什么
网络请求、数据库查询(一次性操作) 普通 Flow
事件(点击、弹窗、导航) SharedFlow
状态(加载中、用户信息、开关状态) StateFlow

8.一个完整的例子

kotlin

class UserViewModel : ViewModel() {
    // 状态用 StateFlow
    private val _userName = MutableStateFlow("未登录")
    val userName: StateFlow<String> = _userName

    // 事件用 SharedFlow
    private val _toastEvent = MutableSharedFlow<String>()
    val toastEvent: SharedFlow<String> = _toastEvent

    fun login(name: String) {
        _userName.value = name  // 更新状态
        viewModelScope.launch {
            _toastEvent.emit("欢迎 $name")  // 发送一次性事件
        }
    }
}

三、repeatOnLifecycle 或 shareIn

🆚 核心区别与联系

通过下表可以更清晰地理解两者的定位:

对比维度 repeatOnLifecycle shareIn
主要角色 生命周期安全的收集器 数据源共享转换器
使用位置 Activity / Fragment (界面层) ViewModel / 数据层
核心目的 根据生命周期自动开始/停止收集,节省资源,避免崩溃。 将冷流变为热流,共享数据源,避免重复计算。
操作对象 任何 Flow 通常是冷流
产出结果 无,它是收集行为本身 一个可以被多个收集者共享的热流 (SharedFlow)
关键参数 Lifecycle.State (如 STARTED) started (启动策略, 如 WhileSubscribed) 和 replay

💡 协同工作模式

它们俩经常在架构中协同工作

  1. ViewModel,您使用 shareInstateIn 将一个底层的冷流转换为热流(如 StateFlow),并暴露给界面层。
  2. Fragment,您使用 repeatOnLifecycle 来安全地收集这个暴露出来的热流,确保界面在可见时才更新,不可见时自动停止收集。

这种模式既保证了数据源的共享和效率,又保证了界面层收集的安全性,是官方推荐的架构实践。


0 条评论

发表回复

您的电子邮箱地址不会被公开。