49

Kotlin Coroutines(协程) 完全解析(三),封装异步回调、协程间关系及协程的取消

 5 years ago
source link: http://johnnyshieh.me/posts/kotlin-coroutine-integration-and-cancel/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Kotlin Coroutines(协程) 完全解析系列:

Kotlin Coroutines(协程) 完全解析(一),协程简介

Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度

Kotlin Coroutines(协程) 完全解析(三),封装异步回调、协程间关系及协程的取消

本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1

前面两篇文章解析了挂起函数通过状态机来实现,协程的本质就是有三层包装的 Continuation ,这篇文章进一步解析协程的运用。主要介绍如何将异步回调封装为挂起函数,解析协程之间的关系以及协程的取消。

1. 封装异步回调为挂起函数

在异步编程中,回调是非常常见的写法,那么如何将回调转换为协程中的挂起函数呢?可以通过两个挂起函数 suspendCoroutine{}suspendCancellableCoroutine{} ,下面看如何将 OkHttp 的网络请求转换为挂起函数。

suspend fun <T>Call<T>.await(): T = suspendCoroutine { cont ->
    enqueue(object : Callback<T> {
        override fun onResponse(call:Call<T>, response:Response<T>) { 
            if (response.isSuccessful) {
                cont.resume(response.body()!!)
            } else {
                cont.resumeWithException(ErrorResponse(response))
            }
        }
        override fun onFailure(call:Call<T>, t:Throwable) { 
            cont.resumeWithException(t)
        } 
    })
}

上面的 await() 的扩展函数调用时,首先会挂起当前协程,然后执行 enqueue 将网络请求放入队列中,当请求成功时,通过 cont.resume(response.body()!!) 来恢复之前的协程。

再来看下 suspendCoroutine{}suspendCancellableCoroutine{} 的定义:

public suspend inline fun <T>suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }

public suspend inline fun <T>suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        // 和 suspendCoroutine 的区别就在这里,如果协程已经被取消或者已完成,就会抛出 CancellationException 异常
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

它们的关键实现都是调用 suspendCoroutineUninterceptedOrReturn() 函数,它的作用是获取当前协程的实例,并且挂起当前协程或者不挂起直接返回结果。

协程中还有两个常见的挂起函数使用到了 suspendCoroutineUninterceptedOrReturn() 函数,分别是 delay()yield()

1.1 delay 的实现

public suspend fun delay(timeMillis:Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

internal actual val DefaultDelay: Delay = DefaultExecutor

delay 使用 suspendCancellableCoroutine 挂起协程,而协程恢复的一般情况下是关键在 DefaultExecutor.scheduleResumeAfterDelay() ,其中实现是 schedule(DelayedResumeTask(timeMillis, continuation)) ,其中的关键逻辑是将 DelayedResumeTask 放到 DefaultExecutor 的队列最后,在延迟的时间到达就会执行 DelayedResumeTask,那么该 task 里面的实现是什么:

override fun run() {
    // 直接在调用者线程恢复协程
    with(cont) { resumeUndispatched(Unit) }
}

1.2 yield 的实现

yield() 的作用是挂起当前协程,然后将协程分发到 Dispatcher 的队列,这样可以让该协程所在线程或线程池可以运行其他协程逻辑,然后在 Dispatcher 空闲的时候继续执行原来协程。简单的来说就是让出自己的执行权,给其他协程使用,当其他协程执行完成或也让出执行权时,一开始的协程可以恢复继续运行。

看下面的代码示例:

fun main(args:Array<String>) = runBlocking<Unit> {
    launch {
        repeat(3) {
            println("job1 repeat$ittimes")
            yield()
        }
    }
    launch {
        repeat(3) {
            println("job2 repeat$ittimes")
            yield()
        }
    }
}

通过 yield() 实现 job1 和 job2 两个协程交替运行,输出如下:

job1 repeat 0 times
job2 repeat 0 times
job1 repeat 1 times
job2 repeat 1 times
job1 repeat 2 times
job2 repeat 2 times

现在来看其实现:

public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    val context = uCont.context
    // 检测协程是否已经取消或者完成,如果是的话抛出 CancellationException
    context.checkCompletion()
    // 如果协程没有线程调度器,或者像 Dispatchers.Unconfined 一样没有进行调度,则直接返回
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
    // dispatchYield(Unit) 最终会调用到 dispatcher.dispatch(context, block) 将协程分发到调度器队列中,这样线程可以执行其他协程
    cont.dispatchYield(Unit)
    COROUTINE_SUSPENDED
}

所以注意到, yield() 需要依赖协程的线程调度器,而调度器再次执行该协程时,在第二篇中有讲过会调用 resume 来恢复协程运行。

现在来看封装异步逻辑为挂起函数的关键是用 suspendCoroutineUninterceptedOrReturn 函数包装,然后在异步逻辑完成时调用 resume 手动恢复协程。

2. 协程之间的关系

官方文档中有提到协程之间可能存在父子关系,取消父协程时,也会取消所有子协程。在 Job 的源码中有这样一段话描述协程间父子关系:

* A parent-child relation has the following effect:
*
* * Cancellation of parent with [cancel] or its exceptionalcompletion(failure)
*   immediately cancels all its children.
* * Parent cannot complete until all its children are complete. Parent waits for all its children to
*   complete in _completing_ or _cancelling_ state.
* * Uncaught exception in a child, by default, cancels parent. In particular, this applies to
*   children created with [launch][CoroutineScope.launch] coroutine builder. Note, that
*   [async][CoroutineScope.async] and other future-like
*   coroutine builders do not have uncaught exceptions by definition, since all their exceptions are
*   caught and are encapsulated in their result.

所以协程间父子关系有三种影响:

  • 父协程手动调用 cancel() 或者异常结束,会立即取消它的所有子协程。

  • 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成。

  • 子协程抛出未捕获的异常时,默认情况下会取消其父协程。

下面先来看看协程是如何建立父子关系的, launchasync 新建协程时,首先都是 newCoroutineContext(context) 新建协程的 CoroutineContext 上下文,下面看其具体细节:

public actual fun CoroutineScope.newCoroutineContext(context:CoroutineContext): CoroutineContext {
    // 新协程继承了原来 CoroutineScope 的 coroutineContext
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    // 当新协程没有指定线程调度器时,会默认使用 Dispatchers.Default
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

所以新的协程的 CoroutineContext 都继承了原来 CoroutineScope 的 coroutineContext,然后 launchasync 新建协程最后都会调用 start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) ,里面第一行是 initParentJob() ,通过注释可以知道就是这个函数建立父子关系的,下面看其实现细节:

// AbstractCoroutine.kt
internal fun initParentJob() {
    initParentJobInternal(parentContext[Job])
}

// JobSupport.kt
internal fun initParentJobInternal(parent:Job?) {
    check(parentHandle == null)
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    parent.start() // make sure the parent is started
    @Suppress("DEPRECATION")
    // 关键在于 parent.attachChild(this)
    val handle = parent.attachChild(this)
    parentHandle = handle
    // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
}

这里需要注意的是 GlobalScope 和普通协程的 CoroutineScope 的区别, GlobalScope 的 Job 是为空的, GlobalScope.launch{}GlobalScope.async{} 新建的协程是没有父协程的。

下面继续看 attachChild 的实现:

public final override fun attachChild(child:ChildJob): ChildHandle {
    return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}

invokeOnCompletion() 函数在前一篇解析 Deferred.await() 中有提到,关键是将 handler 节点添加到父协程 state.list 的末尾。

2.1 父协程手动调用 cancel() 或者异常结束,会立即取消它的所有子协程

跟踪父协程的 cancel() 调用过程,其中关键过程为 cancel() -> cancel(null) -> cancelImpl(null) -> makeCancelling(null) -> tryMakeCancelling(state, causeException) -> notifyCancelling(list, rootCause),下面继续分析 notifyCancelling(list, rootCause) 的实现:

// JobSupport.kt
private fun notifyCancelling(list:NodeList, cause:Throwable) {
    // first cancel our own children
    onCancellation(cause)
    // 这里会调用所有子协程绑定的 ChildHandleNode.invoke(cause) -> childJob.parentCancelled(parentJob) 来取消所有子协程
    notifyHandlers<JobCancellingNode<*>>(list, cause)
    // then cancel parent
    // cancelParent(cause) 不一定会取消父协程,cancel() 时不会取消父协程,因为此时产生 cause 的是 JobCancellationException,属于 CancellationException
    cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}

public final override fun parentCancelled(parentJob:ParentJob) {
    // 父协程取消时,子协程会通过 parentCancelled 来取消自己
    cancelImpl(parentJob)
}

private fun cancelParent(cause:Throwable): Boolean {
    // CancellationException is considered "normal" and parent is not cancelled when child produces it.
    // This allow parent to cancel its children (normally) without being cancelled itself, unless
    // child crashes and produce some other exception during its completion.
    if (cause is CancellationException) return true
    if (!cancelsParent) return false
    // 当 cancelsParent 为 true, 且子线程抛出未捕获的异常时,默认情况下 childCancelled() 会取消其父协程。
    return parentHandle?.childCancelled(cause) == true
}

2.2 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成

前一篇文章有提到协程的完成通过 AbstractCoroutine.resumeWith(result) 实现,调用过程为 makeCompletingOnce(result.toState(), defaultResumeMode) -> tryMakeCompleting(),其中关键源码如下:

// JobSupport.kt
private fun tryMakeCompleting(state:Any?, proposedUpdate:Any?, mode:Int): Int {
    ...
    // now wait for children
    val child = firstChild(state)
    // 等待子协程完成
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
        return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have not children left (all were already cancelled?)
    if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
        return COMPLETING_COMPLETED
    // otherwise retry
    return COMPLETING_RETRY
}

private tailrec fun tryWaitForChild(state:Finishing, child:ChildHandleNode, proposedUpdate:Any?): Boolean {
    // 添加 ChildCompletion 节点到子协程的 state.list 末尾,当子协程完成时会调用 ChildCompletion.invoke()
    val handle = child.childJob.invokeOnCompletion(
        invokeImmediately = false,
        handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
    )
    if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
    // 循环设置所有其他子协程
    val nextChild = child.nextChild() ?: return false
    return tryWaitForChild(state, nextChild, proposedUpdate)
}

tryWaitForChild() 也是通过 invokeOnCompletion() 添加节点到子协程的 state.list 中,当子协程完成时会调用 ChildCompletion.invoke():

// ChildCompletion class
override fun invoke(cause:Throwable?) {
    parent.continueCompleting(state, child, proposedUpdate)
}

private fun continueCompleting(state:Finishing, lastChild:ChildHandleNode, proposedUpdate:Any?) {
    require(this.state === state) // consistency check -- it cannot change while we are waiting for children
    // figure out if we need to wait for next child
    val waitChild = lastChild.nextChild()
    // try wait for next child
    if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
    // no more children to wait -- try update state
    // 当所有子协程都完成时,才会 tryFinalizeFinishingState() 完成自己
    if (tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
}

2.3 子协程抛出未捕获的异常时,默认情况下会取消其父协程。

子线程抛出未捕获的异常时,后续的处理会如何呢?在前一篇解析中协程的运算在第二层包装 BaseContinuationImpl 中,我们再看一次:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result:Result<Any?>) {
        ...
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 调用 invokeSuspend 方法执行,执行协程的真正运算逻辑
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // 协程抛出未捕获的异常,会在这里被拦截,然后作为结果完成协程
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // 协程的状态修改在 AbstractCoroutine.resumeWith() 中
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

所以协程有未捕获的异常中,会在第二层包装中的 resumeWith() 捕获到,然后调用第一层包装 AbstractCoroutine.resumeWith() 来取消当前协程,处理过程为 AbstractCoroutine.resumeWith(Result.failure(exception)) -> JobSupport.makeCompletingOnce(CompletedExceptionally(exception), defaultResumeMode) -> tryMakeCompleting(state, CompletedExceptionally(exception), defaultResumeMode) -> notifyCancelling(list, exception) -> cancelParent(exception),所以出现未捕获的异常时,和手动调用 cancel() 一样会调用到 notifyCancelling(list, exception) 来取消当前协程,和手动调用 cancel() 的区别在于 exception 不是 CancellationException。

private fun cancelParent(cause:Throwable): Boolean {
    // CancellationException is considered "normal" and parent is not cancelled when child produces it.
    // This allow parent to cancel its children (normally) without being cancelled itself, unless
    // child crashes and produce some other exception during its completion.
    if (cause is CancellationException) return true
    if (!cancelsParent) return false
    // launch 和 async 新建的协程的 cancelsParent 都为 true, 所以子线程抛出未捕获的异常时,默认情况下 childCancelled() 会取消其父协程。
    return parentHandle?.childCancelled(cause) == true
}

// 默认情况下 childCancelled() 会取消取消协程
public open fun childCancelled(cause:Throwable): Boolean =
    cancelImpl(cause) && handlesException

3. 协程的取消

前面分析协程父子关系中的取消协程时,可以知道协程的取消只是在协程的第一层包装中 AbstractCoroutine 中修改协程的状态,并没有影响到第二层包装中 BaseContinuationImpl 中协程的实际运算逻辑。所以协程的取消只是状态的变化,并不会取消协程的实际运算逻辑,看下面的代码示例:

fun main(args:Array<String>) = runBlocking {
    val job1 = launch(Dispatchers.Default) {
        repeat(5) {
            println("job1 sleep${it +1}times")
            delay(500)
        }
    }
    delay(700)
    job1.cancel()

    val job2 = launch(Dispatchers.Default) {
        var nextPrintTime = 0L
        var i = 1
        while (i <= 3) {
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("job2 sleep${i++}...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(700)
    job2.cancel()
}

输出结果如下:

job1 sleep 1 times
job1 sleep 2 times
job2 sleep 1 ...
job2 sleep 2 ...
job2 sleep 3 ...

上面代码中 job1 取消后, delay() 会检测协程是否已取消,所以 job1 之后的运算就结束了;而 job2 取消后,没有检测协程状态的逻辑,都是计算逻辑,所以 job2 的运算逻辑还是会继续运行。

所以为了可以及时取消协程的运算逻辑,可以检测协程的状态,使用 isActive 来判断,上面示例中可以将 while(i <= 3) 替换为 while(isActive)

4. 小结

最后总结下本文的内容,封装异步代码为挂起函数其实非常简单,只需要用 suspendCoroutine{}suspendCancellableCoroutine{} ,还要异步逻辑完成用 resume()resumeWithException 来恢复协程。

新建协程时需要协程间关系, GlobalScope.launch{}GlobalScope.async{} 新建的协程是没有父协程的,而在协程中使用 launch{}aysnc{} 一般都是子协程。对于父子协程需要注意下面三种关系:

  • 父协程手动调用 cancel() 或者异常结束,会立即取消它的所有子协程。

  • 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成。

  • 子协程抛出未捕获的异常时,默认情况下会取消其父协程。

对于协程的取消, cancel() 只是将协程的状态修改为已取消状态,并不能取消协程的运算逻辑,协程库中很多挂起函数都会检测协程状态,如果想及时取消协程的运算,最好使用 isActive 判断协程状态。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK