Netty启动流程注册多路复用源码解析

目录
  • 注册多路复用
    • 注册channel的步骤
      • 首先看下config()方法
      • 回到initAndRegister()方法:
      • 跟到MultithreadEventLoopGroup的register()方法:
      • 回顾下第二小节channel初始化的步骤:
    • 我们继续看看register()方法:
      • 我们重点关注register0(promise), 跟进去:
      • 我们重点关注doRegister()这个方法

前文传送门:Netty启动流程服务端channel初始化

注册多路复用

回到上一小节的代码:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //创建channel
        channel = channelFactory.newChannel();
        //初始化channel
        init(channel);
    } catch (Throwable t) {
        //忽略非关键代码
    }
    //注册channel
    ChannelFuture regFuture = config().group().register(channel);
    //忽略非关键代码
    return regFuture;
}

注册channel的步骤

我们讲完创建channel和初始化channel的关键步骤, 我们继续跟注册channel的步骤:

ChannelFuture regFuture = config().group().register(channel);

其中, 重点关注下register(channel)这个方法, 这个方法最终会调用到AbstractChannel中内部类AbstractUnsafe的register()方法, 具体如何调用到这个方法, 可以简单带大家捋一下

首先看下config()方法

由于是ServerBootstrap调用的, 所以我们跟进去:

public final ServerBootstrapConfig config() {
    return config;
}

返回的config是ServerBootrap的成员变量config:

private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

跟到ServerBootstrapConfig的构造方法:

ServerBootstrapConfig(ServerBootstrap bootstrap) {
    super(bootstrap);
}

继续跟到其父类AbstractBootstrapConfig的构造方法:

protected AbstractBootstrapConfig(B bootstrap) {
    this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");
}

我们发现我们创建的ServerBootstrap作为参数初始化了其成员变量bootstrap

回到initAndRegister()方法:

config()返回的是ServerBootstrapConfig对象

再继续跟到其group()方法:

public final EventLoopGroup group() {
    return bootstrap.group();
}

这里调用Bootstrap的group()方法:

public final EventLoopGroup group() {
    return group;
}

这里返回了AbstractBootstrap的成员变量group, 我们回顾下第一小节, 还记得AbstractBootstrap的group(EventLoopGroup group)方法吗?

public B group(EventLoopGroup group) {
    this.group = group;
    return (B) this;
}

group(EventLoopGroup group)方法初始化了我们boss线程, 而group()返回了boss线程, 也就是说 config().group().register(channel) 中的register()方法是boss线程对象调用的, 由于我们当初初始化的是NioEventLoopGroup, 因此走的是NioEventLoopGroup的父类的MultithreadEventLoopGroup的register()方法

跟到MultithreadEventLoopGroup的register()方法:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

这里的代码看起来有点晕, 没关系, 以后会讲到, 现在可以大概做个了解, NioEventLoopGroup是个线程组, 而next()方法就是从线程组中选出一个线程, 也就是NioEventLoop线程, 所以这里的next()方法返回的是NioEventLoop对象, 其中register(channel)最终会调用NioEventLoop的父类SingleThreadEventLoop的register(channel)方法

跟到SingleThreadEventLoop的register(channel)方法:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

其中DefaultChannelPromise类我们之后也会讲到

我们先跟到register(new DefaultChannelPromise(channel, this)):

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

channel()会返回我们初始化的NioServerSocketChannel, unsafe()会返回我们创建channel的时候初始化的unsafe对象

跟进去看AbstractChannel的unsafe()的实现:

public Unsafe unsafe() {
    return unsafe;
}

这里返回的unsafe, 就是我们初始化channel创建的unsafe

回顾下第二小节channel初始化的步骤:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

我们看unsafe的初始化:unsafe=newUnsafe()

跟到newUnsafe()中, 我们之前讲过NioServerSokectChannel的父类是AbstractNioMessageChannel, 所以会调用到到AbstractNioMessageChannel类中的newUnsafe()

跟到AbstractNioMessageChannel类中的newUnsafe():

protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}

我们看到这里创建了NioMessageUnsafe()对象, 所以在 promise.channel().unsafe().register(this, promise) 代码中, unsafe()是返回的NioMessageUnsafe()对象, 最后调用其父类AbstractUnsafe(也就是AbstractChannel的内部类)的register()方法,

简单介绍下unsafe接口, unsafe顾名思义就是不安全的, 因为很多对channel的io方法都定义在unsafe中, 所以netty将其作为内部类进行封装, 防止被外部直接调用, unsafe接口是Channel接口的内部接口, unsafe的子类也分别封装在Channel的子类中, 比如我们现在剖析的register()方法, 就是封装在AbstractChannel类的内部类AbstractUnsafe中的方法, 有关Unsafe和Channel的继承关系如下:

以上内容如果不明白没有关系, 有关NioEventLoop相关会在后面的章节讲到, 目前我们只是了解是如何走到AbstractUnsafe类的register()即可

我们继续看看register()方法:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //代码省略
    //所有的复制操作, 都交给eventLoop处理(1)
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    //做实际主注册(2)
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            //代码省略
        }
    }
}

我们跟着注释的步骤继续走, 第一步, 绑定eventLoop线程:

AbstractChannel.this.eventLoop = eventLoop;

eventLoop是AbstractChannel的成员变量, 有关eventLoop, 我们会在绪章节讲到, 这里我们只需要知道, 每个channel绑定唯一的eventLoop线程, eventLoop线程和channel的绑定关系就是在这里展现的

再看第二步, 做实际注册:

我们先看if判断, if(eventLoop.inEventLoop())

这里是判断是不是eventLoop线程, 显然我们现在是main()方法所在的线程, 所以走的else, eventLoop.execute()是开启一个eventLoop线程, 而register0(promise)就是再开启线程之后, 通过eventLoop线程执行的, 这里大家暂时作为了解

我们重点关注register0(promise), 跟进去:

private void register0(ChannelPromise promise) {
    try {
        //做实际的注册(1)
        doRegister();
        neverRegistered = false;
        registered = true;
        //触发事件(2)
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        //触发注册成功事件(3)
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                //传播active事件(4)
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        //省略代码
    }
}

我们重点关注doRegister()这个方法

doRegister()最终会调用AbstractNioChannel的doRegister()方法:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            //jdk底层的注册方法
            //第一个参数为selector, 第二个参数表示不关心任何事件
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            //省略代码
        }
    }
}

我们终于看到和java底层相关的方法了

跟到javaChannel()的方法中:

protected SelectableChannel javaChannel() {
    return ch;
}

这个ch, 就是本章第二小节创建NioServerSocketChannel中初始化的jdk底层ServerSocketChannel

这里register(eventLoop().selector, 0, this)方法中eventLoop().selector, 是获得每一个eventLoop绑定的唯一的selector, 0代表这次只是注册, 并不监听任何事件, this是代表将自身(NioEventLoopChannel)作为属性绑定在返回的selectionKey当中, 这个selectionKey就是与每个channel绑定的jdk底层的SelectionKey对象, 熟悉nio的小伙伴应该不会陌生, 这里不再赘述

回到register0(ChannelPromise promise)方法, 我们看后续步骤:

步骤(2)是触发handler的需要添加事件, 事件传递的内容我们将在后续课程详细介绍, 这里不必深究

步骤(3)是触发注册成功事件(3), 同上

步骤(4)是传播active事件(4), 这里简单强调一下, 这里的方法pipeline.fireChannelActive()第一个注册是执行不到的, 因为isActive()会返回false, 因为链路没完成

本小节梳理了有注册多路复用的相关逻辑, 同学们可以跟着代码自己走一遍以加深印象

以上就是Netty启动流程注册多路复用源码分析的详细内容,更多关于Netty启动流程的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty分布式Server启动流程服务端初始化源码分析

    目录 第一节:服务端初始化 group方法 初始化成员变量 初始化客户端Handler 第一节:服务端初始化 首先看下在我们用户代码中netty的使用最简单的一个demo: //创建boss和worker线程(1) EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //创建ServerBootstrap(2) ServerBootst

  • Netty分布式server启动流程Nio创建源码分析

    目录 NioServerSocketChannel创建 继承关系 绑定端口 端口封装成socket地址对象 跟进initAndRegister()方法 创建channel 父类的构造方法 将jdk的channel设置为非阻塞模式 前文传送门 Netty分布式Server启动流程服务端初始化源码分析 NioServerSocketChannel创建 我们如果熟悉Nio, 则对channel的概念则不会陌生, channel在相当于一个通道, 用于数据的传输 Netty将jdk的channel进行了

  • Netty启动流程服务端channel初始化源码分析

    目录 服务端channel初始化 回顾上一小节initAndRegister()方法 init(Channel)方法 前文传送门 Netty分布式server启动流程 服务端channel初始化 回顾上一小节initAndRegister()方法 final ChannelFuture initAndRegister() { Channel channel = null; try { //创建channel channel = channelFactory.newChannel(); //初始化

  • Netty组件NioEventLoopGroup创建线程执行器源码解析

    目录 通过上一章的学习, 我们了解了Server启动的大致流程, 有很多组件与模块并没有细讲, 从这个章开始, 我们开始详细剖析netty的各个组件, 并结合启动流程, 将这些组件的使用场景及流程进行一个详细的说明 这一章主要学习NioEventLoop相关的知识, 何为NioEventLoop? NioEventLoop是netty的一个线程, 在上一节我们创建两个NioEventLoopGroup: EventLoopGroup bossGroup = new NioEventLoopGro

  • Netty启动流程注册多路复用源码解析

    目录 注册多路复用 注册channel的步骤 首先看下config()方法 回到initAndRegister()方法: 跟到MultithreadEventLoopGroup的register()方法: 回顾下第二小节channel初始化的步骤: 我们继续看看register()方法: 我们重点关注register0(promise), 跟进去: 我们重点关注doRegister()这个方法 前文传送门:Netty启动流程服务端channel初始化 注册多路复用 回到上一小节的代码: fina

  • Android10 App 启动分析进程创建源码解析

    目录 正文 RootActivityContainer ActivityStartController 调用startActivityUnchecked方法 ActivityStackSupervisor 启动进程 RuntimeInit.applicationInit这个方法 正文 从前文# Android 10 启动分析之SystemServer篇 (四)中可以得知,系统在完成所有的初始化工作后,会通过 mAtmInternal.startHomeOnAllDisplays(currentU

  • Netty分布式NioEventLoop优化selector源码解析

    目录 优化selector selector的创建过程 代码剖析 这里一步创建了这个优化后的数据结构 最后返回优化后的selector 优化selector selector的创建过程 在剖析selector轮询之前, 我们先讲解一下selector的创建过程 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

  • Android okhttp的启动流程及源码解析

    前言 这篇文章主要讲解了okhttp的主要工作流程以及源码的解析. 什么是OKhttp 简单来说 OkHttp 就是一个客户端用来发送 HTTP 消息并对服务器的响应做出处理的应用层框架. 那么它有什么优点呢? 易使用.易扩展. 支持 HTTP/2 协议,允许对同一主机的所有请求共用同一个 socket 连接. 如果 HTTP/2 不可用, 使用连接池复用减少请求延迟. 支持 GZIP,减小了下载大小. 支持缓存处理,可以避免重复请求. 如果你的服务有多个 IP 地址,当第一次连接失败,OkHt

  • Netty实战源码解析NIO编程

    目录 1 前言 2 Netty是什么? 3 Java I/O模型简介 3.1 BIO代码实现 4 Java NIO 4.1 基本介绍 4.2 三大核心组件的关系 4.3 Buffer缓冲区 4.4 Channel通道 4.5 Selector选择器 4.5.1 Selector的创建 4.5.2 注册Channel到Selector 4.5.3 SelectionKey 4.5.4 从Selector中选择Channel 4.5.5 停止选择的方法 4.5.6 NIO客户端.服务端 5 Java

随机推荐