初识 Kotlin Flow
从 Sequence 说起
Sequence 是 Kotlin 提供的 Lazy List 实现。比如下面使用 Sequence 实现的 Fibonacci 数列:
val fibonacci = sequence {
var terms = Pair(0L, 1L)
while (true) {
🏹 yield(terms.first)
terms = Pair(
terms.second,
terms.first + terms.second
)
}
}
终端操作符(terminal operator)
我们在 Sequence builder 块中用一个无限循环,源源不断地计算序列中的元素,并通过 yield
将元素发送给消费者。
Sequence 的消费者就是所谓的终端操作符(terminal operator),比如 forEach
,sum
等。
fibonacci.take(10).forEach { println(it) }
fibonacci.take(10).sum()
以 forEach
为例,每当我们在 sequence
块中 yield
一个新元素,便会将这个元素发送给 forEach
块进行处理,然后再回到 sequence
块中执行后续的逻辑,计算下一个值。
而普通的 List 则会先计算出所有的元素存在内存中。正因为如此,我们可以用 Sequence 代表无限元素的序列。但是在消费数据的时候必须使用 take
中间操作符限制消费的数量。
中间操作符(intermediate operator)
Sequence 和 List 一样有 map
、filter
这些中间操作符。每在 List 上调用一次中间操作符都会返回一个新的 List;而在 Sequence 上调用操作符相当于构建一个数据的管道。一个元素被 yield
出来后,它会穿过这些定义好的数据管道,到达终端消费者。
假设没有终端消费者,那么我们相当于搭好了数据的管道,但是在运行时 Sequence builder 中的代码并不会运行。
下面这个小例子会输出字符串 "AaBbCc"
sequence {
yield("A".also { print(it) })
yield("B".also { print(it) })
yield("C".also { print(it) })
}.forEach {
print(it.toLowerCase())
}
应用案例:找出第一个非 null 属性
class Feedback(
val rate: Int,
val message: String?
)
val feedbacks = listOf(
Feedback(4, null),
Feedback(4, "Cool and clear function"),
Feedback(3, null),
Feedback(1, "Hey, it's too much"),
)
假设我们想要在 feedbacks
这个 List 中找出第一个不为 null 的 Feedback.message
, 即 "Cool and clear function"
。
有下面几种方式:
// 方法1: 生成一个中间 Collection
feedbacks
.mapNotNull { it.name }
.firstOrNull()
// 方法2: 转换成 Sequence
feedbacks
.asSequence()
.mapNotNull { it.name }
.firstOrNull()
// 方法3: Kotlin 1.5 标准库方法
feedbacks.firstNotNullOfOrNull { it.name }
如果集合元素非常多,或者需要经过很多操作符变换,方法 1 每次变换都创建一个新集合存储中间结果,有些浪费。 这样的场景使用 Sequence 会更好一些。
这个例子摘自 Kotlin 1.5.0-RC Released: Changes to the Standard and Test Libraries。
大家觉得 firstNotNullOfNull
这个方法怎么样?
应用案例:扁平化嵌套列表迭代器
假设有一个嵌套的整型列表。请设计一个迭代器,使其能够遍历这个整型列表中的所有整数。整型列表由这个接口表示:
interface NestedInteger {
fun isInteger(): Boolean
fun getInteger(): Int?
fun getList(): List<NestedInteer>?
}
列表中的每一项或者为一个整数,或者是另一个列表。其中列表的元素也可能是整数或是其他列表。 示例:
输入: [[1,1],2,[1,1]]
输出: [1,1,2,1,1]
如果将所有元素存进一个 List,那么可以用递归轻松解决这个问题:
fun flatten(nestedList: List<NestedInteger>): List<Int> {
val ans = mutableListOf()
fun walk(list: List<NestedInteger>) {
for (item in list) {
if (item.isInteger()) ans += item
else walk(item.getList())
}
}
walk(nestedList)
return ans
}
拿到 List 以后可以转成一个迭代器。但是迭代器应该是「懒」的,也就是说消费者一边消费数据,迭代器一边遍历数据源。这样的好处包括:
- 如果消费者只需要前几条数据,而数据源又很大,那么预先计算出整个 List 做了很多无用功。
- 消费者不用等待整个 List 计算完成后才进行下一步处理。通过并发可以加快整个数据处理流程的速度。
但是这样我们就不方便直接递归,需要自己手动维护一个栈,原本简单的代码变得复杂了许多(可以在这里查看完整代码)。
借助 Sequence,我们可以用递归的算法实现懒的迭代器:
// 为了进行递归需要封装出一个函数
suspend tailrec fun SequenceScope<Int>.walk(
list: List<NestedInteger>
) {
for (item in list) {
if (item.isInteger()) yield(item.getInteger())
else walk(item.getList())
}
}
sequence { walk(nestedList) }.iterator() // 题目所求
或许会想到,可以给 walk
函数传一个回调,这样也能实现边遍历边消费的效果。可以理解成 Sequence 就是对这一方式的封装。还记得 suspend 本质上是回调。
Sequence builder 是一个(受限的)suspend 块,而 yield
是 suspend 函数。
Flow:可以 suspend 的 Sequence
上面的例子中数据流里只有纯计算。实际场景中我们可能需要在中间或者终端操作符做一些耗时的操作(假设都封装成了 suspend 函数),比如调接口、从数据库读写数据:
suspend fun search(string: String) {/**/}
suspend fun saveToDB(string: String) {/**/}
// ❌ 无法编译
// 不能在 Sequence 的操作符中调用 suspend 函数
sequenceOf("foo", "bar")
.map { search(it) } // 调用远程 api
.forEach { saveToDB(it) } // 存至数据库
上面这个例子中我们希望依次调用接口搜索 "foo"
和 "bar"
,拿到搜索结果后存入数据库,即按照这个顺序:
- yield
"foo"
- search
"foo"
- saveToDB
"foo"
- yield
"bar"
- search
"bar"
- saveToDB
"bar"
然而,Sequence 的操作符只能传入常规(非 suspend)的块,无法在其中调用 suspend 函数。
于是我们有了 Flow:
suspend fun search(string: String) {/**/}
suspend fun saveToDB(string: String) {/**/}
scope.launch {
flowOf("foo", "bar")
.map { 🏹 search(it) } // 调用远程 api
🏹 .collect { 🏹 saveToDB(it )} // 存至数据库
}
Flow 和 Kotlin 协程的设计一样默认是顺序执行的,没有并发。上面的例子从执行顺序上等同下面这个循环:
for (item in listOf("foo", "bar")) {
val result = 🏹 search(item)
🏹 saveToDB(result)
}
collect
为代表的终端操作符是 suspend 函数。因此,对 Flow 进行消费必须有一个 CoroutineScope。而 Sequence 的终端操作符都只是普通函数。
Flow 让我们可以对数据流中的数据应用 suspend 函数,而所有 suspend 函数必须在某个 CoroutineScope 中执行。这些 suspend 的块默认就运行在 collect 的 CoroutineScope 中。
实际上和 Sequence 一样,Flow 的终端操作符才是驱动整个数据流的“原动力”。如果没有终端操作符,只引用了若干 map、filter 中间操作符,相当于只搭建了一个数据管道,Flow builder 中的代码不会运行,数据不会流动。 这种 Flow 被称作「冷流」。 一种有益的理解是把这种 Flow 的定义类比成函数定义,把终端操作符类比成(suspend)函数调用:
val myFlow: Flow<Int> = flow {/**/}
.map {/**/}
- 只有在
myFlow
上调用终端操作符的时候,flow {}
块以及map {}
中的代码才会运行,数据才会流动。 - 如果在
myFlow
上调用两次collect
,就好比同一个函数调用两次。 myFlow
内部含有 suspend 块,本身也应当看成 suspend 函数,在调用的时候必须提供一个 CoroutineScope。
Flow 借助 Kotlin 语言提供的 suspend 基础设施实现异步,而 RxJava 需要在流中的数据类型体现异步。比如使用 RxJava 做网络请求:
fun callSearchApi(string: String)
: Observable<SeachResult>
Observable.just("foo", "bar")
.flatMap { callSearchApi(it) }
.subscribe { println(it) }
注意到 callSearchApi
的返回结果需要套在一个 Observable
里,同时数据流中的变换需要使用 flatMap
。
而使用协程和 Flow,异步函数返回值不需要戴任何套子,数据流变换可以直接使用 map
,更加自然。
suspend fun callSearchApi(string: String)
: SeachResult
scope.launch {
flowOf("foo", "bar")
.map { callSearchApi(it) }
.collect { println(it) }
}
// 或者使用 `launchIn`,减少嵌套
flowOf("foo", "bar")
.map { callSearchApi(it) }
.onEach { println(it) }
.launchIn(scope)
一个玩具 Flow
Kotlin Flow 的设计和实现十分简洁优雅,我们不妨试着实现一个极简的玩具版本。
Flow
interface 只有一个 collect
方法,接收一个 FlowCollector
的参数。
FlowCollector
是一个典型的代表消费者的 interface(比如 Comparator
, 同样也是使用 <in T>
)。
interface Flow<out T> {
suspend fun collect(collector: FlowCollector<T>)
}
interface FlowCollector<in T> {
suspend fun emit(value: T)
}
这个 collect
方法可以看成联系反应式数据流上下游的纽带。
- 上游数据源,通过
this
访问; - 下游消费者,通过
collector
访问。
实现 collect
class MyFlow: Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
TODO()
}
}
当我们 collect
的时候需要将数据发送给 FlowCollector
进行消费,那么这个数据从哪里来呢?我们可以用 Flow builder 来创建 flow:
flow {
🏹 emit(1)
🏹 emit(2)
🏹 emit(3)
} 🏹.collect {
println(it)
}
咦,注意到 emit
了吗?不是别人,正是 FlowCollector.emit
方法。所以这个 Flow builder 函数接收了一个以 FlowCollector
为 receiver 的块。这个 FlowCollector
从哪儿来?Flow.collect
的时候传进来。
这样形成了一个闭环:我们在 flow
builder 函数的 suspend 块里面调用的 emit
是调用在将来 collect Flow 时候出现的下游消费者身上。这体现了 Flow 「懒」的性质:我们在创建一个 Flow 的时候不会立马计算出数据,而是传入一个 suspend 函数块,等 Flow collect 的时候,我们拿到了消费者 FlowCollector
,并以此为 receiver 调用事先存好的 suspend 块。这一设计尤其巧妙地利用了 Kotlin 带 receiver lambda 的特性。
把上面的分析落实到代码里,就得到了我们的玩具实现:
class MyFlow(
private val builder: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
override suspend fun collect(collector: FlowCollector<T>)
= collector.builder()
}
为了方便调用方能够使用 collect {...}
的形式消费数据,可以定义一个扩展函数:
// 原封不动的协程库实现
public suspend inline fun <T> Flow<T>.collect(
crossinline action: suspend (value: T) -> Unit
): Unit = collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
中间操作符
中间操作符需要 collect 上游的 Flow,对上游数据进行变换,并将变换后的数据发送给一个新的 flow。比如我们可以这样实现 map:
fun <T, R> Flow<T>.map(block: suspend (value: T) -> R) =
MyFlow<R> {
collect { emit(block(it)) }
}
和 Sequence、Iterable 一样,Flow 的中间操作符都是扩展函数。这使得 Flow 的 interface 可以只有一个方法,保持精简,同时也方便用户自定义操作符。我们在调用自定义操作符的时候和标准库自带的操作符没有什么区别,不像 RxJava 那样需要借助 compose
或者 lift
这种额外的 API(参考 Implementing Custom Operators in RxJava)。
「对上游数据进行变换,并返回一个新的 Flow」这个模式非常常见,于是 Kotlin 协程库提供了个 transform
方法来实现 map、filter 等很多操作符。在定义自己的运算符的时候也建议使用。
// Kotlin 协程库内 Flow.map 的实现
public inline fun <T, R> Flow<T>.map(
crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
return@transform emit(transform(value))
}
完整代码
class MyFlow<T>(
private val builder: suspend FlowCollector<T>.() -> Unit
) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) = collector.builder()
}
fun <T, R> Flow<T>.map(block: suspend (value: T) -> R) = MyFlow<R> {
collect { emit(block(it)) }
}
fun <T> Flow<T>.filter(block: suspend (value: T) -> Boolean) = MyFlow<T> {
collect { if (block(it)) emit(it) }
}
suspend fun main() = MyFlow<Int> { emit(1); emit(2) }
.map { it * it }
.filter {
🏹 delay(1000)
it % 2 == 0
}
🏹 .collect { println("collected $it") }
可以看到,借助 Kotlin 已有的 suspend 基础设施,实现一个支持异步的反应式数据流不过两三行。实际上,Kotlin 协程库的 Flow 经过抽丝剥茧,核心代码和我们的玩具实现并没有太多区别,但提供了两个额外的保证:上下文保存(Context preservation)和异常透明(Exception transparency)。
关于 suspend 函数,欢迎阅读《理解 Kotlin 的 suspend 函数》。
Flow 的额外保证
上下文保存(Context preservation)
如果 collect 的块运行在主线程,那么我们可以在这个块里更新 UI,这是客户端的常见场景。RxJava 可以用 observeOn
和 subscribeOn
操作符切换线程。但是我们拿到一个 Observable 以后,只看函数签名无法确定消费者会在哪个线程执行,所以一般会手动调一次 observeOn(mainThread)
。
有的时候项目里会在全局的位置(比如 Retrofit 的 call adapter)统一加上了切换至主线程的操作,但是在实际调用的时候会不放心,或者习惯性地来一句 observeOn(mainThread)
,线程切了又切。
Kotlin 协程中在 CoroutineContext 中记录线程的调度信息。Flow 提供了「上下文保存 Context preservation」这一保证:Flow 在运行时确保上游不能改变下游的 context。 换句话说,消费 Flow 的线程取决于调用 collect 的 CoroutineContext。在哪里 collect 就在哪里执行,所见即所得。假设我们从某个 API 拿到了一个 Flow:
fun magicFlow(): Flow<String> = {/****/}
lifecycleScope.launch {
magicFlow().collect {
// 一定在主线程执行
uiBinding.label.text = it
}
}
这个 magicFlow
可能有一部分切换到了某个后台线程,但对于调用方来说这些都是 magicFlow
的内部实现细节,无须关心。
调用方因为要更新 UI,希望在主线程消费数据。androidx 提供的 LifecycleScope
指定了主线程为协程调度器。
我们在 LifecycleScope
开启的协程内调用 suspend collect
方法,可以确定 collect 的块会在主线程执行。
在 Android UI 层,直接在 lifecycleScope
上 launch 协程 collect Flow 有一个隐患:Activity 进入后台后 collect 数据消费不会跟着生命周期取消,可能会导致应用崩溃。
lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01
的 Flow.flowWithLifecycle
API 解决了这个问题。
详情可以参考 A safer way to collect flows from Android UIs。
本文最后也提供了一个更加完善的例子。
这一设计和 Kotlin 协程一脉相承。suspend 函数在哪个线程执行却决于调用 suspend 函数的 CoroutineScope,完全在调用方的控制之内。suspend 函数内部可能会切到其他线程(比如 IO 场景下需要切换线程避免阻塞主线程),但调用方无须关心。
线程切换的细节对 Flow 的调用方来说也几乎是透明的。在 Android 客户端调用暴露 Flow 的 API 更新 UI,直接在主线程 collect
Flow 即可,无须手动切换线程。
回看我们的玩具实现并没有提供这样的保证:
fun magicFlow() = MyFlow<Int> {
withContext(Dispatchers.IO) {
emit(1)
}
}
suspend fun main() {
magicFlow().collect {
// 会调度到 Dispatchers.IO 运行
println(coroutineContext)
}
}
在上面的例子中,我们在切换了协程调度器的块中 emit 数据。回想一下,emit 实际上就是在调用 collect 里的 lambda。这样上游 flow 内部实现“偷偷地”把下游调用方的 CoroutineContext 换掉了,调用方难以一眼就知道眼前消费 flow 的代码会在哪个 CoroutineContext 执行。
所以,Kotlin 协程库的 Flow 实现会检查 emit 和 collect 在同一个协程里执行,否则会直接抛出异常:
Exception in thread "main" java.lang.IllegalStateException:
Flow invariant is violated
注意,Flow 禁止的是在不同的协程 emit 数据,并不是说 Flow 块中不能切换 Context。比如下面的例子是可以的:
suspend fun main() {
val f = flow {
emit(1)
val value = withContext(Dispatchers.IO) { 2 }
emit(value)
}
f.collect {
println(it)
}
}
按照 Kotlin 协程的设计,这样的写法必然可以。我们可以想象把 withContext
块这个表达式抽成一个 suspend 函数,在 Flow builder 中调用。suspend 函数内部切换 Context 的操作对外部调用方透明。
异常透明(Exception transparency)
Flow 另一个保证是异常透明。不过根据笔者目前的观点,不太推荐在使用 Kotlin 协程的时候扔异常。对这一部分感兴趣的话可以查阅官方文档。
Asynchronous Flow - Exception Transparency
Flow 在 Android 客户端的应用
总结一下,相比 RxJava,Flow 的优点包括:
- 依托 Kotlin 的 suspend 基础设施,设计和实现都更加简洁优雅,操作符组合性更强;
- 扩展函数定义操作符,自定义操作符的调用方式和协程库自带 Flow 操作符一致;
- 提供了上下文保存等额外保证,延续协程的设计思想,使得线程切换的细节几乎完全透明。
然而在 Android 客户端,大部分异步的场景使用 Kotlin 协程(suspend 函数)就足够了。 RxJava 在 Android 社区非常流行,主要还是解决线程切换麻烦的问题,而这一场景 Kotlin 协程已经能够非常优雅地解决。
目前,Android Jetpack 越来越多 API 使用 Flow,比如 DataStore、Room、Paging 3 等。建立对 Flow 正确的理解可以更好地使用这些库。
此外,RxAndroid 把常见的 Android 组件封装成数据源,方便我们做函数反应式编程,在一些简单的场景下使用效果比较好, 比如对用户的输入进行防抖(debounce),然后调用异步接口。Flow 可以使用 ReactiveCircus/FlowBinding。
lifecycleScope.launch {
binding.editText.textChanges()
.debounce(300)
// 根据 LifecycleOwner 生命周期自动取消
.flowWithLifecycle(this, Lifecycle.State.STARTED)
.map {
🏹 callSearchApi(it)
}
.collectLatest {
updateUi(it)
}
}
对应的 RxJava 版本:
RxTextView.textChanges(binding.editText)
.debounce(300, TimeUnit.MILLISECONDS)
.switchMap {
callApi()
}
.subscribeOn(io())
.observeOn(mainThread())
.subscribe {
updateUi(it)
}
注意到 Flow 的终端操作符使用了 collectLatest
。由于 Flow 的生产者、消费者都可以 suspend,当消费者 suspend 在处理某个元素 suspend 的时候,
此时如果生产者 emit 了新的数据,*Latest 系列操作符会取消处理前一个元素的协程块。这个行为和 RxJava 使用 switchMap
的逻辑相似。
在客户端场景,似乎大部分情况下都应该使用 collectLatest
。
但是 Flow 不是顺序(Sequential)执行,生产者会等待消费者的吗?这是因为 *Latest 系列操作符内部有一个额外的 Channel, 在下游 suspend 的同时能够监听上游并作出反应。 这样的 Flow 内部始终有个“激活”的部分,和本文介绍的「冷流」不同,是另一篇文章的话题了。
参考资料
- KotlinConf 2019: Asynchronous Data Streams with Kotlin Flow: 听 Kotlin Project Lead, Kotlin 协程和 Flow 的主要设计者 Roman Elizarov 先生介绍 Flow 设计的精妙之处。
- 本文最后介绍的 Android 函数反应式的例子比较简陋。假设我们希望在调接口的时候显示加载状态,单纯地在
onEach
中显示、隐藏组件不是特别完善。 建议参考 Jake Wharton 的 Managing State with RxJava。 - 《「正确」地使用 Kotlin Flow 进行搜索优化》介绍了一个典型的误用 Flow 的案例。我想看了本文应该完全不会犯这样的错误 ;-)
Kotlin 协程系列
Kotlin 协程可以帮助我们用顺序执行的代码实现异步方法的调用,在 async、await 编程模型的基础上有自己独特的设计。