62

一个调度系统的开发与性能优化

 6 years ago
source link: https://zhuanlan.zhihu.com/p/32439765?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一个调度系统的开发与性能优化

可观测性、Kubernetes、云原生、Go,欢迎私聊!

背景:随着Go的不断发展,流行度越来越高,业界对Go的认可度也越来越高,所以很多团队或者公司在遇到性能问题时都会尝试使用Go来重构系统,尤其是云计算领域,大家期望能够通过语言栈来解决遇到的性能问题,该系统即是此类典型的案例。你可以把该系统认为就是kubernetes中的scheduler模块,不过更加复杂一些,增加了更多的预选复杂属性以及一些我们自己的优化。

毫无疑问的是,语言栈只可以解决基本的编码问题,例如开发周期与质量的平衡,语言本身的自带的高并发属性等等,但是毋庸置疑的是语言的选择只是解决性能问题的第一步,对此我是比较赞同的;重构和优化系统的绝大部分是对于系统逻辑设计的重新思考和编码细节的细致优化,一般看来,系统的架构和逻辑设计决定了系统性能的基本走势,编码以及算法的优化只是在此基础上的性能提升,犹如站在巨人的肩上,巨人的高低基本决定了我们系统最终的性能高度。

OK,回到我们的话题,这个类kube-scheduler的调度系统经过近一个月的开发,包括代码重构,代码规范,开发新的功能等等不断的Coding和修改,总算基本完成了,接着便面临系统优化相关的一系列问题,例如CPU和内存占用情况,初始化速度,响应速度,批量处理的性能以及Go的GC时间和goroutine执行情况等等很多指标。更少的资源,达到更高的性能将是优化的主题,所以一方面关注资源,另一方面关注性能,两者既矛盾又相辅相成。这里不会对调度系统本身和云计算的东西过多介绍。

一、初始化优化

在开始的时候,我发现启动该系统,然后直接去请求服务要等36s左右才返回结果,我得到的初始化火焰图如下:

图一

熟悉火焰图的同学可以一眼看到,中间很长的candidate.(*HostBuilder).init是我们需要关注的,它占据了近65%的CPU时间,我们等待系统初始化的时间应该大部分都耗在这里,说明这个地方是我们的一个瓶颈,给我们的优化指明了方向。

我看到了下列的代码(示例代码):

setFuncs := []func(){
    func() { b.set1(dbCache, errMessageChannel) },
    func() { b.set2(syncCache, ids, errMessageChannel) },
    func() { b.set3(syncCache, ids, errMessageChannel) },
    func() { b.set4(ids, errMessageChannel) },
    func() { b.set5(ids, errMessageChannel) },
    func() { b.set6(ids, errMessageChannel) },
    func() { b.set7(ids, errMessageChannel) },
    func() { b.set8(errMessageChannel) },
    func() { b.set9(ids, errMessageChannel) },
    func() { b.set10(errMessageChannel) },
    func() { b.set11(ids, errMessageChannel) },
    func() { b.set12(errMessageChannel) },
}

可以看到在setFuncs中有12个函数分别是给初始化数据set属性的值,每一个函数都有一定的计算量,而且我们是安装上面的顺序依次执行的。ok,所以我们可不可以并发执行呢?这样写本是因为要获取错误信息,要依次将错误信息append,所以首先我们如何解决这个问题呢?当然是channel,所以,我们完全可以使用sync.WaitGroup将该部分并发执行直到全部都结束。

你可以自己封装它:

type WaitGroupWrapper struct {
  gosync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
  w.Add(1)
  go func() {
    cb()
    w.Done()
  }()
}

这样你就可以方面的使用,不用担心是不是少Add或者Done

for _, f := range setFuncs {
    wg.Wrap(f)
  }
  wg.Wait()

效果很明显,初始化速度降至10s左右,和之前Python版相比也算低了一个数量级。基本可接受(后面该性能还会进一步优化)

图2

二、减少无效计算

熟悉kubernetes的同学应该知道在scheduler中有预选和优选两个阶段,在预选阶段有一系列的Filter,一台宿主机如果有不满足的指标那么这台宿主机将会被排除,在优选阶段不会在列。

从火焰图我们可以看到filter占据的CPU时间较多

图3

最开始的时候我通过并发的执行所有的filter来提高处理速度,但是收效甚微。后来想能不能在出现不符合的Filter时后面的Filter不再执行?由于我们有好几万台宿主机,可以想象应该可以节省不少的计算量,但这会带来一个问题,即在失败时只能知道哪一个Filter不符合,不能知道后面的是否符合。这是一个取舍的问题。经过和同事商量,决定还是舍去。经过测试发现处理速度有了显著的提高。一次调度从260ms降低到150ms左右。

我们通过火焰图可以看出效果:

图4

无独有偶,在Kubernetes社区上你也可以看到有人提出类似的优化Issue,这一点说明不仅仅我们是这样想的,在针对此类问题业界也在寻求更加优秀的调度算法,在和scheduler的owner沟通过程中,他也意识到这样的优化是很多场景会用到的,同时觉得可以通过一个配置项开启这个功能,所以我针对此问题给Kubernetes的原生scheduler提了一个PR,为Kubernetes也增加该项优化,据我们初步测试性能可提升40%左右,后期还会有对此的优化,这里不做详细介绍了。

三、关于Go的栈扩张问题

虽然一直对Go很是推崇,自己也在一直研究它,但是很客观地说Go本身还是有很多地方不足,最直接的问题是你会碰到一些坑。细心的同学应该已经看到图4的火焰图最右边的morestack

这里要说下Go的栈,我们都知道Go语言中的goroutine初始栈大小只有2K,但如果运行过程中调用链比较长(例如在使用递归算法时或者项目很庞大导致调用链很长),超过的这个大小的时候,栈会自动地扩张,这个时候就会调用到函数runtime.morestack,也就是我们上面看到的。当然开一个goroutine本身开销非常小,但是调用morestack进行扩栈的开销是比较大的。因为牵涉到栈上对象以及引用的问题,所以在morestack的时候,里面的对象都是需要调整位置的,指针都要重定位到新的栈。栈越大,调用链越长,涉及到需要调整的对象也就越多,所以可以看到调用morestack的时候开销也越大。

我们的项目里就发现这个问题,morestack的CPU时间几乎占到了12%,这是我们无法忍受的。针对该问题看了好久源码,解决方案一般有两种,一种是初始化时就将栈扩展,在社区中看到有人提出将这个大小默认扩展到32K,但我不是很看好这个方案,这会导致所有的goroutine都变大,反而丧失了goroutine本身轻便的特性。

结合业界对该问题的其他解决方案,这里我们采用协程池的方案,其实goroutine这么轻量的东西,其实本身做池意义并不大,随用随开,用完就扔,完全没有什么负担。采用goroutine pool之后,如果goroutine被扩栈了,再还到pool里面,下次拿出来时是一个已扩栈过的goroutine,因此可以减少morestack。TiDB中针对此问题有个很不错的PR

四、ioutil.ReadAll的坑

在pprof观察heap增长的时候发现一个处理http response.Body体的逻辑导致heap不断增长,栈如下:

1: 67108864 [2: 134217728] @ 0x49b4a7 0x49b3e1 0x57887c 0x57895e 0x137dbbb 0x138035d 0x138029f 0x138cc6a 0x138e1c0 0x1341c8b 0x139073c 0x1341526 0x1340e2c 0x1341f7a 0x7fb3fe 0x7fb0cd 0x7faffd 0x45c291
# 0x49b4a6  bytes.makeSlice+0x76                      /usr/local/Cellar/go/1.9.2/libexec/src/bytes/buffer.go:231
# 0x49b3e0  bytes.(*Buffer).ReadFrom+0x290                    /usr/local/Cellar/go/1.9.2/libexec/src/bytes/buffer.go:203
# 0x57887b  io/ioutil.readAll+0x12b                     /usr/local/Cellar/go/1.9.2/libexec/src/io/ioutil/ioutil.go:33
# 0x57895d  io/ioutil.ReadAll+0x3d                      /usr/local/Cellar/go/1.9.2/libexec/src/io/ioutil/ioutil.go:42

这是抓取的pprof中的heap信息,在程序运行过程中一直在增加,可以看出大部分内存开销集中在 bytes.makeSlice 和 ioutil.ReadAll上面,我们的程序也确实有调用 ioutil.ReadAll().

respData, err := ioutil.ReadAll(resp.Body)  
    err = json.NewDecoder(resp.Body).Decode(&reqData.RespValue)
    if err != nil {
        err = errors.Newf(err, "failed reading the response body")  
        return    
    }
    
    if len(respData) > 0 {    
      if reqData.RespValue != nil {   
          err = json.Unmarshal(respData, &reqData.RespValue)    
          if err != nil {   
              err = errors.Newf(err, "failed unmarshaling the response body: %s", respData)   
          }   
      }
    }
// readAll reads from r until an error or EOF and returns the data it read
// from the internal buffer allocated with a specified capacity.
func readAll(r io.Reader, capacity int64) (b []byte, err error) {
  var buf bytes.Buffer
  // If the buffer overflows, we will get bytes.ErrTooLarge.
  // Return that as an error. Any other panic remains.
  defer func() {
    e := recover()
    if e == nil {
      return
    }
    if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
      err = panicErr
    } else {
      panic(e)
    }
  }()
  if int64(int(capacity)) == capacity {
    buf.Grow(int(capacity))
  }
  _, err = buf.ReadFrom(r)
  return buf.Bytes(), err
}

// ReadAll reads from r until an error or EOF and returns the data it read.
// A successful call returns err == nil, not err == EOF. Because ReadAll is
// defined to read from src until EOF, it does not treat an EOF from Read
// as an error to be reported.
func ReadAll(r io.Reader) ([]byte, error) {
  return readAll(r, bytes.MinRead)
}

从以上可以看到,ioutil.ReadAll 每次都会分配初始化一个大小为 bytes.MinRead 的 buffer ,bytes.MinRead 在 Golang 里是一个常量,值为 512 。就是说每次调用 ioutil.ReadAll 都会分配一块大小为 512 字节的内存。makeSlice的原因是ioutil.ReadAll() 里会调用 bytes.Buffer.ReadFrom, 而 bytes.Buffer.ReadFrom 会进行 makeSlice

// ReadFrom reads data from r until EOF and appends it to the buffer, growing
// the buffer as needed. The return value n is the number of bytes read. Any
// error except io.EOF encountered during the read is also returned. If the
// buffer becomes too large, ReadFrom will panic with ErrTooLarge.
func (b *Buffer) ReadFrom(r io.Reader) (n int64, err error) {
  b.lastRead = opInvalid
  for {
    i := b.grow(MinRead)
    m, e := r.Read(b.buf[i:cap(b.buf)])
    if m < 0 {
      panic(errNegativeRead)
    }

    b.buf = b.buf[:i+m]
    n += int64(m)
    if e == io.EOF {
      return n, nil // e is EOF, so return nil explicitly
    }
    if e != nil {
      return n, e
    }
  }
}

// grow grows the buffer to guarantee space for n more bytes.
// It returns the index where bytes should be written.
// If the buffer can't grow it will panic with ErrTooLarge.
func (b *Buffer) grow(n int) int {
  m := b.Len()
  // If buffer is empty, reset to recover space.
  if m == 0 && b.off != 0 {
    b.Reset()
  }
  // Try to grow by means of a reslice.
  if i, ok := b.tryGrowByReslice(n); ok {
    return i
  }
  // Check if we can make use of bootstrap array.
  if b.buf == nil && n <= len(b.bootstrap) {
    b.buf = b.bootstrap[:n]
    return 0
  }
  c := cap(b.buf)
  if n <= c/2-m {
    // We can slide things down instead of allocating a new
    // slice. We only need m+n <= c to slide, but
    // we instead let capacity get twice as large so we
    // don't spend all our time copying.
    copy(b.buf, b.buf[b.off:])
  } else if c > maxInt-c-n {
    panic(ErrTooLarge)
  } else {
    // Not enough space anywhere, we need to allocate.
    buf := makeSlice(2*c + n)
    copy(buf, b.buf[b.off:])
    b.buf = buf
  }
  // Restore b.off and len(b.buf).
  b.off = 0
  b.buf = b.buf[:m+n]
  return m
}
  • 这个函数主要作用就是从 io.Reader 里读取的数据放入 buffer 中,如果 buffer 空间不够,就按照每次 2x + MinRead 的算法递增,这里 MinRead 的大小也是 512 Bytes ,由于我们每次调用 ioutil.ReadAll 读取的数据远小于 512 Bytes ,照理说不会触发 buffer grow 的算法,但是仔细看下实现发现不是这么回事,buffer grow 的算法大概是是这样子的:
  • 1.计算 buffer 剩余大小 free;
  • 2.如果 free < MinRead, 分配一个大小为 2 * cap + MinRead 的 newBuf, 把老 buffer 的数据拷贝到 newBuf;
  • 3.如果 free >= MinRead, 读取数据到 buffer, 遇到错误就返回,否则跳转到第 1 步. 因为我们用了 io.LimitReader , 第一趟循环只会读取固定字节的数据,不会触发任何错误。但是第二次循环的时候,由于 buffer 的剩余空间不足,就会触发一次 buffer grow 的算法,再分配一个大小为 1536 Bytes 的 Buffer , 然后再次 Read() 的时候会返回 io.EOF 的错误。这就是为什么会有那么多次 makeSlice 调用的原因

怎么解决?仔细看下代码会发现,我们的目的不就是Umarshal其中的数据吗?交给json自己做吧,没必要多那一步。

err = json.NewDecoder(resp.Body).Decode(&reqData.RespValue)
if err != nil {
  err = errors.Newf(err, "failed unmarshaling the response body: %v", resp.Body)
}

再继续看下heap发现减轻很多了。

五、关于并行度的大小

这个是让我比较苦恼的地方。刚开始我是在一个4核8G的虚拟机中测试,发现不断调整workqueue.Parallelize的并行度出现以下的特点。

图5

为什么会是14呢?

难道是我的机器原因,我换了一台32核的机器,32核的机器CPU负载只有20%左右,并且拥有256个虚拟核数。这里有上万个任务需要跑,我在测试之前一直觉得要么是32最合适,要么是256最合适,可结果出乎我所料,仍然是14。CPU上不去我推测的可能性是开启的goroutine运行时间过短,CPU没有采集到,但我觉得这个理由说服不了我自己。

进而想想会不会是GC的原因,开启GC我发现GC频率很低的,大概几十秒左右。所以排除了GC的影响,但是可以肯定的是并不是goroutine越多越好,对于每一个系统具体应该是多少的数量应该根据自己系统和机器调整到最佳状态,这个只有方法可循没有公式可用。

(PS:这个问题至今我没有找到这个数字关系,苦恼中,有兴趣的同学可以给提提思路!)

六、一些小的CASE

  • 使用高性能json序列化

从火焰图发现encoding/json占据了不少的CPU时间,从目前的开源json序列化package老看,最好的是http://github.com/json-iterator/go,比标准库快数十倍。

  • 尽量提前分配slice和map的长度

这个应该都清楚,一次性分配空间可以减少自动扩张引起的复制过程。

  • 尽量减少对象的分配

虽然我已经尽量的减少对象,但是考虑到成本和目前GC并没有带来严重的影响,还可以忍受,后期应该会专门优化这部分也说不定。

  • 减少无效的计算和优化算法一样重要

我发现大部分情况下,不是说使用更加高效的算法来解决的,更多的是对于流程的优化和无效计算的减少。

  • 1.使用pprof,火焰图等工具可以对优化工作带来很大的便利,善于使用工具非常重要,也是一个必备的技能。掌握了使用的套路,就是收获了经验
  • 2.抛弃业务只去想优化很可能是在做无用功,深入业务才能更好的开展开发和优化工作
  • 3.Go系统优化的过程,深入了解Go的特性是基础,不然会不知思路在哪里,这也是初级Go学习者想要进阶的一个途径:深入Go的特性
  • 4.深入业务又要给自己产生正向反馈,你解决的问题可能就是业界也存在的问题,积极寻找外界的信息会帮助自己更好的构建系统也可能为业界贡献自己的才智

说的比较乱,不对和不足还请大家指教!

两个月的努力?终于也在2018年前上线了。。。。。看到性能监控的曲线真漂亮!

更一下曲线:




About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK