Node Streams Part 1: Readable Streams

A deep dive into readable streams

Whether you know it or not, if you have coded anything in Node.js it is likely that you have used streams. An Express server serves streams as its responses and accepts streams as its requests.

You can think of a stream as a path for chunked data to travel down. Data is buffered and sent along in chunks, like trucks down a highway. In this way you can send a large amount of data in a manageable way to different processes without sacrificing performance. The data passed through a stream can be read, written, and transformed. There are three main types of streams and they are most likely piped together in this way:

readable.pipe(duplex).pipe(writable)

Three Types of Streams

  1. Readable Stream (ex: http.IncomingMessage)
  2. Writable Stream (ex: http.ServerResponse)
  3. Duplex Stream (ex: net.Socket)
    a. Transform Stream (ex: crypto.createCipheriv)
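
To make the readable.pipe(duplex).pipe(writable) chain concrete, here is a minimal sketch that wires one stream of each kind together. The names source, upperCase, and sink are made up for this illustration, and the in-memory chunks stand in for real data such as a file or a request body.

```javascript
const { Readable, Transform, Writable } = require('stream')

// Readable: emits three string chunks (Readable.from wraps any iterable)
const source = Readable.from(['it is ', 'not easy ', 'being drunk'])

// Transform (a kind of duplex): upper-cases every chunk that passes through
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

// Writable: collects everything it receives
const received = []
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString())
    callback()
  }
})

// Resolves once the writable has flushed everything it was given
const done = new Promise((resolve, reject) => {
  sink.on('finish', () => resolve(received.join('')))
  sink.on('error', reject)
})

source.pipe(upperCase).pipe(sink)

done.then((text) => console.log(text))
```

Each pipe call returns its destination, which is what lets the calls chain from left to right.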

Let’s get into each one in detail using a number of examples. The full repo for these examples is available here; they were written for a talk I gave at an NYC JavaScript Meetup. For all of the following examples we’ll use a text file named tyrion.txt containing the following:

It’s not easy being drunk all the time. If it were easy, everyone would do it.

Readable Stream

A readable stream is just a stream of data that you can read from. Here is a basic node app that just reads a file to standard output.

Readable Stream Example

const fs = require('fs')

// ReadableStream
let about = fs.createReadStream('tyrion.txt')

about.pipe(process.stdout)

This script, when run, will output the text It’s not easy being drunk all the time. If it were easy, everyone would do it. to standard output. The variable about is the readable stream generated by fs.createReadStream. The pipe function allows you to compose these streams together in order to process a stream in sequence. The stream at the beginning of a pipe operation is the readable stream and the stream at the end of the pipe operation is the writable stream. That’s right, process.stdout is a writable stream!

Now take a look at the Node.js source code for the function fs.createReadStream.

fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};
util.inherits(ReadStream, Readable);

Pretty simple, right? It’s just syntactic sugar for constructing a ReadStream. Readable is defined as such:

// inserted here for context
const Stream = require('stream').Stream;
const Readable = Stream.Readable;

The util.inherits function lets one constructor inherit the prototype methods of another; in this case ReadStream inherits from Readable.

// simplified version of the inherits function
function inherits(ctor, superCtor) {
  ctor.super_ = superCtor;
  Object.setPrototypeOf(ctor.prototype, superCtor.prototype);
}
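
To see util.inherits at work outside of the Node source, here is a small sketch of a custom readable stream built the same way. CountdownStream is a hypothetical constructor invented for this example; the Node documentation now discourages util.inherits in favor of class and extends, so treat this as a way to read older code rather than a recommendation.

```javascript
const util = require('util')
const { Readable } = require('stream')

// Hypothetical constructor, for illustration only
function CountdownStream(start) {
  Readable.call(this) // run the parent constructor on this instance
  this.current = start
}
util.inherits(CountdownStream, Readable)

// Readable calls _read whenever it wants more data
CountdownStream.prototype._read = function () {
  if (this.current > 0) {
    this.push(String(this.current--) + '\n')
  } else {
    this.push(null) // pushing null signals the end of the stream
  }
}

const countdown = new CountdownStream(3)
console.log(countdown instanceof Readable)
console.log(CountdownStream.super_ === Readable)
countdown.pipe(process.stdout)
```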

ReadStream then goes on to define specifics that differ a bit from a generic Readable stream. Feel free to explore that on your own.

Readable Stream Properties

So what can a Readable Stream do? If you take a look at the Node JS documentation, you’ll find a great overview of what is available.

All streams are instances of EventEmitter, which means that they can emit events that we can listen to. The readable stream emits the following events:

  • Event: ‘close’
  • Event: ‘data’
  • Event: ‘end’
  • Event: ‘error’
  • Event: ‘readable’

The Close Event

When a stream and its resources have been closed, this event fires. This signifies that no more events will occur and that the stream is done. It cannot handle any more data and it will not do any more computation. A Readable stream is not guaranteed to emit the ‘close’ event, so do not depend on it to detect the end of a stream.
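
As a rough sketch of that advice, the snippet below treats ‘end’ as the signal that all data has been consumed and ‘close’ as optional cleanup that may or may not follow. The stream and the seen array are assumptions made for this example, with in-memory chunks standing in for a file.

```javascript
const { Readable } = require('stream')

const stream = Readable.from(['first chunk', 'second chunk'])
const seen = []

const ended = new Promise((resolve, reject) => {
  stream.on('data', () => seen.push('data'))
  // 'end' is the signal to rely on: all data has been consumed
  stream.on('end', () => {
    seen.push('end')
    resolve(seen)
  })
  // 'close' may or may not follow, so nothing important should wait on it
  stream.on('close', () => seen.push('close'))
  stream.on('error', reject)
})

ended.then((events) => console.log(events.join(' -> ')))
```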

The Data Event

Get comfortable with this event. It is the workhorse of the stream. It lets you access the data that is being passed through the stream. Note that by default each chunk you receive is a Buffer, which you need to decode (for example with toString()); if you set an encoding with setEncoding(), you receive strings instead. For example:

const fs = require('fs')

// ReadableStream
let about = fs.createReadStream('tyrion.txt')

about.on('data', (chunk) => {
    // chunk is an instance of Buffer
    console.log(chunk.toString())
})

about.pipe(process.stdout)

Data can also be passed or read as a predefined JS object. To learn more about object mode, read “Understanding Object Streams” by Fionn Kelleher.
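
Here is a minimal sketch of object mode, assuming a made-up characters stream whose chunks are plain objects instead of Buffers. The objects are pushed by hand to keep the example short.

```javascript
const { Readable } = require('stream')

// objectMode lets each chunk be an arbitrary JS value instead of a Buffer
const characters = new Readable({
  objectMode: true,
  read() {} // nothing to do here: data is pushed manually below
})

characters.push({ name: 'Tyrion', house: 'Lannister' })
characters.push({ name: 'Arya', house: 'Stark' })
characters.push(null) // no more objects

const names = []
const objectsRead = new Promise((resolve, reject) => {
  // each chunk arrives as the original object, no decoding needed
  characters.on('data', (character) => names.push(character.name))
  characters.on('end', () => resolve(names))
  characters.on('error', reject)
})

objectsRead.then((result) => console.log(result))
```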

The Error Event

Once you understand how EventEmitters work, it should be pretty intuitive as to how to use the rest of the events. I want to bring attention to the Error Event because it is always extremely important to make sure you handle all errors properly. One of the worst things you can do is to allow errors to slip through unhandled: an ‘error’ event with no listener is thrown, which crashes your app. Whether you are consuming a stream or implementing your own ReadableStream, always make sure you have some sort of error handling set up.

It is always better to assume the code you’re writing will fail instead of the other way around. The error event is passed one parameter, the error object.

about.on('error', (err) => {
    handleStreamError(err)
})
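
To watch the error event fire, one easy trigger is a file that does not exist. In this sketch the path missingFile is a hypothetical name chosen so that nothing should be there; fs.createReadStream does not throw in that case, it emits ‘error’ on the stream instead.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical path that should not exist on disk
const missingFile = path.join(os.tmpdir(), 'no-such-file-for-stream-demo.txt')

const failure = new Promise((resolve) => {
  fs.createReadStream(missingFile)
    .on('error', (err) => resolve(err)) // the open failure lands here
    .on('data', () => {}) // never called, because the file cannot be opened
})

failure.then((err) => console.error(`Stream failed with code ${err.code}`))
```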

The Readable Event

This event is triggered when a stream either has data available to be read or when the end of the stream has been reached. In order to actually get the data from the stream when this event fires, you have to call stream.read(). If the event fired because the end of the stream has been reached then the result of stream.read() will be null. This event will be fired before the end event. You probably don’t need to use this event as data and end are easier to understand, but using this event can result in higher throughput.

about.on('readable', () => {
    let chunk
    while (null !== (chunk = about.read())) {
        console.log(`Received data: ${chunk.toString()}`)
    }
})

As I mentioned before, you probably won’t need this event, but it is good to know about it.

Read Stream Methods

In addition to events, the read stream has a number of methods for your use that are extremely helpful.

  • Method: pause()
  • Method: resume()
  • Method: pipe()
  • Method: destroy()

The Pause and Resume Methods

Streams can be paused and resumed, which can make orchestrating asynchronous actions easier. The pause method stops any ‘data’ events from firing, while the stream continues to fill its internal buffer until the high water mark is hit. The resume method switches the stream back into flowing mode, so ‘data’ events start firing again. Using resume can also allow you to consume a stream in its entirety without actually processing any of its data.

about.resume().on('end', () => {
    console.log('reached the end but did not read anything.')
})
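
Here is a small sketch of pausing and resuming around some slow asynchronous work. The lines stream and the 10 ms timer are stand-ins I made up for this example; in a real app the timer would be something like a database write.

```javascript
const { Readable } = require('stream')

const lines = Readable.from(['one', 'two', 'three'])
const processed = []

const finished = new Promise((resolve, reject) => {
  lines.on('data', (line) => {
    processed.push(line)
    // stop 'data' events while the slow work for this chunk is in flight
    lines.pause()
    setTimeout(() => lines.resume(), 10)
  })
  lines.on('end', () => resolve(processed))
  lines.on('error', reject)
})

finished.then((result) => console.log(result.join(', ')))
```

Because the handler pauses right away, each chunk is handled only after the previous timer has fired.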

The High Water Mark

This is a good time to discuss the highWaterMark and what it is. If you think of a stream as a river, then when you call stream.pause() you are effectively inserting a dam in the river. The highWaterMark relates to the height of the banks of the river. The river will continue to fill with data until it reaches the top of the banks. Luckily, the banks don’t actually overflow: once the buffered data reaches the highWaterMark, Node stops reading from the underlying source until the buffer has been consumed, so no data is lost. If you are doing an expensive computation on a chunk of data it may be a good idea to lower the highWaterMark to make sure you aren’t storing a lot of data in memory.
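
As a sketch of lowering the highWaterMark, the snippet below reads the tyrion.txt quote with a 16 byte limit and records how big each chunk is. So that it runs anywhere, it writes the quote to a temporary file first; the file name and the value 16 are assumptions picked for this example.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

const quote = "It's not easy being drunk all the time. If it were easy, everyone would do it."

// Temporary copy of the quote so the example does not depend on tyrion.txt
const file = path.join(os.tmpdir(), `tyrion-${process.pid}.txt`)
fs.writeFileSync(file, quote)

const chunkSizes = []
const sizesRead = new Promise((resolve, reject) => {
  // highWaterMark caps how many bytes each read pulls from the file
  fs.createReadStream(file, { highWaterMark: 16 })
    .on('data', (chunk) => chunkSizes.push(chunk.length))
    .on('end', () => resolve(chunkSizes))
    .on('error', reject)
})

sizesRead.then((sizes) => {
  fs.unlinkSync(file) // clean up the temporary file
  console.log(`chunk sizes: ${sizes.join(', ')}`)
})
```

With a small highWaterMark the file arrives as many small chunks instead of one large one, which keeps less data in memory at a time.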

The Pipe Method

The pipe method will probably be the one method you use the most! Make sure you understand how to use it. In reality, it’s pretty simple. The pipe just allows you to connect the end of one stream to the start of another, like fitting PVC pipes together. The one thing to remember with pipe is that the source must be a readable stream and the destination must be a writable stream. If you pipe a readable stream to a stream that is only readable, the second one will have no idea what to do with the data. To chain pipes, the stream in the middle has to be both writable and readable (making it a duplex stream). The other thing to note about piping streams is that if you pipe a readable stream to a writable stream that has already been ended you will get an error.

about.pipe(writableStream)
aboutTwo.pipe(writableStream)

If writableStream is a standard writable stream then the second pipe can fail. This is because, by default, pipe calls end() on the destination when the source emits the end event, which leaves the writable stream no longer open for whatever aboutTwo still has to write. To bypass this you can pass the end option to pipe:

about.pipe(writableStream, {end: false})
aboutTwo.pipe(writableStream)
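
If the order of the output matters, one option is to start the second pipe only after the first source has ended. This is a sketch under that assumption; first, second, and sink are made-up names, with in-memory chunks standing in for the two files.

```javascript
const { Readable, Writable } = require('stream')

const first = Readable.from(['a', 'b'])
const second = Readable.from(['c', 'd'])

// Writable that collects everything it receives
const collected = []
const sink = new Writable({
  write(chunk, encoding, callback) {
    collected.push(chunk.toString())
    callback()
  }
})

const combined = new Promise((resolve, reject) => {
  sink.on('finish', () => resolve(collected.join('')))
  sink.on('error', reject)
})

// end: false keeps the sink open when the first source finishes
first.pipe(sink, { end: false })

// the second pipe uses the default (end: true), so it closes the sink when done
first.on('end', () => second.pipe(sink))

combined.then((text) => console.log(text))
```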