Node.js Worker Threads & AWS Lambda

Kevin Dico · Gecogeco · Oct 4, 2023

What can executing JavaScript threads in parallel bring to the serverless table?


Most Lambda functions we write are probably asynchronous and don’t require much effort during their design phase to pull off correctly. Sooner or later, though, you may run into requirements where your Lambda function needs to execute threads in parallel: perhaps the tasks must be processed as a transaction, you don’t have control over a design that explicitly asks for parallelism, or updating the resources in your architecture isn’t allowed. Whatever your reason for exploring such a solution, read on to get acquainted with some examples and start building something similar.

In this story, we’ll walk through three solutions to the same problem: a mock CPU-intensive requirement. Since the requirement itself is elementary, the solutions should draw most of your attention to the use of worker threads.

AWS References

First, let’s quote the AWS documentation below, which establishes the relationship between memory and CPU.

The amount of memory also determines the amount of virtual CPU available to a function.

And another, from 2020, about the maximum virtual CPU count.

Since Lambda allocates CPU power proportional to the amount of memory provisioned, customers now have access to up to 6 vCPUs.

Jeremy Daly also mentioned this in 2018 about the minimum memory setting needed to configure a Lambda function with a multi-core CPU.

If your function is allocated more than 1.8GB of memory, then it will utilize a multi core CPU. If you have CPU intensive workloads, increasing your memory to more than 1.8GBs should give you significant gains. The same is true for I/O bound workloads like parallel calculations.
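To see what these quotes mean in practice, you can log the core count your function actually gets at runtime. Below is a minimal sketch (my addition, not one of the solutions in this story) that probes the Node.js system utilities; note that os.availableParallelism() only exists from Node.js 18.14 onward, so it’s guarded here.

// cpu_info.js: a hypothetical probe (my addition, not one of the solutions).

const os = require('node:os');

const handler = () => {
  // Logical CPU cores visible to the runtime.
  console.info('os.cpus().length:', os.cpus().length);

  // Only available from Node.js 18.14 onward.
  if (typeof os.availableParallelism === 'function') {
    console.info(
      'os.availableParallelism():',
      os.availableParallelism(),
    );
  }
};

module.exports = {
  handler,
};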

Mock Requirement

The mock requirement for this story is as follows.

Loop through N₁, N₂, …, Nₙ and output the time it took to loop through all numbers. Set the Lambda function timeout to 60 seconds.

We’ve already established from the requirement alone that we’ll loop through the large number set, and there’s no way around it. How we loop through these numbers is what varies across the solutions we’ll explore.

For brevity, a task is what we’ll call a single, complete loop of one number in the large number set N from the requirement statement.

We’re also required to output the time taken per task. This was added simply to show how we can pass values from worker threads to the main thread. The requirement also includes a specific function timeout, which should be generous enough to avoid timeouts.

For each solution, we’ll note the billed duration of a Lambda function configured with 128 MB and with 2048 MB of memory as the single metric for comparison. The 2048 MB configuration satisfies the suggested memory configuration for a multi-core Lambda function per the quote above.
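
If you’d like to apply the memory and timeout settings programmatically rather than through the console, a minimal sketch using the AWS SDK for JavaScript v3 could look like the following (my addition; the function name is a placeholder).

// configure.js: a sketch for setting memory and timeout (my addition).

const {
  LambdaClient,
  UpdateFunctionConfigurationCommand,
} = require('@aws-sdk/client-lambda');

const client = new LambdaClient({});

const configure = async (memory_size) => {
  await client.send(new UpdateFunctionConfigurationCommand({
    FunctionName: 'worker-threads-demo', // Placeholder name.
    MemorySize: memory_size, // 128 or 2048 for our comparison.
    Timeout: 60, // Seconds, per the mock requirement.
  }));
};

configure(2048).catch(console.error);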

Solution 1 — The baseline.

Let’s establish a baseline using a simple solution where we complete all tasks sequentially on a single thread, measuring the memory usage and execution time in Lambda. This baseline should make comparisons between solutions easier.

This is how the Node.js 18 Lambda handler is written.

// index.js

const assert = require('node:assert/strict');

const nums = [
  50_000_000, 100_000_000, 150_000_000, 200_000_000,
  250_000_000, 300_000_000,
];

const loop = (num) => {
  assert.strictEqual(typeof num, 'number');

  const start = Date.now();

  for (let i = 0; i < num; i += 1) {
    // Do nothing.
  }

  return Date.now() - start;
};

const handler = () => {
  let ms_elapsed_total = 0;

  nums.forEach((num) => {
    const ms_elapsed = loop(num);

    assert.strictEqual(typeof ms_elapsed, 'number');

    ms_elapsed_total += ms_elapsed;
  });

  console.info(
    'Total Elapsed (MS):',
    JSON.stringify(ms_elapsed_total),
  );
};

module.exports = {
  handler,
};
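
Before deploying, you can sanity-check the handler locally with a one-liner (my addition); it simply requires the module and invokes the exported handler.

// run_local.js: a quick local check (my addition).

require('./index').handler();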

Solution 2 — A worker thread per task where the worker thread count matches the task count.

We’re now using only worker threads to complete the tasks. To summarize: the main thread creates a new worker thread for each task, each worker thread reports its compute time back to the main thread, and the main thread finally sums and logs the total elapsed time.

This is how the Node.js 18 Lambda handler is written.

// index.js

const assert = require('node:assert/strict');
const path = require('node:path');
const { Worker } = require('node:worker_threads');

const nums = [
  50_000_000, 100_000_000, 150_000_000, 200_000_000,
  250_000_000, 300_000_000,
];

const handler = () => {
  let done = 0;

  let ms_elapsed_total = 0;

  nums.forEach((num) => {
    const worker = new Worker(
      path.join(__dirname, 'worker.js'),
      {
        workerData: num,
      },
    );

    worker.on('message', (ms_elapsed) => {
      done += 1;

      assert.strictEqual(typeof ms_elapsed, 'number');
      ms_elapsed_total += ms_elapsed;

      if (done === nums.length) {
        console.info(
          'Total Elapsed (MS):',
          JSON.stringify(ms_elapsed_total),
        );
      }
    });
  });
};

module.exports = {
  handler,
};

We now also have a separate file for the worker threads.

// worker.js

const assert = require('node:assert/strict');
const {
  isMainThread,
  parentPort,
  workerData,
} = require('node:worker_threads');

const loop = (num) => {
  assert.strictEqual(typeof num, 'number');

  const start = Date.now();

  for (let i = 0; i < num; i += 1) {
    // Do nothing.
  }

  return Date.now() - start;
};

if (isMainThread) {
  // Main thread.

  // Export for testing.
  module.exports = {
    loop,
  };
} else {
  // Worker thread.

  const ms_elapsed = loop(workerData);

  parentPort.postMessage(ms_elapsed);
}
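
One thing solution 2 glosses over is failure handling: if a worker throws or exits abnormally, done never reaches nums.length and the handler idles until the Lambda timeout. Below is a minimal sketch of the extra listeners you could attach to each worker in index.js (my addition, not part of the original solution).

// Sketch: failure handling for solution 2 (my addition).
// These listeners would sit next to worker.on('message') in index.js.

worker.on('error', (error) => {
  // Without this, a throwing worker leaves the handler waiting
  // until the Lambda timeout is reached.
  console.error('Worker failed:', error);
});

worker.on('exit', (code) => {
  if (code !== 0) {
    console.error('Worker stopped with exit code:', code);
  }
});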

Solution 3 — A pool of worker threads where the worker thread count matches the virtual CPU count.

This solution builds on solution 2 by using a pool of worker threads, where the pool size is based on the virtual CPU count available to the Lambda function. By pooling, we create the worker threads once and reuse them for subsequent tasks, minimizing the overhead of spawning a new worker thread per task.

This is how the Node.js 18 Lambda handler is written.

// index.js

const assert = require('node:assert/strict');
const os = require('node:os');
const path = require('node:path');

const { WorkerPool } = require('./pool');

const nums = [
  50_000_000, 100_000_000, 150_000_000, 200_000_000,
  250_000_000, 300_000_000,
];

const handler = () => {
  const pool = new WorkerPool(
    os.cpus().length,
    path.join(__dirname, 'worker.js'),
  );

  let done = 0;

  let ms_elapsed_total = 0;

  nums.forEach((num) => {
    pool.run_task(num, (error, ms_elapsed) => {
      done += 1;

      if (error) {
        throw error;
      } else {
        assert.strictEqual(typeof ms_elapsed, 'number');

        ms_elapsed_total += ms_elapsed;
      }

      if (done === nums.length) {
        pool.close();

        console.info(
          'Total Elapsed (MS):',
          JSON.stringify(ms_elapsed_total),
        );
      }
    });
  });
};

module.exports = {
  handler,
};

Our worker pool file looks like this; it’s based on an example in the Node.js 18 documentation.

// pool.js

const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const { Worker } = require('node:worker_threads');

const k_task_info = Symbol('k_task_info');

const k_worker_freed_event = Symbol('k_worker_freed_event');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super(WorkerPoolTaskInfo.name);
    this.callback = callback;
  }

  done(error, result) {
    this.runInAsyncScope(
      this.callback,
      null,
      error,
      result,
    );
    this.emitDestroy();
  }
}

class WorkerPool extends EventEmitter {
  constructor(num_threads, file_name) {
    super();

    this.num_threads = num_threads;
    this.file_name = file_name;

    this.workers = [];
    this.free_workers = [];
    this.tasks = [];

    for (let i = 0; i < num_threads; i += 1) {
      this.add_new_worker();
    }

    this.on(k_worker_freed_event, () => {
      if (this.tasks.length) {
        const { task, callback } = this.tasks.shift();

        this.run_task(task, callback);
      }
    });
  }

  add_new_worker() {
    const worker = new Worker(this.file_name);

    worker.on('message', (result) => {
      worker[k_task_info].done(null, result);

      delete worker[k_task_info];

      this.free_workers.push(worker);
      this.emit(k_worker_freed_event);
    });

    worker.on('error', (error) => {
      if (worker[k_task_info]) {
        worker[k_task_info].done(error, null);
      } else {
        this.emit('error', error);
      }

      this.workers.splice(this.workers.indexOf(worker), 1);
      this.add_new_worker();
    });

    this.workers.push(worker);
    this.free_workers.push(worker);

    this.emit(k_worker_freed_event);
  }

  run_task(worker_data, callback) {
    if (this.free_workers.length) {
      const worker = this.free_workers.pop();

      worker[k_task_info] = new WorkerPoolTaskInfo(
        callback,
      );
      worker.postMessage(worker_data);
    } else {
      this.tasks.push({ task: worker_data, callback });
    }
  }

  close() {
    this.workers.forEach((worker) => worker.terminate());
  }
}

module.exports = {
  WorkerPool,
};

The worker thread file remains roughly the same as in solution 2.

// worker.js

const assert = require('node:assert/strict');
const {
  parentPort,
  isMainThread,
} = require('node:worker_threads');

const loop = (num) => {
  assert.strictEqual(typeof num, 'number');

  const start = Date.now();

  for (let i = 0; i < num; i += 1) {
    // Do nothing.
  }

  return Date.now() - start;
};

if (isMainThread) {
  // Main thread.

  // Export for testing.
  module.exports = {
    loop,
  };
} else {
  // Worker thread.

  parentPort.on('message', (num) => {
    const ms_elapsed = loop(num);

    parentPort.postMessage(ms_elapsed);
  });
}
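
As a side note, if you’d rather await tasks than juggle callbacks and a done counter, run_task is easy to wrap in a promise. Here’s a minimal sketch (my addition, not part of the Node.js example).

// Sketch: a promise wrapper around run_task (my addition).

const run_task_async = (pool, worker_data) => new Promise(
  (resolve, reject) => {
    pool.run_task(worker_data, (error, result) => {
      if (error) {
        reject(error);
      } else {
        resolve(result);
      }
    });
  },
);

// Usage inside an async handler:
// const ms_elapsed = await run_task_async(pool, num);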

Results

Fig. A and B show significant differences in the billed duration between solutions. Surprisingly, solution 3 didn’t perform that much better than solution 1 despite using worker threads and pooling.

Looking at the implementation of solution 3, you’ll see that it set the pool size using the first system utility function in Fig. C (os.cpus().length), so regardless of the memory configuration, the worker thread pool size was always 2. In solution 2, however, the maximum number of worker threads running in parallel was 6, one for each task.

It’s also worth noting that setting the pool size to a constant value N, where 2 < N < 6, may produce proportionally better or worse billed durations, based on Fig. A and B.
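
If you want to experiment with such a constant, making the pool size configurable is a low-effort tweak. Here’s a sketch (my addition; the POOL_SIZE environment variable is hypothetical).

// Sketch: a configurable pool size (my addition; POOL_SIZE is hypothetical).

const os = require('node:os');
const path = require('node:path');

const { WorkerPool } = require('./pool');

// Fall back to the CPU count when POOL_SIZE is unset or invalid.
const pool_size = Number(process.env.POOL_SIZE) || os.cpus().length;

const pool = new WorkerPool(
  pool_size,
  path.join(__dirname, 'worker.js'),
);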

At this point, you probably have a good idea which solution to pick or to build your own from. Personally, I’d go with either solution 1 or solution 3. Solution 2 was really just a stepping stone toward solution 3, since creating worker threads beyond the available parallelism suggested by system utilities adds unnecessary performance overhead. Solution 1 works fine and is likely easier to maintain when you have predictable input data to process or the billed duration is tolerable.

I hope that the above examples and explanations helped provide useful information about using worker threads in general and in Lambda.
