Netty分布式pipeline传播inbound事件源码分析

前一小结回顾:pipeline管道Handler删除

传播inbound事件

有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程

在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到channelRead方法中了呢, 也是我们这一小节要剖析的内容

在业务代码中, 传递channelRead事件方式是通过fireChannelRead方法进行传播的

这里给大家看两种写法

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}

这里重写了channelRead方法, 并且方法体内继续通过fireChannelRead方法进行传播channelRead事件, 那么这两种写法有什么异同?

我们先以写法2为例, 将这种写法进行剖析

这里首先获取当前context的pipeline对象, 然后通过pipeline对象调用自身的fireChannelRead方法进行传播, 因为默认创建的DefaultChannelpipeline

我们跟到DefaultChannelpipeline的fireChannelRead方法中:

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

这里首先调用的是AbstractChannelHandlerContext类的静态方法invokeChannelRead, 参数传入head节点和事件的消息

我们跟进invokeChannelRead方法:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

这里的Object m m通常就是我们传入的msg, 而next, 目前是head节点, 然后再判断是否为当前eventLoop线程, 如果不是则将方法包装成task交给eventLoop线程处理

我们跟到invokeChannelRead方法中

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

首先通过invokeHandler()判断当前handler是否已添加, 如果添加, 则执行当前handler的chanelRead方法, 其实这里我们基本上就明白了, 通过fireChannelRead方法传递事件的过程中, 其实就是找到相关handler执行其channelRead方法, 由于我们在这里的handler就是head节点, 所以我们跟到HeadContext的channelRead方法中:

HeadContext的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //向下传递channelRead事件
    ctx.fireChannelRead(msg);
}

在这里我们看到, 这里通过fireChannelRead方法继续往下传递channelRead事件, 而这种调用方式, 就是我们刚才分析用户代码的第一种调用方式:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}

这里直接通过context对象调用fireChannelRead方法, 那么和使用pipeline调用有什么区别的, 我会回到HeadConetx的channelRead方法, 我们来剖析ctx.fireChannelRead(msg)这句, 大家就会对这个问题有答案了, 跟到ctx的fireChannelRead方法中, 这里会走到AbstractChannelHandlerContext类中的fireChannelRead方法中

跟到AbstractChannelHandlerContext类中的fireChannelRead方法:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

这里我们看到, invokeChannelRead方法中传入了一个findContextInbound()参数, 而这findContextInbound方法其实就是找到当前Context的下一个节点

跟到findContextInbound方法

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

这里的逻辑也比较简单, 是通过一个doWhile循环, 找到当前handlerContext的下一个节点, 这里要注意循环的终止条件, while (!ctx.inbound)表示下一个context标志的事件不是inbound的事件, 则循环继续往下找, 言外之意就是要找到下一个标注inbound事件的节点

有关事件的标注, 之前的小节已经剖析过了, 如果是用户定义的handler, 是通过handler继承的接口而定的, 如果tail或者head, 那么是在初始化的时候就已经定义好, 这里不再赘述

回到AbstractChannelHandlerContext类的fireChannelRead方法中:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

找到下一个节点后, 继续调用invokeChannelRead方法, 传入下一个和消息对象:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    //第一次执行next其实就是head
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

这里的逻辑我们又不陌生了, 因为我们传入的是当前context的下一个节点, 所以这里会调用下一个节点invokeChannelRead方法, 因我们刚才剖析的是head节点, 所以下一个节点有可能是用户添加的handler的包装类HandlerConext的对象

这里我们跟进invokeChannelRead方法中去:

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            //发生异常的时候在这里捕获异常
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

又是我们熟悉的逻辑, 调用了自身handler的channelRead方法, 如果是用户自定义的handler, 则会走到用户定义的channelRead()方法中去, 所以这里就解释了为什么通过传递channelRead事件, 最终会走到用户重写的channelRead方法中去

同样, 也解释了该小节最初提到过的两种写法的区别:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //写法1:
    ctx.fireChannelRead(msg);
    //写法2
    ctx.pipeline().fireChannelRead(msg);
}

写法1是通过当前节点往下传播事件

写法2是通过头节点往下传递事件

所以, 在handler中如果如果要在channelRead方法中传递channelRead事件, 一定要采用写法2的方式向下传递, 或者交给其父类处理, 如果采用1的写法则每次事件传输到这里都会继续从head节点传输, 从而陷入死循环或者发生异常

这里有一点需要注意, 如果用户代码中channelRead方法, 如果没有显示的调用ctx.fireChannelRead(msg)那么事件则不会再往下传播, 则事件会在这里终止, 所以如果我们写业务代码的时候要考虑有关资源释放的相关操作

如果ctx.fireChannelRead(msg)则事件会继续往下传播, 如果每一个handler都向下传播事件, 当然, 根据我们之前的分析channelRead事件只会在标识为inbound事件的HandlerConetext中传播, 传播到最后, 则最终会调用到tail节点的channelRead方法

我们跟到tailConext的channelRead方法中

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}

我们跟进到onUnhandledInboundMessage方法中:

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        //释放资源
        ReferenceCountUtil.release(msg);
    }
}

这里做了释放资源的相关的操作

至此, channelRead事件传输相关罗辑剖析完整, 其实对于inbound事件的传输流程都会遵循这一逻辑, 小伙伴们可以自行剖析其他inbound事件的传输流程

以上就是Netty分布式pipeline传播inbound事件源码分析的详细内容,更多关于Netty分布式pipeline传播inbound事件的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式pipeline管道创建方法跟踪解析

    目录 概述 pipeline的创建 上一章节回顾:Netty分布式源码分析监听读事件 概述 pipeline, 顾名思义, 就是管道的意思, 在netty中, 事件在pipeline中传输, 用户可以中断事件, 添加自己的事件处理逻辑, 可以直接将事件中断不再往下传输, 同样可以改变管道的流向, 传递其他事件.这里有点类似于Spring的AOP, 但是比AOP实现起来简单的多 事件通常分为两种, 一是inBound事件, 另一种是outBound事件, inBound事件, 顾名思义, 就是从另

  • Netty分布式pipeline管道Handler的删除逻辑操作

    目录 删除handler操作 我们跟到getContextPrDie这个方法中 首先要断言删除的节点不能是tail和head 回到remove(ctx)方法 上一小节我们学习了添加handler的逻辑操作, 这一小节我们学习删除handler的相关逻辑 删除handler操作 如果用户在业务逻辑中进行ctx.pipeline().remove(this)这样的写法, 或者ch.pipeline().remove(new SimpleHandler())这样的写法, 则就是对handler进行删除

  • Netty分布式NioSocketChannel注册到selector方法解析

    目录 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPipeline pipeline = pipeline(

  • Netty分布式pipeline管道Handler的添加代码跟踪解析

    目录 添加handler 我们跟到其addLast()方法中 再继续跟到addLast()方法中去 我们跟到checkMultiplicity(handler)中 跟到filterName方法中 跟到isInbound(handler)方法中 我们回到最初的addLast()方法中 我们跟进addLast0(newCtx)中 前文传送门:Netty分布式pipeline管道创建 添加handler 我们以用户代码为例进行剖析: .childHandler(new ChannelInitializ

  • Netty客户端接入流程NioSocketChannel创建解析

    目录 NioSocketChannel的创建 回到上一小节的read()方法 我们首先看readBuf jdk底层相关的内容 跟到父类构造方法中 我们跟进其构造方法 前文传送门:Netty客户端处理接入事件handle创建 NioSocketChannel的创建 回到上一小节的read()方法 public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel

  • Netty分布式pipeline传播inbound事件源码分析

    前一小结回顾:pipeline管道Handler删除 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chann

  • Netty分布式NioEventLoop任务队列执行源码分析

    目录 执行任务队列 跟进runAllTasks方法: 我们跟进fetchFromScheduledTaskQueue()方法 回到runAllTasks(long timeoutNanos)方法中 章节小结 前文传送门:NioEventLoop处理IO事件 执行任务队列 继续回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectN

  • Netty分布式pipeline管道传播outBound事件源码解析

    目录 outbound事件传输流程 这里我们同样给出两种写法 跟到其write方法中: 跟到findContextOutbound中 回到write方法: 继续跟invokeWrite0 我们跟到HeadContext的write方法中 了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难 outbound事件传输流程 在我们业务代码中, 有可能使用wirte方法往写数据: public void channelActive(ChannelHandlerC

  • Netty分布式pipeline管道异常传播事件源码解析

    目录 传播异常事件 简单的异常处理的场景 我们跟到invokeChannelRead这个方法 我还是通过两种写法来进行剖析 跟进invokeExceptionCaught方法 跟到invokeExceptionCaught方法中 讲完了inbound事件和outbound事件的传输流程, 这一小节剖析异常事件的传输流程 传播异常事件 简单的异常处理的场景 @Override public void channelRead(ChannelHandlerContext ctx, Object msg

  • Netty分布式pipeline管道传播事件的逻辑总结分析

    目录 问题分析 首先完成了handler的添加, 但是并没有马上执行回调 回到callHandlerCallbackLater方法中 章节总结 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 问题分析 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAccep

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

  • Netty分布式NioEventLoop优化selector源码解析

    目录 优化selector selector的创建过程 代码剖析 这里一步创建了这个优化后的数据结构 最后返回优化后的selector 优化selector selector的创建过程 在剖析selector轮询之前, 我们先讲解一下selector的创建过程 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider

随机推荐