node-worker-threads-pool's Introduction

node-worker-threads-pool

Simple worker threads pool using Node's worker_threads module. Compatible with ES6+ Promise, Async/Await and TypeScript🚀.

With this library, you can:

  • Use StaticPool to create a thread pool that runs a single task, given either as a worker file or as a task function, to make use of a multi-core processor.
  • Use DynamicPool to create a thread pool that accepts a different task on each call. This gives you more flexibility than StaticPool while still making use of a multi-core processor.
  • Gain extra control over the underlying threads through worker_threads features such as resourceLimits, SHARE_ENV, transferList and more.

Notification

  1. This module can only run in Node.js.

Installation

npm install node-worker-threads-pool --save

Simple Example

Quickly create a pool with static task:

const { StaticPool } = require('node-worker-threads-pool');

const staticPool = new StaticPool({
  size: 4,
  task: (n) => n + 1
});

staticPool.exec(1).then((result) => {
  console.log('result from thread pool:', result); // result will be 2.
});

There you go! 🎉

Create a pool with dynamic task:

const { DynamicPool } = require('node-worker-threads-pool');

const dynamicPool = new DynamicPool(4);

dynamicPool
  .exec({
    task: (n) => n + 1,
    param: 1
  })
  .then((result) => {
    console.log(result); // result will be 2.
  });

dynamicPool
  .exec({
    task: (n) => n + 2,
    param: 1
  })
  .then((result) => {
    console.log(result); // result will be 3.
  });

For the differences between StaticPool and DynamicPool, please see this issue.

API

Class: StaticPool

An instance of StaticPool is a thread pool with a static task provided.

new StaticPool(opt)

  • opt <Object>
    • size <number> Number of workers in this pool.
    • task <string | function> Static task to do. It can be an absolute path to a worker file (usage here) or a function. ⚠️Notice: If task is a function, you can NOT access variables defined outside the task function! If you need external data, pass cloneable data through workerData.
    • workerData <any> Cloneable data you want to access in task function. (usage here)
    • shareEnv <boolean> Set true to enable SHARE_ENV for all threads in pool.
    • resourceLimits <Object> Set resourceLimits for all threads in pool.
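
The restriction on outside variables can be illustrated without the pool. The sketch below assumes the task function reaches the worker as source text (the [worker eval] frames in the stack traces reported by users suggest this, but the README does not state it). makeTask and rebuild are hypothetical names used only for illustration.

```javascript
// Hypothetical illustration, not the library's code: rebuilding a function
// from its source text discards the scope it was defined in.
function makeTask() {
  const offset = 10;
  return (n) => n + offset; // closes over `offset`
}

// Assumption: the worker only ever sees the function's source text.
function rebuild(fn) {
  return new Function(`return (${fn.toString()});`)();
}

let closureError = null;
try {
  rebuild(makeTask())(1);
} catch (err) {
  closureError = err; // `offset` no longer exists for the rebuilt function
}

// Reading the value from `this.workerData` avoids the captured variable.
const explicitTask = function (n) {
  return n + this.workerData.offset;
};
const result = rebuild(explicitTask).call({ workerData: { offset: 10 } }, 1);

console.log(closureError.name, result);
```

The rebuilt closure fails because offset is not part of the function's text, while the second task works because everything it needs arrives through this.workerData or its parameter.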

Example with worker file

In the worker.js :

// Access the workerData by requiring it.
const { parentPort, workerData } = require('worker_threads');

// Something you shouldn't run in main thread
// since it will block.
function fib(n) {
  if (n < 2) {
    return n;
  }
  return fib(n - 1) + fib(n - 2);
}

// Main thread will pass the data you need
// through this event listener.
parentPort.on('message', (param) => {
  if (typeof param !== 'number') {
    throw new Error('param must be a number.');
  }
  const result = fib(param);

  // Access the workerData.
  console.log('workerData is', workerData);

  // return the result to main thread.
  parentPort.postMessage(result);
});

In the main.js :

const { StaticPool } = require('node-worker-threads-pool');

const filePath = 'absolute/path/to/your/worker/script';

const pool = new StaticPool({
  size: 4,
  task: filePath,
  workerData: 'workerData!'
});

for (let i = 0; i < 20; i++) {
  (async () => {
    const num = 40 + Math.trunc(10 * Math.random());

    // This will choose one idle worker in the pool
    // to execute your heavy task without blocking
    // the main thread!
    const res = await pool.exec(num);

    console.log(`Fibonacci(${num}) result:`, res);
  })();
}

Access workerData in task function

You can access workerData in task function using this keyword:

const pool = new StaticPool({
  size: 4,
  workerData: 'workerData!',
  task() {
    console.log(this.workerData);
  }
});

⚠️Remember not to use an arrow function as the task function when you use this.workerData, because arrow functions don't have their own this binding.

staticPool.exec(param)

  • param <any> The param your worker script or task function needs.
  • Returns: <Promise>

The simplest way to execute a task without considering other configurations. This will choose an idle worker in the pool to execute your heavy task with the param you provided. The Promise is resolved with the result.

staticPool.createExecutor()

  • Returns: <StaticTaskExecutor>

Create a task executor of this pool. This is used to apply some advanced settings to a task. See more details of StaticTaskExecutor.

staticPool.destroy()

Call worker.terminate() for every worker in the pool and release them.

Class: StaticTaskExecutor

Executor for StaticPool. Used to apply some advanced settings to a task.

Example

const staticPool = new StaticPool({
  size: 4,
  task: (buf) => {
    // do something with buf.
  }
});

const buf = Buffer.alloc(1024 * 1024);

staticPool
  .createExecutor() // create a StaticTaskExecutor instance.
  .setTimeout(1000) // set timeout for task.
  .setTransferList([buf.buffer]) // set transferList.
  .exec(buf) // execute!
  .then(() => console.log('done!'));

staticTaskExecutor.setTimeout(t)

  • t <number> Timeout in milliseconds.
  • Returns: <StaticTaskExecutor>

Set timeout for this task.

staticTaskExecutor.setTransferList(transferList)

  • transferList <Object[]>
  • Returns: <StaticTaskExecutor>

Set transferList for this task. This is useful when you want to pass some huge data into worker thread.
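
What a transfer list does can be seen with Node's built-in worker_threads module alone. In this minimal sketch (it uses a plain MessageChannel, not the pool), the ArrayBuffer is moved rather than copied, so the sending side can no longer use it afterwards.

```javascript
const { MessageChannel } = require('worker_threads');

const { port1 } = new MessageChannel();

// 1 MiB of data backed by its own ArrayBuffer.
const data = new Uint8Array(1024 * 1024);
const buffer = data.buffer;
const sizeBefore = buffer.byteLength;

// Listing the buffer in the transfer list moves it instead of cloning it.
port1.postMessage(data, [buffer]);

// The buffer is detached on the sending side once it has been transferred.
const sizeAfter = buffer.byteLength;
console.log(sizeBefore, sizeAfter);

port1.close();
```

Because the sender loses access to the data, transfer only buffers you no longer need on the calling side.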

staticTaskExecutor.exec(param)

  • param <any>
  • Returns: <Promise>

Execute this task with the parameter and settings provided. The Promise is resolved with the result your task returned.

Class: DynamicPool

An instance of DynamicPool is a thread pool that executes a different task function provided on every call.

new DynamicPool(size[, opt])

  • size <number> Number of workers in this pool.
  • opt
    • shareEnv <boolean> Set true to enable SHARE_ENV for all threads in pool.
    • resourceLimits <Object> Set resourceLimits for all threads in pool.

dynamicPool.exec(opt)

  • opt
    • task <function> Function as a task to do. ⚠️Notice: You can NOT access variables defined outside the task function!
    • param <any> The param your task function needs.
    • timeout <number> Timeout in milliseconds for limiting the execution time. When the timeout expires, the returned Promise is rejected with a TimeoutError; use the isTimeoutError function to detect it.
  • Returns: <Promise>

Choose one idle worker in the pool to execute your task function. The Promise is resolved with the result your task returned.

dynamicPool.createExecutor(task)

  • task <function> Task function.
  • Returns: <DynamicTaskExecutor>

Create a task executor of this pool. This is used to apply some advanced settings to a task. See more details of DynamicTaskExecutor.

dynamicPool.destroy()

Call worker.terminate() for every worker in the pool and release them.

Class: DynamicTaskExecutor

Executor for DynamicPool. Used to apply some advanced settings to a task.

Example

const dynamicPool = new DynamicPool(4);

const buf = Buffer.alloc(1024 * 1024);

dynamicPool
  .createExecutor((buf) => {
    // do something with buf.
  })
  .setTimeout(1000) // set timeout for task.
  .setTransferList([buf.buffer]) // set transferList.
  .exec(buf) // execute!
  .then(() => console.log('done!'));

dynamicTaskExecutor.setTimeout(t)

  • t <number> Timeout in milliseconds.
  • Returns: <DynamicTaskExecutor>

Set timeout for this task.

dynamicTaskExecutor.setTransferList(transferList)

  • transferList <Object[]>
  • Returns: <DynamicTaskExecutor>

Set transferList for this task. This is useful when you want to pass some huge data into worker thread.

dynamicTaskExecutor.exec(param)

  • param <any>
  • Returns: <Promise>

Execute this task with the parameter and settings provided. The Promise is resolved with the result your task returned.

function: isTimeoutError

Detect if a thrown error is TimeoutError.

isTimeoutError(err)

  • err <Error> The error you want to detect.
  • Returns: <boolean> true if the error is a TimeoutError.

Example

const { isTimeoutError } = require("node-worker-threads-pool");

// create pool.
...

// static pool exec with timeout.
const timeout = 1000;
try {
  const res = await staticPool.exec(param, timeout);
} catch (err) {
  if (isTimeoutError(err)) {
    // deal with timeout.
  } else {
    // deal with other errors.
  }
}

// dynamic pool exec with timeout (reuses the timeout declared above).
try {
  const res = await dynamicPool.exec({
    task() {
      // your task.
    },
    timeout
  });
} catch (err) {
  if (isTimeoutError(err)) {
    // deal with timeout.
  } else {
    // deal with other errors.
  }
}

Integration with Webpack

If you are using webpack in your project and want to import third-party libraries in task function, please use this.require:

const staticPool = new StaticPool({
  size: 4,
  task() {
    const lib = this.require('lib');
    // ...
  }
});

const dynamicPool = new DynamicPool(4);

dynamicPool
  .exec({
    task() {
      const lib = this.require('lib');
      // ...
    }
  })
  .then((result) => {
    // ...
  });

node-worker-threads-pool's People

Contributors

agubler, dependabot[bot], suchmokuo

node-worker-threads-pool's Issues

Worker thread initialization

Currently the threads are in the pool as soon as they are created.

In my project the worker threads spend a long time on initialization, and I don't want them to be in the pool before they are ready to serve.

Can't find class at runtime

I may be demonstrating my deep, deep ignorance of threading in node, but this is my first attempt at using worker threads, and the documentation says I ought to use a worker pool.

So here we are. I am using the node-worker-threads-pool package. My app will be training several dozen ML models simultaneously using TensorFlow.

The following TypeScript code falls over at runtime, although it transpiles fine:

trainModels = async () => {
   const modelIds: string[] = getModelId();

   for await (const modelId of modelIds) {
      this.dynamicPool.exec({
         task: id => {
            const predictor = new Predictor(id, []);
            // ...do such-and-such
         },
         param: modelId,
      });
   }
};

The runtime complains:

ReferenceError [Error]: Predictor is not defined

So it can't find it. Accessing the class elsewhere in the code is fine; but not within a thread.

I'm guessing this is threading 101 in node. How do I get around this? Perhaps I can construct the class and then create the thread within that class instead? What is the pattern?

Support for ".ts" worker files

What is the recommended way to use workers that are written in TypeScript? The library seems to support only .js files.

Pass Function Reference to DynamicPool task Argument

Hello again, and thanks for writing this library!

Question Context:

  • TypeScript compiler (tsc) v4.2.4
  • TS module = commonjs
  • TS target = es6
  • No webpack

I am attempting to pass a function reference to the task argument of DynamicPool.exec. I've written a singleton class wrapper around my thread pool like this:

import * as Pool from 'node-worker-threads-pool';
import os from 'os';

export class ThreadPool {
	public static getInstance(): ThreadPool {
		if (!ThreadPool.instance) {
			ThreadPool.instance = new ThreadPool();
		}
		return ThreadPool.instance;
	}

	private static readonly cpuCount: number = os.cpus().length;
	private static instance: ThreadPool;
	private pool: Pool.DynamicPool;
	
	private constructor() {
		this.pool = new Pool.DynamicPool(ThreadPool.cpuCount**2);
	}

	public runTaskInThread = async <InputType, OutputType>(
		task: any,
		param: InputType,
	): Promise<OutputType> => {
		try {
			return await this.pool.exec({ task, param, });
		} catch (error) {
			console.error('runTaskInThread worker failed. Error:', error, error.stack);
			throw error;
		}
	}
}

and elsewhere in my program (I'll call the location reducers/File-A.ts here for simplicity), I am requesting a thread in the dynamic pool to execute a function like this:

const pool = ThreadPool.getInstance();
const output = await pool.runTaskInThread<Inputs[], Outputs[]>(
    someCPUExpensiveFunctionImportedFromADifferentFile,
    inputs
);

The function someCPUExpensiveFunctionImportedFromADifferentFile resides in util/File-B and depends on no 3rd party npm libraries. It only imports a helper utility function (that also imports no 3rd party npm dependencies) and some local TypeScript interface definitions. However, after successfully compiling the program I encounter this runtime error:

An unexpected error occurred: ReferenceError: util_1 is not defined Stacktrace: ReferenceError: util_1 is not defined
athena         |     at Object.task (evalmachine.<anonymous>:2:36)
athena         |     at MessagePort.<anonymous> ([worker eval]:23:36)
athena         |     at MessagePort.[nodejs.internal.kHybridDispatch] (internal/event_target.js:354:41)
athena         |     at MessagePort.exports.emitMessage (internal/per_context/messageport.js:18:26)

The function someCPUExpensiveFunctionImportedFromADifferentFile resides in the location util/File-B.ts, which explains the compiled util_1 reference in the stacktrace. I am importing someCPUExpensiveFunctionImportedFromADifferentFile into File-A.ts using import de-structuring syntax since the export is a named export.

Can functions be passed to the DynamicPool.exec() function in this way? Or do they have to exist in the same file that the DynamicPool object resides in? I did read the note in the docs that clearly state If task is a function, you can not use closure in it! but I don't believe I am using a closure in the someCPUExpensiveFunctionImportedFromADifferentFile function.

I greatly appreciate any guidance on this issue.

node hangs after completing the task (handles keep the process running)

When running from top-most level, node hangs after completing the task because handles keep the process running.
I have installed the "why-is-node-running" package to understand why, here is the output:

C:\>node main.js
result from thread pool: 2
There are 7 handle(s) keeping the process running

# TTYWRAP
(unknown stack trace)

# WORKER
C:\DATA\web\neatalerts\crons\node\node_modules\node-worker-threads-pool\dist\poolWorker.js:17 - super(...args);
C:\DATA\web\neatalerts\crons\node\node_modules\node-worker-threads-pool\dist\staticPool.js:76 - this.fill(() => new poolWorker_1.PoolWorker(script, workerOpt));
C:\DATA\web\neatalerts\crons\node\node_modules\node-worker-threads-pool\dist\pool.js:50       - const worker = getWorker();
C:\DATA\web\neatalerts\crons\node\node_modules\node-worker-threads-pool\dist\pool.js:83       - this.workers.push(this.createWorker());
C:\DATA\web\neatalerts\crons\node\node_modules\node-worker-threads-pool\dist\staticPool.js:76 - this.fill(() => new poolWorker_1.PoolWorker(script, workerOpt));
C:\DATA\web\neatalerts\crons\node\fr\main.js:2336                                             - const staticPool = new StaticPool({
C:\DATA\web\neatalerts\crons\node\fr\main.js:5575                                             - })();

And here is the code:

var { StaticPool } = require('node-worker-threads-pool');

const staticPool = new StaticPool({
  size: 4,
  task: (n) => n + 1
});

return staticPool.exec(1).then((result) => {
  console.log('result from thread pool:', result); // result will be 2.

  setTimeout(function () { log_why(); }, 200); // should be placed at the end of execution (logs out active handles that are keeping node running)
  process.on('uncaughtException', function (exception) { console.log(exception); }); // show exception details in the console
});

How do I make a static pool and listen for multiple messages from a worker?

I can create a static pool, and to invoke one of its workers I use

const result = await pool.exec(4);

but this only returns the value once the task has fully finished. I want the worker to be able to send multiple messages before it finishes, and the parent should be able to receive them. Something like this:

pool.on('message', function(data) {
// some worker sent some data here
});

Is this possible?
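
The README above does not describe a pool-level message event. With the plain worker_threads module, a worker can post several messages before its final one. The sketch below does not use the pool, and the message shape ({ type: 'progress' } and { type: 'done' }) is made up for illustration.

```javascript
const { Worker } = require('worker_threads');

// Worker source: three progress messages, then a final result.
const source = `
  const { parentPort } = require('worker_threads');
  for (let step = 1; step <= 3; step++) {
    parentPort.postMessage({ type: 'progress', step });
  }
  parentPort.postMessage({ type: 'done', result: 42 });
`;

const progress = [];
const done = new Promise((resolve, reject) => {
  const worker = new Worker(source, { eval: true });
  worker.on('message', (msg) => {
    if (msg.type === 'progress') {
      progress.push(msg.step); // intermediate update
    } else {
      resolve(msg.result); // final message settles the Promise
    }
  });
  worker.once('error', reject);
});

done.then((result) => console.log('progress:', progress, 'result:', result));
```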

Is there any way to know when all tasks finished?

I start thousands of tasks at a time and only 8 workers run them. I need to know when all the tasks have finished. Is there any way to know that, such as pool.on('finished', () => {}); or, even better I think, await pool.finish()?
Thx.
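
Since exec returns a Promise, waiting for a whole batch can be sketched with Promise.all. The exec function below is a stand-in (an assumption for illustration) so the snippet runs without a pool; with a real pool you would map over pool.exec instead.

```javascript
// Stand-in for pool.exec(): any function that returns a Promise works here.
const exec = (n) =>
  new Promise((resolve) => setTimeout(() => resolve(n + 1), 10));

const params = [1, 2, 3, 4, 5];

// Promise.all resolves once every task has resolved, preserving input order.
const allFinished = Promise.all(params.map((n) => exec(n)));

allFinished.then((results) => {
  console.log('all tasks finished:', results);
});
```

Promise.all rejects as soon as one task rejects; Promise.allSettled is an alternative when every outcome is needed regardless of failures.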

Dynamic Imports do not work

Any attempt to use dynamic imports inside a task fails with

TypeError [ERR_VM_DYNAMIC_IMPORT_CALLBACK_MISSING]: A dynamic import callback was not specified.
    at new NodeError (node:internal/errors:371:5)
    at importModuleDynamicallyCallback (node:internal/process/esm_loader:39:9)
    at Object.task (evalmachine.<anonymous>:3:25)
    at MessagePort.<anonymous> ([worker eval]:25:36)
    at MessagePort.[nodejs.internal.kHybridDispatch] (node:internal/event_target:562:20)
    at MessagePort.exports.emitMessage (node:internal/per_context/messageport:23:28)

Node version: 14.19

Thread pool vs synchronous performance example (for your information)

Hope this is of some use to you! In my results the thread pool overtook the synchronous version at n = 10000000 (about 173 ms versus 378 ms), and beyond that it did much better.

// random-number-sum-performance-test.js
const { performance } = require('perf_hooks')

const { StaticPool } = require('node-worker-threads-pool')

let n = 1
async function main() {
    n = 1
    for (let exponent = 0; exponent < 9; exponent++) {
        n *= 10
        console.log(`--- n=${n}: ---`)
        await time(withPool)
        await time(nothingFancy)
    }
}
main()

async function withPool() {
    const staticPool = new StaticPool({
        size: 4,
        task: n => {
            let total = 0
            for (let i = 0; i < n; i++) total += Math.random()
            return total
        },
    })

    const total = (await Promise.all([1, 2, 3, 4].map(() => staticPool.exec(n)))).reduce((a, b) => a + b)
    return total
}

function nothingFancy() {
    let total = 0
    const m = n * 4
    for (let i = 0; i < m; i++) total += Math.random()
    return total
}

async function time(f) {
    const t1 = performance.now()
    const result = await f()
    const t2 = performance.now()
    console.log(`${f.name} took ${t2 - t1} to get ${result}`)
}

Results:

--- n=10: ---
withPool took 53.835896998643875 to get 19.930073961113138
nothingFancy took 0.06096699833869934 to get 16.237419498424664
--- n=100: ---
withPool took 53.99462300539017 to get 191.49533349250487
nothingFancy took 0.027156993746757507 to get 204.45374014731104
--- n=1000: ---
withPool took 50.01943700015545 to get 2004.4566054103996
nothingFancy took 0.9318670034408569 to get 2015.9208923115145
--- n=10000: ---
withPool took 48.72436600923538 to get 20007.1920261762
nothingFancy took 2.4409569948911667 to get 20012.901073635243
--- n=100000: ---
withPool took 54.470831006765366 to get 200086.70983438176
nothingFancy took 5.684918999671936 to get 199998.26497395104
--- n=1000000: ---
withPool took 66.65430599451065 to get 2001061.973088672
nothingFancy took 43.69860699772835 to get 1999745.198802261
--- n=10000000: ---
withPool took 172.60957700014114 to get 19998626.755245283
nothingFancy took 377.86564199626446 to get 20002634.54283542
--- n=100000000: ---
withPool took 1268.5459830015898 to get 199999921.15196624
nothingFancy took 4249.29148799181 to get 200004158.20174262
--- n=1000000000: ---
withPool took 12684.676231995225 to get 1999962044.2081757
nothingFancy took 49840.80099800229 to get 1999954981.8320045

How to use with Axios

const staticPool = new StaticPool({
  size: 4,
  task: () => {
    const axios = require('axios')
    return axios.post('URL', { hello: 'world' })
  },
})

const result = await staticPool.exec()

DataCloneError: function transformRequest(data, headers) {
    normalizeHeaderName(headers, 'Accept');
    normalizeHeaderName(...<omitted>... } could not be cloned.
    at MessagePort.<anonymous> ([worker eval]:19:18)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
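
The error appears to come from the structured clone step that sends the task's return value back to the main thread: an axios response carries functions (such as transformRequest in its config), and functions cannot be cloned. A minimal sketch of the same failure, using a plain MessageChannel and a hand-built, hypothetical response object rather than a real axios call:

```javascript
const { MessageChannel } = require('worker_threads');

const { port1 } = new MessageChannel();

// Hypothetical object shaped like an HTTP client response: plain data plus
// a config entry that holds a function.
const response = {
  status: 200,
  data: { hello: 'world' },
  config: { transformRequest: [function transformRequest(d) { return d; }] },
};

let cloneError = null;
try {
  port1.postMessage(response); // functions cannot be structured-cloned
} catch (err) {
  cloneError = err;
}

// Sending only the plain, cloneable part succeeds.
const plain = { status: response.status, data: response.data };
port1.postMessage(plain);
port1.close();

console.log(cloneError.name, plain);
```

In a task function, the equivalent is to return only the cloneable part of the response, for example the response's data property, instead of the whole response object.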

Send TransferList ArrayBuffer Back to Main Thread From Worker Thread

Hello, thanks for writing this awesome library!

Is it possible to send data from a worker thread back to the parent thread by reference (via the parentPort.postMessage transfer list argument API) rather than by clone? I have worker threads in my program that create large arrays of objects (composed of primitive JavaScript types).

The objects are C-like structures, so converting them to binary ArrayBuffer objects would be doable. I believe it would be more performant to pass the large array back to the main thread by handing it the pointer to the array from the worker thread, rather than using the structured clone algorithm to send a copy back.

From my understanding it seems the existing setTransferList API can only be used for passing large binary data from the main thread to the worker thread?
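
For reference, the plain worker_threads API does allow a transfer in the worker-to-parent direction. The sketch below uses a raw Worker, not the pool, and whether the pool exposes an equivalent option is not stated in the README above.

```javascript
const { Worker } = require('worker_threads');

// Worker source: build a typed array and transfer its buffer to the parent.
const workerSource = `
  const { parentPort } = require('worker_threads');
  parentPort.once('message', (length) => {
    const values = new Float64Array(length).fill(1.5);
    // The second argument is the transfer list: the buffer is moved, not copied.
    parentPort.postMessage(values, [values.buffer]);
  });
`;

const worker = new Worker(workerSource, { eval: true });

const received = new Promise((resolve, reject) => {
  worker.once('message', resolve);
  worker.once('error', reject);
});

worker.postMessage(4);

received.then((values) => {
  console.log('received', values.length, 'values, first is', values[0]);
  return worker.terminate();
});
```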

timeout isn't safe. Potential DDOS issue

I've read a bit of the code.
And I believe I've found a vulnerability:
For example consider this:

const { StaticPool } = require("node-worker-threads-pool");

const staticPool = new StaticPool({
  size: 1,
  task: (n) => {while(n){console.log("a");}
return n;
},
timeout:10
});
staticPool.exec(0).then((result) => {
  console.log("result from thread pool:", result); // result will be 0.
});
staticPool.exec(1).then((result) => {
  console.log("result from thread pool:", result); //will never return
});
staticPool.exec(0).then((result) => {
  console.log("result from thread pool:", result); // will never be executed
});

The worker thread is now hijacked by the malicious task, which never ends, until Node restarts.
Moreover, the library never tells the user that something went wrong.
The process keeps running with an exhausted pool (the attacker only needs to send as many tasks as there are available workers).
In a dynamic pool, an attacker might exhaust the server's CPU.
In summary:
A user who tries to protect their process from an expensive task taking down the service might be tricked into relying on the timeout feature, which supposedly guarantees that this never happens.
The attacker only needs to find a case where the task is expensive to compute (for example, an edge case of a regex).
The threads are hijacked by the expensive case, leading to a potential DDOS.
The user is never informed that something went wrong, while the pool never executes the following tasks.
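
One way to bound a runaway task with the plain worker_threads API is to terminate the worker when the deadline passes. This is a minimal sketch under that assumption, not a description of how the library implements its timeout; runWithHardTimeout is a hypothetical helper.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper: run worker source and kill the worker after `ms`.
function runWithHardTimeout(source, ms) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(source, { eval: true });
    const timer = setTimeout(() => {
      // terminate() stops the thread even if it is stuck in a busy loop.
      worker.terminate().then(() => reject(new Error('task timed out')));
    }, ms);
    worker.once('message', (result) => {
      clearTimeout(timer);
      resolve(result);
      worker.terminate();
    });
    worker.once('error', (err) => {
      clearTimeout(timer);
      reject(err);
    });
  });
}

const fast = runWithHardTimeout(
  "require('worker_threads').parentPort.postMessage(1 + 1);",
  1000
);
const stuck = runWithHardTimeout('while (true) {}', 100).catch(
  (err) => err.message
);

Promise.all([fast, stuck]).then((outcomes) => console.log(outcomes));
```

A pool built this way would also need to start a replacement worker after each termination so that its size stays constant.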

Typescript

Is it possible to have a TypeScript descriptor?

Static classes cannot be used

I tried to export Excel data using a static utility class, but something happened.

The code:

 public test14(): void {
    let lstHeader = [
        ["a", "a", "e"],
        ["b", "b", "e"],
        ["b", "b", "f"],
        ["c", "a", "a"],
        ["c", "a", "a"]
    ];

    let pool = new StaticPool({
        size: 10,
        workerData: ExcelUtil,
        task: num => {
            //write header data
            ExcelUtil.writeHeader(lstHeader, 22);
            for (let i = 0, rowIndex = 3; i < num * 50 + num; i++, rowIndex += 2) {
                for (let j = 0; j < 5; j++) {
                    //write cell data
                    ExcelUtil.writeCellData(rowIndex, j, "test");
                    ExcelUtil.writeCellData(rowIndex + 1, j, "demo");
                }
            }
            //get data sheet
            let sheet = ExcelUtil.getDataSheet();
            return {sheet: sheet, name: "Sheet" + num};
        }
    });

    let works: Array<Promise<void>> = [];
    let lstData: Array<Record<string, any>> = [];
    for (let i = 1; i <= 5; i++) {
        works.push(new Promise<void>(resolve => {
            pool.exec(i).then(result => {
                lstData.push(result);
                resolve();
            });
        }));
    }

    Promise.all(works).then(() => {
        pool.destroy().then();
        LogUtil.loggerLine(Log.of("ApplicationTest", "test14", "lstData", lstData));
        ExcelUtil.write(lstData, "C:\\Users\\YongKJ\\Desktop\\demo-" + Date.now() + ".xlsx");
    });

}

The error:

    internal/worker.js:204
        this[kPort].postMessage({
                    ^
    DOMException [DataCloneError]: class ExcelUtil {
        // Get Excel sheet data
        static toMap(excelName, sheetName, headerRow, headerCol, headerLastCol...<omitted>...
    } could not be cloned.

this.require in worker thread can't find module unless running from project directory

Hi,

I'm creating an API with nodejs + express etc. I am using this library for handling a situation where I need worker thread management. I am running the app in production as a systemd service on Ubuntu, where the command to start the app is: node /opt/projectname. Node starts the app and it runs fine. The problem occurs when I trigger the use of a worker thread which requires an external library (in this case axios).

I have a staticPool set up. If I try to use this.require in the worker thread function while running the code from another directory (that is, not the project directory), the module cannot be found. When running from the project directory everything is fine.

Setting the WorkingDirectory to the project directory in the systemd unit file stops this issue, but I wanted to make you aware that this issue exists, and possibly document the solution for future users of this library.

Thanks for this great library.

Jaapie

Invalid or unexpected token

I don't know why, but when I create a StaticPool instance with TypeScript I always get SyntaxError: Invalid or unexpected token

import { StaticPool } from "node-worker-threads-pool"
const staticPool = new StaticPool({
  size: 4,
  task: (n: number):number => n + 1,
});
 
staticPool.exec(1).then((result: any) => {
  console.log("result from thread pool:", result); // result will be 2.
});

Worker num will reduce by one if timeout happens

If a timeout happens in a StaticPool, the number of workers is reduced by one.

For example, if I initialize the pool like this

const pool = new StaticPool({size: 3, task: taskFilePath});
const params = [1, 2, 3, 4, 5];
params.map(p => pool.exec(p, timeout));

then if one task times out, the concurrency drops by one, which means there will be only 2 workers afterwards. So if size is 1 and a timeout happens, the pool gets stuck.

Enable SHARE_ENV

Enable the SHARE_ENV flag so the threads can share read and write access to the env of the main process

Doesn't work in modules other than main

I implemented this package and it was working like a charm.

However, I decided to organize my project code using an MVC structure.

After moving my code from route functions to Controllers, I found out that this package doesn't create threads in a module that is imported by the main code.

E.g. I cannot create threads in Controllers.

Allow sending progression notification to parent process + Allow Typescript task.

Hi,

During a long process I would like to send progress notifications to the parent.

For now, the child can only send one message to the parent, which notifies it that the process is done.

I started a fork for that change, and some more, here:

  • ✅ Convert the code base to TypeScript. [DONE]
  • ✅ Add an option to identify whether a message is a done notification and/or to handle a progress notification. [DONE]
  • ✅ Transpile .ts files on the fly. [DONE]
  • 🔲 Add an option to access std(in/out/err) during task execution.

Questions:
After converting your code to TypeScript, can the _ prefix be removed? The private / protected keywords do the same job.
SHARE_ENV from worker_threads is just a symbol, so why not simply import it at the top of the file instead of inside an if (opt.shareEnv) block? I prefer import over require.

I suggest adding a handler named isDone(message: any|ResultType): boolean.
This function would return true if the message was handled as a progress message, and false if the message is a completion one.

Or maybe isDone(message: any|ResultType, task: TaskFunc): boolean, to simplify integration.

Let me know your point of view.

How to use other packages within task?

const PSD = require('psd.js');
const { StaticPool } = require("node-worker-threads-pool");

const pool = new StaticPool({
  size: 4,
  task: param => {
    const psd = PSD.func(param);
    ...
  },
});

pool.exec(param).then(() => { ... });

ReferenceError: PSD is not defined
    at Object.task ([worker eval]:8:17)
    at MessagePort.<anonymous> ([worker eval]:29:46)
    at MessagePort.emit (events.js:223:5)
    at MessagePort.onmessage (internal/worker/io.js:70:8)

P.S. The PSD object does not support cloning, so I can't pass it in the param.
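
A common pattern with worker threads is to load the module inside the function that runs in the worker, so nothing has to be captured from the main thread or cloned. The sketch below uses the plain worker_threads module and the built-in path module. runInWorker is a hypothetical helper, not part of node-worker-threads-pool, and the file path is a placeholder.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper: send a function's source text to a fresh worker and
// run it there with `param`.
function runInWorker(task, param) {
  const source = `
    const { parentPort, workerData } = require('worker_threads');
    const task = (${task.toString()});
    Promise.resolve(task(workerData)).then((result) => {
      parentPort.postMessage(result);
    });
  `;
  return new Promise((resolve, reject) => {
    const worker = new Worker(source, { eval: true, workerData: param });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

const fileName = runInWorker((filePath) => {
  // The module is loaded inside the worker, not captured from outside.
  const path = require('path');
  return path.basename(filePath);
}, '/tmp/design.psd');

fileName.then((name) => console.log('worker returned:', name));
```

Only the task's return value crosses the thread boundary, so it still has to be cloneable.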

How does the pool work when you limit it to x amount?

Hello there,

In my case I am using this to run about 1,000 heavy calculations.

I set my pool to 10 workers and just loop over the data as shown in the documentation.

for (let i = 0; i < data.length; i++) {
  (async () => {
    const _data = data[i];
    await pool.exec({ _data });
  })();
}

I can see that all of them run, but I have a few questions.

If 10 workers are already running, what happens then? Does the pool wait until one finishes and then start the next batch?
Why am I using 'await pool.exec'? The tasks run asynchronously, so I'm not sure what the await actually does in this case.
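
The README does not describe how calls beyond the pool size are handled. A common design, sketched here as an assumption rather than as the library's implementation, is to queue extra jobs and start the next one whenever a slot frees up. createLimiter and the timer-based jobs are hypothetical stand-ins for a pool and its tasks.

```javascript
// Hypothetical limiter: at most `size` jobs run at once, the rest wait in line.
function createLimiter(size) {
  let active = 0;
  const waiting = [];
  const next = () => {
    if (active >= size || waiting.length === 0) return;
    active++;
    const { job, resolve, reject } = waiting.shift();
    job()
      .then(resolve, reject)
      .finally(() => {
        active--;
        next(); // a slot is free, start the next queued job
      });
  };
  return (job) =>
    new Promise((resolve, reject) => {
      waiting.push({ job, resolve, reject });
      next();
    });
}

let running = 0;
let peak = 0;
const limit = createLimiter(2);

// Stand-in for a heavy task: tracks how many jobs overlap.
const makeJob = (n) => () =>
  new Promise((resolve) => {
    running++;
    peak = Math.max(peak, running);
    setTimeout(() => {
      running--;
      resolve(n * 2);
    }, 10);
  });

const finished = Promise.all([1, 2, 3, 4, 5].map((n) => limit(makeJob(n))));
finished.then((results) => console.log(results, 'peak concurrency:', peak));
```

In the loop from the question, each await only pauses its own async function until that one task settles; it does not block the loop, so all calls are submitted immediately.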

ReferenceError [Error]: xxx is not defined

Hello,

I tried to use DynamicPool and dispatch each web request to the pool to handle. But I got the following error message.

ReferenceError [Error]: router is not defined

And I found the following in the documentation.

task Function as a task to do. ⚠️Notice: You can NOT access variables defined outside the task function!

So why is accessing variables defined outside the task function not allowed?

The following is my code.

const http = require("http");
const Router = require("./router.js");
const NodeThreadPriority = require("./add_on/binding");
const { DynamicPool } = require('node-worker-threads-pool');
const pool = new DynamicPool(8);

const router = new Router();

function runServer() {
  http.createServer((req, res) => {
    // `router`, `req` and `res` are defined outside the task function,
    // which is what triggers the ReferenceError above.
    pool.exec({task: () => router.dispatch(res, new URL(req.url, `http://${req.headers.host}`)), timeout: 100});
  }).listen(3000, () => {
    console.log("Server running on http://localhost:3000/");
  });
}

Allow using Comlink.expose() on pool workers from main thread

Hey!
First of all thank you so much for making and maintaining the library.
I aim to use it in my project which requires performing multiple CPU intensive tasks within a nodejs app.

What I'm trying to perform and currently struggling in:
In my main thread I have some I/O performing API that I have to keep in the main thread only. I want to invoke it naturally from my workers.
I would prefer using the Comlink lib for that. Specifically I need to use the expose API (docs).
For achieving this, I need to have access to all of my StaticPool's workers and apply a Comlink.expose(<some API>, oneOfTheWorkers) on each of them.
Will it be possible to introduce something along the lines of allWorkers() to the Pool interface, or some other solution? I'm willing to contribute a PR.

Thanks!
Guy

unable to run tests when using typescript (with jest)

Hi
I'm using NestJS with Jest testing.

The worker is expected to be a .js file, but when running with Jest
the files are provided as .ts.

For now the workaround is to run the tests over the .dist folder
rather than the src folder, even though running over src is what Jest / NestJS plan for and configure,
and is also the better practice.

Here is one workaround solution using a proxy .js worker,
please consider this approach:

https://wanago.io/2019/05/06/node-js-typescript-12-worker-threads/

Can't run with while(true)

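Please help me resolve my issue.

My code:
const pool = new StaticPool({
  size: 5,
  task: async () => {
    var server = require('./server.js');
    await server.run();
  }
});

// This loop never yields to the event loop, so results are never
// delivered and pending calls pile up without bound.
while (true) {
  (async () => {
    await pool.exec();
  })();
}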

How to prevent crashed workers from blocking/exiting the main process?

Assuming several workers have been started, I noticed that if one of the workers stops working, the main process also stops working. Can we prevent that behavior, and if it happens, what practice should we follow?
Is it possible to restart a worker that crashed?
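
With the plain worker_threads module, an uncaught exception inside a worker is delivered to the parent as an 'error' event on the Worker object. If nothing listens for that event, the error surfaces as an uncaught exception in the main process. A minimal sketch of catching it follows. startWorker is a hypothetical helper, and how the pool itself reacts to a crashed worker is not described in the README above.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper: start a worker and route its uncaught exceptions
// to `onCrash` instead of letting them reach the main process.
function startWorker(source, onCrash) {
  const worker = new Worker(source, { eval: true });
  worker.once('error', onCrash);
  return worker;
}

const crashed = new Promise((resolve) => {
  // This worker throws immediately, simulating a crash.
  startWorker("throw new Error('boom');", resolve);
});

crashed.then((err) => {
  console.log('worker crashed:', err.message);
  // A replacement could be started here with another startWorker(...) call.
});
```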
