Cluster Mode Implementation - Complete Guide
Published: September 25, 2024 | Reading time: 21 minutes
Cluster Mode Overview
Node.js cluster mode enables multi-process applications for better performance:
Cluster Benefits
- Multi-core utilization
- Improved performance
- Fault tolerance
- Load distribution
- Process isolation
- Graceful restarts
- Zero-downtime deployments (see the rolling-restart sketch below)
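The last two items follow from the fact that workers can be replaced one at a time while the rest keep serving traffic. A minimal rolling-restart sketch for the master process (the rollingRestart helper is illustrative, not part of the cluster API):
const cluster = require('cluster');

// Restart workers one at a time so at least one worker is always
// accepting connections (zero-downtime reload). Intended to run in the
// master process, with `workers` being its array of cluster workers.
function rollingRestart(workers, done) {
  const queue = [...workers];
  function restartNext() {
    const oldWorker = queue.shift();
    if (!oldWorker) return done && done();
    const replacement = cluster.fork();
    // Only stop the old worker once its replacement is accepting connections.
    replacement.once('listening', () => {
      oldWorker.once('exit', restartNext);
      oldWorker.disconnect(); // finish in-flight requests, then exit
    });
  }
  restartNext();
}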
Basic Cluster Setup
# 1. Simple Cluster Implementation
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) { // cluster.isPrimary in Node.js 16+
const numCPUs = os.cpus().length;
console.log(`Master process ${process.pid} is running`);
console.log(`Starting ${numCPUs} workers`);
// Fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Handle worker exit
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died`);
console.log('Starting a new worker');
cluster.fork();
});
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid,
worker: cluster.worker.id
});
});
app.listen(3000, () => {
console.log(`Worker ${process.pid} started`);
});
}
# 2. Advanced Cluster Implementation
const cluster = require('cluster');
const os = require('os');
const http = require('http');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
const workers = [];
console.log(`Master process ${process.pid} is running`);
// Create workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// Handle worker messages
cluster.on('message', (worker, message) => {
console.log(`Message from worker ${worker.process.pid}:`, message);
});
// Handle worker exit
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died with code ${code}`);
if (code !== 0 && !worker.exitedAfterDisconnect) {
console.log('Starting a new worker');
cluster.fork();
}
});
// Graceful shutdown
process.on('SIGTERM', () => {
console.log('Master received SIGTERM, shutting down gracefully');
for (const worker of workers) {
worker.disconnect();
}
});
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid,
worker: cluster.worker.id,
uptime: process.uptime()
});
});
app.get('/health', (req, res) => {
res.json({
status: 'healthy',
pid: process.pid,
memory: process.memoryUsage()
});
});
const server = app.listen(3000, () => {
console.log(`Worker ${process.pid} started on port 3000`);
});
// Send message to master
process.send({ type: 'worker_started', pid: process.pid });
// Graceful shutdown
process.on('SIGTERM', () => {
console.log(`Worker ${process.pid} received SIGTERM`);
server.close(() => {
process.exit(0);
});
});
}
# 3. Cluster with Load Balancing
const cluster = require('cluster');
const os = require('os');
// The cluster module load balances incoming connections for you: every
// worker listens on the same port and the master hands out new
// connections round-robin (the default scheduling policy on Linux/macOS).
// HTTP req/res objects cannot be serialized over IPC, so the master
// should not try to proxy individual requests to workers itself.
cluster.schedulingPolicy = cluster.SCHED_RR;
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
// Fork one worker per CPU core
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker) => {
console.log(`Worker ${worker.process.pid} died, starting a new one`);
cluster.fork();
});
} else {
// Worker process: all workers listen on port 3000 and the master
// distributes connections between them
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid,
worker: cluster.worker.id
});
});
app.listen(3000, () => {
console.log(`Worker ${process.pid} started on port 3000`);
});
}
# 4. Cluster with PM2
// ecosystem.config.js
module.exports = {
apps: [{
name: 'my-app',
script: './app.js',
instances: 'max',
exec_mode: 'cluster',
env: {
NODE_ENV: 'development'
},
env_production: {
NODE_ENV: 'production'
}
}]
};
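With this configuration, PM2 rather than your own code manages the worker processes: `pm2 start ecosystem.config.js --env production` starts one instance per CPU core in cluster mode, and `pm2 reload my-app` replaces the workers one at a time for a zero-downtime reload.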
# 5. Cluster with Docker
// Dockerfile
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY . .
EXPOSE 3000
CMD ["node", "cluster.js"]
# docker-compose.yml
version: '3.8'
services:
  app:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
    deploy:
      replicas: 4
# 6. Cluster with Kubernetes
// deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nodejs-app
spec:
  replicas: 4
  selector:
    matchLabels:
      app: nodejs-app
  template:
    metadata:
      labels:
        app: nodejs-app
    spec:
      containers:
        - name: nodejs-app
          image: nodejs-app:latest
          ports:
            - containerPort: 3000
          env:
            - name: NODE_ENV
              value: "production"
# 7. Cluster with Health Checks
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
const workers = [];
// Create workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// Health check
setInterval(() => {
workers.forEach(worker => {
if (!worker.isDead()) {
worker.send({ type: 'health_check' });
}
});
}, 30000);
// Handle worker messages
cluster.on('message', (worker, message) => {
if (message.type === 'health_response') {
console.log(`Worker ${worker.process.pid} is healthy`);
}
});
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid
});
});
// Handle health checks
process.on('message', (message) => {
if (message.type === 'health_check') {
process.send({ type: 'health_response', pid: process.pid });
}
});
app.listen(3000, () => {
console.log(`Worker ${process.pid} started`);
});
}
# 8. Cluster with Graceful Shutdown
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
const workers = [];
// Create workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// Graceful shutdown
process.on('SIGTERM', () => {
console.log('Master received SIGTERM');
workers.forEach(worker => {
worker.disconnect();
});
// Force-kill any workers that have not exited after 10 seconds
setTimeout(() => {
workers.forEach(worker => {
if (!worker.isDead()) {
worker.kill();
}
});
process.exit(0);
}, 10000);
});
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid
});
});
const server = app.listen(3000, () => {
console.log(`Worker ${process.pid} started`);
});
// Graceful shutdown
process.on('SIGTERM', () => {
console.log(`Worker ${process.pid} received SIGTERM`);
server.close(() => {
process.exit(0);
});
});
}
# 9. Cluster with Process Monitoring
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
const workers = [];
// Create workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// Monitor workers: process.memoryUsage() in the master only reports the
// master's own memory, so workers report their metrics over IPC instead.
cluster.on('message', (worker, message) => {
if (message.type === 'metrics') {
const heapMB = Math.round(message.memory.heapUsed / 1024 / 1024);
console.log(`Worker ${worker.process.pid} memory: ${heapMB} MB`);
}
});
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid,
memory: process.memoryUsage()
});
});
// Report memory usage to the master every 30 seconds
setInterval(() => {
process.send({ type: 'metrics', pid: process.pid, memory: process.memoryUsage() });
}, 30000);
app.listen(3000, () => {
console.log(`Worker ${process.pid} started`);
});
}
# 10. Cluster with Custom Load Balancing
const cluster = require('cluster');
const net = require('net');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
const workers = [];
// Create workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// Custom load balancing: accept raw TCP connections in the master and
// hand each socket to a worker chosen round-robin. HTTP req/res objects
// cannot be sent over IPC, but socket handles can.
let connectionCount = 0;
const server = net.createServer({ pauseOnConnect: true }, (socket) => {
connectionCount++;
const worker = workers[connectionCount % workers.length];
worker.send('connection', socket);
});
server.listen(3000, () => {
console.log('Load balancer running on port 3000');
});
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid,
worker: cluster.worker.id
});
});
// The worker's HTTP server never calls listen(); it only handles
// sockets handed to it by the master.
const server = require('http').createServer(app);
process.on('message', (message, socket) => {
if (message === 'connection' && socket) {
server.emit('connection', socket);
socket.resume();
}
});
}
Load Balancing Strategies
- Round-robin
- Least connections
- Weighted round-robin
- IP hash (see the sketch after this list)
- Random
- CPU-based
- Memory-based
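Round-robin is what the cluster module does by default; the other strategies require the master to choose the worker itself, as in the custom load-balancing example above. A minimal IP-hash sketch, assuming a master that hands sockets to workers as in example #10 (pickWorkerByIp is an illustrative helper, not part of the cluster API):
// Route connections from the same client IP to the same worker, which
// gives sticky sessions without any shared session store.
function pickWorkerByIp(workers, socket) {
  const ip = socket.remoteAddress || '';
  let hash = 0;
  for (const char of ip) {
    hash = (hash * 31 + char.charCodeAt(0)) | 0; // simple string hash
  }
  return workers[Math.abs(hash) % workers.length];
}

// Usage in the master, in place of the round-robin pick:
//   const worker = pickWorkerByIp(workers, socket);
//   worker.send('connection', socket);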
Production Considerations
# 1. Environment Configuration
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = process.env.NODE_ENV === 'production'
? os.cpus().length
: 2;
console.log(`Starting ${numCPUs} workers`);
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid,
env: process.env.NODE_ENV
});
});
const port = process.env.PORT || 3000;
app.listen(port, () => {
console.log(`Worker ${process.pid} started on port ${port}`);
});
}
# 2. Process Management
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
const workers = [];
// Create workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// Process management: replace crashed workers and keep the workers
// array in sync so dead workers are not tracked forever
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died`);
const index = workers.indexOf(worker);
if (index !== -1) {
workers.splice(index, 1);
}
if (code !== 0 && !worker.exitedAfterDisconnect) {
console.log('Starting a new worker');
workers.push(cluster.fork());
}
});
// Log the current worker count periodically
setInterval(() => {
console.log(`Active workers: ${workers.filter(w => !w.isDead()).length}`);
}, 60000);
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid
});
});
app.listen(3000, () => {
console.log(`Worker ${process.pid} started`);
});
}
# 3. Monitoring and Logging
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
const workers = [];
// Create workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// Monitor workers: workers report their own status over IPC (see the
// setInterval in the worker below); reading process.memoryUsage() here
// would only describe the master process, not the workers.
cluster.on('message', (worker, message) => {
if (message.type === 'status') {
const heapMB = Math.round(message.memory.heapUsed / 1024 / 1024);
console.log(`Worker ${worker.process.pid}: ${heapMB} MB heap, uptime ${Math.round(message.uptime)}s`);
} else {
console.log(`Message from worker ${worker.process.pid}:`, message);
}
});
} else {
// Worker process
const express = require('express');
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid
});
});
// Send periodic status updates
setInterval(() => {
process.send({
type: 'status',
pid: process.pid,
memory: process.memoryUsage(),
uptime: process.uptime()
});
}, 30000);
app.listen(3000, () => {
console.log(`Worker ${process.pid} started`);
});
}
# 4. Security Considerations
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
// Create workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Handle worker exit
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});
} else {
// Worker process
const express = require('express');
const app = express();
// Security middleware
app.use(require('helmet')());
app.use(require('cors')());
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid
});
});
app.listen(3000, () => {
console.log(`Worker ${process.pid} started`);
});
}
# 5. Performance Optimization
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
const workers = [];
// Create workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
workers.push(worker);
}
// Performance monitoring: workers report their own metrics over IPC,
// since process.memoryUsage()/process.cpuUsage() in the master only
// describe the master process.
cluster.on('message', (worker, message) => {
if (message.type === 'perf') {
console.log(`Worker ${worker.process.pid}:`, {
memory: Math.round(message.memory.heapUsed / 1024 / 1024) + ' MB',
cpu: message.cpu.user + message.cpu.system
});
}
});
} else {
// Worker process
const express = require('express');
const app = express();
// Performance middleware
app.use(require('compression')());
app.use(require('express-rate-limit')({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100 // limit each IP to 100 requests per windowMs
}));
app.get('/', (req, res) => {
res.json({
message: 'Hello from worker',
pid: process.pid
});
});
// Report memory and CPU usage to the master every 30 seconds
setInterval(() => {
process.send({ type: 'perf', memory: process.memoryUsage(), cpu: process.cpuUsage() });
}, 30000);
app.listen(3000, () => {
console.log(`Worker ${process.pid} started`);
});
}
Summary
Cluster mode implementation involves several key components:
- Basic Setup: Master-worker architecture and process management
- Load Balancing: Different strategies for distributing requests
- Production Considerations: Environment configuration, monitoring, and security
- Performance Optimization: Resource monitoring and optimization techniques
Need More Help?
Struggling with cluster mode implementation or need help scaling your Node.js application? Our Node.js experts can help you implement effective multi-process architectures.
Get Cluster Help