Netty分布式ByteBuf使用命中缓存的分配解析

目录
  • 分析先关逻辑之前, 首先介绍缓存对象的数据结构
    • 我们以tiny类型为例跟到createSubPageCaches方法中
    • 回到PoolArena的allocate方法中
    • 我们跟到normalizeCapacity方法中
    • 回到allocate方法中
    • allocateTiny是缓存分配的入口
    • 回到acheForTiny方法中
    • 我们简单看下Entry这个类
    • 跟进init方法

上一小节简单分析了directArena内存分配大概流程 ,知道其先命中缓存, 如果命中不到, 则区分配一款连续内存, 这一小节带大家剖析命中缓存的相关逻辑

分析先关逻辑之前, 首先介绍缓存对象的数据结构

回顾上一小节的内容, 我们讲到PoolThreadCache中维护了三个缓存数组(实际上是六个, 这里仅仅以Direct为例, heap类型的逻辑是一样的): tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches分别代表tiny类型, small类型和normal类型的缓存数组

这三个数组保存在PoolThreadCache的成员变量中:

private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

其中是在构造方法中进行了初始化:

tinySubPageDirectCaches = createSubPageCaches(
        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
        smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
normalDirectCaches = createNormalCaches(
        normalCacheSize, maxCachedBufferCapacity, directArena);

我们以tiny类型为例跟到createSubPageCaches方法中

private static <T> MemoryRegionCache<T>[] createSubPageCaches(
        int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0) {
        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
        for (int i = 0; i < cache.length; i++) {
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
        }
        return cache;
    } else {
        return null;
    }
}

这里上面的小节已经分析过, 这里创建了一个缓存数组, 这个缓存数组的长度,也就是numCaches, 在不同的类型, 这个长度不一样, tiny类型长度是32, small类型长度为4, normal类型长度为3

我们知道, 缓存数组中每个节点代表一个缓存对象, 里面维护了一个队列, 队列大小由PooledByteBufAllocator类中的tinyCacheSize, smallCacheSize, normalCacheSize属性决定的, 这里之前小节已经剖析过

其中每个缓存对象, 队列中缓存的ByteBuf大小是固定的, netty将每种缓冲区类型分成了不同长度规格, 而每个缓存中的队列缓存的ByteBuf的长度, 都是同一个规格的长度, 而缓冲区数组的长度, 就是规格的数量

比如, 在tiny类型中, netty将其长度分成32个规格, 每个规格都是16的整数倍, 也就是包含0B, 16B, 32B, 48B, 64B, 80B, 96B......496B总共32种规格, 而在其缓存数组tinySubPageDirectCaches中, 这每一种规格代表数组中的一个缓存对象缓存的ByteBuf的大小, 我们以tinySubPageDirectCaches[1]为例(这里下标选择1是因为下标为0代表的规格是0B, 其实就代表一个空的缓存, 这里不进行举例), 在tinySubPageDirectCaches[1]的缓存对象中所缓存的ByteBuf的缓冲区长度是16B, 在tinySubPageDirectCaches[2]中缓存的ByteBuf长度都为32B, 以此类推, tinySubPageDirectCaches[31]中缓存的ByteBuf长度为496B

有关类型规则的分配如下:

tiny:总共32个规格, 均是16的整数倍, 0B, 16B, 32B, 48B, 64B, 80B, 96B......496B

small:4种规格, 512b, 1k, 2k, 4k

nomal:3种规格, 8k, 16k, 32k

这样, PoolThreadCache中缓存数组的数据结构为

大概了解缓存数组的数据结构, 我们再继续剖析在缓冲中分配内存的逻辑

回到PoolArena的allocate方法中

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    //规格化
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) {
        int tableIdx;
        PoolSubpage<T>[] table;
        //判断是不是tinty
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            //缓存分配
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            //通过tinyIdx拿到tableIdx
            tableIdx = tinyIdx(normCapacity);
            //subpage的数组
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        //拿到对应的节点
        final PoolSubpage<T> head = table[tableIdx];

        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            //默认情况下, head的next也是自身
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {
        //首先在缓存上进行内存分配
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            //分配成功, 返回
            return;
        }
        //分配不成功, 做实际的内存分配
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        //大于这个值, 就不在缓存上分配
        allocateHuge(buf, reqCapacity);
    }
}

首先通过normalizeCapacity方法进行内存规格化

我们跟到normalizeCapacity方法中

int normalizeCapacity(int reqCapacity) {
    if (reqCapacity < 0) {
        throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
    }
    if (reqCapacity >= chunkSize) {
        return reqCapacity;
    }
    //如果>tiny
    if (!isTiny(reqCapacity)) { // >= 512
        //找一个2的幂次方的数值, 确保数值大于等于reqCapacity
        int normalizedCapacity = reqCapacity;
        normalizedCapacity --;
        normalizedCapacity |= normalizedCapacity >>>  1;
        normalizedCapacity |= normalizedCapacity >>>  2;
        normalizedCapacity |= normalizedCapacity >>>  4;
        normalizedCapacity |= normalizedCapacity >>>  8;
        normalizedCapacity |= normalizedCapacity >>> 16;
        normalizedCapacity ++;

        if (normalizedCapacity < 0) {
            normalizedCapacity >>>= 1;
        }

        return normalizedCapacity;
    }
    //如果是16的倍数
    if ((reqCapacity & 15) == 0) {
        return reqCapacity;
    }
    //不是16的倍数, 变成最大小于当前值的值+16
    return (reqCapacity & ~15) + 16;
}

if (!isTiny(reqCapacity)) 代表如果大于tiny类型的大小, 也就是512, 则会找一个2的幂次方的数值, 确保这个数值大于等于reqCapacity

如果是tiny, 则继续往下

if ((reqCapacity & 15) == 0) 这里判断如果是16的倍数, 则直接返回

如果不是16的倍数, 则返回 (reqCapacity & ~15) + 16 , 也就是变成最小大于当前值的16的倍数值

从上面规格化逻辑看出, 这里将缓存大小规格化成固定大小, 确保每个缓存对象缓存的ByteBuf容量统一

回到allocate方法中

if(isTinyOrSmall(normCapacity)) 这里是根据规格化后的大小判断是否tiny或者small类型, 我们跟到方法中:

boolean isTinyOrSmall(int normCapacity) {
    return (normCapacity & subpageOverflowMask) == 0;
}

这里是判断如果normCapacity小于一个page的大小, 也就是8k代表其实tiny或者small

继续看allocate方法:

如果当前大小是tiny或者small, 则isTiny(normCapacity)判断是否是tiny类型, 跟进去:

static boolean isTiny(int normCapacity) {
    return (normCapacity & 0xFFFFFE00) == 0;
}

这里是判断如果小于512, 则认为是tiny

再继续看allocate方法:

如果是tiny, 则通过cache.allocateTiny(this, buf, reqCapacity, normCapacity)在缓存上进行分配

我们就以tiny类型为例, 分析在缓存上分配ByteBuf的流程

allocateTiny是缓存分配的入口

我们跟进去, 进入到了PoolThreadCache的allocateTiny方法中:

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

这里有个方法cacheForTiny(area, normCapacity), 这个方法的作用是根据normCapacity找到tiny类型缓存数组中的一个缓存对象

我们跟进cacheForTiny:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

PoolArena.tinyIdx(normCapacity)是找到tiny类型缓存数组的下标

继续跟tinyIdx:

static int tinyIdx(int normCapacity) {
    return normCapacity >>> 4;
}

这里直接将normCapacity除以16, 通过前面的内容我们知道, tiny类型缓存数组中每个元素规格化的数据都是16的倍数, 所以通过这种方式可以找到其下标, 参考图5-2, 如果是16B会拿到下标为1的元素, 如果是32B则会拿到下标为2的元素

回到acheForTiny方法中

if (area.isDirect()) 这里判断是否是分配堆外内存, 因为我们是按照堆外内存进行举例, 所以这里为true

再继续跟到cache(tinySubPageDirectCaches, idx)方法中:

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    return cache[idx];
}

这里我们看到直接通过下标的方式拿到了缓存数组中的对象

回到PoolThreadCache的allocateTiny方法中:

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

拿到了缓存对象之后, 我们跟到allocate(cacheForTiny(area, normCapacity), buf, reqCapacity)方法中:

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
    if (cache == null) {
        return false;
    }
    boolean allocated = cache.allocate(buf, reqCapacity);
    if (++ allocations >= freeSweepAllocationThreshold) {
        allocations = 0;
        trim();
    }
    return allocated;
}

这里通过cache.allocate(buf, reqCapacity)进行继续进行分配

再继续往里跟, 跟到内部类MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法中:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();
    ++ allocations;
    return true;
}

这里首先通过queue.poll()这种方式弹出一个entry, 我们之前的小节分析过, MemoryRegionCache维护着一个队列, 而队列中的每一个值是一个entry

我们简单看下Entry这个类

static final class Entry<T> {
    final Handle<Entry<?>> recyclerHandle;
    PoolChunk<T> chunk;
    long handle = -1;
    //代码省略
}

这里重点关注chunk和handle的这两个属性, chunk代表一块连续的内存, 我们之前简单介绍过, netty是通过chunk为单位进行内存分配的, 我们之后会对chunk进行剖析

handle相当于一个指针, 可以唯一定位到chunk里面的一块连续的内存, 之后也会详细分析

这样, 通过chunk和handle就可以定位ByteBuf中指定一块连续内存, 有关ByteBuf相关的读写, 都会在这块内存中进行

我们回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();
    ++ allocations;
    return true;
}

弹出entry之后, 通过initBuf(entry.chunk, entry.handle, buf, reqCapacity)这种方式给ByteBuf初始化, 这里参数传入我们刚才分析过的当前Entry的chunk和hanle

因为我们分析的tiny类型的缓存对象是SubPageMemoryRegionCache类型,所以我们继续跟到SubPageMemoryRegionCache类的initBuf(entry.chunk, entry.handle, buf, reqCapacity)方法中:

protected void initBuf(
        PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
    chunk.initBufWithSubpage(buf, handle, reqCapacity);
}

这里的chunk调用了initBufWithSubpage(buf, handle, reqCapacity)方法, 其实就是PoolChunk类中的方法

我们继续跟initBufWithSubpage:

void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);
}

这里有关bitmapIdx(handle)相关的逻辑, 会在后续的章节进行剖析, 这里继续往里跟:

private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;
    int memoryMapIdx = memoryMapIdx(handle);
    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;
    buf.init(
        this, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
        arena.parent.threadCache());
}

这里我们先关注init方法, 因为我们是以PooledUnsafeDirectByteBuf为例, 所以这里走的是PooledUnsafeDirectByteBuf的init方法

跟进init方法

void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
          PoolThreadCache cache) {
    super.init(chunk, handle, offset, length, maxLength, cache);
    initMemoryAddress();
}

首先调用了父类的init方法, 再跟进去:

void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    //初始化
    assert handle >= 0;
    assert chunk != null;
    //在哪一块内存上进行分配的
    this.chunk = chunk;
    //这一块内存上的哪一块连续内存
    this.handle = handle;
    memory = chunk.memory;
    this.offset = offset;
    this.length = length;
    this.maxLength = maxLength;
    tmpNioBuf = null;
    this.cache = cache;
}

这里将PooledUnsafeDirectByteBuf的各个属性进行了初始化

this.chunk = chunk 这里初始化了chunk, 代表当前的ByteBuf是在哪一块内存中分配的

this.handle = handle 这里初始化了handle, 代表当前的ByteBuf是这块内存的哪个连续内存

有关offset和length, 我们会在之后的小节进行分析, 在这里我们只需要知道, 通过缓存分配ByteBuf, 我们只需要通过一个chunk和handle, 就可以确定一块内存

以上就是通过缓存分配ByteBuf对象的过程

我们回到MemoryRegionCache的allocate(PooledByteBuf<T> buf, int reqCapacity)方法:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
    Entry<T> entry = queue.poll();
    if (entry == null) {
        return false;
    }
    initBuf(entry.chunk, entry.handle, buf, reqCapacity);
    entry.recycle();
    ++ allocations;
    return true;
}

分析完了initBuf方法, 再继续往下看

entry.recycle()这步是将entry对象进行回收, 因为entry对象弹出之后没有再被引用, 可能gc会将entry对象回收, netty为了将对象进行循环利用, 就将其放在对象回收站进行回收

我们跟进recycle方法

void recycle() {
    chunk = null;
    handle = -1;
    recyclerHandle.recycle(this);
}

chunk = null和handle = -1表示当前Entry不指向任何一块内存

recyclerHandle.recycle(this) 将当前entry回收, 有关对象回收站, 我们会在后面的章节详细剖析

以上就是命中缓存的流程, 因为这里我们是假设缓中有值的情况下进行分配的, 如果第一次分配, 缓存中是没有值的, 那么在缓存中没有值的情况下, netty是如何进行分配的呢?我们再之后的小节会进行剖析

更多关于Netty分布式ByteBuf使用命中缓存分配的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式源码分析监听读事件

    前文传送门:NioSocketChannel注册到selector 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSet

  • Netty分布式ByteBuf的分类方式源码解析

    目录 ByteBuf根据不同的分类方式 会有不同的分类结果 1.Pooled和Unpooled 2.基于直接内存的ByteBuf和基于堆内存的ByteBuf 3.safe和unsafe 上一小节简单介绍了AbstractByteBuf这个抽象类, 这一小节对其子类的分类做一个简单的介绍 ByteBuf根据不同的分类方式 会有不同的分类结果 我们首先看第一种分类方式 1.Pooled和Unpooled pooled是从一块内存里去取一段连续内存封装成byteBuf 具体标志是类名以Pooled开头

  • Netty分布式pipeline传播inbound事件源码分析

    前一小结回顾:pipeline管道Handler删除 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chann

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Netty分布式ByteBuf使用的底层实现方式源码解析

    目录 概述 AbstractByteBuf属性和构造方法 首先看这个类的属性和构造方法 我们看几个最简单的方法 我们重点关注第二个校验方法ensureWritable(length) 我们跟到扩容的方法里面去 最后将写指针后移length个字节 概述 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到cha

  • Netty分布式NioSocketChannel注册到selector方法解析

    目录 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPipeline pipeline = pipeline(

  • Netty分布式ByteBuf使用命中缓存的分配解析

    目录 分析先关逻辑之前, 首先介绍缓存对象的数据结构 我们以tiny类型为例跟到createSubPageCaches方法中 回到PoolArena的allocate方法中 我们跟到normalizeCapacity方法中 回到allocate方法中 allocateTiny是缓存分配的入口 回到acheForTiny方法中 我们简单看下Entry这个类 跟进init方法 上一小节简单分析了directArena内存分配大概流程 ,知道其先命中缓存, 如果命中不到, 则区分配一款连续内存, 这一

  • Netty分布式ByteBuf使用subPage级别内存分配剖析

    目录 subPage级别内存分配 我们其中是在构造方法中初始化的, 看构造方法中其初始化代码 在构造方法中创建完毕之后, 会通过循环为其赋值 这里通过normCapacity拿到tableIdx 跟到allocate(normCapacity)方法中 我们跟到PoolSubpage的构造方法中 我们跟到addToPool(head)中 我们跟到allocate()方法中 我们继续跟进findNextAvail方法 我们回到allocate()方法中 我们跟到initBuf方法中 回到initBu

  • Netty分布式ByteBuf使用page级别的内存分配解析

    目录 netty内存分配数据结构 我们看PoolArena中有关chunkList的成员变量 我们看PoolSubpage的属性 我们回到PoolArena的allocate方法 我们跟进allocateNormal 首先会从head节点往下遍历 这里直接通过构造函数创建了一个chunk 首先将参数传入的值进行赋值 我们再回到PoolArena的allocateNormal方法中 跟到allocate(normCapacity)中 我们跟到allocateNode方法中 我们跟进updatePa

  • Netty分布式ByteBuf使用directArena分配缓冲区过程解析

    目录 directArena分配缓冲区 回到newDirectBuffer中 我们跟到newByteBuf方法中 跟到reuse方法中 跟到allocate方法中 1.首先在缓存上进行分配 2.如果在缓存上分配不成功, 则实际分配一块内存 上一小节简单分析了PooledByteBufAllocator中, 线程局部缓存和arean的相关逻辑, 这一小节简单分析下directArena分配缓冲区的相关过程 directArena分配缓冲区 回到newDirectBuffer中 protected

  • Netty分布式ByteBuf中PooledByteBufAllocator剖析

    目录 前言 PooledByteBufAllocator分配逻辑 逻辑简述 我们回到newDirectBuffer中 有关缓存列表, 我们循序渐进的往下看 我们在static块中看其初始化过程 我们再次跟到initialValue方法中 我们跟到createSubPageCaches这个方法中 最后并保存其类型 前言 上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带大家剖析更为复杂的PooledByt

  • Netty分布式ByteBuf使用的回收逻辑剖析

    目录 ByteBuf回收 这里调用了release0, 跟进去 我们首先分析free方法 我们跟到cache中 回到add方法中 我们回到free方法中 前文传送门:ByteBuf使用subPage级别内存分配 ByteBuf回收 之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑 PooledUnsafe

  • Netty分布式pipeline管道传播outBound事件源码解析

    目录 outbound事件传输流程 这里我们同样给出两种写法 跟到其write方法中: 跟到findContextOutbound中 回到write方法: 继续跟invokeWrite0 我们跟到HeadContext的write方法中 了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难 outbound事件传输流程 在我们业务代码中, 有可能使用wirte方法往写数据: public void channelActive(ChannelHandlerC

随机推荐