First of all: the new streams implementation is backward compatible. If you use a stream the old way, it behaves the old way (minus some issues). Nothing needs to be migrated when switching to Node 0.10. The new implementation has some advantages over the old one, such as built-in "backpressure" handling, so we no longer have to write pause and resume logic ourselves. If you know your code only runs on Node 0.10 or newer, using streams 2 will make streaming simpler and more robust.

Using new streams

Consider a simple http client that pulls down a website. With old streams the code would look like this:

var http = require('http');
http.get('', function (res) {
  res.on('data', function (data) {
    process.stdout.write(data);
  });
});

The same thing using streams 2:

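var http = require('http');
http.get('', function (res) {
  res.on('readable', function () {
    var chunk = res.read(); // null when nothing is buffered
    if (chunk !== null) { process.stdout.write(chunk); }
  });
});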

The main difference is that with old streams, you could miss 'data' events if you registered the listener too late. With streams 2, you have to actively consume the data by calling the read function, so you never lose anything.
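To make the pull model a bit more concrete, here is a minimal sketch of draining a readable stream with repeated read calls. The helper name readAvailable and the hand-filled source stream are assumptions made for illustration; they are not part of the stream API. The sketch relies only on read returning null once the internal buffer is empty.

```javascript
var stream = require('stream');

// Hypothetical helper: pull everything that is currently buffered out of
// a readable stream and return it as a single string.
function readAvailable(readable) {
  var parts = [];
  var chunk;
  // read() returns null once the internal buffer is empty.
  while ((chunk = readable.read()) !== null) {
    parts.push(chunk.toString());
  }
  return parts.join('');
}

// A readable stream that is filled by hand, for demonstration only.
var source = new stream.Readable();
source._read = function () {}; // nothing to do, data is pushed below
source.push('hello ');
source.push('streams 2');
source.push(null); // end of stream

var text = readAvailable(source);
console.log(text);
```

In a real program the loop would typically sit inside a 'readable' listener, because more data may arrive after the buffer has been emptied.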

Writable streams did not change from a user's perspective. Just call write with a string or a buffer. Pipes also work the same way as they did before. The above example could be shortened to this:

var http = require('http');
http.get('', function (res) {
  res.pipe(process.stdout);
});
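As a small illustration of the point that write accepts both strings and buffers, the following sketch writes one of each to a writable stream that just collects what it receives. The names sink and received are made up for this example, and Buffer.from assumes a current Node version (on Node 0.10 the Buffer constructor would be used instead).

```javascript
var stream = require('stream');

var received = [];

// A hypothetical sink that records every chunk it is given.
var sink = new stream.Writable();
sink._write = function (chunk, encoding, cb) {
  // By default, strings are converted to buffers before _write sees them.
  received.push(chunk.toString());
  cb();
};

sink.write('a string, ');
sink.write(Buffer.from('a buffer\n'));
sink.end();

console.log(received.join(''));
```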

Implementing a readable stream

Here is a simple implementation of a readable stream that produces ISO timestamps on every read with a slight delay:

var stream = require('stream');

function TimestampStream(count, options) {
  stream.Readable.call(this, options);
  this.count = count;
}

TimestampStream.prototype = Object.create(stream.Readable.prototype, {
  constructor : { value : TimestampStream }
});

TimestampStream.prototype._read = function () {
  if (this.count-- === 0) {
    this.push(null); // end
  } else {
    var s = this;
    setTimeout(function () {
      s.push(new Date().toISOString() + '\n');
    }, 10);
  }
};

Print 20 timestamps:

new TimestampStream(20).pipe(process.stdout);


Implementing a writable stream

Here is a writable stream implementation that only prints out a character every 25 milliseconds:

var stream = require('stream');

// Print str one character at a time, then call the callback
function writeForMovie(str, then) {
  if (str.length === 0) {
    return then();
  }
  process.stdout.write(str.charAt(0));
  setTimeout(function () {
    writeForMovie(str.substring(1), then);
  }, 25);
}

function HollywoodOut(options) {
  stream.Writable.call(this, options);
}

HollywoodOut.prototype = Object.create(stream.Writable.prototype, {
  constructor : { value : HollywoodOut }
});

HollywoodOut.prototype._write = function (data, encoding, cb) {
  var str = data.toString();
  writeForMovie(str, cb);
};

Now let's combine the readable and the writable streams:

new TimestampStream(20).pipe(new HollywoodOut());


As you can see from the timestamps, all 20 lines get produced as before. The output is buffered by the writable stream and then slowly written out. To configure the buffer size, use the highWaterMark option:

var out = new HollywoodOut({ highWaterMark : 30 });
new TimestampStream(20).pipe(out);


With a buffer this small, the writable stream produces "backpressure": once its buffer is full, the readable stream is not asked for more data until the buffer has drained. pipe handles this automatically for us.
Note how the timestamps show when read was called.
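To see the signal that pipe reacts to, here is a rough sketch that writes to a slow writable stream by hand. It assumes the same highWaterMark of 30 bytes as above; the names slow, first and second are made up for this example. write returns false once the buffered data reaches the high water mark, and 'drain' is emitted when it is safe to continue.

```javascript
var stream = require('stream');

// A deliberately slow sink: every chunk takes 25 ms to "write".
var slow = new stream.Writable({ highWaterMark : 30 });
slow._write = function (chunk, encoding, cb) {
  setTimeout(cb, 25);
};

// An ISO timestamp plus newline is 25 bytes long.
var line = new Date().toISOString() + '\n';

var first = slow.write(line);  // 25 bytes buffered, below the mark
var second = slow.write(line); // 50 bytes buffered, above the mark

console.log(first, second);

slow.once('drain', function () {
  // The buffer has been flushed, so writing may continue.
  console.log('drained');
});
```

A producer that respects the return value of write stops after the second call and resumes on 'drain', which is essentially what pipe does on our behalf.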

Read more about streams 2

Happy streaming!
