Scaling with cluster
A Node.js application runs its JavaScript on a single thread in a single process. We can create child processes to handle CPU-intensive jobs with the child_process module.
There is another built-in module called cluster, which we can use to distribute the workload across multiple instances of the application.
For example, suppose we have an express application and we want to start the same application several times so that we make full use of the available CPUs. In this case, the cluster module is a suitable solution. Let's walk through the process with a simple example.
Suppose we have a simple express application like the one below.
// app.js
const express = require('express');
const compression = require('compression');
const fs = require("fs");
const app = express();
const port = 3000;
app.use(compression());
app.get('/', (req, res) => {
  res.writeHead(200, {"content-type": "text/html"});
  res.end(fs.readFileSync("./index.html", "utf8"));
});
app.listen(port, () => {
  console.log(`listening on port ${port}`);
});
We can run app.js directly, but that starts only one instance of the application.
Now let's use cluster to run multiple instances.
// index.js
const cluster = require("cluster");
const os = require("os");
const numCPUs = os.cpus().length;
if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
} else {
  require("./app.js");
  console.log(`Worker ${process.pid} started`);
}
Each time cluster.fork() is called, a new process is started and index.js runs again from the top in that process. Inside the code, cluster.isPrimary tells us whether the current process is the primary (master) process. If it is, we fork one worker per CPU. Otherwise the code is running in a worker, so we start the application.
Now let's use autocannon to run a simple benchmark.
First, let's start the application with node app.js, so that only one instance is running. Then we run the benchmark.
% autocannon --connections 200 http://localhost:3000
Running 10s test @ http://localhost:3000
200 connections
┌─────────┬───────┬───────┬───────┬───────┬──────────┬────────┬───────┐
│ Stat │ 2.5% │ 50% │ 97.5% │ 99% │ Avg │ Stdev │ Max │
├─────────┼───────┼───────┼───────┼───────┼──────────┼────────┼───────┤
│ Latency │ 11 ms │ 11 ms │ 15 ms │ 16 ms │ 11.78 ms │ 1.7 ms │ 61 ms │
└─────────┴───────┴───────┴───────┴───────┴──────────┴────────┴───────┘
┌───────────┬─────────┬─────────┬────────┬─────────┬─────────┬────────┬─────────┐
│ Stat │ 1% │ 2.5% │ 50% │ 97.5% │ Avg │ Stdev │ Min │
├───────────┼─────────┼─────────┼────────┼─────────┼─────────┼────────┼─────────┤
│ Req/Sec │ 14287 │ 14287 │ 16575 │ 16591 │ 16325.2 │ 681.65 │ 14284 │
├───────────┼─────────┼─────────┼────────┼─────────┼─────────┼────────┼─────────┤
│ Bytes/Sec │ 7.07 MB │ 7.07 MB │ 8.2 MB │ 8.21 MB │ 8.08 MB │ 337 kB │ 7.07 MB │
└───────────┴─────────┴─────────┴────────┴─────────┴─────────┴────────┴─────────┘
Req/Bytes counts sampled once per second.
# of samples: 10
163k requests in 10.02s, 80.8 MB read
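Then we start the application through the cluster code with node index.js. You can see that multiple instances are started. All of them can listen on port 3000 because the cluster module lets the workers share the server port and distributes incoming connections among them.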
% node index.js
Primary 33506 is running
Worker 33510 started
Worker 33513 started
listening on port 3000
listening on port 3000
Worker 33507 started
Worker 33511 started
Worker 33508 started
Worker 33512 started
listening on port 3000
listening on port 3000
listening on port 3000
listening on port 3000
Worker 33514 started
Worker 33509 started
listening on port 3000
listening on port 3000
Then let's do the same benchmarking again.
% autocannon --connections 200 http://localhost:3000
Running 10s test @ http://localhost:3000
200 connections
┌─────────┬──────┬──────┬───────┬───────┬─────────┬─────────┬────────┐
│ Stat │ 2.5% │ 50% │ 97.5% │ 99% │ Avg │ Stdev │ Max │
├─────────┼──────┼──────┼───────┼───────┼─────────┼─────────┼────────┤
│ Latency │ 1 ms │ 4 ms │ 9 ms │ 11 ms │ 3.77 ms │ 2.72 ms │ 129 ms │
└─────────┴──────┴──────┴───────┴───────┴─────────┴─────────┴────────┘
┌───────────┬───────┬───────┬─────────┬─────────┬─────────┬─────────┬───────┐
│ Stat │ 1% │ 2.5% │ 50% │ 97.5% │ Avg │ Stdev │ Min │
├───────────┼───────┼───────┼─────────┼─────────┼─────────┼─────────┼───────┤
│ Req/Sec │ 26223 │ 26223 │ 49023 │ 54079 │ 47042.4 │ 7777.81 │ 26211 │
├───────────┼───────┼───────┼─────────┼─────────┼─────────┼─────────┼───────┤
│ Bytes/Sec │ 13 MB │ 13 MB │ 24.3 MB │ 26.8 MB │ 23.3 MB │ 3.85 MB │ 13 MB │
└───────────┴───────┴───────┴─────────┴─────────┴─────────┴─────────┴───────┘
Req/Bytes counts sampled once per second.
# of samples: 10
471k requests in 10.04s, 233 MB read
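With the cluster, we can see a significant improvement on this machine, which started 8 workers: average throughput rose from about 16.3k to about 47k requests per second (roughly 2.9 times), and average latency dropped from 11.78 ms to 3.77 ms.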
Some improvements
The cluster code above shows how easily we can scale up the application. But to make the service more stable, more work may need to be done.
First, a worker may die because of an error, so the primary should listen for the exit event and fork a replacement.
cluster.on('exit', (worker, code, signal) => {
  console.log('worker %d died (%s). restarting...',
    worker.process.pid, signal || code);
  cluster.fork();
});
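One caveat with restarting unconditionally, as the handler above does: a worker that crashes right after startup would be forked again in a tight loop. The following is a minimal sketch of one possible guard. The helper name createRestartLimiter and the limits (5 restarts per 60 seconds) are assumptions made for illustration, not part of the cluster module.

```javascript
// Hypothetical helper: allow at most maxRestarts restarts within windowMs.
function createRestartLimiter(maxRestarts = 5, windowMs = 60000) {
  let recent = []; // timestamps of the restarts still inside the window
  return function canRestart(now = Date.now()) {
    // Forget restarts that are older than the window.
    recent = recent.filter((t) => now - t < windowMs);
    if (recent.length >= maxRestarts) {
      return false; // too many restarts recently, do not fork again
    }
    recent.push(now);
    return true;
  };
}

// Usage sketch in the primary (not executed here):
//   const canRestart = createRestartLimiter();
//   cluster.on('exit', (worker, code, signal) => {
//     if (worker.exitedAfterDisconnect) return; // intentional shutdown
//     if (canRestart()) cluster.fork();
//   });
```

The check on worker.exitedAfterDisconnect skips workers that were stopped on purpose with worker.kill() or worker.disconnect(). What to do when the limit is reached (alerting, exiting the primary) depends on the service.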
Second, before a worker process dies, we may want to log the error that caused it. We can do that like this.
if (cluster.isPrimary) {
  // ...
} else {
  // register the handler first so errors thrown while loading app.js are logged too
  process.on("uncaughtException", (e) => {
    // do some logging
    console.error(e);
    // exit manually
    process.exit(1);
  });
  require("./app.js");
}
Third, inside the worker process we may want to watch the memory usage, so that if there is a memory leak we can detect it and stop the worker.
if (cluster.isPrimary) {
  // ...
} else {
  require("./app.js");
  // watch for memory usage: check every 5 seconds
  setInterval(() => {
    // 734003200 bytes = 700 MiB of resident memory
    if (process.memoryUsage().rss > 734003200) {
      process.exit(1);
    }
  }, 5000);
}
Last, to make sure a worker process is still running properly, we can use a ping-pong strategy. When the fork method is called, we get the worker object back with const worker = cluster.fork(). The primary can then send a message to the worker with worker.send("ping"). The worker receives this message with process.on("message", msg => {}) and replies with process.send("pong"). The primary receives the reply with worker.on("message", msg => {}). With this communication in place, if the primary has not received a proper response for some time, we can kill the worker with process.kill(worker.process.pid) and then start a new one.
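The ping-pong strategy described above could be sketched as follows. This is only an illustration under stated assumptions: the helper names isUnresponsive, monitorWorker and answerPings, the 5 second ping interval and the 15 second timeout are hypothetical choices, not something provided by the cluster module.

```javascript
// Hypothetical helper: true when the last pong is older than timeoutMs.
function isUnresponsive(lastPongAt, now, timeoutMs) {
  return now - lastPongAt > timeoutMs;
}

// Primary side: ping the worker periodically and kill it if pongs stop.
function monitorWorker(worker, intervalMs = 5000, timeoutMs = 15000) {
  let lastPongAt = Date.now();
  worker.on("message", (msg) => {
    if (msg === "pong") lastPongAt = Date.now();
  });
  const timer = setInterval(() => {
    if (isUnresponsive(lastPongAt, Date.now(), timeoutMs)) {
      clearInterval(timer);
      // An 'exit' handler on cluster can then fork a replacement.
      process.kill(worker.process.pid);
      return;
    }
    if (worker.isConnected()) worker.send("ping");
  }, intervalMs);
  // Stop pinging once the worker is gone for any reason.
  worker.on("exit", () => clearInterval(timer));
  return timer;
}

// Worker side: answer every ping with a pong.
function answerPings(proc = process) {
  proc.on("message", (msg) => {
    if (msg === "ping") proc.send("pong");
  });
}

// Usage sketch (not executed here):
//   primary: monitorWorker(cluster.fork());
//   worker:  require("./app.js"); answerPings();
```

A worker whose event loop is blocked cannot answer the ping, so its last pong grows stale and the primary kills it after the timeout.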
These are some basic strategies for improvement. In reality, you can choose to implement more or use a more sophisticated third-party package to make the service more stable.