dart-lang / pool Goto Github PK
View Code? Open in Web Editor NEWA class for managing a finite pool of resources.
Home Page: https://pub.dev/packages/pool
License: BSD 3-Clause "New" or "Revised" License
A class for managing a finite pool of resources.
Home Page: https://pub.dev/packages/pool
License: BSD 3-Clause "New" or "Revised" License
This would be useful to check if the "worker pool" after a certain timeout is "finished" with all the tasks.
Example of use case:
import 'dart:async';
import 'dart:collection';
import 'dart:mirrors';
import 'package:async/async.dart';
import 'package:pool/pool.dart';
void main() async {
Queue<String> queue = Queue();
Pool workerPool = new Pool(2, timeout: Duration(seconds: 1));
queue.addAll(["Hey", "there", "dart", "team"]);
// If the loop is "inactive" for 5 seconds, processing should stop.
var inactivityTimer = RestartableTimer(Duration(seconds: 5), () {});
while (true) {
if (!inactivityTimer.isActive) {
// RELEVANT LINE: The queue is empty, the inactivity timer has been triggered, there is currently no way besides reflection to check if the pool resources has all completed
// Wait until all all work is done
if (workerPool.allocatedResources == 0) {
print("All workers are done");
break;
}
}
// The queue is empty, wait to check again
if (queue.isEmpty) {
await Future.delayed(Duration(seconds: 5));
} else {
// We have work to do, create a new pooled resource to work on it
workerPool.withResource(() async {
await Future.delayed(Duration(seconds: 10));
print(queue.removeFirst())
});
inactivityTimer.reset();
}
// If we pummel the workerPool with resource requests too fast, it locks up.
await Future.delayed(Duration(milliseconds: 10));
}
}
(Code is condensed to the basic idea, based on a web crawler I am working on).
Is this a feature that you will consider adding to this library?
withResource
is defined as:
Future<T> withResource<T>(FutureOr<T> Function() callback)
Method body has this line:
return await callback();
From my understanding the correct way to write that is something like:
final result = callback();
if (result is Future<T>) {
return await result;
} else {
return result;
}
I found this while searching how to use FutureOr<T>
correctly. Current code in Pool
is not a good sample to follow, or am I missing something?
The pub site can show example code for a package.
https://www.dartlang.org/tools/pub/package-layout#examples
Let me know if you want me to try working on this.
related issue : dart-lang/site-www#413
What is the proper way to handle an Exceception from a queued function:
How to clear all queued functions and to stop the pool?
The following code catches the Exception. But there is no option to clear the still queued functions or to exit the pool in general.
import 'package:pool/pool.dart';
main() async {
final pool = Pool(2, timeout: Duration(seconds: 30));
final List<Future> futures = [];
try {
for (var i = 0; i < 10; i++) {
futures.add(
pool.withResource(
() => Future.delayed(
Duration(seconds: 1),
() {
if (i == 5) {
throw 'Oh no!';
}
print(i);
return i;
},
),
),
);
}
await Future.wait(futures, eagerError: true);
} catch (e) {
print('CAUGHT: $e');
}
}
Output
0
1
2
3
4
CAUGHT: Oh no!
6
7
8
9
Workaround
This workaround uses a global stopSignal
that shortcuts the still queued functions.
import 'package:pool/pool.dart';
main() async {
bool stopSignal = false;
final pool = Pool(2, timeout: Duration(seconds: 30));
final List<Future> futures = [];
try {
for (var i = 0; i < 10; i++) {
futures.add(
pool.withResource(
() => stopSignal == true
? Future(() => null)
: Future.delayed(
Duration(seconds: 1),
() {
if (i == 5) {
throw 'Oh no!';
}
print(i);
return i;
},
),
),
);
}
await Future.wait(futures, eagerError: true);
} catch (e) {
print('CAUGHT: $e');
stopSignal = true;
}
}
0
1
2
3
4
CAUGHT: Oh no!
6
What is the best way to prematurely cancel and dispose of all remaining pool tasks and cleanly close the pool without waiting for the tasks to complete?
I've created the below util (or something like it) a few times.
The problem: the call to Future.wait
allocates a list of Future
instances for the size of sourceItems
(which may be very large) eagerly. Ideally, pkg:pool
would support a feature to allow
lazy iteration of sourceItems
aligned with async progress through the items.
Stream<T> pooledIterator<S, T>(
int poolSize, Iterable<S> sourceItems, Future<T> action(S source),
{bool swallowErrors(S item, Object error, StackTrace stack)}) async* {
swallowErrors ??= (item, e, s) => false;
final pool = Pool(poolSize);
final controller = StreamController<T>();
final items = Future.wait(sourceItems.map((item) async {
final resource = await pool.request();
try {
controller.add(await action(item));
} catch (e, stack) {
if (!swallowErrors(item, e, stack)) {
controller.addError(e, stack);
}
} finally {
resource.release();
}
})).whenComplete(() async {
await controller.close();
});
yield* controller.stream;
await items;
}
The following code runs fine:
import 'package:pool/pool.dart';
main() async {
Pool pool = new Pool(1);
String s = await pool.withResource(() async => "Hi");
print(s);
}
However in strong mode I get the following errors:
dartanalyzer --strong bin/testpool.dart
Analyzing [bin/testpool.dart]...
[error] Couldn't infer type parameter 'T'; 'Future' must be of type 'String'. (/home/adam/dev/fp3/dart/yacht/bin/testpool.dart, line 6, col 25)
[error] The argument type '() โ Future' can't be assigned to the parameter type '() โ String'. (/home/adam/dev/fp3/dart/yacht/bin/testpool.dart, line 6, col 38)
2 errors found.
I believe this is because withResource
allows callback
to return a T
or Future<T>
but this is not reflected in the function signature. Probably the type of callback should be: FutureOr<T> callback()
.
For example:
Lines 558 to 561 in 90eab06
Example failure:
Expected: a value less than <476.1904761904762>...
Expected: a value less than <476.1904761904762>
Actual: <542>
Which: is not a value less than <476.1904761904762>
Average time per task should be proportionate to the available pool resources.
package:test_api expect
google3:///third_party/dart/pool/test/pool_test.dart 558:15 main.<fn>.<fn>.<fn>
===== asynchronous gap ===========================
dart:async _AsyncAwaitCompleter.completeError
google3:///third_party/dart/pool/test/pool_test.dart main.<fn>.<fn>.<fn>
===== asynchronous gap ===========================
dart:async _asyncThenWrapperHelper
google3:///third_party/dart/pool/test/pool_test.dart main.<fn>.<fn>.<fn>
My guess is the host happened to run a bit slow, or we hit GC, or something like that. We either need to split these out into a separate performance test file that we can avoid running most of the time, or we need to make it less sensitive to whatever might cause it to spike.
Hi, I use the pool to execute multiple tasks like this:
final pool = Pool(16, timeout: Duration(seconds: 60));
for (var item in list) {
await pool.withResource(() async {
logger.i('[start download ${item.id}]');
await download(item);
logger.i('[end download ${item.id}]');
});
}
// do something after all files are downloaded
I tried to wait for all the futures in pool complete, but then it only runs one task per time, here is the log:
flutter: [I] [start download 781e3960-32dd-11eb-9ec9-055684436a00]
flutter: [I] [end download 781e3960-32dd-11eb-9ec9-055684436a00]
flutter: [I] [start download 781e6070-32dd-11eb-b6cf-49eed31976fb]
flutter: [I] [end download 781e6070-32dd-11eb-b6cf-49eed31976fb]
flutter: [I] [start download 781eae90-32dd-11eb-a23f-e9de996316c4]
flutter: [I] [end download 781eae90-32dd-11eb-a23f-e9de996316c4]
How can I wait for the pool but still run in parallel?
For example:
R Function(P1, P2) limit2<P1, P2>(R Function(P1, P2) callback) =>
(p1, p2) => withResource(() => callback(p1, p2));
This would make it easier to use pools for use cases like dart-archive/graphs#39, wrapping in 2 levels of closures in place is pretty ugly.
I could imagine some logic where the poolSize is defined via an environment variable or such. In these cases, a non-positive value will just hang. Better to throw early.
import 'dart:async';
import 'package:pool/pool.dart';
main() async {
Pool pool = new Pool(3);
for (int i = 0; i < 4; i++) {
pool.withResource(() {
print(i);
});
}
await pool.close();
}
With the version 1.2.1 of pool it prints "0, 1, 2, 3"
With version 1.2.2, it fails with error
Unhandled exception:
Bad state: withResource() may not be called on a closed Pool.
#0 Pool.withResource.<withResource_async_body> (package:pool/pool.dart:111:7)
#1 Future.Future.microtask.<anonymous closure> (dart:async/future.dart:144)
#2 _microtaskLoop (dart:async/schedule_microtask.dart:41)
#3 _startMicrotaskLoop (dart:async/schedule_microtask.dart:50)
#4 _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:96)
#5 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:149)
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.