Netty分布式NioEventLoop任务队列执行源码分析
目录
- 执行任务队列
- 跟进runAllTasks方法:
- 我们跟进fetchFromScheduledTaskQueue()方法
- 回到runAllTasks(long timeoutNanos)方法中
- 章节小结
前文传送门:NioEventLoop处理IO事件
执行任务队列
继续回到NioEventLoop的run()方法:
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //轮询io事件(1) select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默认是50 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //记录下开始时间 final long ioStartTime = System.nanoTime(); try { //处理轮询到的key(2) processSelectedKeys(); } finally { //计算耗时 final long ioTime = System.nanoTime() - ioStartTime; //执行task(3) runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代码省略 } }
我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务
我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:
跟进runAllTasks方法:
protected boolean runAllTasks(long timeoutNanos) { //定时任务队列中聚合任务 fetchFromScheduledTaskQueue(); //从普通taskQ里面拿一个任务 Runnable task = pollTask(); //task为空, 则直接返回 if (task == null) { //跑完所有的任务执行收尾的操作 afterRunningAllTasks(); return false; } //如果队列不为空 //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每一个任务 for (;;) { safeExecute(task); //标记当前跑完的任务 runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间 if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超过截止时间则不执行(nanoTime()是耗时的) if (lastExecutionTime >= deadline) { break; } } //如果没有超过这个时间, 则继续从普通任务队列拿任务 task = pollTask(); //直到没有任务执行 if (task == null) { //记录下最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue中
我们跟进fetchFromScheduledTaskQueue()方法
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //从定时任务队列中抓取第一个定时任务 //寻找截止时间为nanoTime的任务 Runnable scheduledTask = pollScheduledTask(nanoTime); //如果该定时任务队列不为空, 则塞到普通任务队列里面 while (scheduledTask != null) { //如果添加到普通任务队列过程中失败 if (!taskQueue.offer(scheduledTask)) { //则重新添加到定时任务队列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //继续从定时任务队列中拉取任务 //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中 scheduledTask = pollScheduledTask(nanoTime); } return true; }
long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表从定时任务初始化到现在过去了多长时间
Runnable scheduledTask= pollScheduledTask(nanoTime) 代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了
跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //拿到定时任务队列 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; //peek()方法拿到第一个任务 ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { //从队列中删除 scheduledTaskQueue.remove(); //返回该任务 return scheduledTask; } return null; }
我们看到首先获得当前类绑定的定时任务队列的成员变量
如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务
如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除
我们继续回到fetchFromScheduledTaskQueue()方法中:
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //从定时任务队列中抓取第一个定时任务 //寻找截止时间为nanoTime的任务 Runnable scheduledTask = pollScheduledTask(nanoTime); //如果该定时任务队列不为空, 则塞到普通任务队列里面 while (scheduledTask != null) { //如果添加到普通任务队列过程中失败 if (!taskQueue.offer(scheduledTask)) { //则重新添加到定时任务队列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //继续从定时任务队列中拉取任务 //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中 scheduledTask = pollScheduledTask(nanoTime); } return true; }
弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失败, 则通过scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定时任务队列中
如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务
这样就将定时任务队列需要执行的任务添加到了taskQueue中
回到runAllTasks(long timeoutNanos)方法中
protected boolean runAllTasks(long timeoutNanos) { //定时任务队列中聚合任务 fetchFromScheduledTaskQueue(); //从普通taskQ里面拿一个任务 Runnable task = pollTask(); //task为空, 则直接返回 if (task == null) { //跑完所有的任务执行收尾的操作 afterRunningAllTasks(); return false; } //如果队列不为空 //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每一个任务 for (;;) { safeExecute(task); //标记当前跑完的任务 runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间 if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超过截止时间则不执行(nanoTime()是耗时的) if (lastExecutionTime >= deadline) { break; } } //如果没有超过这个时间, 则继续从普通任务队列拿任务 task = pollTask(); //直到没有任务执行 if (task == null) { //记录下最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先通过 Runnable task = pollTask() 从taskQueue中拿一个任务
任务不为空, 则通过 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 计算一个截止时间, 任务的执行时间不能超过这个时间
然后在for循环中通过safeExecute(task)执行task
我们跟到safeExecute(task)中:
protected static void safeExecute(Runnable task) { try { //直接调用run()方法执行 task.run(); } catch (Throwable t) { //发生异常不终止 logger.warn("A task raised an exception. Task: {}", task, t); } }
这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行
回到runAllTasks(long timeoutNanos)方法:
protected boolean runAllTasks(long timeoutNanos) { //定时任务队列中聚合任务 fetchFromScheduledTaskQueue(); //从普通taskQ里面拿一个任务 Runnable task = pollTask(); //task为空, 则直接返回 if (task == null) { //跑完所有的任务执行收尾的操作 afterRunningAllTasks(); return false; } //如果队列不为空 //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //执行每一个任务 for (;;) { safeExecute(task); //标记当前跑完的任务 runTasks ++; //当跑完64个任务的时候, 会计算一下当前时间 if ((runTasks & 0x3F) == 0) { //定时任务初始化到当前的时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超过截止时间则不执行(nanoTime()是耗时的) if (lastExecutionTime >= deadline) { break; } } //如果没有超过这个时间, 则继续从普通任务队列拿任务 task = pollTask(); //直到没有任务执行 if (task == null) { //记录下最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
每次执行完task, runTasks自增
这里 if ((runTasks & 0x3F) == 0) 代表是否执行了64个任务, 如果执行了64个任务, 则会通过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环
如果没有超过截止时间, 则通过 task = pollTask() 继续弹出任务执行
这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式
如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间
以上就是有关执行任务队列的相关逻辑
章节小结
本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 通过本章学习, 我们应该掌握如下内容:
- 1. NioEventLoopGroup如何选择分配NioEventLoop
- 2. NioEventLoop如何开启
- 3. NioEventLoop如何进行select操作
- 4. NioEventLoop如何执行task
以上就是Netty分布式NioEventLoop任务队列执行源码分析的详细内容,更多关于Netty分布式NioEventLoop执行任务队列的资料请关注我们其它相关文章!