
Home Page: https://superchargejs.com/docs/promise-pool

License: MIT License


Promise Pool

Map-like, concurrent promise processing for Node.js.


Installation · Docs · Usage




Follow @marcuspoehls and @superchargejs for updates!


Installation

npm i @supercharge/promise-pool

Docs

Usage

Using the promise pool is pretty straightforward. The package exposes a PromisePool class, and you create a promise pool instance through its fluent interface.

Here’s an example using a concurrency of 2:

import { PromisePool } from '@supercharge/promise-pool'

const users = [
  { name: 'Marcus' },
  { name: 'Norman' },
  { name: 'Christian' }
]

const { results, errors } = await PromisePool
  .withConcurrency(2)
  .for(users)
  .process(async (userData, index, pool) => {
    const user = await User.createIfNotExisting(userData)

    return user
  })

The promise pool uses a default concurrency of 10:

await PromisePool
  .for(users)
  .process(async data => {
    // processes 10 items in parallel by default
  })

Manually Stop the Pool

You can stop the processing of a promise pool using the pool instance provided to the .process() and .handleError() methods. Here’s an example of how to stop an active promise pool from within the .process() method:

await PromisePool
  .for(users)
  .process(async (user, index, pool) => {
    if (condition) {
      return pool.stop()
    }

    // processes the `user` data
  })

You may also stop the pool from within the .handleError() method in case you need to:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .handleError(async (error, user, pool) => {
    if (error instanceof SomethingBadHappenedError) {
      return pool.stop()
    }

    // handle the given `error`
  })
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Bring Your Own Error Handling

The promise pool allows for custom error handling. You can take over error handling by implementing an error handler using the .handleError(handler) method.

If you provide an error handler, the promise pool doesn’t collect any errors. You must then collect errors yourself.

Providing a custom error handler allows you to exit the promise pool early by throwing inside the error handler function. Throwing errors is in line with Node.js error handling using async/await.

import { PromisePool } from '@supercharge/promise-pool'

try {
  const errors = []

  const { results } = await PromisePool
    .for(users)
    .withConcurrency(4)
    .handleError(async (error, user) => {
      if (error instanceof ValidationError) {
        errors.push(error) // you must collect errors yourself
        return
      }

      if (error instanceof ThrottleError) { // Execute error handling on specific errors
        await retryUser(user)
        return
      }

      throw error // Uncaught errors will immediately stop PromisePool
    })
    .process(async data => {
      // the harder you work for something,
      // the greater you’ll feel when you achieve it
    })

  await handleCollected(errors) // this may throw

  return { results }
} catch (error) {
  await handleThrown(error)
}

Callback for Started and Finished Tasks

You can use the onTaskStarted and onTaskFinished methods to hook into the processing of tasks. The provided callback for each method will be called when a task starts or finishes processing:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .onTaskStarted((item, pool) => {
    console.log(`Progress: ${pool.processedPercentage()}%`)
    console.log(`Active tasks: ${pool.activeTasksCount()}`)
    console.log(`Finished tasks: ${pool.processedItems().length}`)
    console.log(`Finished tasks: ${pool.processedCount()}`)
  })
  .onTaskFinished((item, pool) => {
    // update a progress bar or something else :)
  })
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

You can also chain multiple onTaskStarted and onTaskFinished handlers (in case you want to separate some functionality):

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .onTaskStarted(() => {})
  .onTaskStarted(() => {})
  .onTaskFinished(() => {})
  .onTaskFinished(() => {})
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Task Timeouts

Sometimes it’s useful to configure a timeout in which a task must finish processing. A task that times out is marked as failed. You may use the withTaskTimeout(<milliseconds>) method to configure a task’s timeout:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .withTaskTimeout(2000) // milliseconds
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Notice: the timeout applies to each task individually, not to the pool as a whole. The example configures a 2-second timeout for each task in the pool.

Correspond Source Items and Their Results

Sometimes you want the processed results to align with your source items. The resulting items should have the same position in the results array as their related source items. Use the useCorrespondingResults method to apply this behavior:

import { setTimeout } from 'node:timers/promises'
import { PromisePool } from '@supercharge/promise-pool'

const { results } = await PromisePool
  .for([1, 2, 3])
  .withConcurrency(5)
  .useCorrespondingResults()
  .process(async (number, index) => {
    const value = number * 2

    return await setTimeout(10 - index, value)
  })

/**
 * source array: [1, 2, 3]
 * result array: [2, 4, 6]
 * --> result values match the position of their source items
 */

For example, you may have three items you want to process. Using corresponding results ensures that the processed result for the first item from the source array is located at the first position in the result array (=index 0). The result for the second item from the source array is placed at the second position in the result array, and so on …

Return Values When Using Corresponding Results

The results array returned by the promise pool after processing has a mixed return type. Each returned item is one of these types:

  • the actual value type: for results that successfully finished processing
  • Symbol('notRun'): for tasks that didn’t run
  • Symbol('failed'): for tasks that failed processing

The PromisePool class exposes both symbols, and you may access them as follows:

  • Symbol('notRun'): exposed as PromisePool.notRun
  • Symbol('failed'): exposed as PromisePool.failed

You may repeat processing for all tasks that didn’t run or failed:

import { PromisePool } from '@supercharge/promise-pool'

const { results, errors } = await PromisePool
  .for([1, 2, 3])
  .withConcurrency(5)
  .useCorrespondingResults()
  .process(async (number) => {
    // …
  })

const itemsNotRun = results.filter(result => {
  return result === PromisePool.notRun
})

const failedItems = results.filter(result => {
  return result === PromisePool.failed
})

When using corresponding results, you need to go through the errors array yourself. The default error handling (collecting errors) stays the same, and you can follow the error handling section described above.
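For example, the collected errors can be matched back to their source items. This is a hypothetical sketch, assuming each collected error is a PromisePoolError that carries the failed source item on an `item` property (as shown in the executor code quoted in the issues below):

```javascript
// Hypothetical helper (assumes each collected error exposes the failed
// source item on an `item` property): index errors by item so they can be
// looked up next to their corresponding results slot.
function indexErrorsByItem (errors) {
  return new Map(errors.map(error => [error.item, error]))
}

// Example with plain error-like objects:
const collected = [
  { item: 2, message: 'failed to double' }
]

const byItem = indexErrorsByItem(collected)
// byItem.get(2).message === 'failed to double'
```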

Contributing

  1. Create a fork
  2. Create your feature branch: git checkout -b my-feature
  3. Commit your changes: git commit -am 'Add some feature'
  4. Push to the branch: git push origin my-new-feature
  5. Submit a pull request 🚀

License

MIT © Supercharge


superchargejs.com  ·  GitHub @supercharge  ·  Twitter @superchargejs

promise-pool's People

Contributors

amitlevy21, coffeeflux, dependabot-preview[bot], exoego, katsuya, lostplan, m93a, marcuspoehls, pandrews-hdai, peitek, simonjang, tomfuertes, tylercollier, wildego, willianagostini, wzhkobe2408


promise-pool's Issues

Docs: add support for iterables

It looks like #72 added support for iterables and async iterables. However, there's no mention of this in the documentation. Adding it would be helpful.

Obtain current iteration on pool

Hey guys!

Amazing library, just started playing with it and was wondering if there's any way to get the current iteration for said pool kinda like a foreach, example of what it'd look like below.

.process(async (uri: string, i: number) => {

It would be amazingly useful for me with what I'm doing in the backend.

Thanks in advance,
Luis Bizarro

Feature request: Accept iterables and async iterables

Currently, the PromisePool.for method only accepts an array of items. However, there are many cases where it would be helpful to also support iterables and async iterables as input.

For example, my use case involves scraping a paginated web page using puppeteer, where I asynchronously yield the article urls. I would like to use a promise pool to fetch the articles, but since I am using an async iterable to generate the URLs, I am unable to use PromisePool.for in its current form.

I believe that allowing PromisePool to accept iterables and async iterables would greatly increase the flexibility and usefulness of this library, and I would be happy to contribute a pull request to add this feature.

There is some overlap with the issue #65, which proposed allowing new items to be added to the pool. I believe that once the logic for consuming iterables is implemented, it won't be hard to also add methods like queueNext and queueLast that would add new items to the pool's queue.

Question about the pool

Does the pool always run at the max concurrency? Or does it work in batches?

For example, given 10 elements with a concurrency of 2:

a. Does it do 5 iterations using 2 elements at a time? Or
b. As soon as 1 promise gets resolved, does it immediately start another until it reaches 2 active promises?
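For what it's worth, option (b) matches the executor sketch quoted later in this document: the pool awaits a free slot per item, so a new task starts as soon as any running task settles. Here's a minimal, hypothetical pool (not the library's actual code) that demonstrates the refill behavior:

```javascript
// Minimal illustrative pool (NOT promise-pool's implementation): it awaits a
// single free slot via Promise.race, so a new task starts as soon as any
// running task settles, giving behavior (b) rather than fixed batches.
async function runPool (items, concurrency, handler) {
  const active = new Set()
  const results = []

  for (const item of items) {
    if (active.size >= concurrency) {
      await Promise.race(active) // wait for ONE task to settle, not the whole batch
    }

    const task = handler(item).then(result => {
      results.push(result)
      active.delete(task)
    })

    active.add(task)
  }

  await Promise.all(active)

  return results
}
```

With 10 items and a concurrency of 2, this keeps 2 tasks in flight continuously rather than processing 5 fixed batches.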

If the async task throws undefined, the error collection will fail

The code below executes an async task that rejects with nothing.

Therefore, the startProcessing function will fail at new PromisePoolError(error.message, item).

I know this is not common in the real world, but is it necessary to improve robustness?

async function main () {
  const { results, errors } = await PromisePool
    .for(Users)
    .withConcurrency(3)
    .process(async (user) => {
      await new Promise((resolve, reject) => setTimeout(reject, 1000))

      return user
    })

  console.log('results', results)
  console.log('errors', errors)
}

main()

PromisePool doesn't limit parallel requests after an error occurrence

Code

await PromisePool.for(hugeMessageArray)
  .withConcurrency(5)
  .process(async (message) => {          
    // mutate does a network operation which eventually fails
    await mutate(message);
  });

Expected

Even when a async operation fails, the number of max parallel executions (5) must be respected.

Observed

(screen recording omitted)

Observe how the bulk of requests respects the concurrency at first, but then, when the first error occurs, it calls process without respecting the concurrency limit.

Canceling a pool

I'm running a pool that takes a while, is there any way to cancel it?

I guess I can do a workaround like this:

.process(async data => {
  if (canceled) return

})

But that would still iterate all input elements for nothing.
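The library's pool.stop() (see "Manually Stop the Pool" above) covers this without visiting the rest of the input. As a library-agnostic illustration of the difference, here's a hypothetical cancellable loop that stops pulling new items once cancelled, rather than iterating every remaining item as a no-op:

```javascript
// Hypothetical sketch: a processor that stops consuming input when cancelled,
// instead of iterating every remaining item as a no-op.
async function processUntilCancelled (items, signal, handler) {
  const results = []

  for (const item of items) {
    if (signal.cancelled) break // stop pulling new items entirely

    results.push(await handler(item))
  }

  return results
}
```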

Test breaks because of Nullish coalescing

Error

    Jest encountered an unexpected token

    Jest failed to parse a file. This happens e.g. when your code or its dependencies use non-standard JavaScript syntax, or when Jest is not configured to support such syntax.

    Out of the box Jest supports Babel, which will be used to transform your files into valid JS based on your Babel configuration.

    By default "node_modules" folder is ignored by transformers.

    Here's what you can do:
     • If you are trying to use ECMAScript Modules, see https://jestjs.io/docs/ecmascript-modules for how to enable it.
     • If you are trying to use TypeScript, see https://jestjs.io/docs/getting-started#using-typescript
     • To have some of your "node_modules" files transformed, you can specify a custom "transformIgnorePatterns" in your config.
     • If you need a custom transformation specify a "transform" option in your config.
     • If you simply want to mock your non-JS modules (e.g. binary assets) you can stub them out with the "moduleNameMapper" config option.

    You'll find more details and examples of these config options in the docs:
    https://jestjs.io/docs/configuration
    For information about custom transformations, see:
    https://jestjs.io/docs/code-transformation

    Details:

    /codebuild/output/src4287129335/src/node_modules/@supercharge/promise-pool/dist/promise-pool.js:14
            this.items = items ?? [];
                                ^

    SyntaxError: Unexpected token '?'

Cause

That error occurs when I run a test on the AWS build using CodePipeline. AWS limits Node versions to 10 and 12, and neither of these versions supports nullish coalescing.

handleError not working accordingly to docs?

Please see the following example.

When an error is thrown in the process function, it's not being communicated to the handleError function.

From the README.md:

Providing a custom error handler allows you to exit the promise pool early by throwing inside the error handler function. Throwing errors is in line with Node.js error handling using async/await.

Is this a bug?

Node Version: 18.17.1
Library Version: 3.0.0

script:

const PromisePool = require("@supercharge/promise-pool").PromisePool;

(async () => {
  await PromisePool.withConcurrency(1)
    .handleError(async (error) => {
      console.log("error", error);
      throw error;
    })
    .for([{ a: 1 }, { b: 2 }, { c: 3 }])
    .process(async (item) => {
      console.log(item);
      throw new Error("test error here");
    });
})();

output:

% node test.js
Debugger attached.
{ a: 1 }
{ b: 2 }
{ c: 3 }
Waiting for the debugger to disconnect...
% 

Reuse pool on dependent promises.

I'm using @supercharge/promise-pool to limit concurrency in my application. I'm attempting to use the same pool to kick off dependent requests but seeing more than my established concurrency going off (e.g. concurrency of 100 is kicking off 200+ requests before ultimately crashing)

My assumption here is that I can re-use the pool to limit concurrency in totality.

const pool = PromisePool
  .withConcurrency(CONCURRENCY);

await pool
  .for(roots)
  .process(async (root) => {
    await httpRequest(root);
    await pool
      .for(root.children)
      .process(async (child) => {
        await httpRequest(child);
      });
  });

Am I using this incorrectly? (Cross posted here for reference)

need onProgress(completedTasks)

We need an onProgress callback to see how many promises are fulfilled; it's useful for making progress bars.

.onProgress((completedTasks) => {
  // update progress bar
})
.process(async (s) => {
  // ...
})
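The library's onTaskFinished hook (documented in the usage section above) now covers this. As a library-agnostic alternative, progress can also be tracked with a plain counter; a minimal sketch, with all names hypothetical:

```javascript
// Hypothetical progress tracker: call tick() whenever a task finishes and it
// returns the completed percentage, e.g. to drive a progress bar.
function createProgress (total) {
  let finished = 0

  return {
    tick () {
      finished += 1
      return Math.round((finished / total) * 100)
    },
    finishedCount () {
      return finished
    }
  }
}

// e.g. const progress = createProgress(items.length)
// and inside the task-finished callback: updateBar(progress.tick())
```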

Preserving the original errors

Hi! Really appreciate your work on PromisePool.

Is there any way to preserve the original error being thrown from within the PromisePool? Right now it just gets stringified and recreated as a PromisePoolError, which strips away a lot of useful contextual information.

Dynamically change the concurrency

Hi,
Is it possible to change the concurrency while the pool is running? The use case is warming up in the cloud: the pool could start with a concurrency of 2 and increase it after every x tasks up to the maximum, rather than jumping to the max right away.

skip a task from including in result

Basically, I want to filter my original list based on the result of async operations. I tried returning undefined from the process callback, but it just fills the resulting array with undefined values. Is there a way to skip including a value in the resulting array, like pMapSkip from sindresorhus/p-map#39?
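One common workaround, sketched below with a hypothetical sentinel: return a unique symbol for skipped items and filter it out of results afterwards. The sentinel keeps legitimate undefined values distinguishable from skips; promise-pool itself has no built-in pMapSkip equivalent as far as this document shows.

```javascript
// Hypothetical workaround: a sentinel symbol marks "skipped" results so that
// legitimate undefined/null values are not filtered out by accident.
const SKIP = Symbol('skip')

function withoutSkipped (results) {
  return results.filter(result => result !== SKIP)
}

// Inside .process(), return SKIP instead of a value:
//   return shouldKeep(item) ? transform(item) : SKIP
// Then: const kept = withoutSkipped(results)
```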

This module can only be referenced with ECMAScript imports/exports by turning on the 'esModuleInterop' flag and referencing its default export

Hi,

I want to remove the esModuleInterop flag from my projects tsconfig.json.

However, I'm using this awesome library and it's the only dependency I can't find a way around.

Is there a way to use promise-pool without having esModuleInterop flag?

tried:

import PromisePool from '@supercharge/promise-pool'

import * as PromisePool from '@supercharge/promise-pool'

import { withConcurrency } from '@supercharge/promise-pool'

I get a compilation error with each.

Thank you!

Feature request: Add additional items onto already-running PromisePool instance?

Not an issue per se: more a question of "is this even possible".

The exact usecase I have is recursively crawling a directory structure via a REST API. I can only query 1 folder's contents per API call, plus it's paginated. So it requires lots of slow API calls which could be run in parallel.

Right now, I pass PromisePool a list of top-level folder IDs, then it will scan each one in parallel and save the results to a file. The trouble is: scanning that top level folder yields more API calls to do, since the folder structure's depth could be infinite. In other words: Only the top-level folders benefit from parallelization.

I had thought to use PromisePool on the child folders too, but the concurrency doesn't seem to be "shared" across multiple instances of PromisePool. So then the API gets angry at me for making too many calls too fast.

So it seemed logical to have a single PromisePool instance, then as new "crawl" jobs are discovered, they can simply be pushed onto the list of remaining Promises, to be resolved whenever concurrency permits.

Questions:
Does PromisePool support this kind of workflow today?
If not, would it even be possible to implement (assuming I do the legwork and submit PR)? Or does the architecture of the existing module make this sort of thing impossible/unrealistic? Or maybe this usecase is so "niche" it doesn't have a place?

Lastly, is there a better module for this kind of thing? I came upon a few promising modules, but decided on PromisePool for its simplicity (it's amazing btw!). But now I am realizing my usecase may be more complex than I originally thought.

Browser use causes Critical Dependency Webpack error

Hi there!

I fully realize that Supercharge is designed and built as a Node framework. That being said, a util like PromisePool is really useful for client-side applications as well.

However, due to the way @supercharge/goodies is built, bundling @supercharge/promise-pool into a front-end context throws the following warning on builds (with Webpack):

WARNING  Compiled with 1 warning
Critical dependency: the request of a dependency is an expression
warning  in ./node_modules/@supercharge/goodies/dist/goodies.js

Allow the pool to "failLoud" on error

Currently, the promise pool runs the given callback on each item, even when an error happens during processing. The pool won't fail on errors and instead collects all errors.

This PR should add an option to let the pool fail loudly instead of staying silent on errors.

Expected API

try {
  const pool = new PromisePool()
    .for(users)
    .failLoud()  // <-- new method

  await pool.process(async user => {
    // process user
  })
} catch (error) {
  // handle error 
}

Retrieve task index?

I'm running the following code where my functions need to pick a web worker from a list in order to run.

const workerList = [worker1, worker2];
await PromisePool
  .withConcurrency(workerList.length)
  .for(items)
  .process(async (item) => {
    const worker = workerList[executorIndex]; // how do I get this index?

    await runMyLogic(item, worker);
  });

Right now I can't seem to find any way to know which executor is being used from the pool, may you advise?
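Note that the processor callback does receive an index as its second argument, but that's the item's position in the input, not a concurrency-slot index, so a modulo trick can hand the same worker to two concurrent tasks. A safer, hypothetical approach is a check-out/check-in free list; with the pool's concurrency equal to the number of workers, a free worker is always available when a task starts:

```javascript
// Hypothetical worker free-list (not part of promise-pool): each task checks
// a worker out for its duration and returns it afterwards.
function createWorkerPool (workers) {
  const free = [...workers]

  return {
    async use (fn) {
      const worker = free.pop() // a worker is free as long as concurrency <= workers.length
      try {
        return await fn(worker)
      } finally {
        free.push(worker)
      }
    }
  }
}

// Usage sketch inside the promise pool:
//   const workerPool = createWorkerPool(workerList)
//   .process(item => workerPool.use(worker => runMyLogic(item, worker)))
```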

where is user going after return?

Hello,

const { results, errors } = await PromisePool
  .withConcurrency(2)
  .for(users)
  .process(async data => {
    const user = await User.createIfNotExisting(data)

    return user
  })

Please suggest what I should do with the user that is returned above. Where do I catch it? When I print results, it comes back undefined. Can you please post a good example here?

Thanks
Jitender

How to pass errors

Hi,

Following your example:

const { results, errors } = await PromisePool
  .for(items)
  .withConcurrency(50)
  .process(async (item, index, pool) => {
    var finalised;
    if (channel === channelTypes.SMS) {
      finalised = await import_b(item, lists, user_id, validate, channel);
    } else {
      finalised = await import_a(item, lists, user_id, validate, channel);
    }
    return { finalised };
  });

How can I pass errors from my function? results.length returns the count, which is good, but errors returns nothing, and I know my import_b logs an error to the console.

@marcuspoehls

How to retry a failed promise a fixed number of times?

I have a promise pool like this...

// Creating imageLinks
const { results } = await PromisePool
  .withConcurrency(20)
  .for(metaplexFiles)
  .process(async (metaplexFile) => {
    console.log('creating an imageLink')

    const imageLink = await metaplex.storage().upload(metaplexFile)
    console.log('arweave link created', imageLink)

    return imageLink
  })

metaplexFiles is an array of File[] that, when passed through the promise metaplex.storage().upload(metaplexFile), generates a link.

Now...
Sometimes this process fails and the link is not generated,
and I just want to retry the failed promises at least 5 times. Any help?

I have read the error handling documentation

I understand that I can catch the error:

  • At the moment with .handleError()
  • Or at the end with errors

But I'm not sure what is the correct approach...
This is one example of my generated links:
https://i.imgur.com/iGEdFgo.png
Here I generated 8 links successfully,
but 2 cases gave me an error.

I tried to collect the errors this way at the end:
https://i.imgur.com/zSX4JCm.png
That is to say, collect all the errors and try to do some kind of final retry, 5 times or something like that.
But it seems like I'm in the same place where I started:
if errors has 100 items, for example, I could end up facing more failures.

Maybe there is no way to do something like this?

// Creating imageLinks
const { results } = await PromisePool
  .withConcurrency(20)
  .for(metaplexFiles)
  .handleError(async (err) => {
    // MAKE SOME RETRY LOGIC HERE TO SEND THE PROMISE AGAIN TO `.process`,
    // 5 times or the max number of times that I decide???
  })
  .process(async (metaplexFile) => {
    console.log('creating an imageLink')

    const imageLink = await metaplex.storage().upload(metaplexFile)
    console.log('arweave link created', imageLink)

    return imageLink
  })

But I really don't know how to implement this retry logic and send the failed items back to .process().
Any help with this issue?
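One way to approach this, as a hedged sketch: wrap the flaky call in a small retry helper inside .process(), so each upload retries up to a fixed number of attempts before the pool ever sees an error. The helper below is hypothetical, not part of promise-pool:

```javascript
// Hypothetical retry wrapper: runs `task` up to `attempts` times, rethrowing
// the last error if every attempt fails.
async function withRetry (attempts, task) {
  let lastError

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await task()
    } catch (error) {
      lastError = error
    }
  }

  throw lastError
}

// Usage sketch inside the pool:
//   .process(metaplexFile => withRetry(5, () => metaplex.storage().upload(metaplexFile)))
```

Retrying inside the processor also keeps the errors array meaningful: only items that failed all attempts end up there.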

Results does not preserve the order of tasks

In some cases, this will cause serious problems. I suggest that instead of pushing each result to the results array, we use the item's index to set the result, and set null for failed tasks. Here are the proposed changes to promise-pool-executor.ts.

  async process (): Promise<ReturnValue<T, R>> {
    for (let i = 0; i < this.items.length; i++) {
      const item = this.items[i]
      if (this.hasReachedConcurrencyLimit()) {
        await this.processingSlot()
      }

      this.startProcessing(item, i) // pass index to startProcessing
    }

    return this.drained()
  }

  startProcessing (item: T, index: number): void {
    const task = this.createTaskFor(item)
      .then(result => {
        this.results[index] = result // set by index
        this.tasks.splice(this.tasks.indexOf(task), 1)
      })
      .catch(error => {
        this.results[index] = null // set result as null if failed
        this.tasks.splice(this.tasks.indexOf(task), 1)

        if (this.errorHandler) {
          return this.errorHandler(error, item)
        }

        this.errors.push(
          PromisePoolError.createFrom(error, item)
        )
      })

    this.tasks.push(task)
  }

`Error.captureStackTrace` fails in some environments

Error.captureStackTrace is used in several places in the code: https://github.com/search?q=repo%3Asupercharge%2Fpromise-pool%20capturestacktrace&type=code

However, captureStackTrace is non-standard and fails in some environments. See https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error#static_methods.

For example, in Safari:

Error.captureStackTrace(new Error())
// => TypeError: Error.captureStackTrace is not a function. (In 'Error.captureStackTrace()', 'Error.captureStackTrace' is undefined)

Consider using https://www.npmjs.com/package/capture-stack-trace for compatibility 🥺

Performance vs Batching Promise.all

Hello I was looking to implement this package for large sets of db reads and writes. I was comparing using the PromisePool with concurrency of 1000 to using Promise.all in batches of 1000.

Performance testing was done on 100k db read and write operations.
Using promise pools the execution time for 100k total operations with a concurrency of 1000 was 1:40.937 (m:ss.mmm)
Using 1000 batch size for Promise.all(...) had an execution time of 41.867s (also on 100k db reads and writes)

I tested multiple times on different sets of data to ensure there weren't some random anomalies and to avoid performance variability from db caching.

I figured promise pools would be significantly faster, since a pool maintains 1000 open promises at a time, whereas the batched Promise.all approach needs to wait for all 1000 operations in a batch to complete before starting the next 1000.

Is this expected and are there ways to optimize the use of this library?

Node.js: v18.12.1

  const batchSize = 1000;
  const entries=[..];
  const findAndInsert = async (number) => {
    const item = await getItem(number);
    await itemMapper.insert({ ...item });
  };
  
  console.time("promises"); //start timer
  
  await PromisePool.for(entries)
    .withConcurrency(batchSize)
    .process(findAndInsert);
    
  console.timeEnd("promises"); // end timer

Unintuitive result with 1.4.0 types

Hi there, thanks for putting this library together — it's a great lightweight solution to a common problem. I just played around with the latest 1.4.0 release, and noticed something somewhat unintuitive. Suppose I have the following snippet:

const asyncDouble = async (val: number): Promise<number> => val * 2;

const { results } = await PromisePool.withConcurrency(2)
  .for([1, 2, 3])
  .process(async (i) => {
    const result = await asyncDouble(i);

    return result;
  });

I would expect results to be typed as number[], it's getting typed as Promise<number>[] instead. Is this expected, or have I done something wrong?

Add example of async generator

10 concurrency limit not working, TypeError

const PromisePool = require('@supercharge/promise-pool')
await PromisePool
		.for([1, 2, 3, 4])
		.process(async c => console.log(c))
TypeError: "concurrency" must be a number, 1 or up. Received "undefined" (undefined)
    at PromisePoolExecutor.validateInputs (/home/t/project/node_modules/@supercharge/promise-pool/dist/promise-pool-executor.js:103:19)
    at PromisePoolExecutor.start (/home/t/project/node_modules/@supercharge/promise-pool/dist/promise-pool-executor.js:89:36)
    at PromisePool.process (/home/t/project/node_modules/@supercharge/promise-pool/dist/promise-pool.js:89:14)
    at /home/t/project/script.ts:42:4
    at step (/home/t/project/script.ts:36:23)
    at Object.next (/home/t/project/script.ts:17:53)
    at fulfilled (/home/t/project/script.ts:8:58)
    at processTicksAndRejections (node:internal/process/task_queues:94:5)

Delete unnecessary files from dist

Hi, I develop an extension for the Azure Marketplace, and I can't publish because dist contains some files with whitespace in their names. Can you please fix it 🙏

Not tree-shakeable

I use esbuild to bundle my app which uses promise-pool like this:

import { PromisePool } from "@supercharge/promise-pool"
...
new PromisePool()
        .withConcurrency(this.parallelism)
        .for(...)
        .process(...)

I noticed that unused code from return-value and contracts is bundled into the artifact.
This is due to the re-exporting in

export * from './contracts'
export * from './promise-pool'
export * from './promise-pool-error'
export * from './return-value'
export * from './stop-the-promise-pool-error'
export * from './validation-error'

and promise-pool is currently published as CommonJS, not ESM.

So it would be great if this library declared "sideEffects": false in package.json or published an ESM version.

When using TypeScript, the types related to the .process() function are not correct

The types of the .process function are not being handled correctly causing compile errors:

import Pool from '@supercharge/promise-pool';

export default class MyClass {
  myConcurrency: number;

  constructor(concurrency: number) {
    this.myConcurrency = concurrency;
  }

  public async processStuff(stuff: Array<number>): Promise<number[]> {
    const pool = new Pool()
      .withConcurrency(this.myConcurrency)
      .for(stuff);

    const { results } = await pool.process((x: number) => x + 1);

    return results;
  }
}

The TypeScript compiler gives the following error:

Argument of type '(x: number) => number' is not assignable to parameter of type '(item: unknown) => number | Promise<number>'.
  Types of parameters 'x' and 'item' are incompatible.
    Type 'unknown' is not assignable to type 'number'.

15     const { results } = await pool.process((x: number) => x + 1);
                                              ~~~~~~~~~~~~~~~~~~~~

.withConcurrency(1) still processes multiple items in the array at once

async function processQueue() {
  const { errors } = await PromisePool
    .withConcurrency(1)
    .for(queue)
    .onTaskStarted(() => {
      console.log('Processing queue item');
    })
    .onTaskFinished(() => {
      console.log('Finished queue item');
    })
    .process(async (item) => {
      console.log(item);
      await Interpolation.process(item.file, item.model, item.output, item.params, item.extension, item.engine);
    })
    errors.forEach(error => {
      console.error(error);
    })
}
exportBtn.addEventListener('click', processQueue);

It seems that it's not possible to process items one by one: they still execute in parallel with at least 2 concurrent processes.
I guess the library was designed purely for concurrency and this isn't the use case it was built for?

Would it be possible to implement this?
I would love to use this library, as I prefer the syntax and features over other alternatives.

Progress for Started and Finished Tasks

Hi,

We need callbacks to see how many promises are fulfilled, which is useful for building progress bars and seeing which task has finished.
Related to issue #33 and PR #34, which wasn't completed.

I propose 2 callbacks:

onTaskStarted(item, percentage, activeTasks, finishedTasks)

  • item: the item that has started
  • percentage: the percentage of items that have started
  • activeTasks: array of running tasks
  • finishedTasks: array of completed tasks

onTaskFinished(item, percentage, activeTasks, finishedTasks)

  • item: the item that has finished
  • percentage: the percentage of items that have finished

If you approve, I can share a PR.
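A hypothetical sketch of how the proposed progress computation could be wired around a handler; the names mirror the callbacks suggested above and are not the promise-pool API:

```typescript
// Optional progress callbacks, as in the proposal (subset of arguments).
interface ProgressCallbacks<T> {
  onTaskStarted?: (item: T, percentage: number) => void
  onTaskFinished?: (item: T, percentage: number) => void
}

// Wraps a handler so each start/finish reports a rounded percentage.
function withProgress<T, R> (
  total: number,
  handler: (item: T) => Promise<R>,
  callbacks: ProgressCallbacks<T>
): (item: T) => Promise<R> {
  let started = 0
  let finished = 0

  return async (item) => {
    started++
    callbacks.onTaskStarted?.(item, Math.round((started / total) * 100))

    const result = await handler(item)

    finished++
    callbacks.onTaskFinished?.(item, Math.round((finished / total) * 100))

    return result
  }
}
```

With 2 items, `onTaskFinished` would fire with 50 and then 100, which is enough to drive a progress bar.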

Is it possible to timeout processing of an individual item?

I'm using promise-pool in my application to control the concurrency of a file downloader and avoid triggering the request rate limits of an upstream API. It's working fairly well, but I'm troubleshooting a bit of an edge case and need some help from the library.

Problem I'm facing right now is that my pool sometimes develops a "clog" and I need to metaphorically drano the sucker.

Between the pool statistics available to the onTask* callbacks and the index available to the ProcessHandler callback, I've been able to add enough logging at those 3 points to determine that I have several items that the pool starts processing but that never finish, with no errors reported.

So the pool will end up finishing the last item with 3 Active Tasks and N-3 Finished Tasks, and then hang forever. While I figure out what's going on, I'd love to be able to time out processing of an individual item as a last resort, or as a simple way to retry entire items via the library, without having to blow out the size and responsibility of my ProcessHandler code by writing the timeout and retry manually in there.
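As a last-resort workaround until the library supports this, the handler can be wrapped with a `Promise.race` timeout. A sketch (the wrapper name is illustrative, not part of the promise-pool API); note that the underlying work is not cancelled, the pool just stops waiting for it:

```typescript
// Wraps a handler so a stuck item rejects after `ms` instead of hanging.
function withTimeout<T, R> (
  handler: (item: T) => Promise<R>,
  ms: number
): (item: T) => Promise<R> {
  return async (item) => {
    let timer: ReturnType<typeof setTimeout> | undefined

    const timeout = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(() => reject(new Error(`Item timed out after ${ms}ms`)), ms)
    })

    try {
      return await Promise.race([handler(item), timeout])
    } finally {
      clearTimeout(timer) // don't keep the event loop alive after the race
    }
  }
}
```

Passing e.g. `withTimeout(downloadFile, 30_000)` to `.process()` would surface the rejection in the pool's `errors` instead of hanging the pool.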

question: is this similar to async queue?

Hey, nice lib! After reading the docs a bit, I like the API, but it seems to me like the internals could be just a wrapper around https://caolan.github.io/async/v3/docs.html#queue. I just wanted to mention this since that package seems very well maintained, has good TypeScript support and is battle tested (approx. 48 million weekly downloads).

Again, just wanted to give you a heads up. Feel free to close this "issue".

Pool not terminating correctly

First of all, thank you for this great package. I noticed an issue in version 2.3.0: if you rethrow the error in the error handler, the promise chain doesn't stop but runs to completion (this only happens when concurrency is higher than 1). I downgraded to 2.2.0 and it works perfectly. I will create a minimal reproducible example as soon as I have some free time.

pool.stop() only stops after processing 1 additional item

I have a series of long-running tasks that get queued up by promise-pool. When I call return pool.stop(), a StopThePromisePoolError gets thrown in promise-pool-executor.js, but the pool only stops after processing 1 additional item.

To illustrate for replication (screenshot omitted), see the code below.

This is a fatal flaw for longer-running processes: the pool should stop immediately, not after finishing the current task plus one additional one.

async function processQueue() {
  const { errors } = await PromisePool
    .withConcurrency(1)
    .for(queue)
    .onTaskStarted((item) => {
      console.log('Starting queue');
      sessionStorage.setItem('currentFile', item.file);
      // fetch metadata
      Media.fetchMetadata();
      // listen for preview
      Preview.listenForPreview();
      // initialize ui for process
      Process.startProcessInterpolation();
    })
    .onTaskFinished(() => {
      console.log('Finished queue item');
    })
    .process(async (item, index, pool) => {
      console.log(item);
      exportBtn.addEventListener('click', function () {
        console.log('stopping queue');
        return pool.stop();
      });
      await Interpolation.process(item.file, item.model, item.output, item.params, item.extension, item.engine);
    });

  errors.forEach(error => {
    console.error(error);
  });
}

I tried a lot of things to fix this but found no solution.

Handling errors

Hi guys! Thanks for your work. It really helped me in making my Cloud Functions development more enjoyable.

I have a question that is not promise-pool related but JavaScript oriented. I still hope you can help me with a quick hint, as it might be obvious to you and could be helpful as a documentation example 👍

In your docs you provide the following example:

const { results, errors } = await PromisePool
  .withConcurrency(2)
  .for(users)
  .process(async data => {
    const user = await User.createIfNotExisting(data)

    return user
  })

I am particularly interested in looking at the errors. However, I did not really understand how I can access the errors object. I know this is a JavaScript beginner question, so sorry about this.

I ended up doing this:

 const addedUsers = await PromisePool.withConcurrency(3)
 ...

And then looking into the errors like:

if (addedUsers.errors.length) {
...
...

But that's not a very sleek solution. Any hint on how to handle the errors better?

Thank you in advance.
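For readers with the same question: the pool's return value can be destructured directly. A sketch; `runPool` here is a stand-in that resolves to the same `{ results, errors }` shape the pool returns:

```typescript
// Stand-in for a PromisePool call, resolving to { results, errors }.
async function runPool (): Promise<{ results: string[], errors: Error[] }> {
  return { results: ['Marcus', 'Norman'], errors: [new Error('duplicate user')] }
}

async function main (): Promise<void> {
  // Destructure both properties in one assignment.
  const { results, errors } = await runPool()

  if (errors.length > 0) {
    for (const error of errors) {
      console.error(error.message)
    }
  }

  console.log(results)
}
```

This keeps both `results` and `errors` as local variables instead of reaching through an intermediate object like `addedUsers.errors`.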
