
GO: The Implementation and Evolution of sync.Pool

source link: https://studygolang.com/articles/20176?amp%3Butm_medium=referral

After writing about the implementation and evolution of sync.Mutex last time, I felt it was worth tracking a few more features of the Go standard library, following their successive optimizations and digging out the interesting parts. sync.Pool is typically used as a pool of small objects; for example, a former colleague added sync.Pool to the thrift golang lib to reuse []byte and similar objects. There are plenty of home-grown objectPool implementations around, but on the whole none of them are as efficient as sync.Pool.
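As a quick illustration of that []byte use case, here is a minimal sketch of pooling byte buffers with sync.Pool. The names (bufPool, process) and the buffer size are invented for the example; note the caller is responsible for resetting the slice length before returning it to the pool:

```go
package main

import (
	"fmt"
	"sync"
)

// bufPool hands out reusable byte slices; New only runs when the
// pool has nothing to offer.
var bufPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, 4096)
	},
}

func process(payload string) string {
	buf := bufPool.Get().([]byte)
	defer bufPool.Put(buf[:0]) // reset length, keep capacity, return to pool
	buf = append(buf, "processed: "...)
	buf = append(buf, payload...)
	return string(buf)
}

func main() {
	fmt.Println(process("hello"))
	fmt.Println(process("world")) // very likely reuses the first call's buffer
}
```

In steady state the pool amortizes the 4 KB allocation across calls instead of paying it on every request.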

Basic idea and a first look at the evolution

Imagine implementing this ourselves: how would we do it? Use a fixed-size channel to hold the objects; take one if available, otherwise create one with a new function. Rough pseudocode:

type ObjectPool struct {
    ch      chan interface{}
    newFunc func() interface{}
}

func (o *ObjectPool) Get() interface{} {
    select {
    case v := <-o.ch:
        return v
    default:
    }
    return o.newFunc()
}

func (o *ObjectPool) Put(v interface{}) {
    select {
    case o.ch <- v:
    default:
    }
}

The code is concise, using select with a default branch for non-blocking operations. The biggest problem is that channels are not free: a single big lock drags performance down badly (see my earlier post on dpvs performance optimization). So how do we optimize? In highly concurrent programming on multi-core CPUs, the key is to give each CPU its own local data, which avoids the cost of lock contention entirely. And that is exactly what sync.Pool does.

Looking at the commit history, the overall design has barely changed since the feature was first added:

  1. Each P (the logical processor in Go's GMP scheduling model) has a local cache queue. If an object can't be found locally, one is stolen from another P; if the other Ps have none either, the New factory function is called to create a fresh one.
  2. Objects in the Pool are not immortal. In the old implementation, an object referenced only by the Pool would be destroyed by the next GC. The recent optimization 22950 adds a victim cache to smooth out the cold-start jitter caused by the Pool being emptied after a GC: it keeps the objects that would previously have been destroyed for one more cycle, so an object now survives for at least two GC intervals.
  3. Performance work: the local queue became a lock-free queue (single producer, multiple consumers, so strictly speaking not general purpose), plus assorted bug fixes...
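To make point 1 concrete before diving into the source, here is a toy sharded pool: per-shard storage, round-robin stealing on a miss, and a New fallback. Everything here (ShardedPool, the caller-supplied shard id) is invented for illustration; the real sync.Pool shards by P automatically and avoids the per-shard mutex entirely:

```go
package main

import (
	"fmt"
	"sync"
)

// shard is one worker's local store, guarded by its own small lock
// so workers normally never contend with each other.
type shard struct {
	mu    sync.Mutex
	items []interface{}
}

type ShardedPool struct {
	shards []shard
	New    func() interface{}
}

func NewShardedPool(n int, newFunc func() interface{}) *ShardedPool {
	return &ShardedPool{shards: make([]shard, n), New: newFunc}
}

// Get tries the caller's own shard first, then steals round-robin
// from the others, and only then falls back to New.
func (p *ShardedPool) Get(id int) interface{} {
	n := len(p.shards)
	for i := 0; i < n; i++ {
		s := &p.shards[(id+i)%n]
		s.mu.Lock()
		if last := len(s.items) - 1; last >= 0 {
			x := s.items[last]
			s.items = s.items[:last]
			s.mu.Unlock()
			return x
		}
		s.mu.Unlock()
	}
	return p.New()
}

func (p *ShardedPool) Put(id int, x interface{}) {
	s := &p.shards[id]
	s.mu.Lock()
	s.items = append(s.items, x)
	s.mu.Unlock()
}

func main() {
	p := NewShardedPool(4, func() interface{} { return "new" })
	p.Put(0, "pooled")
	fmt.Println(p.Get(2)) // shard 2 is empty: steals "pooled" from shard 0
	fmt.Println(p.Get(2)) // everything empty: falls back to New
}
```

The steal loop is exactly the shape we will see again in getSlow below, just with a mutex where the runtime version uses pinning and, later, a lock-free queue.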

Data structures and their evolution

type Pool struct {
    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}

// Local per-P Pool appendix.
type poolLocal struct {
    private interface{}   // Can be used only by the respective P.
    shared  []interface{} // Can be used by any P.
    Mutex                 // Protects shared.
    pad     [128]byte     // Prevents false sharing.
}

Objects are stored in poolLocal. The private field holds the most recently produced single object and can only be accessed by the local P; shared is a slice that any P may access, with Mutex protecting it. pad is for alignment, to prevent false sharing; see my earlier post on CPU caches.

Looking back at the Pool struct: New is the factory function for creating objects. local is a pointer to a []poolLocal (more precisely, the address of the slice's backing array), and localSize is the slice's length. Since the number of Ps can be adjusted at runtime, localSize may change while the program runs. On access, a P's id is used as the index into []poolLocal.

type Pool struct {
    noCopy noCopy

    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    victim     unsafe.Pointer // local from previous cycle
    victimSize uintptr        // size of victims array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
    private interface{} // Can be used only by the respective P.
    shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}

type poolLocal struct {
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
  1. Pool gained a noCopy field: a Pool must not be copied after creation and has to be used via a pointer. noCopy exists so that go vet can catch copies at build time; static languages are great, the toolchain does a lot of dirty work for you. See issue 8005 for a long discussion of how to implement copy prevention.
  2. A victim cache was added, to reduce the performance jitter of cold starts after a GC.
  3. poolLocal was split into two structs, and the pad computation changed slightly to accommodate more hardware cache line sizes. Most importantly, the shared slice became a lock-free queue.
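Point 1 can be sketched as follows. noCopy is an unexported zero-size type with no-op Lock/Unlock methods; go vet's copylocks check treats any value with those methods as a lock and reports by-value copies of anything embedding it. The outer MyPool type is invented for the example:

```go
package main

import "fmt"

// noCopy triggers go vet's copylocks analysis: vet flags by-value
// copies of any struct embedding a type with Lock/Unlock methods.
// It has no fields, so it costs nothing at runtime.
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// MyPool mimics how sync.Pool uses the marker.
type MyPool struct {
	noCopy noCopy
	items  []interface{}
}

func take(p *MyPool) int { return len(p.items) } // pointers are fine

func main() {
	p := &MyPool{items: []interface{}{1, 2}}
	fmt.Println(take(p))
	// bad := *p // uncommenting this makes `go vet` report:
	//           // "assignment copies lock value"
}
```

Note this is purely a static check: the copy still compiles and runs, vet just refuses to stay quiet about it.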

The first implementation

Object Put

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
    if raceenabled {
        // Under race detector the Pool degenerates into no-op.
        // It's conforming, simple and does not introduce excessive
        // happens-before edges between unrelated goroutines.
        return
    }
    if x == nil {
        return
    }
    l := p.pin()
    if l.private == nil {
        l.private = x
        x = nil
    }
    runtime_procUnpin()
    if x == nil {
        return
    }
    l.Lock()
    l.shared = append(l.shared, x)
    l.Unlock()
}

The logic is simple: pin first; if the private field is empty, place the object there, otherwise append it to the shared pool.

Object Get

func (p *Pool) Get() interface{} {
    if raceenabled { // Pool is disabled under the race detector (removed later)
        if p.New != nil {
            return p.New()
        }
        return nil
    }
    l := p.pin() // pin disables preemption of the P and returns its poolLocal
    x := l.private
    l.private = nil
    runtime_procUnpin()
    if x != nil { // private had an object, no need to look at shared
        return x
    }
    l.Lock() // the lock protects shared
    last := len(l.shared) - 1
    if last >= 0 {
        x = l.shared[last]
        l.shared = l.shared[:last]
    }
    l.Unlock()
    if x != nil { // got an object from shared, just return it
        return x
    }
    return p.getSlow() // slow path: steal from another P or call the New factory
}

func (p *Pool) getSlow() (x interface{}) {
    // See the comment in pin regarding ordering of the loads.
    size := atomic.LoadUintptr(&p.localSize) // load-acquire
    local := p.local                         // load-consume
    // Try to steal one element from other procs.
    pid := runtime_procPin()
    runtime_procUnpin()
    for i := 0; i < int(size); i++ { // round-robin over the other Ps' local queues
        l := indexLocal(local, (pid+i+1)%int(size))
        l.Lock()
        last := len(l.shared) - 1
        if last >= 0 {
            x = l.shared[last]
            l.shared = l.shared[:last]
            l.Unlock()
            break
        }
        l.Unlock()
    }

    if x == nil && p.New != nil { // nothing stolen from any other P either, create one with New
        x = p.New()
    }
    return x
}

This shows the overall flow, which matches the earlier description. So how exactly is pin implemented, and what does it do? On to the source:

func sync·runtime_procPin() (p int) {
    M *mp;

    mp = m;
    // Disable preemption.
    mp->locks++;
    p = mp->p->id;
}

func sync·runtime_procUnpin() {
    m->locks--;
}

In effect, sync·runtime_procPin and sync·runtime_procUnpin just lock the M to prevent it from being preempted by the runtime. Besides locking, Pin also returns the P's id.

// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() *poolLocal {
    pid := runtime_procPin()
    // In pinSlow we store to localSize and then to local, here we load in opposite order.
    // Since we've disabled preemption, GC can not happen in between.
    // Thus here we must observe local at least as large localSize.
    // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
    s := atomic.LoadUintptr(&p.localSize) // load-acquire: length of the []poolLocal slice
    l := p.local                          // load-consume: base address of the []poolLocal
    if uintptr(pid) < s { // the P's id is the index into []poolLocal
        return indexLocal(l, pid)
    }
    return p.pinSlow()
}

func (p *Pool) pinSlow() *poolLocal {
    // Retry under the mutex.
    // Can not lock the mutex while pinned.
    runtime_procUnpin()
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    pid := runtime_procPin()
    // poolCleanup won't be called while we are pinned.
    s := p.localSize
    l := p.local
    if uintptr(pid) < s { // pid indexes the slice, so if pid < s just look it up
        return indexLocal(l, pid)
    }
    if p.local == nil { // first use: register the Pool in the global allPools
        allPools = append(allPools, p)
    }
    // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. (This is the grow path.)
    size := runtime.GOMAXPROCS(0)
    local := make([]poolLocal, size)
    atomic.StorePointer((*unsafe.Pointer)(&p.local), unsafe.Pointer(&local[0])) // store-release
    atomic.StoreUintptr(&p.localSize, uintptr(size))                            // store-release
    return &local[pid]
}
// l is a raw pointer address; cast it and return the poolLocal at index i
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    return &(*[1000000]poolLocal)(l)[i]
}

pin binds the current goroutine to its P, disables preemption, and returns the poolLocal struct for that P.


As you can see, the overall implementation is not very complicated, and the latest version does not differ much from the first.

Object Cleanup

func poolCleanup() {
    // This function is called with the world stopped, at the beginning of a garbage collection.
    // It must not allocate and probably should not call any runtime functions.
    // Defensively zero out everything, 2 reasons:
    // 1. To prevent false retention of whole Pools.
    // 2. If GC happens while a goroutine works with l.shared in Put/Get,
    //    it will retain whole Pool. So next cycle memory consumption would be doubled.
    for i, p := range allPools {
        allPools[i] = nil
        for i := 0; i < int(p.localSize); i++ {
            l := indexLocal(p.local, i)
            l.private = nil
            for j := range l.shared {
                l.shared[j] = nil
            }
            l.shared = nil
        }
    }
    allPools = []*Pool{}
}

var (
    allPoolsMu Mutex
    allPools   []*Pool
)

func init() {
    runtime_registerPoolCleanup(poolCleanup)
}

The code is straightforward: the init function registers poolCleanup with the runtime. It runs at the start of GC, after stop-the-world, iterating over every poolLocal and dropping the references.
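The cleanup is easy to observe from user code. This sketch counts calls to New; it forces two GC cycles so it behaves the same with or without the victim cache that later versions add (exact counts can vary with scheduler timing, so only the post-GC New call is really guaranteed):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// demo returns how many times New ran: normally 0 times before the
// GCs (the Put'd object is served back) and once after, when the
// drained pool is forced to allocate.
func demo() int {
	newCalls := 0
	p := &sync.Pool{New: func() interface{} {
		newCalls++
		return new(int)
	}}

	p.Put(new(int))
	p.Put(p.Get()) // typically served from the pool, so New is not called

	runtime.GC() // two cycles, so the object is gone even with the
	runtime.GC() // victim cache of Go 1.13+

	p.Get() // the pool is empty now, so New must run
	return newCalls
}

func main() {
	fmt.Println(demo())
}
```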

Optimizing indexLocal

See the official commit; the change is as follows:

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
-       return &(*[1000000]poolLocal)(l)[i]
+       lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
+       return (*poolLocal)(lp)
 }
Performance results on linux/amd64:

    name            old time/op  new time/op  delta
    Pool-4          19.1ns ± 2%  10.1ns ± 1%  -47.15%  (p=0.000 n=10+8)
    PoolOverflow-4  3.11µs ± 1%  2.10µs ± 2%  -32.66%  (p=0.000 n=10+10)

Performance results on linux/386:

    name            old time/op  new time/op  delta
    Pool-4          20.0ns ± 2%  13.1ns ± 1%  -34.59%  (p=0.000 n=10+9)
    PoolOverflow-4  3.51µs ± 1%  2.49µs ± 0%  -28.99%  (p=0.000 n=10+8)

The change brings a large performance gain, so what is behind it? The old version casts the pointer to a fixed-size [1000000]poolLocal array and indexes into it; the new one computes the target address directly from the byte offset and then casts to *poolLocal. Let's look at the assembly:

"".indexLocal STEXT nosplit size=20 args=0x18 locals=0x0
    0x0000 00000 (test.go:11)   TEXT    "".indexLocal(SB), NOSPLIT|ABIInternal, $0-24
    0x0000 00000 (test.go:11)   FUNCDATA    $0, gclocals·9fad110d66c97cf0b58d28cccea80b12(SB)
    0x0000 00000 (test.go:11)   FUNCDATA    $1, gclocals·7d2d5fca80364273fb07d5820a76fef4(SB)
    0x0000 00000 (test.go:11)   FUNCDATA    $3, gclocals·9a26515dfaeddd28bcbc040f1199f48d(SB)
    0x0000 00000 (test.go:12)   PCDATA  $2, $0
    0x0000 00000 (test.go:12)   PCDATA  $0, $0
    0x0000 00000 (test.go:12)   MOVQ    "".i+16(SP), AX
    0x0005 00005 (test.go:12)   PCDATA  $2, $1
    0x0005 00005 (test.go:12)   PCDATA  $0, $1
    0x0005 00005 (test.go:12)   MOVQ    "".l+8(SP), CX
    0x000a 00010 (test.go:12)   PCDATA  $2, $2
    0x000a 00010 (test.go:12)   LEAQ    (CX)(AX*8), AX
    0x000e 00014 (test.go:13)   PCDATA  $2, $0
    0x000e 00014 (test.go:13)   PCDATA  $0, $2
    0x000e 00014 (test.go:13)   MOVQ    AX, "".~r2+24(SP)
    0x0013 00019 (test.go:13)   RET
    0x0000 48 8b 44 24 10 48 8b 4c 24 08 48 8d 04 c1 48 89  H.D$.H.L$.H...H.
    0x0010 44 24 18 c3                                      D$..
"".indexLocal2 STEXT nosplit size=58 args=0x18 locals=0x8
    0x0000 00000 (test.go:16)   TEXT    "".indexLocal2(SB), NOSPLIT|ABIInternal, $8-24
    0x0000 00000 (test.go:16)   SUBQ    $8, SP
    0x0004 00004 (test.go:16)   MOVQ    BP, (SP)
    0x0008 00008 (test.go:16)   LEAQ    (SP), BP
    0x000c 00012 (test.go:16)   FUNCDATA    $0, gclocals·9fad110d66c97cf0b58d28cccea80b12(SB)
    0x000c 00012 (test.go:16)   FUNCDATA    $1, gclocals·7d2d5fca80364273fb07d5820a76fef4(SB)
    0x000c 00012 (test.go:16)   FUNCDATA    $3, gclocals·9fb7f0986f647f17cb53dda1484e0f7a(SB)
    0x000c 00012 (test.go:17)   PCDATA  $2, $1
    0x000c 00012 (test.go:17)   PCDATA  $0, $1
    0x000c 00012 (test.go:17)   MOVQ    "".l+16(SP), AX
    0x0011 00017 (test.go:17)   TESTB   AL, (AX)
    0x0013 00019 (test.go:17)   MOVQ    "".i+24(SP), CX
    0x0018 00024 (test.go:17)   CMPQ    CX, $1000000
    0x001f 00031 (test.go:17)   JCC 51
    0x0021 00033 (test.go:17)   LEAQ    (AX)(CX*8), AX
    0x0025 00037 (test.go:17)   PCDATA  $2, $0
    0x0025 00037 (test.go:17)   PCDATA  $0, $2
    0x0025 00037 (test.go:17)   MOVQ    AX, "".~r2+32(SP)
    0x002a 00042 (test.go:17)   MOVQ    (SP), BP
    0x002e 00046 (test.go:17)   ADDQ    $8, SP
    0x0032 00050 (test.go:17)   RET
    0x0033 00051 (test.go:17)   PCDATA  $0, $1
    0x0033 00051 (test.go:17)   CALL    runtime.panicindex(SB)
    0x0038 00056 (test.go:17)   UNDEF

indexLocal is the optimized version; indexLocal2 is the old code. Notice the old version has an extra CMPQ, i.e. an array bounds check, which adds an extra branch to predict. Who would have guessed that two ways of writing the same conversion differ in performance.

Adding a lock-free queue

The poolLocal.shared field changed from []interface{} to poolChain, a queue designed specifically for Pool: single producer, multiple consumers, with the consumers using CAS for lock-free pops; see the commit. Personally I don't think it's as good as the dpdk ring implementation.

Currently, Pool stores each per-P shard's overflow in a slice
protected by a Mutex. In order to store to the overflow or steal from
another shard, a P must lock that shard's Mutex. This allows for
simple synchronization between Put and Get, but has unfortunate
consequences for clearing pools.

Pools are cleared during STW sweep termination, and hence rely on
pinning a goroutine to its P to synchronize between Get/Put and
clearing. This makes the Get/Put fast path extremely fast because it
can rely on quiescence-style coordination, which doesn't even require
atomic writes, much less locking.

The catch is that a goroutine cannot acquire a Mutex while pinned to
its P (as this could deadlock). Hence, it must drop the pin on the
slow path. But this means the slow path is not synchronized with
clearing. As a result,

1) It's difficult to reason about races between clearing and the slow
path. Furthermore, this reasoning often depends on unspecified nuances
of where preemption points can occur.

2) Clearing must zero out the pointer to every object in every Pool to
prevent a concurrent slow path from causing all objects to be
retained. Since this happens during STW, this has an O(# objects in
Pools) effect on STW time.

3) We can't implement a victim cache without making clearing even
slower.

This CL solves these problems by replacing the locked overflow slice
with a lock-free structure. This allows Gets and Puts to be pinned the
whole time they're manipulating the shards slice (Pool.local), which
eliminates the races between Get/Put and clearing. This, in turn,
eliminates the need to zero all object pointers, reducing clearing to
O(# of Pools) during STW.

In addition to significantly reducing STW impact, this also happens to
speed up the Get/Put fast-path and the slow path. It somewhat
increases the cost of PoolExpensiveNew, but we'll fix that in the next
CL.

name                 old time/op     new time/op     delta
Pool-12                 3.00ns ± 0%     2.21ns ±36%  -26.32%  (p=0.000 n=18+19)
PoolOverflow-12          600ns ± 1%      587ns ± 1%   -2.21%  (p=0.000 n=16+18)
PoolSTW-12              71.0µs ± 2%      5.6µs ± 3%  -92.15%  (p=0.000 n=20+20)
PoolExpensiveNew-12     3.14ms ± 5%     3.69ms ± 7%  +17.67%  (p=0.000 n=19+20)

name                 old p50-ns/STW  new p50-ns/STW  delta
PoolSTW-12               70.7k ± 1%       5.5k ± 2%  -92.25%  (p=0.000 n=20+20)

name                 old p95-ns/STW  new p95-ns/STW  delta
PoolSTW-12               73.1k ± 2%       6.7k ± 4%  -90.86%  (p=0.000 n=18+19)

name                 old GCs/op      new GCs/op      delta
PoolExpensiveNew-12       0.38 ± 1%       0.39 ± 1%   +2.07%  (p=0.000 n=20+18)

name                 old New/op      new New/op      delta
PoolExpensiveNew-12       33.9 ± 6%       40.0 ± 6%  +17.97%  (p=0.000 n=19+20)

Here is the new Get implementation in full:

func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    }
    l, pid := p.pin()
    x := l.private
    l.private = nil
    if x == nil {
        // Try to pop the head of the local shard. We prefer
        // the head over the tail for temporal locality of
        // reuse.
        x, _ = l.shared.popHead()
        if x == nil {
            x = p.getSlow(pid)
        }
    }
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}

func (p *Pool) getSlow(pid int) interface{} {
    // See the comment in pin regarding ordering of the loads.
    size := atomic.LoadUintptr(&p.localSize) // load-acquire
    local := p.local                         // load-consume
    // Try to steal one element from other procs.
    for i := 0; i < int(size); i++ {
        l := indexLocal(local, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    return nil
}

I won't paste the lock-free queue internals here; it's CAS loops all the way down, nothing particularly special.
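The consumer-side claim loop is still worth a tiny sketch, since it is the heart of the multi-consumer side. This toy queue is invented for illustration: consumers claim slots by CAS-ing a shared index, so each item is delivered exactly once. (The real poolDequeue packs head and tail into a single uint64, handles ring wrap-around, and nils out slots so objects can be collected.)

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// casQueue: consumers claim slots by CAS on the next index. Losing
// the CAS race just means another consumer took that slot; retry.
type casQueue struct {
	items []int
	next  int64 // next index to pop
}

func (q *casQueue) pop() (int, bool) {
	for {
		n := atomic.LoadInt64(&q.next)
		if n >= int64(len(q.items)) {
			return 0, false // drained
		}
		if atomic.CompareAndSwapInt64(&q.next, n, n+1) {
			return q.items[n], true // slot n is exclusively ours now
		}
	}
}

// drain runs `workers` goroutines until the queue is empty and sums
// everything popped; exactly-once delivery makes the sum deterministic.
func drain(q *casQueue, workers int) int64 {
	var sum int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				v, ok := q.pop()
				if !ok {
					return
				}
				atomic.AddInt64(&sum, int64(v))
			}
		}()
	}
	wg.Wait()
	return sum
}

func main() {
	q := &casQueue{items: []int{1, 2, 3, 4, 5, 6, 7, 8}}
	fmt.Println(drain(q, 4)) // 36: every item popped exactly once
}
```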

Adding the victim cache

For why the victim cache was added, see 22950. In short: it mitigates the cold-start problem after GC clears every Pool, making allocation smoother. See the commit.

Currently, every Pool is cleared completely at the start of each GC.
This is a problem for heavy users of Pool because it causes an
allocation spike immediately after Pools are clear, which impacts both
throughput and latency.

This CL fixes this by introducing a victim cache mechanism. Instead of
clearing Pools, the victim cache is dropped and the primary cache is
moved to the victim cache. As a result, in steady-state, there are
(roughly) no new allocations, but if Pool usage drops, objects will
still be collected within two GCs (as opposed to one).

This victim cache approach also improves Pool's impact on GC dynamics.
The current approach causes all objects in Pools to be short lived.
However, if an application is in steady state and is just going to
repopulate its Pools, then these objects impact the live heap size *as
if* they were long lived. Since Pooled objects count as short lived
when computing the GC trigger and goal, but act as long lived objects
in the live heap, this causes GC to trigger too frequently. If Pooled
objects are a non-trivial portion of an application's heap, this
increases the CPU overhead of GC. The victim cache lets Pooled objects
affect the GC trigger and goal as long-lived objects.

This has no impact on Get/Put performance, but substantially reduces
the impact to the Pool user when a GC happens. PoolExpensiveNew
demonstrates this in the substantially reduction in the rate at which
the "New" function is called.

name                 old time/op     new time/op     delta
Pool-12                 2.21ns ±36%     2.00ns ± 0%     ~     (p=0.070 n=19+16)
PoolOverflow-12          587ns ± 1%      583ns ± 1%   -0.77%  (p=0.000 n=18+18)
PoolSTW-12              5.57µs ± 3%     4.52µs ± 4%  -18.82%  (p=0.000 n=20+19)
PoolExpensiveNew-12     3.69ms ± 7%     1.25ms ± 5%  -66.25%  (p=0.000 n=20+19)

name                 old p50-ns/STW  new p50-ns/STW  delta
PoolSTW-12               5.48k ± 2%      4.53k ± 2%  -17.32%  (p=0.000 n=20+20)

name                 old p95-ns/STW  new p95-ns/STW  delta
PoolSTW-12               6.69k ± 4%      5.13k ± 3%  -23.31%  (p=0.000 n=19+18)

name                 old GCs/op      new GCs/op      delta
PoolExpensiveNew-12       0.39 ± 1%       0.32 ± 2%  -17.95%  (p=0.000 n=18+20)

name                 old New/op      new New/op      delta
PoolExpensiveNew-12       40.0 ± 6%       12.4 ± 6%  -68.91%  (p=0.000 n=20+19)

The key is in the first paragraph of that commit message. Under the old scheme, an object referenced only by the Pool at GC time was freed during that GC. For heavy Pool users this meant a burst of object allocations right after each GC, hurting throughput and latency. This patch smooths that out: an object now survives for at least two GC intervals.

func poolCleanup() {
    // This function is called with the world stopped, at the beginning of a garbage collection.
    // It must not allocate and probably should not call any runtime functions.

    // Because the world is stopped, no pool user can be in a
    // pinned section (in effect, this has all Ps pinned).

    // Drop victim caches from all pools.
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }

    // Move primary cache to victim cache.
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    // The pools with non-empty primary caches now have non-empty
    // victim caches and no pools have primary caches.
    oldPools, allPools = allPools, nil
}

Note the last line of the new poolCleanup. On the Get side, the slow path now falls back to the victim cache.
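The resulting lifetime rule can be observed directly: with the victim cache (Go 1.13+), an object Put into a Pool typically survives one GC but never two. In this sketch, two objects are Put so at least one lands in the shared queue, which the victim fallback scans across all Ps; the one-GC result can still vary with scheduler timing, so it is hedged as "typically":

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// survives reports whether anything Put into a fresh Pool is still
// retrievable after `gcs` garbage collections.
func survives(gcs int) bool {
	p := &sync.Pool{} // no New: Get returns nil when truly empty
	p.Put(new(int))
	p.Put(new(int)) // the second Put spills into the shared queue
	for i := 0; i < gcs; i++ {
		runtime.GC()
	}
	return p.Get() != nil
}

func main() {
	// Typically prints "true false": one GC only moves the objects
	// to the victim cache, a second one drops them for good.
	fmt.Println(survives(1), survives(2))
}
```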

Summary

When judging a basic building block, look not only at its performance but also at its stability, especially for something in a language's standard library.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK