Netty分布式ByteBuf使用SocketChannel读取数据过程剖析

目录
  • Server读取数据的流程
    • 我们首先看NioEventLoop的processSelectedKey方法
    • 这里会走到DefaultChannelConfig的getAllocator方法中
    • 我们跟到static块中
    • 回到NioByteUnsafe的read()方法中
    • 我们跟进recvBufAllocHandle
    • 继续看doReadBytes方法
    • 跟到record方法中
  • 章节总结

我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, Server读取数据的流程:

首先温馨提示, 这一小节高度耦合第三章的第1, 2节的内容, 很多知识这里并不会重复讲解, 如果对之前的知识印象不深刻建议恶补第三章的第1, 2节的内容之后再学习这一小节

传送门:

初始化NioSockectChannelConfig

处理接入事件之handle的创建

Server读取数据的流程

我们首先看NioEventLoop的processSelectedKey方法

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //获取到channel中的unsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    //如果这个key不是合法的, 说明这个channel可能有问题
    if (!k.isValid()) {
        //代码省略
    }
    try {
        //如果是合法的, 拿到key的io事件
        int readyOps = k.readyOps();
        //链接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        //写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        //读事件和接受链接事件
        //如果当前NioEventLoop是work线程的话, 这里就是op_read事件
        //如果是当前NioEventLoop是boss线程的话, 这里就是op_accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)

这里的判断表示轮询到大事件是op_read或者op_accept事件

之前的章节分析过, 如果当前NioEventLoop是work线程的话, 那么这里就是op_read事件, 也就是读事件, 表示客户端发来了数据流

这里会调用unsafe的redis()方法进行读取

如果是work线程, 那么这里的channel是NioServerSocketChannel, 其绑定的unsafe是NioByteUnsafe, 这里会走进NioByteUnsafe的read()方法中:

public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }
                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

首先获取SocketChannel的config, pipeline等相关属性

final ByteBufAllocator allocator = config.getAllocator(); 这一步是获取一个ByteBuf的内存分配器, 用于分配ByteBuf

这里会走到DefaultChannelConfig的getAllocator方法中

public ByteBufAllocator getAllocator() {
    return allocator;
}

这里返回的DefualtChannelConfig的成员变量, 我们看这个成员变量:

private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

这里调用ByteBufAllocator的属性DEFAULT, 跟进去:

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

我们看到这里又调用了ByteBufUtil的静态属性DEFAULT_ALLOCATOR, 再跟进去:

static final ByteBufAllocator DEFAULT_ALLOCATOR;

DEFAULT_ALLOCATOR这个属性是在static块中初始化的

我们跟到static块中

static {
    String allocType = SystemPropertyUtil.get(
            "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
    allocType = allocType.toLowerCase(Locale.US).trim();

    ByteBufAllocator alloc;
    if ("unpooled".equals(allocType)) {
        alloc = UnpooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else if ("pooled".equals(allocType)) {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
    }
    DEFAULT_ALLOCATOR = alloc;
    //代码省略
}

首先判断运行环境是不是安卓, 如果不是安卓, 在返回"pooled"字符串保存在allocType中

然后通过if判断, 最后局部变量alloc = PooledByteBufAllocator.DEFAULT, 最后将alloc赋值到成员变量DEFAULT_ALLOCATOR

我们跟到PooledByteBufAllocator的DEFAULT属性中:

public static final PooledByteBufAllocator DEFAULT =
        new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

我们看到这里直接通过new的方式, 创建了一个PooledByteBufAllocator对象, 也就是基于申请一块连续内存进行缓冲区分配的缓冲区分配器

缓冲区分配器的知识, 我们之前小节进行了详细的剖析, 这里就不再赘述

回到NioByteUnsafe的read()方法中

public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

这里 ByteBufAllocator allocator = config.getAllocator()中的allocator , 就是PooledByteBufAllocator

final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle()  是创建一个handle, 我们之前的章节讲过, handle是对RecvByteBufAllocator进行实际操作的对象

我们跟进recvBufAllocHandle

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
    //如果不存在, 则创建一个handle的实例
    if (recvHandle == null) {
        recvHandle = config().getRecvByteBufAllocator().newHandle();
    }
    return recvHandle;
}

这里是我们之前剖析过的逻辑, 如果不存在, 则创建handle的实例, 具体创建过程我们可以回顾第三章的第二小节, 这里就不再赘述

同样allocHandle.reset(config)是将配置重置, 第三章的第二小节也对其进行过剖析

重置完配置之后, 进行do-while循环, 有关循环终止条件allocHandle.continueReading(), 之前小节也有过详细剖析, 这里也不再赘述

在do-while循环中, 首先看 byteBuf = allocHandle.allocate(allocator) 这一步, 这里传入了刚才创建的allocate对象, 也就是PooledByteBufAllocator:

这里会跑到DefaultMaxMessagesRecvByteBufAllocator类的allocate方法中:

public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

这里的guess方法, 会调用AdaptiveRecvByteBufAllocator的guess方法:

public int guess() {
    return nextReceiveBufferSize;
}

这里会返回AdaptiveRecvByteBufAllocator的成员变量nextReceiveBufferSize, 也就是下次所分配缓冲区的大小, 根据我们之前学习的内容, 第一次分配的时候会分配初始大小, 也就是1024字节

回到DefaultMaxMessagesRecvByteBufAllocator类的allocate方法中:

这样, alloc.ioBuffer(guess())就会分配一个PooledByteBuf

我们跟到AbstractByteBufAllocator的ioBuffer方法中:

public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

这里首先判断是否能获取jdk的unsafe对象, 默认为true, 所以会走到directBuffer(initialCapacity)中, 这里最终会分配一个PooledUnsafeDirectByteBuf对象, 具体分配流程我们再之前小节做过详细剖析

回到NioByteUnsafe的read()方法中:

分配完了ByteBuf之后, 再看这一步allocHandle.lastBytesRead(doReadBytes(byteBuf)):

首先看参数doReadBytes(byteBuf)方法, 这步是将channel中的数据读取到我们刚分配的ByteBuf中, 并返回读取到的字节数

这里会调用到NioSocketChannel的doReadBytes方法:

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

首先拿到绑定在channel中的handler, 因为我们已经创建了handle, 所以这里会直接拿到

再看allocHandle.attemptedBytesRead(byteBuf.writableBytes())这步, byteBuf.writableBytes()返回byteBuf的可写字节数, 也就是最多能从channel中读取多少字节写到ByteBuf, allocate的attemptedBytesRead会把可写字节数设置到DefaultMaxMessagesRecvByteBufAllocator 类的attemptedBytesRead属性中

跟到DefaultMaxMessagesRecvByteBufAllocator中的attemptedBytesRead我们会看到:

public void attemptedBytesRead(int bytes) {
    attemptedBytesRead = bytes;
}

继续看doReadBytes方法

最后, 通过byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead())将jdk底层的channel中的数据写入到我们创建的ByteBuf中, 并返回实际写入的字节数

回到NioByteUnsafe的read()方法中:

继续看allocHandle.lastBytesRead(doReadBytes(byteBuf))这步

刚才我们剖析过doReadBytes(byteBuf)返回的是世界写入ByteBuf的字节数

再看lastBytesRead方法, 跟到DefaultMaxMessagesRecvByteBufAllocator的lastBytesRead方法中:

public final void lastBytesRead(int bytes) {
    lastBytesRead = bytes;
    totalBytesRead += bytes;
    if (totalBytesRead < 0) {
        totalBytesRead = Integer.MAX_VALUE;
    }
}

这里会赋值两个属性, lastBytesRead代表最后读取的字节数, 这里赋值为我们刚才写入ByteBuf的字节数, totalBytesRead表示总共读取的字节数, 这里将写入的字节数追加

继续看NioByteUnsafe的read()方法:

如果最后一次读取数据为0, 说明已经将channel中的数据全部读取完毕, 将新创建的ByteBuf释放循环利用, 并跳出循环

allocHandle.incMessagesRead(1)这步是增加消息的读取次数, 因为我们循环最多16次, 所以当增加消息次数增加到16会结束循环

读取完毕之后, 会通过pipeline.fireChannelRead(byteBuf)将传递channelRead事件, 有关channelRead事件, 我们在第四章也进行了详细的剖析

这里读者会有疑问, 如果一次读取不完, 就传递channelRead事件, 那么server接收到的数据有可能就是不完整的, 其实关于这点, netty也做了相应的处理, 我们会在之后的章节详细剖析netty的半包处理机制

循环结束后, 会执行到allocHandle.readComplete()这一步

我们知道第一次分配ByteBuf的初始容量是1024, 但是初始容量不一定一定满足所有的业务场景, netty中, 将每次读取数据的字节数进行记录, 然后之后次分配ByteBuf的时候, 容量会尽可能的符合业务场景所需要大小, 具体实现方式, 就是在readComplete()这一步体现的

我们跟到AdaptiveRecvByteBufAllocator的readComplete()方法中:

public void readComplete() {
    record(totalBytesRead());
}

这里调用了record方法, 并且传入了这一次所读取的字节总数

跟到record方法中

private void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            index = Math.max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        index = Math.min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

首先看判断条件 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)])

这里index是当前分配的缓冲区大小所在的SIZE_TABLE中的索引, 将这个索引进行缩进, 然后根据缩进后的所以找出SIZE_TABLE中所存储的内存值, 再判断是否大于等于这次读取的最大字节数, 如果条件成立, 说明分配的内存过大, 需要缩容操作, 我们看if块中缩容相关的逻辑

首先 if (decreaseNow) 会判断是否立刻进行收缩操作, 通常第一次不会进行收缩操作, 然后会将decreaseNow设置为true, 代表下一次直接进行收缩操作

假设需要立刻进行收缩操作, 我们看收缩操作的相关逻辑:

index = Math.max(index - INDEX_DECREMENT, minIndex) 这一步将索引缩进一步, 但不能小于最小索引值

然后通过 nextReceiveBufferSize = SIZE_TABLE[index] 获取设置索引之后的内存, 赋值在nextReceiveBufferSize, 也就是下次需要分配的大小, 下次就会根据这个大小分配ByteBuf了, 这样就实现了缩容操作

再看 else&nbsp;if (actualReadBytes >= nextReceiveBufferSize)

这里判断这次读取字节的总量比上次分配的大小还要大, 则进行扩容操作

扩容操作也很简单, 索引步进, 然后拿到步进后的索引所对应的内存值, 作为下次所需要分配的大小

再NioByteUnsafe的read()方法中:

经过了缩容或者扩容操作之后, 通过pipeline.fireChannelReadComplete()传播ChannelReadComplete()事件

以上就是读取客户端消息的相关流程

章节总结

本章主要剖析了ByteBuf的基本操作以及缓冲区分配等相关知识.

缓冲区分配, 分为通过调用jdk的api的方式和分配一块连续内存的方式

其中, 通过分配连续内存的方式分配缓冲区中, 又介绍了在page级别分配的逻辑和在subpage级别分配的逻辑

page级别分配时通过操作内存二叉树的方式记录分配情况

subpage级别分配是通过位图的方式记录分配情况

最后介绍了NioSocketChannel处理读事件的相关逻辑

总体来说, 这一章的内容难度是比较大的, 希望同学课后通过多调试的方式进行熟练掌握

更多关于ByteBuf使用SocketChannel读取数据过程的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式ByteBuf使用命中缓存的分配解析

    目录 分析先关逻辑之前, 首先介绍缓存对象的数据结构 我们以tiny类型为例跟到createSubPageCaches方法中 回到PoolArena的allocate方法中 我们跟到normalizeCapacity方法中 回到allocate方法中 allocateTiny是缓存分配的入口 回到acheForTiny方法中 我们简单看下Entry这个类 跟进init方法 上一小节简单分析了directArena内存分配大概流程 ,知道其先命中缓存, 如果命中不到, 则区分配一款连续内存, 这一

  • Netty分布式ByteBuf使用subPage级别内存分配剖析

    目录 subPage级别内存分配 我们其中是在构造方法中初始化的, 看构造方法中其初始化代码 在构造方法中创建完毕之后, 会通过循环为其赋值 这里通过normCapacity拿到tableIdx 跟到allocate(normCapacity)方法中 我们跟到PoolSubpage的构造方法中 我们跟到addToPool(head)中 我们跟到allocate()方法中 我们继续跟进findNextAvail方法 我们回到allocate()方法中 我们跟到initBuf方法中 回到initBu

  • Netty分布式ByteBuf使用的回收逻辑剖析

    目录 ByteBuf回收 这里调用了release0, 跟进去 我们首先分析free方法 我们跟到cache中 回到add方法中 我们回到free方法中 前文传送门:ByteBuf使用subPage级别内存分配 ByteBuf回收 之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑 PooledUnsafe

  • Netty分布式ByteBuf使用page级别的内存分配解析

    目录 netty内存分配数据结构 我们看PoolArena中有关chunkList的成员变量 我们看PoolSubpage的属性 我们回到PoolArena的allocate方法 我们跟进allocateNormal 首先会从head节点往下遍历 这里直接通过构造函数创建了一个chunk 首先将参数传入的值进行赋值 我们再回到PoolArena的allocateNormal方法中 跟到allocate(normCapacity)中 我们跟到allocateNode方法中 我们跟进updatePa

  • Netty分布式ByteBuf中PooledByteBufAllocator剖析

    目录 前言 PooledByteBufAllocator分配逻辑 逻辑简述 我们回到newDirectBuffer中 有关缓存列表, 我们循序渐进的往下看 我们在static块中看其初始化过程 我们再次跟到initialValue方法中 我们跟到createSubPageCaches这个方法中 最后并保存其类型 前言 上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带大家剖析更为复杂的PooledByt

  • Netty分布式ByteBuf使用directArena分配缓冲区过程解析

    目录 directArena分配缓冲区 回到newDirectBuffer中 我们跟到newByteBuf方法中 跟到reuse方法中 跟到allocate方法中 1.首先在缓存上进行分配 2.如果在缓存上分配不成功, 则实际分配一块内存 上一小节简单分析了PooledByteBufAllocator中, 线程局部缓存和arean的相关逻辑, 这一小节简单分析下directArena分配缓冲区的相关过程 directArena分配缓冲区 回到newDirectBuffer中 protected

  • Netty分布式ByteBuf使用SocketChannel读取数据过程剖析

    目录 Server读取数据的流程 我们首先看NioEventLoop的processSelectedKey方法 这里会走到DefaultChannelConfig的getAllocator方法中 我们跟到static块中 回到NioByteUnsafe的read()方法中 我们跟进recvBufAllocHandle 继续看doReadBytes方法 跟到record方法中 章节总结 我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, Server读取数据的流程: 首先温馨提

  • Netty分布式ByteBuf使用的底层实现方式源码解析

    目录 概述 AbstractByteBuf属性和构造方法 首先看这个类的属性和构造方法 我们看几个最简单的方法 我们重点关注第二个校验方法ensureWritable(length) 我们跟到扩容的方法里面去 最后将写指针后移length个字节 概述 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到cha

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Netty分布式ByteBuf的分类方式源码解析

    目录 ByteBuf根据不同的分类方式 会有不同的分类结果 1.Pooled和Unpooled 2.基于直接内存的ByteBuf和基于堆内存的ByteBuf 3.safe和unsafe 上一小节简单介绍了AbstractByteBuf这个抽象类, 这一小节对其子类的分类做一个简单的介绍 ByteBuf根据不同的分类方式 会有不同的分类结果 我们首先看第一种分类方式 1.Pooled和Unpooled pooled是从一块内存里去取一段连续内存封装成byteBuf 具体标志是类名以Pooled开头

随机推荐