Netty分布式抽象编码器MessageToByteEncoder逻辑分析

目录
  • MessageToByteEncoder
    • 首先看MessageToByteEncoder的类声明
    • 跟到allocateBuffer方法中

前文回顾:Netty分布式编码器及写数据事件处理

MessageToByteEncoder

同解码器一样, 编码器中也有一个抽象类叫MessageToByteEncoder, 其中定义了编码器的骨架方法, 具体编码逻辑交给子类实现

解码器同样也是个handler, 将写出的数据进行截取处理, 我们在学习pipeline中我们知道, 写数据的时候会传递write事件, 传递过程中会调用handler的write方法, 所以编码器码器可以重写write方法, 将数据编码成二进制字节流然后再继续传递write事件

首先看MessageToByteEncoder的类声明

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter{
    //省略类体
}

这里继承ChannelOutboundHandlerAdapter, 说明是个outBoundhandler, 我们知道write事件是个outBound事件, 而outBound事件只能通过outBoundHandler进行传输

write事件传播过程中要调用handler的write方法

我们跟到MessageToByteEncoder的write方法中:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                encode(ctx, cast, buf);
            } finally {
                ReferenceCountUtil.release(cast);
            }

            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}

首先通过 if (acceptOutboundMessage(msg)) 判断当前对象是否可处理

如果可处理, 则进入if块中的逻辑, 如果不能处理, 则进入else块, 通过ctx.write(msg, promise)继续传递write事件

我们看if块中

I cast = (I) msg 这里是强制类型转换, 转换成I类型, I类型是个泛型, 具体类型由用户定义

buf = allocateBuffer(ctx, cast, preferDirect) 这里进行缓冲区分配

跟到allocateBuffer方法中

protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                           boolean preferDirect) throws Exception {
    if (preferDirect) {
        return ctx.alloc().ioBuffer();
    } else {
        return ctx.alloc().heapBuffer();
    }
}

这里会直接通过ctx的内存分配器进行内存分配, 通过判断preferDirect来分配堆内存或者堆外内存, 默认情况下是分配堆外内存

有关内存分配, 我们之前已经做过相关的剖析

回到write方法中:

内存分配结束之后会调用encode(ctx, cast, buf)方法进行编码, 该类由子类实现

子类可以通过继承该类, 重写encode方法, 将参数对象cast编码成字节写入到传入的ByteBuf中, 就完成了编码工作

编码完成后后, 会通过ReferenceCountUtil.release(cast)将cast对象释放

if (buf.isReadable()) 这里判断buf是否有可读字节, 如果有可读字节, 则继续传递write事件

如果没有可读字节, 则将buf进行释放, 继续传播write事件, 传递一个空的ByteBuf

最后将buf设置为空

以上就是有关抽象编码器的抽象逻辑, 具体的编码逻辑还需要其子类去做,更多关于Netty分布式抽象编码器MessageToByteEncoder的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

  • Netty分布式编码器及写数据事件处理使用场景

    目录 概述 编码器 第一节: writeAndFlush的事件传播 我们看一个最简单的使用的场景 我们跟到writeAndFlush方法中 我们跟到invokeWriteAndFlush中 我们再看invokeFlush0方法 概述 上一小章我们介绍了解码器, 这一章我们介绍编码器 其实编码器和解码器比较类似, 编码器也是一个handler, 并且属于outbounfHandle, 就是将准备发出去的数据进行拦截, 拦截之后进行相应的处理之后再次进发送处理, 如果理解了解码器, 那么编码器的相关

  • Netty分布式ByteBuf使用SocketChannel读取数据过程剖析

    目录 Server读取数据的流程 我们首先看NioEventLoop的processSelectedKey方法 这里会走到DefaultChannelConfig的getAllocator方法中 我们跟到static块中 回到NioByteUnsafe的read()方法中 我们跟进recvBufAllocHandle 继续看doReadBytes方法 跟到record方法中 章节总结 我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, Server读取数据的流程: 首先温馨提

  • Nett分布式分隔符解码器逻辑源码剖析

    目录 分隔符解码器 我们看其中的一个构造方法 我们跟到重载decode方法中 我们看初始化该属性的构造方法 章节总结 前文传送门:Netty分布式行解码器逻辑源码解析 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecoder, 是按照指定分隔符进行解码的解码器, 通过分隔符, 可以将二进制流拆分成完整的数据包 同样继承了ByteToMessageDecoder并重写了decode方法 我们看其中的一个构造方法 public DelimiterBasedFrameDeco

  • Netty分布式固定长度解码器实现原理剖析

    固定长度解码器 上一小节:解码器读取数据不完整的逻辑剖析 我们了解到, 解码器需要继承ByteToMessageDecoder, 并重写decode方法, 将解析出来的对象放入集合中集合, ByteToMessageDecoder中可以将解析出来的对象向下进行传播, 这一小节带大家剖析一个最简单的解码器FixedLengthFrameDecoder, 从它入手了解码器的相关原理 FixedLengthFrameDecoder是一个固定长度的解码器, 功能就是根据固定长度, 截取固定大小的字节数进

  • Netty分布式解码器读取数据不完整的逻辑剖析

    目录 概述 第一节: ByteToMessageDecoder 我们看他的定义 我们看其channelRead方法 我们看cumulator属性 我们回到channRead方法中 概述 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对

  • Netty分布式抽象编码器MessageToByteEncoder逻辑分析

    目录 MessageToByteEncoder 首先看MessageToByteEncoder的类声明 跟到allocateBuffer方法中 前文回顾:Netty分布式编码器及写数据事件处理 MessageToByteEncoder 同解码器一样, 编码器中也有一个抽象类叫MessageToByteEncoder, 其中定义了编码器的骨架方法, 具体编码逻辑交给子类实现 解码器同样也是个handler, 将写出的数据进行截取处理, 我们在学习pipeline中我们知道, 写数据的时候会传递wr

  • Netty分布式编码器写buffer队列逻辑剖析

    目录 写buffer队列 我们跟到AbstractUnsafe的write方法中 回到write方法中 我们跟到setUnwritable(invokeLater)方法中 前文传送门:抽象编码器MessageToByteEncoder 写buffer队列 之前的小节我们介绍过, writeAndFlush方法其实最终会调用write和flush方法 write方法最终会传递到head节点, 调用HeadContext的write方法: public void write(ChannelHandl

  • Netty分布式Future与Promise执行回调相关逻辑剖析

    目录 Future和Promise执行回调 首先我们看一段写在handler中的业务代码 这里关注newPromise()方法, 跟进去 我们继续跟write方法 跟进tryFailure方法 跟到addMessage方法中 最后跟到AbstractUnsafe的flush方法 我们跟到remove()方法中 再跟到trySuccess方法中 我们看用户代码 跟到addListener0方法中 回到addListener0方法中 跟到isDone方法中 跟到notifyListeners()方法

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Netty分布式ByteBuf使用的底层实现方式源码解析

    目录 概述 AbstractByteBuf属性和构造方法 首先看这个类的属性和构造方法 我们看几个最简单的方法 我们重点关注第二个校验方法ensureWritable(length) 我们跟到扩容的方法里面去 最后将写指针后移length个字节 概述 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到cha

  • Netty分布式ByteBuf使用page级别的内存分配解析

    目录 netty内存分配数据结构 我们看PoolArena中有关chunkList的成员变量 我们看PoolSubpage的属性 我们回到PoolArena的allocate方法 我们跟进allocateNormal 首先会从head节点往下遍历 这里直接通过构造函数创建了一个chunk 首先将参数传入的值进行赋值 我们再回到PoolArena的allocateNormal方法中 跟到allocate(normCapacity)中 我们跟到allocateNode方法中 我们跟进updatePa

  • Netty分布式server启动流程Nio创建源码分析

    目录 NioServerSocketChannel创建 继承关系 绑定端口 端口封装成socket地址对象 跟进initAndRegister()方法 创建channel 父类的构造方法 将jdk的channel设置为非阻塞模式 前文传送门 Netty分布式Server启动流程服务端初始化源码分析 NioServerSocketChannel创建 我们如果熟悉Nio, 则对channel的概念则不会陌生, channel在相当于一个通道, 用于数据的传输 Netty将jdk的channel进行了

  • Netty分布式NioEventLoop任务队列执行源码分析

    目录 执行任务队列 跟进runAllTasks方法: 我们跟进fetchFromScheduledTaskQueue()方法 回到runAllTasks(long timeoutNanos)方法中 章节小结 前文传送门:NioEventLoop处理IO事件 执行任务队列 继续回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectN

随机推荐