8

go sema 源码分析

 4 years ago
source link: https://studygolang.com/articles/25628
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

sema.go 中的 semacquire1 和 semrelease1 函数是 sync.Mutex 用来阻塞 g 和唤醒 g 的底层实现。这两个函数实现了类似信号量的功能,而且是以 goroutine 为单位的信号量。由于还没看 go 调度相关的代码,sema 里与调度相关的逻辑这里不做详细说明和代码注解。

semacquire1 函数

大致流程:获取 sudog 和 semaRoot。其中 sudog 是 g 放在等待队列里的包装对象,里面有 g 的信息和一些其他参数;semaRoot 则是等待队列的结构体,内部是一棵按地址组织的 treap(树堆,一种平衡二叉树),同一地址上的 sudog 再挂成链表。把和当前 g 关联的 sudog 放到 semaRoot 里,然后把 g 的状态改为等待,调用调度器执行别的 g,此时当前 g 就停止执行了。直到被唤醒并被调度器重新调度执行后,如果已经拿到信号量(s.ticket != 0,或者再次调用 cansemacquire 成功),就释放 sudog 并返回;否则重新入队继续等待。

semaRoot

// A semaRoot holds a balanced tree of sudogs with distinct addresses.
// Each sudog may in turn point to a list of other sudogs waiting on the
// same address.
// Operations on the inner per-address lists are O(1); scanning the
// top-level semaRoot tree is O(log n), where n is the number of distinct
// addresses with blocked goroutines that hash to this semaRoot.
type semaRoot struct {
	lock  mutex
	treap *sudog // root of balanced tree of unique waiters (one node per distinct address).
	nwait uint32 // Number of waiters. Read w/o the lock (semrelease1 loads it atomically on its fast path).
}

复制代码
// semacquire1 acquires the semaphore at *addr, parking the calling goroutine
// until it succeeds.
//
// addr is the semaphore word. cansemacquire (not shown here) is presumably
// the non-blocking attempt to take one unit from it.
// lifo is forwarded to root.queue; it presumably selects whether this waiter
// is queued ahead of existing waiters for addr. TODO confirm.
// profile selects what is recorded: semaBlockProfile enables block profiling
// (via s.releasetime), and semaMutexProfile enables mutex profiling (via
// s.acquiretime).
// skipframes is added to the stack-skip counts passed to goparkunlock and
// blockevent.
//
// It throws if it is not running on the user goroutine itself (getg() must
// equal m.curg).
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
	gp := getg()
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// Easy case.
	if cansemacquire(addr) {
		return
	}

	// Harder case:
	//	increment waiter count
	//	try cansemacquire one more time, return if succeeded
	//	enqueue itself as a waiter
	//	sleep
	//	(waiter descriptor is dequeued by signaler)
	// Get a sudog to represent this goroutine in the wait queue.
	s := acquireSudog()
	// Find the semaRoot that addr hashes to.
	root := semroot(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	// Profiling setup. releasetime = -1 marks that a release timestamp should
	// be recorded when this waiter is woken (presumably by readyWithTime, which
	// is not shown here). acquiretime is the start timestamp that semrelease1
	// subtracts when reporting a mutexevent.
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		// Serialize queueing and nwait updates with semrelease1, which takes
		// the same root.lock.
		lock(&root.lock)
		// Add ourselves to nwait to disable "easy case" in semrelease.
		atomic.Xadd(&root.nwait, 1)
		// Check cansemacquire to avoid missed wakeup.
		if cansemacquire(addr) {
			// Got it after all: undo the nwait increment and leave.
			atomic.Xadd(&root.nwait, -1)
			unlock(&root.lock)
			break
		}
		// Any semrelease after the cansemacquire knows we're waiting
		// (we set nwait above), so go to sleep.
		// Insert s into the root's treap under addr.
		root.queue(addr, s, lifo)
		// Park this goroutine and release root.lock. The release happens in
		// park_m only after the goroutine is marked waiting, and the M then
		// runs other goroutines. Execution resumes here once a releaser
		// (semrelease1) dequeues s, which also decrements nwait, and readies us.
		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
		// ticket != 0 means semrelease1 (handoff mode) already acquired the
		// semaphore on our behalf. Otherwise compete for it again, and
		// re-queue on the next loop iteration if that fails.
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	// releasetime > 0 means a release timestamp was recorded on wakeup, so
	// report how long we were blocked.
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3+skipframes)
	}
	// Return the sudog.
	releaseSudog(s)
}
复制代码

关键的是 goparkunlock 函数,它内部调用的是 gopark 函数

// gopark puts the current goroutine into a waiting state.
//
// It records unlockf, lock, reason, traceEv and traceskip on the current M
// and G, then switches to g0 via mcall to run park_m. park_m marks the
// goroutine _Gwaiting and then calls unlockf(gp, lock). If unlockf returns
// false, the goroutine is made runnable again and resumed immediately.
// Otherwise park_m enters the scheduler and the goroutine stays parked until
// something readies it.
//
// It throws if the current goroutine is not _Grunning or _Gscanrunning.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	if reason != waitReasonSleep {
		checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
	}
	mp := acquirem()
	gp := mp.curg
	status := readgstatus(gp)
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
	mp.waitlock = lock
	// Consumed by park_m after the goroutine is marked waiting.
	// NOTE(review): on the semaphore path (goparkunlock), unlockf presumably
	// just releases lock and returns true. That function is not shown here.
	mp.waitunlockf = unlockf
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	// can't do anything that might move the G between Ms here.
	mcall(park_m)
}
复制代码

mcall 会先切换到 g0 的栈上,并把当前 g 作为参数调用 park_m

// park_m finishes parking gp on g0. It is reached via mcall from gopark.
//
// It moves gp from _Grunning to _Gwaiting and detaches it from the current M
// (dropg). It then runs the unlock callback stored in m.waitunlockf. If that
// callback returns false, the park is aborted and gp is run again
// immediately. Otherwise the scheduler is entered to run some other goroutine.
func park_m(gp *g) {
	// Running on g0 here, so _g_ is the current M's g0, not gp.
	_g_ := getg()

	if trace.enabled {
		traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
	}

	// Mark gp as waiting.
	casgstatus(gp, _Grunning, _Gwaiting)
	// Break the association between gp and the current M.
	dropg()

	if fn := _g_.m.waitunlockf; fn != nil {
		// Run the unlock callback (e.g. release the semaRoot lock). It runs only
		// after gp is already _Gwaiting.
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		if !ok {
			// The callback vetoed parking: make gp runnable and resume it.
			if trace.enabled {
				traceGoUnpark(gp, 2)
			}
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}
	// gp stays parked; pick another goroutine to run.
	schedule()
}
复制代码

park_m 执行之后,调度器就调度并执行其他的 g, 之前的 gp 也就等待了

semrelease1 函数

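大致流程:先把 *addr 加一(释放一个信号量),如果没有等待者就直接返回;否则从队列里取出一个 sudog s,把 s 对应的 g 变为可执行状态,并且放在当前 p 本地队列的下一个执行位置(runnext)。如果参数 handoff 为 true,会先替 s.g 调用 cansemacquire 获取信号量,成功则设置 s.ticket = 1,这样 s.g 醒来后无需再竞争。此时如果当前 m.locks == 0,就调用 goyield 把当前的 g 放到 p 本地队列的队尾并调用调度器。因为 s.g 被放在 p 本地队列的下一个执行位置,所以调度器此刻执行的就是 s.g。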

// semrelease1 releases one unit of the semaphore at *addr. If a goroutine is
// waiting on addr, it wakes one waiter.
//
// handoff: when true and the semaphore can be re-acquired immediately on the
// waiter's behalf (cansemacquire succeeds), the waiter gets s.ticket = 1. It
// then returns from semacquire1 without competing. If this M also holds no
// locks (m.locks == 0), the current goroutine yields so the waiter runs next.
// skipframes is added to the stack-skip counts passed to mutexevent and
// readyWithTime.
//
// It throws if a dequeued waiter already has a non-zero ticket.
func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semroot(addr)
	atomic.Xadd(addr, 1)

	// Easy case: no waiters?
	// This check must happen after the xadd, to avoid a missed wakeup
	// (see loop in semacquire).
	if atomic.Load(&root.nwait) == 0 {
		return
	}

	// Harder case: search for a waiter and wake it.
	lock(&root.lock)
	if atomic.Load(&root.nwait) == 0 {
		// The count is already consumed by another goroutine,
		// so no need to wake up another goroutine.
		unlock(&root.lock)
		return
	}
	// Dequeue a waiter for addr (may be nil). Its ticket is expected to still
	// be 0; this is checked below.
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
	if s != nil { // May be slow or even yield, so unlock first
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			mutexevent(t0-acquiretime, 3+skipframes)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		// Handoff mode: take the unit for the waiter now, so it cannot be
		// stolen by another acquirer before the waiter runs.
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		// Make the waiter's G runnable. Per the comment below, it is placed
		// in the runnext slot of the current P.
		readyWithTime(s, 5+skipframes)
		if s.ticket == 1 && getg().m.locks == 0 {
			// Direct G handoff
			// readyWithTime has added the waiter G as runnext in the
			// current P; we now call the scheduler so that we start running
			// the waiter G immediately.
			// Note that waiter inherits our time slice: this is desirable
			// to avoid having a highly contended semaphore hog the P
			// indefinitely. goyield is like Gosched, but it does not emit a
			// GoSched trace event and, more importantly, puts the current G
			// on the local runq instead of the global one.
			// We only do this in the starving regime (handoff=true), as in
			// the non-starving case it is possible for a different waiter
			// to acquire the semaphore while we are yielding/scheduling,
			// and this would be wasteful. We wait instead to enter starving
			// regime, and then we start to do direct handoffs of ticket and
			// P.
			// See issue 33747 for discussion.
			goyield()
		}
	}
}
复制代码

readyWithTime 把 sudog 对应的 g 唤醒,并且放到 p 本地队列的下一个执行位置

readyWithTime 最终会通过 systemstack 调用 ready(next 参数为 true)。systemstack 会切换到当前 M 的系统栈(g0 栈)上执行 ready。

// Mark gp ready to run.
//
// gp must be _Gwaiting; the _Gscan bit is masked off for the check. If it is
// not, ready dumps the status and throws. It moves gp to _Grunnable and puts
// it on the run queue of the current M's P via runqput. next is forwarded to
// runqput: presumably true puts gp in the run-next slot and false appends it
// to the tail. traceskip is the stack-skip count for the unpark trace event.
func ready(gp *g, traceskip int, next bool) {
	if trace.enabled {
		traceGoUnpark(gp, traceskip)
	}

	status := readgstatus(gp)

	// Mark runnable.
	// _g_ is the goroutine calling ready, not gp.
	_g_ := getg()
	mp := acquirem() // disable preemption because it can be holding p in a local var
	if status&^_Gscan != _Gwaiting {
		dumpgstatus(gp)
		throw("bad g->status in ready")
	}

	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
	casgstatus(gp, _Gwaiting, _Grunnable)
	// Queue gp on the current P's local run queue (next selects the position).
	runqput(_g_.m.p.ptr(), gp, next)
	// When sched.npidle != 0 and sched.nmspinning == 0, call wakep.
	// Judging by the names, this means there are idle Ps and no spinning M
	// is looking for work, so another M/P is started to pick up runnable
	// work promptly. TODO confirm against proc.go.
	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
		wakep()
	}
	releasem(mp)
}
复制代码

goyield 调用 mcall 执行 goyield_m,goyield_m 会把当前的 g 放到 p 本地队列的队尾,然后执行调度器

// goyield_m runs on g0 (goyield reaches it via mcall) and yields gp. It moves
// gp from _Grunning to _Grunnable and detaches it from the M with dropg. It
// then appends gp to the tail of its P's local run queue (runqput with
// next=false) and enters the scheduler.
func goyield_m(gp *g) {
	// Read gp's P up front, before dropg changes the gp/M association.
	pp := gp.m.p.ptr()
	casgstatus(gp, _Grunning, _Grunnable)
	dropg()
	runqput(pp, gp, false)
	schedule()
}
复制代码

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK