
pool's Issues

[Feature Request] Add getter for _allocatedResources private field

This would be useful for checking whether the "worker pool" has finished all of its tasks after a certain timeout.

Example of use case:

import 'dart:async';
import 'dart:collection';
import 'dart:mirrors';

import 'package:async/async.dart';
import 'package:pool/pool.dart';

void main() async {
  Queue<String> queue = Queue();
  Pool workerPool = new Pool(2, timeout: Duration(seconds: 1));
  queue.addAll(["Hey", "there", "dart", "team"]);

  // If the loop is "inactive" for 5 seconds, processing should stop.
  var inactivityTimer = RestartableTimer(Duration(seconds: 5), () {});

  while (true) {
    if (!inactivityTimer.isActive) {
      // RELEVANT LINE: The queue is empty and the inactivity timer has fired. Besides reflection, there is currently no way to check whether all pool resources have completed.
      // Wait until all work is done.
      if (workerPool.allocatedResources == 0) {
        print("All workers are done");
        break;
      }
    }

    // The queue is empty, wait to check again
    if (queue.isEmpty) {
      await Future.delayed(Duration(seconds: 5));
    } else {
      // We have work to do, create a new pooled resource to work on it
      workerPool.withResource(() async {
        await Future.delayed(Duration(seconds: 10));
        print(queue.removeFirst());
      });
      inactivityTimer.reset();
    }

    // If we pummel the workerPool with resource requests too fast, it locks up.
    await Future.delayed(Duration(milliseconds: 10));
  }
}

(Code is condensed to the basic idea, based on a web crawler I am working on).

Is this a feature that you will consider adding to this library?

withResource awaits sync callback

withResource is defined as:
Future<T> withResource<T>(FutureOr<T> Function() callback)

Method body has this line:
return await callback();

From my understanding the correct way to write that is something like:

final result = callback();
if (result is Future<T>) {
  return await result;
} else {
  return result;
}

I found this while searching how to use FutureOr<T> correctly. Current code in Pool is not a good sample to follow, or am I missing something?

Stopping the pool on Exception

What is the proper way to handle an Exception from a queued function?

How to clear all queued functions and to stop the pool?

The following code catches the Exception. But there is no option to clear the still queued functions or to exit the pool in general.

import 'package:pool/pool.dart';

main() async {
  final pool = Pool(2, timeout: Duration(seconds: 30));

  final List<Future> futures = [];

  try {
    for (var i = 0; i < 10; i++) {
      futures.add(
        pool.withResource(
          () => Future.delayed(
            Duration(seconds: 1),
            () {
              if (i == 5) {
                throw 'Oh no!';
              }
              print(i);
              return i;
            },
          ),
        ),
      );
    }

    await Future.wait(futures, eagerError: true);
  } catch (e) {
    print('CAUGHT: $e');
  }
}

Output

0
1
2
3
4
CAUGHT: Oh no!
6
7
8
9

Workaround

This workaround uses a global stopSignal that short-circuits the functions that are still queued.

import 'package:pool/pool.dart';

main() async {
  bool stopSignal = false;

  final pool = Pool(2, timeout: Duration(seconds: 30));

  final List<Future> futures = [];

  try {
    for (var i = 0; i < 10; i++) {
      futures.add(
        pool.withResource(
          () => stopSignal == true
              ? Future(() => null)
              : Future.delayed(
                  Duration(seconds: 1),
                  () {
                    if (i == 5) {
                      throw 'Oh no!';
                    }
                    print(i);
                    return i;
                  },
                ),
        ),
      );
    }

    await Future.wait(futures, eagerError: true);

  } catch (e) {
    print('CAUGHT: $e');
    stopSignal = true;
  }
}

Output

0
1
2
3
4
CAUGHT: Oh no!
6

How do I cancel / clear the pool?

What is the best way to prematurely cancel and dispose of all remaining pool tasks and cleanly close the pool without waiting for the tasks to complete?

Support efficient pooled iteration

I've created the below util (or something like it) a few times.

The problem: the call to Future.wait eagerly allocates a list of Future instances, one per element of sourceItems (which may be very large). Ideally, pkg:pool would support a feature to allow lazy iteration of sourceItems aligned with async progress through the items.

Stream<T> pooledIterator<S, T>(
    int poolSize, Iterable<S> sourceItems, Future<T> action(S source),
    {bool swallowErrors(S item, Object error, StackTrace stack)}) async* {
  swallowErrors ??= (item, e, s) => false;
  final pool = Pool(poolSize);
  final controller = StreamController<T>();

  final items = Future.wait(sourceItems.map((item) async {
    final resource = await pool.request();

    try {
      controller.add(await action(item));
    } catch (e, stack) {
      if (!swallowErrors(item, e, stack)) {
        controller.addError(e, stack);
      }
    } finally {
      resource.release();
    }
  })).whenComplete(() async {
    await controller.close();
  });

  yield* controller.stream;

  await items;
}
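
For comparison, here is a minimal sketch of the lazy variant, assuming a version of pkg:pool that exposes a `Pool.forEach` method (to my knowledge 1.4.0 and later; please check the changelog for your version). `square` and `sumOfSquares` are hypothetical names used only for illustration. No list of futures sized to the source is built up front; items are pulled from the iterable as pool resources become available.

```dart
import 'package:pool/pool.dart';

/// Hypothetical async action standing in for real per-item work.
Future<int> square(int n) async {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  return n * n;
}

/// Sums the squares of 0..count-1 with at most [poolSize] actions in flight.
Future<int> sumOfSquares(int count, int poolSize) async {
  final pool = Pool(poolSize);
  // A lazily generated iterable; nothing is materialized into a list here.
  final source = Iterable<int>.generate(count);
  var sum = 0;
  // Assumption: Pool.forEach returns a Stream of results. Results may
  // arrive in completion order rather than source order.
  await for (final value in pool.forEach<int, int>(source, square)) {
    sum += value;
  }
  await pool.close();
  return sum;
}

Future<void> main() async {
  print(await sumOfSquares(100, 4));
}
```

Because the results are only summed here, their arrival order does not matter. Code that depends on source order would need to carry an index along with each result.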

withResource() when callback returns a future and strong mode errors

The following code runs fine:

import 'package:pool/pool.dart';

main() async {
  Pool pool = new Pool(1);
  String s = await pool.withResource(() async => "Hi");
  print(s);
}

However in strong mode I get the following errors:

dartanalyzer --strong bin/testpool.dart
Analyzing [bin/testpool.dart]...
[error] Couldn't infer type parameter 'T'; 'Future' must be of type 'String'. (/home/adam/dev/fp3/dart/yacht/bin/testpool.dart, line 6, col 25)
[error] The argument type '() → Future' can't be assigned to the parameter type '() → String'. (/home/adam/dev/fp3/dart/yacht/bin/testpool.dart, line 6, col 38)
2 errors found.

I believe this is because withResource allows callback to return a T or Future<T> but this is not reflected in the function signature. Probably the type of callback should be: FutureOr<T> callback().

forEach tests using a stopwatch are flaky

For example:

pool/test/pool_test.dart

Lines 558 to 561 in 90eab06

expect((watch.elapsed ~/ itemCount).inMicroseconds,
    lessThan(expectedElapsed / (poolSize - otherTaskCount)),
    reason: 'Average time per task should be '
        'proportionate to the available pool resources.');

Example failure:

Expected: a value less than <476.1904761904762>...
Expected: a value less than <476.1904761904762>
  Actual: <542>
   Which: is not a value less than <476.1904761904762>
Average time per task should be proportionate to the available pool resources.
package:test_api                                             expect
google3:///third_party/dart/pool/test/pool_test.dart 558:15  main.<fn>.<fn>.<fn>
===== asynchronous gap ===========================
dart:async                                                   _AsyncAwaitCompleter.completeError
google3:///third_party/dart/pool/test/pool_test.dart         main.<fn>.<fn>.<fn>
===== asynchronous gap ===========================
dart:async                                                   _asyncThenWrapperHelper
google3:///third_party/dart/pool/test/pool_test.dart         main.<fn>.<fn>.<fn>

My guess is the host happened to run a bit slow, or we hit GC, or something like that. We either need to split these out into a separate performance test file that we can avoid running most of the time, or we need to make it less sensitive to whatever might cause it to spike.

How to wait for pool finish?

Hi, I use the pool to execute multiple tasks like this:

final pool = Pool(16, timeout: Duration(seconds: 60));

for (var item in list) {
  await pool.withResource(() async {
    logger.i('[start download ${item.id}]');
    await download(item);
    logger.i('[end download ${item.id}]');
  });
}
// do something after all files are downloaded

I tried to wait for all the futures in the pool to complete, but then it only runs one task at a time. Here is the log:

flutter: [I]  [start download 781e3960-32dd-11eb-9ec9-055684436a00]
flutter: [I]  [end download 781e3960-32dd-11eb-9ec9-055684436a00]
flutter: [I]  [start download 781e6070-32dd-11eb-b6cf-49eed31976fb]
flutter: [I]  [end download 781e6070-32dd-11eb-b6cf-49eed31976fb]
flutter: [I]  [start download 781eae90-32dd-11eb-a23f-e9de996316c4]
flutter: [I]  [end download 781eae90-32dd-11eb-a23f-e9de996316c4]

How can I wait for the pool but still run in parallel?
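
One possible approach, sketched below: the `await` inside the loop is what serializes the tasks, because each iteration waits for the previous `withResource` call to finish before requesting the next resource. Starting all calls first and awaiting them together with `Future.wait` lets the pool run up to its limit in parallel. `download` and `downloadAll` here are hypothetical stand-ins for the code in the question.

```dart
import 'dart:async';

import 'package:pool/pool.dart';

/// Hypothetical stand-in for the real download call; returns the id it got.
Future<int> download(int id) async {
  await Future<void>.delayed(const Duration(milliseconds: 50));
  return id;
}

/// Downloads every id with at most [concurrency] downloads in flight and
/// completes only when all of them have finished.
Future<List<int>> downloadAll(List<int> ids, {int concurrency = 16}) async {
  final pool = Pool(concurrency);
  // Do not await inside the loop: start every call and let the pool queue
  // the callbacks that exceed the concurrency limit.
  final futures = [
    for (final id in ids) pool.withResource(() => download(id)),
  ];
  // Future.wait completes once every pooled task has completed.
  final results = await Future.wait(futures);
  await pool.close();
  return results;
}

Future<void> main() async {
  final done = await downloadAll(List.generate(40, (i) => i));
  print('finished ${done.length} downloads');
}
```

Note that this builds one future per item up front, which is fine for moderately sized lists but may be wasteful for very large ones.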

Add `limitN` functions to wrap callbacks

For example:

R Function(P1, P2) limit2<P1, P2>(R Function(P1, P2) callback) =>
  (p1, p2) => withResource(() => callback(p1, p2));

This would make it easier to use pools for use cases like dart-archive/graphs#39; wrapping in two levels of closures in place is pretty ugly.
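
A fully typed version of this idea could be sketched as an extension, shown below. This is only an illustration, not part of pkg:pool: the extension name `LimitPool` is hypothetical, and the wrapped function returns `Future<R>` because `withResource` is asynchronous.

```dart
import 'dart:async';

import 'package:pool/pool.dart';

/// Hypothetical extension; pkg:pool does not ship this.
extension LimitPool on Pool {
  /// Wraps a two-argument [callback] so that every invocation runs inside
  /// [withResource] and is therefore subject to the pool's limit.
  Future<R> Function(P1, P2) limit2<R, P1, P2>(
          FutureOr<R> Function(P1, P2) callback) =>
      (p1, p2) => withResource(() => callback(p1, p2));
}

Future<void> main() async {
  final pool = Pool(2);
  final limitedAdd = pool.limit2<int, int, int>((a, b) => a + b);
  print(await limitedAdd(1, 2)); // prints 3
  await pool.close();
}
```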

Throw ArgumentError if poolSize <= 0

I could imagine some logic where the poolSize is defined via an environment variable or such. In these cases, a non-positive value will just hang. Better to throw early.
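
Until the constructor validates its argument, a caller-side guard along these lines could fail fast. This is a sketch only; `poolFromSize` is a hypothetical helper, not an API of pkg:pool.

```dart
import 'package:pool/pool.dart';

/// Hypothetical helper that rejects non-positive sizes before creating a
/// pool, so a misconfigured value fails immediately instead of hanging.
Pool poolFromSize(int poolSize) {
  if (poolSize <= 0) {
    throw ArgumentError.value(
        poolSize, 'poolSize', 'Must be greater than zero.');
  }
  return Pool(poolSize);
}

Future<void> main() async {
  final pool = poolFromSize(4);
  await pool.withResource(() => print('running with a valid pool size'));
  await pool.close();

  try {
    poolFromSize(0);
  } on ArgumentError catch (e) {
    print('rejected: $e');
  }
}
```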

Bad state: withResource() may not be called on a closed Pool.

import 'dart:async';
import 'package:pool/pool.dart';

main() async {
  Pool pool = new Pool(3);

  for (int i = 0; i < 4; i++) {
    pool.withResource(() {
      print(i);
    });
  }

  await pool.close();
}

With version 1.2.1 of pool, it prints "0, 1, 2, 3".
With version 1.2.2, it fails with the following error:

Unhandled exception:
Bad state: withResource() may not be called on a closed Pool.
#0      Pool.withResource.<withResource_async_body> (package:pool/pool.dart:111:7)
#1      Future.Future.microtask.<anonymous closure> (dart:async/future.dart:144)
#2      _microtaskLoop (dart:async/schedule_microtask.dart:41)
#3      _startMicrotaskLoop (dart:async/schedule_microtask.dart:50)
#4      _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:96)
#5      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:149)
