0

动手实现一个localcache - 实现篇

 5 months ago
source link: https://segmentfault.com/a/1190000041289700
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

原文链接:动手实现一个localcache - 实现篇

哈喽,大家好,我是asong,经过了前面两篇的介绍,我们已经基本了解该如何设计一个本地缓存了,本文就是这个系列的终结篇,自己动手实现一个本地缓存,接下来且听我细细道来!!!

本文代码已经上传到github:https://github.com/asong2020/go-localcache

现在这一版本算是一个1.0,后续会继续进行优化和迭代。

第一步:抽象接口

第一步很重要,以面向接口编程为原则,我们先抽象出来要暴露给用户的方法,给用户提供简单易懂的方法,因此我抽象出来的结果如下:

// ICache abstract interface
type ICache interface {
    // Set value use default expire time. default does not expire.
    Set(key string, value []byte) error
    // Get value if find it. if value already expire will delete.
    Get(key string) ([]byte, error)
    // SetWithTime set value with expire time
    SetWithTime(key string, value []byte, expired time.Duration) error
    // Delete manual removes the key
    Delete(key string) error
    // Len computes number of entries in cache
    Len() int
    // Capacity returns amount of bytes store in the cache.
    Capacity() int
    // Close is used to signal a shutdown of the cache when you are done with it.
    // This allows the cleaning goroutines to exit and ensures references are not
    // kept to the cache preventing GC of the entire cache.
    Close() error
    // Stats returns cache's statistics
    Stats() Stats
    // GetKeyHit returns key hit
    GetKeyHit(key string) int64
}
  • Set(key string, value []byte):使用该方法存储的数据使用默认的过期时间,如果清除过期的异步任务没有enable,那么就永不过期,否则默认过期时间为10min。
  • Get(key string) ([]byte, error):根据key获取对象内容,如果数据过期了会在这一步删除。
  • SetWithTime(key string, value []byte, expired time.Duration):存储对象是使用自定义过期时间
  • Delete(key string) error:根据key删除对应的缓存数据
  • Len() int:获取缓存的对象数量
  • Capacity() int:获取当前缓存的容量
  • Close() error:关闭缓存
  • Stats() Stats:缓存监控数据
  • GetKeyHit(key string) int64:获取key的命中率数据

第二步:定义缓存对象

第一步我们抽象好了接口,下面就要定义一个缓存对象实例实现接口,先看定义结构:

type cache struct {
    // hashFunc represents used hash func
    hashFunc HashFunc
    // bucketCount represents the number of segments within a cache instance. value must be a power of two.
    bucketCount uint64
    // bucketMask is bitwise AND applied to the hashVal to find the segment id.
    bucketMask uint64
    // segment is shard
    segments []*segment
    // segment lock
    locks    []sync.RWMutex
    // close cache
    close chan struct{}
}
  • hashFunc:分片要用的哈希函数,用户可以自行定义,实现HashFunc接口即可,默认使用fnv算法。
  • bucketCount:分片的数量,一定要是偶数,默认分片数为256
  • bucketMask:因为分片数是偶数,所以可以分片时可以使用位运算代替取余提升性能效率,hashValue % bucketCount == hashValue & bucketCount - 1
  • segments:分片对象,每个分片的对象结构我们在后面介绍。
  • locks:每个分片的读写锁
  • close:关闭缓存对象时通知其他goroutine暂停

接下来我们来写cache对象的构造函数:

// NewCache constructor cache instance
func NewCache(opts ...Opt) (ICache, error) {
    options := &options{
        hashFunc: NewDefaultHashFunc(),
        bucketCount: defaultBucketCount,
        maxBytes: defaultMaxBytes,
        cleanTime: defaultCleanTIme,
        statsEnabled: defaultStatsEnabled,
        cleanupEnabled: defaultCleanupEnabled,
    }
    for _, each := range opts{
        each(options)
    }

    if !isPowerOfTwo(options.bucketCount){
        return nil, errShardCount
    }

  if options.maxBytes <= 0 {
        return nil, ErrBytes
    }
  
    segments := make([]*segment, options.bucketCount)
    locks := make([]sync.RWMutex, options.bucketCount)

    maxSegmentBytes := (options.maxBytes + options.bucketCount - 1) / options.bucketCount
    for index := range segments{
        segments[index] = newSegment(maxSegmentBytes, options.statsEnabled)
    }

    c := &cache{
        hashFunc: options.hashFunc,
        bucketCount: options.bucketCount,
        bucketMask: options.bucketCount - 1,
        segments: segments,
        locks: locks,
        close: make(chan struct{}),
    }
    if options.cleanupEnabled {
        go c.cleanup(options.cleanTime)
    }
    
    return c, nil
}

这里为了更好的扩展,我们使用Options编程模式,我们的构造函数主要做三件事:

  • 前置参数检查,对于外部传入的参数,我们还是要做基本的校验
  • 分片对象初始化
  • 构造缓存对象

这里构造缓存对象时我们要先计算每个分片的容量,默认整个本地缓存256M的数据,然后在平均分到每一片区内,用户可以自行选择要缓存的数据大小。

第三步:定义分片结构

每个分片结构如下:

type segment struct {
    hashmap map[uint64]uint32
    entries buffer.IBuffer
    clock   clock
    evictList  *list.List
    stats IStats
}
  • hashmp:存储key所对应的存储索引
  • entries:存储key/value的底层结构,我们在第四步的时候介绍,也是代码的核心部分。
  • clock:定义时间方法
  • evicList:这里我们使用一个队列来记录old索引,当容量不足时进行删除(临时解决方案,当前存储结构不适合使用LRU淘汰算法)
  • stats:缓存的监控数据。

接下来我们再来看一下每个分片的构造函数:

func newSegment(bytes uint64, statsEnabled bool) *segment {
    if bytes == 0 {
        panic(fmt.Errorf("bytes cannot be zero"))
    }
    if bytes >= maxSegmentSize{
        panic(fmt.Errorf("too big bytes=%d; should be smaller than %d", bytes, maxSegmentSize))
    }
    capacity := (bytes + segmentSize - 1) / segmentSize
    entries := buffer.NewBuffer(int(capacity))
    entries.Reset()
    return &segment{
        entries: entries,
        hashmap: make(map[uint64]uint32),
        clock:   &systemClock{},
        evictList: list.New(),
        stats: newStats(statsEnabled),
    }
}

这里主要注意一点:

我们要根据每个片区的缓存数据大小来计算出容量,与上文的缓存对象初始化步骤对应上了。

第四步:定义缓存结构

缓存对象现在也构造好了,接下来就是本地缓存的核心:定义缓存结构。

bigcachefastcachefreecache都使用字节数组代替map存储缓存数据,从而减少GC压力,所以我们也可以借鉴其思想继续保持使用字节数组,这里我们使用二维字节切片存储缓存数据key/value;画个图表示一下:

使用二维数组存储数据的相比于bigcache的优势在于可以直接根据索引删除对应的数据,虽然也会有虫洞的问题,但是我们可以记录下来虫洞的索引,不断填充。

每个缓存的封装结构如下:

基本思想已经明确,接下来看一下我们对存储层的封装:

type Buffer struct {
    array [][]byte
    capacity int
    index int
    // maxCount = capacity - 1
    count int
    // availableSpace If any objects are removed after the buffer is full, the idle index is logged.
    // Avoid array "wormhole"
    availableSpace map[int]struct{}
    // placeholder record the index that buffer has stored.
    placeholder map[int]struct{}
}
  • array [][]byte:存储缓存对象的二维切片
  • capacity:缓存结构的最大容量
  • index:索引,记录缓存所在的位置的索引
  • count:记录缓存数量
  • availableSpace:记录"虫洞",当缓存对象被删除时记录下空闲位置的索引,方便后面容量满了后使用"虫洞"
  • placeholder:记录缓存对象的索引,迭代清除过期缓存可以用上。

buffer写入数据的流程(不贴代码了):

<img src="https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4e339104a34d4fabb45bc45a4a830a3a~tplv-k3u1fbpfcp-zoom-1.image" style="zoom: 33%;" />

第五步:完善向缓存写入数据方法

上面我们定义好了所有需要的结构,接下来就是填充我们的写入缓存方法就可以了:

func (c *cache) Set(key string, value []byte) error  {
    hashKey := c.hashFunc.Sum64(key)
    bucketIndex := hashKey&c.bucketMask
    c.locks[bucketIndex].Lock()
    defer c.locks[bucketIndex].Unlock()
    err := c.segments[bucketIndex].set(key, hashKey, value, defaultExpireTime)
    return err
}

func (s *segment) set(key string, hashKey uint64, value []byte, expireTime time.Duration) error {
    if expireTime <= 0{
        return ErrExpireTimeInvalid
    }
    expireAt := uint64(s.clock.Epoch(expireTime))

    if previousIndex, ok := s.hashmap[hashKey]; ok {
        if err := s.entries.Remove(int(previousIndex)); err != nil{
            return err
        }
        delete(s.hashmap, hashKey)
    }

    entry := wrapEntry(expireAt, key, hashKey, value)
    for {
        index, err := s.entries.Push(entry)
        if err == nil {
            s.hashmap[hashKey] = uint32(index)
            s.evictList.PushFront(index)
            return nil
        }
        ele := s.evictList.Back()
        if err := s.entries.Remove(ele.Value.(int)); err != nil{
            return err
        }
        s.evictList.Remove(ele)
    }
}

流程分析如下:

  • 根据key计算哈希值,然后根据分片数获取对应分片位置
  • 如果当前缓存中存在相同的key,则先删除,在重新插入,会刷新过期时间
  • 封装存储结构,根据过期时间戳、key长度、哈希大小、缓存对象进行封装
  • 将数据存入缓存,如果缓存失败,移除最老的数据后再次重试

第六步:完善从缓存读取数据方法

第一步根据key计算哈希值,再根据分片数获取对应的分片位置:

func (c *cache) Get(key string) ([]byte, error)  {
    hashKey := c.hashFunc.Sum64(key)
    bucketIndex := hashKey&c.bucketMask
    c.locks[bucketIndex].RLock()
    defer c.locks[hashKey&c.bucketMask].RUnlock()
    entry, err := c.segments[bucketIndex].get(key, hashKey)
    if err != nil{
        return nil, err
    }
    return entry,nil
}

第二步执行分片方法获取缓存数据:

  • 先根据哈希值判断key是否存在于缓存中,不存返回key没有找到
  • 从缓存中读取数据得到缓存中的key判断是否发生哈希冲突
  • 判断缓存对象是否过期,过期删除缓存数据(可以根据业务优化需要是否返回当前过期数据)
  • 在每个记录缓存监控数据
func (s *segment) getWarpEntry(key string, hashKey uint64) ([]byte,error) {
    index, ok := s.hashmap[hashKey]
    if !ok {
        s.stats.miss()
        return nil, ErrEntryNotFound
    }
    entry, err := s.entries.Get(int(index))
    if err != nil{
        s.stats.miss()
        return nil, err
    }
    if entry == nil{
        s.stats.miss()
        return nil, ErrEntryNotFound
    }

    if entryKey := readKeyFromEntry(entry); key != entryKey {
        s.stats.collision()
        return nil, ErrEntryNotFound
    }
    return entry, nil
}

func (s *segment) get(key string, hashKey uint64) ([]byte, error) {
    currentTimestamp := s.clock.TimeStamp()
    entry, err := s.getWarpEntry(key, hashKey)
    if err != nil{
        return nil, err
    }
    res := readEntry(entry)

    expireAt := int64(readExpireAtFromEntry(entry))
    if currentTimestamp - expireAt >= 0{
        _ = s.entries.Remove(int(s.hashmap[hashKey]))
        delete(s.hashmap, hashKey)
        return nil, ErrEntryNotFound
    }
    s.stats.hit(key)

    return res, nil
}

第七步:来个测试用例体验一下

先来个简单的测试用例测试一下:

func (h *cacheTestSuite) TestSetAndGet() {
    cache, err := NewCache()
    assert.Equal(h.T(), nil, err)
    key := "asong"
    value := []byte("公众号:Golang梦工厂")

    err = cache.Set(key, value)
    assert.Equal(h.T(), nil, err)

    res, err := cache.Get(key)
    assert.Equal(h.T(), nil, err)
    assert.Equal(h.T(), value, res)
    h.T().Logf("get value is %s", string(res))
}

运行结果:

=== RUN   TestCacheTestSuite
=== RUN   TestCacheTestSuite/TestSetAndGet
    cache_test.go:33: get value is 公众号:Golang梦工厂
--- PASS: TestCacheTestSuite (0.00s)
    --- PASS: TestCacheTestSuite/TestSetAndGet (0.00s)
PASS

大功告成,基本功能通了,剩下就是跑基准测试、优化、迭代了(不在文章赘述了,可以关注github仓库最新动态)。

实现篇到这里就结束了,但是这个项目的编码仍未结束,我会继续以此版本为基础不断迭代优化,该本地缓存的优点:

  • 实现简单、提供给用户的方法简单易懂
  • 使用二维切片作为存储结构,避免了不能删除底层数据的缺点,也在一定程度上避免了"虫洞"问题。
  • 测试用例齐全,适合作为小白的入门项目

待优化点:

  • 没有使用高效的缓存淘汰算法,可能会导致热点数据被频繁删除
  • 定时删除过期数据会导致锁持有时间过长,需要优化
  • 关闭缓存实例需要优化处理方式
  • 根据业务场景进行优化(特定业务场景)
  • 添加异步加载缓存功能
  • ...... (思考中)

本文代码已经上传到github:https://github.com/asong2020/go-localcache

好啦,本文到这里就结束了,我是asong,我们下期见。

欢迎关注公众号:【Golang梦工厂】


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK