
[Go Source Code Analysis] Go scheduler source code analysis

Source: https://studygolang.com/articles/19567

Author: Sun Wei (孙伟)

1. Basic concepts: processes, threads, and coroutines

  • A process can contain multiple threads. Each thread usually gets a fixed-size memory block (commonly 2MB) for its stack, which stores the local variables of the functions currently being called or suspended. What the CPU switches between when scheduling is threads: if the next thread belongs to the same process, only a thread switch is needed and it completes quickly; if the next thread belongs to a different process, a full process switch is required, which costs noticeably more time.
  • Threads come in two kinds: kernel-level threads and user-level threads. A user-level thread has to be bound to a kernel-level thread; the CPU is unaware of user-level threads, it only knows it is running one thread, and that thread is in fact a kernel-level thread.
  • User-level threads have a familiar name: coroutines (co-routines). To keep the two apart, in this article "coroutine" means a user-level thread and "thread" means a kernel-level thread.
  • Coroutines differ from threads: threads are scheduled preemptively by the kernel, while coroutines are scheduled cooperatively in user space; the next coroutine only runs after the current one yields the CPU.

There are three ways coroutines can be bound to threads:

  • N:1 — N coroutines bound to one thread. The advantage is that a coroutine switch happens entirely in user space without trapping into the kernel, so it is extremely lightweight and fast. The drawbacks are serious, though: with all of a process's coroutines bound to one thread, the program cannot use multiple cores, and if any coroutine blocks, the whole thread blocks and none of the process's other coroutines can run, so there is effectively no concurrency at all.
  • 1:1 — one coroutine bound to one thread. This is the easiest to implement, and scheduling is handled entirely by the kernel, so the N:1 drawbacks disappear; the downside is that creating, destroying, and switching coroutines now each cost a kernel thread operation, which is relatively expensive.
  • M:N — M coroutines bound to N threads, a combination of N:1 and 1:1 that overcomes the drawbacks of both, but it is also the most complex to implement. This is the model Go uses; a minimal sketch of it in practice follows this list.
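
The sketch pins GOMAXPROCS to 2 and launches far more goroutines than threads (both numbers are arbitrary choices); the runtime multiplexes the goroutines onto the small set of OS threads.

package main

import (
   "fmt"
   "runtime"
   "sync"
)

func main() {
   // At most 2 Ps, and therefore at most 2 threads running Go code at once.
   runtime.GOMAXPROCS(2)

   var wg sync.WaitGroup
   for i := 0; i < 1000; i++ {
      wg.Add(1)
      go func(id int) {
         defer wg.Done()
         _ = id * id // trivial stand-in for real work
      }(i)
   }
   wg.Wait()
   fmt.Println("ran 1000 goroutines on GOMAXPROCS =", runtime.GOMAXPROCS(0))
}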

2. A brief introduction to Go

2.1 The goroutine concept

Thread switches carry a large context and burn a lot of CPU time, so Go's unit of concurrency is not the traditional thread but the much lighter coroutine, the goroutine. This greatly raises the achievable degree of parallelism, which is why Go is sometimes called "the most parallel language".

2.2 Comparison with other concurrency models

  • Interpreted languages such as Python often rely on a multi-process concurrency model. A process carries the largest context of all, so switching is very expensive, and communication between processes has to go through sockets or explicitly configured shared memory, which makes programming awkward and inconvenient.
  • Languages such as C++ typically use a multi-threaded concurrency model. A thread's context is much smaller than a process's, and threads within a process already share memory, so programming is considerably easier. But starting, destroying, and switching threads still costs a lot of CPU time. Thread pools were introduced to keep a fixed set of threads alive and avoid the cost of repeatedly creating and tearing them down, but this basic technique has its own problems: a thread stuck in blocking IO keeps occupying its slot, so tasks queued behind it cannot get a thread to run on.
  • Go's concurrency machinery is more elaborate. Go replaces threads with a lighter data structure that has its own stack and is much cheaper to switch. Real execution is still done by threads: the scheduler places goroutines onto threads, creating and releasing threads as needed, and when a running goroutine blocks (typically while waiting on IO) it is detached from its thread so that other runnable goroutines can use that thread instead. Through this fairly involved scheduling the system achieves a very high degree of parallelism without burning large amounts of CPU.

2.3 Characteristics of goroutines

  • Non-blocking. Goroutines exist to make highly concurrent programs easy to write. When a goroutine performs a blocking operation (such as a system call), the other goroutines on its thread are handed over to other threads and keep running, so the program as a whole does not block.
  • A scheduler of its own. Go has garbage collection (GC), and GC requires goroutines to be stopped at certain points; because Go implements its own scheduler, this is easy to arrange. Building concurrent programs out of goroutines gives the advantages of asynchronous IO together with the convenience of writing ordinary multi-threaded or multi-process code.
  • Goroutines maintain their own stacks. This also introduces substantial complexity: a goroutine carries not just the code to run but also the stack it runs on, plus saved PC and SP values (the saved PC is the current execution position plus a fixed offset), and the stack pointer must keep the program consistent in every mode of execution.

Since every goroutine has its own stack, a stack must be allocated whenever a goroutine is created, and the stack keeps growing as the goroutine runs. Stacks normally grow contiguously, and because all threads of a process share one virtual address space, each thread's stack must start at a different address, which means the size of every thread's stack has to be estimated before it is allocated. With a very large number of threads it is easy to run out of stack (or address) space.

Split stacks were invented to solve this: when a stack is created, only a small block of memory is allocated, and if some function call finds there is not enough stack space, a new block is allocated elsewhere; the new block does not need to be contiguous with the old one. The call's arguments are copied into the new space and execution continues there. Go's stack management is similar, but for better efficiency it uses contiguous stacks: a fixed-size stack is allocated first, and when it runs out of space a larger stack is allocated and the entire old stack is copied into the new one. This avoids the frequent allocations and deallocations that split stacks can cause. A small demonstration of this growth follows.
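
In the sketch below (the function name and recursion depth are arbitrary), a fresh goroutine recurses deeply; it starts with a tiny stack, and the runtime transparently allocates larger stacks and copies the old frames over as the recursion deepens.

package main

import "fmt"

// grow consumes roughly 256 bytes of stack per frame, so a few thousand frames
// push the goroutine far past its small initial stack; the runtime grows the
// stack by allocating a bigger one and copying the old contents into it.
func grow(depth int) int {
   var pad [256]byte
   pad[0] = byte(depth)
   if depth == 0 {
      return int(pad[0])
   }
   return grow(depth-1) + int(pad[0])
}

func main() {
   done := make(chan int)
   go func() { done <- grow(10000) }()
   fmt.Println("deep recursion finished, result:", <-done)
}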

A goroutine's execution can be preempted. If a goroutine keeps occupying the CPU and has not been rescheduled for a long time, the runtime preempts it and hands the CPU time to other goroutines. This can be observed by debugging a goroutine that blocks or spins for a long time.

2.4 The core structs

  • M: a worker thread in Go, the unit that actually executes code;
  • P: a context for scheduling goroutines; a goroutine depends on a P to be scheduled, and P is the true unit of parallelism;
  • G: a goroutine, a piece of Go code presented as a function, the smallest unit of parallelism;

A P must be bound to an M to run, and an M must acquire a P before it can run Go code. Normally there are at most GOMAXPROCS Ps (usually equal to the number of CPUs), but there can be many Ms; only the Ps currently bound to an M are actually running, which is why P is the true unit of parallelism.

Each P has its own queue of runnable Gs from which it can take a G to run, and there is also a global runnable-G queue; Gs attach to an M through a P in order to execute. The reason a single global runnable queue is not used by itself is that distributed, per-P queues shrink the critical section: if many threads requested a runnable G at the same time from one global, lock-protected resource, a lot of threads would simply be stuck waiting on that lock.

If a running G blocks, the classic example being a wait on IO, the G and the M it occupies stay blocked together, while the P context is handed off to another available M, so the blocking does not reduce the program's parallelism. One convenient way to watch this G/M/P machinery from the outside is the runtime's schedtrace output, sketched below.
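
The following minimal program is meant to be run under GODEBUG=schedtrace; the 1000ms interval, the goroutine count, and the binary name are arbitrary choices, and the exact fields printed can vary between Go versions.

package main

import (
   "runtime"
   "time"
)

// Build this and run it as, for example:
//   GODEBUG=schedtrace=1000 ./a.out
// Roughly once per second the runtime prints a SCHED line reporting gomaxprocs,
// the number of threads (Ms), idle Ps, the global run queue length, and each
// P's local run queue length, which maps directly onto the structures below.
func main() {
   for i := 0; i < 64; i++ {
      go func() {
         for {
            runtime.Gosched() // keep yielding so the run queues stay populated
         }
      }()
   }
   time.Sleep(5 * time.Second)
}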

The g struct

type g struct {
   // Stack parameters.
   // stack describes the actual stack memory: [stack.lo, stack.hi).
   // stackguard0 is the stack pointer compared in the Go stack growth prologue.
   // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
   // stackguard1 is the stack pointer compared in the C stack growth prologue.
   // It is stack.lo+StackGuard on g0 and gsignal stacks.
   // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
   stack       stack   // offset known to runtime/cgo; describes the actual stack memory: [stack.lo, stack.hi)
   stackguard0 uintptr // offset known to liblink
   stackguard1 uintptr // offset known to liblink
 
   _panic         *_panic // innermost panic - offset known to liblink
   _defer         *_defer // innermost defer
   m              *m      // current m; offset known to arm liblink
   sched          gobuf   // saves this g's context (sp, pc, ...) when the goroutine is switched out
   syscallsp      uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
   syscallpc      uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
   stktopsp       uintptr        // expected sp at top of stack, to check in traceback
   param          unsafe.Pointer // passed parameter on wakeup: another goroutine may set param while this one sleeps, and this goroutine reads it when woken
   atomicstatus   uint32
   stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
   goid           int64   // goroutine ID
   waitsince      int64  // approx time when the g became blocked
   waitreason     string // if status==Gwaiting
   schedlink      guintptr
   preempt        bool     // preemption signal, duplicates stackguard0 = stackpreempt
   paniconfault   bool     // panic (instead of crash) on unexpected fault address
   preemptscan    bool     // preempted g does scan for gc
   gcscandone     bool     // g has scanned stack; protected by _Gscan bit in status
   gcscanvalid    bool     // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
   throwsplit     bool     // must not split stack
   raceignore     int8     // ignore race detection events
   sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
   sysexitticks   int64    // cputicks when syscall has returned (for tracing)
   traceseq       uint64   // trace event sequencer
   tracelastp     puintptr // last P emitted an event for this goroutine
   lockedm        muintptr    // this G is locked to run only on this M
   sig            uint32
   writebuf       []byte
   sigcode0       uintptr
   sigcode1       uintptr
   sigpc          uintptr
   gopc           uintptr // pc of go statement that created this goroutine
   startpc        uintptr // pc of goroutine function
   racectx        uintptr
   waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
   cgoCtxt        []uintptr      // cgo traceback context
   labels         unsafe.Pointer // profiler labels
   timer          *timer         // cached timer for time.Sleep
   selectDone     uint32         // are we participating in a select and did someone win the race?
 
   // Per-G GC state
 
   // gcAssistBytes is this G's GC assist credit in terms of
   // bytes allocated. If this is positive, then the G has credit
   // to allocate gcAssistBytes bytes without assisting. If this
   // is negative, then the G must correct this by performing
   // scan work. We track this in bytes to make it fast to update
   // and check for debt in the malloc hot path. The assist ratio
   // determines how this corresponds to scan work debt.
   gcAssistBytes int64
}

The gobuf struct

type gobuf struct {
    sp   uintptr
    pc   uintptr
    g    guintptr
    ctxt unsafe.Pointer
    ret  sys.Uintreg
    lr   uintptr
    bp   uintptr // for GOEXPERIMENT=framepointer
}

The most important field here is sched, which holds the goroutine's context. When goroutines switch, this data is not handled by the OS as it is for threads; it is saved in a gobuf object instead, which keeps the switch lightweight. The gobuf structure is the one listed above.

The m struct

type m struct {
    g0      *g     // the goroutine that owns the scheduling stack
    gsignal       *g         // the goroutine that handles signals
    tls           [6]uintptr // thread-local storage
    mstartfn      func()
    curg          *g       // the goroutine currently running on this M
    caughtsig     guintptr
    p             puintptr // the P attached to this M for running Go code
    nextp         puintptr
    id            int32
    mallocing     int32 // status
    spinning      bool // is this M out of work and spinning
    blocked       bool // is this M blocked
    inwb          bool // is this M executing a write barrier
    printlock     int8
    incgo         bool // is this M executing a cgo call
    fastrand      uint32
    ncgocall      uint64      // total number of cgo calls
    ncgo          int32       // number of cgo calls currently in progress
    park          note
    alllink       *m // link on allm
    schedlink     muintptr
    mcache        *mcache // this M's memory cache
    lockedg       *g // a G locked to this M; it will not be moved to another M
    createstack   [32]uintptr // the stack on which this thread was created
}

Two G fields in the m struct deserve special attention:

  • curg, the G currently bound to (running on) this M.
  • g0, the goroutine that owns the scheduling stack; it is rather special. An ordinary goroutine's stack is a growable stack allocated from the heap, whereas g0's stack is the stack of the OS thread backing the M. All scheduling-related code first switches onto this goroutine's stack before it runs. In other words, even the thread's own stack is wrapped in a g rather than used through the OS directly.

The p struct

type p struct {
    lock mutex
    id          int32
    status      uint32 // status: pidle/prunning/...
    link        puintptr
    schedtick   uint32     // incremented on every scheduler call
    syscalltick uint32     // incremented on every system call
    sysmontick  sysmontick
    m           muintptr   // back-link to the associated m
    mcache      *mcache
    racectx     uintptr
    goidcache    uint64 // cache of goroutine IDs
    goidcacheend uint64
    // queue of runnable goroutines
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    runnext guintptr // the G to run next
    sudogcache []*sudog
    sudogbuf   [128]*sudog
    palloc persistentAlloc // per-P to avoid mutex
    pad [sys.CacheLineSize]byte
}

A P's status can be Pidle, Prunning, Psyscall, Pgcstop, or Pdead. Its local run queue (runqhead, runqtail, runq) holds runnable goroutines, and a P takes Gs from its own queue first, which is more efficient than always going through the global queue. A simplified model of this local queue is sketched below.
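
This is only a rough mental model (the names gref and localRunQueue are made up here; the real runqput/runqget in proc.go are lock-free and use atomic head/tail updates), but it captures the ring-buffer-plus-runnext behaviour:

// Simplified, single-threaded model of a P's local run queue. The real
// runqput/runqget use atomics and, when the ring is full, spill half of it
// into the global run queue.
type gref struct{ goid int64 }

type localRunQueue struct {
   head, tail uint32
   runq       [256]*gref
   runnext    *gref
}

func (q *localRunQueue) put(gp *gref) bool {
   if q.tail-q.head == uint32(len(q.runq)) {
      return false // full: the runtime would push a batch to the global queue here
   }
   q.runq[q.tail%uint32(len(q.runq))] = gp
   q.tail++
   return true
}

func (q *localRunQueue) get() *gref {
   if next := q.runnext; next != nil {
      q.runnext = nil
      return next // runnext has priority, mirroring the real scheduler
   }
   if q.head == q.tail {
      return nil // empty: schedule falls back to the global queue or stealing
   }
   gp := q.runq[q.head%uint32(len(q.runq))]
   q.head++
   return gp
}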

The schedt struct

type schedt struct {
    goidgen  uint64
    lastpoll uint64
    lock mutex
    midle        muintptr // idle Ms
    nmidle       int32    // number of idle Ms
    nmidlelocked int32    // number of locked Ms that are idle
    mcount       int32    // total number of Ms created
    maxmcount    int32    // maximum number of Ms allowed
    ngsys uint32 // number of system goroutines; updated atomically
    pidle      puintptr // idle Ps
    npidle     uint32
    nmspinning uint32
    // global queue of runnable Gs
    runqhead guintptr
    runqtail guintptr
    runqsize int32
    // global cache of dead Gs
    gflock       mutex
    gfreeStack   *g
    gfreeNoStack *g
    ngfree       int32
    // central cache of sudogs
    sudoglock  mutex
    sudogcache *sudog
}

Most of the information the scheduler needs lives in the M, G, and P structs; schedt is little more than a shell. It holds the list of idle Ms, the list of idle Ps, and the global queue of runnable Gs. The lock in schedt is essential: whenever an M or P performs a non-local operation, it generally has to lock the scheduler first.

2.5 Key functions

The goroutine scheduler's code lives in /src/runtime/proc.go. Some of the key functions are analyzed below.

2.5.1 The schedule function

The schedule function runs whenever the runtime needs to schedule: it finds a runnable G for the current P and executes it. The search proceeds as follows:

  • 1) Call runqget to take a runnable G from the P's own local queue;
  • 2) If 1) fails, call findrunnable to look for a runnable G; this call blocks until one is available;
  • 3) With a G in hand, schedule calls execute to run it; schedule itself never returns.
  • 4) Note: once in a while the global run queue is checked before the local one, to guarantee fairness; otherwise two goroutines could keep respawning each other and completely occupy the local run queue. This is gated on schedtick % 61 == 0.

The code is as follows:

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
   _g_ := getg()
 
   if _g_.m.locks != 0 {
      throw("schedule: holding locks")
   }
 
   if _g_.m.lockedg != 0 {
      stoplockedm()
      execute(_g_.m.lockedg.ptr(), false) // Never returns.
   }
 
   // We should not schedule away from a g that is executing a cgo call,
   // since the cgo call is using the m's g0 stack.
   if _g_.m.incgo {
      throw("schedule: in cgo")
   }
 
top:
   if sched.gcwaiting != 0 {
      gcstopm()
      goto top
   }
   if _g_.m.p.ptr().runSafePointFn != 0 {
      runSafePointFn()
   }
 
   var gp *g
   var inheritTime bool
   if trace.enabled || trace.shutdown {
      gp = traceReader()
      if gp != nil {
         casgstatus(gp, _Gwaiting, _Grunnable)
         traceGoUnpark(gp, 0)
      }
   }
   if gp == nil && gcBlackenEnabled != 0 {
      gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
   }
   if gp == nil {
      // Check the global runnable queue once in a while to ensure fairness.
      // Otherwise two goroutines can completely occupy the local runqueue
      // by constantly respawning each other.
      if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
         lock(&sched.lock)
         gp = globrunqget(_g_.m.p.ptr(), 1)
         unlock(&sched.lock)
      }
   }
   if gp == nil {
      gp, inheritTime = runqget(_g_.m.p.ptr())
      if gp != nil && _g_.m.spinning {
         throw("schedule: spinning with local work")
      }
   }
   if gp == nil {
      gp, inheritTime = findrunnable() // blocks until work is available
   }
 
   // This thread is going to run a goroutine and is not spinning anymore,
   // so if it was marked as spinning we need to reset it now and potentially
   // start a new spinning M.
   if _g_.m.spinning {
      resetspinning()
   }
 
   if gp.lockedm != 0 {
      // Hands off own p to the locked m,
      // then blocks waiting for a new p.
      startlockedm(gp)
      goto top
   }
 
   execute(gp, inheritTime)
}

2.5.2 The findrunnable function

The findrunnable function finds a runnable G for a P. Its search order is:

  • 1) Call runqget to take a G from the P's own local runnable queue;
  • 2) If 1) fails, call globrunqget to take a G from the global runnable queue;
  • 3) If 2) fails, call netpoll (non-blocking) to pick up a G made runnable by a completed network event;
  • 4) If 3) fails, try to steal half of the Gs in some other P's local queue;
  • 5) If 4) fails, call globrunqget again to re-check the global runnable queue;
  • 6) If 5) fails, call netpoll (blocking) and wait for a G from the network poller;
  • 7) If 6) still produces no G, call stopm to park this M.

The code is as follows:

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
   _g_ := getg()
 
   // The conditions here and in handoffp must agree: if
   // findrunnable would return a G to run, handoffp must start
   // an M.
 
top:
   _p_ := _g_.m.p.ptr()
   if sched.gcwaiting != 0 {
      gcstopm()
      goto top
   }
   if _p_.runSafePointFn != 0 {
      runSafePointFn()
   }
   if fingwait && fingwake {
      if gp := wakefing(); gp != nil {
         ready(gp, 0, true)
      }
   }
   if *cgo_yield != nil {
      asmcgocall(*cgo_yield, nil)
   }
 
   // local runq
   if gp, inheritTime := runqget(_p_); gp != nil {
      return gp, inheritTime
   }
 
   // global runq
   if sched.runqsize != 0 {
      lock(&sched.lock)
      gp := globrunqget(_p_, 0)
      unlock(&sched.lock)
      if gp != nil {
         return gp, false
      }
   }
 
   // Poll network.
   // This netpoll is only an optimization before we resort to stealing.
   // We can safely skip it if there are no waiters or a thread is blocked
   // in netpoll already. If there is any kind of logical race with that
   // blocked thread (e.g. it has already returned from netpoll, but does
   // not set lastpoll yet), this thread will do blocking netpoll below
   // anyway.
   if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
      if gp := netpoll(false); gp != nil { // non-blocking
         // netpoll returns list of goroutines linked by schedlink.
         injectglist(gp.schedlink.ptr())
         casgstatus(gp, _Gwaiting, _Grunnable)
         if trace.enabled {
            traceGoUnpark(gp, 0)
         }
         return gp, false
      }
   }
 
   // Steal work from other P's.
   procs := uint32(gomaxprocs)
   if atomic.Load(&sched.npidle) == procs-1 {
      // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
      // New work can appear from returning syscall/cgocall, network or timers.
      // Neither of that submits to local run queues, so no point in stealing.
      goto stop
   }
   // If number of spinning M's >= number of busy P's, block.
   // This is necessary to prevent excessive CPU consumption
   // when GOMAXPROCS>>1 but the program parallelism is low.
   if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
      goto stop
   }
   if !_g_.m.spinning {
      _g_.m.spinning = true
      atomic.Xadd(&sched.nmspinning, 1)
   }
   for i := 0; i < 4; i++ {
      for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
         if sched.gcwaiting != 0 {
            goto top
         }
         stealRunNextG := i > 2 // first look for ready queues with more than 1 g
         if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
            return gp, false
         }
      }
   }
 
stop:
 
   // We have nothing to do. If we're in the GC mark phase, can
   // safely scan and blacken objects, and have work to do, run
   // idle-time marking rather than give up the P.
   if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
      _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
      gp := _p_.gcBgMarkWorker.ptr()
      casgstatus(gp, _Gwaiting, _Grunnable)
      if trace.enabled {
         traceGoUnpark(gp, 0)
      }
      return gp, false
   }
 
   // Before we drop our P, make a snapshot of the allp slice,
   // which can change underfoot once we no longer block
   // safe-points. We don't need to snapshot the contents because
   // everything up to cap(allp) is immutable.
   allpSnapshot := allp
 
   // return P and block
   lock(&sched.lock)
   if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
      unlock(&sched.lock)
      goto top
   }
   if sched.runqsize != 0 {
      gp := globrunqget(_p_, 0)
      unlock(&sched.lock)
      return gp, false
   }
   if releasep() != _p_ {
      throw("findrunnable: wrong p")
   }
   pidleput(_p_)
   unlock(&sched.lock)
 
   // Delicate dance: thread transitions from spinning to non-spinning state,
   // potentially concurrently with submission of new goroutines. We must
   // drop nmspinning first and then check all per-P queues again (with
   // #StoreLoad memory barrier in between). If we do it the other way around,
   // another thread can submit a goroutine after we've checked all run queues
   // but before we drop nmspinning; as the result nobody will unpark a thread
   // to run the goroutine.
   // If we discover new work below, we need to restore m.spinning as a signal
   // for resetspinning to unpark a new worker thread (because there can be more
   // than one starving goroutine). However, if after discovering new work
   // we also observe no idle Ps, it is OK to just park the current thread:
   // the system is fully loaded so no spinning threads are required.
   // Also see "Worker thread parking/unparking" comment at the top of the file.
   wasSpinning := _g_.m.spinning
   if _g_.m.spinning {
      _g_.m.spinning = false
      if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
         throw("findrunnable: negative nmspinning")
      }
   }
 
   // check all runqueues once again
   for _, _p_ := range allpSnapshot {
      if !runqempty(_p_) {
         lock(&sched.lock)
         _p_ = pidleget()
         unlock(&sched.lock)
         if _p_ != nil {
            acquirep(_p_)
            if wasSpinning {
               _g_.m.spinning = true
               atomic.Xadd(&sched.nmspinning, 1)
            }
            goto top
         }
         break
      }
   }
 
   // Check for idle-priority GC work again.
   if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
      lock(&sched.lock)
      _p_ = pidleget()
      if _p_ != nil && _p_.gcBgMarkWorker == 0 {
         pidleput(_p_)
         _p_ = nil
      }
      unlock(&sched.lock)
      if _p_ != nil {
         acquirep(_p_)
         if wasSpinning {
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)
         }
         // Go back to idle GC check.
         goto stop
      }
   }
 
   // poll network
   if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
      if _g_.m.p != 0 {
         throw("findrunnable: netpoll with p")
      }
      if _g_.m.spinning {
         throw("findrunnable: netpoll with spinning")
      }
      gp := netpoll(true) // block until new work is available
      atomic.Store64(&sched.lastpoll, uint64(nanotime()))
      if gp != nil {
         lock(&sched.lock)
         _p_ = pidleget()
         unlock(&sched.lock)
         if _p_ != nil {
            acquirep(_p_)
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
               traceGoUnpark(gp, 0)
            }
            return gp, false
         }
         injectglist(gp)
      }
   }
   stopm()
   goto top
}

2.5.3 The newproc function

The newproc function creates a runnable G and puts it on the current P's runnable queue; it is the call that a statement like "go func() { … }" is actually compiled into, and the core work is done in newproc1. It proceeds as follows (a minimal example of the go statement that reaches this path is shown right after the list):

  • 1) Get the P of the current G, then try to take a G from the free-G list;
  • 2) If 1) produced a G, configure it with the new function and arguments; otherwise allocate a new G;
  • 3) Put the G on the P's runnable queue.
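
For context, an ordinary go statement like the one below is what the compiler lowers into a runtime.newproc call; this is only an illustration of the entry point, not runtime code.

package main

import "fmt"

func main() {
   done := make(chan struct{})
   // The compiler rewrites this go statement into a runtime.newproc call that
   // packages the function value and its argument, obtains a g (reused or
   // newly allocated), and puts it on the current P's run queue via runqput.
   go func(msg string) {
      fmt.Println(msg)
      close(done)
   }("hello from a new goroutine")
   <-done
}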

The code is as follows:

// In Go 1.10.8 the default (minimum) stack size is 2KB
_StackMin = 2048

// Create a g object and put it on a run queue,
// where it waits to be executed.

// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
   _g_ := getg()
 
   if fn == nil {
      _g_.m.throwing = -1 // do not dump full stacks
      throw("go of nil func value")
   }
   _g_.m.locks++ // disable preemption because it can be holding p in a local var
   siz := narg
   siz = (siz + 7) &^ 7
 
   // We could allocate a larger initial stack if necessary.
   // Not worth it: this is almost always an error.
   // 4*sizeof(uintreg): extra space added below
   // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
   if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
      throw("newproc: function arguments too large for new goroutine")
   }
 
   _p_ := _g_.m.p.ptr()
   newg := gfget(_p_)
   if newg == nil {
      newg = malg(_StackMin)
      casgstatus(newg, _Gidle, _Gdead)
      allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
   }
   if newg.stack.hi == 0 {
      throw("newproc1: newg missing stack")
   }
 
   if readgstatus(newg) != _Gdead {
      throw("newproc1: new g is not Gdead")
   }
 
   totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
   totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
   sp := newg.stack.hi - totalSize
   spArg := sp
   if usesLR {
      // caller's LR
      *(*uintptr)(unsafe.Pointer(sp)) = 0
      prepGoExitFrame(sp)
      spArg += sys.MinFrameSize
   }
   if narg > 0 {
      memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
      // This is a stack-to-stack copy. If write barriers
      // are enabled and the source stack is grey (the
      // destination is always black), then perform a
      // barrier copy. We do this *after* the memmove
      // because the destination stack may have garbage on
      // it.
      if writeBarrier.needed && !_g_.m.curg.gcscandone {
         f := findfunc(fn.fn)
         stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
         // We're in the prologue, so it's always stack map index 0.
         bv := stackmapdata(stkmap, 0)
         bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
      }
   }
 
   memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
   newg.sched.sp = sp
   newg.stktopsp = sp
   newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
   newg.sched.g = guintptr(unsafe.Pointer(newg))
   gostartcallfn(&newg.sched, fn)
   newg.gopc = callerpc
   newg.startpc = fn.fn
   if _g_.m.curg != nil {
      newg.labels = _g_.m.curg.labels
   }
   if isSystemGoroutine(newg) {
      atomic.Xadd(&sched.ngsys, +1)
   }
   newg.gcscanvalid = false
   casgstatus(newg, _Gdead, _Grunnable)
 
   if _p_.goidcache == _p_.goidcacheend {
      // Sched.goidgen is the last allocated id,
      // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
      // At startup sched.goidgen=0, so main goroutine receives goid=1.
      _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
      _p_.goidcache -= _GoidCacheBatch - 1
      _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
   }
   newg.goid = int64(_p_.goidcache)
   _p_.goidcache++
   if raceenabled {
      newg.racectx = racegostart(callerpc)
   }
   if trace.enabled {
      traceGoCreate(newg, newg.startpc)
   }
   runqput(_p_, newg, true)
 
   if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
      wakep()
   }
   _g_.m.locks--
   if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
      _g_.stackguard0 = stackPreempt
   }
}

2.5.4 The goexit0 function

The goexit path runs when a G exits. goexit0, its continuation on g0, resets some of the G's fields, puts the G on the free-G list so it can be reused later, and then calls schedule to pick the next goroutine to run.

// goexit continuation on g0.
func goexit0(gp *g) {
   _g_ := getg()
 
   // switch the g's status from _Grunning to _Gdead
   casgstatus(gp, _Grunning, _Gdead)
   if isSystemGoroutine(gp) {
      atomic.Xadd(&sched.ngsys, -1)
   }
   // release the g's fields, resetting most of them to nil / 0
   gp.m = nil
   locked := gp.lockedm != 0
   gp.lockedm = 0
   _g_.m.lockedg = 0
   gp.paniconfault = false
   gp._defer = nil // should be true already but just in case.
   gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
   gp.writebuf = nil
   gp.waitreason = ""
   gp.param = nil
   gp.labels = nil
   gp.timer = nil
 
   if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
      // Flush assist credit to the global pool. This gives
      // better information to pacing if the application is
      // rapidly creating an exiting goroutines.
      scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
      atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
      gp.gcAssistBytes = 0
   }
 
   // Note that gp's stack scan is now "valid" because it has no
   // stack.
   gp.gcscanvalid = true
   dropg()
 
   if _g_.m.lockedInt != 0 {
      print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
      throw("internal lockOSThread error")
   }
   _g_.m.lockedExt = 0
   // put this g on the free G list
   gfput(_g_.m.p.ptr(), gp)
   if locked {
      // The goroutine may have locked this thread because
      // it put it in an unusual kernel state. Kill it
      // rather than returning it to the thread pool.
 
      // Return to mstart, which will release the P and exit
      // the thread.
      if GOOS != "plan9" { // See golang.org/issue/22227.
         gogo(&_g_.m.g0.sched)
      }
   }
   schedule()
}

2.5.5 The handoffp function

The handoffp function hands a P off from an M that is in a system call or locked. If the P still has runnable Gs in its queue (or the global run queue is non-empty), it starts a new M by calling startm, and that M is not started in the spinning state.

// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
   // handoffp must start an M in any situation where
   // findrunnable would return a G to run on _p_.
 
 
   // if this P's local queue or the global run queue is non-empty, start an M for it, not spinning
   if !runqempty(_p_) || sched.runqsize != 0 {
      startm(_p_, false)
      return
   }
   // likewise if there is GC mark work available
   if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
      startm(_p_, false)
      return
   }
   // no work to do here: if there is no spinning or idle M at all, start one spinning M
   // otherwise no extra spinning is needed from us
   if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
      startm(_p_, true)
      return
   }
   // take the scheduler lock and detach this P
   lock(&sched.lock)
   if sched.gcwaiting != 0 {
      _p_.status = _Pgcstop
      sched.stopwait--
      if sched.stopwait == 0 {
         notewakeup(&sched.stopnote)
      }
      unlock(&sched.lock)
      return
   }
   if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
      sched.safePointFn(_p_)
      sched.safePointWait--
      if sched.safePointWait == 0 {
         notewakeup(&sched.safePointNote)
      }
   }
   if sched.runqsize != 0 {
      unlock(&sched.lock)
      startm(_p_, false)
      return
   }
   // If this is the last running P and nobody is polling network,
   // need to wakeup another M to poll network.
   if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
      unlock(&sched.lock)
      startm(_p_, false)
      return
   }
   pidleput(_p_)
   unlock(&sched.lock)
}

2.5.6 The startm function

The startm function schedules an M, creating one if necessary, to run the given P.

// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
   // take the scheduler lock
   lock(&sched.lock)
   if _p_ == nil {
       
      _p_ = pidleget()
      if _p_ == nil {
         unlock(&sched.lock)
         if spinning {
            // The caller incremented nmspinning, but there are no idle Ps,
            // so it's okay to just undo the increment and give up.
            if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
               throw("startm: negative nmspinning")
            }
         }
         return
      }
   }
    
   mp := mget()
   unlock(&sched.lock)
   if mp == nil {
      var fn func()
      if spinning {
         // The caller incremented nmspinning, so set m.spinning in the new M.
         fn = mspinning
      }
      newm(fn, _p_)
      return
   }
    
   if mp.spinning {
      throw("startm: m is spinning")
   }
   if mp.nextp != 0 {
      throw("startm: m has p")
   }
   if spinning && !runqempty(_p_) {
      throw("startm: p has runnable gs")
   }
   // The caller incremented nmspinning, so set m.spinning in the new M.
   mp.spinning = spinning
   mp.nextp.set(_p_)
   notewakeup(&mp.park)
}

2.5.7 The sysmon function

The sysmon function is started when the Go runtime boots. It monitors the state of all goroutines, decides whether a GC needs to be forced, polls the network, and so on. Inside sysmon, the retake function is called to carry out preemptive scheduling.

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
   lock(&sched.lock)
   sched.nmsys++
   checkdead()
   unlock(&sched.lock)
 
   // If a heap span goes unused for 5 minutes after a garbage collection,
   // we hand it back to the operating system.
   scavengelimit := int64(5 * 60 * 1e9)
 
   if debug.scavenge > 0 {
      // Scavenge-a-lot for testing.
      forcegcperiod = 10 * 1e6
      scavengelimit = 20 * 1e6
   }
 
   lastscavenge := nanotime()
   nscavenge := 0
 
   lasttrace := int64(0)
   idle := 0 // how many cycles in succession we had not wokeup somebody
   delay := uint32(0)
   for {
      if idle == 0 { // start with 20us sleep...
         delay = 20
      } else if idle > 50 { // start doubling the sleep after 1ms...
         delay *= 2
      }
      if delay > 10*1000 { // up to 10ms
         delay = 10 * 1000
      }
      usleep(delay)
      if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
         lock(&sched.lock)
         if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
            atomic.Store(&sched.sysmonwait, 1)
            unlock(&sched.lock)
            // Make wake-up period small enough
            // for the sampling to be correct.
            maxsleep := forcegcperiod / 2
            if scavengelimit < forcegcperiod {
               maxsleep = scavengelimit / 2
            }
            shouldRelax := true
            if osRelaxMinNS > 0 {
               next := timeSleepUntil()
               now := nanotime()
               if next-now < osRelaxMinNS {
                  shouldRelax = false
               }
            }
            if shouldRelax {
               osRelax(true)
            }
            notetsleep(&sched.sysmonnote, maxsleep)
            if shouldRelax {
               osRelax(false)
            }
            lock(&sched.lock)
            atomic.Store(&sched.sysmonwait, 0)
            noteclear(&sched.sysmonnote)
            idle = 0
            delay = 20
         }
         unlock(&sched.lock)
      }
      // trigger libc interceptors if needed
      if *cgo_yield != nil {
         asmcgocall(*cgo_yield, nil)
      }
      // poll network if not polled for more than 10ms
      lastpoll := int64(atomic.Load64(&sched.lastpoll))
      now := nanotime()
      if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
         atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
         gp := netpoll(false) // non-blocking - returns list of goroutines
         if gp != nil {
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before injectglist.
            // Otherwise it can lead to the following situation:
            // injectglist grabs all P's but before it starts M's to run the P's,
            // another M returns from syscall, finishes running its G,
            // observes that there is no work to do and no other running M's
            // and reports deadlock.
            incidlelocked(-1)
            injectglist(gp)
            incidlelocked(1)
         }
      }
      // retake P's blocked in syscalls
      // and preempt long running G's
      if retake(now) != 0 {
         idle = 0
      } else {
         idle++
      }
      // check if we need to force a GC
      if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
         lock(&forcegc.lock)
         forcegc.idle = 0
         forcegc.g.schedlink = 0
         injectglist(forcegc.g)
         unlock(&forcegc.lock)
      }
      // scavenge heap once in a while
      if lastscavenge+scavengelimit/2 < now {
         mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
         lastscavenge = now
         nscavenge++
      }
      if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
         lasttrace = now
         schedtrace(debug.scheddetail > 0)
      }
   }
}

2.5.8 The retake function

retake enumerates all Ps. If a P is in a system call (_Psyscall) and has stayed there for at least one full sysmon cycle (20us to 10ms, depending on sysmon's current sleep interval), the P is retaken: handoffp is called to break the association between the M and the P. If a P is running (_Prunning) and the same G has been running across a sysmon cycle for longer than forcePreemptNS (10ms), that G is preempted,

which sets g.preempt = true and g.stackguard0 = stackPreempt.

Why does setting stackguard achieve preemption?

Because stackguard0 is the value the stack check uses: the prologue of a Go function compares the stack pointer against it to decide whether the stack needs to grow.

When the newstack function sees that g.stackguard0 equals stackPreempt, it knows the stack growth was triggered by a preemption request and re-checks whether the goroutine should indeed be preempted.

The preemption mechanism guarantees that no single G can run for so long that other Gs never get a chance to run. A conceptual sketch of the prologue check follows.
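
The real check is emitted by the compiler as part of each function's assembly prologue, so the Go sketch below (with made-up names) is only an approximation of the idea.

package main

import "fmt"

// needsMoreStack mirrors the comparison a Go function prologue performs: if
// the stack pointer is below g.stackguard0, the function jumps to morestack.
// Because stackPreempt is larger than any valid stack address, storing it into
// stackguard0 forces the very next function call down the morestack path,
// where newstack notices the preemption request and reschedules the goroutine.
func needsMoreStack(sp, stackguard0 uintptr) bool {
   return sp < stackguard0
}

func main() {
   var stackPreempt uintptr = ^uintptr(0) // stand-in for "bigger than any stack address"
   fmt.Println(needsMoreStack(0x7ffc0000, stackPreempt)) // always true -> morestack -> newstack
}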

func retake(now int64) uint32 {
   n := 0
   // Prevent allp slice changes. This lock will be completely
   // uncontended unless we're already stopping the world.
   lock(&allpLock)
   // We can't use a range loop over allp because we may
   // temporarily drop the allpLock. Hence, we need to re-fetch
   // allp each time around the loop.
   for i := 0; i < len(allp); i++ {
      _p_ := allp[i]
      if _p_ == nil {
         // This can happen if procresize has grown
         // allp but not yet created new Ps.
         continue
      }
      pd := &_p_.sysmontick
      s := _p_.status
      if s == _Psyscall {
         // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
         t := int64(_p_.syscalltick)
         if int64(pd.syscalltick) != t {
            pd.syscalltick = uint32(t)
            pd.syscallwhen = now
            continue
         }
         // On the one hand we don't want to retake Ps if there is no other work to do,
         // but on the other hand we want to retake them eventually
         // because they can prevent the sysmon thread from deep sleep.
         if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
            continue
         }
         // Drop allpLock so we can take sched.lock.
         unlock(&allpLock)
         // Need to decrement number of idle locked M's
         // (pretending that one more is running) before the CAS.
         // Otherwise the M from which we retake can exit the syscall,
         // increment nmidle and report deadlock.
         incidlelocked(-1)
         if atomic.Cas(&_p_.status, s, _Pidle) {
            if trace.enabled {
               traceGoSysBlock(_p_)
               traceProcStop(_p_)
            }
            n++
            _p_.syscalltick++
            handoffp(_p_)
         }
         incidlelocked(1)
         lock(&allpLock)
      } else if s == _Prunning {
         // Preempt G if it's running for too long.
         t := int64(_p_.schedtick)
         if int64(pd.schedtick) != t {
            pd.schedtick = uint32(t)
            pd.schedwhen = now
            continue
         }
         if pd.schedwhen+forcePreemptNS > now {
            continue
         }
         preemptone(_p_)
      }
   }
   unlock(&allpLock)
   return uint32(n)
}

3. Scheduler summary

3.1 Two big ideas in the scheduler

  • Reuse threads: goroutines run on top of a set of threads, so threads do not have to be created and destroyed all the time; they are reused. Reuse shows up in two further mechanisms: 1) work stealing: when a thread has no runnable G, it tries to steal Gs from a P bound to another thread rather than destroying itself; 2) handoff: when a thread blocks because its G entered a system call, the thread releases its bound P and hands the P to another idle thread to run.
  • Exploit parallelism: GOMAXPROCS sets the number of Ps. When GOMAXPROCS is greater than 1, up to GOMAXPROCS threads run Go code at the same time, possibly spread across multiple CPU cores, so concurrency becomes parallelism. GOMAXPROCS also caps the degree of parallelism: with GOMAXPROCS = cores/2, at most half of the cores are used in parallel.

3.2 Two smaller strategies in the scheduler

  • Preemption: with plain coroutines, the next coroutine can only run once the current one voluntarily yields the CPU. In Go, a goroutine may occupy the CPU for at most about 10ms before it becomes eligible for preemption, which keeps other goroutines from starving; this is one place where goroutines differ from classic coroutines.
  • Global G queue: the current scheduler still has a global G queue, but its role has been weakened; when an M fails to steal a G from other Ps during work stealing, it can still take a G from the global queue.
