2

victoriaMetrics之byteBuffer - charlieroro

 2 years ago
source link: https://www.cnblogs.com/charlieroro/p/16110151.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

victoriaMetrics之byteBuffer

VictoriaMetrics经常会处理数目庞大的指标,在处理的过程中会涉及指标的拷贝,如果在指标拷贝时都进行内存申请的话,其内存消耗和性能损耗都非常大。victoriaMetrics使用byteBuffer来复用内存,提升性能,其核心就是用了sync.pool。下面主要看下它是如何结合sync.pool运作的。

ByteBuffer的结构体如下,只包含一个切片:

type ByteBuffer struct {
	// B is the underlying byte slice.
	B []byte
}

ByteBufferPool的用法

为了服用ByteBuffer,victoriaMetrics用到了ByteBufferPool,与常见的sync.Pool用法相同,包含一个Get和一个Put函数。

// ByteBufferPool is a pool of ByteBuffers.
type ByteBufferPool struct {
	p sync.Pool
}

// Get obtains a ByteBuffer from bbp.
func (bbp *ByteBufferPool) Get() *ByteBuffer {
	bbv := bbp.p.Get()
	if bbv == nil {
		return &ByteBuffer{}
	}
	return bbv.(*ByteBuffer)
}

// Put puts bb into bbp.
func (bbp *ByteBufferPool) Put(bb *ByteBuffer) {
	bb.Reset()
	bbp.p.Put(bb)
}

Put函数用于将ByteBuffer返回给资源池,为了防止下次使用的时候出现无效数据,在返回给sync.Pool之前需要清空切片内存,其使用的Reset函数如下,bb.B = bb.B[:0]也是一种常见的清空切片内容的方式:

func (bb *ByteBuffer) Reset() {
	bb.B = bb.B[:0]
}

ByteBuffer实现了io.Writerio.ReaderFrom接口。

Writer接口实现

实现的write接口如下,比较简单,只是简单地将入参数据添加到byteBuffer中。在append的时候会增加切片的容量。

func (bb *ByteBuffer) Write(p []byte) (int, error) {
	bb.B = append(bb.B, p...)
	return len(p), nil
}

ReaderFrom接口实现

ReaderFrom中比较有意思的是看它是如何预分配容量,以及在容量不足的情况下,如何进行扩容。其核心思想是使用make预先申请一块内存,而不是通过append来让底层自动扩容。

  1. 首先获取b的长度,表示切片中已有的数据长度

  2. 由于ByteBuffer可能来自ByteBufferPool.Get,因此,其切片容量可能无法满足数据读取的需要,此时用到了ResizeWithCopyMayOverallocateResizeWithCopyMayOverallocate确保切片的容量不小于n字节,如果容量足够,则返回长度为n的子切片,否则申请新的切片,并返回长度为n的子切片。roundToNearestPow2会找出最接近n的2的幂的数值,以此作为新切片的容量。

    // ResizeNoCopyMayOverallocate resizes b to minimum n bytes and returns the resized buffer (which may be newly allocated).
    //
    // If newly allocated buffer is returned then b contents isn't copied to it.
    func ResizeNoCopyMayOverallocate(b []byte, n int) []byte {
    	if n <= cap(b) {
    		return b[:n]
    	}
    	nNew := roundToNearestPow2(n)
    	bNew := make([]byte, nNew)
    	return bNew[:n]
    }
    
    // roundToNearestPow2 rounds n to the nearest power of 2
    //
    // It is expected that n > 0
    func roundToNearestPow2(n int) int {
    	pow2 := uint8(bits.Len(uint(n - 1)))
    	return 1 << pow2
    }
    
  3. 将b的长度等于容量

  4. 设置offset为b中已有的数据偏移量

  5. 获取剩余的容量free,如果剩余的容量不足一半(free < offset),则将容量翻倍

  6. 将数据读取到offset之后的存储中,并增加偏移量

  7. Read操作返回错误时,将ByteBuffer中的切片长度设置为b,如果返回错误为EOF,则视为数据读取完成。

// ReadFrom reads all the data from r to bb until EOF.
func (bb *ByteBuffer) ReadFrom(r io.Reader) (int64, error) {
	b := bb.B
	bLen := len(b)//1
	b = ResizeWithCopyMayOverallocate(b, 4*1024) //2
	b = b[:cap(b)]//3
	offset := bLen//4
	for {
		if free := len(b) - offset; free < offset {//5
			n := len(b)
			b = append(b, make([]byte, n)...)
		}
		n, err := r.Read(b[offset:])//6
		offset += n
		if err != nil {//7
			bb.B = b[:offset]
			if err == io.EOF {
				err = nil
			}
			return int64(offset - bLen), err//9
		}
	}
}

后续可以使用该库来满足从io.Reader中读取数据,而不用担心buffer不足,借助ByteBufferPool可以有效地复用buffer。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK