19

Netty内存池之PoolThreadCache详解

 4 years ago
source link: https://www.tuicool.com/articles/uiINJvZ
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

PoolThreadCahche是Netty内存管理中能够实现高效内存申请和释放的一个重要原因,Netty会为每一个线程都维护一个PoolThreadCache对象,当进行内存申请时,首先会尝试从PoolThreadCache中申请,如果无法从中申请到,则会尝试从Netty的公共内存池中申请。本文首先会对PoolThreadCache的数据结构进行讲解,然后会介绍Netty是如何初始化PoolThreadCache的,最后会介绍如何在PoolThreadCache中申请内存和如何将内存释放到PoolThreadCache中。

1. PoolThreadCache数据结构

PoolThreadCache的数据结构与PoolArena的主要属性结构非常相似,但细微位置有很大的不同。在PoolThreadCache中,其维护了三个数组(我们以直接内存的缓存方式为例进行讲解),如下所示:

// 存储tiny类型的内存缓存,该数组长度为32,其中只有下标为1~31的元素缓存了有效数据,第0号位空置。
// 这里内存大小的存储方式也与PoolSubpage类似,数组的每一号元素都存储了不同等级的内存块,每个等级的
// 内存块的内存大小差值为16byte,比如第1号位维护了大小为16byte的内存块,第二号为维护了大小为32byte的
// 内存块,依次类推,第31号位维护了大小为496byte的内存块。
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
// 存储small类型的内存缓存,该数组长度为4,数组中每个元素中维护的内存块大小也是成等级递增的,并且这里
// 的递增方式是按照2的指数次幂进行的,比如第0号为维护的是大小为512byte的内存块,第1号位维护的是大小为
// 1024byte的内存块,第2号位维护的是大小为2048byte的内存块,第3号位维护的是大小为4096byte的内存块
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
// 存储normal类型的内存缓存。需要注意的是,这里虽说是维护的normal类型的缓存,但是其只维护2<<13,2<<14
// 和2<<15三个大小的内存块,而该数组的大小也正好为3,因而这三个大小的内存块将被依次放置在该数组中。
// 如果申请的目标内存大于2<<15,那么Netty会将申请动作交由PoolArena进行。
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

这三个数组分别保存了tiny,small和normal类型的缓存数据,不同于PoolArena的使用PoolSubpage和PoolChunk进行内存的维护,这里都是使用MemoryRegionCache进行的。另外,在MemoryRegionCache中保存了一个有界队列,对于tiny类型的缓存,该队列的长度为512,对于small类型的缓存,该队列的长度为256,对于normal类型的缓存,该队列的长度为64。在进行内存释放的时候,如果队列已经满了,那么就会将该内存块释放回PoolArena中。这里需要说明的是,这里的队列中的元素统一使用的是Entry这种数据结构,该结构的主要属性如下:

static final class Entry<T> {
  // 用于循环利用当前Entry对象的处理器,该处理器的实现原理,我们后续将进行讲解
  final Handle<Entry<?>> recyclerHandle;
  // 记录了当前内存块是从哪一个PoolChunk中申请得来的
  PoolChunk<T> chunk;
  // 如果是直接内存,该属性记录了当前内存块所在的ByteBuffer对象
  ByteBuffer nioBuffer;
  // 由于当前申请的内存块在PoolChunk以及PoolSubpage中的位置是可以通过一个长整型参数来表示的,
  // 这个长整型参数就是这里的handle,因而这里直接将其记录下来,以便后续需要将当前内存块释放到
  // PoolArena中时,能够快速获取其所在的位置
  long handle = -1;
}

PoolThreadCache中维护每一个内存块最终都是使用的一个Entry对象来进行的,从上面的属性可以看出,记录该内存块最重要的属性是chunk和handle,chunk记录了当前内存块所在的PoolChunk对象,而handle则记录了当前内存块是在PoolChunk和PoolSubpage中的哪个位置(关于PoolChunk,PoolSubpage和PoolArena的实现原理,建议读者阅读一下前面的文章,这样有助于读者快速理解相关原理)。如此,对于Netty使用的PoolThreadCache的存储结构我们就有了一个比较清晰的认识。

下面我们通过一幅图来对PoolThreadCache的数据结构进行一个整体的演示:

IBjEBny.png!web

如上图所示展示的就是PoolThreadCache的结构示意图。从图中可以看出在一个PoolThreadCache中,主要有三个MemoryRegionCache数组用于存储tiny,small和normal类型的内存块。每个MemoryRegionCache中有一个队列,队列中的元素类型为Entry。Entry的作用就是存储缓存的内存块的,其存储的方式主要是通过记录当前内存块所在的PoolChunk和标志其在PoolChunk中位置的handle参数。对于不同类型的数组,队列的长度是不一样的,tiny类型的是512,small类型的是256,normal类型的则是64。

2. PoolThreadCache初始化

对于PoolThreadCache的初始化,这里单独拿出来讲解的原因是,其初始化过程是与PoolThreadLocalCache所绑定的。PoolThreadLocalCache的作用与Java中的ThreadLocal的作用非常类似,其有一个initialValue()方法,用于在无法从PoolThreadLocalCache中获取数据时,通过调用该方法初始化一个。另外其提供了一个get()方法和和remove()方法,分别用于从PoolThreadLocalCache中将当前绑定的数据给清除。这里我们首先看看获取PoolThreadCache的入口代码:

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  // 从PoolThreadLocalCache中尝试获取一个PoolThreadCache对象,
  // 如果不存在,则自行初始化一个返回
  PoolThreadCache cache = threadCache.get();
  // 由于当前方法是需要返回一个direct buffer,因而这里直接使用cache中的directArena
  PoolArena<ByteBuffer> directArena = cache.directArena;

  final ByteBuf buf;
  if (directArena != null) {
    // 如果directArena不为空,则直接调用其allocate()方法申请内存
    buf = directArena.allocate(cache, initialCapacity, maxCapacity);
  } else {
    // 如果当前缓存中由于某种原因无法获取到directArena,则直接创建一个存有直接内存的ByteBuf,
    // 一般情况下不会走到这一步
    buf = PlatformDependent.hasUnsafe() ?
      UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  }

  // 为ByteBuf设置内存泄露检测功能
  return toLeakAwareBuffer(buf);
}

从上面的代码中可以看出,在最开始的时候,就会通过PoolThreadLocalCache尝试获取一个PoolThreadCache对象,如果不存在,其会自行初始化一个。这里我们直接看其是如何初始化的,如下是PoolThreadLocalCache.initialValue()方法的源码:

@Override
protected synchronized PoolThreadCache initialValue() {
  // 这里leastUsedArena()就是获取对应的PoolArena数组中最少被使用的那个Arena,将其返回。
  // 这里的判断方式是通过比较PoolArena.numThreadCaches属性来进行的,该属性记录了当前PoolArena被
  // 多少个线程所占用了。这里采用的思想就是,找到最少被使用的那个PoolArena,将其存入新的线程缓存中
  final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
  final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

  Thread current = Thread.currentThread();
  // 只有在指定了为每个线程使用缓存,或者当前线程是FastThreadLocalThread的子类型时,才会使用线程缓存
  if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
    return new PoolThreadCache(
      heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
      DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
  }
  // 如果指定了不使用缓存,或者线程换粗对象不是FastThreadLocalThread类型的,则创建一个PoolThreadCache
  // 对象,该对象中是不做任何缓存的,因为初始化数据都是0
  return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}

private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
  if (arenas == null || arenas.length == 0) {
    return null;
  }

  // 在PoolArena数组中找到被最少线程占用的对象,将其返回。这样做的目的是,由于内存池是多个线程都可以
  // 访问的公共区域,因而当这里就需要对内存池进行划分,以减少线程之间的竞争。
  PoolArena<T> minArena = arenas[0];
  for (int i = 1; i < arenas.length; i++) {
    PoolArena<T> arena = arenas[i];
    if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
      minArena = arena;
    }
  }

  return minArena;
}

从上述代码可以看出,对于PoolThreadCache的初始化,其首先会查找PoolArena数组中被最少线程占用的那个arena,然后将其封装到一个新建的PoolThreadCache中。

3. 内存申请

需要注意的是,PoolThreadCache申请内存并不是说其会创建一块内存,或者说其会到PoolArena中申请内存,而是指,其本身已经缓存有内存块,而当前申请的内存块大小正好与其一致,就会将该内存块返回;PoolThreadCache中的内存块都是在当前线程使用完创建的ByteBuf对象后,通过调用其release()方法释放内存时直接缓存到当前PoolThreadCache中的,其并不会直接将内存块返回给PoolArena。这里我们直接看一下其allocate()方法是如何实现的:

// 申请tiny类型的内存块
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
  return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

// 申请small类型的内存块
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, 
     int reqCapacity, int normCapacity) {
  return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}

// 申请normal类型的内存块
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, 
     int reqCapacity, int normCapacity) {
  return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}

// 从MemoryRegionCache中申请内存
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
  if (cache == null) {
    return false;
  }

  // 从MemoryRegionCache中申请内存,本质上就是从其队列中申请,如果存在,则初始化申请到的内存块
  boolean allocated = cache.allocate(buf, reqCapacity);
  // 这里是如果当前PoolThreadCache中申请内存的次数达到了8192次,则对内存块进行一次trim()操作,
  // 对使用较少的内存块,将其返还给PoolArena,以供给其他线程使用
  if (++allocations >= freeSweepAllocationThreshold) {
    allocations = 0;
    trim();
  }
  return allocated;
}

这里对于内存块的申请,我们可以看到,PoolThreadCache是将其分为tiny,small和normal三种不同的方法来调用的,而具体大小的区分其实是在PoolArena中进行区分的(读者可以阅读本人前面的关于PoolArena介绍的文章)。在对应的内存数组中找到MemoryRegionCache对象之后,通过调用allocate()方法来申请内存,申请完之后还会检查当前缓存申请次数是否达到了8192次,达到了则对缓存中使用的内存块进行检测,将较少使用的内存块返还给PoolArena。这里我们首先看一下获取MemoryRegionCache的代码是如何实现的,也即cacheForTiny(),cacheForSmall()和cacheForNormal()的代码:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
  // 计算当前数组下标索引,由于tiny类型的内存块每一层级相差16byte,因而这里的计算方式就是
  // 将目标内存大小除以16
  int idx = PoolArena.tinyIdx(normCapacity);
  // 返回tiny类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    return cache(tinySubPageDirectCaches, idx);
  }
  return cache(tinySubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
  // 计算当前数组下标的索引,由于small类型的内存块大小都是2的指数次幂,因而这里就是将目标内存大小
  // 除以1024之后计算其偏移量
  int idx = PoolArena.smallIdx(normCapacity);
  // 返回small类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    return cache(smallSubPageDirectCaches, idx);
  }
  return cache(smallSubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
  // 对于normal类型的缓存,这里也是首先将其向右位移13位,也就是8192,然后取2的对数,这样就
  // 可以得到其在数组中的位置,然后返回normal类型的数组中对应位置的MemoryRegionCache
  if (area.isDirect()) {
    int idx = log2(normCapacity >> numShiftsNormalDirect);
    return cache(normalDirectCaches, idx);
  }
  int idx = log2(normCapacity >> numShiftsNormalHeap);
  return cache(normalHeapCaches, idx);
}

这里对于数组位置的计算,主要是根据各个数组数据存储方式的不同而进行的,而它们最终都是通过一个MemoryRegionCache存储的,因而只需要返回该缓存对象即可。下面我们继续看一下MemoryRegionCache.allocate()方法是如何申请内存的:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
  // 尝试从队列中获取,如果队列中不存在,说明没有对应的内存块,则返回false,表示申请失败
  Entry<T> entry = queue.poll();
  if (entry == null) {
    return false;
  }
  
  // 走到这里说明队列中存在对应的内存块,那么通过其存储的Entry对象来初始化ByteBuf对象,
  // 如此即表示申请内存成功
  initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
  // 对entry对象进行循环利用
  entry.recycle();

  // 更新当前已经申请的内存数量
  ++allocations;
  return true;
}

可以看到,MemoryRegionCache申请内存的方式主要是从队列中取,如果取到了,则使用该内存块初始化一个ByteBuf对象。

前面我们讲到,PoolThreadCache会对其内存块使用次数进行计数,这么做的目的在于,如果一个ThreadPoolCache所缓存的内存块使用较少,那么就可以将其释放到PoolArena中,以便于其他线程可以申请使用。PoolThreadCache会在其内存总的申请次数达到8192时遍历其所有的MemoryRegionCache,然后调用其trim()方法进行内存释放,如下是该方法的源码:

public final void trim() {
  // size表示当前MemoryRegionCache中队列的最大可存储容量,allocations表示当前MemoryRegionCache
  // 的内存申请次数,size-allocations的含义就是判断当前申请的次数是否连队列的容量都没达到
  int free = size - allocations;
  allocations = 0;

  // 如果申请的次数连队列的容量都没达到,则释放该内存块
  if (free > 0) {
    free(free);
  }
}

private int free(int max) {
  int numFreed = 0;
  // 依次从队列中取出Entry数据,调用freeEntry()方法释放该Entry
  for (; numFreed < max; numFreed++) {
    Entry<T> entry = queue.poll();
    if (entry != null) {
      freeEntry(entry);
    } else {
      return numFreed;
    }
  }
  return numFreed;
}

private void freeEntry(Entry entry) {
  // 通过当前Entry中保存的PoolChunk和handle等数据释放当前内存块
  PoolChunk chunk = entry.chunk;
  long handle = entry.handle;
  ByteBuffer nioBuffer = entry.nioBuffer;
  entry.recycle();
  chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer);
}

4. 内存释放

​ 对于内存的释放,其原理比较简单,一般的释放内存的入口在ByteBuf对象中。当调用ByteBuf.release()方法的时候,其首先会将释放动作委托给PoolChunk的free()方法,PoolChunk则会判断当前是否是池化的ByteBuf,如果是池化的ByteBuf,则调用PoolThreadCache.add()方法将其添加到PoolThreadCache中,也就是说在释放内存时,其实际上是释放到当前线程的PoolThreadCache中的。如下是add()方法的源码:

boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
    long handle, int normCapacity, SizeClass sizeClass) {
  // 通过当前释放的内存块的大小计算其应该放到哪个等级的MemoryRegionCache中
  MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
  if (cache == null) {
    return false;
  }
  
  // 将内存块释放到目标MemoryRegionCache中
  return cache.add(chunk, nioBuffer, handle);
}

public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
  // 这里会尝试从缓存中获取一个Entry对象,如果没获取到则创建一个
  Entry<T> entry = newEntry(chunk, nioBuffer, handle);
  // 将实例化的Entry对象放到队列里
  boolean queued = queue.offer(entry);
  if (!queued) {
    entry.recycle();
  }

  return queued;
}

5. 小结

本文首先详细讲解了PoolThreadCache的数据结构,并且说明了其中需要注意的点,然后介绍了PoolThreadCache的实例化方式,接着从申请和释放内存两个角度介绍了PoolThreadCache源码的实现方式。

作者:爱宝贝丶

链接:https://my.oschina.net/zhangxufeng/blog/3040834

版权归作者所有,转载请注明出处


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK