rmanoka / async-scoped
A scope for async_std and tokio to spawn non-static futures
Hello! I'd like to use this library but cannot figure out how to return references. This is the equivalent using raw crossbeam:
fn main() {
    let group1 = [1, 2, 3];
    let group2 = [4, 5, 6];
    let groups = [group1, group2];
    let results = crossbeam::scope(|s| {
        let mut results = Vec::<&i32>::new();
        let mut handles = vec![];
        for group in &groups {
            handles.push(s.spawn(move |_| {
                &group[0]
            }));
        }
        for handle in handles {
            results.push(handle.join().unwrap());
        }
        results
    });
    println!("{:?}", results.unwrap());
}
Using the async version, the compiler reports that &groups does not live long enough:
let group1 = [1, 2, 3];
let group2 = [4, 5, 6];
let groups = [group1, group2];
let results = async_scoped::TokioScope::scope_and_block(|s| {
    for group in &groups {
        s.spawn(async {
            &group[0]
        });
    }
});
The same happens with scope_and_collect. I don't think this is a bug; I'm new to Rust and could use a pointer on whether this is possible. Thanks!
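For comparison, here is a minimal sketch of the crossbeam snippet above rewritten with the standard library's scoped threads (std::thread::scope, stable since Rust 1.63). It is not the async-scoped API and does not answer the async question. It only shows that scoped threads can hand borrowed values back to the caller. The helper name first_elements is hypothetical.

```rust
use std::thread;

/// Returns a reference to the first element of each group, computing each
/// one on its own scoped thread. `first_elements` is a hypothetical helper.
fn first_elements<'a>(groups: &'a [[i32; 3]]) -> Vec<&'a i32> {
    thread::scope(|s| {
        // Spawn every thread first so that they can run concurrently.
        let handles: Vec<_> = groups
            .iter()
            .map(|group| s.spawn(move || &group[0]))
            .collect();
        // Join in spawn order; each handle yields a borrowed `&i32`.
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    })
}

fn main() {
    let groups = [[1, 2, 3], [4, 5, 6]];
    let results = first_elements(&groups);
    println!("{:?}", results); // prints [1, 4]
}
```

The returned references borrow from groups, so the caller must keep groups alive for as long as the results are used.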
In this line, why is the code *guard instead of !*guard?
I have the following minimally reproducible example:
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncWriteExt;
use tokio::spawn;

fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(8)
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();
    runtime.block_on(async {
        let t = spawn(async move {
            let mut f: File = OpenOptions::new()
                .create(true)
                .write(true)
                .open("/tmp/foo.txt").await.unwrap();
            let (_r, _outputs) = async_scoped::TokioScope::scope_and_block(|s| {
                s.spawn(async move {
                    println!("----1---");
                    f.write_all(b"asdasd").await.unwrap();
                    println!("----2---");
                    f.write_all(b"asdasd").await.unwrap();
                    println!("----3---");
                });
                ()
            });
        });
        t.await.unwrap();
    });
}
I use the following Cargo.toml dependencies:
[dependencies]
tokio = { version = "1.34.0", features = ["rt", "time", "net", "sync", "rt-multi-thread", "io-util", "fs"] }
async-scoped = { version = "0.8.0", features = ["use-tokio"] }
This code prints ----1--- and ----2---, but never prints ----3---, so the program freezes on the second write_all. If I create the runtime with .max_blocking_threads(2), the program works fine. Does the second call to write_all need an extra thread? Why? Is this a bug in the async_scoped crate or not?
If I execute write_all without the extra spawn in the async block, the program works fine too.
We should support async blocks in scope_and_block and scope_and_collect. This issue is to collect thoughts on possible unsafety if this is allowed.
Compiling on any nightly version after 2021-01-10 errors out:
Compiling async-scoped v0.6.0 (/tmp/async-scoped)
error: custom attribute panicked
--> src/tests.rs:6:86
|
6 | #[cfg_attr(all(feature = "use-tokio", not( feature = "use-async-std" )), tokio::test(flavor = "multi_thread", worker_threads=1...
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...
21 | / test_fixtures! {
22 | | async fn test_scope() {
23 | | let not_copy = String::from("hello world!");
24 | | let not_copy_ref = &not_copy;
... |
291 | | }
292 | | }
| |_- in this macro invocation
|
= help: message: internal error: entered unreachable code
= note: this error originates in a macro (in Nightly builds, run with -Z macro-backtrace for more info)
error: aborting due to previous error
error: could not compile `async-scoped`
(This is for use-tokio; the error for use-async-std is very similar:)
5 | #[cfg_attr(feature = "use-async-std", async_std::test)]
| ^^^^^^^^^^^^^^^
After some trial and error I traced the problem to commit rust-lang/rust@6184f23, which merges rust-lang/rust#80830.
I'm not sure whether this is an issue with rustc, tokio/async-std or async-scoped. I couldn't find an existing issue about it, so I decided to submit it here for the time being.
For now, I've pushed a workaround to my fork which just adds #![cfg(test)] to the tests.rs file, which means that the tests won't be compiled during normal usage, hence avoiding the error.
Are there plans for official support for tokio? I am curious what the technical blockers to this may be.
If I compile the example program, the compiler fails to infer the closure's type:
use async_scoped::*;

#[async_std::main]
async fn main() {
    scoped_futures().await;
}

async fn scoped_futures() {
    let not_copy = String::from("hello");
    let not_copy_ref = &not_copy;
    let ((), vals) = Scope::scope_and_block(|s| {
        for _ in 0..10 {
            let proc = || async {
                println!("Running a task! {:?}", not_copy_ref);
            };
            s.spawn(proc());
        }
    });
    assert_eq!(vals.len(), 10);
}
$ cargo build
Updating crates.io index
Compiling async-scoped v0.7.0
Compiling rustfearlessstructured v0.1.0 (/Users/verdagon/RustFearlessStructuredConcurrency)
error[E0282]: type annotations needed for `&mut async_scoped::Scope<'_, (), Sp>`
--> src/main.rs:12:46
|
12 | let ((), vals) = Scope::scope_and_block(|s| {
| ^ consider giving this closure parameter the explicit type `&mut async_scoped::Scope<'_, (), Sp>`, where the type parameter `Sp` is specified
error: aborting due to previous error
For more information about this error, try `rustc --explain E0282`.
error: could not compile `rustfearlessstructured`
To learn more, run the command again with --verbose.
(reproduced in https://github.com/Verdagon/RustFearlessStructuredConcurrency/blob/33019c43cd95d5ad2152590263739549d38ab869/src/main.rs)
Is it possible with this crate to limit the number of concurrent spawned tasks in a scope?
For reference, I currently do this with unscoped tasks:
let mut stream = futures::stream::iter(futures).map(tokio::task::spawn).buffer_unordered(concurrent_limit);
while let Some(result) = stream.next().await {
// ... handle result
}
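As a rough thread-based analogue only (not async, and not the async-scoped API), the idea of capping concurrency inside a scope can be sketched with std scoped threads: a fixed number of workers pull item indices from a shared counter, so at most limit items are processed at once. The name map_with_limit and its parameters are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Applies `f` to every item using at most `limit` scoped worker threads.
/// Results are returned in input order. Hypothetical helper for illustration.
fn map_with_limit<T, R, F>(items: &[T], limit: usize, f: F) -> Vec<R>
where
    T: Sync,
    R: Send,
    F: Fn(&T) -> R + Sync,
{
    // Index of the next unclaimed item, shared by all workers.
    let next = AtomicUsize::new(0);
    // Results tagged with their input index, in completion order.
    let results: Mutex<Vec<(usize, R)>> = Mutex::new(Vec::with_capacity(items.len()));
    // Never start zero workers or more workers than there are items.
    let workers = limit.clamp(1, items.len().max(1));

    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // Claim one index; stop once the input is exhausted.
                let i = next.fetch_add(1, Ordering::Relaxed);
                if i >= items.len() {
                    break;
                }
                let value = f(&items[i]);
                results.lock().unwrap().push((i, value));
            });
        }
    });

    // All workers have been joined here; restore input order.
    let mut tagged = results.into_inner().unwrap();
    tagged.sort_by_key(|(index, _)| *index);
    tagged.into_iter().map(|(_, value)| value).collect()
}

fn main() {
    let inputs = [1, 2, 3, 4, 5];
    let doubled = map_with_limit(&inputs, 2, |x| x * 2);
    println!("{:?}", doubled); // prints [2, 4, 6, 8, 10]
}
```

Because the workers borrow items and f from the enclosing function, no 'static bound is needed; the scope joins every worker before map_with_limit returns.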
First of all, brilliant job!
Currently, you use feature gates with private traits. But there are cases where a user would like to implement your traits for their own runtime or provide a different implementation for, let's say, the tokio runtime.
My proposition is:
Make the Blocker / Spawner traits public;
Scope, which would be a reference to an implementor of those traits.
Honestly, I don't see any value in feature gates, as they limit the functionality of the crate. If they exist to work around Rust limitations, please describe them.
If this doesn't sound good to you, please tell me why. If it sounds great, I'd be happy to implement the proposition!
This issue can be closed once version 0.9 is published to crates.io.
It seems that changing Cargo.toml "just works", but I haven't tested it much.
When using join_all, we can use references in function results, like:
async fn say_hello_in_vec(who: &str) -> Vec<Vec<&str>> {
    let handles = (1..=5).map(|i| hello_in_vec(i, who));
    // intra-task concurrency
    let handle = join_all(handles);
    let results = handle.await;
    results
}

async fn hello_in_vec(i: i32, who: &str) -> Vec<&str> {
    println!("Job {i} started");
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    let result = vec!["hello", who];
    println!("Job {i} finished");
    result
}
When trying to use scope_and_collect instead, this does not compile:
async fn say_hello_in_vec(who: &str) -> Vec<Vec<&str>> {
    let handles = (1..=5).map(|i| hello_in_vec(i, who));
    // with async_scoped
    let (_, results) = unsafe {
        async_scoped::TokioScope::scope_and_collect(|s| {
            for handle in handles {
                s.spawn(handle);
            }
        })
        .await
    };
    let results = results.into_iter().map(|res| res.unwrap()).collect();
    results
}

async fn hello_in_vec(i: i32, who: &str) -> Vec<&str> {
    println!("Job {i} started");
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    let result = vec!["hello", who];
    println!("Job {i} finished");
    result
}
error: lifetime may not live long enough
--> src/main.rs:80:35
|
79 | async fn say_hello_in_vec(who: &str) -> Vec<Vec<&str>> {
| - let's call the lifetime of this reference `'1`
80 | let handles = (1..=5).map(|i| hello_in_vec(i, who));
| ^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'static`
The link to for_each_concurrent is broken and tries to open a local application. See https://docs.rs/async-scoped/latest/async_scoped/index.html#structs
In my massively parallel processing, as part of MapLibre's Martin code, I use for_each_concurrent to run hundreds of millions of tasks in parallel. I was trying to understand the point the docs make about its inefficiencies, and whether I am using it wrong, but I might have misunderstood. Is this the right approach, or should I go with some alternative that involves scoped async? Thanks!
Hi,
I found your crate and I think it is very useful, however, I'd like to spawn a blocking future using spawn_blocking. Would it be possible to add spawn_blocking function to Scope?
Thanks!
The error messages aren't particularly obvious either. For example:
Compiling async-scoped v0.6.0
error[E0433]: failed to resolve: could not find `test` in `async_std`
--> /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/async-scoped-0.6.0/src/tests.rs:5:62
|
5 | #[cfg_attr(feature = "use-async-std", async_std::test)]
| ^^^^ could not find `test` in `async_std`
...
21 | / test_fixtures! {
22 | | async fn test_scope() {
23 | | let not_copy = String::from("hello world!");
24 | | let not_copy_ref = &not_copy;
... |
291 | | }
292 | | }
| |_- in this macro invocation
|
= note: this error originates in a macro (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0433]: failed to resolve: use of undeclared type `Scope`
--> /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/async-scoped-0.6.0/src/tests.rs:26:36
|
26 | let (stream, _) = unsafe { Scope::scope(|s| {
| ^^^^^ not found in this scope
|
help: consider importing this struct
|
1 | use crate::Scope;
|
It seems like the documentation should specify that having the attributes feature enabled for async-std is a requirement. That feature is not turned on by default.
The scope_and_collect function uses FuturesUnordered under the hood, so it returns results in no particular order.
Unordered results make sense when we handle a stream, but in a collect operation they make little sense and add complexity on the caller's side.
(Also, the scope_and_collect documentation does not mention that it returns results unordered.)
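One caller-side workaround, sketched here under the assumption that each spawned task returns its own submission index alongside its value, is to sort the collected pairs afterwards. The function name restore_order is hypothetical, and the sketch deliberately avoids the async-scoped API, modelling only the reordering step.

```rust
/// Restores submission order for results that arrive in completion order.
/// Each element is `(submission_index, value)`. Hypothetical helper.
fn restore_order<T>(mut tagged: Vec<(usize, T)>) -> Vec<T> {
    // Sort by the index that each task carried along with its result.
    tagged.sort_by_key(|(index, _)| *index);
    // Drop the indices and keep only the values.
    tagged.into_iter().map(|(_, value)| value).collect()
}

fn main() {
    // Simulated completion order: task 2 finished first, then 0, then 1.
    let completed = vec![(2, "c"), (0, "a"), (1, "b")];
    let ordered = restore_order(completed);
    println!("{:?}", ordered); // prints ["a", "b", "c"]
}
```

This is the extra work that currently falls on the caller, which is the complexity the issue describes.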
Glad to be the first issue here. I happened to need scoped tasks and came across your repo.
When I run scope_and_block() within the async-std runtime, it panics with:
thread 'async-std/runtime' panicked at 'cannot call `Executor::enter()` if already inside an `Executor`', /rustc/5c1f21c3b82297671ad3ae1e8c942d2ca92e84f2/src/libstd/macros.rs:13:23
I would like to know if you have any idea how to get rid of this panic.