分布式Netty源码分析EventLoopGroup及介绍

目录
  • EventLoopGroup介绍
    • 功能1:先来看看注册Channel
    • 功能2:执行一些Runnable任务
  • EventLoop介绍
    • NioEventLoop介绍
  • EpollEventLoop介绍
  • 后续

EventLoopGroup介绍

在前面一篇文章中提到了,EventLoopGroup主要负责2个事情,这里再重复下:

它主要包含2个方面的功能,注册Channel和执行一些Runnable任务。

功能1:先来看看注册Channel

即将Channel注册到Selector上,由Selector来调度Channel的相关事件,如读、写、Accept等事件。

而EventLoopGroup的设计是,它包含多个EventLoop(每一个EventLoop通常内部包含一个线程),在执行上述注册过程中是需要选择其中的一个EventLoop来执行上述注册行为,这里就出现了一个选择策略的问题,该选择策略接口是EventExecutorChooser,你也可以自定义一个实现。

从上面可以看到,EventLoopGroup做的工作大部分是一些总体性的工作如初始化上述多个EventLoop、EventExecutorChooser等,具体的注册Channel还是交给它内部的EventLoop来实现。

功能2:执行一些Runnable任务

EventLoopGroup继承了EventExecutorGroup,EventExecutorGroup也是EventExecutor的集合,EventExecutorGroup也是掌管着EventExecutor的初始化工作,EventExecutorGroup对于Runnable任务的执行也是选择内部中的一个EventExecutor来做具体的执行工作。

netty中很多任务都是异步执行的,一旦当前线程要对某个EventLoop执行相关操作,如注册Channel到某个EventLoop,如果当前线程和所要操作的EventLoop内部的线程不是同一个,则当前线程就仅仅向EventLoop提交一个注册任务,对外返回一个ChannelFuture。

总结:EventLoopGroup含有上述2种功能,它更多的是一个集合,但是具体的功能实现还是选择内部的一个item元素来执行相关任务。 这里的内部item元素通常即实现了EventLoop,又实现了EventExecutor,如NioEventLoop等

继续来看看EventLoopGroup的整体类图

从图中可以看到有2路分支:

  • 1 MultithreadEventLoopGroup:用于封装多线程的初始化逻辑,指定线程数等,即初始化对应数量的EventLoop,每个EventLoop分配到一个线程

上图中的newChild方法,NioEventLoopGroup就采用NioEventLoop作为实现,EpollEventLoopGroup就采用EpollEventLoop作为实现

如NioEventLoopGroup的实现:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
  • 2 EventLoop接口实现了EventLoopGroup接口,主要因为EventLoopGroup中的功能接口还是要靠内部的EventLoop来完成具体的操作

EventLoop介绍

EventLoop主要工作就是注册Channel,并负责监控管理Channel的读写等事件,这就涉及到不同的监控方式,linux下有3种方式来进行事件监听

select、poll、epoll

目前java的Selector接口的实现如下:

PollSelectorImpl:实现了poll方式

EPollSelectorImpl:实现了epoll方式

而Netty呢则使用如下:

NioEventLoop:采用的是jdk Selector接口(使用PollSelectorImpl的poll方式)来实现对Channel的事件检测

EpollEventLoop:没有采用jdk Selector的接口实现EPollSelectorImpl,而是Netty自己实现的epoll方式来实现对Channel的事件检测,所以在EpollEventLoop中就不存在jdk的Selector。

NioEventLoop介绍

对于NioEventLoopGroup的功能,NioEventLoop都要做实际的实现,NioEventLoop既要实现注册功能,又要实现运行Runnable任务

对于注册Channel:NioEventLoop将Channel注册到NioEventLoop内部的PollSelectorImpl上,来监听该Channel的读写事件

对于运行Runnable任务:NioEventLoop的父类的父类SingleThreadEventExecutor实现了运行Runnable任务,在SingleThreadEventExecutor中,有一个任务队列还有一个分配的线程

private final Queue<Runnable> taskQueue;
private volatile Thread thread;

NioEventLoop在该线程中不仅要执行Selector带来的IO事件,还要不断的从上述taskQueue中取出任务来执行这些非IO事件。下面我们来详细看下这个过程

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            ...
        }
    }
}

来详细说下这个过程:

  • 1 计算当前是否需要执行select过程

如果当前没有Runnable任务,则执行select(这个select过程稍后详细来说)。

如果当前有Runnable任务,则要去执行处理流程,此时顺便执行下selector.selectNow(),万一有事件发生那就赚了,没有白走这次处理流程

  • 2 根据IO任务的时间占比设置来执行IO任务和非IO任务,即上面提到的Runnable任务

如果ioRatio=100则每次都是执行全部的IO任务,执行全部的非IO任务 默认ioRatio=50,即一半时间用于处理IO任务,另一半时间用于处理非IO任务。怎么去控制非IO任务所占用时间呢?

这里是每执行64个非IO任务(这里可能是每个非IO任务比较短暂,减少一些判断带来的消耗)就判断下占用时间是否超过了上述时间限制

接下来详细看下上述select过程

Selector selector = this.selector;
try {
    int selectCnt = 0;
    long currentTimeNanos = System.nanoTime();
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    for (;;) {
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
        if (timeoutMillis <= 0) {
            if (selectCnt == 0) {
                selector.selectNow();
                selectCnt = 1;
            }
            break;
        }
        // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
        // Selector#wakeup. So we need to check task queue again before executing select operation.
        // If we don't, the task might be pended until select operation was timed out.
        // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
        if (hasTasks() && wakenUp.compareAndSet(false, true)) {
            selector.selectNow();
            selectCnt = 1;
            break;
        }
        int selectedKeys = selector.select(timeoutMillis);
        selectCnt ++;
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
            // - Selected something,
            // - waken up by user, or
            // - the task queue has a pending task.
            // - a scheduled task is ready for processing
            break;
        }
        if (Thread.interrupted()) {
            // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
            // As this is most likely a bug in the handler of the user or it's client library we will
            // also log it.
            //
            // See https://github.com/netty/netty/issues/2426
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely because " +
                        "Thread.currentThread().interrupt() was called. Use " +
                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
            }
            selectCnt = 1;
            break;
        }
        long time = System.nanoTime();
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            // timeoutMillis elapsed without anything selected.
            selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn(
                    "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            rebuildSelector();
            selector = this.selector;
            // Select again to populate selectedKeys.
            selector.selectNow();
            selectCnt = 1;
            break;
        }
        currentTimeNanos = time;
    }
} catch (CancelledKeyException e) {
	...
}
  • 1 首先计算此次select过程的截止时间
    protected long delayNanos(long currentTimeNanos) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            return SCHEDULE_PURGE_INTERVAL;
        }
        return scheduledTask.delayNanos(currentTimeNanos);
    }

这里其实就是从一个定时 任务队列中取出定时任务,如果有则计算出离当前定时任务的下一次执行时间之差,如果没有则按照固定的1s作为select过程的时间

  • 2 将当前时间差转化成ms

如果当前时间差不足0.5ms的话,即timeoutMillis<=0,并且是第一次执行,则认为时间太短执行执行一次selectNow

  • 3 如果有任务,则立即执行一次selectNow,跳出for循环
  • 4 然后就是普通的selector.select(timeoutMillis)

在这段时间内如果有事件则跳出for循环,如果没有事件则已经花费对应的时间差了,再次执行for循环,计算的timeoutMillis就会小于0,也会跳出for循环

在上述逻辑中,基本selectCnt都是1,不会出现很多次,而这里针对selectCnt有很多次的处理是基于一个情况:

 selector.select(timeoutMillis)

Selector的正常逻辑是一旦有事件就返回,没有事件则最多等待timeoutMillis时间。 然而底层操作系统实现可能有bug,会出现:即使没有产生事件就直接返回了,并没有按照要求等待timeoutMillis时间。

现在的解决办法就是: 记录上述出现的次数,一旦超过512这个阈值(可设置),就重新建立新的Selector,并将之前的Channel也全部迁移到新的Selector上

至此,NioEventLoop的主逻辑流程就介绍完了,之后就该重点介绍其中对于IO事件的处理了。然后就会引出来ChannelPipeline的处理流程

EpollEventLoop介绍

EpollEventLoop和NioEventLoop的主流程逻辑基本上是差不多的,不同之处就在于EpollEventLoop用epoll方式替换NioEventLoop中的PollSelectorImpl的poll方式。

这里不再详细说明了,之后会详细的说明Netty的epoll方式和jdk中的epoll方式的区别。

后续

下一篇就要详细描述下NioEventLoop对于IO事件的处理,即ChannelPipeline的处理流程。

以上就是分布式Netty源码分析EventLoopGroup及介绍的详细内容,更多关于分布式Netty EventLoopGroup源码分析的资料请关注我们其它相关文章!

(0)

相关推荐

  • SpringBoot整合Netty心跳机制过程详解

    前言 Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty. 最终能达到的效果: 客户端每隔 N 秒检测是否需要发送心跳. 服务端也每隔 N 秒检测是否需要发送心跳. 服务端可以主动 push 消息到客户端. 基于 SpringBoot 监控,可以查看实时连接以及各种应用信息. IdleStateHandler Netty 可以使用 IdleStateHandler 来实现连接管理,当连接空闲时间太长(没有发送.接收消息)时则会触发一个

  • 使用Netty搭建服务端和客户端过程详解

    前言 前面我们介绍了网络一些基本的概念,虽然说这些很难吧,但是至少要做到理解吧.有了之前的基础,我们来正式揭开Netty这神秘的面纱就会简单很多. 服务端 public class PrintServer { public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); //1 EventLoopGroup workerGroup = new NioEventLo

  • Java Netty实现心跳机制过程解析

    netty心跳机制示例,使用Netty实现心跳机制,使用netty4,IdleStateHandler 实现.Netty心跳机制,netty心跳检测,netty,心跳 本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用.我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的.Netty中自带了一个IdleStateHandler 可以用来实现心跳检测. 心跳检测的逻辑 本文中我们

  • 使用Netty进行编解码的操作过程详解

    前言 何为编解码,通俗的来说,我们需要将一串文本信息从A发送到B并且将这段文本进行加工处理,如:A将信息文本信息编码为2进制信息进行传输.B接受到的消息是一串2进制信息,需要将其解码为文本信息才能正常进行处理. 上章我们介绍的Netty如何解决拆包和粘包问题,就是运用了解码的这一功能. java默认的序列化机制 使用Netty大多是java程序猿,我们基于一切都是对象的原则,经常会将对象进行网络传输,那么对于序列化操作肯定大家都是非常熟悉的. 一个对象是不能直接进行网络I/O传输的,jdk默认是

  • Java Netty HTTP服务实现过程解析

    超文本传输协议(HTTP,HyperText Transfer Protocol)是互联网上应用最为广泛的一种网络协议. 在后端开发中接触HTTP协议的比较多,目前大部分都是基于Servlet容器实现的Http服务,往往有一些核心子系统对性能的要求非常高,这个时候我们可以考虑采用NIO的网络模型来实现HTTP服务,以此提高性能和吞吐量,Netty除了开发网络应用非常方便,还内置了HTTP相关的编解码器,让用户可以很方便的开发出高性能的HTTP协议的服务,Spring Webflux默认是使用的N

  • 分布式Netty源码分析EventLoopGroup及介绍

    目录 EventLoopGroup介绍 功能1:先来看看注册Channel 功能2:执行一些Runnable任务 EventLoop介绍 NioEventLoop介绍 EpollEventLoop介绍 后续 EventLoopGroup介绍 在前面一篇文章中提到了,EventLoopGroup主要负责2个事情,这里再重复下: 它主要包含2个方面的功能,注册Channel和执行一些Runnable任务. 功能1:先来看看注册Channel 即将Channel注册到Selector上,由Select

  • 分布式Netty源码分析概览

    目录 服务器端demo EventLoopGroup介绍 功能1:先来看看注册Channel 功能2:执行一些Runnable任务 ChannelPipeline介绍 bind过程 sync介绍 误区 4 后续 服务器端demo 看下一个简单的Netty服务器端的例子 public static void main(String[] args){ EventLoopGroup bossGroup=new NioEventLoopGroup(1); EventLoopGroup workerGro

  • Netty源码分析NioEventLoop执行select操作入口

    分析完了selector的创建和优化的过程, 这一小节分析select相关操作 select操作的入口,NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SEL

  • Netty源码分析NioEventLoop线程的启动

    目录 之前的小节我们学习了NioEventLoop的创建以及线程分配器的初始化, 那么NioEventLoop是如何开启的呢, 我们这一小节继续学习 NioEventLoop的开启方法在其父类SingleThreadEventExecutor中的execute(Runnable task)方法中, 我们跟到这个方法: @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerEx

  • Netty源码分析NioEventLoop初始化线程选择器创建

    前文传送门:NioEventLoop创建 初始化线程选择器 回到上一小节的MultithreadEventExecutorGroup类的构造方法: protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //代码省略 if (executor == null) { //创建一个新的线程

  • Netty源码分析NioEventLoop处理IO事件相关逻辑

    目录 NioEventLoop的run()方法: processSelectedKeys()方法 processSelectedKeysOptimized(selectedKeys.flip())方法 processSelectedKey(k, (AbstractNioChannel) a)方法 之前我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习轮询到io事件的相关逻辑: NioEventLoop的run()方法: protected void run() { for (;

  • Netty分布式Server启动流程服务端初始化源码分析

    目录 第一节:服务端初始化 group方法 初始化成员变量 初始化客户端Handler 第一节:服务端初始化 首先看下在我们用户代码中netty的使用最简单的一个demo: //创建boss和worker线程(1) EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //创建ServerBootstrap(2) ServerBootst

  • Netty源码解析NioEventLoop创建的构造方法

    目录 前文传送门:Netty源码分析 NioEventLoop 回到上一小节的MultithreadEventExecutorGroup类的构造方法: protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //代码省略 if (executor == null) { //创建一个新的

  • Netty分布式server启动流程Nio创建源码分析

    目录 NioServerSocketChannel创建 继承关系 绑定端口 端口封装成socket地址对象 跟进initAndRegister()方法 创建channel 父类的构造方法 将jdk的channel设置为非阻塞模式 前文传送门 Netty分布式Server启动流程服务端初始化源码分析 NioServerSocketChannel创建 我们如果熟悉Nio, 则对channel的概念则不会陌生, channel在相当于一个通道, 用于数据的传输 Netty将jdk的channel进行了

  • Netty分布式客户端接入流程初始化源码分析

    目录 前文概述: 第一节:初始化NioSockectChannelConfig 创建channel 跟到其父类DefaultChannelConfig的构造方法中 再回到AdaptiveRecvByteBufAllocator的构造方法中 继续跟到ChannelMetadata的构造方法中 回到DefaultChannelConfig的构造方法 前文概述: 之前的章节学习了server启动以及eventLoop相关的逻辑, eventLoop轮询到客户端接入事件之后是如何处理的?这一章我们循序渐

随机推荐