DailyJS

The JavaScript blog.


Author: Roly Fentanes
Featured

tutorials node streams

Mastering Node Streams: Part 2


If you've ever used the Request module, you've probably noticed that calling it returns a stream object synchronously. You can pipe it right away. To see what I mean, this is how you would normally pipe HTTP responses:

var http = require('http');

http.get('http://www.google.com', function onResponse(response) {  
  response.pipe(destinationStream);
});

Compare that example to using the Request module:

var request = require('request');

request('http://www.google.com').pipe(destinationStream);  

That's easier to understand, shorter, and requires one less level of indentation. In this article, I'll explain how this is done so you can make modules that work this way.

How to Do It

First, it's vitally important to understand how the stream API works. If you haven't done so yet, take a look at the stream API docs; I promise they're not too long.

Let's start with readable streams. Readable streams can be pause()d and resume()d. If we're using another object to stand in for the stream while it's not yet available, the reasonable thing to do is to keep a paused property on that object, updated as pause() and resume() are called. Some readable streams also have destroy() and setEncoding() methods. Again, the first thing that comes to mind is to keep destroyed and encoding properties on the stand-in stream.

But not all readable streams are created equal: some might have more methods, and some might not have a destroy() method at all. The most reliable approach I've found is to look at the stream's prototype, iterate through its functions, including those it inherits, and buffer all calls to them until the real stream is available. This works for a writable stream's write() and end() methods, and even for event emitter methods such as on().

Standard stream methods don't return anything, except for write(), which returns false when the write buffer is full. In our case it should return false as long as the real stream is not yet available.

Another special case is pipe(). Every readable stream's pipe() method works the same way, so it doesn't need to be overridden or queued. When pipe() is called, it listens for events from both the source and destination streams. It writes to the destination whenever data is emitted by the source, and it pauses and resumes the source as needed. We're already queueing calls to the methods it relies on, including those inherited from the event emitter.

What about emitting an event before the real source stream is available? You couldn't do this if you queued calls to emit(). The events would fire only after the real stream becomes available. If you're a perfectionist, you would want to consider this very rare case and come up with a solution.

Introducing Streamify

Streamify does all of this for you, so you don't have to deal with these complexities and can still offer the nicer API. Our earlier http example can be rewritten to work the way Request does:

var http = require('http');  
var streamify = require('streamify');

var stream = streamify();  
http.get('http://www.google.com', function onResponse(response) {  
  stream.resolve(response);
});

// `stream` can be piped already
stream.pipe(destinationStream);  

You might think this is unnecessary since Request already exists and it already does this. Keep in mind Request is only one module which handles one type of stream. This can be used with any type of stream which is not immediately available in the current event loop iteration.

You could even do something crazy like this:

var http = require('http');  
var fs = require('fs');  
var streamify = require('streamify');

function uploadToFirstClient() {  
  var stream = streamify({ superCtor: http.ServerResponse });

  var server = http.createServer(function onRequest(request, response) {
    response.writeHead(200);
    stream.resolve(response);
  }).listen(3000);

  stream.on('pipe', function onpipe(source) {
    source.on('end', server.close.bind(server));
  });

  return stream;
}

fs.createReadStream('/path/to/myfile').pipe(uploadToFirstClient());

In the previous example I used Node's standard HTTP module, but it could easily be replaced with Request -- Streamify works fine with Request.

Streamify also helps when you need to make several requests before the stream you actually want is available:

var request = require('request');  
var streamify = require('streamify');

module.exports = function myModule() {  
  var stream = streamify();

  request.get('http://somesite.com/authenticate', function onAuthenticate(err, response) {
    if (err) return stream.emit('error', err);

    var options = { uri: 'http://somesite.com/listmyfiles', json: true };
    request.get(options, function onList(err, response, body) {
      if (err) return stream.emit('error', err);
      stream.resolve(request.get('http://somesite.com/download/' + body.file));
    });
    });
  });

  return stream;
};

This works wonders for any use case in which we want to work with a stream that will be around in the future, but is preceded by one or many asynchronous operations.



Mastering Node Streams: Part 1


Streams are one of the most underused data types in Node. If you're deep into Node, you've probably heard this before. But seeing several new modules pop up that are not taking advantage of streams or using them to their full potential, I feel the need to reiterate it.

The common pattern I see in modules which require input is this:

var foo = require('foo');

foo('/path/to/myfile', function onResult(err, results) {  
  // do something with results
});

By making your module's entry point accept only a file path, you limit users to a readable file stream that they have no control over.

You might think that reading from a file is the most common use of your module, but that overlooks the fact that a stream is not only a file stream. A stream could be many things: a parser, an HTTP request, or a child process, among several other possibilities.

Only supporting file paths limits developers: any other kind of stream would have to be written to the file system and read back later, which is less efficient. For one, it takes extra disk space to store the data; for another, it takes longer to stream the data to disk and then read it back.

To avoid this, the above foo module's API should be written this way:

var fs = require('fs');

var stream = fs.createReadStream('/path/to/myfile');
foo(stream, function onResult(err, results) {
  // do something with results
});

Now foo can take in any type of stream, including a file stream. This is perhaps too long-winded when a file stream is passed; the fs module has to be required, and then a suitable stream must be created.

The solution is to allow both a stream and a file path as arguments.

foo(streamOrPath, function onResult(err, results) {  
  // ...
});

Inside, foo checks the type of streamOrPath and creates a stream if needed:

var fs = require('fs');

module.exports = function foo(streamOrPath, callback) {
  var stream;
  if (typeof streamOrPath === 'string') {
    stream = fs.createReadStream(streamOrPath);
  } else if (streamOrPath.pipe && streamOrPath.readable) {
    stream = streamOrPath;
  } else {
    throw new TypeError('foo can only be called with a stream or a file path');
  }

  // do whatever with `stream`
};

There you have it. Really simple, right? So simple that I've created a module just for this common use case, called streamin.

var streamin = require('streamin');

module.exports = function foo(streamOrPath, callback) {  
  var stream = streamin(streamOrPath);

  // do whatever with `stream`
};

Don't be fooled by its name: streamin works with writable streams too.

In the next part, I'll show you how modules like Request return streams synchronously, even when the real stream isn't immediately available.