First of all: The new streams implementation is downward compatible. If you start using it the old way, it will behave the old way (minus some issues). There is no need to migrate anything when switching to Node 0.10. The new implementation has some advantages over the old implementation, like "backpressure" handling, so we don't have to deal with pause and resume logic anymore. If you know your code is only running on Node 0.10 or newer, using streams 2 will make streaming more robust and simpler.

Using new streams

Consider a simple http client that pulls down a website. With old streams the code would look like this:

var http = require('http');
http.get('http://maxantoni.de/blog/feed.rss', function (res) {
  res.on('data', function (data) {
    process.stdout.write(data);
  });
});

The same thing using streams 2:

var http = require('http');
http.get('http://maxantoni.de/blog/feed.rss', function (res) {
  res.on('readable', function () {
    process.stdout.write(res.read());
  });
});

The main difference here is that with old streams, you could miss 'data' events if you registered the listener too late. With streams 2, you have to actively consume the data using the read function and therefore you're never loosing anything.

Writeable streams did not change from a users perspective. Just call write with a string or a buffer. Pipes also work the same way as they did before. The above example could be shortened to this:

var http = require('http');
http.get('http://maxantoni.de/blog/feed.rss', function (res) {
  res.pipe(process.stdout);
});

Implementing a readable stream

Here is a simple implementation of a readable stream that produces ISO timestamps on every read with a slight delay:

var stream = require('stream');

function TimestampStream(count, options) {
  stream.Readable.call(this, options);
  this.count = count;
}

TimestampStream.prototype = Object.create(stream.Readable.prototype, {
  constructor : { value : TimestampStream }
});

TimestampStream.prototype._read = function () {
  if (this.count-- === 0) {
    this.push(null); // end
  } else {
    var s = this;
    setTimeout(function () {
      s.push(new Date().toISOString() + '\n');
    }, 10);
  }
};

Print 20 timestamps:

new TimestampStream(20).pipe(process.stdout);

2013-06-27T20:56:33.944Z
2013-06-27T20:56:33.959Z
2013-06-27T20:56:33.972Z
2013-06-27T20:56:33.983Z
...

Implementing a writable stream

Here is a writable stream implementation that only prints out a character every 25 milliseconds:

var stream = require('stream');

function writeForMovie(str, then) {
  if (str.length === 0) {
    return then();
  }
  setTimeout(function () {
    process.stdout.write(str.charAt(0));
    writeForMovie(str.substring(1), then);
  }, 25);
}

function HollywoodOut(options) {
  stream.Writable.call(this, options);
}

HollywoodOut.prototype = Object.create(stream.Writable.prototype, {
  constructor : { value : HollywoodOut }
});

HollywoodOut.prototype._write = function (data, encoding, cb) {
  var str = data.toString();
  writeForMovie(str, cb);
};

Now let's combine the readable and the writable streams:

new TimestampStream(20).pipe(new HollywoodOut());

2013-06-27T21:02:21.243Z
2013-06-27T21:02:21.255Z
2013-06-27T21:02:21.267Z
2013-06-27T21:02:21.278Z
...

As you can see from the timestamps, all 20 lines get produced as before. The output is buffered by the writable stream and then slowly written out. To configure the buffer size, use the highWaterMark option:

var out = new HollywoodOut({ highWaterMark : 30 });
new TimestampStream(20).pipe(out);

2013-06-27T21:04:21.147Z
2013-06-27T21:04:21.168Z
2013-06-27T21:04:21.182Z
2013-06-27T21:04:22.501Z
2013-06-27T21:04:22.513Z
2013-06-27T21:04:23.838Z
...

This setup leads to "backpressure" being produced by the writable stream. pipe handles backpressure automatically for us.
Note how the timestamps show when read was called.

Read more about streams 2

Happy streaming!


comments powered by Disqus