NodeJS runs in a single thread
NodeJs runs in a single thread.
Internally NodeJs has hidden threads used for IO, but you cannot assign those threads to CPU work.
To fix this problem NodeJS introduced:
Worker Threads
Since JavaScript doesn’t support concurrency, NodeJS workers make use of the V8 runtime which allows the workers to run in complete isolation from other existing workers.
You can see examples below.
Alternatively clone the code from github:
git clone https://github.com/javaspeak/javascript-examples.git
Once cloned you can find the worker thread examples under a folder called:
"nodejs_only/worker_threads"
This is a simple example where the parent thread passes data to the Worker Thread via the workerData property:
const worker = new Worker('./worker_example.js', { workerData );
To run this example change directory to:
javascript-examples/nodejs_only/worker_threads/workerData
and type:
node main.js
// This example spawns off a worker thread and waits for it to complete. The main thread passes
// "hello John Doe" to the worker thread and the worker thread returns in back to the main thread
// on completion as json.
//
// To run:
//
// node main.js
//
// Output:
//
// workerData: hello John Doe
// Inside worker_example.js
// { welcome: 'hello John Doe' }
//
const { Worker } = require('worker_threads')
const runService = (workerData) => {
console.log('workerData: ' + workerData)
return new Promise((resolve, reject) => {
// import workerExample.js script..
const worker = new Worker('./worker_example.js', { workerData });
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`stopped with ${code} exit code`));
})
})
}
const run = async () => {
const result = await runService('hello John Doe')
console.log(result);
}
run().catch(err => console.error(err))
// This worker thread is spawned by main.js
//
// See main.js for instructions to run
//
const { workerData, parentPort } = require('worker_threads')
console.log('Inside worker_example.js')
parentPort.postMessage({ welcome: workerData })
In this example the parent thread communicates with the worker thread using messaging.
On the parent side:
It registers to receive messages:
worker.on("message", result => {
console.log(`${result.capitalizedText}`);
});
It sends a message:
worker.postMessage({text: 'hello'});
On the worker thread side it registers to receive messages and sends a response:
parentPort.on("message", data => {
parentPort.postMessage({capitalizedText: capitalize(data.text)});
});
To run this example change to:
javascript-examples/nodejs_only/worker_threads/messaging
and type:
node main.js
// This example shows using messaging between the parent and worker thread.
// The worker thread capitalizes the text passed to it and returns it.
//
// To run:
//
// node main.js
//
// Output:
//
// HELLO
// GOODBYE
//
const {Worker} = require("worker_threads");
//Create new worker
const worker = new Worker("./worker.js");
// Listen for a message from worker
worker.on("message", result => {
console.log(`${result.capitalizedText}`);
});
worker.on("error", error => {
console.log(error);
});
worker.postMessage({text: 'hello'});
worker.postMessage({text: 'goodbye'});
// This is the worker demonstrating messaging between the parent thread and a worker thread.
//
// To run look at instructions in the main.js
//
const {parentPort} = require("worker_threads");
parentPort.on("message", data => {
parentPort.postMessage({capitalizedText: capitalize(data.text)});
});
function capitalize(text) {
return text.toUpperCase()
}
In this example we demonstrate that sharing a CPU intensive task with 4 CPUs is faster than doing it on a single CPU
There are 2 examples in here:
(i) Parent thread with one worker thread
main_for_single_thread.js
(ii) Parent thread with 4 worker threads
main_for_multi_thread.js
As to be expected the job with 4 threads runs 4 times faster as they execute the job in parallel and make use of multiple CPUs.
What is interesting here is how we deal with several threads:
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
Notice how the creation of each worker thread is done in a promise and the promises are added to an array of promises.
Then the Promise.all(..) method triggers the work
Look at the main javascript files for instructions of how to run them.
// This example runs 4 thread workers
//
// To run:
//
// node node main_for_multi_thread.js
// time curl --get http://localhost:3000/blocking
//
// Output
//
// result is 20000000000 curl --get http://localhost:3000/blocking 0.00s user 0.00s system 0% cpu 5.244 total
//
// This took 5 seconds to run using 4 CPU
//
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
const THREAD_COUNT = 4;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
function createWorker() {
return new Promise(function (resolve, reject) {
const worker = new Worker("./four_workers.js", {
workerData: { thread_count: THREAD_COUNT },
});
worker.on("message", (data) => {
resolve(data);
});
worker.on("error", (msg) => {
reject(`An error ocurred: ${msg}`);
});
});
}
app.get("/blocking", async (req, res) => {
const workerPromises = [];
for (let i = 0; i < THREAD_COUNT; i++) {
workerPromises.push(createWorker());
}
const thread_results = await Promise.all(workerPromises);
const total =
thread_results[0] +
thread_results[1] +
thread_results[2] +
thread_results[3];
res.status(200).send(`result is ${total}`);
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
// This four_workers.js is used with main_for_multi_threads.js
//
// To see how to run the example look at main_for_multi_threads.js
//
const { workerData, parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000 / workerData.thread_count; i++) {
counter++;
}
parentPort.postMessage(counter);
// This example runs a single thread worker
//
// To run:
//
// node main_for_single_thread.js
// time curl --get http://localhost:3000/blocking
//
// Output
//
// result is 20000000000 curl --get http://localhost:3000/blocking 0.00s user 0.00s system 0% cpu 20.384 total
//
// This took 20 seconds to run
//
const express = require("express");
const { Worker } = require("worker_threads");
const app = express();
const port = process.env.PORT || 3000;
app.get("/non-blocking/", (req, res) => {
res.status(200).send("This page is non-blocking");
});
app.get("/blocking", async (req, res) => {
const worker = new Worker("./worker.js");
worker.on("message", (data) => {
res.status(200).send(`result is ${data}`);
});
worker.on("error", (msg) => {
res.status(404).send(`An error occurred: ${msg}`);
});
});
app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
// This worker.js is used with main_for_single_thread.js
//
// To see how to run the example look at main_for_single_thread.js
//
const { parentPort } = require("worker_threads");
let counter = 0;
for (let i = 0; i < 20_000_000_000; i++) {
counter++;
}
parentPort.postMessage(counter);
====================================================================================================
README.txt
====================================================================================================
This example shows how worker threads can be used to share CPU tasks
There are 2 examples in here:
(i) Parent thread with one worker thread - single_thread.js
(ii) Parent thread with 4 worker threads - multi_thread.js
====================================================================================================
Steps carried out to create this project
====================================================================================================
(i) cd javascript-examples/nodejs_only
(ii) mkdir share_job_among_worker_threads
(iii) cd share_job_among_worker_threads
(iv) npm init -y
(v) npm install express
====================================================================================================
Steps to run the code
====================================================================================================
(i) node main_for_single_thread.js
(ii) time curl --get http://localhost:3000/blocking
(iii) Output:
result is 20000000000 curl --get http://localhost:3000/blocking 0.00s user 0.00s system 0% cpu 20.384 total
(iii) node main_for_multi_thread.js
(iv) time curl --get http://localhost:3000/blocking
(v) Output:
result is 20000000000 curl --get http://localhost:3000/blocking 0.00s user 0.00s system 0% cpu 5.244 total
So above you can see that we did the job 4 times faster with 4 threads
{
"name": "share_job_among_worker_threads",
"version": "1.0.0",
"description": "",
"main": "single_thread.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"express": "^4.18.2"
}
}
In this example we demonstrate how to share memory between main and worker threads.
First of we create a shared buffer:
// get size of the array buffer with int32 size buffer for each element in the array
const size = Int32Array.BYTES_PER_ELEMENT*nums.length;
// create the buffer for the shared array
const sharedBuffer = new SharedArrayBuffer(size);
const sharedArray = new Int32Array(sharedBuffer);
Then we use Atomics to store data in the sharedArray:
nums.forEach((num, index) => {
Atomics.store(sharedArray, index, num);
})
We then post the shared array from the main thread to the worker one:
worker.postMessage({nums: sharedArray});
The worker thread receives the message and can read the values of the shared array
Look at the main.js for instructions on how to run it
//
// Here we are using Atomics and a SharedArrayBuffer to ensure atomicity of read and write on a
// shared array.
//
// In this example we are uings Atomics.store to store ints in a shared array. In the worker
// we are just iterating through the elements and retrieving the values without requiring a special
// Atomics read method.
//
// To run:
//
// node main.js
//
// Output is:
//
// 21th Fibonacci Number: 10946
// 33th Fibonacci Number: 3524578
// 15th Fibonacci Number: 610
// 40th Fibonacci Number: 102334155
const {Worker} = require("worker_threads");
let nums = [21, 33, 15, 40];
//get size of the array buffer with int32 size buffer for each element in the array
const size = Int32Array.BYTES_PER_ELEMENT*nums.length;
//create the buffer for the shared array
const sharedBuffer = new SharedArrayBuffer(size);
const sharedArray = new Int32Array(sharedBuffer);
nums.forEach((num, index) => {
Atomics.store(sharedArray, index, num);
})
//Create new worker
const worker = new Worker("./worker.js");
//Listen for a message from worker
worker.on("message", result => {
console.log(`${result.num}th Fibonacci Number: ${result.fib}`);
});
worker.on("error", error => {
console.log(error);
});
worker.postMessage({nums: sharedArray});
// Here we are iterating through ints of a shared array that was created with a SharedArrayBuffer
// For running instructions, see main.js
const {parentPort} = require("worker_threads");
function getFib(num) {
if (num === 0) {
return 0;
}
else if (num === 1) {
return 1;
}
else {
return getFib(num - 1) + getFib(num - 2);
}
}
parentPort.on("message", data => {
data.nums.forEach(num => {
parentPort.postMessage({num: num, fib: getFib(num)});
});
})
Back: JavaScript
Page Author: JD