在 NodeJs 中,流随处可见,读/写文件流,HTTP请求/返回流,stdin/stdout流。理解并运用好流会让你的Node更具力量。
Stream
- lib/_stream_readable.js
- lib/_stream_writable.js
- lib/_stream_tranform.js
- lib/_stream_duplex.js
流主要有可读 Readable,可写流 Writable,双工可读可写流 Duplex, Transform 流就是继承 Duplex 的。
通过pipe管道,可读流可以pipe到一个或多个可写流。
看源码能发现里面涉及了一堆状态控制的代码,什么时候读,什么时候写,什么时候暂停读。
大部分情况下程序面对的问题。通常都可以抽象成一个输入/输出的问题,中间可能会包含转换。
实际问题怎么运用流呢。
读取大文件
小文件可以一次性读到内存,但如果一个 10G 的文件呢?ReadLine 模块很好用但是你知道背后怎么实现的吗。
试试怎么用 stream.Transform 来自己实现一个readLine
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| const stream = require("stream"); const fs = require("fs");
function ReadLineStream(FILE_PATH) { var transformer = new stream.Transform({ objectMode: true }); transformer._transform = function(chunk, encoding, done) { var data = chunk.toString();
if (this._lastLineData) data = this._lastLineData + data;
var lines = data.split("\n");
this._lastLineData = lines.splice(lines.length - 1, 1)[0];
lines.forEach(this.push.bind(this)); done(); };
transformer._flush = function(done) { if (this._lastLineData) this.push(this._lastLineData); this._lastLineData = null; done(); };
var source = fs.createReadStream(FILE_PATH); source.pipe(transformer); return transformer; }
var fileReader = ReadLineStream("xx.log");
var writable = Stream.Writable({ objectMode: true, write: function(line, _, next) { async () => { var parsed = JSON.parse(line); await mysql.insert("insert into log set ?", parsed); process.nextTick(next); }; return true; } });
fileReader.pipe(writable);
|
通过实现一个消费/可写流我们就可以来对大文件进行处理,比如说实现一个 word count 计数器, 从文件导入到数据库。
而不用担心需要一次性读取整个文件到内存里out of memory这种问题。
通过继承 strean 模块我们也不需要过多的去考虑什么时候该读,什么时候因为写的压力大(背压),该停止读,让整个读写流有序的运行。
你只需要专注于实现你自己的 write read transform
Spark, Strom 的实时计算流也是这样的,大任务分解成小任务,只需要专注于自己业务逻辑的 map,reduce
单机爬虫
再举个栗子, 它的输入可能是一堆 URL、输出是结构化的数据。需要写入到关系型数据库。
- 可以把 URL 数据获取抽象成一个可读流,
- 爬取过程,数据提取抽象成一个 transform 流
- 写入数据库抽象成一个可写流,
只需要约定好每个过程输出的数据模型,就可以在每个过程实现各种目的不一样的流。
- 如数据源,可以是读取文件,MYQL,分布式列队
- 抓取转换流,可以是普通的 HTTP爬虫,Puppeteer 可渲染性爬虫
- 数据存储流,可以写文件,MYQL,或者HDFS
read > transform > write
这样程序看起来是不是特别简洁?
URL 读取流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| const Stream = require('stream'); const fs = require('fs'); const Sequelize = require('sequelize'); const Op = Sequelize.Op
const sequelize = new Sequelize('database', 'username', 'password', { host: 'localhost', dialect: 'mysql', pool: { max: 5, min: 0, acquire: 30000, idle: 10000 }, });
const Url = sequelize.define('urls', { id: { type: Sequelize.INTEGER, primaryKey: true, autoIncrement: true, }, url: Sequelize.STRING, });
class SpiderMysqlSourceReadStream extends Stream.Readable {
constructor(opts){ super(); this.opts = opts || {}; this.filePath = opts.file || ''; this.fileStream = null; this.cursor = 0; this.connection = null; Stream.Readable.call(this, { objectMode: true, highWaterMark: opts.highWaterMark || 1000 }); }
_read(){
var urls = await Model.findAll({ attributes: ['url', 'id'], where: { id: { [Op.gt]: this.cursor } }, limit: 10 });
urls.forEach((row) => { this.cursor = row.id; this.push(row); });
} }
|
把 URL 的变成结构化数据的转换流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| const cheerio = require('cheerio')
class SpiderFetch extends Stream.Transform { _tranform(row, encoding, done){ var url = row.url; (async () => { var parsed = null; try{ var contentResp = await fetch(url); var contextHtml = await contentResp.text(); parsed = this.parse(contextHtml); }catch(e){ console.log('fetch error', e) }
this.push(parsed); done(); })(); }
parse(contextHtml){ var $ = cheerio.load(contextHtml); .... .... } }
|
mysql 入库写入流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| const Document = sequelize.define('document', { id: { type: Sequelize.INTEGER, primaryKey: true, autoIncrement: true, }, title: Sequelize.STRING, });
const MysqlWriteStream extends Stream.Writable {
_write(chunk, encoding, done){ var data = JSON.parse(chunk.toString()); (async () => { await Document.create(data); done(); })(); }
}
var readStream = new SpiderMysqlSourceReadStream(); var fetchStream = new SpiderFetch(); var writeStream = new MysqlWriteStream();
readStream.pipe(fetchStream).pipe(writeStream);
|
是不是简洁明了?
在Node中异步流随处可见,谁让它基因就是这样呢。
参考: