Netty分布式NioEventLoop任务队列执行源码分析

目录
  • 执行任务队列
    • 跟进runAllTasks方法:
    • 我们跟进fetchFromScheduledTaskQueue()方法
    • 回到runAllTasks(long timeoutNanos)方法中
  • 章节小结

前文传送门:NioEventLoop处理IO事件

执行任务队列

继续回到NioEventLoop的run()方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //轮询io事件(1)
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //默认是50
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                //记录下开始时间
                final long ioStartTime = System.nanoTime();
                try {
                    //处理轮询到的key(2)
                    processSelectedKeys();
                } finally {
                    //计算耗时
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //执行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        //代码省略
    }
}

我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务

我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:

跟进runAllTasks方法:

protected boolean runAllTasks(long timeoutNanos) {
    //定时任务队列中聚合任务
    fetchFromScheduledTaskQueue();
    //从普通taskQ里面拿一个任务
    Runnable task = pollTask();
    //task为空, 则直接返回
    if (task == null) {
        //跑完所有的任务执行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果队列不为空
    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //执行每一个任务
    for (;;) {
        safeExecute(task);
        //标记当前跑完的任务
        runTasks ++;
        //当跑完64个任务的时候, 会计算一下当前时间
        if ((runTasks & 0x3F) == 0) {
            //定时任务初始化到当前的时间
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超过截止时间则不执行(nanoTime()是耗时的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果没有超过这个时间, 则继续从普通任务队列拿任务
        task = pollTask();
        //直到没有任务执行
        if (task == null) {
            //记录下最后执行时间
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue中

我们跟进fetchFromScheduledTaskQueue()方法

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //从定时任务队列中抓取第一个定时任务
    //寻找截止时间为nanoTime的任务
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果该定时任务队列不为空, 则塞到普通任务队列里面
    while (scheduledTask != null) {
        //如果添加到普通任务队列过程中失败
        if (!taskQueue.offer(scheduledTask)) {
            //则重新添加到定时任务队列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //继续从定时任务队列中拉取任务
        //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表从定时任务初始化到现在过去了多长时间

Runnable scheduledTask= pollScheduledTask(nanoTime) 代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了

跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    //拿到定时任务队列
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    //peek()方法拿到第一个任务
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        //从队列中删除
        scheduledTaskQueue.remove();
        //返回该任务
        return scheduledTask;
    }
    return null;
}

我们看到首先获得当前类绑定的定时任务队列的成员变量

如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务

如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除

我们继续回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //从定时任务队列中抓取第一个定时任务
    //寻找截止时间为nanoTime的任务
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果该定时任务队列不为空, 则塞到普通任务队列里面
    while (scheduledTask != null) {
        //如果添加到普通任务队列过程中失败
        if (!taskQueue.offer(scheduledTask)) {
            //则重新添加到定时任务队列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //继续从定时任务队列中拉取任务
        //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失败, 则通过scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定时任务队列中

如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务

这样就将定时任务队列需要执行的任务添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中

protected boolean runAllTasks(long timeoutNanos) {
    //定时任务队列中聚合任务
    fetchFromScheduledTaskQueue();
    //从普通taskQ里面拿一个任务
    Runnable task = pollTask();
    //task为空, 则直接返回
    if (task == null) {
        //跑完所有的任务执行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果队列不为空
    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //执行每一个任务
    for (;;) {
        safeExecute(task);
        //标记当前跑完的任务
        runTasks ++;
        //当跑完64个任务的时候, 会计算一下当前时间
        if ((runTasks & 0x3F) == 0) {
            //定时任务初始化到当前的时间
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超过截止时间则不执行(nanoTime()是耗时的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果没有超过这个时间, 则继续从普通任务队列拿任务
        task = pollTask();
        //直到没有任务执行
        if (task == null) {
            //记录下最后执行时间
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先通过 Runnable task = pollTask() 从taskQueue中拿一个任务

任务不为空, 则通过 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 计算一个截止时间, 任务的执行时间不能超过这个时间

然后在for循环中通过safeExecute(task)执行task

我们跟到safeExecute(task)中:

protected static void safeExecute(Runnable task) {
    try {
        //直接调用run()方法执行
        task.run();
    } catch (Throwable t) {
        //发生异常不终止
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行

回到runAllTasks(long timeoutNanos)方法:

protected boolean runAllTasks(long timeoutNanos) {
    //定时任务队列中聚合任务
    fetchFromScheduledTaskQueue();
    //从普通taskQ里面拿一个任务
    Runnable task = pollTask();
    //task为空, 则直接返回
    if (task == null) {
        //跑完所有的任务执行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果队列不为空
    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //执行每一个任务
    for (;;) {
        safeExecute(task);
        //标记当前跑完的任务
        runTasks ++;
        //当跑完64个任务的时候, 会计算一下当前时间
        if ((runTasks & 0x3F) == 0) {
            //定时任务初始化到当前的时间
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超过截止时间则不执行(nanoTime()是耗时的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果没有超过这个时间, 则继续从普通任务队列拿任务
        task = pollTask();
        //直到没有任务执行
        if (task == null) {
            //记录下最后执行时间
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

每次执行完task, runTasks自增

这里 if ((runTasks & 0x3F) == 0) 代表是否执行了64个任务, 如果执行了64个任务, 则会通过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环

如果没有超过截止时间, 则通过 task = pollTask() 继续弹出任务执行

这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式

如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间

以上就是有关执行任务队列的相关逻辑

章节小结

本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 通过本章学习, 我们应该掌握如下内容:

  • 1.  NioEventLoopGroup如何选择分配NioEventLoop
  • 2.  NioEventLoop如何开启
  • 3.  NioEventLoop如何进行select操作
  • 4.  NioEventLoop如何执行task

以上就是Netty分布式NioEventLoop任务队列执行源码分析的详细内容,更多关于Netty分布式NioEventLoop执行任务队列的资料请关注我们其它相关文章!

(0)

相关推荐

  • Netty源码分析NioEventLoop执行select操作入口

    分析完了selector的创建和优化的过程, 这一小节分析select相关操作 select操作的入口,NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SEL

  • Netty源码分析NioEventLoop处理IO事件相关逻辑

    目录 NioEventLoop的run()方法: processSelectedKeys()方法 processSelectedKeysOptimized(selectedKeys.flip())方法 processSelectedKey(k, (AbstractNioChannel) a)方法 之前我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习轮询到io事件的相关逻辑: NioEventLoop的run()方法: protected void run() { for (;

  • Netty事件循环主逻辑NioEventLoop的run方法分析

    目录 Netty事件循环主逻辑 初始化 EventLoop 处理读事件 注意 Netty事件循环主逻辑 Netty 事件循环主逻辑在 NioEventLoop.run 中的 processSelectedKeys函数中 protected void run() { //主循环不断读取IO事件和task,因为 EventLoop 也是 juc 的 ScheduledExecutorService 实现 for (;;) { try { switch (selectStrategy.calculat

  • Netty源码分析NioEventLoop初始化线程选择器创建

    前文传送门:NioEventLoop创建 初始化线程选择器 回到上一小节的MultithreadEventExecutorGroup类的构造方法: protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //代码省略 if (executor == null) { //创建一个新的线程

  • Netty分布式NioEventLoop优化selector源码解析

    目录 优化selector selector的创建过程 代码剖析 这里一步创建了这个优化后的数据结构 最后返回优化后的selector 优化selector selector的创建过程 在剖析selector轮询之前, 我们先讲解一下selector的创建过程 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider

  • Netty源码分析NioEventLoop线程的启动

    目录 之前的小节我们学习了NioEventLoop的创建以及线程分配器的初始化, 那么NioEventLoop是如何开启的呢, 我们这一小节继续学习 NioEventLoop的开启方法在其父类SingleThreadEventExecutor中的execute(Runnable task)方法中, 我们跟到这个方法: @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerEx

  • Netty分布式NioEventLoop任务队列执行源码分析

    目录 执行任务队列 跟进runAllTasks方法: 我们跟进fetchFromScheduledTaskQueue()方法 回到runAllTasks(long timeoutNanos)方法中 章节小结 前文传送门:NioEventLoop处理IO事件 执行任务队列 继续回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectN

  • Netty分布式pipeline传播inbound事件源码分析

    前一小结回顾:pipeline管道Handler删除 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chann

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • ZooKeeper框架教程Curator分布式锁实现及源码分析

    目录 如何使用InterProcessMutex 实现思路 代码实现概述 InterProcessMutex源码分析 实现接口 属性 构造方法 方法 获得锁 释放锁 LockInternals源码分析 获取锁 释放锁 总结 ZooKeeper入门教程一简介与核心概念 ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用 ZooKeeper入门教程三分布式锁实现及完整运行源码 上一篇文章中,我们使用zookeeper的java api实现了分布式排他锁. Curator中有着更为标准.规

  • Netty启动步骤绑定端口示例方法源码分析

    目录 绑定端口 我们继续跟第一小节的最初的doBind()方法 第二步, 获得channel 重点关注下doBind(localAddress)方法 最终会走到这一步, pipeline.fireChannelActive() 章节总结 前文传送门:Netty启动流程注册多路复用源码解析 绑定端口 上一小节我们学习了channel注册在selector的步骤, 仅仅做了注册但并没有监听事件, 事件是如何监听的呢? 我们继续跟第一小节的最初的doBind()方法 private ChannelFu

  • Netty启动流程注册多路复用源码解析

    目录 注册多路复用 注册channel的步骤 首先看下config()方法 回到initAndRegister()方法: 跟到MultithreadEventLoopGroup的register()方法: 回顾下第二小节channel初始化的步骤: 我们继续看看register()方法: 我们重点关注register0(promise), 跟进去: 我们重点关注doRegister()这个方法 前文传送门:Netty启动流程服务端channel初始化 注册多路复用 回到上一小节的代码: fina

  • Nett分布式分隔符解码器逻辑源码剖析

    目录 分隔符解码器 我们看其中的一个构造方法 我们跟到重载decode方法中 我们看初始化该属性的构造方法 章节总结 前文传送门:Netty分布式行解码器逻辑源码解析 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecoder, 是按照指定分隔符进行解码的解码器, 通过分隔符, 可以将二进制流拆分成完整的数据包 同样继承了ByteToMessageDecoder并重写了decode方法 我们看其中的一个构造方法 public DelimiterBasedFrameDeco

随机推荐