Netty分布式解码器读取数据不完整的逻辑剖析

目录
  • 概述
  • 第一节: ByteToMessageDecoder
    • 我们看他的定义
    • 我们看其channelRead方法
    • 我们看cumulator属性
    • 我们回到channRead方法中

概述

在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析

之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个handler, 截取byteBuf中的字节, 然后组建成业务需要的数据进行继续传播

编码器, 通常是OutBoundHandler, 也就是以自身为基准, 对那些对外流出的数据做处理, 所以也叫编码器, 将数据经过编码发送出去

解码器, 通常是inboundHandler, 也就是以自身为基准, 对那些流向自身的数据做处理, 所以也叫解码器, 将对向的数据接收之后经过解码再进行使用

同样, 在netty的编码器中, 也会对半包和粘包问题做相应的处理

什么是半包, 顾名思义, 就是不完整的数据包, 因为netty在轮询读事件的时候, 每次将channel中读取的数据, 不一定是一个完整的数据包, 这种情况, 就叫半包

粘包同样也不难理解, 如果client往server发送数据包, 如果发送频繁很有可能会将多个数据包的数据都发送到通道中, 如果在server在读取的时候可能会读取到超过一个完整数据包的长度, 这种情况叫粘包

有关半包和粘包, 入下图所示:

6-0-1

netty对半包的或者粘包的处理其实也很简单, 通过之前的学习, 我们知道, 每个handler是和channel唯一绑定的, 一个handler只对应一个channel, 所以将channel中的数据读取时候经过解析, 如果不是一个完整的数据包, 则解析失败, 将这块数据包进行保存, 等下次解析时再和这个数据包进行组装解析, 直到解析到完整的数据包, 才会将数据包进行向下传递

具体流程是在代码中如何体现的呢?我们进入到源码分析中

第一节: ByteToMessageDecoder

ByteToMessageDecoder解码器, 顾名思义, 是一个将Byte解析成消息的解码器,

我们看他的定义

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{
    //类体省略
}

这里继承了ChannelInboundHandlerAdapter, 根据之前的学习, 我们知道, 这是个inbound类型的handler, 也就是处理流向自身事件的handler

其次, 该类通过abstract关键字修饰, 说明是个抽象类, 在我们实际使用的时候, 并不是直接使用这个类, 而是使用其子类, 类定义了解码器的骨架方法, 具体实现逻辑交给子类, 同样, 在半包处理中也是由该类进行实现的

netty中很多解码器都实现了这个类, 并且, 我们也可以通过实现该类进行自定义解码器

我们重点关注一下该类的一个属性:

ByteBuf cumulation;

这个属性, 就是有关半包处理的关键属性, 从概述中我们知道, netty会将不完整的数据包进行保存, 这个数据包就是保存在这个属性中

之前的学习我们知道, ByteBuf读取完数据会传递channelRead事件, 传播过程中会调用handler的channelRead方法, ByteToMessageDecoder的channelRead方法, 就是编码的关键部分

我们看其channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //如果message是byteBuf类型
    if (msg instanceof ByteBuf) {
        //简单当成一个arrayList, 用于盛放解析到的对象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            //当前累加器为空, 说明这是第一次从io流里面读取数据
            first = cumulation == null;
            if (first) {
                //如果是第一次, 则将累加器赋值为刚读进来的对象
                cumulation = data;
            } else {
                //如果不是第一次, 则把当前累加的数据和读进来的数据进行累加
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            //调用子类的方法进行解析
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                numReads = 0;
                discardSomeReadBytes();
            }
            //记录list长度
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            //向下传播
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        //不是byteBuf类型则向下传播
        ctx.fireChannelRead(msg);
    }
}

这方法比较长, 带大家一步步剖析

首先判断如果传来的数据是ByteBuf, 则进入if块中

CodecOutputList out = CodecOutputList.newInstance() 这里就当成一个ArrayList就好, 用于盛放解码完成的数据

ByteBuf data = (ByteBuf) msg 这步将数据转化成ByteBuf

first = cumulation == null  这里表示如果cumulation == null, 说明没有存储板半包数据, 则将当前的数据保存在属性cumulation中

如果 cumulation != null , 说明存储了半包数据, 则通过cumulator.cumulate(ctx.alloc(), cumulation, data)将读取到的数据和原来的数据进行累加, 保存在属性cumulation中

我们看cumulator属性

private Cumulator cumulator = MERGE_CUMULATOR;

这里调用了其静态属性MERGE_CUMULATOR, 我们跟过去:

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        //不能到过最大内存
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1) {
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        //将当前数据buffer
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};

这里创建了Cumulator类型的静态对象, 并重写了cumulate方法, 这里cumulate方法, 就是用于将ByteBuf进行拼接的方法:

方法中, 首先判断cumulation的写指针+in的可读字节数是否超过了cumulation的最大长度, 如果超过了, 将对cumulation进行扩容, 如果没超过, 则将其赋值到局部变量buffer中

然后将in的数据写到buffer中, 将in进行释放, 返回写入数据后的ByteBuf

回到channelRead方法中:

最后通过callDecode(ctx, cumulation, out)方法进行解码, 这里传入了Context对象, 缓冲区cumulation和集合out:

我们跟到callDecode(ctx, cumulation, out)方法中:

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        //只要累加器里面有数据
        while (in.isReadable()) {
            int outSize = out.size();
            //判断当前List是否有对象
            if (outSize > 0) {
                //如果有对象, 则向下传播事件
                fireChannelRead(ctx, out, outSize);
                //清空当前list
                out.clear();
                //解码过程中如ctx被removed掉就break
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            //当前可读数据长度
            int oldInputLength = in.readableBytes();
            //子类实现
            //子类解析, 解析玩对象放到out里面
            decode(ctx, in, out);
            if (ctx.isRemoved()) {
                break;
            }
            //List解析前大小 和解析后长度一样(什么没有解析出来)
            if (outSize == out.size()) {
                //原来可读的长度==解析后可读长度
                //说明没有读取数据(当前累加的数据并没有拼成一个完整的数据包)
                if (oldInputLength == in.readableBytes()) {
                    //跳出循环(下次在读取数据才能进行后续的解析)
                    break;
                } else {
                    //没有解析到数据, 但是进行读取了
                    continue;
                }
            }
            //out里面有数据, 但是没有从累加器读取数据
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

这里首先循环判断传入的ByteBuf是否有可读字节, 如果还有可读字节说明没有解码完成, 则循环继续解码

然后判断集合out的大小, 如果大小大于1, 说明out中盛放了解码完成之后的数据, 然后将事件向下传播, 并清空out

因为我们第一次解码out是空的, 所以这里不会进入if块, 这部分我们稍后分析, 这里继续往下看

通过 int oldInputLength = in.readableBytes() 获取当前ByteBuf, 其实也就是属性cumulation的可读字节数, 这里就是一个备份用于比较, 我们继续往下看:

decode(ctx, in, out)方法是最终的解码操作, 这部会读取cumulation并且将解码后的数据放入到集合out中, 在ByteToMessageDecoder中的该方法是一个抽象方法, 让子类进行实现, 我们使用的netty很多的解码都是继承了ByteToMessageDecoder并实现了decode方法从而完成了解码操作, 同样我们也可以遵循相应的规则进行自定义解码器, 在之后的小节中会讲解netty定义的解码器, 并剖析相关的实现细节, 这里我们继续往下看:

if (outSize == out.size()) 这个判断表示解析之前的out大小和解析之后out大小进行比较, 如果相同, 说明并没有解析出数据, 我们进入到if块中:

if (oldInputLength == in.readableBytes()) 表示cumulation的可读字节数在解析之前和解析之后是相同的, 说明解码方法中并没有解析数据, 也就是当前的数据并不是一个完整的数据包, 则跳出循环, 留给下次解析, 否则, 说明没有解析到数据, 但是读取了, 所以跳过该次循环进入下次循环

最后判断 if (oldInputLength == in.readableBytes()) , 这里代表out中有数据, 但是并没有从cumulation读数据, 说明这个out的内容是非法的, 直接抛出异常

我们回到channRead方法中

我们关注finally中的内容:

finally {
    if (cumulation != null && !cumulation.isReadable()) {
        numReads = 0;
        cumulation.release();
        cumulation = null;
    } else if (++ numReads >= discardAfterReads) {
        numReads = 0;
        discardSomeReadBytes();
    }
    //记录list长度
    int size = out.size();
    decodeWasNull = !out.insertSinceRecycled();
    //向下传播
    fireChannelRead(ctx, out, size);
    out.recycle();
}

首先判断cumulation不为null, 并且没有可读字节, 则将累加器进行释放, 并设置为null

之后记录out的长度, 通过fireChannelRead(ctx, out, size)将channelRead事件进行向下传播, 并回收out对象

我们跟到fireChannelRead(ctx, out, size)方法中:

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    //遍历List
    for (int i = 0; i < numElements; i ++) {
        //逐个向下传递
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}

这里遍历out集合, 并将里面的元素逐个向下传递

以上就是有关解码的骨架逻辑

更多关于Netty分布式解码器读取数据的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式ByteBuf使用directArena分配缓冲区过程解析

    目录 directArena分配缓冲区 回到newDirectBuffer中 我们跟到newByteBuf方法中 跟到reuse方法中 跟到allocate方法中 1.首先在缓存上进行分配 2.如果在缓存上分配不成功, 则实际分配一块内存 上一小节简单分析了PooledByteBufAllocator中, 线程局部缓存和arean的相关逻辑, 这一小节简单分析下directArena分配缓冲区的相关过程 directArena分配缓冲区 回到newDirectBuffer中 protected

  • Netty分布式ByteBuf使用page级别的内存分配解析

    目录 netty内存分配数据结构 我们看PoolArena中有关chunkList的成员变量 我们看PoolSubpage的属性 我们回到PoolArena的allocate方法 我们跟进allocateNormal 首先会从head节点往下遍历 这里直接通过构造函数创建了一个chunk 首先将参数传入的值进行赋值 我们再回到PoolArena的allocateNormal方法中 跟到allocate(normCapacity)中 我们跟到allocateNode方法中 我们跟进updatePa

  • Netty分布式ByteBuf中PooledByteBufAllocator剖析

    目录 前言 PooledByteBufAllocator分配逻辑 逻辑简述 我们回到newDirectBuffer中 有关缓存列表, 我们循序渐进的往下看 我们在static块中看其初始化过程 我们再次跟到initialValue方法中 我们跟到createSubPageCaches这个方法中 最后并保存其类型 前言 上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带大家剖析更为复杂的PooledByt

  • Netty分布式ByteBuf使用subPage级别内存分配剖析

    目录 subPage级别内存分配 我们其中是在构造方法中初始化的, 看构造方法中其初始化代码 在构造方法中创建完毕之后, 会通过循环为其赋值 这里通过normCapacity拿到tableIdx 跟到allocate(normCapacity)方法中 我们跟到PoolSubpage的构造方法中 我们跟到addToPool(head)中 我们跟到allocate()方法中 我们继续跟进findNextAvail方法 我们回到allocate()方法中 我们跟到initBuf方法中 回到initBu

  • Netty分布式ByteBuf使用的回收逻辑剖析

    目录 ByteBuf回收 这里调用了release0, 跟进去 我们首先分析free方法 我们跟到cache中 回到add方法中 我们回到free方法中 前文传送门:ByteBuf使用subPage级别内存分配 ByteBuf回收 之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑 PooledUnsafe

  • Netty分布式ByteBuf使用命中缓存的分配解析

    目录 分析先关逻辑之前, 首先介绍缓存对象的数据结构 我们以tiny类型为例跟到createSubPageCaches方法中 回到PoolArena的allocate方法中 我们跟到normalizeCapacity方法中 回到allocate方法中 allocateTiny是缓存分配的入口 回到acheForTiny方法中 我们简单看下Entry这个类 跟进init方法 上一小节简单分析了directArena内存分配大概流程 ,知道其先命中缓存, 如果命中不到, 则区分配一款连续内存, 这一

  • Netty分布式解码器读取数据不完整的逻辑剖析

    目录 概述 第一节: ByteToMessageDecoder 我们看他的定义 我们看其channelRead方法 我们看cumulator属性 我们回到channRead方法中 概述 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对

  • Netty分布式Future与Promise执行回调相关逻辑剖析

    目录 Future和Promise执行回调 首先我们看一段写在handler中的业务代码 这里关注newPromise()方法, 跟进去 我们继续跟write方法 跟进tryFailure方法 跟到addMessage方法中 最后跟到AbstractUnsafe的flush方法 我们跟到remove()方法中 再跟到trySuccess方法中 我们看用户代码 跟到addListener0方法中 回到addListener0方法中 跟到isDone方法中 跟到notifyListeners()方法

  • Java中I/O流读取数据不完整的问题解决

    目录 一·问题描述: 二·问题原因: 三·解决办法: 四·测试结果:成功 一·问题描述: 1.利用Java的转换流去读取一个json文件数据,获取的数据无法被解析为json格式数据(格式总是报错),且获取的数据末尾缺少一部分数据. (1)Java源代码如图 (2)原json文件如图 (3)解析获取的数据如图:转换为json格式数据报错 二·问题原因: 1.最后一次缓存数组里面的数据,没有拼接到最终字符串数据里面 2.stringBuffer.append(buffer)拼接数据的方法,内部可能会

  • Netty分布式获取异线程释放对象源码剖析

    目录 获取异线程释放对象 在介绍之前我们首先看Stack类中的两个属性 我们跟到pop方法中 继续跟到scavengeSome方法中 我们继续分析transfer方法 接着我们我们关注一个细节 我们跟到reclaimSpace方法 章节小结 前文传送门:异线程下回收对象 获取异线程释放对象 上一小节分析了异线程回收对象, 原理是通过与stack关联的WeakOrderQueue进行回收 如果对象经过异线程回收之后, 当前线程需要取出对象进行二次利用, 如果当前stack中为空, 则会通过当前st

  • Netty分布式flush方法刷新buffer队列源码剖析

    flush方法 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法 public final void flush() { a

  • Netty分布式从recycler对象回收站获取对象过程剖析

    前文传送门:Netty分布式高性能工具类recycler的使用及创建 从对象回收站中获取对象 我们回顾上一小节demo的main方法中 从回收站获取对象 public static void main(String[] args){ User user1 = RECYCLER.get(); user1.recycle(); User user2 = RECYCLER.get(); user2.recycle(); System.out.println(user1==user2); } 这个通过R

  • Netty分布式固定长度解码器实现原理剖析

    固定长度解码器 上一小节:解码器读取数据不完整的逻辑剖析 我们了解到, 解码器需要继承ByteToMessageDecoder, 并重写decode方法, 将解析出来的对象放入集合中集合, ByteToMessageDecoder中可以将解析出来的对象向下进行传播, 这一小节带大家剖析一个最简单的解码器FixedLengthFrameDecoder, 从它入手了解码器的相关原理 FixedLengthFrameDecoder是一个固定长度的解码器, 功能就是根据固定长度, 截取固定大小的字节数进

  • Netty分布式高性能工具类FastThreadLocal和Recycler分析

    目录 概述 第一节:FastThreadLocal的使用和创建 首先我们看一个最简单的demo 跟到nextVariableIndex方法中 我们首先剖析slowGet()方法 我们跟进fastGet 回到FastThreadLocal的get方法中 在我们的demo中对应这个方法 前文传送门:Netty分布式Future与Promise执行回调相关逻辑剖析 概述 FastThreadLocal我们在剖析堆外内存分配的时候简单介绍过, 它类似于JDK的ThreadLocal, 也是用于在多线程条

  • Netty分布式ByteBuf使用SocketChannel读取数据过程剖析

    目录 Server读取数据的流程 我们首先看NioEventLoop的processSelectedKey方法 这里会走到DefaultChannelConfig的getAllocator方法中 我们跟到static块中 回到NioByteUnsafe的read()方法中 我们跟进recvBufAllocHandle 继续看doReadBytes方法 跟到record方法中 章节总结 我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, Server读取数据的流程: 首先温馨提

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

随机推荐