271

来,控制一下 Goroutine 的并发数量

 5 years ago
source link: https://studygolang.com/articles/17811?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1460000017956434?w=800&h=600

原文地址: 来,控制一下 Goroutine 的并发数量

问题

func main() {
    userCount := math.MaxInt64
    for i := 0; i < userCount; i++ {
        go func(i int) {
            // 做一些各种各样的业务逻辑处理
            fmt.Printf("go func: %d\n", i)
            time.Sleep(time.Second)
        }(i)
    }
}

在这里,假设 userCount 是一个外部传入的参数(不可预测,有可能值非常大),有人会全部丢进去循环。想着全部都并发 goroutine 去同时做某一件事。觉得这样子会效率会更高,对不对!

那么,你觉得这里有没有什么问题?

噩梦般的开始

当然,在 特定场景下 ,问题可大了。因为在本文被丢进去同时并发的可是一个极端值。我们可以一起观察下图的指标分析,看看情况有多 “崩溃”。下图是上述代码的表现:

输出结果

...
go func: 5839
go func: 5840
go func: 5841
go func: 5842
go func: 5915
go func: 5524
go func: 5916
go func: 8209
go func: 8264
signal: killed

如果你自己执行过代码,在 “输出结果” 上你会遇到如下问题:

  • 系统资源占用率不断上涨
  • 输出一定数量后:控制台就不再刷新输出最新的值了
  • 信号量:signal: killed

系统负载

1460000017956435?w=1200&h=1000

CPU

1460000017956436

短时间内系统负载暴增

虚拟内存

1460000017956437?w=1200&h=1000

短时间内占用的虚拟内存暴增

top

PID    COMMAND      %CPU  TIME     #TH   #WQ  #PORT MEM    PURG   CMPRS  PGRP  PPID  STATE    BOOSTS
...
73414  test         100.2 01:59.50 9/1   0    18    6801M+ 0B     114G+  73403 73403 running  *0[1]

小结

如果仔细看过监控工具的示意图,就可以知道其实我间隔的执行了两次,能看到系统间的使用率幅度非常大。当进程被杀掉后,整体又恢复为正常值

在这里,我们回到主题,就是在 不控制并发的 goroutine 数量 会发生什么问题?大致如下:

  • CPU 使用率浮动上涨
  • Memory 占用不断上涨。也可以看看 CMPRS,它表示进程的压缩数据的字节数。已经到达 114G+ 了
  • 主进程崩溃(被杀掉了)

简单来说,“崩溃” 的原因就是对系统资源的占用过大。常见的比如:打开文件数(too many files open)、内存占用等等

危害

对该台服务器产生非常大的影响,影响自身及相关联的应用。很有可能导致不可用或响应缓慢,另外启动了复数 “失控” 的 goroutine,导致程序流转混乱

解决方案

在前面花了大量篇幅,渲染了在存在大量并发 goroutine 数量时,不控制的话会出现 “严重” 的问题,接下来一起思考下解决方案。如下:

  1. 控制/限制 goroutine 同时并发运行的数量
  2. 改变应用程序的逻辑写法(避免大规模的使用系统资源和等待)
  3. 调整服务的硬件配置、最大打开数、内存等阈值

控制 goroutine 并发数量

接下来正式的开始解决这个问题,希望你认真阅读的同时加以思考,因为这个问题在实际项目中真的是太常见了!

问题已经抛出来了,你需要做的是 想想有什么办法 解决这个问题。建议你自行思考一下技术方案。再接着往下看 :-)

尝试 chan

func main() {
    userCount := 10
    ch := make(chan bool, 2)
    for i := 0; i < userCount; i++ {
        ch <- true
        go Read(ch, i)
    }
    
    //time.Sleep(time.Second)
}

func Read(ch chan bool, i int) {
    fmt.Printf("go func: %d\n", i)
    <- ch
}

输出结果:

go func: 1
go func: 2
go func: 3
go func: 4
go func: 5
go func: 6
go func: 7
go func: 8
go func: 0

嗯,我们似乎很好的控制了 2 个 2 个的 “顺序” 执行多个 goroutine。但是,问题出现了。你仔细数一下输出结果,才 9 个值?

这明显就不对。原因出在当主协程结束时,子协程也是会被终止掉的。因此剩余的 goroutine 没来及把值输出,就被送上路了(不信你把 time.Sleep 打开看看,看看输出数量)

尝试 sync

...
var wg = sync.WaitGroup{}

func main() {
    userCount := 10
    for i := 0; i < userCount; i++ {
        wg.Add(1)
        go Read(i)
    }

    wg.Wait()
}

func Read(i int) {
    defer wg.Done()
    fmt.Printf("go func: %d\n", i)
}

嗯,单纯的使用 sync.WaitGroup 也不行。没有控制到同时并发的 goroutine 数量(代指达不到本文所要求的目标)

小结

单纯 简单 使用 channel 或 sync 都有明显缺陷,不行。我们再看看组件配合能不能实现

尝试 chan + sync

...
var wg = sync.WaitGroup{}

func main() {
    userCount := 10
    ch := make(chan bool, 2)
    for i := 0; i < userCount; i++ {
        go Read(ch, i)
    }

    wg.Wait()
}

func Read(ch chan bool, i int) {
    defer wg.Done()
    wg.Add(1)

    ch <- true
    fmt.Printf("go func: %d, time: %d\n", i, time.Now().Unix())
    time.Sleep(time.Second)
    <-ch
}

输出结果:

go func: 9, time: 1547911938
go func: 1, time: 1547911938
go func: 6, time: 1547911939
go func: 7, time: 1547911939
go func: 8, time: 1547911940
go func: 0, time: 1547911940
go func: 3, time: 1547911941
go func: 2, time: 1547911941
go func: 4, time: 1547911942
go func: 5, time: 1547911942

从输出结果来看,确实实现了控制 goroutine 以 2 个 2 个的数量去执行我们的 “业务逻辑”,当然结果集也理所应当的是乱序输出

方案一:简单 Semaphore

在确立了简单使用 chan + sync 的方案是可行后,我们重新将流转逻辑封装为 gsema ,主程序变成如下:

import (
    "fmt"
    "time"

    "github.com/EDDYCJY/gsema"
)

var sema = gsema.NewSemaphore(3)

func main() {
    userCount := 10
    for i := 0; i < userCount; i++ {
        go Read(i)
    }

    sema.Wait()
}

func Read(i int) {
    defer sema.Done()
    sema.Add(1)

    fmt.Printf("go func: %d, time: %d\n", i, time.Now().Unix())
    time.Sleep(time.Second)
}

分析方案

在上述代码中,程序执行流程如下:

sema

看上去人模人样,没什么严重问题。但却有一个 “大” 坑,认真看到第二点 “每次启动一个 goroutine” 这句话。这里 有点问题 ,提前产生那么多的 goroutine 会不会有什么问题,接下来一起分析下利弊,如下:

  • 适合 量不大、复杂度低 的使用场景

    • 几百几千个、几十万个也是可以接受的(看具体业务场景)
    • 实际业务逻辑在运行前就已经被阻塞等待了(因为并发数受限),基本实际业务逻辑损耗的性能比 goroutine 本身大
    • goroutine 本身很轻便,仅损耗极少许的内存空间和调度。这种等待响应的情况都是躺好了,等待任务唤醒
  • Semaphore 操作复杂度低且流转简单,容易控制

  • 不适合 量很大、复杂度高 的使用场景

    • 有几百万、几千万个 goroutine 的话,就浪费了大量调度 goroutine 和内存空间。恰好你的服务器也接受不了的话
  • Semaphore 操作复杂度提高,要管理更多的状态

小结

  • 基于什么业务场景,就用什么方案去做事
  • 有足够的时间,允许你去追求更优秀、极致的方案(用第三方库也行)

用哪种方案,我认为主要基于以上两点去思考,都是 OK 的。没有对错,只有当前业务场景能不能接受,这个预先启动的 goroutine 数量你的系统是否能够接受

当然了,常见/简单的 Go 应用采用这类技术方案,基本就能解决问题了。因为像本文第一节 “问题” 如此超巨大数量的情况,情况很少。其并不存在那些 “特殊性”。因此用这个方案基本 OK

灵活控制 goroutine 并发数量

小手一紧。隔壁老王发现了新的问题。“方案一” 中,在 输入输出一体 的情况下,在常见的业务场景中确实可以

但,这次新的业务场景比较特殊,要控制输入的数量,以此达到 改变允许并发运行 goroutine 的数量 。我们仔细想想,要做出如下改变:

  • 输入/输出要抽离,才可以分别控制
  • 输入/输出要可变,理所应当在 for-loop 中(可设置数值的地方)
  • 允许改变 goroutine 并发数量,但它也必须有一个 最大值 (因为允许改变是相对)

方案二:灵活 chan + sync

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func main() {
    userCount := 10
    ch := make(chan int, 5)
    for i := 0; i < userCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for d := range ch {
                fmt.Printf("go func: %d, time: %d\n", d, time.Now().Unix())
                time.Sleep(time.Second * time.Duration(d))
            }
        }()
    }

    for i := 0; i < 10; i++ {
        ch <- 1
        ch <- 2
        //time.Sleep(time.Second)
    }

    close(ch)
    wg.Wait()
}

输出结果:

...
go func: 1, time: 1547950567
go func: 3, time: 1547950567
go func: 1, time: 1547950567
go func: 2, time: 1547950567
go func: 2, time: 1547950567
go func: 3, time: 1547950567
go func: 1, time: 1547950568
go func: 2, time: 1547950568
go func: 3, time: 1547950568
go func: 1, time: 1547950568
go func: 3, time: 1547950569
go func: 2, time: 1547950569

在 “方案二” 中,我们可以随时随地的根据新的业务需求,做如下事情:

  • 变更 channel 的输入数量
  • 能够根据特殊情况,变更 channel 的循环值
  • 变更最大允许并发的 goroutine 数量

总的来说,就是可控空间都尽量放开了,是不是更加灵活了呢 :-)

方案三:第三方库

比较成熟的第三方库也不少,基本都是以生成和管理 goroutine 为目标的池工具。我简单列了几个,具体建议大家阅读下源码或者多找找,原理相似

总结

在本文的开头,我花了大力气(极端数量),告诉你 同时并发过多的 goroutine 数量会导致系统占用资源不断上涨。最终该服务崩盘的极端情况 。为的是希望你今后避免这种问题,给你留下深刻的印象

接下来我们以 “控制 goroutine 并发数量” 为主题,展开了一番分析。分别给出了三种方案。在我看来,各具优缺点,我建议你 挑选合适自身场景的技术方案 就可以了

因为,有不同类型的技术方案也能解决这个问题,千人千面。本文推荐的是较常见的解决方案,也欢迎大家在评论区继续补充 :-)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK