37

golang标准库sync.Pool原理及源码简析 - 无远弗届

 4 years ago
source link: http://cbsheng.github.io/posts/golang%E6%A0%87%E5%87%86%E5%BA%93sync.pool%E5%8E%9F%E7%90%86%E5%8F%8A%E6%BA%90%E7%A0%81%E7%AE%80%E6%9E%90/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

golang标准库sync.Pool原理及源码简析


pool关键作用:

  1. 减轻GC的压力。
  2. 复用对象内存。有时不一定希望复用内存,单纯是想减轻GC压力也可主动给pool塞对象。

Pool’s purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.

sync.Pool就是围绕New字段、Get和Put方法来使用。用过都懂,比较简单就不介绍了。

Go是提供goroutine进行并发编程,在并发环境下,sync.Pool的使用不会造成严重性能问题是它的设计考虑点。

容易想到的方法是Pool对象为每个P都分配一个空间,这样在P上运行的G进行Get和Put操作时,就可以在P本地的空间上读写。这样方法比Pool对象维护一个全局空间有明显好处,全局空间的读写肯定要加锁。

即使每个P都有了自己的本地空间,也不是说就可以完全避免锁使用。不要忘了Pool提供了内存复用功效,每个P上的G都使用的是P本地的空间的话,那内存复用就有局限性,只能局限在一个P上。

而pool提供的内存复用是覆盖所有P。意思是,一个G在执行Get方法时,发生G所在的P上,没有可复用的对象。这时就到别的P那儿去偷。偷这个动作就要加锁了。因为偷取别人可复用对象时候,别人也可能同时在读写。

前面开始说每个P有自己的空间,作用是避免锁,后面又说到别的P上偷对象,又要加锁。是不是矛盾了。

不矛盾,让我们来看看sync.Pool的实现原理。

sync.Pool对象底层两个关键字段,local和localSize,前者是指向一个数组,数组大小存在localSize。localSize的大小跟P个数保持一致。数组每个元素就是代表每个P自己的本地空间,类型是poolLocal

poolLocal类型有两个关键字段,private和shared

  • shared是一个数组,读写要加锁。
  • private只能存一个对象,读写不加锁。

来理一下在Pool对象上读写的逻辑:

  1. Get操作时,先返回本地P上的private上的对象。
  2. 如果private为空,继续从本地P上的shared找,这里要加锁。
  3. 如果shared也没有,就到别的P那儿,从shared里偷。
  4. 所有其它P都遍历过了,没有任何对象可偷。就返回nil或调用New函数。

  5. Put操作时,优先放private。

  6. private已经被放了,那就放到shared的最后。

用一张图来表示:

sync_pool

sync.Pool的特性

  • 无大小限制。
  • 自动清理,每次GC前会清掉Pool里的所有对象。所以不适用于做连接池。
  • 每个P都会有一个本地的poolLocal,Get和Put优先在当前P的本地poolLocal操作。其次再进行跨P操作。
  • 所以Pool的最大个数是runtime.GOMAXPROCS(0)。

sync.Pool的缺点

pool的Get()并非成本低廉,最坏情况可能会上锁runtime.GOMAXPROCS(0)次。

所以,多Goroutine与多P的情况下,使用Pool的效果才会突显。否则要经历无谓的锁成本。

简单的常用场景

bytes.Buffer作为临时对象放在池子里,这样减轻每次都需要创建的消耗。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type Dao struct {
    bp      sync.Pool
}

func New(c *conf.Config) (d *Dao) {
    d = &Dao{
        bp: sync.Pool{
            New: func() interface{} {
                return &bytes.Buffer{}
            },
        },
    }
    return
}

func (d *Dao) Infoc(args ...string) (value string, err error) {
    if len(args) == 0 {
        return
    }

    // fetch a buf from bufpool
    buf, ok := d.bp.Get().(*bytes.Buffer)
    if !ok {
        return "", ErrType
    }

    // append first arg
    if _, err := buf.WriteString(args[0]); err != nil {
        return "", err
    }

    for _, arg := range args[1:] {
        // append ,arg
        if _, err := buf.WriteString(defaultSpliter); err != nil {
            return "", err
        }

        if _, err := buf.WriteString(strings.Replace(arg, defaultSpliter, defaultReplacer, -1)); err != nil {
            return "", err
        }
    }

    value = buf.String()
    buf.Reset()
    d.bp.Put(buf)
    return

}

带注释的源码

sync.Pool数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// pool 的数据结构
type Pool struct {
	noCopy noCopy
	// 指向一个数组,个数与P相等,每个元素的类型为poolLocalInternal
	local     unsafe.Pointer
	// local数组的大小
	localSize uintptr
	// 创建pool对象时,用户必须提供的new函数
	New func() interface{}
}

type poolLocalInternal struct {
	// 私有对象,每个P都有,用于不同g执行get和put可以无锁操作
	private interface{}
	// 共享对象数组,每个P都有一个,同一个P上不同g可以多次执行put方法,需要有地方能存储。并且别的p上的g可能过来偷,所以要加锁
	shared  []interface{}
	// 对shared进行加锁,private不用加锁
	Mutex
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

Get方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	// ...
	// 拿到当前P对应的pool
	l := p.pin()
	if l.private == nil {
		// 私有区有位置的话直接放私有区
		l.private = x
		x = nil
	}
	runtime_procUnpin()
	if x != nil {
		// 否则放在共享区里
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock()
	}
  // ...
}

func (p *Pool) pin() *poolLocal {
	// 拿到当前P的ID
	pid := runtime_procPin()
	s := atomic.LoadUintptr(&p.localSize)
	l := p.local
	if uintptr(pid) < s {
		// 定义pool对象时,s取值为0。只有经过pinSlow后,p.localSize的值才被设置
		// 如果local数组已经初始化,就可以把对应P的本地pool返回
		return indexLocal(l, pid)
	}
	// 否则就得重建local
	return p.pinSlow()
}

func (p *Pool) pinSlow() *poolLocal {
	runtime_procUnpin()
	// 锁上所有的pool对象
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
    // pinSlow是一个创建local的方法。在获得allPoolsMu锁前,可能被别的P先获取,这种情况下local就已经被初始化了
		// 所以在获得allPoolsMu锁后需要再检查一次uintptr(pid) < s
		return indexLocal(l, pid)
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// local的大小默认就是P的个数
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // 设置local
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // 设置localSize
	return &local[pid]
}

Put方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	// ...
	// 拿到当前P对应的pool
	l := p.pin()
	if l.private == nil {
		// 私有区有位置的话直接放私有区
		l.private = x
		x = nil
	}
	runtime_procUnpin()
	if x != nil {
		// 否则放在共享区里
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock()
	}
	// ...
}

runtime_procPin和runtime_procUnpin

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//go:linkname sync_runtime_procPin sync.runtime_procPin
//go:nosplit
func sync_runtime_procPin() int {
	return procPin()
}

//go:nosplit
func procPin() int {
	_g_ := getg()
	mp := _g_.m

	mp.locks++
	return int(mp.p.ptr().id)
}

//go:linkname sync_atomic_runtime_procUnpin sync/atomic.runtime_procUnpin
//go:nosplit
func sync_atomic_runtime_procUnpin() {
	procUnpin()
}

//go:nosplit
func procUnpin() {
	_g_ := getg()
	_g_.m.locks--
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK