Categories
程式開發

Netty源碼解析– 內存池與PoolArena


我們知道,Netty使用直接內存實現Netty零拷貝以提升性能,

但直接內存的創建和釋放可能需要涉及系統調用,是比較昂貴的操作,如果每個請求都創建和釋放一個直接內存,那性能肯定是不能滿足要求的。

這時就需要使用內存池。

即從系統中申請一大塊內存,再在上面分配每個請求所需的內存。

Netty中的內存池主要涉及PoolArena,PoolChunk與PoolSubpage。

本文主要分析PoolArena的作用與實現。

源碼分析基於Netty 4.1.52

接口關係

ByteBufAllocator,內存分配器,負責為ByteBuf分配內存, 線程安全。

PooledByteBufAllocator,池化內存分配器,默認的ByteBufAllocator,預先從操作系統中申請一大塊內存,在該內存上分配內存給ByteBuf,可以提高性能和減小內存碎片。

UnPooledByteBufAllocator,非池化內存分配器,每次都從操作系統中申請內存。

RecvByteBufAllocator,接收內存分配器,為Channel讀入的IO數據分配一塊大小合理的buffer空間。具體功能交由內部接口Handle定義。

它主要是針對Channel讀入場景添加一些操作,如guess,incMessagesRead,lastBytesRead等等。

ByteBuf,分配好的內存塊,可以直接使用。

下面只關注PooledByteBufAllocator,它是Netty中默認的內存分配器,也是理解Netty內存機制的難點。

內存分配

前面文章《ChannelPipeline機制與讀寫過程》中分析了數據讀取過程,

NioByteUnsafe#read

public final void read() {
...
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;

...
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
...
}

recvBufAllocHandle方法返回AdaptiveRecvByteBufAllocator.HandleImpl。 (AdaptiveRecvByteBufAllocator,PooledByteBufAllocator都在DefaultChannelConfig中初始化)

AdaptiveRecvByteBufAllocator.HandleImpl#allocate-> AbstractByteBufAllocator#ioBuffer-> PooledByteBufAllocator#directBuffer-> PooledByteBufAllocator#newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// #1
PoolThreadCache cache = threadCache.get();
PoolArena directArena = cache.directArena;

final ByteBuf buf;
if (directArena != null) {
// #2
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// #3
buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}

AbstractByteBufAllocator#ioBuffer方法會判斷當前系統是否支持unsafe。支持時使用直接內存,不支持則使用堆內存。這裡只關注直接內存的實現。

#1 從當前線程緩存中獲取對應內存池PoolArena

#2 在當前線程內存池上分配內存

#3 內存池不存在,只能使用非池化內存分配內存了

PooledByteBufAllocator#threadCache是​​一個PoolThreadLocalCache實例,PoolThreadLocalCache繼承於FastThreadLocal,FastThreadLocal這裡簡單理解為對ThreadLocal的優化,它為每個線程維護了一個PoolThreadCache,PoolThreadCache上關聯了內存池。

當PoolThreadLocalCache上某個線程的PoolThreadCache不存在時,通過initialValue方法構造。

PoolThreadLocalCache#initialValue

protected synchronized PoolThreadCache initialValue() {
// #1
final PoolArena heapArena = leastUsedArena(heapArenas);
final PoolArena directArena = leastUsedArena(directArenas);
// #2
final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);

...
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0);
}

#1 從PooledByteBufAllocator的heapArenas,directArenas中獲取使用率最小的PoolArena。

PooledByteBufAllocator構造時默認會為PooledByteBufAllocator#directArenas初始化8個PoolArena。

#2 構造PoolThreadCache。

PoolArena,可以理解為一個內存池,負責管理從操作系統中申請到的內存塊。

PoolThreadCache為每一個線程關聯一個PoolArena(PoolThreadCache#directArena),該線程的內存都在該PoolArena上分配。

Netty支持高並發系統,可能有很多線程進行同時內存分配。為了緩解線程競爭,通過創建多個PoolArena細化鎖的粒度,從而提高並發執行的效率。

注意,一個PoolArena可以會分給多個的線程,可以看到PoolArena上會有一些同步操作。

內存級別

前面分析SizeClasses的文章說過,Netty將內存池中的內存塊按大小劃分為3個級別。

不同級別的內存塊管理算法不同。默認劃分規則如下:

小<= 28672(3.5K)

正常<= 16777216(2M)

巨大> 16777216(2M)

smallSubpagePools是一個PoolSubpage數組,負責維護small級別的內存塊信息。

PoolChunk負責維護normal級別的內存,PoolChunkList管理一組PoolChunk。

PoolArena按內存使用率將PoolChunk分別維護到6個PoolChunkList中,

PoolArena按內存使用率將PoolChunk分別維護到6個PoolChunkList中,

qInit->內存使用率為0~25,

q000->內存使用率為1~50,

q025->內存使用率為25~75,

q050->內存使用率為50~75,

q075->內存使用率為75~100,

q100->內存使用率為100。

注意:PoolChunk是Netty每次向操作系統申請的內存塊。

PoolSubpage需要从PoolChunk中分配,而Tiny,Small级別的内存则是从PoolSubpage中分配。

下面來看一下分配過程

private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) {
// #1
final int sizeIdx = size2SizeIdx(reqCapacity);
// #2
if (sizeIdx <= smallMaxSizeIdx) { tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx); } else if (sizeIdx 0 ? normalizeSize(reqCapacity) : reqCapacity; // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, normCapacity); } }

#1 size2SizeIdx是父類SizeClasses提供的方法,它使用特定算法,將申請的內存大小調整為規範大小,劃分到對應位置,返回對應索引,可參考《內存對齊類SizeClasses》

#2 分配small級別的內存塊

#3 分配normal級別的內存塊

#4 分配huge級別的內存塊

private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity,
final int sizeIdx) {
// #1
if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
return;
}

// #2
final PoolSubpage head = smallSubpagePools[sizeIdx];
final boolean needsNormalAllocation;
synchronized (head) {
// #3
final PoolSubpage s = head.next;
needsNormalAllocation = s == head;
if (!needsNormalAllocation) {
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
}
}
// #4
if (needsNormalAllocation) {
synchronized (this) {
allocateNormal(buf, reqCapacity, sizeIdx, cache);
}
}

incSmallAllocation();
}

#1 首先嘗試在線程緩存上分配。

除了PoolArena,PoolThreadCache#smallSubPageHeapCaches還為每個線程維護了Small級別的內存緩存

#2 使用前面SizeClasses#size2SizeIdx方法計算的索引,獲取對應PoolSubpage

#3 注意,head是一個佔位節點,並不存儲數據,s==head表示當前存在可以用的PoolSubpage,因為已經耗盡的PoolSubpage是會從鍊錶中移除。

接著從PoolSubpage中分配內存,後面有文章解析詳細過程

注意,這裡必要運行在同步機制中。

#4 沒有可用的PoolSubpage,需要申請一個Normal級別的內存塊,再在上面分配所需內存

normal級別的內存也是先嘗試在線程緩存中分配,分配失敗後再調用allocateNormal方法申請

PoolArena#allocate->分配普通

private void allocateNormal(PooledByteBuf buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
return;
}

// Add a new chunk.
PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
assert success;
qInit.add(c);
}

#1 依次從q050,q025,q000,qInit,q075上申請內存

為什麼要是這個順序呢?

PoolArena中的PoolChunkList之間也組成一個“雙向”鍊錶

qInit ---> q000 q025 q050 q075 q100

PoolChunkList中還維護了minUsage,maxUsage,即當一個PoolChunk使用率大於maxUsage,它將被移動到下一個PoolChunkList,使用率小於minUsage,則被移動到前一個PoolChunkList。

注意:q000沒有前置節點,它的minUsage為1,即上面的PoolChunk內存完全釋放後,將被銷毀。

qInit的前置節點是它自己,但它的minUsage為Integer.MIN_VALUE,即使上面的PoolChunk內存完全釋放後,也不會被銷毀,而是繼續保留在內存。

不優先從q000分配,正是因為q000上的PoolChunk內存完全釋放後要被銷毀,如果在上面分配,則會延遲內存的回收進度。

而q075上由於內存利用率太高,導致內存分配的成功率大大降低,因此放到最後。

所以從q050是一個不錯的選擇,這樣大部分情況下,Chunk的利用率都會保持在一個較高水平,提高整個應用的內存利用率;

在PoolChunkList上申請內存,PoolChunkList會遍歷鍊錶上PoolChunk節點,直到分配成功或到達鍊錶末尾。

PoolChunk分配後,如果內存使用率高於maxUsage,它將被移動到下一個PoolChunkList。

newChunk方法負責構造一個PoolChunk,這裡是內存池向操作系統申請內存。

DirectArena#newChunk

protected PoolChunk newChunk(int pageSize, int maxPageIdx,
int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
return new PoolChunk(this,
allocateDirect(chunkSize), pageSize, pageShifts,
chunkSize, maxPageIdx, 0);
}
final ByteBuffer memory = allocateDirect(chunkSize
+ directMemoryCacheAlignment);
return new PoolChunk(this, memory, pageSize,
pageShifts, chunkSize, maxPageIdx,
offsetCacheLine(memory));
}

allocateDirect方法向操作系統申請內存,獲得一個(jvm)ByteBuffer,

PoolChunk#memory維護了該ByteBuffer,PoolChunk的內存實際上都是在該ByteBuffer上分配。

最後是huge級別的內存申請

private void allocateHuge(PooledByteBuf buf, int reqCapacity) {
PoolChunk chunk = newUnpooledChunk(reqCapacity);
activeBytesHuge.add(chunk.chunkSize());
buf.initUnpooled(chunk, reqCapacity);
allocationsHuge.increment();
}

比較簡單,沒有使用內存池,直接向操作系統申請內存。

內存釋放

void free(PoolChunk chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
// #1
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
// #2
SizeClass sizeClass = sizeClass(handle);
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}

freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
}
}

#1 非池化內存,直接銷毀內存

#2 池化內存,首先嘗試加到線程緩存中,成功則不需要其他操作。失敗則調用freeChunk

void freeChunk(PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
boolean finalizer) {
final boolean destroyChunk;
synchronized (this) {
...
destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
}
if (destroyChunk) {
// destroyChunk not need to be called while holding the synchronized lock.
destroyChunk(chunk);
}
}

chunk.parent即PoolChunkList,PoolChunkList#free會調用PoolChunk釋放內存,釋放內存後,如果內存使用率低於minUsage,則移動前一個PoolChunkList,如果前一個PoolChunkList不存在(q000),則返回false,由後面的步驟銷毀該PoolChunk。

可回顧前面解析ByteBuf文章中關於內存銷毀的內容。

如果您覺得本文不錯,歡迎關注我的微信公眾號,您的關注是我堅持的動力!

Netty源碼解析-- 內存池與PoolArena 1