9

Go并发编程之传统同步—(2)条件变量

 3 years ago
source link: https://segmentfault.com/a/1190000037426003
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

前言

回顾上篇文章 《Go并发编程之传统同步—(1)互斥锁》 其中说到,同步最终是为了达到以下两种目的:

  • 维持共享数据一致性,并发安全
  • 控制流程管理,更好的协同工作

示例程序通过使用互斥锁,达到了数据一致性目的,那么流程管理应该怎么做呢?

传统同步

条件变量

上篇文章的示例程序,仅仅实现了累加功能,但在现实的工作场景中,需求往往不可能这么简单,现在扩展一下这个程序,给它加上累减的功能。

加上了累减的示例程序,可以抽象的理解为一个固定容量的“储水池”,可以注水、排水。

仅用互斥锁

当水注满以后,停止注水,开始排水,当水排空以后,开始注水,反反复复...

func TestDemo1(t *testing.T) {
    var mut sync.Mutex
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            mut.Lock()
            if counter == maxSize {
                for i := 0; i < maxSize; i++ {
                    counter--
                    log.Printf("OUTPUT counter = %d", counter)
                }
            }
            mut.Unlock()
            time.Sleep(1 * time.Second)
        }
    }()

    // 注水口
    for {
        mut.Lock()
        if counter == 0 {
            for i := 0; i < maxSize; i++ {
                counter++
                log.Printf(" INPUT counter = %d", counter)
            }
        }
        mut.Unlock()
        time.Sleep(1 * time.Second)
    }
}

结果

=== RUN   TestDemo1
                ···
2020/10/06 13:52:50  INPUT counter = 8
2020/10/06 13:52:50  INPUT counter = 9
2020/10/06 13:52:50  INPUT counter = 10
2020/10/06 13:52:50 OUTPUT counter = 9
2020/10/06 13:52:50 OUTPUT counter = 8
2020/10/06 13:52:50 OUTPUT counter = 7
                ···

看着没有什么问题,一切正常,但就是这样工作的策略效率太低。

优化互斥锁

优化策略,不用等注满水再排水,也不用放空之后,再注水,注水口和排水口一起工作。

func TestDemo2(t *testing.T) {
    var mut sync.Mutex
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            mut.Lock()
            if counter != 0 {
                counter--
            }
            log.Printf("OUTPUT counter = %d", counter)
            mut.Unlock()
            time.Sleep(5 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        mut.Lock()
        if counter != maxSize {
            counter++
        }
        log.Printf(" INPUT counter = %d", counter)
        mut.Unlock()
        time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果

=== RUN   TestDemo2
                ···
2020/10/06 14:11:46  INPUT counter = 7
2020/10/06 14:11:47  INPUT counter = 8
2020/10/06 14:11:48 OUTPUT counter = 7
2020/10/06 14:11:48  INPUT counter = 8
2020/10/06 14:11:49  INPUT counter = 9
2020/10/06 14:11:50  INPUT counter = 10
2020/10/06 14:11:51  INPUT counter = 10
2020/10/06 14:11:52  INPUT counter = 10
2020/10/06 14:11:53 OUTPUT counter = 9
2020/10/06 14:11:53  INPUT counter = 10
2020/10/06 14:11:54  INPUT counter = 10
2020/10/06 14:11:55  INPUT counter = 10
2020/10/06 14:11:56  INPUT counter = 10
2020/10/06 14:11:57  INPUT counter = 10
2020/10/06 14:11:58 OUTPUT counter = 9
2020/10/06 14:11:58  INPUT counter = 10
2020/10/06 14:11:59  INPUT counter = 10
                ···

通过日志输出,可以看到程序达到了需求,运作正常。

但是,通过日志输出发现,当排水口效率低下的时候,注水口一直在轮询,这里频繁的上锁操作造成的开销很是浪费。

条件变量:单发通知

那有没有什么好的办法,省去不必要的轮询?如果注水口和排水口能互相“通知”就好了!这个功能, 条件变量 可以做到。

条件变量总是与互斥锁组合使用,除了可以使用 Lock、Unlock,还有如下三个方法:

  • Wait 等待通知
  • Signal 单发通知
  • Broadcast 广播通知
func TestDemo3(t *testing.T) {
    cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量
    maxSize := 10
    counter := 0

    // 排水口
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT counter = %d", counter)
            cond.Signal() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            time.Sleep(5 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        cond.L.Lock() // 上锁
        if counter == maxSize { // 水满了
            cond.Wait() // 啥时候排水?等待通知!
        }
        counter++
        log.Printf(" INPUT counter = %d", counter)
        cond.Signal() // 单发通知:已来水
        cond.L.Unlock() // 解锁
        time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果

=== RUN   TestDemo3
                ···
2020/10/06 14:51:22  INPUT counter = 7
2020/10/06 14:51:23  INPUT counter = 8
2020/10/06 14:51:24 OUTPUT counter = 7
2020/10/06 14:51:24  INPUT counter = 8
2020/10/06 14:51:25  INPUT counter = 9
2020/10/06 14:51:26  INPUT counter = 10
2020/10/06 14:51:29 OUTPUT counter = 9
2020/10/06 14:51:29  INPUT counter = 10
2020/10/06 14:51:34 OUTPUT counter = 9
2020/10/06 14:51:34  INPUT counter = 10
                ···

通过日志输出,可以看出来,注水口没有一直轮询了,而是等到排水口发通知后,再进行注水,注水口一直再等排水口。那么新的问题又来了,如何提高排水口的效率呢?

条件变量:广播通知

多制造出一个排水口,提高排水效率。

那就不能继续使用单发通知了(Signal),因为单发通知只会通知到一个等待(Wait),针对多等待的这种情况,就需要使用广播通知(Broadcast)。

func TestDemo4(t *testing.T) {
    cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量
    maxSize := 10
    counter := 0

    // 排水口 1
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
            //for counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT A counter = %d", counter)
            cond.Broadcast() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            //time.Sleep(2 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 排水口 2
    go func() {
        for {
            cond.L.Lock() // 上锁
            if counter == 0 { // 没水了
            //for counter == 0 { // 没水了
                cond.Wait() // 啥时候来水?等通知!
            }
            counter--
            log.Printf("OUTPUT B counter = %d", counter)
            cond.Broadcast() // 单发通知:已排水
            cond.L.Unlock() // 解锁
            //time.Sleep(2 * time.Second) // 为了演示效果,睡眠5秒
        }
    }()

    // 注水口
    for {
        cond.L.Lock() // 上锁
        if counter == maxSize { // 水满了
        //for counter == maxSize { // 水满了
            cond.Wait() // 啥时候排水?等待通知!
        }
        counter++
        log.Printf(" INPUT   counter = %d", counter)
        cond.Broadcast() // 单发通知:已来水
        cond.L.Unlock() // 解锁
        //time.Sleep(1 * time.Second) // 为了演示效果,睡眠1秒
    }
}

结果

=== RUN   TestDemo4
                ···
2020/10/07 20:57:30 OUTPUT B counter = 2
2020/10/07 20:57:30 OUTPUT B counter = 1
2020/10/07 20:57:30 OUTPUT B counter = 0
2020/10/07 20:57:30 OUTPUT A counter = -1
2020/10/07 20:57:30 OUTPUT A counter = -2
2020/10/07 20:57:30 OUTPUT A counter = -3
2020/10/07 20:57:30 OUTPUT A counter = -4
                ···
2020/10/07 20:57:31 OUTPUT B counter = -7605
2020/10/07 20:57:31  INPUT   counter = -7604
2020/10/07 20:57:31 OUTPUT A counter = -7605
2020/10/07 20:57:31 OUTPUT A counter = -7606
                ···

通过日志输出可以看到,刚开始的时候还很正常,到后面的时候就变成负值了,一直在负增长,What?

《Go并发编程之传统同步—(1)互斥锁》 文章中,程序因为没有加上互斥锁,出现过 counter 值异常的情况。

但这次程序这次加了互斥锁,按理说形成了一个临界区应该是没有问题了,所以问题应该不是出在临界区上,难道问题出在 Wait 上?

通过IDE 追踪一下Wait的源码

func (c *Cond) Wait() {
        // 检查 c 是否是被复制的,如果是就 panic
    c.checker.check()
    // 将当前 goroutine 加入等待队列
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // 等待当前 goroutine 被唤醒
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

原来 Wait 内部的执行流程是,先执行了解锁,然后进入等待状态,接到通知之后,再执行加锁操作。

那按照这个代码逻辑结合输出日志,走一程序遍流程,看看能不能复现出 counter 为负值的情况:

  1. 注水口将 counter 累加到 10 之后,发送广播通知(Broadcast)。
  2. goroutine A 在“第1步”之前的时候进入了等待通知(Wait),现在接收到了广播通知(Broadcast),从 runtime_notifyListWait() 返回,并且成功执行了加锁(Lock)操作。
  3. goroutine B 在“第1步”之前的时候进入了等待通知(Wait),现在接收到了广播通知(Broadcast),从 runtime_notifyListWait() 返回,在执行加锁(Lock)操作的时候,发现 goroutine A 先抢占了临界区,所以一直阻塞在 c.L.Lock()。
  4. goroutine A 虽然完成任务后会释放锁,但是每次也成功将锁抢占,所以就这样 一直将 counter 减到了 0,然后发送广播通知(Broadcast)、解锁(Unlock)。
  5. goroutine B 在 goroutine A 解锁后,成功获得锁并从 Lock 方法中返回,接下来跳出 Wait 方法、跳出 if 判断,执行 counter--(0--),这时候 counter 的值是 -1

图示

F7fMneM.jpg!mobile

问题就出现在第五步,只要 goroutine B 加锁成功的时候,再判断一下 counter 是否为 0 就好了。

所以将 if counter == 0 改成 for counter == 0,这样上面的“第五步”就变成了

5.goroutine B 在 goroutine A 解锁后,成功加锁(Lock)并从阻塞总返回,接下来跳出 Wait 方法、再次进入 for 循环,判断 counter == 0 结果为真,再次进入等待(Wait)。

代码做出相应的修改后,再执行看结果,没有问题了。

延伸

发送通知

等待通知(Wait)肯定是要在临界区里面的,那发送通知(Signal、Broadcast)在哪里更好呢?

Luck()
Wait()
Broadcast()// Signal()
Unlock()

// 或者

Luck()
Wait()
Unlock()
Broadcast()// Signal()

// 两种写法都不会报错

在 go 的发送通知方法(Broadcast、Signal)上有这么一段话:

// It is allowed but not required for the caller to hold c.L \

在我以往的 C 多线程开发的时候,发送通知总是在锁中的:

pthread_mutex_lock(&thread->mutex);
//              ...
pthread_cond_signal(&thread->cond);
pthread_mutex_unlock(&thread->mutex);

man 手册中有写到:

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

个人对此并没有什么见解,就不乱下定论了,有想法的小伙伴可以在文章下面留言,一起讨论。

等待通知

消息通知是有即时性的,如果没有 goroutine 在等待通知,那么这次通知直接被丢弃。

kubernetes

https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/cache/fifo.go

总结

  1. Wait() 内会执行解锁、等待、加锁。
  2. Wait() 必须在 for 循环里面。
  3. Wait() 方法会把当前的 goroutine 添加到通知队列的队尾。
  4. 单发通知,唤醒通知队列第一个排队的 goroutine。
  5. 广播通知,唤醒通知队列里面全部的 goroutine。
  6. 程序示例只是为了演示效果,实际的开发中,生产者和消费者应该是异步消费,不应该使用同一个互斥锁。

文章示例代码


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK