42

golang源码阅读-sync.Mutex

 5 years ago
source link: https://studygolang.com/articles/17017?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【golang源码分析】sync.Mutex

概述

Mutex只有两个阻塞方法Lock和Unlock,有两种工作模式:normal和starvation。

goroutine在抢占锁时,会先自旋一段时间,如果抢占失败会被放在一个FIFO等待队列中休眠,当锁被释放时,会优先唤醒队首的goroutine。

在normal模式中,被唤醒的waiter会跟新到的goroutine竞争锁,一般来说新的goroutine已经在cpu中进行了,并且可能有不止一个goroutine,在这种情况下刚被唤醒的goroutine有很大概率会失败,而被重新放回FIFO队首。如果当一个waiter等待获取锁的时间超过1ms,会将mutex转换成starvation模式。

在starvation模式,当mutex被unlock时,持有权会直接移交到队首的goroutine中。新加入的goroutine不会再参与锁的竞争,而是直接放入等待队列的尾部。

mutex通过一个state变量来维护锁的状态信息。

  • 低一位mutexLocked:表示锁是否被锁定
  • 低二位mutexWoken:表示是否有goroutine在竞争锁,这个主要应用于unlock时判断是否有必要唤醒等待队列中的goroutine
  • 低三位mutexStarving:表示锁的模式starvation或normal,剩余高位用于标志等待队列的长度。

代码分析

定义

Mutex实现了Locker接口,维护两个变量——state用来维护锁的状态,同时通过操作sema来唤醒和休眠等待goroutine

type Mutex struct {
    state int32
    sema  uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota

    starvationThresholdNs = 1e6
)

Lock

完整代码

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

详解

  1. 首先默认state为0,通过cas操作将锁的状态更新为mutexLocked,尝试快速获取锁。在并发度较小的情况下,mutex多数处于初始状态,可以提供加锁性能。如果快速获取锁失败,goroutine会不断地进行自旋抢锁、休眠、唤醒抢锁,直到抢到锁后break。在这期间,需要通过cas对state进行更新,只有更新成功的goroutine才能进行下一阶段的操作。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
  1. 竞争锁的goroutine会先自旋一段时间直到mutex被unlocked、或切换到starvation模式、或自旋次数达到上限。自旋过程中,goroutine也会尝试通过cas将mutexWoken位置1,以通知Unlock操作不必唤醒等待队列中的goroutine。starvatione模式是不需要自旋的,因为锁的持有权会直接移交给等待队列队首的goroutine,自旋操作没有实际意义。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // 当前goroutine未更新成功过woken位,woken位为0,等待队列非空,通过cas尝试更新woken位
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()  // pause一段时间
        iter++
        old = m.state
        continue
}
  1. 从前面代码可以看到,退出自旋有三种原因,及其对应的处理如下:
    • mutex被Unlock:将mutexLocked置1,代表尝试抢锁,然后cas尝试更新state
    • mutex切换到starvation模式:不改变mutexLocked,waiter数量+1,cas更新state成功后后直接加入到等待队列里。
    • 自旋次数达到上限:不改变mutexLocked,waiter数量+1,如果starving变量为true,则将mutexStarving位置1。

starving是goroutine被唤醒时通过计算等待时间获取的,大于1ms则为true,表示需要切换到starvation模式。

这里需要注意,当mutex为unlocked的时候,不需要转换mutex模式。因为starving为true,说明该goroutine是normal模式下等待队列中刚被唤醒的waiter(原因需要看后面的代码,在starvation模式。会直接将mutex的持有权交给唤醒的goroutine,走不到这一步)。如果此时做了状态切换,可能会出现在starvation模式,等待队列为空的情况。因此, 只有自旋抢锁失败的情况下,才会由等待时间超过阈值的goroutine将mutex模式切换到starvation

new := old
// 锁被unlocked,将mutexLocked置1,代表尝试抢锁
if old&mutexStarving == 0 {
        new |= mutexLocked
}
// starving或自旋次数到上限, 等待数量+1
if old&(mutexLocked|mutexStarving) != 0 {
        new += 1 << mutexWaiterShift
}
// 如果已经是starvation状态,这一步没什么意义
if starving && old&mutexLocked != 0 {
        new |= mutexStarving
}
if awoke {
        if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
        }
        // 把awoke位给清掉
        new &^= mutexWoken
}
  1. cas 更新mutex状态,失败则重复上述操作,直到状态更新成功。
    • 如果是获取锁的动作,则直接break。
    • 否则将goroutine放到等待队列中,若waitStartTime不为0,则说明是waiter抢锁失败,应该重新返回队首。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 获取锁成功,break。接下来要处理starving和自旋次数达到上限的情况
        if old&(mutexLocked|mutexStarving) == 0 {
            break // locked the mutex with CAS
        }
       // waitStartTime说明是被唤醒的goroutine抢锁失败了,应该重新放回队首(queueLifo=true)
        queueLifo := waitStartTime != 0
        if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
        }
        // 休眠
        runtime_SemacquireMutex(&m.sema, queueLifo)
  1. 被唤醒后,如果当前starving为true,或等待时间大于阈值,则将starving置为true,下一次自旋抢锁失败会将mutex切换到starvation模式。在starvation模式,该goroutine直接获取锁,否则的话要重复前面的操作跟新来的goroutine抢锁
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state

if old&mutexStarving != 0 {
        // starving模式不应该出现mutex被locked或woken,等待队列长度也不能为0,有的话说明这段代码逻辑有bug
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
        }
        // 获取锁并将等待数-1
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        // 等待时间小于阈值,或该goroutine为等待队列中最后一个waiter,切换到normal模式
        if !starving || old>>mutexWaiterShift == 1 {
            delta -= mutexStarving
        }
        // 把状态更新过去,获取锁并退出。这里不用cas,因为前面操作保证了低三位都不会被更新,高位本来是原子计数,所以直接add就可以
        atomic.AddInt32(&m.state, delta)
        break
}
awoke = true
iter = 0

Unlock

// mutex允许一个goroutine加锁,另一个goroutine解锁,不要求两个操作保证同一个goroutine
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 直接解锁,然后判断一下状态,如果不一致的话则抛异常
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

  // starvation模式,则直接唤醒blocked队列队首goroutine
  // normal模式,如果等待队列为空,或此时锁的状态发生了变化,则不用唤醒等待队列中的goroutine
  // 否则的话,将等待数减1,cas更新state,成功则唤醒一个waiter
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
     // starving模式,直接将持有权交给下一个waiter。mutexLocked是在队首waiter被唤醒后更新的,所以此时mutexLocked还是0,但是在starving       
     // 模式下,新来的goroutine不会更新mutexLocked位。配合Lock代码看。
        runtime_Semrelease(&m.sema, true)
    }
}

\

practice/golang/源码分析


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK