33

Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度

 5 years ago
source link: http://johnnyshieh.me/posts/kotlin-coroutine-deep-diving/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Kotlin Coroutines(协程) 完全解析系列:

Kotlin Coroutines(协程) 完全解析(一),协程简介

Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度

本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1

前面一篇文章协程简介,简单介绍了协程的一些基本概念以及其简化异步编程的优势,但是协程与线程有什么区别,协程的挂起与恢复是如何实现的,还有协程运行在哪个线程上,依然不是很清楚。这篇文章将分析协程的实现原理,一步步揭开协程的面纱。先来看看协程中最关键的挂起函数的实现原理:

1. 挂起函数的工作原理

协程的内部实现使用了 Kotlin 编译器的一些编译技术,当挂起函数调用时,背后大致细节如下:

挂起函数或挂起 lambda 表达式调用时,都有一个隐式的参数额外传入,这个参数是 Continuation 类型,封装了协程恢复后的执行的代码逻辑。

用前文中的一个挂起函数为例:

suspend fun requestToken(): Token { ... }

实际上在 JVM 中更像下面这样:

ObjectrequestToken(Continuation<Token> cont){ ... }

Continuation 的定义如下,类似于一个通用的回调接口:

/**
 * Interface representing a continuation after a suspension point that returns value of type `T`.
 */
public interface Continuation<in T>{
    /**
     * Context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result:Result<T>)
}

现在再看之前 postItem 函数:

suspend fun requestToken(): Token { ... }   // 挂起函数
suspend fun createPost(token:Token, item:Item): Post { ... }  // 挂起函数
fun processPost(post:Post) { ... }

fun postItem(item:Item) {
    GlobalScope.launch {
        val token = requestToken()
        val post = createPost(token, item)
        processPost(post)
    }
}

然而,协程内部实现不是使用普通回调的形式,而是使用状态机来处理不同的挂起点,大致的 CPS(Continuation Passing Style) 代码为:

// 编译后生成的内部类大致如下
final class postItem$1extends SuspendLambda...{
    public final Object invokeSuspend(Object result){
        ...
        switch (this.label) {
            case 0:
                this.label = 1;
                token = requestToken(this)
                break;
            case 1:
                this.label = 2;
                Token token = result;
                post = createPost(token, this.item, this)
                break;
            case 2:
                Post post = result;
                processPost(post)
                break;
        }
    }
}

上面代码中每一个挂起点和初始挂起点对应的 Continuation 都会转化为一种状态,协程恢复只是跳转到下一种状态中。挂起函数将执行过程分为多个 Continuation 片段,并且利用状态机的方式保证各个片段是顺序执行的。

coroutine_continuation.png

1.1 挂起函数可能会挂起协程

挂起函数使用 CPS style 的代码来挂起协程,保证挂起点后面的代码只能在挂起函数执行完后才能执行,所以挂起函数保证了协程内的顺序执行顺序。

在多个协程的情况下,挂起函数的作用更加明显:

fun postItem(item:Item) {
    GlobalScope.launch {
        // async { requestToken() } 新建一个协程,可能在另一个线程运行
        // 但是 await() 是挂起函数,当前协程执行逻辑卡在第一个分支,第一种状态,当 async 的协程执行完后恢复当前协程,才会切换到下一个分支
        val token = async { requestToken() }.await()
        // 在第二个分支状态中,又新建一个协程,使用 await 挂起函数将之后代码作为 Continuation 放倒下一个分支状态,直到 async 协程执行完
        val post = aync { createPost(token, item) }.await()
        // 最后一个分支状态,直接在当前协程处理
        processPost(post)
    }
}

上面的例子中, await() 挂起函数挂起当前协程,直到异步协程完成执行,但是这里并没有 阻塞线程 ,是使用状态机的控制逻辑来实现。而且挂起函数可以保证挂起点之后的代码一定在挂起点前代码执行完成后才会执行,挂起函数保证顺序执行,所以异步逻辑也可以用顺序的代码顺序来编写。

注意挂起函数不一定会挂起协程,如果相关调用的结果已经可用,库可以决定继续进行而不挂起,例如 async { requestToken() } 的返回值 Deferred 的结果已经可用时, await() 挂起函数可以直接返回结果,不用再挂起协程。

1.2 挂起函数不会阻塞线程

挂起函数挂起协程,并不会阻塞协程所在的线程,例如协程的 delay() 挂起函数会暂停协程一定时间,并不会阻塞协程所在线程,但是 Thread.sleep() 函数会阻塞线程。

看下面一个例子,两个协程运行在同一线程上:

fun main(args:Array<String>) {
    // 创建一个单线程的协程调度器,下面两个协程都运行在这同一线程上
    val coroutineDispatcher = newSingleThreadContext("ctx")
    // 启动协程 1
    GlobalScope.launch(coroutineDispatcher) {
        println("the first coroutine")
        delay(200)
        println("the first coroutine")
    }
    // 启动协程 2
    GlobalScope.launch(coroutineDispatcher) {
        println("the second coroutine")
        delay(100)
        println("the second coroutine")
    }
    // 保证 main 线程存活,确保上面两个协程运行完成
    Thread.sleep(500)
}

运行结果为:

the first coroutine
the second coroutine
the second coroutine
the first coroutine

从上面结果可以看出,当协程 1 暂停 200 ms 时,线程并没有阻塞,而是执行协程 2 的代码,然后在 200 ms 时间到后,继续执行协程 1 的逻辑。所以挂起函数并不会阻塞线程,这样可以节省线程资源,协程挂起时,线程可以继续执行其他逻辑。

1.3 挂起函数恢复协程后运行在哪个线程

协程的所属的线程调度在前一篇文章《协程简介》中有提到过,主要是由协程的 CoroutineDispatcher 控制, CoroutineDispatcher 可以指定协程运行在某一特定线程上、运作在线程池中或者不指定所运行的线程。所以协程调度器可以分为 Confined dispatcherUnconfined dispatcherDispatchers.DefaultDispatchers.IODispatchers.Main 属于 Confined dispatcher ,都指定了协程所运行的线程或线程池,挂起函数恢复后协程也是运行在指定的线程或线程池上的,而 Dispatchers.Unconfined 属于 Unconfined dispatcher ,协程启动并运行在 Caller Thread 上,但是只是在第一个挂起点之前是这样的,挂起恢复后运行在哪个线程完全由所调用的挂起函数决定。

fun main(args:Array<String>) = runBlocking<Unit> {
    launch { // 默认继承 parent coroutine 的 CoroutineDispatcher,指定运行在 main 线程
        println("main runBlocking: I'm working in thread${Thread.currentThread().name}")
        delay(100)
        println("main runBlocking: After delay in thread${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) {
        println("Unconfined : I'm working in thread${Thread.currentThread().name}")
        delay(100)
        println("Unconfined : After delay in thread${Thread.currentThread().name}")
    }
}

输出如下:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

上面第三行输出,经过 delay 挂起函数后,使用 Dispatchers.Unconfined 的协程挂起恢复后依然在 delay 函数使用的 DefaultExecutor 上。

2. 协程深入解析

上面更多地是通过 demo 的方式说明挂起函数函数的一些特性,但是协程的创建、启动、恢复、线程调度、协程切换是如何实现的呢,还是不清楚,下面结合源码详细地解析协程。

2.1 协程的创建与启动

先从新建一个协程开始分析协程的创建,最常见的协程创建方式为 CoroutineScope.launch {} ,关键源码如下:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
    coroutine.start(start, coroutine, block)
    return coroutine
}

coroutine.start(start, coroutine, block) 默认情况下会走到 startCoroutineCancellable ,最终会调用到 createCoroutineUnintercepted

/**
 * Creates unintercepted coroutine without receiver and with result type [T].
 * This function creates a new, fresh instance of suspendable computation every time it is invoked.
 *
 * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
 * The [completion] continuation is invoked when coroutine completes with result or exception.
 ...
 */
 public actual fun <T>(suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> { ... }

重点注意该方法的注释,创建一个协程,创建了一个新的可挂起计算,通过调用 resume(Unit) 启动该协程。而且返回值为 ContinuationContinuation 提供了 resumeWith 恢复协程的接口,用以实现协程恢复, Continuation 封装了协程的代码运行逻辑和恢复接口。

再看之前协程代码编译生成的内部类 final class postItem$1 extends SuspendLambda ... ,协程的计算逻辑封装在 invokeSuspend 方法中,而 SuspendLambda 的继承关系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation,其中 BaseContinuationImpl 部分关键源码如下:

internal abstract class BaseContinuationImpl(...) {
    // 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
    public final override fun resumeWith(result:Result<Any?>) {
        ...
        val outcome = invokeSuspend(param)
        ...
    }

    // 由编译生成的协程相关类来实现,例如 postItem$1
    protected abstract fun invokeSuspend(result:Result<Any?>): Any?
}

而这部分与之前的分析也是吻合的,启动协程流程是 resume(Unit) -> resumeWith() -> invokeSuspend() ,协程的挂起通过 suspend 挂起函数实现,协程的恢复通过 Continuation.resumeWith 实现。

2.2 协程的线程调度

协程的线程调度是通过拦截器实现的,前面提到了协程启动调用到了 startCoroutineCancellable ,该方法实现为:

internal fun <T>(suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
    createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
// createCoroutineUnintercepted(completion) 会创建一个新的协程,返回值类型为 Continuation
// intercepted() 是给 Continuation 加上 ContinuationInterceptor 拦截器,也是线程调度的关键
// resumeCancellable(Unit) 最终将调用 resume(Unit) 启动协程

再看 intercepted() 的具体实现:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
// ContinuationImpl 是 SuspendLambda 的父类

internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {
    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    // intercepted() 方法关键是 context[ContinuationInterceptor]?.interceptContinuation(this)
    // context[ContinuationInterceptor] 就是协程的 CoroutineDispatcher
}

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    /**
     * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}

所以 intercepted() 最终会使用协程的 CoroutineDispatcherinterceptContinuation 方法包装原来的 Continuation,拦截所有的协程运行操作。

DispatchedContinuation 拦截了协程的启动和恢复,分别是 resumeCancellable(Unit) 和重写的 resumeWith(Result)

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {
    inline fun resumeCancellable(value:T) {
        // 判断是否需要线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            ...
            // 将协程的运算分发到另一个线程
            dispatcher.dispatch(context, this)
        } else {
            ...
            // 如果不需要调度,直接在当前线程执行协程运算
            resumeUndispatched(value)
        }
    }

    override fun resumeWith(result:Result<T>) {
        // 判断是否需要线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            ...
            // 将协程的运算分发到另一个线程
            dispatcher.dispatch(context, this)
        } else {
            ...
            // 如果不需要调度,直接在当前线程执行协程运算
            continuation.resumeWith(result)
        }
    }
}

internal interface DispatchedTask<in T> :Runnable {
    public override fun run() {
        ...
        // 封装了 continuation.resume 逻辑
    }
}

继续跟踪 newSingleThreadContext()Dispatchers.IOdispatch 方法的实现,发现其实都调用了 Executor.execute(Runnable) 方法,而 Dispatchers.Unconfined 的实现更简单,关键在于 isDispatchNeeded() 返回为 false

2.3 协程的挂起和恢复

Kotlin 编译器会生成继承自 SuspendLambda 的子类,协程的真正运算逻辑都在 invokeSuspend 中。但是协程挂起的具体实现是如何呢?先看下面示例代码:

fun main(args:Array<String>) = runBlocking<Unit> { // 新建并启动 blocking 协程,运行在 main 线程上,等待所有子协程运行完成后才会结束
    launch(Dispatchers.Unconfined) { // 新建并启动 launch 协程,没有指定所运行线程,一开始运行在调用者所在的 main 线程上
        println("${Thread.currentThread().name}: launch start")
        async(Dispatchers.Default) { // 新建并启动 async 协程,运行在 Dispatchers.Default 的线程池中
            println("${Thread.currentThread().name}: async start")
            delay(100)  // 挂起 async 协程 100 ms
            println("${Thread.currentThread().name}: async end")
        }.await() // 挂起 launch 协程,直到 async 协程结束
        println("${Thread.currentThread().name}: launch end")
    }
}

其中 launch 协程编译生成的 SuspendLambda 子类的 invokeSuspend 方法如下:

public final Object invokeSuspend(@NotNull Object result) {
    Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch (this.label) {
        case 0:
            ...
            System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());
            // 新建并启动 async 协程
            Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
            this.label = 1;
            // 调用 await() 挂起函数
            if (async$default.await(this) == coroutine_suspended) {
                return coroutine_suspended;
            }
            break;
        case 1:
            if (result instanceof Failure) {
                throw ((Failure) result).exception;
            }
            // 恢复协程后再执行一次 resumeWith(),然后无异常的话执行最后的 println()
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    ...
    System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());
    return Unit.INSTANCE;
}

上面代码中 launch 协程挂起的关键在于 async$default.await(this) == coroutine_suspended ,如果此时 async 线程未执行完成, await() 返回为 IntrinsicsKt.getCOROUTINE_SUSPENDED() ,就会 return,launch 协程的 invokeSuspend 方法执行完成,协程所在线程继续往下运行,此时 launch 线程处于挂起状态。所以协程挂起就是协程挂起点之前逻辑执行完成,协程的运算关键方法 resumeWith() 执行完成,线程继续执行往下执行其他逻辑。

协程挂起有三点需要注意的:

  • 启动其他协程并不会挂起当前协程,所以 launchasync 启动线程时,除非新协程运行在当前线程,则当前协程只能在新协程运行完成后继续执行,否则当前协程都会马上继续运行。

  • 协程挂起并不会阻塞线程,因为协程挂起时相当于执行完协程的方法,线程继续执行其他之后的逻辑。

  • 挂起函数并一定都会挂起协程,例如 await() 挂起函数如果返回值不等于 IntrinsicsKt.getCOROUTINE_SUSPENDED() ,则协程继续执行挂起点之后逻辑。

下面继续分析 await() 的实现原理,它的实现中关键是调用了 JobSupport.awaitSuspend() 方法:

private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    /*
        * Custom code here, so that parent coroutine that is using await
        * on its child deferred (async) coroutine would throw the exception that this child had
        * thrown and not a JobCancellationException.
        */
    val cont = AwaitContinuation(uCont.intercepted(), this)
    cont.initCancellability()
    invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)
    cont.getResult()
}

private class ResumeAwaitOnCompletion<T>(
    job: JobSupport,
    private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
    override fun invoke(cause:Throwable?) {
        val state = job.state
        check(state !is Incomplete)
        if (state is CompletedExceptionally) {
            // Resume with exception in atomic way to preserve exception
            continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
        } else {
            // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
            @Suppress("UNCHECKED_CAST")
            continuation.resume(state as T)
        }
    }
    override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}

上面源码中 ResumeAwaitOnCompletioninvoke 方法的逻辑就是调用 continuation.resume(state as T) 恢复协程。 invokeOnCompletion 函数里面是如何实现 async 协程完成后自动恢复之前协程的呢,源码实现有些复杂,因为很多边界情况处理就不全部展开,其中最关键的逻辑如下:

// handler 就是 ResumeAwaitOnCompletion 的实例,将 handler 作为节点
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
// 将 node 节点添加到 state.list 中
if (!addLastAtomic(state, list, node)) return@loopOnState // retry

接下来我断点调试 launch 协程恢复的过程,从 async 协程的 SuspendLambda 的子类的 completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke ,最后 handler 节点里面通过调用 resume(result) 恢复协程。

而这过程中有两个 finalresumeWith 方法,一个是 SuspendLambda 的父类 BaseContinuationImpl 的,我们再来详细分析一篇:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result:Result<Any?>) {
        ...
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 调用 invokeSuspend 方法执行,执行协程的真正运算逻辑
                        val outcome = invokeSuspend(param)
                        // 协程挂起时 invokeSuspend 才会返回 COROUTINE_SUSPENDED,所以协程挂起时,其实只是协程的 resumeWith 运行逻辑执行完成,再次调用 resumeWith 时,协程挂起点之后的逻辑才能继续执行
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                // 这里可以看出 Continuation 其实分为两类,一种是 BaseContinuationImpl,封装了协程的真正运算逻辑
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // 断点时发现 completion 是 DeferredCoroutine 实例,这里实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

接下来再来看另外一类 Continuation,AbstractCoroutine 的 resumeWith 实现:

public abstract class AbstractCoroutine<in T>(
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    /**
     * Completes execution of this with coroutine with the specified result.
     */
    public final override fun resumeWith(result:Result<T>) {
        // makeCompletingOnce 大致实现是修改协程状态,如果需要的话还会将结果返回给调用者协程,并恢复调用者协程
        makeCompletingOnce(result.toState(), defaultResumeMode)
    }
}

所以其中一类 Continuation BaseContinuationImplresumeWith 封装了协程的运算逻辑,用以协程的启动和恢复;而另一类 Continuation AbstractCoroutine ,主要是负责维护协程的状态和管理,它的 resumeWith 则是完成协程,恢复调用者协程。

2.4 协程的三层包装

常用的 launchasync 返回的 JobDeferred ,里面封装了协程状态,提供了取消协程接口,而它们的实例都是继承自 AbstractCoroutine ,它是协程的第一层包装。第二层包装是编译器生成的 SuspendLambda 的子类,封装了协程的真正运算逻辑,继承自 BaseContinuationImpl ,其中 completion 属性就是协程的第一层包装。第三层包装是前面分析协程的线程调度时提到的 DispatchedContinuation ,封装了线程调度逻辑,包含了协程的第二层包装。三层包装都实现了 Continuation 接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。

下面是协程运行的流程图:

coroutine_flow.jpg

3. 小结

经过以上解析之后,再来看协程就是一段可以挂起和恢复执行的运算逻辑,而协程的挂起是通过挂起函数实现的,挂起函数用状态机的方式用挂起点将协程的运算逻辑拆分为不同的片段,每次运行协程执行的不同的逻辑片段。所以协程有两个很大的好处:一是简化异步编程,支持异步返回;而是挂起不阻塞线程,提供线程利用率。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK