Node.js pipe实现源码解析

从前面两篇文章,我们了解到。想要把 Readable 的数据写到 Writable,就必须先手动的将数据读入内存,然后写入 Writable。换句话说,每次传递数据时,都需要写如下的模板代码

readable.on('readable', (err) => {
 if(err) throw err

 writable.write(readable.read())
})

为了方便使用,Node.js 提供了 pipe() 方法,让我们可以优雅的传递数据

readable.pipe(writable)

现在,就让我们来看看它是如何实现的吧

pipe

首先需要先调用 Readable 的 pipe() 方法

// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
 var src = this;
 var state = this._readableState;

 // 记录 Writable
 switch (state.pipesCount) {
  case 0:
   state.pipes = dest;
   break;
  case 1:
   state.pipes = [state.pipes, dest];
   break;
  default:
   state.pipes.push(dest);
   break;
 }
 state.pipesCount += 1;

 // ...

  src.once('end', endFn);

 dest.on('unpipe', onunpipe);

 // ...

 dest.on('drain', ondrain);

 // ...

 src.on('data', ondata);

 // ...

 // 保证 error 事件触发时,onerror 首先被执行
 prependListener(dest, 'error', onerror);

 // ...

 dest.once('close', onclose);

 // ...

 dest.once('finish', onfinish);

 // ...

 // 触发 Writable 的 pipe 事件
 dest.emit('pipe', src);

 // 将 Readable 改为 flow 模式
 if (!state.flowing) {
  debug('pipe resume');
  src.resume();
 }

 return dest;
};

执行 pipe() 函数时,首先将 Writable 记录到 state.pipes 中,然后绑定相关事件,最后如果 Readable 不是 flow 模式,就调用 resume() 将 Readable 改为 flow 模式

传递数据

Readable 从数据源获取到数据后,触发 data 事件,执行 ondata()

ondata() 相关代码:

// lib/_stream_readable.js

 // 防止在 dest.write(chunk) 内调用 src.push(chunk) 造成 awaitDrain 重复增加,awaitDrain 不能清零,Readable 卡住的情况
 // 详情见 https://github.com/nodejs/node/issues/7278
 var increasedAwaitDrain = false;
 function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
   // 防止在 dest.write() 内调用 src.unpipe(dest),导致 awaitDrain 不能清零,Readable 卡住的情况
   if (((state.pipesCount === 1 && state.pipes === dest) ||
      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
     ) &&
     !cleanedUp) {
    debug('false write response, pause', src._readableState.awaitDrain);
    src._readableState.awaitDrain++;
    increasedAwaitDrain = true;
   }
   // 进入 pause 模式
   src.pause();
  }
 }

在 ondata(chunk) 函数内,通过 dest.write(chunk) 将数据写入 Writable

此时,在 _write() 内部可能会调用 src.push(chunk) 或使其 unpipe,这会导致 awaitDrain 多次增加,不能清零,Readable 卡住

当不能再向 Writable 写入数据时,Readable 会进入 pause 模式,直到所有的 drain 事件触发

触发 drain 事件,执行 ondrain()

// lib/_stream_readable.js

 var ondrain = pipeOnDrain(src);

 function pipeOnDrain(src) {
  return function() {
   var state = src._readableState;
   debug('pipeOnDrain', state.awaitDrain);
   if (state.awaitDrain)
    state.awaitDrain--;
   // awaitDrain === 0,且有 data 监听器
   if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
    state.flowing = true;
    flow(src);
   }
  };
 }

每个 drain 事件触发时,都会减少 awaitDrain,直到 awaitDrain 为 0。此时,调用 flow(src),使 Readable 进入 flow 模式

到这里,整个数据传递循环已经建立,数据会顺着循环源源不断的流入 Writable,直到所有数据写入完成

unpipe

不管写入过程中是否出现错误,最后都会执行 unpipe()

// lib/_stream_readable.js

// ...

 function unpipe() {
  debug('unpipe');
  src.unpipe(dest);
 }

// ...

Readable.prototype.unpipe = function(dest) {
 var state = this._readableState;
 var unpipeInfo = { hasUnpiped: false };

 // 啥也没有
 if (state.pipesCount === 0)
  return this;

 // 只有一个
 if (state.pipesCount === 1) {
  if (dest && dest !== state.pipes)
   return this;
  // 没有指定就 unpipe 所有
  if (!dest)
   dest = state.pipes;

  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;
  if (dest)
   dest.emit('unpipe', this, unpipeInfo);
  return this;
 }

 // 没有指定就 unpipe 所有
 if (!dest) {
  var dests = state.pipes;
  var len = state.pipesCount;
  state.pipes = null;
  state.pipesCount = 0;
  state.flowing = false;

  for (var i = 0; i < len; i++)
   dests[i].emit('unpipe', this, unpipeInfo);
  return this;
 }

 // 找到指定 Writable,并 unpipe
 var index = state.pipes.indexOf(dest);
 if (index === -1)
  return this;

 state.pipes.splice(index, 1);
 state.pipesCount -= 1;
 if (state.pipesCount === 1)
  state.pipes = state.pipes[0];

 dest.emit('unpipe', this, unpipeInfo);

 return this;
};

Readable.prototype.unpipe() 函数会根据 state.pipes 属性和 dest 参数,选择执行策略。最后会触发 dest 的 unpipe 事件

unpipe 事件触发后,调用 onunpipe(),清理相关数据

// lib/_stream_readable.js

 function onunpipe(readable, unpipeInfo) {
  debug('onunpipe');
  if (readable === src) {
   if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
    unpipeInfo.hasUnpiped = true;
    // 清理相关数据
    cleanup();
   }
  }
 }

End

在整个 pipe 的过程中,Readable 是主动方 ( 负责整个 pipe 过程:包括数据传递、unpipe 与异常处理 ),Writable 是被动方 ( 只需要触发 drain 事件 )

总结一下 pipe 的过程:

  • 首先执行 readbable.pipe(writable),将 readable 与 writable 对接上
  • 当 readable 中有数据时,readable.emit('data'),将数据写入 writable
  • 如果 writable.write(chunk) 返回 false,则进入 pause 模式,等待 drain 事件触发
  • drain 事件全部触发后,再次进入 flow 模式,写入数据
  • 不管数据写入完成或发生中断,最后都会调用 unpipe()
  • unpipe() 调用 Readable.prototype.unpipe(),触发 dest 的 unpipe 事件,清理相关数据

参考:

https://github.com/nodejs/node/blob/master/lib/_stream_readable.js

https://github.com/nodejs/node/blob/master/lib/_stream_writable.js

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • node.js实现BigPipe详解

    BigPipe 是 Facebook 开发的优化网页加载速度的技术.网上几乎没有用 node.js 实现的文章,实际上,不止于 node.js,BigPipe 用其他语言的实现在网上都很少见.以至于这技术出现很久以后,我还以为就是整个网页的框架先发送完毕后,用另一个或几个 ajax 请求再请求页面内的模块.直到不久前,我才了解到原来 BigPipe 的核心概念就是只用一个 HTTP 请求,只是页面元素不按顺序发送而已. 了解了这个核心概念就好办了,得益于 node.js 的异步特性,很容易就可以

  • 理解nodejs的stream和pipe机制的原理和实现

    前言 前几天别人请教我关于pipe的问题,我发现我虽然用了nodejs很久,但是由于每次用的不多所以经常回避stream的使用,导致一直不熟,现在重新学习整理一下相关知识. 通过nodeschool学习stream nodeschool有一个stream-adventure教程教导stream的使用,很简单 简单stream进行pipe 首先,我们可以通过管道将输入定位到输出,输入输出可以是控制台或者文件流或者http请求,比如 process.stdin.pipe(process.stdout

  • nodejs实现bigpipe异步加载页面方案

    Bigpipe介绍 Facebook首创的一种减少HTTP请求的,首屏快速加载的的异步加载页面方案.是前端性能优化的一个方向. BigPipe与AJAX的比较 AJAX主要是XMLHttpRequest,前端异步的向服务器请求,获取动态数据添加到网页上.这样的往返请求需要耗费时间,而BigPipe技术并不需要发送XMLHttpRequest请求,这样就节省时间损耗.减少请求带来的另一个好处就是直接减少服务器负载.还有一个不同点就是AJAX请求前服务器在等待.请求后页面在等待.BIGPIPE可以前

  • Node.js pipe实现源码解析

    从前面两篇文章,我们了解到.想要把 Readable 的数据写到 Writable,就必须先手动的将数据读入内存,然后写入 Writable.换句话说,每次传递数据时,都需要写如下的模板代码 readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) }) 为了方便使用,Node.js 提供了 pipe() 方法,让我们可以优雅的传递数据 readable.pipe(writable)

  • solid.js响应式createSignal 源码解析

    目录 正文 createSignal readSignal writeSignal 案例分析 总结 正文 www.solidjs.com/docs/latest… createSignal 用来创建响应式数据,它可以跟踪单个值的变化. solid.js 的响应式实现参考了 S.js,它是一个体积超小的 reactive 库,支持自动收集依赖和简单的响应式编程. createSignal createSignal 首先我们来看下 createSignal 的声明: // packages/soli

  • next.js getServerSideProps源码解析

    目录 SSR 处理 动态加载处理 总结 SSR 处理 老规矩,昨天写了关于 getServerSideProps 的内容,今天趁热写一下 getServerSideProps 相应的源码,看看 next.js getServerSideProps 是怎么实现的,还有什么从文档无法知晓的细节. 我们先从 SSR 时相关的 getServerSideProps 处理看起,源码排查步骤上一步已经有所介绍,本篇不再多说,在 SSR 时,next.js 会调用 doRender 来进行渲染,其中会再次调用

  • vue 源码解析之虚拟Dom-render

    vue 源码解析 --虚拟Dom-render instance/index.js function Vue (options) { if (process.env.NODE_ENV !== 'production' && !(this instanceof Vue) ) { warn('Vue is a constructor and should be called with the `new` keyword') } this._init(options) } renderMixin

  • Node.js 中如何收集和解析命令行参数

    前言 在开发 CLI(Command Line Interface)工具的业务场景下,离不开命令行参数的收集和解析. 接下来,本文介绍如何收集和解析命令行参数. 收集命令行参数 在 Node.js 中,可以通过 process.argv 属性收集进程被启动时传入的命令行参数: // ./example/demo.js process.argv.slice(2); // 命令行执行如下命令 node ./example/demo.js --name=xiaoming --age=20 man //

  • vue3 keepalive源码解析解决线上问题

    目录 引言 1.keepalive功能 2.keepalive使用场景 3.在项目中的使用过程 4.vue3 keepalive源码调试 5.vue3 keealive源码粗浅分析 6.总结 引言 1.通过本文可以了解到vue3 keepalive功能 2.通过本文可以了解到vue3 keepalive使用场景 3.通过本文可以学习到vue3 keepalive真实的使用过程 4.通过本文可以学习vue3 keepalive源码调试 5.通过本文可以学习到vue3 keepalive源码的精简分

  • 微前端架构ModuleFederationPlugin源码解析

    目录 序言 背景 MF 基本介绍 应用场景 微前端架构 服务化的 library 和 components ModuleFederationPlugin 源码解析 入口源码 Exposes Remotes Shared 小结 总结 序言 本文是 Webpack ModuleFederationPlugin(后面简称 MF) 源码解析 文章中的第一篇,在此系列文章中,我将带领大家抽丝剥茧.一步步地去解析 MF 源码.当然为了帮助大家理解,可能中间也会涉及到 Webpack 源码中的其它实现,我会根

  • 编程式安装依赖install-pkg源码解析

    目录 正文 使用 源码分析 总结 正文 通常安装依赖都是通过命令式的方式来安装,有没有想过可以通过编程式的方式来安装依赖呢? install-pkg是一个用于安装依赖的工具,它可以在不同的环境下安装依赖,比如 npm.yarn.pnpm 等. 源码地址:github.com/antfu/insta… 使用 install-pkg的使用非常简单,根据README的说明,就通过下面的代码就可以安装依赖了: import { install } from 'install-pkg' await ins

  • vue3模块创建runtime-dom源码解析

    目录 前言 创建模块 nodeOptions patchProps patchProp patchClass patchStyle patchEvent patchAttr 总结 前言 runtime-dom 是针对浏览器的运行时,包括 DOM 操作.props(例如class.事件.样式以及其它attributes)的更新等内容:本小节我们开启 runtime-dom 的篇章. 创建模块 在 packages/runtime-dom/ 目录下创建目录文件: │ │ └─ src │ │ ├─

  • jq源码解析之绑在$,jQuery上面的方法(实例讲解)

    1.当我们用$符号直接调用的方法.在jQuery内部是如何封装的呢?有没有好奇心? // jQuery.extend 的方法 是绑定在 $ 上面的. jQuery.extend( { //expando 用于决定当前页面的唯一性. /\D/ 非数字.其实就是去掉小数点. expando: "jQuery" + ( version + Math.random() ).replace( /\D/g, "" ), // Assume jQuery is ready wit

随机推荐