Netty分布式ByteBuf使用的回收逻辑剖析

目录
  • ByteBuf回收
    • 这里调用了release0, 跟进去
    • 我们首先分析free方法
    • 我们跟到cache中
    • 回到add方法中
    • 我们回到free方法中

前文传送门:ByteBuf使用subPage级别内存分配

ByteBuf回收

之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑

PooledUnsafeDirectByteBuf中内存释放的入口方法是其父类AbstractReferenceCountedByteBuf中的release方法:

@Override
 public boolean release() {
     return release0(1);
 }

这里调用了release0, 跟进去

private boolean release0(int decrement) {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt < decrement) {
            throw new IllegalReferenceCountException(refCnt, -decrement);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
            if (refCnt == decrement) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

if (refCnt == decrement) 中判断当前byteBuf是否没有被引用了, 如果没有被引用, 则通过deallocate()方法进行释放

因为我们是以PooledUnsafeDirectByteBuf为例, 所以这里会调用其父类PooledByteBuf的deallocate方法:

protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        recycle();
    }
}

this.handle = -1表示当前的ByteBuf不再指向任何一块内存

memory = null这里将memory也设置为null

chunk.arena.free(chunk, handle, maxLength, cache)这一步是将ByteBuf的内存进行释放

recycle()是将对象放入的对象回收站, 循环利用

我们首先分析free方法

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
    //是否为unpooled
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        //那种级别的Size
        SizeClass sizeClass = sizeClass(normCapacity);
        //加到缓存里
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            return;
        }
        //将缓存对象标记为未使用
        freeChunk(chunk, handle, sizeClass);
    }
}

首先判断是不是unpooled, 我们这里是Pooled, 所以会走到else块中:

sizeClass(normCapacity)计算是哪种级别的size, 我们按照tiny级别进行分析

cache.add(this, chunk, handle, normCapacity, sizeClass)是将当前当前ByteBuf进行缓存

我们之前讲过, 再分配ByteBuf时首先在缓存上分配, 而这步, 就是将其缓存的过程, 跟进去:

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    //拿到MemoryRegionCache节点
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {
        return false;
    }
    //将chunk, 和handle封装成实体加到queue里面
    return cache.add(chunk, handle);
}

首先根据根据类型拿到相关类型缓存节点, 这里会根据不同的内存规格去找不同的对象, 我们简单回顾一下, 每个缓存对象都包含一个queue, queue中每个节点是entry, 每一个entry中包含一个chunk和handle, 可以指向唯一的连续的内存

我们跟到cache中

private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
    switch (sizeClass) {
    case Normal:
        return cacheForNormal(area, normCapacity);
    case Small:
        return cacheForSmall(area, normCapacity);
    case Tiny:
        return cacheForTiny(area, normCapacity);
    default:
        throw new Error();
    }
}

假设我们是tiny类型, 这里就会走到cacheForTiny(area, normCapacity)方法中, 跟进去:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

这个方法我们之前剖析过, 就是根据大小找到第几个缓存中的第几个缓存, 拿到下标之后, 通过cache去超相对应的缓存对象:

private static <T>  MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    return cache[idx];
}

我们这里看到, 是直接通过下标拿的缓存对象

回到add方法中

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
    //拿到MemoryRegionCache节点
    MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
    if (cache == null) {
        return false;
    }
    //将chunk, 和handle封装成实体加到queue里面
    return cache.add(chunk, handle);
}

这里的cache对象调用了一个add方法, 这个方法就是将chunk和handle封装成一个entry加到queue里面

我们跟到add方法中:

public final boolean add(PoolChunk<T> chunk, long handle) {
    Entry<T> entry = newEntry(chunk, handle);
    boolean queued = queue.offer(entry);
    if (!queued) {
        entry.recycle();
    }
    return queued;
}

我们之前介绍过, 从在缓存中分配的时候从queue弹出一个entry, 会放到一个对象池里面, 而这里Entry<T> entry = newEntry(chunk, handle)就是从对象池里去取一个entry对象, 然后将chunk和handle进行赋值

然后通过queue.offer(entry)加到queue中

我们回到free方法中

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
    //是否为unpooled
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        //那种级别的Size
        SizeClass sizeClass = sizeClass(normCapacity);
        //加到缓存里
        if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            return;
        }
        freeChunk(chunk, handle, sizeClass);
    }
}

这里加到缓存之后, 如果成功, 就会return, 如果不成功, 就会调用freeChunk(chunk, handle, sizeClass)方法, 这个方法的意义是, 将原先给ByteBuf分配的内存区段标记为未使用

跟进freeChunk简单分析下:

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
    final boolean destroyChunk;
    synchronized (this) {
        switch (sizeClass) {
        case Normal:
            ++deallocationsNormal;
            break;
        case Small:
            ++deallocationsSmall;
            break;
        case Tiny:
            ++deallocationsTiny;
            break;
        default:
            throw new Error();
        }
        destroyChunk = !chunk.parent.free(chunk, handle);
    }
    if (destroyChunk) {
        destroyChunk(chunk);
    }
}

我们再跟到free方法中:

boolean free(PoolChunk<T> chunk, long handle) {
    chunk.free(handle);
    if (chunk.usage() < minUsage) {
        remove(chunk);
        return move0(chunk);
    }
    return true;
}

chunk.free(handle)的意思是通过chunk释放一段连续的内存

再跟到free方法中:

void free(long handle) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);

    if (bitmapIdx != 0) {
        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage != null && subpage.doNotDestroy;
        PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
        synchronized (head) {
            if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                return;
            }
        }
    }
    freeBytes += runLength(memoryMapIdx);
    setValue(memoryMapIdx, depth(memoryMapIdx));
    updateParentsFree(memoryMapIdx);
}

if (bitmapIdx != 0)这 里判断是当前缓冲区分配的级别是Page还是Subpage, 如果是Subpage, 则会找到相关的Subpage将其位图标记为0

如果不是subpage, 这里通过分配内存的反向标记, 将该内存标记为未使用

这段逻辑可以读者自行分析, 如果之前分配相关的知识掌握扎实的话, 这里的逻辑也不是很难

回到PooledByteBuf的deallocate方法中:

protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        recycle();
    }
}

最后, 通过recycle()将释放的ByteBuf放入对象回收站, 有关对象回收站的知识, 会在以后的章节进行剖析

以上就是内存回收的大概逻辑,更多关于Netty分布式ByteBuf使用回收的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式ByteBuf使用命中缓存的分配解析

    目录 分析先关逻辑之前, 首先介绍缓存对象的数据结构 我们以tiny类型为例跟到createSubPageCaches方法中 回到PoolArena的allocate方法中 我们跟到normalizeCapacity方法中 回到allocate方法中 allocateTiny是缓存分配的入口 回到acheForTiny方法中 我们简单看下Entry这个类 跟进init方法 上一小节简单分析了directArena内存分配大概流程 ,知道其先命中缓存, 如果命中不到, 则区分配一款连续内存, 这一

  • Netty分布式ByteBuf的分类方式源码解析

    目录 ByteBuf根据不同的分类方式 会有不同的分类结果 1.Pooled和Unpooled 2.基于直接内存的ByteBuf和基于堆内存的ByteBuf 3.safe和unsafe 上一小节简单介绍了AbstractByteBuf这个抽象类, 这一小节对其子类的分类做一个简单的介绍 ByteBuf根据不同的分类方式 会有不同的分类结果 我们首先看第一种分类方式 1.Pooled和Unpooled pooled是从一块内存里去取一段连续内存封装成byteBuf 具体标志是类名以Pooled开头

  • Netty分布式ByteBuf中PooledByteBufAllocator剖析

    目录 前言 PooledByteBufAllocator分配逻辑 逻辑简述 我们回到newDirectBuffer中 有关缓存列表, 我们循序渐进的往下看 我们在static块中看其初始化过程 我们再次跟到initialValue方法中 我们跟到createSubPageCaches这个方法中 最后并保存其类型 前言 上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带大家剖析更为复杂的PooledByt

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Netty分布式ByteBuf使用page级别的内存分配解析

    目录 netty内存分配数据结构 我们看PoolArena中有关chunkList的成员变量 我们看PoolSubpage的属性 我们回到PoolArena的allocate方法 我们跟进allocateNormal 首先会从head节点往下遍历 这里直接通过构造函数创建了一个chunk 首先将参数传入的值进行赋值 我们再回到PoolArena的allocateNormal方法中 跟到allocate(normCapacity)中 我们跟到allocateNode方法中 我们跟进updatePa

  • Netty分布式ByteBuf使用directArena分配缓冲区过程解析

    目录 directArena分配缓冲区 回到newDirectBuffer中 我们跟到newByteBuf方法中 跟到reuse方法中 跟到allocate方法中 1.首先在缓存上进行分配 2.如果在缓存上分配不成功, 则实际分配一块内存 上一小节简单分析了PooledByteBufAllocator中, 线程局部缓存和arean的相关逻辑, 这一小节简单分析下directArena分配缓冲区的相关过程 directArena分配缓冲区 回到newDirectBuffer中 protected

  • Netty分布式ByteBuf使用的回收逻辑剖析

    目录 ByteBuf回收 这里调用了release0, 跟进去 我们首先分析free方法 我们跟到cache中 回到add方法中 我们回到free方法中 前文传送门:ByteBuf使用subPage级别内存分配 ByteBuf回收 之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑 PooledUnsafe

  • Netty分布式编码器写buffer队列逻辑剖析

    目录 写buffer队列 我们跟到AbstractUnsafe的write方法中 回到write方法中 我们跟到setUnwritable(invokeLater)方法中 前文传送门:抽象编码器MessageToByteEncoder 写buffer队列 之前的小节我们介绍过, writeAndFlush方法其实最终会调用write和flush方法 write方法最终会传递到head节点, 调用HeadContext的write方法: public void write(ChannelHandl

  • Netty分布式ByteBuf使用subPage级别内存分配剖析

    目录 subPage级别内存分配 我们其中是在构造方法中初始化的, 看构造方法中其初始化代码 在构造方法中创建完毕之后, 会通过循环为其赋值 这里通过normCapacity拿到tableIdx 跟到allocate(normCapacity)方法中 我们跟到PoolSubpage的构造方法中 我们跟到addToPool(head)中 我们跟到allocate()方法中 我们继续跟进findNextAvail方法 我们回到allocate()方法中 我们跟到initBuf方法中 回到initBu

  • Netty分布式ByteBuf使用的底层实现方式源码解析

    目录 概述 AbstractByteBuf属性和构造方法 首先看这个类的属性和构造方法 我们看几个最简单的方法 我们重点关注第二个校验方法ensureWritable(length) 我们跟到扩容的方法里面去 最后将写指针后移length个字节 概述 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到cha

  • Netty分布式固定长度解码器实现原理剖析

    固定长度解码器 上一小节:解码器读取数据不完整的逻辑剖析 我们了解到, 解码器需要继承ByteToMessageDecoder, 并重写decode方法, 将解析出来的对象放入集合中集合, ByteToMessageDecoder中可以将解析出来的对象向下进行传播, 这一小节带大家剖析一个最简单的解码器FixedLengthFrameDecoder, 从它入手了解码器的相关原理 FixedLengthFrameDecoder是一个固定长度的解码器, 功能就是根据固定长度, 截取固定大小的字节数进

随机推荐