Node.js中你不可不精的Stream(流)

一、什么是Stream(流)

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的API。使用这些API可以很容易地来构建实现流接口的对象。例如, HTTP 请求 和 process.stdout 就都是流的实例。
流可以是可读的、可写的,或是可读写的。注意,所有的流都是 EventEmitter 的实例。

二、流的类型

Node.js 中有四种基本的流类型:

  • Readable - 可读的流 (例如 fs.createReadStream())。
  • Writable - 可写的流 (例如 fs.createWriteStream())。
  • Duplex - 可读写的流(双工流) (例如 net.Socket)。
  • Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate())。
var Stream = require('stream') //stream 模块引入方式

var Readable = Stream.Readable //可读的流
var Writable = Stream.Writable //可写的流
var Duplex = Stream.Duplex //可读写的流
var Transform = Stream.Transform //在读写过程中可以修改和变换数据的 Duplex 流

Node.js中关于流的操作被封装到了Stream模块中,这个模块也被多个核心模块所引用。例如在fs.createReadStream()和fs.createWriteStream()的源码实现里,都调用了Stream模块提供的抽象接口来实现对流数据的操作。

三、为什么使用Stream?

我们通过两个例子,了解一下为什么要使用Stream。

Exp1:

下面是一个读取文件内容的例子:

const fs = require('fs')

fs.readFile(file, function (err, content) { //读出来的content是Buffer
 console.log(content)
 console.log(content.toString())
})

但如果文件内容较大,譬如在500M时,执行上述代码的输出为:

<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >
buffer.js:382
 throw new Error('toString failed');
 ^

Error: toString failed
 at Buffer.toString (buffer.js:382:11)

报错的原因是content这个Buffer对象的长度过大,导致toString方法失败。

可见,这种一次获取全部内容的做法,不适合操作大文件。

可以考虑使用流来读取文件内容。

var fs = require('fs')
fs.createReadStream(bigFile).pipe(process.stdout) 

fs.createReadStream创建一个可读流,连接了源头(上游,文件)和消耗方(下游,标准输出)。

执行上面代码时,流会逐次调用fs.read(ReadStream这个类的源码里有一个_read方法,这个_read方法在内部调用了fs.read来实现对文件的读取),将文件中的内容分批取出传给下游。

在文件看来,它的内容被分块地连续取走了。

在下游看来,它收到的是一个先后到达的数据序列。

如果不需要一次操作全部内容,它可以处理完一个数据便丢掉。

在流看来,任一时刻它都只存储了文件中的一部分数据,只是内容在变化而已。

这种情况就像是用水管去取池子中的水。

每当用掉一点水,水管便会从池子中再取出一点。

无论水池有多大,都只存储了与水管容积等量的水。

Exp2:

下面是一个在线看视频的例子,假定我们通过HTTP请求返回视频内容给用户

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
 fs.readFile(videoPath, (err, data) => {
 res.end(data);
});
}).listen(8080);

但这样有两个明显的问题

  • 视频文件需要全部读取完,才能返回给用户,这样等待时间会很长。
  • 视频文件一次全放入内存中,内存吃不消。

用流可以将视频文件一点一点读到内存中,再一点一点返回给用户,读一部分,写一部分。(利用了 HTTP 协议的 Transfer-Encoding: chunked 分段传输特性),用户体验得到优化,同时对内存的开销明显下降。

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
 fs.createReadStream(videoPath).pipe(res);
}).listen(8080);

通过上述两个例子,我们知道,在大数据情况下必须使用流式处理。

四、可读流(Readable Stream)

可读流(Readable streams)是对提供数据的源头(source)的抽象。

常见的可读流:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • TCP sockets //sockets是一个双工流,即可读可写的流
  • process.stdin //标准输入

所有的 Readable Stream 都实现了 stream.Readable 类定义的接口。

可读流的两种模式(flowing 和 paused)

  • 在 flowing 模式下,可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用(所有的流都是 EventEmitter 的实例)。
  • 在 paused 模式下,必须显式调用 stream.read()方法来从流中读取数据片段。

创建流的Readable流,默认是非流动模式(paused模式),默认不会读取数据。所有初始工作模式为paused的Readable流,可以通过下面三种途径切换为flowing模式:

  • 监听'data'事件
  • 调用stream.resume()方法
  • 调用stream.pipe()方法将数据发送到Writable

fs.createReadStream(path[, options])源码实现

//文件名 ReadStream.js
let fs = require('fs');//读取文件
let EventEmitter = require('events');
class ReadStream extends EventEmitter {//流操作都是基于事件的
 constructor(path, options = {}) {
 super();
 //需要的参数
 this.path = path;//读取文件的路径
 this.highWaterMark = options.highWaterMark || 64 * 1024;//缓冲区大小,默认64KB
 this.autoClose = options.autoClose || true;//是否需要自动关闭文件描述符,默认为true
 this.start = options.start || 0; //options 可以包括 start 和 end 值,使其可以从文件读取一定范围的字节而不是整个文件
 this.pos = this.start; // 从文件的那个位置开始读取内容,pos会随着读取的位置而改变
 this.end = options.end || null; // null表示没传递
 this.encoding = options.encoding || null;
 this.flags = options.flags || 'r';//以何种方式操作文件

 // 参数的问题
 this.flowing = null; // 默认为非流动模式
 // 建一个buffer存放读出来的数据
 this.buffer = Buffer.alloc(this.highWaterMark);
 this.open();
 // {newListener:[fn]}
 // 次方法默认同步调用的
 this.on('newListener', (type) => { // 等待着 它监听data事件
  if (type === 'data') {//当监听到data事件时,把流设置为流动模式
  this.flowing = true;
  this.read();// 开始读取 客户已经监听了data事件
  }
 })
 }
 pause(){//将流从flowing模式切换为paused模式
 this.flowing = false;
 }
 resume(){//将流从paused模式切换为flowing模式
 this.flowing =true;
 this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容
 }
 read(){ // 默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读
 if(typeof this.fd !== 'number'){ //如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了
  return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法
 }
 // 当获取到fd时 开始读取文件了
 // 第一次应该读2个 第二次应该读2个
 // 第二次pos的值是4 end是4
 // 读取文件里一共4有个数为123 4,我们读取里面的123 4
 let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//规定每次读取多少个字节
 fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead为真实的读到了几个字节的内容
  // 读取完毕
  this.pos += byteRead; // 读出来两个,pos位置就往后移两位
  // this.buffer默认就是三个
  let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);//对读出来的内容进行编码
  this.emit('data', b);//触发data事件,将读到的内容输出给用户
  if ((byteRead === this.highWaterMark)&&this.flowing){
  return this.read(); // 继续读
  }
  // 这里就是没有更多的逻辑了
  if (byteRead < this.highWaterMark){
  // 没有更多了
  this.emit('end'); // 读取完毕
  this.destroy(); // 销毁即可
  }
 });
 }
 // 打开文件用的
 destroy() {
 if (typeof this.fd != 'number') { return this.emit('close'); } //如果文件还没打开,直接触发close事件
 fs.close(this.fd, () => {
  // 如果文件打开过了 那就关闭文件并且触发close事件
  this.emit('close');
 });
 }
 open() {
 fs.open(this.path, this.flags, (err, fd) => { //fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型)
  if (err) {
  if (this.autoClose) { // 如果需要自动关闭我再去销毁fd
   this.destroy(); // 销毁(关闭文件,触发关闭事件)
  }
  this.emit('error', err); // 如果有错误触发error事件
  return;
  }
  this.fd = fd; // 保存文件描述符
  this.emit('open', this.fd); // 文件被打开了,触发文件被打开的方法
 });
 }
 pipe(dest){//管道流的实现 pipe()方法是ReadStream下的方法,它里面的参数是WritableStream
 this.on('data',(data)=>{
  let flag = dest.write(data);
  if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值
  this.pause();// 已经不能继续写了,等他写完了再恢复
  }
 });
 dest.on('drain',()=>{//当读取缓存区清空后
  console.log('写一下停一下')
  this.resume();//继续往dest写入数据
 });
 }
}
module.exports = ReadStream;//导出可读流

使用fs.createReadStream()

// 流:有序的有方向的,可以自己控制速率
// 读:读是将内容读取到内存中
// 写:写是将内存或者文件的内容写入到文件内
// 读取的时候默认读 默认一次读取64k,encoding 读取出来的内容默认都是buffer
//let fs = require('fs');
//let rs = fs.createReadStream({...});//原生实现可读流
let ReadStream = require('./ReadStream');
let rs = new ReadStream('./2.txt', {
 highWaterMark: 3, // 字节
 flags:'r',//读文件
 autoClose:true, // 默认读取完毕后自动关闭文件描述符
 start:0,
 //end:3,// 流是闭合区间 包start也包end
 encoding:'utf8'
});
// 默认创建一个流 是非流动模式(上述源码中有写的),默认不会读取数据
// 如果我们需要接收数据,那我们要监听data事件,这样数据会自动的流出来
rs.on('error',function (err) {// 通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。
 console.log(err)
});
rs.on('open',function () {//文件被打开了,获取到了fd。内部会自动的触发这个事件 rs.emit('data');
 console.log('文件打开了');
});
rs.on('data',function (data) {//有数据流出来了
 console.log(data);
 rs.pause(); // 暂停触发on('data')事件,将流动模式又转化成了非流动模式
});
setTimeout(()=>{rs.resume()},3000);//三秒钟之后再将非流动模式转化为流动模式
rs.on('end',function () {// 读取完毕
 console.log('读取完毕了');
});
rs.on('close',function () {//close 事件将在流或其底层资源(比如一个文件)关闭后触发。close 事件触发后,该流将不会再触发任何事件。
 //console.log('关闭')
});

四、可写流(Writable Stream)

可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者 TCP、HTTP 等网络响应。

常见的可写流:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

所有 Writable 流都实现了 stream.Writable 类定义的接口。

可写流的使用

调用可写流实例的 write() 方法就可以把数据写入可写流

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);

rs.setEncoding('utf-8'); // 设置编码格式
rs.on('data', chunk => {
ws.write(chunk); // 写入数据
});

监听了可读流的data事件就会使可读流进入流动模式,我们在回调事件里调用了可写流的 write() 方法,这样数据就被写入了可写流抽象的设备destPath中。

write() 方法有三个参数

  • chunk {String| Buffer},表示要写入的数据
  • encoding 当写入的数据是字符串的时候可以设置编码
  • callback 数据被写入之后的回调函数

drain事件

如果调用 stream.write(chunk)方法返回false,表示当前缓存区已满,流将在适当的时机(缓存区清空后)触发drain事件。

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);

rs.setEncoding('utf-8'); // 设置编码格式
rs.on('data', chunk => {
let flag = ws.write(chunk); // 写入数据
if (!flag) { // 如果缓存区已满暂停读取
rs.pause();
}
});

ws.on('drain', () => {
rs.resume(); // 缓存区已清空 继续读取写入
});

fs.createWriteStream(path[, options])源码实现

// 文件 WriteStream.js
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
 constructor(path, options = {}) {
 super();
 this.path = path;
 this.flags = options.flags || 'w';
 this.encoding = options.encoding || 'utf8';
 this.start = options.start || 0;
 this.pos = this.start;
 this.mode = options.mode || 0o666;
 this.autoClose = options.autoClose || true;
 this.highWaterMark = options.highWaterMark || 16 * 1024;
 this.open(); // fd 异步的 //触发一个open事件,当触发open事件后fd肯定就存在了

 // 写文件的时候 需要的参数有哪些
 // 第一次写入是真的往文件里写
 this.writing = false; // 默认第一次就不是正在写入
 // 用简单的数组来模拟一下缓存
 this.cache = [];
 // 维护一个变量,表示缓存的长度
 this.len = 0;
 // 是否触发drain事件
 this.needDrain = false;
 }
 clearBuffer() {
 let buffer = this.cache.shift();
 if (buffer) { // 如果缓存里有
  this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer());
 } else {// 如果缓存里没有了
  if (this.needDrain) { // 需要触发drain事件
  this.writing = false; // 告诉下次直接写就可以了 不需要写到内存中了
  this.needDrain = false;
  this.emit('drain');
  }
 }
 }
 _write(chunk, encoding, clearBuffer) { // 因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作
 if (typeof this.fd != 'number') {
  return this.once('open', () => this._write(chunk, encoding, clearBuffer));
 }
 fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => {
  this.pos += byteWritten;
  this.len -= byteWritten; // 每次写入后就要在内存中减少一下
  clearBuffer(); // 第一次就写完了
 })
 }
 write(chunk, encoding = this.encoding) { // 客户调用的是write方法去写入内容
 // 要判断 chunk必须是buffer或者字符串 为了统一,如果传递的是字符串也要转成buffer
 chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
 this.len += chunk.length; // 维护缓存的长度 3
 let ret = this.len < this.highWaterMark;
 if (!ret) {
  this.needDrain = true; // 表示需要触发drain事件
 }
 if (this.writing) { // 表示正在写入,应该放到内存中
  this.cache.push({
  chunk,
  encoding,
  });
 } else { // 第一次
  this.writing = true;
  this._write(chunk, encoding, () => this.clearBuffer()); // 专门实现写的方法
 }
 return ret; // 能不能继续写了,false表示下次的写的时候就要占用更多内存了
 }
 destroy() {
 if (typeof this.fd != 'number') {
  this.emit('close');
 } else {
  fs.close(this.fd, () => {
  this.emit('close');
  });
 }
 }
 open() {
 fs.open(this.path, this.flags, this.mode, (err, fd) => {
  if (err) {
  this.emit('error', err);
  if (this.autoClose) {
   this.destroy(); // 如果自动关闭就销毁文件描述符
  }
  return;
  }
  this.fd = fd;
  this.emit('open', this.fd);
 });
 }
}
module.exports = WriteStream;

使用fs.createWriteStream()

// 可写流有缓存区的概念
// 1.第一次写入是真的向文件里写,第二次在写入的时候是放到了缓存区里
// 2.写入时会返回一个boolean类型,返回为false时表示缓存区满了,不要再写入了
// 3.当内存和正在写入的内容消耗完后,会触发一个drain事件
//let fs = require('fs');
//let rs = fs.createWriteStream({...});//原生实现可写流
let WS = require('./WriteStream')
let ws = new WS('./2.txt', {
 flags: 'w', // 写入文件,默认文件不存在会创建
 highWaterMark: 1, // 设置当前缓存区的大小
 encoding: 'utf8', // 文件里存放的都是二进制
 start: 0,
 autoClose: true, // 自动关闭文件描述符
 mode: 0o666, // 可读可写
});
// drain的触发时机,只有当highWaterMark填满时,才可能触发drain
// 当嘴里的和地下的都吃完了,就会触发drain方法
let i = 9;
function write() {
 let flag = true;
 while (flag && i >= 0) {
 i--;
 flag = ws.write('111'); // 987 // 654 // 321 // 0
 console.log(flag)
 }
}
write();
ws.on('drain', function () {
 console.log('dry');
 write();
});

总结

stream(流)分为可读流(flowing mode和paused mode)、可写流、可读写流,Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。它们底层都调用了stream模块并进行封装。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • 详解Nodejs 通过 fs.createWriteStream 保存文件

    工作中难免会遇到处理大文件的时候,有这种stream的处理方式,就不需要一次处理太大的文件,从而导致内存不够用,或者内存占用太多. fs.createWriteStream 似乎不会自己创建不存在的文件夹,所以在使用之前需要注意,保存文件的文件夹一定要提前创建. const path = '/xxxxxx/ddd/'; if (!fs.existsSync(path)) { fs.mkdirSync(path); } 创建完文件夹,我们就可以进行文件添加操作了.我们希望在使用文件添加操作的时候是

  • 深入nodejs中流(stream)的理解

    nodejs的fs模块并没有提供一个copy的方法,但我们可以很容易的实现一个,比如: var source = fs.readFileSync('/path/to/source', {encoding: 'utf8'}); fs.writeFileSync('/path/to/dest', source); 这种方式是把文件内容全部读入内存,然后再写入文件,对于小型的文本文件,这没有多大问题,比如grunt-file-copy就是这样实现的.但是对于体积较大的二进制文件,比如音频.视频文件,动

  • Node.js Streams文件读写操作详解

    Node.js 天生异步和事件驱动,非常适合处理 I/O 相关的任务.如果你在处理应用中 I/O 相关的操作,你可以利用 Node.js 中的流(stream).因此,我们先具体看看流,理解一下它们是怎么简化 I/O 操作的吧. 流是什么 流是 unix 管道,让你可以很容易地从数据源读取数据,然后流向另一个目的地. 简单来说,流不是什么特别的东西,它只是一个实现了一些方法的 EventEmitter .根据它实现的方法,流可以变成可读流(Readable),可写流(Writable),或者双向

  • node.js中的fs.createWriteStream方法使用说明

    方法说明: 返回一个WriteStream(输出流)对象(可写流). 语法: 复制代码 代码如下: fs.createWriteStream(path, [options]) 由于该方法属于fs模块,使用前需要引入fs模块(var fs= require("fs") ) 接收参数: path    文件路径 option (object) 参数包含以下属性: 复制代码 代码如下: { flags: 'w',     encoding: null,     mode: 0666 } op

  • node.js中的fs.createReadStream方法使用说明

    方法说明: 返回一个readStream(文件读取流,输入流)对象.(可读流) 语法: 复制代码 代码如下: fs.createReadStream(path, [options]) 由于该方法属于fs模块,使用前需要引入fs模块(var fs= require("fs") ) 接收参数: path: (string) 欲读取的文件路径 options : (object) 数组对象包含以下属性 复制代码 代码如下: { flags: 'r',   encoding: null,  

  • 理解nodejs的stream和pipe机制的原理和实现

    前言 前几天别人请教我关于pipe的问题,我发现我虽然用了nodejs很久,但是由于每次用的不多所以经常回避stream的使用,导致一直不熟,现在重新学习整理一下相关知识. 通过nodeschool学习stream nodeschool有一个stream-adventure教程教导stream的使用,很简单 简单stream进行pipe 首先,我们可以通过管道将输入定位到输出,输入输出可以是控制台或者文件流或者http请求,比如 process.stdin.pipe(process.stdout

  • 浅析Node.js的Stream模块中的Readable对象

    我一直都很不愿意扯 nodejs 的流,因为从第一次看到它我就觉得它的设计实在是太恶心了.但是没办法,Stream 规范尚未普及,而且确实有很多东西都依赖了 nodejs 的流来实现的,所以我也只能捏着鼻子硬着头皮来扯一扯这又臭又硬的 nodejs 流对象了. nodejs 自带了一个叫 stream 的模块,引入它便可以得到一组流对象构造器.现在我只说最简单的 stream.Readable. 其实用过 nodejs 的几乎都接触过 Readable 的实例,只是平时没太在意而已.一个非常典型

  • Nodejs学习笔记之Stream模块

    一,开篇分析 流是一个抽象接口,被 Node 中的很多对象所实现.比如对一个 HTTP 服务器的请求是一个流,stdout 也是一个流.流是可读,可写或兼具两者的. 最早接触Stream是从早期的unix开始的, 数十年的实践证明Stream 思想可以很简单的开发出一些庞大的系统. 在unix里,Stream是通过 "|" 实现的.在node中,作为内置的stream模块,很多核心模块和三方模块都使用到. 和unix一样,node stream主要的操作也是.pipe(),使用者可以使

  • Node.js中的流(Stream)介绍

    什么是流? 说到流,就涉及到一个*nix的概念:管道--在*nix中,流在Shell中被实现为可以通过 |(管道符) 进行桥接的数据,一个进程的输出(stdout)可被直接作为下一个进程的输入(stdin). 在Node中,流(Stream)的概念与之类似,代表一种数据流可供桥接的能力. pipe 流化的精髓在于 .pipe()方法.可供桥接的能力,在于数据流的两端(上游/下游 或称为 读/写流)以一个 .pipe()方法进行桥接. 伪代码的表现形式为: 复制代码 代码如下: //上游.pipe

  • Nodejs Stream 数据流使用手册

    1.介绍 本文介绍了使用 node.js streams 开发程序的基本方法. <code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of

随机推荐