Node.js Stream Backpressure

When we try to write our own custom stream implementation in Node.js, we may run into a concept called backpressure. In basic stream logic we have a readable stream and a writable stream, and we want to pass the data from the readable stream to the writable stream. But because the reading speed and the writing speed are different, we may run into memory problems during this streaming process.

For example, the writing speed is usually a lot lower than the reading speed. So when we read data and pass it to the writable stream, the writable stream cannot keep up, and the data is stored in a temporary buffer. As this process goes on, the buffer grows bigger and bigger, and eventually we get an out-of-memory error.

Take the code below as an example. The server accepts data posted by the client and stores it in a file.

const express = require("express");
const fs = require("fs");

const app = express();

app.post("/data", (req, res) => {
    const ws = fs.createWriteStream("./data");

    const bufs = [];
    req
        .on("data", buf => {
            bufs.push(buf);
        })
        .on("end", () => {
            // write everything at once, then finish the stream
            ws.end(Buffer.concat(bufs));
            res.end();
        })
        .on("error", err => {
            console.log(err);
        });
});

This is a pretty bad implementation. Storing all the data in the bufs array may cause memory issues if the payload is large. We can improve it by passing the data chunks to the writable stream directly.

app.post("/data", (req, res) => {
    const ws = fs.createWriteStream("./data");

    req
        .on("data", buf => {
            ws.write(buf);
        })
        .on("end", () => {
            ws.end();
            res.end();
        })
        .on("error", err => {
            console.log(err);
        });
});

This is better. But as we just discussed, if the writing speed is lower than the reading speed, data still accumulates in the writable stream's internal buffer, so we may still run into the memory issue.
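
We can actually watch this buildup happen: a writable stream exposes a writableLength property that reports how many bytes are currently sitting in its internal buffer. Below is a minimal sketch, assuming the same handler as above with only a logging line added.

app.post("/data", (req, res) => {
    const ws = fs.createWriteStream("./data");

    req
        .on("data", buf => {
            ws.write(buf);

            // bytes currently queued inside the writable stream's internal buffer;
            // this number keeps growing if the disk cannot keep up with the client
            console.log("buffered bytes:", ws.writableLength);
        })
        .on("end", () => {
            ws.end();
            res.end();
        });
});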

To control this behavior, streams have a parameter called highWaterMark. It works more like a hint than a hard limit. When the amount of data buffered inside the stream reaches this value, the write method tells us by returning false. When this happens, we need to pause reading and wait. As writing proceeds, the buffer shrinks, and once it has drained the stream notifies us with the 'drain' event. Then we can begin to read again. Let's code this logic.

app.post("/data", (req, res) => {
    const ws = fs.createWriteStream("./data");

    req
        .on("data", buf => {
            const result = ws.write(buf);

            // stop reading when the internal buffer exceeds highWaterMark
            if (result === false) {
                req.pause();

                // resume reading once the buffer has drained
                ws.once("drain", () => {
                    req.resume();
                });
            }
        })
        .on("end", () => {
            ws.end();
            res.end();
        })
        .on("error", err => {
            console.log(err);
        });
});

That is the whole demonstration of how to handle backpressure and avoid the memory issue. In fact, this logic is already handled for us when we use pipe. The code looks like this.

app.post("/data", (req, res) => {
    const ws = fs.createWriteStream("./data");

    req
        .pipe(ws)
        .on("error", err => {
            console.log(err);
        });
});
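
On Node.js 10 and later there is also stream.pipeline, a common alternative to pipe: it respects backpressure in the same way, and in addition it forwards errors from either stream and cleans both of them up. Here is a minimal sketch of the same handler, assuming the built-in pipeline API is available:

const { pipeline } = require("stream");

app.post("/data", (req, res) => {
    const ws = fs.createWriteStream("./data");

    // pipeline wires req into ws with backpressure handling, then calls the
    // callback once everything is flushed, or as soon as either stream errors
    pipeline(req, ws, err => {
        if (err) {
            console.log(err);
            res.statusCode = 500;
        }
        res.end();
    });
});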

But if we want to create our own stream implementation, we have to implement the backpressure logic ourselves.

The best way to fully understand this pause/resume process is to play with your own example. The code below shows the raw interaction between a custom readable stream and a custom writable stream. Play with it to get a deeper understanding.


const { Writable, Readable } = require("stream");
const fs = require("fs");

// create a custom writable stream with a small highWaterMark (16 bytes)
const ws = new Writable({
    highWaterMark: 16,
    write(chunk, encoding, cb) {
        console.log("write data");
        cb();
    }
});

let count = 3;
let i = 0;

// create a custom readable stream that pushes three 64-byte chunks
const rs = new Readable({
    highWaterMark: 64,
    read() {
        if (i < count) {
            const buf = Buffer.alloc(64);

            console.log("push data: ", i);
            const result = this.push(buf);
            console.log("push result: ", result);

            i++;

        } else {
            console.log("read end", i);
            this.push(null);
        }
    }
});

// attaching a "data" handler switches the readable stream into flowing mode and starts the transfer
rs.on("data", data => {
    console.log("on data");
    const result = ws.write(data);
    console.log("write result: ", result);

    if (result === false) {
        console.log("pause");

        rs.pause();
        ws.once("drain", () => {
            console.log("writeable drain, resume");
            rs.resume();
        });
    }
});

rs.on("end", () => {
    console.log("read end");
    ws.destroy();
});

ws.on("close", () => {
    console.log("write close");
});

// % node 1.js
// push data:  0
// push result:  false
// push data:  1
// push result:  false
// on data
// write data
// write result:  false
// pause
// writeable drain, resume
// push data:  2
// push result:  false
// on data
// write data
// write result:  false
// pause
// writeable drain, resume
// read end 3
// on data
// write data
// write result:  false
// pause
// read end
// write close
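
One way to experiment further is to make the writer deliberately slow, so the pause/drain cycle is easier to observe. Below is a minimal variation of the writable stream above; slowWs is just an illustrative name, and the setTimeout delay stands in for real, slow I/O.

// a deliberately slow writable stream: the write callback is delayed,
// so the internal buffer stays full longer and 'drain' fires noticeably later
const slowWs = new Writable({
    highWaterMark: 16,
    write(chunk, encoding, cb) {
        console.log("write data (slow)");
        setTimeout(cb, 100);
    }
});

Swapping this in for ws in the example above makes each pause last visibly longer, so the ordering of pause, 'drain' and resume in the logs becomes much easier to follow.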