35

Kotlin Coroutines(协程) 完全解析(四),协程的异常处理

 5 years ago
source link: http://johnnyshieh.me/posts/kotlin-coroutine-exception-handling/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Kotlin Coroutines(协程) 完全解析系列:

Kotlin Coroutines(协程) 完全解析(一),协程简介

Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度

Kotlin Coroutines(协程) 完全解析(三),封装异步回调、协程间关系及协程的取消

Kotlin Coroutines(协程) 完全解析(四),协程的异常处理

Kotlin Coroutines(协程) 完全解析(五),协程的并发

本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1

在上一篇文章中提到子协程抛出未捕获的异常时默认会取消其父协程,而抛出 CancellationException 却会当作正常的协程结束不会取消其父协程。本文来详细解析协程中的异常处理,抛出未捕获异常后协程结束后运行会不会崩溃,可以拦截协程的未捕获异常吗,如何让子协程的异常不影响父协程。

Kotlin 官网文档中有关于协程异常处理的文章,里面的内容本文就不再重复,所以读者们先阅读官方文档:

Coroutine Exception handling

协程的异常处理(官方文档中文版)

看完官方文档后,可能还是会有一些疑问:

  • launch 式协程的未捕获异常为什么会自动传播到父协程,为什么对异常只是在控制台打印而已?

  • async 式协程的未捕获异常为什么需要依赖用户来最终消耗异常?

  • 自定义的 CoroutineExceptionHandler 的是如何生效的?

  • 异常的聚合是怎么处理的?

  • SupervisorJobsupervisorScope 实现异常单向传播的原理是什么?

这些疑问在本文逐步解析协程中异常处理的流程时,会一一解答。

1. 协程中异常处理的流程

从抛出异常的地方开始跟踪协程中异常处理的流程,抛出异常时一般都在协程的运算逻辑中。而在第二篇 深入理解协程的挂起、恢复与调度 中提到在协程的三层包装中,运算逻辑在第二层 BaseContinuationImplresumeWith() 函数中的 invokeSuspend 运行,所以再来看一次:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result:Result<Any?>) {
        ...
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 调用 invokeSuspend 方法执行,执行协程的真正运算逻辑
                        val outcome = invokeSuspend(param)
                        // 协程挂起时 invokeSuspend 才会返回 COROUTINE_SUSPENDED,所以协程挂起时,其实只是协程的 resumeWith 运行逻辑执行完成,再次调用 resumeWith 时,协程挂起点之后的逻辑才能继续执行
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // 注意这个 catch 语句,其实协程运算中所有异常都会在这里被捕获,然后作为一种运算结果
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // 这里实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法,当捕获到异常时,调用 resumeWith(Result.failure(exception)) 更新协程状态
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

从上面源码的 try {} catch {} 语句来看,首先 协程运算过程中所有未捕获异常其实都会在第二层包装中被捕获 ,然后会通过 AbstractCoroutine.resumeWith(Result.failure(exception)) 进入到第三层包装中,所以协程的第三层包装不仅维护协程的状态,还处理协程运算中的未捕获异常。这在第三篇分析子协程抛出未捕获异常,默认情况会取消其父线程时也提到过。

继续跟踪 AbstractCoroutine.resumeWith(Result.failure(exception)) -> JobSupport.makeCompletingOnce(CompletedExceptionally(exception), defaultResumeMode) -> JobSupport.tryMakeCompleting(state, CompletedExceptionally(exception), defaultResumeMode),在最后 tryMakeCompleting() 过程中部分关键代码:

private fun tryMakeCompleting(state:Any?, proposedUpdate:Any?, mode:Int): Int {
    ...
    // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
// 该情景下,notifyRootCause 的值为 exception
    notifyRootCause?.let { notifyCancelling(list, it) }
    // now wait for children
    val child = firstChild(state)
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
        return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have not children left (all were already cancelled?)
// 已取消所有子协程后,更新该协程的最终状态
    if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
        return COMPLETING_COMPLETED
    // otherwise retry
    return COMPLETING_RETRY
}

先看 notifyCancelling(state.list, exception) 函数:

private fun notifyCancelling(list:NodeList, cause:Throwable) {
    // first cancel our own children
    onCancellation(cause)
// 这里会调用 handle 节点的 invoke() 方法取消子协程,具体点就是调用 childJob.parentCancelled(job) 取消子协程
    notifyHandlers<JobCancellingNode<*>>(list, cause)
    // then cancel parent
// 然后可能会取消父协程
    cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}

private fun cancelParent(cause:Throwable): Boolean {
    // CancellationException is considered "normal" and parent is not cancelled when child produces it.
    // This allow parent to cancel its children (normally) without being cancelled itself, unless
    // child crashes and produce some other exception during its completion.
// CancellationException 是正常的协程结束行为,手动抛出 CancellationException 也不会取消父协程
    if (cause is CancellationException) return true
// cancelsParent 属性也可以决定出现异常时是否取消父协程,不过一般该属性都为 true
    if (!cancelsParent) return false
// parentHandle?.childCancelled(cause) 最后会通过调用 parentJob.childCancelled(cause) 取消父协程
    return parentHandle?.childCancelled(cause) == true
}

所以 出现未捕获异常时,首先会取消所有子协程,然后可能会取消父协程。 而有些情况下并不会取消父协程,一是当异常属于 CancellationException 时,而是使用 SupervisorJobsupervisorScope 时,子协程出现未捕获异常时也不会影响父协程,它们的原理是重写 childCancelled() 为 override fun childCancelled(cause: Throwable): Boolean = false

launch 式协程和 async 式协程都会自动向上传播异常,取消父协程。

接下来再看 tryFinalizeFinishingState 的实现:

private fun tryFinalizeFinishingState(state:Finishing, proposedUpdate:Any?, mode:Int): Boolean {
    ...
// proposedException 即前面未捕获的异常
    val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
    // Create the final exception and seal the state so that no more exceptions can be added
    var suppressed = false
    val finalException = synchronized(state) {
        val exceptions = state.sealLocked(proposedException)
        val finalCause = getFinalRootCause(state, exceptions)
        // Report suppressed exceptions if initial cause doesn't match final cause (due to JCE unwrapping)
// 如果在处理异常过程还有其他异常,这里通过 finalCause.addSuppressedThrowable(exception) 的方式记录下来
        if (finalCause != null) suppressed = suppressExceptions(finalCause, exceptions) || finalCause !== state.rootCause
        finalCause
    }
    ...
    // Now handle exception if parent can't handle it
// 如果 finalException 不是 CancellationException,而且有父协程且不为 SupervisorJob 和 supervisorScope,cancelParent(finalException) 都返回 true
// 也就是说一般情况下出现未捕获的异常,一般会传递到最根部的协程,由最顶端的协程去处理
    if (finalException != null && !cancelParent(finalException)) {
        handleJobException(finalException)
    }
    ...
    // And process all post-completion actions
    completeStateFinalization(state, finalState, mode, suppressed)
    return true
}

上面代码中 if (finalException != null && !cancelParent(finalException)) 语句可以看出,除非是 SupervisorJob 和 supervisorScope,一般协程出现未捕获异常时,不仅会取消父协程,一步步取消到最根部的协程,而且最后还由最根部的协程(Root Coroutine)处理协程。下面继续看处理异常的 handleJobException 的实现:

// JobSupport
protected open fun handleJobException(exception:Throwable) {}

// Builders.common.kt
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override val cancelsParent: Boolean get() = true
    override fun handleJobException(exception:Throwable) = handleExceptionViaHandler(parentContext, exception)
}

// Actor
private open class ActorCoroutine<E>(
    ...
) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> {
    override fun onCancellation(cause:Throwable?) {
        _channel.cancel(cause)
    }

    override val cancelsParent: Boolean get() = true
    override fun handleJobException(exception:Throwable) = handleExceptionViaHandler(parentContext, exception)
}

默认的 handleJobException 的实现为空,所以如果 Root Coroutine 为 async 式协程,不会有任何异常打印操作,也不会 crash,但是为 launch 式协程或者 actor 式协程的话,会调用 handleExceptionViaHandler() 处理异常。

下面接着看 handleExceptionViaHandler() 的实现:

internal fun handleExceptionViaHandler(context: CoroutineContext, exception: Throwable) {
    // Invoke exception handler from the context if present
    try {
        context[CoroutineExceptionHandler]?.let {
            it.handleException(context, exception)
// 如果协程有自定义 CoroutineExceptionHandler,则只调用 handler.handleException() 就返回
            return
        }
    } catch (t: Throwable) {
        handleCoroutineExceptionImpl(context, handlerException(exception, t))
        return
    }

    // If handler is not present in the context or exception was thrown, fallback to the global handler
// 如果没有自定义 CoroutineExceptionHandler,
    handleCoroutineExceptionImpl(context, exception)
}

internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
    // use additional extension handlers
// 在 Android 中,还会有 uncaughtExceptionPreHandler 作为额外的 handlers
    for (handler in handlers) {
        try {
            handler.handleException(context, exception)
        } catch (t: Throwable) {
            // Use thread's handler if custom handler failed to handle exception
            val currentThread = Thread.currentThread()
            currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, handlerException(exception, t))
        }
    }

    // use thread's handler
    val currentThread = Thread.currentThread()
// 调用当前线程的 uncaughtExceptionHandler 处理异常
    currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}

// Thread.java
public UncaughtExceptionHandler getUncaughtExceptionHandler() {
// 当前线程没有定义 uncaughtExceptionHandler,会返回线程组作为 Thread.UncaughtExceptionHandler
    return uncaughtExceptionHandler != null ?
        uncaughtExceptionHandler : group;
}

// ThreadGroup.java
public void uncaughtException(Thread t, Throwable e) {
    if (parent != null) {
        parent.uncaughtException(t, e);
    } else {
        Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
// 优先使用线程通用的 DefaultUncaughtExceptionHandler,如果也没有的话,则在控制台打印异常堆栈信息
        if (ueh != null) {
            ueh.uncaughtException(t, e);
        } else if (!(e instanceof ThreadDeath)) {
            System.err.print("Exception in thread \""
                                + t.getName() + "\" ");
            e.printStackTrace(System.err);
        }
    }
}

所以默认情况下, launch 式协程对未捕获的异常只是打印异常堆栈信息,如果在 Android 中还会调用 uncaughtExceptionPreHandler 处理异常。但是如果使用了 CoroutineExceptionHandler 的话,只会使用自定义的 CoroutineExceptionHandler 处理异常。

到这里协程的异常处理流程就走完了,但是还有一个问题还没解答, async 式协程的未捕获异常只会导致取消自己和取消父协程,又是如何依赖用户来最终消耗异常呢?

fun main(args:Array<String>) = runBlocking<Unit> {
    val deferred = GlobalScope.async {
        println("Throwing exception from async")
        throw IndexOutOfBoundsException()
    }
// await() 恢复调用者协程时会重写抛出异常
    deferred.await()
}

看看反编译的 class 文件就明白了:

public final Object invokeSuspend(@NotNull Object result){
    Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    Deferred deferred;
    switch (this.label) {
        case 0:
            if (result instanceof Failure) {
                throw ((Failure) result).exception;
            }
            CoroutineScope coroutineScope = this.p$;
// 创建并启动一个新的 async 协程
            deferred = BuildersKt.async$default((CoroutineScope) GlobalScope.INSTANCE, null, null, (Function2) new 1(null), 3, null);
            this.L$0 = deferred;
            this.label = 1;
// await() 挂起函数挂起当前协程,等待 async 协程的结果
            if (deferred.await(this) == coroutine_suspended) {
                return coroutine_suspended;
            }
            break;
        case 1:
            deferred = (Deferred) this.L$0;
// async 协程恢复当前协程时,传递进来的结果是 CompletedExceptionally(IndexOutOfBoundsException())
            if (result instanceof Failure) {
// 在当前协程重新抛出 IndexOutOfBoundsException 异常
                throw ((Failure) result).exception;
            }
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    return Unit.INSTANCE;
}

所以 async 式协程只有通过 await() 将异常重新抛出,不过可以可以通过 try { deffered.await() } catch () { ... } 来捕获异常。

2. 小结

分析完协程的异常处理流程,其中需要注意的问题有下面这些:

  • 抛出 CancellationException 或者调用 cancel() 只会取消当前协程和子协程,不会取消父协程,也不会其他例如打印堆栈信息等的异常处理操作。

  • 抛出未捕获的非 CancellationException 异常会取消子协程和自己,也会取消父协程,一直取消 root 协程,异常也会由 root 协程处理。

  • 如果使用了 SupervisorJob 或 supervisorScope,子协程抛出未捕获的非 CancellationException 异常不会取消父协程,异常也会由子协程自己处理。

  • launch 式协程和 actor 式协程默认处理异常的方式只是打印堆栈信息,可以自定义 CoroutineExceptionHandler 来处理异常。

  • async 式协程本身不会处理异常,自定义 CoroutineExceptionHandler 也无效,但是会在 await() 恢复调用者协程时重新抛出异常。


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK