Coder Social home page Coder Social logo

async-sema's Introduction

async-sema

This is a semaphore implementation for use with async and await. The implementation follows the traditional definition of a semaphore rather than the definition of an asynchronous semaphore seen in some js community examples. Where as the latter one generally allows every defined task to proceed immediately and synchronizes at the end, async-sema allows only a selected number of tasks to proceed at once while the rest will remain waiting.

Async-sema manages the semaphore count as a list of tokens instead of a single variable containing the number of available resources. This enables an interesting application of managing the actual resources with the semaphore object itself. To make it practical the constructor for Sema includes an option for providing an init function for the semaphore tokens. Use of a custom token initializer is demonstrated in examples/pooling.js.

Usage

Firstly, add the package to your project's dependencies:

npm install --save async-sema

or

yarn add async-sema

Then start using it like shown in the following example. Check more use case examples here.

Example

const { Sema } = require('async-sema');
const s = new Sema(
  4, // Allow 4 concurrent async calls
  {
    capacity: 100 // Prealloc space for 100 tokens
  }
);

async function fetchData(x) {
  await s.acquire()
  try {
    console.log(s.nrWaiting() + ' calls to fetch are waiting')
    // ... do some async stuff with x
  } finally {
    s.release();
  }
}

const data = await Promise.all(array.map(fetchData));

The package also offers a simple rate limiter utilizing the semaphore implementation.

const { RateLimit } = require('async-sema');

async function f() {
  const lim = RateLimit(5); // rps

  for (let i = 0; i < n; i++) {
    await lim();
    // ... do something async
  }
}

API

Sema

Constructor(nr, { initFn, pauseFn, resumeFn, capacity })

Creates a semaphore object. The first argument is mandatory and the second argument is optional.

  • nr The maximum number of callers allowed to acquire the semaphore concurrently.
  • initFn Function that is used to initialize the tokens used to manage the semaphore. The default is () => '1'.
  • pauseFn An optional fuction that is called to opportunistically request pausing the the incoming stream of data, instead of piling up waiting promises and possibly running out of memory. See examples/pausing.js.
  • resumeFn An optional function that is called when there is room again to accept new waiters on the semaphore. This function must be declared if a pauseFn is declared.
  • capacity Sets the size of the preallocated waiting list inside the semaphore. This is typically used by high performance where the developer can make a rough estimate of the number of concurrent users of a semaphore.

async drain()

Drains the semaphore and returns all the initialized tokens in an array. Draining is an ideal way to ensure there are no pending async tasks, for example before a process will terminate.

nrWaiting()

Returns the number of callers waiting on the semaphore, i.e. the number of pending promises.

tryAcquire()

Attempt to acquire a token from the semaphore, if one is available immediately. Otherwise, return undefined.

async acquire()

Acquire a token from the semaphore, thus decrement the number of available execution slots. If initFn is not used then the return value of the function can be discarded.

release(token)

Release the semaphore, thus increment the number of free execution slots. If initFn is used then the token returned by acquire() should be given as an argument when calling this function.

RateLimit(rps, { timeUnit, uniformDistribution })

Creates a rate limiter function that blocks with a promise whenever the rate limit is hit and resolves the promise once the call rate is within the limit set by rps. The second argument is optional.

The timeUnit is an optional argument setting the width of the rate limiting window in milliseconds. The default timeUnit is 1000 ms, therefore making the rps argument act as requests per second limit.

The uniformDistribution argument enforces a discrete uniform distribution over time, instead of the default that allows hitting the function rps time and then pausing for timeWindow milliseconds. Setting the uniformDistribution option is mainly useful in a situation where the flow of rate limit function calls is continuous and and occuring faster than timeUnit (e.g. reading a file) and not enabling it would cause the maximum number of calls to resolve immediately (thus exhaust the limit immediately) and therefore the next bunch calls would need to wait for timeWindow milliseconds. However if the flow is sparse then this option may make the code run slower with no advantages.

Contributing

  1. Fork this repository to your own GitHub account and then clone it to your local device
  2. Move into the directory of the clone: cd async-sema
  3. Link it to the global module directory of Node.js: npm link

Inside the project where you want to test your clone of the package, you can now either use npm link async-sema to link the clone to the local dependencies.

Author

Olli Vanhoja (@OVanhoja)

async-sema's People

Contributors

cardin avatar dsabanin avatar fauxfaux avatar gastonfig avatar leerob avatar leo avatar mfix22 avatar olliv avatar pranaygp avatar riophae avatar sergiodxa avatar timneutkens avatar weeezes avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

async-sema's Issues

Some nice way of async iterating?

I'm trying to limit uploads to 10 simultaneously, and I'm doing it like this:

	const uploadSema = new Sema(10)
	for (const upload of uploads) {
		await uploadSema.acquire()
		uploadOne(upload).finally(() => uploadSema.release())
	}
	await uploadSema.drain()

It's reasonably nice, but I was wondering if there wasn't a way to make this nicer.

I moved the logic to this helper function

export const queuedWork = async (items, fn, workers = 10) => {
	const sema = new Sema(workers)
	let threw = null
	for (const item of items) {
		if (threw) break
		await sema.acquire()
		// eslint-disable-next-line promise/catch-or-return
		Promise.resolve(fn(item))
			.catch(err => {
				threw = err
			})
			.finally(() => sema.release())
	}
	await sema.drain()
	if (threw) throw threw
}

Is this a good way of going about it? Is there maybe a more elegant way?

(wrote these tests too)

test('queuedWork async', async () => {
	const out = []
	await queuedWork([1, 5, 7, 89, 2], async n => out.push(n), 2)
	expect(out).toEqual([1, 5, 7, 89, 2])
})

test('queuedWork sync', async () => {
	const out = []
	await queuedWork([1, 5, 7, 89, 2], n => out.push(n), 2)
	expect(out).toEqual([1, 5, 7, 89, 2])
})

test('queuedWork throws', async () => {
	const out = []
	await expect(
		queuedWork(
			[1, 2, 3, 4, 5],
			async n => {
				if (n === 2) throw new Error('meep')
				else out.push(n)
			},
			2
		)
	).rejects.toThrow('meep')
	expect(out).toEqual([1, 3])
})


test('queuedWork throws sync', async () => {
	const out = []
	await expect(
		queuedWork(
			[1, 2, 3, 4, 5],
			n => {
				if (n === 2) throw new Error('meep')
				else out.push(n)
			},
			2
		)
	).rejects.toThrow('meep')
	expect(out).toEqual([1])
})

Add examples to README

All examples are currently in an examples directory, but we should perhaps include the basic on on the README itself

IE 11 Support

When adding this to a project, my app no longer loads in IE 11. I can't find any polyfills which alleviate the problem.

Has anyone had success running in IE?

Rate limit and throttle: Last item throttled

Thanks for this code!

const lim = RateLimit(2, {
  timeUnit: (5000),
});

await Promise.all(
  images.map(async (image, index) => {

    await lim();

    const parsed = await _prepImage.call(this, image);

    await util.downloadImage(parsed.url, parsed.target);

  })
);

From what I can tell, the above code downloads two images at a time and pauses 5 seconds between concurrent downloads.

I am noticing though, the last image download waits when I would like it to exit as soon as it's resolved.

In other words, since there are no more images to download (as it it the last promise to resolve) I don't want it to wait.

Questions:

  • Considering the above code snippet, am I using RateLimit properly (or, is this a good use case for RateLimit)?
  • How can I exit out of the last promise to resolve without pausing?

Many thanks in advance for the help and for sharing this code!

RateLimit does not work with number less than 1.

async function f() {
  const lim = RateLimit(0.5, { timeUnit: 1000 }); // rps
  console.time('order');

  for (let i = 0; i < 100; i++) {
    await lim();
    console.timeLog('order');
  }
  console.timeEnd('order');
}
f();

order: 1.083s
order: 2.085s
order: 3.087s
order: 4.091s
async function f() {
  const lim = RateLimit(1, { timeUnit: 2000 }); // rps
  console.time('order');

  for (let i = 0; i < 100; i++) {
    await lim();
    console.timeLog('order');
  }
  console.timeEnd('order');
}
f();

order: 2.071s
order: 4.075s
order: 6.078s
order: 8.081s

it should work with number less than 1 or maybe we should write it in the doc

Intuition surrounding `nr` values

How should one best figure out how to meaningfully determine this value? The examples seem to not follow any strict pattern.

Can the readme be made to include a rough guideline or set of heuristics to determine this for a given type of application?

.release() does not check actual token count

The nr value given to Sema() does not stop .release() from being called as many times as a user wants.

Example below shows an nr value of 1, but the user is able to obtain 2 concurrent uses.

const Sema = require("async-sema").Sema;

let i = 0;
const a = new Sema(1);
a.release();
a.acquire().then(() => {
	i++;
	console.log(`i = ${i}`);
	return a.acquire();
}).then(() => {
	i++;
	console.log(`i = ${i}`);
	return a.acquire();
});

Args object should be optional

Now the constructor requires an on empty object for default args:

new Sema(10, {})

while

new Sema(10)

would be optimal.

Possible p() & v() confusion

Looking at the source code and the examples, it seems to me that the library is using p() to signal/free up a resource, whereas v() is used to wait/"block" on the semaphore.

However, according to the definition of a semaphore, the uses of p() and v() are the other way around. Some resources to verify are:

Please let me know if I'm misunderstanding the use case here. :)

broken typescript definition

Hi all, currently typescript definition seems broken, typescript compiler complains about malformed type definition file (.d.ts) syntax.

maybe it's off topic, I love this package. so i've migrated async-sema package to typescript. i'd like to discuss about publishing typescript version of async-sema. should i publish typescript version of async-sema package as individual package? or just send PR to this repo? Please kindly give me your advice. Thanks :)

Action required: Greenkeeper could not be activated 🚨

🚨 You need to enable Continuous Integration on all branches of this repository. 🚨

To enable Greenkeeper, you need to make sure that a commit status is reported on all branches. This is required by Greenkeeper because it uses your CI build statuses to figure out when to notify you about breaking changes.

Since we didn’t receive a CI status on the greenkeeper/initial branch, it’s possible that you don’t have CI set up yet. We recommend using Travis CI, but Greenkeeper will work with every other CI service as well.

If you have already set up a CI for this repository, you might need to check how it’s configured. Make sure it is set to run on all new branches. If you don’t want it to run on absolutely every branch, you can whitelist branches starting with greenkeeper/.

Once you have installed and configured CI on this repository correctly, you’ll need to re-trigger Greenkeeper’s initial pull request. To do this, please delete the greenkeeper/initial branch in this repository, and then remove and re-add this repository to the Greenkeeper App’s white list on Github. You'll find this list on your repo or organization’s settings page, under Installed GitHub Apps.

Way to cancel pending tokens

I find I have the need to do a series of concurrent tasks, with a max concurrency, but also with a max total run time. After this time, whatever hasn't finished or started can be ignored.

I propose to add a new API

sema.cancel()

Which rejects all pending .acquire() calls. This way I can do the following

const timeout = setTimeout(() => sema.cancel(), 5000);
await Promise.all(jobs.map(async job => {
  try {
    sema.acquire();
  } catch (err) {
    if (err.code === 'CANCELLED') {
      return null
    }
    throw err
  }

  try {
    await doWork(job);
  } finally {
    sema.release();
  }
}));
clearTimeout(timeout);

If accepted I can make a PR

What is the `nr` variable?

sorry to interrupt. what does nr stands for, I thought it was num of resource, but then what is the capacity variable?

Add Sema.run utility for easy task processing

I think adding the following utility function might simplify working with semaphores a lot

export class Sema {
    public runTask<T>(task: (token?: any) => Promise<T>): Promise<T> {
        return this.acquire().then(token =>
            Promise.resolve(token)
                .then(task)
                .finally(() => sem.release(token))
        );
    }
}

It would allow to simply and safely queue tasks for processing while making sure that acquired tokens are never lost:

const sem = new Sema(3);

// Run many task in parallel limited by sem
for (let i = 0; i < 100; i++) {
    sem.runTask(async () => {
        // do some stuff
    });
}

// Alternatively use Promise.all and map
await Promise.all(items.map(item => sem.runTask(() => processItem(item))))

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.