Node.js Stream ondata触发时机与顺序的探索

上次写Stream pipe细节时,在源码中发现一段无用逻辑,由此引发了对Stream data事件触发时机与顺序的探索。

无用逻辑

当时研究pipe细节是基于Node.js v8.11.1的源码,其中针对上游的ondata事件处理有如下一段代码:

// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
    if (((state.pipesCount === 1 && state.pipes === dest) ||
        (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
      !cleanedUp) {
      debug('false write response, pause', src._readableState.awaitDrain);
      src._readableState.awaitDrain++;
      increasedAwaitDrain = true;
    }
    src.pause();
  }
}

重点关注increasedAwaitDrain变量,理解这个变量期望达到什么目的,然后仔细阅读代码,会发现if (false === ret && !increasedAwaitDrain)语句中increasedAwaitDrain变量肯定是false,因为前一行才将该变量赋值为false,这样一来这个变量就变得毫无意义。

increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {}

以上就是关键的三行代码,因为Node.js是单线程且dest.write(chunk)内部没有修改变量increasedAwaitDrain的值,那么if语句中increasedAwaitDrain的值肯定还是false,即increasedAwaitDrain相关逻辑没有达到所期望的目标。

无用代码出现的原因

前段虽已经分析出increasedAwaitDrain没起到作用,但作者为什么写了这样一段逻辑呢?其实在定义increasedAwaitDrain语句的上方,作者说可能存在这样一种情况:“当我们接收到一次上游的ondata事件并尝试将数据写到下游时,上游可能同时又有一个data事件触发,而这两个ondata的数据在写入下游时可能都返回false,从而导致src._readableState.awaitDrain++执行两次”。

awaitDrain++执行两次是作者不希望看到的情况,因为下游触发drain事件时awaitDrain相应减1,直到其值为0时才让上游重新流动,如果awaitDrain++执行两次,下游却只触发一次drain事件,awaitDrain就不会为0,上游不重新流动也就无法继续读取数据。

真相的探索过程

虽然从理性上认为increasedAwaitDrain没起到作用,但也无法肯定加绝对,自己尝试去求助,没有出现高手指点出问题所在,但一个同事听我描述后,说可能这就是个BUG,虽心中觉得可能性不大,但还是抱着试试看的心态切换到master分支上去瞅瞅,随即发现最新的代码里并没有与increasedAwaitDrain类似的逻辑,间接说明v8.11.1分支上increasedAwaitDrain相关逻辑的确无用。

虽然比较肯定这里存在一段无用代码,但应该如何理解作者在increasedAwaitDrain上方的注释呢?为了进一步揭露真相,自己继续花时间去看了看stream.Readable相关代码,想知道data事件的触发时机与顺序是如何决定的。

readable流的简单原理

在进一步解释data事件的触发顺序前,简单讲一下readable流的实现原理,如果需要自己实现一个readable流,可以使用new stream.Readable(options)方法,其中options可包含四个属性:highWaterMark、encoding、objectMode、read。最主要的是read属性,当流的使用者需要数据时,read方法被用来从数据源获取数据,然后通过this.push(chunk)将数据传递给使用者,如果没有更多数据可供读取时使用this.push(null)表示读取结束。

const Readable = require('stream').Readable;
let letter = 'ABCDEFG'.split('');
let index = 0;
const rs = new Readable({
  read(size) {
    this.push(letter[index++] || null);
  }
});
rs.on('data', chunk => {
  console.log(chunk.toString());
});
// 输出
// A
// B
// C
// ...

这里ondata虽然没有明显调用read方法,但内部依旧是通过调用read方法结合this.push输出数据,并且在源代码内部可以发现通过参数传递的read方法实际上被赋值给this._read,然后在Readable.prototype.read中调用this._read获取数据。

灵魂代码

为了进一步说明stream.Readable的data事件触发顺序与场景,将有关官方源码经过修改和删减成如下:

function Readable(options) {
  this._read = options.read; // 将参数传递的read函数赋值到this._read
}
// 使用者通过调用read方法获取数据
Readable.prototype.read = function (size) {
  var state = this._readableState;
  // 模拟锁,一次_read如果没有返回(this.push),后续read不会继续调用_read读取数据
  if (!state.reading) {
    state.reading = true;
    state.sync = true; // sync用于在push方法中指示_read内部是否同步调用了push
    this._read(size);
    state.sync = false;
  }
  // _read内部如果是同步调用push,数据会放入缓冲区
  // _read内部如果是异步调用push且缓冲区没有内容,数据可能emit data返回
  // 尝试从缓冲区(state.buffer)中获取大小为size的数据,如果获取成功则触发data事件
  if (ret)
    this.emit('data', ret);
  return ret;
};
// 在this._read执行过程中通过this.push输出数据
Readable.prototype.push = function (chunk, encoding) {
  var state = this._readableState;
  // 本次_read获取到数据,打开锁
  state.reading = false;
  // 流动模式 & 缓冲区没有数据 & 非同步返回,则直接触发data事件
  if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit('data', chunk);
    stream.read(0); // 触发下一次读取,_read异步push的话还是会到这里,类似flow中的保持流出于流动
  }
  else {
    // 将数据放入缓冲区
    state.length += chunk.length;
    state.buffer.push(chunk);
  }
};
// 暂停流动
Readable.prototype.pause = function() {
  if (this._readableState.flowing !== false) {
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};
function flow(stream) {
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null);
}

data事件的触发时机与顺序

时机

data的触发只有两处:

  • 流如果处于流动模式 & 缓冲区没有数据 & 异步调用push,此时数据不经过缓冲区,直接触发data事件
  • 不满足上述情况时,push的数据会被放入缓冲区,然后再尝试从缓冲区读取指定size的数据并触发data事件

顺序

关于data的触发顺序,实际是由emit顺序决定,为讨论原始问题:“increasedAwaitDrain相关逻辑为什么可以被删除?”,将代码简化:

let count = 0;
src.on('data', chunk => {
  let ret = dest.write(chunk);
  if (!ret) {
    count++;
    src.pause();
  }
});

当监听流的data事件时,流最终会通过resume并调用flow函数进入流动模式模式,即不断的调用read方法读取数据。接下来分析以下几种场景,当dest.write(chunk)返回false时++count会执行几次,注意结合前文的灵魂代码。

  • 场景一:每次_read同步push一次数据

当发生第一次读取,数据同步push到缓冲区,紧接着从缓冲区中读取数据并通过emit data的方式传递到ondata中,如果此时dest.write(chunk)返回false,count++将执行一次,接着由于调用了stream.pause(),while条件state.flowing为false导致stream.read不再被调用,在流重新流动前,count的值不会继续增加。

  • 场景二:每次_read异步push一次数据

当发生第一次读取,异步push的数据将直接通过emit data传递到ondata中,而read函数中的emit由于无法从缓冲区读取数据从而不会触发,同时read返回null导致while循环也相应停止,此种情况下异步push触发data事件后,紧接着的stream.read(0)会继续保持流的流动,当dest.write(chunk)返回false,count++执行一次并将流暂停,紧接着会继续调用一次read,但这次数据将被放入缓冲区且不触发data事件,count++依旧只执行一次。

场景二流暂停一次后再次流动时,数据消耗模式与之前会有所差异,会优先消耗缓冲区数据直至为空时回到之前的模式,但这同样不会导致count++执行多次。

  • 场景三:每次_read多次同步push数据

与场景一类似,只是每次_read会多次往缓冲区写入数据,最终data事件还是依靠从缓冲区读数据后触发。

  • 场景四:每次_read多次异步push数据

同场景二类似,假设在一次_read中有两次异步push,当第一个异步push执行时,data事件触发且其中的dest.write(chunk)返回false,导致count++同时流被暂停,等第二个异步push执行时,由于流已经暂停,数据将写入缓冲区而不是触发data事件,所以count++只执行一次。

  • 场景五:_read操作可能同步或异步push

不管是同步或者异步push,当一次ondata内部将流设置为暂停模式后,flow函数中while条件state.flowing为false将导致stream.read不再调用,异步的push的emit data判断条件同样不再满足,即目前阶段内部不会再有data事件触发直到外部再次间接或直接调用read方法。

以上五个场景是为了分析该问题而模拟的,实际只要能理解第五个场景就能明白所有。

小结

文章最终写出来的内容与我最开始的初衷所偏离,而且自己不知道如何评价这篇文章的好坏,但为了写这文章花了两天业余时间去深入理解stream.Readable却是非常有收获的一件事情,更坚定自己在写文章的路途上可以走的更远。

PS:猜测为什么有烂电影的存在,可能是因为导演长时间投入的创作会让他迷失在内部而无法发现问题,写文章也是,难以通过阅读去优化费心思写的文章。

PS:下图是美团博客的,也许我写了这么多却抵不上这张图,说明方式很重要。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。如果你想了解更多相关内容请查看下面相关链接

(0)

相关推荐

  • Node.js使用supervisor进行开发中调试的方法

    如果你有 PHP 开发经验,会习惯在修改 PHP 脚本直接刷新浏览器以查看结果,而你 在开发 Node.js 实现的 HTTP 应用时会发现,无论你修改了代码的哪一个部分,都必须终止Node.js然后重新运行. 这是因为 Node.js 只有在第一次引用到某一部分时才会去解析 本文件,以后都会直接访问内存,避免重复载入,而 PHP 则是重新读取并解析脚本(如果没有专门的优化配置). 在开发Node.js实现HTTP应用时会发现,无论你修改了代码的哪一部分,都必须终止Node.js再重新运行才会奏

  • Node.js + express实现上传大文件的方法分析【图片、文本文件】

    本文实例讲述了Node.js + express实现上传大文件的方法.分享给大家供大家参考,具体如下: 对于大文件的上传我们首先要引入一个叫做 multer 的库: npm install --save multer 关于这个库,大家可以查阅官方文档: 点击跳转 https://www.npmjs.com/package/multer 我们先将库引入我们的项目中: var multer = require('multer') var upload = multer({ dest: 'upload

  • Node.js Event Loop各阶段讲解

    Event Loop阶段描述图 timers timer阶段处理setTimeout于setInterval回调,开始处理的时机与poll阶段有关联. pending callbacks 该阶段执行某些系统操作的回调,比如TCP套接字在连接时收到ECONNREFUSED. 网上有一些将该阶段称为I/O callbacks的文章都是过时错误的,具体可以移步Node.js官方库下面的这个issue: #1118. idle, prepare 内部使用,忽略. poll poll是一个核心阶段,等新I

  • node.js实现微信开发之获取用户授权

    本篇主要讲述,如何在微信中打开自家页面后,弹窗请求用户授权,以便拿到用户的微信信息. 首先说一下,完成自定义分享信息的,从无到有的流程: 基础硬件服务: 需要一个公网可以访问的有效域名: 购买域名,并备案,我是在阿里云购买的,备案需要十几个工作日. 购买ip,然后设置上面的域名,解析到该ip,这个时间可以快到忽略. 拥有自己的服务器,来存放自己页面项目: 我还是在阿里云购买购买服务器,这个花费最大,几百元一年的使用权. 而且这个服务器,本质就是一台电脑,是电脑就有配置,我目前只是自己学习使用,配

  • 零基础之Node.js搭建API服务器的详解

    零基础之Node.js搭建API服务器 这篇文章写给那些Node.js零基础,但希望自己动手实现服务器API的前端开发者,尝试帮大家打开一扇门. HTTP服务器实现原理 HTTP服务器之所以能提供前端使用的API,其实现原理是服务器保持监听计算机的某个端口(通常是80),等待客户端请求,当请求到达并经过一系列处理后,服务器发送响应数据给到前端. 平时大家通过Ajax调用API,即是发起一次请求,经过服务器处理后,得到结果,然后再进行前端处理.如今使用高级编程语言,要实现服务器那部分功能已经变得非

  • Node.js + express基本用法教程

    本文实例讲述了Node.js + express基本用法.分享给大家供大家参考,具体如下: 这里来讲下 express 框架的使用,编译的环境是 VS Code ,这里我已经配饰了阿里的镜像,所有 npm 指令用 cnpm 代替 首先学会向 Node.js 种引入 express 非常建党只需两步,输入指令: cnpm init 然后就可以载入 express cnpm install express -save 到此为止 express 救成功导入了 这里介绍一个技巧: 输入: cnpm in

  • 使用Node.js实现一个多人游戏服务器引擎

    摘要 听说过文字冒险游戏吗? 如果你的年龄足够大的话(就像我一样),那么你可能听说过.甚至玩过"back in the day".在本文中,我将向你展示编写的整个过程.这不仅仅是一个文本冒险游戏,而是一个能让你和你的朋友们一起玩的,可以进行任何剧情的文本冒险游戏引擎. 没错,我们将通过在添加多人游戏功能来增加它的趣味性. 文字冒险是最早的 RPG 形式的游戏之一,回到还没有图形画面的时代,你只能通过阅读 CRT 显示器上黑色背景下的描述,并且依赖自己的想象力来推动游戏剧情的发展. 如果

  • Node.js之readline模块的使用详解

    什么是readline readline允许从可读流中以逐行的方式读取数据,比如process.stdin等. 在node.js命令行模式下默认引入了readline模块,但如果是使用node.js运行脚本的话,则需要自己通过require('readline')方式手动引入该模块. 怎么使用readline 创建实例 首先.创建一个接口实例,提供一个Object类型的参数.参数如下: input: 监听的可读流(必需) output: 写入readline的可写流(必需) completer:

  • 详解基于React.js和Node.js的SSR实现方案

    基础概念 SSR:即服务端渲染(Server Side Render) 传统的服务端渲染可以使用Java,php 等开发语言来实现,随着 Node.js 和相关前端领域技术的不断进步,前端同学也可以基于此完成独立的服务端渲染. 过程:浏览器发送请求 -> 服务器运行 react代码生成页面 -> 服务器返回页面 -> 浏览器下载HTML文档 -> 页面准备就绪 即:当前页面的内容是服务器生成好给到浏览器的. 对应CSR:即客户端渲染(Client Side Render) 过程:浏

  • Node.js动手撸一个静态资源服务器的方法

    简介 本文介绍了一个简单的静态资源服务器的实例项目,希望能给Node.js初学者带来帮助.项目涉及到http.fs.url.path.zlib.process.child_process等模块,涵盖大量常用api:还包括了基于http协议的缓存策略选取.gzip压缩优化等:最终我们会发布到npm上,做成一个可以全局安装.使用的小工具.麻雀虽小,五脏俱全,一想是不是还有点小激动?话不多说,放码过来. 文中源码地址在最后附录中. 可先行体验项目效果: 安装:npm i -g here11 任意文件夹

随机推荐