Netty分布式pipeline管道创建方法跟踪解析

目录
  • 概述
  • pipeline的创建

上一章节回顾:Netty分布式源码分析监听读事件

概述

pipeline, 顾名思义, 就是管道的意思, 在netty中, 事件在pipeline中传输, 用户可以中断事件, 添加自己的事件处理逻辑, 可以直接将事件中断不再往下传输, 同样可以改变管道的流向, 传递其他事件.这里有点类似于Spring的AOP, 但是比AOP实现起来简单的多

事件通常分为两种, 一是inBound事件, 另一种是outBound事件, inBound事件, 顾名思义, 就是从另一端流向自己的事件, 比如读事件, 连接完成事件等等, outBound, 是从自己流向另一端的事件, 比如连接事件, 写事件, 刷新缓冲区事件等等

在netty中, 事件是通过handler对象进行处理的, 里面封装着事件的处理逻辑.而每个handler, 是由HandlerContext进行包装的, 里面封装了对事件传输的操作

通过之前的学习, 我们知道每一个channel绑定一个pipeline, 那么pipeline和handler又是什么关系呢?

其实pipeline我们可以理解成是一个双向链表的数据结构, 只是其中存放的并不是数据而是HandlerContext, 而HandlerContext又包装了handler, 事件传输过程中, 从头结点(或者尾节点)开始, 找到下一个HandlerContext, 执行其Handler的业务逻辑, 然后再继续往下走, 直到执行到尾节点(或者头结点, 反向)为止, 通过一幅图了解其大概逻辑:

这里head代表pipeline的头结点, tail代表pipeline的尾节点, 这两个节点是会随着pipeline的初始化而创建, 并且不会被删除

HandlerContext的简单继承关系比较简单, 默认的是DefaultChannelHandlerContext, 继承于AbstractChannelHandlerContext

而Handler分为InboundHandler和outBoundHandler, Inbound专门处理inbound事件, outBound专门用于处理outBound事件

继承关系如下图:

从图中不难看出, 如果属于ChannelInboundHandler的子类, 则属于Inbound类型的handler

如果是ChannelOutboundHandler的子类, 则属于Outbound类型的handler

了解了其大概逻辑, 我们继续跟到源码中, 看其实如何体现的:

pipeline的创建

回顾之前NioServerSocketChannel的创建过程

我们看AbstractChannel的构造方法:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

我们跟到newChannelPipeline()中:

protected DefaultChannelPipeline newChannelPipeline() {
    //传入当前channel
    return new DefaultChannelPipeline(this);
}

我们看到这里创建了一个DefaultChannelPipeline, 并将自身channel传入

继续跟DefaultChannelPipeline的构造方法:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

首先保存了当前channel

然后保存了两个属性succeededFuture, voidPromise, 这两个属性是Future相关的内容, 之后的章节会讲到

首先, 这里初始化了两个节点, head节点和tail节点, 在pipeline中代表头结点和尾节点

我们首先跟到这tail节点类中,也就是TailContext类:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        //inbound处理器
        super(pipeline, null, TAIL_NAME, true, false);
        //将当前节点设置为已添加, head和tail.....
        setAddComplete();
    }
    //自身也是handler
    @Override
    public ChannelHandler handler() {
        return this;
    }
    //方法省略

这个是DefualtPipline的内部类, 首先看其继承了AbstractChannelHandlerContext类, 说明自身是个HandlerContext, 同时也实现ChannelInboundHander接口, 并且其中的handler()方法返回了自身, 说明自身也是handler, 而实现ChannelInboundHander, 说明自身只处理Inbound事件

构造方法中, 调用了父类的构造器, 看其中参数:

pipeline是自身所属的pipeline

executor为null

TAIL_NAME是当前handler, 也就是自身的命名

true代表自身是inboundHandler

fasle代表自身不是outboundHandler

继续跟到父类构造方法中:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    //名字
    this.name = ObjectUtil.checkNotNull(name, "name");
    //pipeline
    this.pipeline = pipeline;
    //线程处理器
    this.executor = executor;
    //事件标识
    this.inbound = inbound;
    this.outbound = outbound;
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

这里初始化了自身父类的几个属性

pipeline为自身绑定的pipeline

exeutor是线程执行器, 这里为空

inbound和outbound是事件标志, 这里分别是true和false, 也就是自身属于inboundHnadler而不属于outboundHandler

回到DefaultChannelPipeline的构造方法:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

再看HeadContext类:

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;
    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
    @Override
    public ChannelHandler handler() {
        return this;
    }
}

看过了tail节点, head节点就不难理解, 同样继承了AbstractChannelHandlerContext, 说明自身是一个HandlerContext, 与tail不同的是, 这里实现了ChannelOutboundHandler接口和ChannelOutboundHandler接口, 说明其既能处理inbound事件也能处理outbound的事件, handler方法返归自身, 说明自身是一个handler

在构造方法中初始化了一个Unsafe类型的成员变量, 是通过自身绑定的channel拿到的, 说明这个类中可以进行对channel的读写操作

这里同样调用了父类的构造方法, 不同的是, 这里inbound参数传入了false, 而outbound参数传入了true, 这里说明这里标志的事件是outbound事件

同学们可能疑惑, 为什么同时执行ChannelOutboundHandler接口和ChannelOutboundHandler但是标志的事件不同?

其实这两个地方应用的场景是不同的, 继承ChannelOutboundHandler和ChannelOutboundHandler, 说明其既能处理inbound事件也能处理outBound的事件, 但是只有outbound属性为true说明自身是一个outboundhandler, 是一个可以处理inbound事件的outboundhandler(估计被绕晕了), 这两种handler主要是保证在事件传输中保证事件的单方向流动, 在后面事件传输我们能领会到

再跟进父类的构造方法, 又是我们熟悉的部分:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    //名字
    this.name = ObjectUtil.checkNotNull(name, "name");
    //pipeline
    this.pipeline = pipeline;
    //线程处理器
    this.executor = executor;
    //事件标识
    this.inbound = inbound;
    this.outbound = outbound;
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

初始化了pipeline, executor, 和事件标识的属性

回到DefaultChannelPipeline的构造方法:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

我们介绍完了head, 和tail这两个context, 继续往下看:

head.next = tail;

tail.prev = head;

tail节点和head节点中的next和prev属性, 其实是其父类AbstractChannelHandlerContext, 每一个handlerContext都拥有这两个属性, 代表自身的下一个节点和上一个节点, 因为我们概述中介绍过pipeline其实是一个双向链表, 所以其中每一个节点必须有指向其他节点的指针, 熟悉双向链接数据结构的同学应该不会陌生

这里head节点的next属性是tail节点, tail节点的prev属性是head, 说明当前双向链表只有两个节点, head和tail, 其中head下一个节点指向tail, tail的上一个节点指向head, 如图所示:

以上就是pipeline的初始化过程,更多关于Netty分布式pipeline管道创建的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty源码解析NioEventLoop创建的构造方法

    目录 前文传送门:Netty源码分析 NioEventLoop 回到上一小节的MultithreadEventExecutorGroup类的构造方法: protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //代码省略 if (executor == null) { //创建一个新的

  • Netty源码分析NioEventLoop线程的启动

    目录 之前的小节我们学习了NioEventLoop的创建以及线程分配器的初始化, 那么NioEventLoop是如何开启的呢, 我们这一小节继续学习 NioEventLoop的开启方法在其父类SingleThreadEventExecutor中的execute(Runnable task)方法中, 我们跟到这个方法: @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerEx

  • Netty分布式NioEventLoop优化selector源码解析

    目录 优化selector selector的创建过程 代码剖析 这里一步创建了这个优化后的数据结构 最后返回优化后的selector 优化selector selector的创建过程 在剖析selector轮询之前, 我们先讲解一下selector的创建过程 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider

  • Netty源码分析NioEventLoop初始化线程选择器创建

    前文传送门:NioEventLoop创建 初始化线程选择器 回到上一小节的MultithreadEventExecutorGroup类的构造方法: protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //代码省略 if (executor == null) { //创建一个新的线程

  • Netty分布式pipeline管道创建方法跟踪解析

    目录 概述 pipeline的创建 上一章节回顾:Netty分布式源码分析监听读事件 概述 pipeline, 顾名思义, 就是管道的意思, 在netty中, 事件在pipeline中传输, 用户可以中断事件, 添加自己的事件处理逻辑, 可以直接将事件中断不再往下传输, 同样可以改变管道的流向, 传递其他事件.这里有点类似于Spring的AOP, 但是比AOP实现起来简单的多 事件通常分为两种, 一是inBound事件, 另一种是outBound事件, inBound事件, 顾名思义, 就是从另

  • Netty分布式pipeline管道Handler的添加代码跟踪解析

    目录 添加handler 我们跟到其addLast()方法中 再继续跟到addLast()方法中去 我们跟到checkMultiplicity(handler)中 跟到filterName方法中 跟到isInbound(handler)方法中 我们回到最初的addLast()方法中 我们跟进addLast0(newCtx)中 前文传送门:Netty分布式pipeline管道创建 添加handler 我们以用户代码为例进行剖析: .childHandler(new ChannelInitializ

  • Netty分布式pipeline管道传播outBound事件源码解析

    目录 outbound事件传输流程 这里我们同样给出两种写法 跟到其write方法中: 跟到findContextOutbound中 回到write方法: 继续跟invokeWrite0 我们跟到HeadContext的write方法中 了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难 outbound事件传输流程 在我们业务代码中, 有可能使用wirte方法往写数据: public void channelActive(ChannelHandlerC

  • Netty分布式pipeline管道异常传播事件源码解析

    目录 传播异常事件 简单的异常处理的场景 我们跟到invokeChannelRead这个方法 我还是通过两种写法来进行剖析 跟进invokeExceptionCaught方法 跟到invokeExceptionCaught方法中 讲完了inbound事件和outbound事件的传输流程, 这一小节剖析异常事件的传输流程 传播异常事件 简单的异常处理的场景 @Override public void channelRead(ChannelHandlerContext ctx, Object msg

  • Netty分布式pipeline管道传播事件的逻辑总结分析

    目录 问题分析 首先完成了handler的添加, 但是并没有马上执行回调 回到callHandlerCallbackLater方法中 章节总结 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 问题分析 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAccep

  • Netty分布式pipeline管道Handler的删除逻辑操作

    目录 删除handler操作 我们跟到getContextPrDie这个方法中 首先要断言删除的节点不能是tail和head 回到remove(ctx)方法 上一小节我们学习了添加handler的逻辑操作, 这一小节我们学习删除handler的相关逻辑 删除handler操作 如果用户在业务逻辑中进行ctx.pipeline().remove(this)这样的写法, 或者ch.pipeline().remove(new SimpleHandler())这样的写法, 则就是对handler进行删除

  • Netty分布式pipeline传播inbound事件源码分析

    前一小结回顾:pipeline管道Handler删除 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chann

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

随机推荐