🍃このブログは移転しました。
3秒後、自動的に移動します・・・。

NodeJSのStreamについておさらい

とあるNodeJSのサーバー実装を読んでて、頻出するこいつらに目が慣れてなかったせいで、読み進めるのに時間がかかってた。

ので、思い出しながら、おさらいがてらメモを残してたので記事にしておく。

考え方やデザインがどうっていうよりも、コードを読む時に思い出せるようにって感じの切り口になってます。

Stream | Node.js v13.12.0 Documentation

よくみるAPI

  • Readable: ソース
  • Writable: 目的地
  • Duplex: ReadableでありWritable
  • Transform: Duplexと同じだが、その名の通り加工してつなぐ用
  • finished()
  • pipeline()

すべてビルトインの`stream`から。

`finished`と`pipeline`はv10から追加されたもので、それまで同等のことをするには、`readable-stream`を別途インストールして、そこから使ってたぽい。

ちなみにTypeScriptの型はこちら。

DefinitelyTyped/stream.d.ts at master · DefinitelyTyped/DefinitelyTyped · GitHub

更新されてずれてそう。

使われ方の基本

Streamはつなげてなんぼなので、まずそのあたりから。

const { pipeline } = require('stream');

// simple
readable.pipe(writable);

// pipe chaining
readable
  .pipe(transform)
  .pipe(writable);

// pipeline
pipeline(readable, transform, writable, callback);
pipeline(readable, transform1, transform2, writable, callback);

こうすると上流である`readable`からデータが流れてく。

`readable`は`implements Readable`な感じで、`Duplex`でもいい。`writable`も同じく。

なのでこういうコードもある。

pipeline(readable, duplex, callback); // duplex as writable
pipeline(duplex, writable, callback); // duplex as readable

readable → duplex → writableという流れ。

流れるきっかけ

`Readable`なストリームも用意しただけじゃデータは流れない。

  • `on('data')`する
  • `pipe()`する
    • 内部的に`on('data')`してる
    • `pipeline()`も同じ
  • `resume()`する

こうすることで流れ出す。

内部的にも`readableFlowing`というGetterから取れるフラグがある。

const r = getReadableStreamSomehow();
console.log(r.readableFlowing); // null

r.on('data', console.log);
console.log(r.readableFlowing); // true

r.pause();
console.log(r.readableFlowing); // false

r.resume();
console.log(r.readableFlowing); // true

基本的には`pipe()`だけ使ってれば良いはず。

ただし、流れるデータを特定の単位で扱いたいとかの要件があるなら、手動で`on('data')`したり`pause/resume()`したりする。

`on('readable')` / `readable.read()`についてはいったん割愛。

さて、接続はこれでできるけど、データを流すためにはそれぞれちゃんと実装されてる必要があり・・。
というわけで実装する側のポイントへ。

Readable

いわゆるデータのEmitter。

パターンに応じて、`_read()`もしくは`read()`を実装する必要がある。

実装のパターンはいくつかあるけど、結果的には同じ。

const { Readable } = require('stream');

// new はあってもなくても
const r1 = new Readable({
  // _read ではない
  read() {
    this.push(`${Date.now()}\n`);
  }
})

const r2 = new Readable();
// read ではない
r2._read = () => {
  r2.push(`${Date.now()}\n`);
};

const r3 = new class extends Readable {
  // read ではない
  _read() {
    this.push(`${Date.now()}\n`);
  }
}();

`new Readable(options)`が一番シュッと書けるけど、`constructor()`でいろいろやりたいのでだいたい`class`で書くことになりそう。

`Readable`なストリームは、`push()`を使って任意のタイミングでバッファにデータを貯める。
`push(null)`は特別で、そのストリームのデータの終端を表す。

`read`と`_read`は空の関数を実装しておいて、任意のタイミングで`push()`する実装例もある。

この例だと、ものすごい勢いで`Date.now()`が流れるストリームができる。

Writable

いわゆるデータのReceiver。

パターンに応じて、`_write()`もしくは`write()`を実装する必要がある。

const { Writable } = require('stream');

const w = new Writable({
  write(data, _encoding, callback) {
    console.log(data);
    callback();
  }
});

コードの構造は`Readable`と一緒。

上層からデータが流れてくると、`write()`(か`_write()`)が呼ばれるので、そこでやりたい処理を書く。

この例だとコンソールに出力するだけのストリーム。

Duplex

`Readable`であり`Writable`でもある。

なのでパターンに応じて、`read()` or `_read()`および`write()` or `_write()`を実装しないといけない。

それぞれにバッファがあるのがポイント。
なのでオプションも`(readable|writable)ObjectMode`と2つあって、`(readable|writable)HighWaterMark`と2つある。

1人2役できるので便利やけど、コードを読むのがちょっと大変ではある・・。

Transform

特殊な`Duplex`ストリーム。

パターンに応じて、`_transform()`か`transform()`を実装する必要がある。

class extends Transform {
  _transform(data, encoding, callback) {
    // transform w/ data

    // そして次へ送る
    this.push(transformedData);
    callback();

    // こう書いても同じ
    // callback(null, transformedData);
  }
}

もちろん`push()`をサボってもいい。
ただサボるなら`Transform`である必要はない・・。

そのほか

実装すべきメソッド

`_final()`とか`_flush()`とか、ストリームの種類によってはまだいろいろある。

流れるデータの型

基本的に`String`か`Buffer`だが、`{ objectMode: true }`をストリーム初期化時に渡すと、`null`以外の値はそのまま流せるようになる。

ただし`pipe()`または`pipeline()`でつなぐストリームが同じオプションになってないと、思わぬ挙動になるので注意とのこと。

流れるデータの流量

`highWaterMark`オプションで調整する。

highWaterMarkから探るNode.jsのStreamの仕組み - Yahoo! JAPAN Tech Blog

finished() / pipeline()

ストリームのエラー検知、後始末に使えるやつ。

const { finished } = require('stream');

const rs = getReadableStreamSomehow();
finished(rs, err => {
  if (err) {
    console.error('Stream failed.', err);
  } else {
    console.log('Stream is done reading.');
  }
});

`util.promisify(finished)`すれば、PromiseなコードとStreamのコードを混ぜて使える。

`pipeline()`もそうで、最後に渡したコールバックでまとめてエラー処理ができる。

同じく`util.promisify(pipeline)`できる。

おわりに

いざ自分が実装するとなると、もう少し知っておくべきメソッドやらイベントやらがある。

ただ、なんとなくコードを読むだけならコレで十分なのではないかなーと思う。

より詳しい記事みつけた。

Understanding Streams in Node.js - NodeSource