Nett分布式分隔符解码器逻辑源码剖析

目录
  • 分隔符解码器
    • 我们看其中的一个构造方法
    • 我们跟到重载decode方法中
    • 我们看初始化该属性的构造方法
  • 章节总结

前文传送门:Netty分布式行解码器逻辑源码解析

分隔符解码器

基于分隔符解码器DelimiterBasedFrameDecoder, 是按照指定分隔符进行解码的解码器, 通过分隔符, 可以将二进制流拆分成完整的数据包

同样继承了ByteToMessageDecoder并重写了decode方法

我们看其中的一个构造方法

public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
    this(maxFrameLength, true, delimiters);
}

这里参数maxFrameLength代表最大长度, delimiters是个可变参数, 可以说可以支持多个分隔符进行解码

我们进入decode方法:

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

这里同样调用了其重载的decode方法并将解析好的数据添加到集合list中, 其父类就可以遍历out, 并将内容传播

我们跟到重载decode方法中

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    //行处理器(1)
    if (lineBasedDecoder != null) {
        return lineBasedDecoder.decode(ctx, buffer);
    }
    int minFrameLength = Integer.MAX_VALUE;
    ByteBuf minDelim = null;
    //找到最小长度的分隔符(2)
    for (ByteBuf delim: delimiters) {
        //每个分隔符分隔的数据包长度
        int frameLength = indexOf(buffer, delim);
        if (frameLength >= 0 && frameLength < minFrameLength) {
            minFrameLength = frameLength;
            minDelim = delim;
        }
    }
    //解码(3)
    //已经找到分隔符
    if (minDelim != null) {
        int minDelimLength = minDelim.capacity();
        ByteBuf frame;
        //当前分隔符否处于丢弃模式
        if (discardingTooLongFrame) {
            //首先设置为非丢弃模式
            discardingTooLongFrame = false;
            //丢弃
            buffer.skipBytes(minFrameLength + minDelimLength);

            int tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            if (!failFast) {
                fail(tooLongFrameLength);
            }
            return null;
        }
        //处于非丢弃模式
        //当前找到的数据包, 大于允许的数据包
        if (minFrameLength > maxFrameLength) {
            //当前数据包+最小分隔符长度 全部丢弃
            buffer.skipBytes(minFrameLength + minDelimLength);
            //传递异常事件
            fail(minFrameLength);
            return null;
        }
        //如果是正常的长度
        //解析出来的数据包是否忽略分隔符
        if (stripDelimiter) {
            //如果不包含分隔符
            //截取
            frame = buffer.readRetainedSlice(minFrameLength);
            //跳过分隔符
            buffer.skipBytes(minDelimLength);
        } else {
            //截取包含分隔符的长度
            frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
        }
        return frame;
    } else {
        //如果没有找到分隔符
        //非丢弃模式
        if (!discardingTooLongFrame) {
            //可读字节大于允许的解析出来的长度
            if (buffer.readableBytes() > maxFrameLength) {
                //将这个长度记录下
                tooLongFrameLength = buffer.readableBytes();
                //跳过这段长度
                buffer.skipBytes(buffer.readableBytes());
                //标记当前处于丢弃状态
                discardingTooLongFrame = true;
                if (failFast) {
                    fail(tooLongFrameLength);
                }
            }
        } else {
            tooLongFrameLength += buffer.readableBytes();
            buffer.skipBytes(buffer.readableBytes());
        }
        return null;
    }
}

这里的方法也比较长, 这里也通过拆分进行剖析

(1). 行处理器

(2). 找到最小长度分隔符

(3). 解码

首先看第一步行处理器:

if (lineBasedDecoder != null) {
    return lineBasedDecoder.decode(ctx, buffer);
}

这里首先判断成员变量lineBasedDecoder是否为空, 如果不为空则直接调用lineBasedDecoder的decode的方法进行解码, lineBasedDecoder实际上就是上一小节剖析的LineBasedFrameDecoder解码器

这个成员变量, 会在分隔符是\n和\r\n的时候进行初始化

我们看初始化该属性的构造方法

public DelimiterBasedFrameDecoder(
        int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
    //代码省略
    //如果是基于行的分隔
    if (isLineBased(delimiters) && !isSubclass()) {
        //初始化行处理器
        lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
        this.delimiters = null;
    } else {
        //代码省略
    }
    //代码省略
}

这里isLineBased(delimiters)会判断是否是基于行的分隔, 跟到isLineBased方法中:

private static boolean isLineBased(final ByteBuf[] delimiters) {
    //分隔符长度不为2
    if (delimiters.length != 2) {
        return false;
    }
    //拿到第一个分隔符
    ByteBuf a = delimiters[0];
    //拿到第二个分隔符
    ByteBuf b = delimiters[1];
    if (a.capacity() < b.capacity()) {
        a = delimiters[1];
        b = delimiters[0];
    }
    //确保a是/r/n分隔符, 确保b是/n分隔符
    return a.capacity() == 2 && b.capacity() == 1
            && a.getByte(0) == '\r' && a.getByte(1) == '\n'
            && b.getByte(0) == '\n';
}

首先判断长度等于2, 直接返回false

然后拿到第一个分隔符a和第二个分隔符b, 然后判断a的第一个分隔符是不是\r, a的第二个分隔符是不是\n, b的第一个分隔符是不是\n, 如果都为true, 则条件成立

我们回到decode方法中, 看步骤2, 找到最小长度的分隔符:

这里最小长度的分隔符, 意思就是从读指针开始, 找到最近的分隔符

for (ByteBuf delim: delimiters) {
    //每个分隔符分隔的数据包长度
    int frameLength = indexOf(buffer, delim);
    if (frameLength >= 0 && frameLength < minFrameLength) {
        minFrameLength = frameLength;
        minDelim = delim;
    }
}

这里会遍历所有的分隔符, 然后找到每个分隔符到读指针到数据包长度

然后通过if判断, 找到长度最小的数据包的长度, 然后保存当前数据包的的分隔符, 如下图:

6-4-1

这里假设A和B同为分隔符, A分隔符到读指针的长度小于B分隔符到读指针的长度, 这里会找到最小的分隔符A, 分隔符的最小长度, 就readIndex到A的长度

我们继续看第3步, 解码:

if (minDelim != null) 表示已经找到最小长度分隔符, 我们继续看if块中的逻辑:

int minDelimLength = minDelim.capacity();
ByteBuf frame;
if (discardingTooLongFrame) {
    discardingTooLongFrame = false;
    buffer.skipBytes(minFrameLength + minDelimLength);
    int tooLongFrameLength = this.tooLongFrameLength;
    this.tooLongFrameLength = 0;
    if (!failFast) {
        fail(tooLongFrameLength);
    }
    return null;
}
if (minFrameLength > maxFrameLength) {
    buffer.skipBytes(minFrameLength + minDelimLength);
    fail(minFrameLength);
    return null;
}
if (stripDelimiter) {
    frame = buffer.readRetainedSlice(minFrameLength);
    buffer.skipBytes(minDelimLength);
} else {
    frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;

if (discardingTooLongFrame) 表示当前是否处于非丢弃模式, 如果是丢弃模式, 则进入if块

因为第一个不是丢弃模式, 所以这里先分析if块后面的逻辑

if (minFrameLength > maxFrameLength) 这里是判断当前找到的数据包长度大于最大长度, 这里的最大长度使我们创建解码器的时候设置的, 如果超过了最大长度, 就通过 buffer.skipBytes(minFrameLength + minDelimLength) 方式, 跳过数据包+分隔符的长度, 也就是将这部分数据进行完全丢弃

继续往下看, 如果长度不大最大允许长度, 则通过 if (stripDelimiter) 判断解析的出来的数据包是否包含分隔符, 如果不包含分隔符, 则截取数据包的长度之后, 跳过分隔符

我们再回头看 if (discardingTooLongFrame) 中的if块中的逻辑, 也就是丢弃模式:

首先将discardingTooLongFrame设置为false, 标记非丢弃模式

然后通过 buffer.skipBytes(minFrameLength + minDelimLength) 将数据包+分隔符长度的字节数跳过, 也就是进行丢弃, 之后再进行抛出异常

分析完成了找到分隔符之后的丢弃模式非丢弃模式的逻辑处理, 我们在分析没找到分隔符的逻辑处理, 也就是 if (minDelim != null) 中的else块:

if (!discardingTooLongFrame) {
    if (buffer.readableBytes() > maxFrameLength) {
        tooLongFrameLength = buffer.readableBytes();
        buffer.skipBytes(buffer.readableBytes());
        discardingTooLongFrame = true;
        if (failFast) {
            fail(tooLongFrameLength);
        }
    }
} else {
    tooLongFrameLength += buffer.readableBytes();
    buffer.skipBytes(buffer.readableBytes());
}
return null;

首先通过 if (!discardingTooLongFrame) 判断是否为非丢弃模式, 如果是, 则进入if块:

在if块中, 首先通过 if (buffer.readableBytes() > maxFrameLength) 判断当前可读字节数是否大于最大允许的长度, 如果大于最大允许的长度, 则将可读字节数设置到tooLongFrameLength的属性中, 代表丢弃的字节数

然后通过 buffer.skipBytes(buffer.readableBytes()) 将累计器中所有的可读字节进行丢弃

最后将discardingTooLongFrame设置为true, 也就是丢弃模式, 之后抛出异常

如果 if (!discardingTooLongFrame) 为false, 也就是当前处于丢弃模式, 则追加tooLongFrameLength也就是丢弃的字节数的长度, 并通过 buffer.skipBytes(buffer.readableBytes()) 将所有的字节继续进行丢弃

以上就是分隔符解码器的相关逻辑

章节总结

本章介绍了抽象解码器ByteToMessageDecoder, 和其他几个实现了ByteToMessageDecoder类的解码器, 这个几个解码器逻辑都比较简单, 同学们可以根据其中的思想剖析其他的比较复杂的解码器, 或者根据其规则实现自己的自定义解码器

更多关于Nett分布式分隔符解码器的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式ByteBuf使用SocketChannel读取数据过程剖析

    目录 Server读取数据的流程 我们首先看NioEventLoop的processSelectedKey方法 这里会走到DefaultChannelConfig的getAllocator方法中 我们跟到static块中 回到NioByteUnsafe的read()方法中 我们跟进recvBufAllocHandle 继续看doReadBytes方法 跟到record方法中 章节总结 我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, Server读取数据的流程: 首先温馨提

  • Netty分布式ByteBuf使用subPage级别内存分配剖析

    目录 subPage级别内存分配 我们其中是在构造方法中初始化的, 看构造方法中其初始化代码 在构造方法中创建完毕之后, 会通过循环为其赋值 这里通过normCapacity拿到tableIdx 跟到allocate(normCapacity)方法中 我们跟到PoolSubpage的构造方法中 我们跟到addToPool(head)中 我们跟到allocate()方法中 我们继续跟进findNextAvail方法 我们回到allocate()方法中 我们跟到initBuf方法中 回到initBu

  • C语言基础野指针与空指针示例分析

    目录 一:野指针 1.1 :野指针的成因 2.1 :规避野指针 1. 初始化指针 2. 避免指针越界 3 避免返回局部变量的地址 4. 开辟的指针释放后置为NULL 5. 养成良好的编程习惯 二:空指针 一:野指针 概念:野指针就是指向的内存地址是未知的(随机的,不正确的,没有明确限制的). 说明:指针变量也是变量,是变量就可以任意赋值.但是,任意数值赋值给指针变量没有意义,因为这样的指针就成了野指针,此指针指向的区域是未知(操作系统不允许操作此指针指向的内存区域). 注:野指针不会直接引发错误

  • Netty分布式固定长度解码器实现原理剖析

    固定长度解码器 上一小节:解码器读取数据不完整的逻辑剖析 我们了解到, 解码器需要继承ByteToMessageDecoder, 并重写decode方法, 将解析出来的对象放入集合中集合, ByteToMessageDecoder中可以将解析出来的对象向下进行传播, 这一小节带大家剖析一个最简单的解码器FixedLengthFrameDecoder, 从它入手了解码器的相关原理 FixedLengthFrameDecoder是一个固定长度的解码器, 功能就是根据固定长度, 截取固定大小的字节数进

  • Netty分布式解码器读取数据不完整的逻辑剖析

    目录 概述 第一节: ByteToMessageDecoder 我们看他的定义 我们看其channelRead方法 我们看cumulator属性 我们回到channRead方法中 概述 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对

  • Netty分布式ByteBuf使用的回收逻辑剖析

    目录 ByteBuf回收 这里调用了release0, 跟进去 我们首先分析free方法 我们跟到cache中 回到add方法中 我们回到free方法中 前文传送门:ByteBuf使用subPage级别内存分配 ByteBuf回收 之前的章节我们提到过, 堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收, 这一小节, 就以PooledUnsafeDirectByteBuf为例讲解有关内存分配的相关逻辑 PooledUnsafe

  • Nett分布式分隔符解码器逻辑源码剖析

    目录 分隔符解码器 我们看其中的一个构造方法 我们跟到重载decode方法中 我们看初始化该属性的构造方法 章节总结 前文传送门:Netty分布式行解码器逻辑源码解析 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecoder, 是按照指定分隔符进行解码的解码器, 通过分隔符, 可以将二进制流拆分成完整的数据包 同样继承了ByteToMessageDecoder并重写了decode方法 我们看其中的一个构造方法 public DelimiterBasedFrameDeco

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

  • Netty分布式获取异线程释放对象源码剖析

    目录 获取异线程释放对象 在介绍之前我们首先看Stack类中的两个属性 我们跟到pop方法中 继续跟到scavengeSome方法中 我们继续分析transfer方法 接着我们我们关注一个细节 我们跟到reclaimSpace方法 章节小结 前文传送门:异线程下回收对象 获取异线程释放对象 上一小节分析了异线程回收对象, 原理是通过与stack关联的WeakOrderQueue进行回收 如果对象经过异线程回收之后, 当前线程需要取出对象进行二次利用, 如果当前stack中为空, 则会通过当前st

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Java并发编程之ReentrantLock实现原理及源码剖析

    目录 一.ReentrantLock简介 二.ReentrantLock使用 三.ReentrantLock源码分析 1.非公平锁源码分析 2.公平锁源码分析 前面<Java并发编程之JUC并发核心AQS同步队列原理剖析>介绍了AQS的同步等待队列的实现原理及源码分析,这节我们将介绍一下基于AQS实现的ReentranLock的应用.特性.实现原理及源码分析. 一.ReentrantLock简介 ReentrantLock位于Java的juc包里面,从JDK1.5开始出现,是基于AQS同步队列

  • vue3.x源码剖析之数据响应式的深入讲解

    目录 前言 什么是数据响应式 数据响应式的大体流程 vue2.x数据响应式和3.x响应式对比 大致流程图 实现依赖收集 代码仓库 结尾 前言 如果错过了秋枫和冬雪,那么春天的樱花一定会盛开吧.最近一直在准备自己的考试,考完试了,终于可以继续研究源码和写文章了,哈哈哈.学过vue的都知道,数据响应式在vue框架中极其重要,写代码也好,面试也罢,数据响应式都是核心的内容.在vue3的官网文档中,作者说如果想让数据更加响应式的话,可以把数据放在reactive里面,官方文档在讲述这里的时候一笔带过,笔

  • SpringBoot源码剖析之属性文件加载原理

    目录 前言 1.找到入口 2.ConfigFileApplicationListener 2.1 主要流程分析 2.2 Loader构造器 2.3 properties加载 总结 前言 首先我们来看一个问题.就是我们在创建SpringBoot项目的时候会在对应的application.properties或者application.yml文件中添加对应的属性信息,我们的问题是这些属性文件是什么时候被加载的?如果要实现自定义的属性文件怎么来实现呢?本文来给大家揭晓答案: 1.找到入口 结合我们前面

  • Netty分布式NioEventLoop任务队列执行源码分析

    目录 执行任务队列 跟进runAllTasks方法: 我们跟进fetchFromScheduledTaskQueue()方法 回到runAllTasks(long timeoutNanos)方法中 章节小结 前文传送门:NioEventLoop处理IO事件 执行任务队列 继续回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectN

  • Netty分布式NioEventLoop优化selector源码解析

    目录 优化selector selector的创建过程 代码剖析 这里一步创建了这个优化后的数据结构 最后返回优化后的selector 优化selector selector的创建过程 在剖析selector轮询之前, 我们先讲解一下selector的创建过程 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider

  • golang高并发系统限流策略漏桶和令牌桶算法源码剖析

    目录 前言 漏桶算法 样例 源码实现 令牌桶算法 样例 源码剖析 Limit类型 Limiter结构体 Reservation结构体 Limiter消费token limiter归还Token 总结 前言 今天与大家聊一聊高并发系统中的限流技术,限流又称为流量控制,是指限制到达系统的并发请求数,当达到限制条件则可以拒绝请求,可以起到保护下游服务,防止服务过载等作用.常用的限流策略有漏桶算法.令牌桶算法.滑动窗口:下文主要与大家一起分析一下漏桶算法和令牌桶算法,滑动窗口就不在这里这介绍了.好啦,废

随机推荐