i@yujinyan.me

Blog

谈谈 Kotlin 协程的 Context 和 Scope

引子

开启 Kotlin 协程需要在 CoroutineScope 上调用 launch 或者 async 这些方法。这些定义在 CoroutineScope 上的扩展方法叫作 coroutine builder。

lifecycleScope.launch(Dispatchers.IO) {
  delay(1000)
}

除了 suspend 块之外,还可以给 coroutine builder 传入额外的参数。比如上面这个 Dispatchers.IO 指定 launch 的协程块调度到 IO 线程池去执行。launch 的签名:

public fun CoroutineScope.launch(
  context: CoroutineContext = EmptyCoroutineContext, 
  // 忽略了与本文无关的 CoroutineStart 参数
  block: suspend CoroutineScope.() -> Unit
): Job

可以看到开启一个 Kotlin 协程至少涉及到三个概念:CoroutineScopeCoroutineContextJob。其中 Scope (作用域)和 Context(上下文)直接照着字面意思理解似乎含义非常相近。另外协程的 Scope 里只包含一个属性即 CoroutineContext

public interface CoroutineScope {
  public val coroutineContext: CoroutineContext
}

Kotlin 的协程为何需要一个 Scope 的概念?能不能去掉 Scope 只留下 Context, 从而降低 API 的复杂度?似乎大家都曾有这样的困惑(比如这里的讨论)。Kotlin 协程的主要设计者 Roman Elizarov 也有专文介绍。本文将记录笔者的理解。

Context 用于配置协程的属性

Context 是专门定制的数据结构

Context 像一个集合(Set):这个集合由不同类型的 Element 组成。可以通过运算符重载的 add 添加元素,如果添加已经存在的类型的元素则会覆盖。

CoroutineName("foo") + CoroutineName("bar")
  == CoroutineName("bar") // true

将两个 Context 「+」在一起以后返回的类型是 CombinedContext。由于这个集合本身和里面的元素 CoroutineContext.Element 都是 CoroutineContext,我们在调用 launch 这种接收 Context 的函数的时候既可以传单个元素,也可以传组合在一起的 Context,而不需要额外在外面加一个 listOf 这样的套子,或者使用 vararg,十分简洁优雅。

context hierarchy

重要 Context 的继承/实现关系

Context 是不可变(immutable)的。对 Context 进行添加或者删除元素的操作都会返回新的 Context 对象。这一性质是协程并发场景下的需要。

Context 又像一个字典(Map):每一种类型的 Element 都有对应的 CoroutineContext.Key,可以通过这个 Key 类型安全地获取到相应类型的 Element:

fun main() {
  (CoroutineName("Coco") + Dispatchers.IO).also { it: CoroutineContext ->
    println(it[CoroutineName] == CoroutineName("Coco")) // true
    println(it[ContinuationInterceptor] == Dispatchers.IO) // true
  }
}

示例代码中用来获取元素的 CoroutineName 其实是 CoroutineName 这个类的伴生对象(companion object)。相比使用 CoroutineName::class 作为字典的 Key 也是更加简洁优雅的。

public data class CoroutineName(
  val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
  public companion object Key : CoroutineContext.Key<CoroutineName> 
  override fun toString(): String = "CoroutineName($name)"
}

Context 集合和字典的性质确保了 CombinedContext 这个集合里每一种类型 Element 的唯一性

虽然 Context 用起来像字典和集合,但其实现却是链表

CombinedContext

CombinedContext 数据结构示意

由于 Context 中每种类型的 Element 是唯一的,而 Element 类型定义在 Kotlin 协程库(kotlinx.coroutines)内部,其数量是固定的,所以对链表操作的时间复杂度是有上界的。 使用自定义的链表来实现 Context 相比使用现成的数据结构可以避免一些额外的开销,对于框架实现来说是非常合理的。

在协程调用链任意位置获取 Context

Context 一般用来存储某些具有全局性质的状态。比如,React.js 通过声明式的 API 描述组件树的形状。有的时候跨组件层层传递一些数据会比较麻烦。 如果这个数据具有全局性质(比如页面的主题),借助 React 的 Context API ,我们无须明确地传遍每一个组件,就能将值深入传递进组件树。

// 为当前的 theme 创建一个 context(“light”为默认值)。
const ThemeContext = React.createContext('light'); 
class App extends React.Component {
  render() {
    // 使用一个 Provider 来将当前的 theme 传递给以下的组件树。
    // 无论多深,任何组件都能读取这个值。
    // 在这个例子中,我们将 “dark” 作为当前的值传递下去。
    return (
      <ThemeContext.Provider value="dark">
        <Toolbar />
      </ThemeContext.Provider>
    );
  }
}

// 中间的组件再也不必指明往下传递 theme 了。
function Toolbar() {
  return (
    <div><ThemedButton /></div>
  );
}

class ThemedButton extends React.Component {
  // 指定 contextType 读取当前的 theme context。
  // React 会往上找到最近的 theme Provider,然后使用它的值。
  // 在这个例子中,当前的 theme 值为 “dark”。
  static contextType = ThemeContext;
  render() {
    return <Button theme={this.context} />;
  }
}

一段可以作为整体执行的代码块可以叫作「子程序 routine」,比如函数、方法、lambda、条件块、循环块等。 Kotlin 协程(coroutine)就是一段可以 suspend 的代码块。我们出于抽象复用的目的,将一部分含有异步的代码抽离出来封装成 suspend 函数。

函数调用也类似 UI 组件,可以看作一个树状的结构。在 Kotlin 的 suspend 函数中,我们可以在调用链的任意层级获取 Context(Context propagation):

fun main() = runBlocking {
  // 在 Context 中添加 CoroutineName[Coco] 元素
  launch(CoroutineName("Coco")) { 
    foo()
  }
  Unit
}

// 调用链:foo->bar->baz
suspend fun foo() = bar()
suspend fun bar() = baz()
suspend fun baz() {
  // 在调用链中获取 Context 中的元素
  println(coroutineContext[CoroutineName]) 
}

这个 coroutineContext 是 Kotlin 在编译期添加的,可以看成编译器将调用方的 Context 隐式地传给了调用的 suspend 函数。 在「理解 Kotlin 的 suspend 函数」一文中, 我们介绍了 suspend 的本质是 Continuation,而 Continuation 中除了对应回调的 resumeWith 方法之外,剩下另外一个属性就是 CoroutineContext

public interface Continuation<in T> {
  // 每个 suspend 的代码块都有一个 Context
  public val context: CoroutineContext 

  public fun resumeWith(result: Result<T>)
}

suspend 函数中的 coroutineContext 在没有通过withContext 更新 Context 的情况下,和调用方的 Context 是相同的。一种有益的理解是可以想象把调用的 suspend 函数内联(inline) 到这个 suspend 块里面,程序的行为不会发生变化。下面这个例子检查了调用方、suspend 函数内部和 Continuation 的 Context 都是相同的。

suspend fun main() {
  println(checkCallerContext(coroutineContext)) // true
  println(checkContinuationContext()) // true
}

suspend fun checkCallerContext(callerContext: CoroutineContext): Boolean =
  // 不更新 Context 的情况下和调用方的 Context 相同
  callerContext === coroutineContext

suspend fun checkContinuationContext(): Boolean {

  // suspendCoroutine 是连接 suspend 和回调的桥梁。
  // 传给它的 lambda 属于桥回调的那一边,不是 suspend 的 block,
  // 所以没有 coroutineContext。因此我们在桥的 suspend 这一边的时候
  // 保存一下这个 suspend 的 Context
  val currentContext = coroutineContext

  // 通过 suspendCoroutine 获取当前 Continuation
  return suspendCoroutine { cont ->
    val contContext = cont.context

    // 两个 Context 是相同的
    val isTheSame = contContext === currentContext
    cont.resume(isTheSame)
  }
}

那么 Kotlin 提供的 Context 机制仅仅是为了方便地传一些全局状态吗?

一个核心 Element:ContinuationInterceptor

我们知道 Context 是为了协程服务的。所谓协程就是编程语言在运行时「协作式 」地将子程序调度到线程上执行。

ContinuationInterceptor 这个 Element 为协程的调度提供了基础设施。我们熟悉的用于指定执行协程的线程的 Dispatchers.IOCoroutineDispatcher 就是 ContinuationInterceptor

object CommonPoolContext :
  AbstractCoroutineContextElement(ContinuationInterceptor),
  ContinuationInterceptor {

  val pool: ForkJoinPool = ForkJoinPool() 

  override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    object : Continuation<T> {
      override val context: CoroutineContext = continuation.context

      override fun resumeWith(result: Result<T>) {
        pool.submit { continuation.resumeWith(result) } 
      }
    }
}

CoroutineScope 与「结构化并发」

2018 年 9 月 12 日,协程库 kotlinx.coroutines 发布了 0.26.0 版本,是一个重要里程碑。在这之前 coroutine builders 是全局的顶层函数,并不需要 CoroutineScope 就能开启协程,比如下面这个例子中的 async

// ⚠️ 使用了已废弃、过时的 API
suspend fun loadAndCombineImage(name1: String, name2: String): Image {
  val image1 = async { loadImage(name1) } 
  val image2 = async { loadImage(name2) } 
  return combineImages(image1.await(), image2.await())
}

全局顶层 coroutine builder 的麻烦

这样的设计有什么问题呢?我们可以看几个简单的例子。

🚨

作为顶层函数的 coroutine builders 已被废弃,在目前的 API 中相当于通过 GlobalScope 开启协程。本文使用 GlobalScope 来模拟全局顶层的 coroutine builders。

例:将一个文件流传给一个 process 函数进行处理

fun process(stream: InputStream): Unit { /****/ }

File("foo.txt").inputStream().use {
  process(it)
}

结果运行的时候抛出了异常 java.io.IOException :Stream closed。我们打开函数 process 一看:

fun process(stream: InputStream) {
  GlobalScope.launch { 
    delay(1000)
    stream.reader().readText()
  }
}

原来,Kotlin 标准库中的 use 会在接受的 lambda 执行完毕后关闭文件流(封装了 finally)。由于 process 函数在返回之后开启的异步任务还在执行,但文件却已关闭,于是抛出了异常。


例 :假设我们调用一个 writeData 函数往存储里写一些数据,这个函数用 launch(Dispatchers.IO) 开启了一个调度到 IO 线程执行的协程:

fun main() {
  writeData()
  // 🤔 数据写完了吗?可以读这个数据了吗?
}

fun writeData() {
  GlobalScope.launch(Dispatchers.IO) { 
    // doing some work
    // before writing data
  }
}

writeData 返回以后,数据写完了吗?我们无法确定。writeData 内部 launch 的协程甚至可以抛出异常,但作为调用方我们无法捕获这个异常(无法通过在 writeData 外面 try catch 捕获 writeData 开启的协程内抛出的异常)。


例:Android 的 Activity

class MyActivity: Activity {
  val binding = MyActivityBinding.inflate(layoutInflater)

  override fun onCreate(savedInstanceState: Bundle?) {
    GlobalScope.launch { 
      val result = 🏹 someNetworkRequest()
      binding.resultView.text = result
    }
  }
}

假设 someNetworkRequest 由于网络问题变得很缓慢,用户可能等得不耐烦,直接关闭了这个页面。由于 launch 的协程块引用了 Activity 的属性,这个协程会连带整个 Activity 一起泄漏。

分析上面这几个例子可以发现,问题出在我们开启了协程以后就弃之不理了,没有及时取消,或者没有 join 去等待协程的结果。实际上,不仅仅 Kotlin 的 GlobalScope,几乎所有的异步 API(thread、promise、callback, goroutine 等)都允许我们不加生命周期限定地开启异步任务。当开启异步任务的函数结束返回之后,这个异步任务可能尚未完成,继续在后台执行。调用方无法知道这个异步任务何时结束,有没有抛出异常。

大家可能有这样的经验:使用某些 API 的时候不得不手动延时几秒钟再执行后面的逻辑,不然会产生奇怪的问题。或许这个 API 里面忘记 join 某个线程了。

结构化并发 Structured Concurrency

这么看来异步 API 把 join 等待异步任务完成设计成默认行为似乎是更好的选择——这就是「结构化并发 Structured Concurrency」的核心思想。

Python 一个异步并发库 Trio 的作者 Nathaniel J. Smith 在 2018 年发布了一篇博文 Notes on structured concurrency, or: Go statement considered harmful,详尽地阐述了 Structured Concurrency,值得一读。Go 语言的 go 关键字类似 Kotlin 协程的 GlobalScope.launch 。文中认为,以 go 关键字为典型的现有异步 API 就好比半个世纪前 Dijkstra 反对的 goto 语句。

Dijkstra 在他著名的 Go To Statement Considered Harmful (1968) 一文中指出:人们更擅长把握事物的静态关系,而当程序运行起来以后,进程的状态流转是一个非常动态的过程。因此,人们很难在头脑中勾绘程序在运行时状态变化的完整图景。编程语言的设计应当尽可能缩短代码文本和运行时程序之间的差异,使得程序员看着某行代码就能推断程序运行的状态。

而那时非常流行的 goto 语句可以使进程跳到对应代码文本的任意位置。这样我们只能从头开始在头脑中模拟执行一遍程序的执行,很难在代码局部位置推断程序运行时的状态,难以保证程序的正确性。

Dijkstra 认为高级语言应当摒弃 goto 语句,提倡「结构化编程 Structured Programming」——即程序员使用条件、循环、函数块等结构块进行组合表达程序逻辑。

Structured Programming

黑盒性质:控制流流入 → [黑盒] → 控制流流出,复刻自 njs blog.

可以看到,程序经过这些控制结构的时候总是从上到下(sequential)的:一个入口,一个出口。不同控制结构中间部分像一个「黑盒」。我们在阅读到这一块代码的时候可以确定这个块里有一些逻辑,这些逻辑完成以后,控制流最终会从一个出口出来,进入下一行代码。而一旦编程语言支持 goto 语句,这种封装就被破坏。

在结构化并发中,所有异步任务都会被约束在一个作用域里面,这个作用域类似于结构化编程中的条件、循环、函数控制体,虽然可能有多个任务并发执行,但最终都会从一个出口出来,符合「黑盒」的性质。假设程序员读到图示虚线的位置,他可以确定,如果代码走到这里,上面并发的三个任务一定成功完成了。

Structured Concurrency

结构化并发同样满足「黑盒」性质,复刻自 njs blog.

越来越多语言正在吸收结构化并发的思想,例如 Java 的 Project LoomSwift 的协程

Job 与取消

在讨论 Kotlin 如何实现结构化并发之前,我们先来看一下协程的取消(Cancellation)。

首先从上面 Android Activity 的例子可以看到,如果用户离开界面,出于及时回收系统资源的考虑,协程应该需要支持取消。 同样在服务端,如果连接断开或者某个关键异步任务失败,其他异步任务也应该及时停掉以避免不必要的资源浪费。

Kotlin 的协程、 Java 的线程和 Goroutine 都是协作式(cooperative)的,意味着要真正支持取消,协程需要在任务的间隙主动去检查当前的 Job 是不是处于 Active 的状态。 这是因为如果子程序可以被突兀地中止,很有可能事情做到一半,损坏数据结构或文件资源等。

Go 语言通过 channel 实现取消协程。下面这例子将一个名为 done 的 channel 传递给调用链中所有含有异步任务的函数。 调用方通过关闭这个 channel 的方式「通知」所有开启的协程结束正在进行的任务。我们不会给这个 channel 发送数据,只是把关闭 channel 产生的副作用作为「广播」的方式。

func main() {
  // 创建 channel
  done := make(chan struct{}) 

  // 开启协程
  go work(done)
  go work(done)

  time.Sleep(5000 * time.Millisecond)
  close(done) 
}

func work(done chan struct{}) {
  for {
    // 在任务执行「间隙」检查 channel 是否被关闭
    if cancelled(done) { 
      return
    }

    time.Sleep(1000 * time.Millisecond)
    println("working...")
  }
}

func cancelled(done chan struct{}) bool {
  select {
  // channel 关闭之后 receive 会立即返回零值
  case <-done: 
    return true 
  default:
    return false
}

基于这种取消协程的方式,Go 标准库提供了一个 Context ,其中的 Done 方法返回了一个这样的 channel。如果使用 Context 的话,所有调用链上的函数都需要显式地传入这个 Context 对象,并在每个函数内部监听这个 Done channel。

type Context interface {
  // Done returns a channel
  // that is closed when this Context is canceled
  // or times out.
  Done() <-chan struct{} 
  // ...
}

如果习惯使用 ThreadLocal,可能会觉得这种显式传值比较麻烦(比如这篇介绍 Go 上下文 Context 文章底下的评论)。

Kotlin 协程中,我们通过 CoroutineContext 中隐式传递的 Job 对象取消协程。 Coroutine builder launch 的返回值是一个代表协程的 Job 对象,可以调用 .cancel 取消协程,.join 等待协程完成。 由于 Job 是一个 CoroutineContext.Element,可以在 suspend 函数调用链的任意位置通过 coroutineContext 属性获取当前协程对应的 Job。

suspend fun main() =
  GlobalScope.launch {
    foo()
  }.join()

suspend fun foo() =
  bar()

suspend fun bar() {
  // true
  println(coroutineContext.isActive) 
  delay(1000)
}

// CoroutineContext.isActive is just a shortcut
public val CoroutineContext.isActive: Boolean
    get() = this[Job]?.isActive == true 

如何让我们写的 Kotlin 协程代码支持取消? 在 coroutine builder 开启的协程块内部可以用 Job.isActive 判断当前协程是否被取消。 如果已取消则可以直接返回或者抛出 CancellationException。 这个异常在协程库中不同于别的异常,有特殊的意义,是一个专门用作取消协程的标记,被抛出以后调用栈回退到 launch 的协程,整个协程正常结束,异常不会继续传播。

suspend fun main() {
  val job = GlobalScope.launch {
    for (i in 0..50) {
      // 除了 return 之外还可以抛 `CancellationException`
      // 协程库提供的 `ensureActive` 封装了这一方法
      // 另外也可以使用 `yield`
      if (!isActive) return@launch 

      // 下面这些写法也可以
      // if (!isActive) throw CancellationException()
      // ensureActive()
      // yield()

      println(fibonacci(i))
    }
  }
  delay(100)
  // 取消 job 并等待,避免 jvm 直接退出
  job.cancelAndJoin()
}

// ⚠️ deliberately slow fibonacci
fun fibonacci(n: Int): Int = if (n <= 1)
  n else fibonacci(n - 1) + fibonacci(n - 2)

在封装出的 suspend 函数内部支持取消, return 是不行的,必须抛 CancellationException。 因为 return 以后,控制流正常退回上层函数,可能会继续执行后面的同步语句。当协程被取消后,整个调用链应该立即回退。 而 launch 的协程块不同于 suspend 函数内部,是协程调用树的根节点,因此可以直接 return 结束协程。

如果我们调用的 suspend 函数支持取消,意味着这个 suspend 函数会检查当前协程是否是取消的状态并抛出 CancellationException。 所有 kotlinx.coroutines 中的 suspend 函数都支持取消。我们调用支持取消的 suspend 函数,也就自动支持了取消,很少需要做专门处理。

假设我们把上面这个例子中输出 fibonacci 数的代码封装成 suspend 函数,在这个函数内部可以使用 yield 方法来确保只有协程在 Active 的状态才会继续计算:

suspend fun main() {
  val job = GlobalScope.launch {
    printFibonacciSlowly(50)
  }
  delay(100)
  job.cancelAndJoin()
}

suspend fun printFibonacciSlowly(n: Int) {
  for (i in 0..n) {
    🏹 yield() 
    println(fibonacci(i))
  }
}

将回调 API 封装成 suspend 函数的时候,可以使用 suspendCancellableCoroutine 来支持取消操作,具体可以参考理解 Kotlin 的 suspend 函数中的例子。

Kotlin 这种隐式传 context 与 Go 显式传 context 可以说各有利弊。显式传递的方式读起来更加清晰,但是所有函数都需要手动监听 channel,会造成一些 boilerplate。Kotlin 利用集成在语言中的 CoroutineContext,代码更加清爽;同时,我们可以在协程的调用树中利用协程库中提供的 yieldsuspendCancellableCoroutine 等函数,在异步任务的「间隙」中自动插入对协程状态的检查,并通过异常机制回退整个协程的调用栈,实现取消协程更加方便,但是可能有一定学习成本。

Job 与协程父子关系

在协程的调用树中,除了调用 suspend 函数之外还有可能开启新的协程。根据结构化并发的思想,父协程必须等待所有子协程结束以后才能结束,因此在创建新的协程 Job 时必须以某种形式和父协程建立关联。

Kotlin 协程在 0.26.0 之前曾推荐这样的写法:

suspend fun sayHelloWorldInContext() {
  GlobalScope.launch(coroutineContext) { 
    delay(500)
    print("Hello ")
  }
  GlobalScope.launch(coroutineContext) { 
    delay(1000)
    print("World!")
  }
}

上面的例子将 suspend 函数中编译器添加的 coroutineContext 传入 launch,这样新开启的协程将运行在外部执行这个 suspend 函数的协程 Job 中。 如果外部的 Job 被取消,sayHelloWorldInContextlaunch 的协程也会被取消,可以解决前面 Android Activity 带有生命周期结束后协程泄漏的问题。 但是另外的问题没有解决,开启协程的函数并不会等待异步任务结束,返回之后异步任务有可能还在执行。所以更好的写法是这样:

suspend fun sayHelloWorld() {
  val job = Job(parent = coroutineContext[Job]) 
  GlobalScope.launch(job) { 
    delay(500)
    print("Hello ")
  }
  GlobalScope.launch(job) { 
    delay(1000)
    print("World!")
  }
  job.complete()
  job.join()
}

在这个例子中,我们在 sayHelloWorld 这个 suspend 函数内创建一个新的 Job 实例,并在函数内部手动去 join 这个 Job。这样的代码写起来有点麻烦,容易忘记,并没有比 Java 的线程 API 好多少。Kotlin 老手可能意识到可以把 suspend 函数内部和 Job 相关的逻辑封装成一个高阶函数,接收一个以 Job 实例为 receiver 的 lambda,比如:

suspend fun sayHelloWorld() = job {
  // `this` is Job
  GlobalScope.launch(this) { 
    delay(500)
    print("Hello ")
  }
  GlobalScope.launch(this) { 
    delay(1000)
    print("World!")
  }
}

这很 Kotlin。但是 launch(this) 有些尴尬。Kotlin 老手可能会想到如果 launch 是定义在 job 块的 Receiver 上的话,那么我们可以直接这个块里面 launch ,写法上就和 0.26.0 之前的全局顶层函数很像了。

到这里我们已经差不多重新发明了 Kotlin 协程库 Structured Concurrency 的两大支柱——coroutineScope 高阶函数和 CoroutineScope 接口。coroutineScope 类似我们写的 job 函数(Kotlin 官方曾考虑用这个名字),而 CoroutineScope 就是前面提到的 Receiver。

Kotlin 协程的结构化并发设计

Kotlin 0.26.0 废弃了所有全局顶层函数 coroutine builder,改成了 CoroutineScope 上的扩展方法。这样开启协程必须有一个 CoroutineScope。这一设计比简单地添加一个 job 高阶函数好得多。Dijkstra 的观点并不仅仅是说推荐大家使用条件、循环、函数等控制体,更重要的是应该在编程语言中废弃 goto。因为只要 goto 存在,每个函数内部都可能藏着一个 goto,从而打破「黑盒」的性质,破坏封装性。类似地,结构化并发认为应当废弃「非结构化」的、fire-and-forget 的异步 API。CoroutineScope 的引入,使结构化并发在 Kotlin 协程 API 中成为了默认行为。

根据目前的最佳实践,在 suspend 函数中如果需要开启新的协程,需要先借助 coroutineScope 打开一个新的块,这个块包含了一个新的 Job 并限定了所有在其中开启的协程的生命周期:如果代码运行到 coroutineScope 块后面,意味着所有在这个块里面的异步任务都已成功结束;如果 coroutineScope 中任意一个协程抛出了异常,那么调用栈回退,异常会被传递到 coroutineScope 的外层。在下面的例子中, 如果任意一个 loadImage 失败抛出异常,这个异常会被传递给 loadAndCombineImage 的调用方。


suspend fun loadAndCombineImage(name1: String, name2: String): Image =
  coroutineScope { 
    val image1 = async { loadImage(name1) }
    val image2 = async { loadImage(name2) }
    return combineImages(image1.await(), image2.await())
  }

// ⚠️ 使用了已废弃的全局顶层 coroutine builder
suspend fun loadAndCombineImage(name1: String, name2: String): Image {
  val image1 = async { loadImage(name1) }
  val image2 = async { loadImage(name2) }
  return combineImages(image1.await(), image2.await())
}

而顶层的协程是「世界的尽头 」,一般需要和框架有生命周期的组件集成,配置一个 CoroutineScope 。例如:

class MyActivity : CoroutineScope { 
  val job = SupervisorJob()
  override val coroutineContext = Dispatchers.Main + job 

  fun doSomethingInBackground() = launch { ... }
  fun onDestroy() { job.cancel() }
}

这个例子中, Kotlin 协程的 Context、Scope 和 Job 三个零件优雅地拼接在一起:我们让具有生命周期的系统组件实现 CoroutineScope,这样需要 override coroutineContext ,我们在其中配置所有在这个作用域中开启的协程的默认属性。由于 MyActivity 「是」一个 CoroutineScope,开启协程的时候可以省去 this ,API 调用起来看着和全局顶层函数一样,但加上了生命周期限定。

然而我们熟悉的 androidx 通过 LifecycleOwner.lifecycleScope 扩展属性提供了 Scope。使用扩展属性的方式比直接实现 interface 更加「开箱即用」,侵入性更低,同时更加显式,便于 Kotlin 协程在 Android 社区推广开来。使用上面手动集成的方法最好在项目中有个 BaseActivity 这样的基类,同时还需要开发者弄清楚 Context、Scope、Job 这些概念,学习成本稍高一些。

对于非结构化、传统的 fire-and-forget 并发,Kotlin 提供了前面用来举例子的 GlobalScope 。读到这里,相信可以想象出 GlobalScope 的实现:

public object GlobalScope : CoroutineScope {
  override val coroutineContext: CoroutineContext
    get() = EmptyCoroutineContext
}

一般不推荐在应用里使用 GlobalScope 。根据结构化并发的思想,GlobalScope 长远看可能最终会被废弃。一些「后台」异步任务可以考虑在生命周期更长的组件上定义 CoroutineScope,比如 Android 的 Application 以及 Spring singleton scope 的组件。比较方便的做法可以:

val appScope = GlobalScope

在业务逻辑代码中引用自己定义的 appScope ,方便在一个统一的位置对协程进行配置。

Kotlin 协程的两个约定

Kotlin 结构化并发如何解决「全局顶层 coroutine builder 函数造成的麻烦」这个标题下的问题?以处理文件流的问题为例:

开启协程必须有 CoroutineScope。我们可以把这个 CoroutineScope 显式地传进封装的函数;也可以像 coroutine builder 一样定义成 CoroutineScope 的扩展方法。两种只是形式上的区别,实质是一样的,但后者似乎更加符合 Kotlin 的 style。

fun CoroutineScope.process(stream: InputStream) { 
  launch {
    delay(1000)
    stream.reader().readText()
  }
}

suspend fun main() {
  File("foo.txt").inputStream().use {
    coroutineScope { 
      process(it)
    }
    // 🚩 程序运行到这里 process 一定正常结束了
  }
}

使用结构化并发,在 process 外面包一个 coroutineScope 块以后,调用方可以控制所调用函数内开启的协程的生命周期。我们可以确定 coroutineScope 块结束以后意味着 process 开启的全部异步任务都已经顺利结束。

Kotlin 协程抽象复用的机制除了 CoroutineScope 上的扩展函数之外还有本文的姊妹篇 《理解 Kotlin 的 suspend 函数》 中介绍的 suspend 函数。两者有重要的不同:

  • 定义在 CoroutineScope 上的扩展函数提供了这样的约定(Convention):这个函数会立即返回,但是函数会开启异步任务,可以理解为这个函数内的子程序和调用方的的代码并发执行。
  • suspend 函数提供的约定:调用这个函数不会阻塞线程,函数内的子程序执行完毕以后函数才会返回,控制流回到调用方。suspend 函数不应该有开启异步任务的副作用。

Suspend functions are sequential by default. Concurrency is hard, and its launch must be explicit.

Roman Elizarov, Project Lead for Kotlin

可以看到,Kotlin 在类型系统对这两种不同性质的函数作了区分:

// slow work but does not block caller's thread
suspend foo(params: Params): Response

// launch concurrent subprogram with surrounding code
fun CoroutineScope.foo(params: Params): Response

理解和遵循这两个约定是用好 Kotlin 协程的关键。

🚨

suspend 函数和 coroutine builder 语义不同:一个与调用方代码顺序执行,另一个并发执行。不要将两种语义混在一起。

// ❌ 反面示例:违背了 Kotlin 协程的设计约定
suspend fun CoroutineScope.foo()

参考资料

Structured Concurrency

Roman Elizarov 有多篇文章和视频介绍 Kotlin 的 Structured Concurrency:

Goroutine 和 Context 部分内容参考了 《Go 语言设计和实现》The Go Programming Language

后记

协程「姊妹篇」

public interface Continuation<in T> {
  //《谈谈 Kotlin 协程的 Context 和 Scope》
  public val context: CoroutineContext

  //《理解 Kotlin 的 suspend 函数》
  public fun resumeWith(result: Result<T>)
}

笔者在写作本文的时候意识到,这两篇关于 Kotlin 协程的文章分别介绍了 suspend 函数和 coroutine builder,正好对应 Continuation interface 的两个组成部分:CoroutineContextresumeWith 方法。这一两分还体现在:

  • suspend 函数和 coroutine builder 在类型系统上的区别;
  • Kotlin 标准库提供的 CPS 变换基础设施 / kotlinx.coroutines 协程的实现;
  • suspend 函数与 coroutine builder 内取消协程的差异等。

两篇文章正好形成姊妹两篇,互相补充。

🔗

欢迎阅读本文的「姊妹篇」:《理解 Kotlin 的 suspend 函数》

国内对 Kotlin 协程的介绍

笔者最早学习 Kotlin 协程主要是看其主要设计者 Roman Elizarov 先生的演讲以及在 Medium 上发表的文章。 Roman 的介绍非常 high-level,着重问题、概念、思想和设计。并发实践匮乏会导致有时候难以领会 Kotlin 协程要解决的问题,无法理解。 这两篇介绍协程的文章补充解释了笔者学习过程中产生的一些困惑,或许可以当作 Roman 演讲和文章的注脚。

在学习、写作的过程中笔者也看了一些国内对 Kotlin 协程的介绍,感觉对协程重要概念的介绍相对较少, 比如本文提到的 Structured Concurrency、两个 Conventions 等。 很多文章对实现细节情有独钟,想要「破解」协程,或者「扒了协程的皮」。 分析原理的时候摘录大量源码,翻译一些源码里面的注释,读起来让人「不明觉厉」。 但仔细看的话会发现,由于缺少对一些重要的高层概念的把握,很多对源码的解读其实是片面甚至错误的。

学习一个库或者框架,直接看它的实现原理并不是最高效的方式。即使源码看明白了也不一定用得对。 把握设计思想和理念更加重要。所有这些框架类库都为了解决某个问题而诞生。 有了解决问题的思路以后,作者可能采取各种 hack 达成目的,同时在不断发展成熟的过程中会加入很多优化。 所有这些细节都有可能掩盖问题和 idea 的本质。不先学习 idea 绕在细节里,很难形成自己的思路,解决新的问题。

评论区 Discussions · 也可前往 GitHub 评论区 互动