Stream Adventure

Recently, I practiced node.js streams with the stream-adventure repo. I wrote an article about streams before, Basic Stream Operations in Node.js, but it's more about basic concepts. This repo provides a more in-depth view, and I learned a lot from it. So in this article, instead of only giving the answers, I will try to explain some of my own understanding of how and why. OK, let's go!

BEEP BOOP

This is the simplest one. It just requires writing a string to the console.

console.log('beep boop')

I guess the point of this problem is to understand that the console.log function writes data to standard output. So the two lines of code below have the same effect.

console.log("123")
process.stdout.write("123\n")

MEET PIPE

This problem requires us to pipe a stream (a file) to another stream (standard output). Before we try to solve it, let's first see what pipe is.

In node.js, there are four types of streams:

  • Readable stream, from which data can be read.
  • Writable stream, to which data can be written.
  • Duplex stream, which is both readable and writable.
  • Transform stream, which is a duplex stream that can modify or transform the data as it is written and read.

So the pipe method is used to connect a readable stream to a writable stream like below.

readable.pipe(writable)

So if a stream is a duplex stream, it can be piped to another stream, and another stream can also be piped into it.

readable.pipe(duplex).pipe(writable)
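As a concrete sketch (not part of the exercise), zlib's gzip stream is a transform stream, hence duplex, so it can sit in the middle of a pipe chain; the file name here is just illustrative.

const fs = require('fs')
const zlib = require('zlib')

// file (readable) -> gzip (duplex/transform) -> stdout (writable)
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(process.stdout)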

So in this problem, the file stream is a readable stream and standard output is a writable stream. What we need to do is just connect the two.

const fs = require('fs')
const file = process.argv[2]

fs.createReadStream(file).pipe(process.stdout)

INPUT OUTPUT

This problem requires us to pipe standard input to standard output, so it's basically the same as the previous problem.

The point is we need to understand that the standard input is a readable stream and the standard output is a writable stream.

process.stdin.pipe(process.stdout)

READ IT

This problem asks us to implement a readable stream manually. This can be done by extending the built-in Readable class and implementing the _read method. Then we can use the push method to add content to its internal buffer.

const { Readable } = require('stream')

class ReadableStream extends Readable {
  // _read must exist, but here we push the data from outside instead
  _read (size) {
  }
}

const stream = new ReadableStream()
stream.push(process.argv[2]) // add the argument to the internal buffer
stream.push(null)            // signal that there is no more data
stream.pipe(process.stdout)

WRITE TO ME

This problem asks us to implement a writable stream manually. This can be done by extending the built-in Writable class and implementing the _write method.

In this method, three parameters are provided:

  • chunk, the value to be written, commonly a Buffer
  • encoding, the character encoding
  • callback, a function to call when the current processing is complete

The callback function can be passed an Error object to indicate an error, or null to indicate success.
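For example, here is a minimal sketch of signaling a failure from _write; the rule that rejects empty chunks is just made up for illustration.

const { Writable } = require('stream')

// a sketch: reject empty chunks by passing an Error to the callback
const strict = new Writable({
  write (chunk, encoding, callback) {
    if (chunk.length === 0) {
      callback(new Error('empty chunk is not allowed')) // causes the stream to emit 'error'
    } else {
      callback() // no error means success
    }
  }
})

The actual solution for this exercise just logs each chunk and signals success: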

const { Writable } = require('stream')

class MyWritable extends Writable {
  _write (chunk, encoding, callback) {
    console.log(`writing: ${chunk.toString()}`)
    callback()
  }
}

const stream = new MyWritable()
process.stdin.pipe(stream)

TRANSFORM

This problem requires us to implement a transform stream with the through2 package. This package is a thin wrapper around the node.js Transform stream.

To complete this transform, we need to implement 2 functions.

The first one is the write function, like below.

function write (buffer, encoding, next) {
  this.push('I got some data: ' + buffer + '\n')
  next()
}

Inside this function, we can do any transformations to the data (buffer), then use the push function to put the new data into the internal buffer, and then call the next function to end this round of processing.

The second one is the end function, like below.

function end (done) {
  // ...
  done()
}

This function will be called when there is no more data. We can do any final work here and then call the done callback to end the whole processing.

Both write and end are optional. If write is not specified, the default implementation passes the input data to the output unmodified. If end is not specified, the default implementation calls this.push(null) to close the output side when the input side ends.
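For a quick sketch of those defaults (not the exercise solution), a through2 stream created with no arguments simply acts as a pass-through:

const through = require('through2')

// no write/end provided, so data passes through unmodified
process.stdin.pipe(through()).pipe(process.stdout)

The solution for this exercise provides a write function that upper-cases the data: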

const through = require('through2')

const tr = through(function (buf, _, next) {
  this.push(buf.toString().toUpperCase())
  next()
})
process.stdin.pipe(tr).pipe(process.stdout)

LINES

In this problem, we need to split the input stream into lines and apply a transformation to each line according to its line number.

For the transformations, we can use the through2 package as before. To split the stream into lines, we make use of the split2 package. This package is also based on the node.js built-in Transform, since splitting into lines is of course also a kind of transformation.

This package is very easy to use; below is the example from its documentation:

fs.createReadStream(file)
  .pipe(split2())
  .on('data', function (line) {
    // each chunk now is a separate line!
  })

With this in mind, let's implement our solution.

const through = require('through2')
const split2 = require('split2')

let lineCount = 0
const tr = through(function (buf, _, next) {
  const line = buf.toString()
  this.push(lineCount % 2 === 0
    ? line.toLowerCase() + '\n'
    : line.toUpperCase() + '\n'
  )
  lineCount++
  next()
})
process.stdin
  .pipe(split2())
  .pipe(tr)
  .pipe(process.stdout)

CONCAT

This problem requires us to concat multiple stream buffers. We can use the concat-stream package.

For example, suppose the stream is piped from an image file. This file may be a bit large, so the data will arrive as multiple buffers. With this package, we can concat all these buffers together and receive the whole buffer at the end.

var fs = require('fs')
var concat = require('concat-stream')

var readStream = fs.createReadStream('cat.png')
var concatStream = concat(gotPicture)

readStream.on('error', handleError)
readStream.pipe(concatStream)

function gotPicture(imageBuffer) {
  // imageBuffer is all of `cat.png` as a node.js Buffer
}

function handleError(err) { }

The same logic applies to this problem: concat the buffers from standard input, reverse the resulting string, and write it to standard output.

const concat = require('concat-stream')

process.stdin.pipe(concat(function (src) {
  const s = src.toString().split('').reverse().join('')
  process.stdout.write(s)
}))

HTTP SERVER

The point of this problem is to understand that the request and response objects are also streams. So in this problem, we can just pipe the request stream into a transform stream to do our own operations, and then pipe that into the response stream.

const http = require('http');
const through = require('through2');

const ts = through(function (buf, _, next) {
    this.push(buf.toString().toUpperCase());
    next();
});

const server = http.createServer(function (req, res) {
    if (req.method === 'POST') {

        req.pipe(ts).pipe(res);

    } else res.end('send me a POST\n');
});
server.listen(parseInt(process.argv[2]));

HTTP CLIENT

The point of this problem is to understand that the request and response objects on the HTTP client side are also streams. So we can just pipe standard input into the request stream to send data to the server, and pipe the response stream to standard output.

const { request } = require('http')

const options = { method: 'POST' }
const req = request('http://localhost:8099', options, (res) => {
  res.pipe(process.stdout)
})
process.stdin.pipe(req)

WEBSOCKETS

The point of this problem is to learn that the ws package also provides an API to handle WebSocket messages in the form of streams.

The returned object is a duplex stream, so we can send messages by writing data to it and receive messages by reading data from it.

const WebSocket = require('ws')

const ws = new WebSocket('ws://localhost:8099')
const stream = WebSocket.createWebSocketStream(ws)
stream.write('hello\n')
stream.pipe(process.stdout)

HTML STREAM

This one is a little harder. We are given an HTML stream and need to apply a transformation to the elements with a specified class while leaving the others unmodified.

The trumpet package makes this kind of processing much easier. With it, we can select a part of the HTML and treat it as a stream too. Then we can do the normal stream operations on that sub-stream, while the rest of the HTML is piped through unmodified.

const trumpet = require('trumpet');
const through = require('through2');
const tr = trumpet();

// sub stream operations
const loud = tr.select('.loud').createStream();
const tr2 = through(function (buf, _, next) {
    this.push(buf.toString().toUpperCase());
    next();
});
loud.pipe(tr2).pipe(loud);

process.stdin.pipe(tr).pipe(process.stdout);

DUPLEXER

A duplex stream is both a readable and a writable stream. So given a readable stream and a writable stream, how do we combine them into one duplex stream?

Well, the duplexer2 package does this job. We just pass the writable and the readable into it and get back the combined duplex stream.

const { spawn } = require('child_process')
const duplexer = require('duplexer2')

module.exports = function (cmd, args) {
  const ps = spawn(cmd, args)
  return duplexer(ps.stdin, ps.stdout)
}
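As a usage sketch (the module path and the command here are just illustrative), the exported function wraps a child process as a single duplex stream:

const run = require('./cmd-duplex')          // hypothetical path to the module above
const cmd = run('cat', [])                   // 'cat' is only an example command
process.stdin.pipe(cmd).pipe(process.stdout) // writes go to cat's stdin, reads come from its stdout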

DUPLEXER REDUX

This problem requires us to understand object mode. Normally, data passes through a node.js stream in the form of buffers, and we then convert those buffers into whatever form we want. If we specify that we want object mode, data passes through in the form of objects instead.
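As a minimal sketch of the difference (the row shape here is made up), a through2 stream in object mode can receive plain objects directly:

const through = require('through2')

// object-mode transform: rows arrive as objects, not buffers
const toNames = through.obj(function (row, _, next) {
  this.push(row.name + '\n') // output can still be a string
  next()
})

toNames.pipe(process.stdout)
toNames.write({ name: 'alice' }) // write an object directly
toNames.end()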

So in this problem, we can use the through2 package to create a transform stream, and use duplexer2 to combine the writable and readable sides into a duplex stream.

const duplexer = require('duplexer2')
const through = require('through2').obj

module.exports = function (counter) {
  const counts = {}
  const input = through(write, end)
  return duplexer({ objectMode: true }, input, counter)

  function write (row, _, next) {
    counts[row.country] = (counts[row.country] || 0) + 1
    next()
  }
  function end (done) {
    counter.setCounts(counts)
    done()
  }
}

COMBINER

This problem is very similar to the previous one, but instead of combining only 2 streams, we need to combine multiple streams into one duplex stream.

The stream-combiner package can be used for this. We can pass multiple streams as parameters, and they will be piped together in order and exposed as a single duplex stream.
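As a minimal sketch (the component streams are just placeholders), writes to the combined stream go into the first stream and reads come out of the last one:

const combine = require('stream-combiner')
const split2 = require('split2')
const zlib = require('zlib')

// from the outside this looks like one duplex stream:
// writes go into split2(), reads come out of the gzip stream
const combined = combine(split2(), zlib.createGzip())
process.stdin.pipe(combined).pipe(process.stdout)

The solution for this exercise combines a line splitter, a grouping transform, and a gzip stream: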

const combine = require('stream-combiner')
const through = require('through2')
const split2 = require('split2')
const zlib = require('zlib')

module.exports = function () {
  const grouper = through(write, end)
  let current

  function write (line, _, next) {
    if (line.length === 0) return next()
    const row = JSON.parse(line)

    if (row.type === 'genre') {
      if (current) {
        this.push(JSON.stringify(current) + '\n')
      }
      current = { name: row.name, books: [] }
    } else if (row.type === 'book') {
      current.books.push(row.name)
    }
    next()
  }
  function end (next) {
    if (current) {
      this.push(JSON.stringify(current) + '\n')
    }
    next()
  }

  return combine(split2(), grouper, zlib.createGzip())
}

CRYPT

This problem basically tells us that crypto operations can also be done as stream processing. The whole logic is the same as before.

const crypto = require('crypto')

process.stdin
  .pipe(crypto.createDecipheriv('aes256', process.argv[2], process.argv[3]))
  .pipe(process.stdout)

SECRETZ

This last problem shows how the tar package works with streams. We can pipe a tar file stream into this package, and it will split it into files. Every file emits an entry event, and we can do further stream operations on each individual file.

const crypto = require('crypto')
const tar = require('tar')
const concat = require('concat-stream')

const parser = new tar.Parse()
parser.on('entry', function (e) {
  if (e.type !== 'File') return e.resume()

  const h = crypto.createHash('md5', { encoding: 'hex' })
  e.pipe(h).pipe(concat(function (hash) {
    console.log(hash + ' ' + e.path)
  }))
})

const cipher = process.argv[2]
const key = process.argv[3]
const iv = process.argv[4]
process.stdin
  .pipe(crypto.createDecipheriv(cipher, key, iv))
  .pipe(parser)