38

Kotlin Coroutines(协程) 完全解析(五),协程的并发

 5 years ago
source link: http://johnnyshieh.me/posts/kotlin-coroutine-concurrency/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Kotlin Coroutines(协程) 完全解析系列:

Kotlin Coroutines(协程) 完全解析(一),协程简介

Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度

Kotlin Coroutines(协程) 完全解析(三),封装异步回调、协程间关系及协程的取消

Kotlin Coroutines(协程) 完全解析(四),协程的异常处理

Kotlin Coroutines(协程) 完全解析(五),协程的并发

本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1

通过前面几篇文章可以明白协程就是可以挂起和恢复执行的运算逻辑,挂起函数用状态机的方式用挂起点将协程的运算逻辑拆分为不同的片段,每次运行协程执行的不同的逻辑片段。所以协程在运行时只是线程中的一块代码,线程的并发处理方式都可以用在协程上。不过协程还提供两种特有的方式,一是不阻塞线程的互斥锁 Mutex ,一是通过 ThreadLocal 实现的协程局部数据。

1. Mutex

线程中锁都是阻塞式,在没有获取锁时无法执行其他逻辑,而协程可以通过挂起函数解决这个,没有获取锁就挂起协程,获取后再恢复协程,协程挂起时线程并没有阻塞可以执行其他逻辑。这种互斥锁就是 Mutex ,它与 synchronized 关键字有些类似,还提供了 withLock 扩展函数,替代常用的 mutex.lock; try {...} finally { mutex.unlock() } :

fun main(args:Array<String>) = runBlocking<Unit> {
    val mutex = Mutex()
    var counter = 0
    repeat(10000) {
        GlobalScope.launch {
            mutex.withLock {
                counter ++
            }
        }
    }
    println("The final count is$counter")
}

Mutex 的使用比较简单,不过需要注意的是多个协程竞争的应该是同一个 Mutex 互斥锁。

2. 协程局部数据

线程中可以使用ThreadLocal作为线程局部数据,每个线程中的数据都是独立的。协程中可以通过 ThreadLocal.asContextElement() 扩展函数实现协程局部数据,每次协程切换会恢复之前的值。先看下面的示例:

fun main(args:Array<String>) = runBlocking<Unit> {
    val threadLocal = ThreadLocal<String>().apply { set("Init") }
    printlnValue(threadLocal)
    val job = GlobalScope.launch(threadLocal.asContextElement("launch")) {
        printlnValue(threadLocal)
        threadLocal.set("launch changed")
        printlnValue(threadLocal)
        yield()
        printlnValue(threadLocal)
    }
    job.join()
    printlnValue(threadLocal)
}

private fun printlnValue(threadLocal:ThreadLocal<String>) {
    println("${Thread.currentThread()}thread local value:${threadLocal.get()}")
}

输出如下:

Thread[main,5,main] thread local value: Init
Thread[DefaultDispatcher-worker-1,5,main] thread local value: launch
Thread[DefaultDispatcher-worker-1,5,main] thread local value: launch changed
Thread[DefaultDispatcher-worker-2,5,main] thread local value: launch
Thread[main,5,main] thread local value: Init

上面的输出有个疑问的地方,为什么执行 yield() 挂起函数后 threadLocal 的值不是 launch changed 而变回了 launch ?

下面直接分析源码:

// 注意这里 value 的默认值是 ThreadLocal 当前值
public fun <T>ThreadLocal<T>.asContextElement(value:T=get()): ThreadContextElement<T> =
    ThreadLocalElement(value, this)

internal class ThreadLocalElement<T>(
    private val value: T,
    private val threadLocal: ThreadLocal<T>
) : ThreadContextElement<T> {
    override val key: CoroutineContext.Key<*> = ThreadLocalKey(threadLocal)

    override fun updateThreadContext(context:CoroutineContext): T {
        val oldState = threadLocal.get()
// 设置 threadLocal 的值为 value 前先保存了之前的值
        threadLocal.set(value)
        return oldState
    }

    override fun restoreThreadContext(context:CoroutineContext, oldState:T) {
// 将 threadLocal 修改为之前保存的值
        threadLocal.set(oldState)
    }
    ...
}

// 协程启动和恢复都会用此函数包装,在 Dispatched.run()、DisptchedContinuation.resumeWith() 、
// DisptchedContinuation.resumeUndispatched() 等协程启动和恢复的地方都可以发现此函数的踪影
internal actual inline fun <T>withCoroutineContext(context:CoroutineContext, countOrElement:Any?, block: () -> T): T {
// updateThreadContext() 函数会调用到 ThreadContextElement.updateThreadContext(context)
// oldValue 是 threadLocal 之前的值
    val oldValue = updateThreadContext(context, countOrElement)
    try {
        return block()
    } finally {
// restoreThreadContext() 函数会调用到 ThreadContextElement.restoreThreadContext(context, oldValue)
        restoreThreadContext(context, oldValue)
    }
}

根据上面的源码和断点调试,可以发现协程的启动和恢复都会执行一次 ThreadContextElement.updateThreadContext(context)ThreadContextElement.restoreThreadContext(context, oldValue) ,现在再分析一次上面的代码运行:

fun main(args:Array<String>) = runBlocking<Unit> {
    val threadLocal = ThreadLocal<String>().apply { set("Init") }
// 此时在 main 线程,threadLocal 的值为 Init
    printlnValue(threadLocal)
    val job = GlobalScope.launch(threadLocal.asContextElement("launch")) {
// 启动协程后,切换到 DefaultDispatcher-worker-1 线程,threadLocal 在该线程的值为 null
        // 调用 updateThreadContext() 设置 threadLocal 的值为 launch,保存之前的为 null
        printlnValue(threadLocal)
        // 在 DefaultDispatcher-worker-1 线程,修改 threadLocal 的值为 launch changed
        threadLocal.set("launch changed")
        printlnValue(threadLocal)
        // yield() 挂起函数会挂起当前协程,并将协程分发到 Dispatcher.Default 的队列中等待恢复
        // 挂起协程后调用 restoreThreadContext() 修改 threadLocal 为 null
        yield()
// 恢复协程后,此时在 DefaultDispatcher-worker-2 线程,threadLocal 的值为 null
        // 再次调用 updateThreadContext() 设置 threadLocal 的值为 launch,保存之前的为 null
        printlnValue(threadLocal)
        // 结束协程后,restoreThreadContext() 修改 threadLocal 为 null
    }
    job.join()
    // 此时已经从 DefaultDispatcher-worker-2 线程切换回 main 线程,main 线程中的 threadlocal 没有修改过,还是为 Init
    printlnValue(threadLocal)
}

private fun printlnValue(threadLocal:ThreadLocal<String>) {
    println("${Thread.currentThread()}thread local value:${threadLocal.get()}")
}

所以 ThreadContextElement 并不能跟踪所有 ThreadLocal 对象的访问,而且每次挂起时更新的值将丢失。最重要的牢记它的原理: 启动和恢复时保存 ThreadLocal 在当前线程的值,并修改为 value,挂起和结束时修改当前线程 ThreadLocal 的值为之前保存的值。

3. 已有线程同步方式

  • 最简单的 synchronized 关键字

  • ReentrantLock 等 java.util.concurrent.locks 包中锁

  • AtomicInteger 等 java.util.concurrent.atomic 包中的原子类

  • ConcurrentHashMap 等线程安全的集合

协程中的并发与线程的并发大部分是相同的,所以本篇文章应该是目前为止该系列文章中最容易理解的一篇,本系列 Kotlin Coroutines(协程) 完全解析 暂时就到这里,后面待 select 表达式、Channel、Actor 等实验性内容正式发布后继续解析,还有在 Android 项目中协程的实际运用,敬请期待。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK