Coder Social home page Coder Social logo

tannerlinsley / swimmer Goto Github PK

View Code? Open in Web Editor NEW
202.0 7.0 8.0 121 KB

๐ŸŠ Swimmer - An async task pooling and throttling utility for JS

Home Page: https://codesandbox.io/s/mq2j7jq39x?expanddevtools=1&hidenavigation=1

License: MIT License

JavaScript 100.00%
javascript async promises pooling throttling task concurrency await

swimmer's Introduction

๐ŸŠโ€ swimmer

David Dependancy Status npm package v npm package dm Join the community on Slack Github Stars Twitter Follow

An async task pooling and throttling utility for javascript.

Features

  • ๐Ÿš€ 3kb and zero dependencies
  • ๐Ÿ”ฅ ES6 and async/await ready
  • ๐Ÿ˜Œ Simple to use!

Interactive Demo

Installation

$ yarn add swimmer
# or
$ npm i swimmer --save

UMD

https://unpkg.com/swimmer/umd/swimmer.min.js

Inline Pooling

Inline Pooling is great for:

  • Throttling intensive tasks in a serial fashion
  • Usage with async/await and promises.
  • Ensuring that all tasks succeed.
import { poolAll } from 'swimmer'

const urls = [...]

const doIntenseTasks = async () => {
  try {
    const res = await poolAll(
      urls.map(task =>
        () => fetch(url) // Return an array of functions that return a promise
      ),
      10 // Set the concurrency limit
    )
  } catch (err, task) {
    // If an error is encountered, the entire pool stops and the error is thrown
    console.log(`Encountered an error with task: ${task}`)
    throw err
  }

  // If no errors are thrown, you get your results!
  console.log(res) // [result, result, result, result]
}

Custom Pooling

Custom pools are great for:

  • Non serial
  • Reusable pools
  • Handling errors gracefully
  • Task management and retry
  • Variable throttle speed, pausing, resuming of tasks
import { createPool } from 'swimmer'

const urls = [...]
const otherUrls = [...]

// Create a new pool with a throttle speed and some default tasks
const pool = createPool({
  concurrency: 5,
  tasks: urls.map(url => () => fetch(url))
})

// Subscribe to errors
pool.onError((err, task) => {
  console.warn(err)
  console.log(`Encountered an error with task ${task}. Resubmitting to pool!`)
  pool.add(task)
})

// Subscribe to successes
pool.onSuccess((res, task) => {
  console.log(`Task Complete. Result: ${res}`)
})

// Subscribe to settle
pool.onSettled(() => console.log("Pool is empty. All tasks are finished!"))

const doIntenseTasks = async () => {
  // Add more tasks to the pool.
  tasks.forEach(
    url => pool.add(
      () => fetch(url)
    )
  )

  // Increase the concurrency to 10! This can also be done while it's running.
  pool.throttle(10)

  // Pause the pool
  pool.stop()

  // Start the pool again!
  pool.start()

  // Add a single task and WAIT for it's completion/failure
  try {
    const res = await pool.add(() => fetch('http://custom.com/asset.json'))
    console.log('A custom asset!', res)
  } catch (err) {
    console.log('Darn! An error...')
    throw err
  }

  // Then clear the pool. Any running tasks will attempt to finished.
  pool.clear()
}

API

Swimmer exports two functions:

  • poolAll - creates an inline async/await/promise compatible pool
    • Arguments
      • Array[Function => Promise] - An array of functions that return a promise.
      • Int - The concurrency limit for this pool.
    • Returns
      • A Promise that resolves when all tasks are complete, or throws an error if one of them fails.
    • Example:
  • createPool - creates an custom pool
    • Arguments
      • Object{} - An optional configuration object for this pool
        • concurrency: Int (default: 5) - The concurrency limit for this pool.
        • started: Boolean (default: true) - Whether the pool should be started by default or not.
        • tasks: Array[Function => Promise] - An array of functions that return a promise. These tasks will be preloaded into the pool.
    • Returns
      • Object{}
        • add(() => Promise, config{}) - Adds a task to the pool. Optionally pass a config object
          • config.priority - Set this option to true to queue this task in front of all other pending tasks.
          • Returns a promise that resolves/rejects with the eventual response from this task
        • start() - Starts the pool.
        • stop() - Stops the pool.
        • throttle(Int) - Sets a new concurrency rate for the pool.
        • clear() - Clears all pending tasks from the pool.
        • getActive() - Returns all active tasks.
        • getPending() - Returns all pending tasks.
        • getAll() - Returns all tasks.
        • isRunning() - Returns true if the pool is running.
        • isSettled() - Returns true if the pool is settled.
        • onSuccess((result, task) => {}) - Registers an onSuccess callback.
        • onError((error, task) => {}) - Registers an onError callback.
        • onSettled(() => {}) - Registers an onSettled callback.

Tip of the year

Make sure you are passing an array of thunks. A thunk is a function that returns your task, not your task itself. If you pass an array of tasks that have already been fired off then it's too late for Swimmer to manage them :(

Contributing

We are always looking for people to help us grow swimmer's capabilities and examples. If you have an issue, feature request, or pull request, let us know!

License

Swimmer uses the MIT license. For more information on this license, click here.

swimmer's People

Contributors

danilobuerger avatar tannerlinsley avatar wolverineks avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

swimmer's Issues

TypeScript port

Your lib is awesome! Because I use TypeScript, I ported the code to TypeScript; I'm not an expert but it works pretty nice. Maybe you want to support TypeScript?

// code based on JS version at https://github.com/tannerlinsley/swimmer/blob/5c4fceb83e233bbc8850fb55337526c07ec3a18a/src/index.js
type TThunk<T> = () => Promise<T>;

type TPoolConfig<T> = {
    concurrency: number;
    started: boolean;
    tasks: TThunk<T>[];
};

type TSettledCb = () => void;
type TErrorCb<T, E> = (error: E, task: TThunk<T>) => void;
type TSuccessCb<T> = (result: T, task: TThunk<T>) => void;

const defaultConfig: TPoolConfig<any> = {
    concurrency: 5,
    started: true,
    tasks: [],
};

const resolves = new WeakMap<TThunk<any>, Function>();
const rejects = new WeakMap<TThunk<any>, Function>();

export function createPool<T = any, E = any>(config: Partial<TPoolConfig<T>> = defaultConfig) {
    const { concurrency, started, tasks } = {
        ...defaultConfig,
        ...config,
    };

    let onSettles: TSettledCb[] = [];
    let onErrors: TErrorCb<T, E>[] = [];
    let onSuccesses: TSuccessCb<T>[] = [];

    let running: boolean = started;
    let active: TThunk<T>[] = [];
    let pending: TThunk<T>[] = tasks;
    let currentConcurrency = concurrency;

    const tick = () => {
        if (!running) {
            return;
        }
        if (!pending.length && !active.length) {
            onSettles.forEach(d => d());
            return;
        }
        while (active.length < currentConcurrency && pending.length) {
            const nextFn = pending.shift()!;
            active.push(nextFn);
            /* eslint-disable no-loop-func */
            (async () => {
                let success = false;
                let res: any;
                let error: any;
                try {
                    res = await nextFn();
                    success = true;
                } catch (e) {
                    error = e;
                }
                active = active.filter(d => d !== nextFn);
                if (success) {
                    resolves.get(nextFn)!(res);
                    onSuccesses.forEach(d => d(res, nextFn));
                } else {
                    rejects.get(nextFn)!(error);
                    onErrors.forEach(d => d(error, nextFn));
                }
                tick();
            })();
            /* eslint-enable no-loop-func */
        }
    };

    const api = {
        add: (fn: TThunk<T>, { priority }: { priority?: boolean } = {}) =>
            new Promise((resolve, reject) => {
                if (priority) {
                    pending.unshift(fn);
                } else {
                    pending.push(fn);
                }
                resolves.set(fn, resolve);
                rejects.set(fn, reject);
                tick();
            }),
        throttle: (n: number) => {
            currentConcurrency = n;
        },
        onSettled: (cb: TSettledCb) => {
            onSettles.push(cb);
            return () => {
                onSettles = onSettles.filter(d => d !== cb);
            };
        },
        onError: (cb: TErrorCb<T, E>) => {
            onErrors.push(cb);
            return () => {
                onErrors = onErrors.filter(d => d !== cb);
            };
        },
        onSuccess: (cb: TSuccessCb<T>) => {
            onSuccesses.push(cb);
            return () => {
                onSuccesses = onSuccesses.filter(d => d !== cb);
            };
        },
        stop: () => {
            running = false;
        },
        start: () => {
            running = true;
            tick();
        },
        clear: () => {
            pending = [];
        },
        getActive: () => active,
        getPending: () => pending,
        getAll: () => [...active, ...pending],
        isRunning: () => running,
        isSettled: () => !running && !active.length && !pending.length,
    };

    return api;
}

export function poolAll<T>(tasks: TThunk<T>[], concurrency: number) {
    return new Promise((resolve, reject) => {
        const pool = createPool({
            concurrency,
        });
        const results: T[] = [];
        pool.onSettled(() => {
            resolve(results);
        });
        pool.onError(err => {
            reject(err);
        });
        tasks.forEach((task, i) => {
            pool.add(async () => {
                const res = await task();
                results[i] = res;
                return res;
            });
        });
        pool.start();
    });
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.