yaox023
yaox023's blog

yaox023's blog

Basic Stream Operations in Node.js

Basic Stream Operations in Node.js

yaox023's photo
yaox023
·Jul 24, 2022·

5 min read

Subscribe to my newsletter and never miss my upcoming articles

Read/Write Stream

Below example shows the basic usage of readable stream and writeable stream.

const fs = require("fs");

const filePath = "./hello.txt";

const writeStream = fs.createWriteStream(filePath);
writeStream.write("hello,");
writeStream.write("world.");

const readStream = fs.createReadStream(filePath);
readStream.on("data", (data) => {
    console.log("read: ", data.toString());
});
readStream.on("close", () => {
    console.log("read close");
});

We can use different source for readable stream. Below example shows how to create our own stream source.

const { Readable } = require("stream");

async function* generate() {
    yield "Hello";
    yield "World";
}

const rs = Readable.from(generate());
rs.on("data", data => {
    console.log(data);
});

Instead of data event, we can use promise api to read too.

const fs = require("fs");

const rs = fs.createReadStream("hello.txt");

(async () => {
    for await (const chunk of rs) {
        console.log("read: ", chunk.toString());
    }
    console.log("close");
})();

Two Reading Modes

Readable streams effectively operate in one of two modes: flowing and paused. The difference is:

  • In flowing mode, data is read automatically and provided to the user
  • In paused mode, the stream.read() method should be called to read data

By default, a readable stream is in paused mode. We can register a data event or use pipe method to switch to flowing mode like below example.

const fs = require("fs");

const rs = fs.createReadStream("./hello.txt");
rs.pipe(process.stdout)

Below example shows how to read data manually in paused mode.

const fs = require("fs");

const rs = fs.createReadStream("./hello.txt");

rs.on("readable", () => {
    let data = rs.read();
    while (data !== null) {
        console.log("read: ", data.toString());
        data = rs.read();
    }
});

Transform Stream

Transform stream could accept a stream, do some transformations then pass it to another stream.

Blow example shows how to pipe a readable stream to a transform stream and then pipe it to a writeable stream.

const fs = require("fs");
const { Transform } = require("stream");

const rs = fs.createReadStream("./hello.txt");
const ws = fs.createWriteStream("./world.txt");
const ts = new Transform({
    transform(chunk, encoding, callback) {
        const changed = chunk.toString().toUpperCase();
        callback(null, changed);
    }
});

rs.pipe(ts).pipe(ws);

Transform stream is very useful. Below example shows how to use it to gzip a file.

const fs = require("fs");
const zlib = require("zlib");
const gzip = zlib.createGzip();
const input = fs.createReadStream("hello.txt");
const output = fs.createWriteStream("hello.txt.gz");
input.pipe(gzip).pipe(output);

Pipeline

Pipeline is a tool to put multiple stream piping together.

Below example shows the usage of transform stream in a pipeline.

const fs = require("fs");
const { Transform, pipeline } = require("stream");

const rs = fs.createReadStream("./hello.txt");
const ws = fs.createWriteStream("./world.txt");
const ts = new Transform({
    transform(chunk, encoding, callback) {
        const changed = chunk.toString().toUpperCase();
        callback(null, changed);
    }
});

pipeline(
    rs,
    ts,
    ws,
    err => {
        if (err) {
            console.error(err)
        } else {
            console.log("pipeline succeeed")
        }
    }
)

There is also a pipeline promise version we can use.

const fs = require("fs");
const { Transform, pipeline } = require("stream");
const util = require("util");

const rs = fs.createReadStream("./hello.txt");
const ws = fs.createWriteStream("./world.txt");
const ts = new Transform({
    transform(chunk, encoding, callback) {
        const changed = chunk.toString().toUpperCase();
        callback(null, changed);
    }
});

(async () => {
    try {
        await util.promisify(pipeline)(
            rs, ts, ws
        );
        console.log("success");
    } catch (e) {
        console.error(e);
    }
})();

Create Readable Stream

Now let's try to create stream manually. The basic process is like below. Let's first see the code and then I will explain the key point.

const { Readable } = require("stream");

const arr = [
    "python",
    "golang",
    "javascript",
    "rust",
    "typescript"
];

class MyReadableStream extends Readable {

    constructor(arr) {
        super();
        this.arr = arr;
        this.i = 0;
    }

    _read() {
        if (this.i < this.arr.length) {
            this.push(this.arr[this.i]);
            this.i++;
        } else {
            this.push(null);
        }
    }
}

const rs = new MyReadableStream(arr);
rs.on("data", data => {
    console.log(data);
});
rs.on("end", () => {
    console.log("end");
});

As you can see, this code creates a readable stream from an array manually. First, we need to create a class extends to the Readable object. Then we implement the _read method. Inside this method, we push the array data one by one. So later when this stream starts flowing, this method will be called constantly. Note that we call the push method this.push(null) when array data is consumed. This tells the stream that all data is done.

Then we run this code, and the output is like below.

 node index.js
<Buffer 70 79 74 68 6f 6e>
<Buffer 67 6f 6c 61 6e 67>
<Buffer 6a 61 76 61 73 63 72 69 70 74>
<Buffer 72 75 73 74>
<Buffer 74 79 70 65 73 63 72 69 70 74>
end

As few options we can choose. First if we set the encoding parameter, then this data flowing is not a raw buffer, it will be a string with the specified encoding.

class MyReadableStream extends Readable {

    constructor(arr) {
        super({ encoding: "utf-8" });
        this.arr = arr;
        this.i = 0;
    }

    // ...
}

Execution output is like this.

 node index.js
python
golang
javascript
rust
typescript
end

Or we can specify the object mode, then data flowing as the form of object.

class MyReadableStream extends Readable {

    constructor(arr) {
        super({objectMode: true});
        this.arr = arr;
        this.i = 0;
    }

    _read() {
        if (this.i < this.arr.length) {
            this.push({data: this.arr[this.i]});
            this.i++;
        } else {
            this.push(null);
        }
    }
}

Execution output is like this.

 node index.js
{ data: 'python' }
{ data: 'golang' }
{ data: 'javascript' }
{ data: 'rust' }
{ data: 'typescript' }
end
 
Share this