在 NodeJs 中,流随处可见,读/写文件流,HTTP请求/返回流,stdin/stdout流。理解并运用好流会让你的Node更具力量。

Stream

  • lib/_stream_readable.js
  • lib/_stream_writable.js
  • lib/_stream_tranform.js
  • lib/_stream_duplex.js

流主要有可读 Readable,可写流 Writable,双工可读可写流 Duplex, Transform 流就是继承 Duplex 的。
通过pipe管道,可读流可以pipe到一个或多个可写流。
看源码能发现里面涉及了一堆状态控制的代码,什么时候读,什么时候写,什么时候暂停读。
大部分情况下程序面对的问题。通常都可以抽象成一个输入/输出的问题,中间可能会包含转换。

实际问题怎么运用流呢。

读取大文件

小文件可以一次性读到内存,但如果一个 10G 的文件呢?ReadLine 模块很好用但是你知道背后怎么实现的吗。
试试怎么用 stream.Transform 来自己实现一个readLine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
const stream = require("stream");
const fs = require("fs");

function ReadLineStream(FILE_PATH) {
var transformer = new stream.Transform({ objectMode: true });
// 实现Transform主要的_transform方法
transformer._transform = function(chunk, encoding, done) {
// buffer 需要toString
var data = chunk.toString();

// 累加读取出来的数据
if (this._lastLineData) data = this._lastLineData + data;

// 按照换行符分割
var lines = data.split("\n");

// 取出数组末尾剩余不是一行的内容和下一次拼接
this._lastLineData = lines.splice(lines.length - 1, 1)[0];

// 通过this.push 对外输出剩余累计的行
lines.forEach(this.push.bind(this));
done();
};

transformer._flush = function(done) {
// 末尾处理
if (this._lastLineData) this.push(this._lastLineData);
this._lastLineData = null;
done();
};

// 创建文件读取流
var source = fs.createReadStream(FILE_PATH);
source.pipe(transformer);
return transformer;
}

var fileReader = ReadLineStream("xx.log");
// 创建一个可写流
var writable = Stream.Writable({
objectMode: true,
write: function(line, _, next) {
async () => {
// 逐行写入到mysql
var parsed = JSON.parse(line);
await mysql.insert("insert into log set ?", parsed);
process.nextTick(next);
};
return true;
}
});

fileReader.pipe(writable);

通过实现一个消费/可写流我们就可以来对大文件进行处理,比如说实现一个 word count 计数器, 从文件导入到数据库。

而不用担心需要一次性读取整个文件到内存里out of memory这种问题。

通过继承 strean 模块我们也不需要过多的去考虑什么时候该读,什么时候因为写的压力大(背压),该停止读,让整个读写流有序的运行。
你只需要专注于实现你自己的 write read transform
Spark, Strom 的实时计算流也是这样的,大任务分解成小任务,只需要专注于自己业务逻辑的 map,reduce

单机爬虫

再举个栗子, 它的输入可能是一堆 URL、输出是结构化的数据。需要写入到关系型数据库。

  • 可以把 URL 数据获取抽象成一个可读流,
  • 爬取过程,数据提取抽象成一个 transform 流
  • 写入数据库抽象成一个可写流,

只需要约定好每个过程输出的数据模型,就可以在每个过程实现各种目的不一样的流。

  • 如数据源,可以是读取文件,MYQL,分布式列队
  • 抓取转换流,可以是普通的 HTTP爬虫,Puppeteer 可渲染性爬虫
  • 数据存储流,可以写文件,MYQL,或者HDFS

read > transform > write

这样程序看起来是不是特别简洁?

URL 读取流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
const Stream = require('stream');
const fs = require('fs');
const Sequelize = require('sequelize');
const Op = Sequelize.Op

const sequelize = new Sequelize('database', 'username', 'password', {
host: 'localhost',
dialect: 'mysql',
pool: {
max: 5,
min: 0,
acquire: 30000,
idle: 10000
},
});

// mysql表模型
const Url = sequelize.define('urls', {
id: {
type: Sequelize.INTEGER,
primaryKey: true,
autoIncrement: true,
},
url: Sequelize.STRING,
});


class SpiderMysqlSourceReadStream extends Stream.Readable {

constructor(opts){
super();
this.opts = opts || {};
this.filePath = opts.file || '';
this.fileStream = null;
this.cursor = 0;
this.connection = null;
Stream.Readable.call(this, {
objectMode: true,
highWaterMark: opts.highWaterMark || 1000
});
}

// 覆盖并实现read方法
_read(){

// 查大于当前id的10条
var urls = await Model.findAll({
attributes: ['url', 'id'],
where: {
id: {
[Op.gt]: this.cursor
}
},
limit: 10
});

urls.forEach((row) => {
this.cursor = row.id;
this.push(row);
});

}
}

把 URL 的变成结构化数据的转换流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const cheerio = require('cheerio')

class SpiderFetch extends Stream.Transform {
_tranform(row, encoding, done){
var url = row.url;
(async () => {
var parsed = null;
try{
var contentResp = await fetch(url);
var contextHtml = await contentResp.text();
parsed = this.parse(contextHtml);
}catch(e){
console.log('fetch error', e)
}

this.push(parsed);
done();
})();
}

parse(contextHtml){
var $ = cheerio.load(contextHtml);
....
....
}
}

mysql 入库写入流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// mysql表模型
const Document = sequelize.define('document', {
id: {
type: Sequelize.INTEGER,
primaryKey: true,
autoIncrement: true,
},
title: Sequelize.STRING,
});

const MysqlWriteStream extends Stream.Writable {

_write(chunk, encoding, done){
var data = JSON.parse(chunk.toString());
(async () => {
await Document.create(data);
done();
})();
}

}

var readStream = new SpiderMysqlSourceReadStream();
var fetchStream = new SpiderFetch();
var writeStream = new MysqlWriteStream();

readStream.pipe(fetchStream).pipe(writeStream);

是不是简洁明了?
在Node中异步流随处可见,谁让它基因就是这样呢。

参考: