24

Golang的sync.WaitGroup 实现逻辑和源码解析

 4 years ago
source link: https://studygolang.com/articles/26896
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

方便的并发,是Golang的一大特色优势,而使用并发,对sync包的WaitGroup不会陌生。WaitGroup主要用来做Golang并发实例即Goroutine的等待,当使用go启动多个并发程序,通过waitgroup可以等待所有go程序结束后再执行后面的代码逻辑,比如:

func Main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(10 * time.Second)
        }()

    }
    wg.Wait() // 等待在此,等所有go func里都执行了Done()才会退出
}
复制代码

实现原理

WaitGroup对外提供三个方法,Add(int),Done()和Wait(), 其中Done()是调用了Add(-1),一般使用方法是,先统一Add,在goroutine里并发的Done,然后Wait。

WaitGroup主要维护了2个计数器,一个是请求计数器 v,一个是等待计数器 w,二者组成一个64bit的值,请求计数器占高32bit,等待计数器占低32bit。

那么等待计数器拿来干嘛?是因为同一个实例的Wait()方法支持多处调用,每一次Wait()方法执行,等待计数器 w 就会加1,而当请求计数器v为0触发Wait()时,要根据w的数量发送w份的信号量,正确的触发所有的Wait(),这虽然不是常用的一个特性,但是在一些特殊场合是有用处的(比如多个并发都依赖于WaitGroup的实例的结束信号来进行下一个action),演示代码如下:

func main() {
  wg := sync.WaitGroup{}
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
​    }()
  }
  time.Sleep(2 * time.Second)
  for j := 0; j < 3; j++ {
    go func(i int) {
      // 3个地方调用Wait(),通过等待j计时器,每个Wati都会被hu唤醒
      wg.Wait()
      fmt.Println("wait done now ", i)
    }(j)
  }
  time.Sleep(10 * time.Second)
  return
}
/*
输出如下,数字出现的顺序随机
wait done now  1
wait done now  0
wait done now  2
*/
复制代码

同时,WaitGroup里还对使用逻辑进行了严格的检查,比如Wait()一旦开始不能Add().

下面是带注释的代码,去掉了不影响代码逻辑的trace部分:

func (wg *WaitGroup) Add(delta int) {
    statep := wg.state()
    // 更新statep,statep将在wait和add中通过原子操作一起使用
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
        if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        // wait不等于0说明已经执行了Wait,此时不容许Add
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 正常情况,Add会让v增加,Done会让v减少,如果没有全部Done掉,此处v总是会大于0的,直到v为0才往下走
    // 而w代表是有多少个goruntine在等待done的信号,wait中通过compareAndSwap对这个w进行加1
     if v > 0 || w == 0 {
        return
    }
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
    // 当v为0(Done掉了所有)或者w不为0(已经开始等待)才会到这里,但是在这个过程中又有一次Add,导致statep变化,panic
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    // 将statep清0,在Wait中通过这个值来保护信号量发出后还对这个Waitgroup进行操作
    *statep = 0
    // 将信号量发出,触发wait结束
    for ; w != 0; w-- {
        runtime_Semrelease(&wg.sema, false)
    }
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    statep := wg.state()
        for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // Increment waiters count.
        // 如果statep和state相等,则增加等待计数,同时进入if等待信号量
        // 此处做CAS,主要是防止多个goroutine里进行Wait()操作,每有一个goroutine进行了wait,等待计数就加1
        // 如果这里不相等,说明statep,在 从读出来 到 CAS比较 的这个时间区间内,被别的goroutine改写了,那么不进入if,回去再读一次,这样写避免用锁,更高效些
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(&wg.sema))
            }
            // 等待信号量
            runtime_Semacquire(&wg.sema)
            // 信号量来了,代表所有Add都已经Done
            if *statep != 0 {
                // 走到这里,说明在所有Add都已经Done后,触发信号量后,又被执行了Add
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}
复制代码

Recommend

  • 55

    针对Golang1.9的sync.WaitGroup进行分析,与Golang1.10基本一样除了将panic改为了throw之外其他的都一样。源代码位置:sync\waitgroup.go。结构体typeWaitGroupstruct{noCopynoCopy//noCopy可以嵌入到结构中,在第一次使用后不可复制,使用govet作为检测使用//位值:...

  • 36
    • yangxikun.github.io 4 years ago
    • Cache

    golang sync.WaitGroup 底层实现

    数据结构 // A WaitGroup waits for a collection of goroutines to finish. // The main goroutine calls Add to set the number of // goroutines to wait for. Then each of the goroutines // runs and calls Done when...

  • 44
    • www.80shihua.com 3 years ago
    • Cache

    golang之sync包之WaitGroup

    sync包 sync是synchronization同步这个词的缩写,所以也会叫做同步包。这里提供了基本同步的操作,比如互斥锁等等。这里除了Once和WaitGroup类型之外,大多数类型都是供低级库例程使用的。更高级别的同步最好通过channel通道和co...

  • 21
    • studygolang.com 3 years ago
    • Cache

    【golang】sync.WaitGroup详解

    一、前言 Go语言在设计上对同步(Synchronization,数据同步和线程同步)提供大量的支持,比如 goroutine和channel同步原语,库层面有 - sync:提供基本的同步原语(比如Mutex、RWMutex、Locker)和 工具类(Once、WaitG...

  • 5

    解析在线教育增长方法拼团的底层逻辑和设计思路二、为什么解析拼团在线教育获客成本越来越高,尤其是投放成本在几个巨头的争夺下越来越高,而我们使用的任务宝裂变因为转化难题而被逐渐舍弃和优化,越来越多的机构开始寻找低成本获客之道,这时...

  • 3
    • studygolang.com 3 years ago
    • Cache

    Golang sync.WaitGroup的用法

    0x01 介绍经常会看到以下了代码: package mainimport ( "fmt" "time")func main(){ for i := 0; i < 100 ; i++{ go fmt.Println(i) } ti...

  • 4
    • studygolang.com 3 years ago
    • Cache

    golang sync.WaitGroup 源码分析

    最近在学习golang源码,学习golang源码是学习golang的非常好的途径。 先来记录一波sync包的学习。版本 go1.14.2 darwin/amd64 sync.WaitGroup 我们一般使用sync.waitGroup 做并发控制,使用方式一般如下...

  • 0

    如果你有一个任务可以分解成多个子任务进行处理,同时每个子任务没有先后执行顺序的限制,等到全部子任务执行完毕后,再进行下一步处理。这时每个子任务的执行可以并发处理,这种情景下适合使用 sync.WaitGroup。虽然 sync.WaitGroup...

  • 2
    • cbsheng.github.io 2 years ago
    • Cache

    你真的会用sync.WaitGroup吗

    你真的会用sync.WaitGroup吗 sync.WaitGroup常规用法 通俗点说,两个角色,一种goroutine作为一个worker(他是个小弟),老老实实干活。另一种goroutine作为管理者督促小弟干活(它自己也是个worker)。 在有很多小...

  • 3

    4.3、Golang 并发编程-WaitGroup实现同步 package main import ( "fmt" ) func showMessage(i int) { fmt.Printf("i: %v\n", i) } func main() { for i := 0; i < 10; i++ { showMess...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK