node.js中stream流中可读流和可写流的实现与使用方法实例分析

本文实例讲述了node.js中stream流中可读流和可写流的实现与使用方法。分享给大家供大家参考,具体如下:

node.js中的流 stream 是处理流式数据的抽象接口。node.js 提供了很多流对象,像http中的request和response,和 process.stdout 都是流的实例。

流可以是 可读的,可写的,或是可读可写的。所有流都是 events 的实例。

一、流的类型

node.js中有四种基本流类型:

1、Writable 可写流 (例:fs.createWriteStream() )

2、Readable 可读流 (例:fs.createReadStream() )

3、Duplex 可读又可写流 (例:net.Socket )

4、Transform 读写过程中可修改或转换数据的 Duplex 流 (例:zlib.createDeflate() )

二、流中的数据有两种模式

1、二进制模式,都是 string字符串  和 Buffer。

2、对象模式,流内部处理的是一系统普通对象。

三、可读流的两种模式

1、流动模式 ( flowing ) ,数据自动从系统底层读取,并通过事件,尽可能快地提供给应用程序。

2、暂停模式 ( paused ),必须显式的调用 read() 读取数据。

可读流 都开始于暂停模式,可以通过如下方法切换到流动模式:

1、添加 'data' 事件回调。

2、调用 resume()。

3、调用 pipe()。

可读流通过如下方法切换回暂停模式:

1、如果没有管道目标,调用 pause()。

2、如果有管道目标,移除所有管道目标,调用 unpipe() 移除多个管道目标。

四、创建可读流,并监听事件

const fs = require('fs');
//创建一个文件可读流
let rs = fs.createReadStream('./1.txt', {
  //文件系统标志
  flags: 'r',
  //数据编码,如果调置了该参数,则读取的数据会自动解析
  //如果没调置,则读取的数据会是 Buffer
  //也可以通过 rs.setEncoding() 进行设置
  encoding: 'utf8',
  //文件描述符,默认为null
  fd: null,
  //文件权限
  mode: 0o666,
  //文件读取的开始位置
  start: 0,
  //文件读取的结束位置(包括结束位置)
  end: Infinity,
  //读取缓冲区的大小,默认64K
  highWaterMark: 3
});
//文件被打开时触发
rs.on('open', function () {
  console.log('文件打开');
});
//监听data事件,会让当前流切换到流动模式
//当流中将数据传给消费者后触发
//由于我们在上面配置了 highWaterMark 为 3字节,所以下面会打印多次。
rs.on('data', function (data) {
  console.log(data);
});
//流中没有数据可供消费者时触发
rs.on('end', function () {
  console.log('数据读取完毕');
});
//读取数据出错时触发
rs.on('error', function () {
  console.log('读取错误');
});
//当文件被关闭时触发
rs.on('close', function () {
  console.log('文件关闭');
});

注意,'open' 和 'close' 事件并不是所有流都会触发。

当们监听'data'事件后,系统会尽可能快的读取出数据。但有时候,我们需要暂停一下流的读取,操作其他事情。

这时候就需要用到 pause() 和 resume() 方法。

const fs = require('fs');
//创建一个文件可读流
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});
rs.on('data', function (data) {
  console.log(`读取了 ${data.length} 字节数据 : ${data.toString()}`);
  //使流动模式的流停止触发'data'事件,切换出流动模式,数据都会保留在内部缓存中。
  rs.pause();
  //等待3秒后,再恢复触发'data'事件,将流切换回流动模式。
  setTimeout(function () {
    rs.resume();
  }, 3000);
});

可读流的 'readable' 事件,当流中有数据可供读取时就触发。

注意当监听 'readable' 事件后,会导致流停止流动,需调用 read() 方法读取数据。

注意 on('data'),on('readable'),pipe() 不要混合使用,会导致不明确的行为。

const fs = require('fs');
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 1
});
//当流中有数据可供读取时就触发
rs.on('readable', function () {
  let data;
  //循环读取数据
  //参数表示要读取的字节数
  //如果可读的数据不足字节数,则返回缓冲区剩余数据
  //如是没有指定字节数,则返回缓冲区中所有数据
  while (data = rs.read()) {
    console.log(`读取到 ${data.length} 字节数据`);
    console.log(data.toString());
  }
});

五、创建可写流,并监听事件

const fs = require('fs');
//创建一个文件可写流
let ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 3
});
//往流中写入数据
//参数一表示要写入的数据
//参数二表示编码方式
//参数三表示写入成功的回调
//缓冲区满时返回false,未满时返回true。
//由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));
function writeData() {
  let cnt = 9;
  return function () {
    let flag = true;
    while (cnt && flag) {
      flag = ws.write(`${cnt}`);
      console.log('缓冲区中写入的字节数', ws.writableLength);
      cnt--;
    }
  };
}
let wd = writeData();
wd();
//当缓冲区中的数据满的时候,应停止写入数据,
//一旦缓冲区中的数据写入文件了,并清空了,则会触发 'drain' 事件,告诉生产者可以继续写数据了。
ws.on('drain', function () {
  console.log('可以继续写数据了');
  console.log('缓冲区中写入的字节数', ws.writableLength);
  wd();
});
//当流或底层资源关闭时触发
ws.on('close', function () {
  console.log('文件被关闭');
});
//当写入数据出错时触发
ws.on('error', function () {
  console.log('写入数据错误');
});

写入流的 end() 方法 和 'finish' 事件监听

const fs = require('fs');
//创建一个文件可写流
let ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 3
});
//往流中写入数据
//参数一表示要写入的数据
//参数二表示编码方式
//参数三表示写入成功的回调
//缓冲区满时返回false,未满时返回true。
//由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));
//调用end()表明已经没有数据要被写入,在关闭流之前再写一块数据。
//如果传入了回调函数,则将作为 'finish' 事件的回调函数
ws.end('最后一点数据', 'utf8');
//调用 end() 且缓冲区数据都已传给底层系统时触发
ws.on('finish', function () {
  console.log('写入完成');
});

写入流的 cork() 和 uncork() 方法,主要是为了解决大量小块数据写入时,内部缓冲可能失效,导致的性能下降。

const fs = require('fs');
let ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 1
});
//调用 cork() 后,会强制把所有写入的数据缓冲到内存中。
//不会因为写入的数据超过了 highWaterMark 的设置而写入到文件中。
ws.cork();
ws.write('1');
console.log(ws.writableLength);
ws.write('2');
console.log(ws.writableLength);
ws.write('3');
console.log(ws.writableLength);
//将调用 cork() 后的缓冲数据都输出到目标,也就是写入文件中。
ws.uncork();

注意 cork() 的调用次数要与 uncork() 一致。

const fs = require('fs');
let ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 1
});
//调用一次 cork() 就应该写一次 uncork(),两者要一一对应。
ws.cork();
ws.write('4');
ws.write('5');
ws.cork();
ws.write('6');
process.nextTick(function () {
  //注意这里只调用了一次 uncork()
  ws.uncork();
  //只有调用同样次数的 uncork() 数据才会被输出。
  ws.uncork();
});

六、可读流的 pipe() 方法

pipe() 方法类似下面的代码,在可读流与可写流之前架起一座桥梁。

const fs = require('fs');
//创建一个可读流
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});
//创建一个可写流
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});
rs.on('data', function (data) {
  let flag = ws.write(data);
  console.log(`往可写流中写入 ${data.length} 字节数据`);
  //如果写入缓冲区已满,则暂停可读流的读取
  if (!flag) {
    rs.pause();
    console.log('暂停可读流');
  }
});
//监控可读流数据是否读完
rs.on('end', function () {
  console.log('数据已读完');
  //如果可读流读完了,则调用 end() 表示可写流已写入完成
  ws.end();
});
//如果可写流缓冲区已清空,可以再次写入,则重新打开可读流
ws.on('drain', function () {
  rs.resume();
  console.log('重新开启可读流');
});

我们用 pipe() 方法完成上面的功能。

const fs = require('fs');
//创建一个可读流
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});
//创建一个可写流
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt', {
  highWaterMark: 3
});
//绑定可写流到可读流,自动将可读流切换到流动模式,将可读流的所有数据推送到可写流。
rs.pipe(ws);
//可以绑定多个可写流
rs.pipe(ws2);

我们也可以用 unpipe() 手动的解绑可写流。

const fs = require('fs');
//创建一个可读流
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});
//创建一个可写流
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt', {
  highWaterMark: 3
});
rs.pipe(ws);
rs.pipe(ws2);
//解绑可写流,如果参数没写,则解绑所有管道
setTimeout(function () {
  rs.unpipe(ws2);
}, 0);

希望本文所述对大家node.js程序设计有所帮助。

(0)

相关推荐

  • 说说node中的可读流和可写流的区别

    前言 nodejs中大量的api与流有关,曾经看到公司的一些大神的node代码,实现一个接口只需要pipe一下另一个java接口就可以了.简单的一行代码实在让人困惑.作为小白的自己一脸懵逼却又不敢问,因为根本不知道从何问起.现在终于通过学习,也能对流说出个123,希望和大家共同交流. 流简介 流分为缓冲模式和对象模式,缓冲模式只能处理buffer或字符串,对象模式可以处理js对象.流又分为四种类型:可读流.可写流.双工流和转换流.后两种其实是对可读和可写流的应用.所以我想先聊聊可读流和可写流.

  • Nodejs Stream 数据流使用手册

    1.介绍 本文介绍了使用 node.js streams 开发程序的基本方法. <code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of

  • node.js使用stream模块实现自定义流示例

    本文实例讲述了node.js使用stream模块实现自定义流.分享给大家供大家参考,具体如下: 有些时候我们需要自定义一些流,来操作特殊对象,node.js中为我们提供了一些基本流类. 我们新创建的流类需要继承四个基本流类之一(stream.Writeable,stream.Readable,stream.Duplex,stream.Transform),并确保调用了父类构造函数. 一.实现自定义的可读流 实现可读流需继承 stream.Readable,并实现 readable._read()

  • 快速了解Node中的Stream流是什么

    Stream Buffer 的工作原理 Data 是一块大数据 他被分为很多个小数据 每块小数据都被存储在内存中的 Buffer 中 接着 Buffer 不断接收小数据 同时一旦 Buffer 接收的小数据填满了就会被消费 填满的 Buffer 也被称为一个 Chunk 所有 Chunk 组合而成的才是那块 Data 大数据 Stream 的分类 Read Stream Write Stream Duplex Transform Duplex 实际上就是有两个 Buffer 一个处理 ReadS

  • Node.js中的流(Stream)介绍

    什么是流? 说到流,就涉及到一个*nix的概念:管道--在*nix中,流在Shell中被实现为可以通过 |(管道符) 进行桥接的数据,一个进程的输出(stdout)可被直接作为下一个进程的输入(stdin). 在Node中,流(Stream)的概念与之类似,代表一种数据流可供桥接的能力. pipe 流化的精髓在于 .pipe()方法.可供桥接的能力,在于数据流的两端(上游/下游 或称为 读/写流)以一个 .pipe()方法进行桥接. 伪代码的表现形式为: 复制代码 代码如下: //上游.pipe

  • Node.js中你不可不精的Stream(流)

    一.什么是Stream(流) 流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface). stream 模块提供了基础的API.使用这些API可以很容易地来构建实现流接口的对象.例如, HTTP 请求 和 process.stdout 就都是流的实例. 流可以是可读的.可写的,或是可读写的.注意,所有的流都是 EventEmitter 的实例. 二.流的类型 Node.js 中有四种基本的流类型: Readable - 可读的流 (例如 fs.cre

  • Node.js中流(stream)的使用方法示例

    前言 本文主要给大家介绍了关于Node.js 流(stream)的使用方法,分享出来供大家参考学习,下面话不多说,来一起看看详细的介绍: 流是基于事件的API,用于管理和处理数据,而且有不错的效率.借助事件和非阻塞I/O库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉. 使用流的好处 举一个读取文件的例子: 使用fs.readFileSync同步读取一个文件,程序会被阻塞,所有的数据都会被读取到内存中. 换用fs.readFile读取文件,程序不会被阻塞,但是所有的数据依旧会被一次性

  • 深入nodejs中流(stream)的理解

    nodejs的fs模块并没有提供一个copy的方法,但我们可以很容易的实现一个,比如: var source = fs.readFileSync('/path/to/source', {encoding: 'utf8'}); fs.writeFileSync('/path/to/dest', source); 这种方式是把文件内容全部读入内存,然后再写入文件,对于小型的文本文件,这没有多大问题,比如grunt-file-copy就是这样实现的.但是对于体积较大的二进制文件,比如音频.视频文件,动

  • 详解nodeJs文件系统(fs)与流(stream)

    一.简介 本文将介绍node.js文件系统(fs)和流(stream)的一些API已经参数使用情况. 二.目录 文件系统将介绍以下方法: 1.fs.readFile 2.fs.writeFile 3.fs.open 4.fs.read 5.fs.stat 6.fs.close 7.fs.mkdir 8.fs.rmdir 9.fs.readdir 10.fs.unlink stream流的四种类型readable,writable,duplex,transform以及stream对象的事件. 三.

  • 浅谈手写node可读流之流动模式

    node的可读流基于事件 可读流之流动模式,这种流动模式会有一个"开关",每次当"开关"开启的时候,流动模式起作用,如果将这个"开关"设置成暂停的话,那么,这个可读流将不会去读取文件,直到将这个"开关"重新置为流动. 读取文件流程 读取文件内容的流程,主要为: 打开文件,打开文件成功,将触发open事件,如果打开失败,触发error事件和close事件,将文件关闭. 开始读取文件中的内容,监听data事件,数据处于流动状态,可

随机推荐