Coder Social home page Coder Social logo

Support for async/await about quinn HOT 11 CLOSED

quinn-rs avatar quinn-rs commented on July 20, 2024
Support for async/await

from quinn.

Comments (11)

LucioFranco avatar LucioFranco commented on July 20, 2024 2

I have found a work around but it requires an allocation via box.

#![feature(await_macro, async_await, futures_api, pin)]

use futures::TryFutureExt;
use tokio::runtime::current_thread as current;

fn main() {
    // Create a very simple 0.3 Future via an async block
    let f = async move {
        Ok(())
    };

    // Or via an async fn

    // f2 and f are the same type both `impl Future<Output = Result<(), ()>>`
    let f2 = example(); 

    let mut runtime = current::Runtime::new().unwrap();

    // Pin this 0.3 future so that the values do not move
    let fut = Box::pinned(f);

    // Translate this 0.3 pinned future to a 0.1 future
    let fut = fut.compat();

    // Spawn the 0.1 future we created via an async block
    runtime.spawn(fut);
	
	// Lets spawn the second future we created via an async fn
    runtime.spawn(Box::pinned(f2).compat());

    runtime.run().unwrap();
}

async fn example() -> Result<(), ()> {
    Ok(())
}

and my cargo.toml looks like:

[dependencies]
futures-preview = { git = "https://github.com/LucioFranco/futures-rs", features = ["tokio-compat"] }
tokio = { version = "0.1", features = ["async-await-preview"]}

This works for me on nightly-x86_64-apple-darwin unchanged - rustc 1.33.0-nightly (790f4c566 2018-12-19).

So the key here is that you must pin the async block before passing it to the compat layer. Another thing to note is that it requires that the 0.3 future is a TryFuture which is a regular 0.3 future with the additional bounds that the associated type Ouput must be a Result. Only TryFutures have the compat method. To show this even further the only import from futures 0.3 is TryFutureExt since all the nice future combinators now exist as extension traits.

If you'd like more information there is a post in futures-rs that is still sitting in a PR but it can be found here.

Hope this helps!

from quinn.

ysimonson avatar ysimonson commented on July 20, 2024 2

Thanks, that did it!

#![feature(await_macro, async_await, futures_api, pin)]

extern crate quinn;
extern crate tokio;

use tokio::prelude::*;
use futures_preview::{TryFutureExt, StreamExt};
use futures_preview::compat::Stream01CompatExt;
use tokio::runtime::current_thread as current;

async fn listener(incoming: quinn::Incoming) -> Result<(), ()> {
    let mut incoming = incoming.compat();

    while let Some(Ok(conn)) = await!(incoming.next()) {
        let mut incoming = conn.incoming.compat();

        while let Some(byte_stream) = await!(incoming.next()) {
            match byte_stream.unwrap() {
                quinn::NewStream::Bi(_) => {
                    println!("byte stream!");
                },
                quinn::NewStream::Uni(_) => {
                    // config.max_remote_uni_streams is defaulted to 0
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}

fn main() {
    let builder = quinn::Endpoint::new();
    let (_, driver, incoming) = builder.bind("0.0.0.0:9393").unwrap();
    let mut runtime = current::Runtime::new().unwrap();
    runtime.spawn(driver.map_err(|err| panic!(err)));
    runtime.spawn(Box::pinned(listener(incoming)).compat());
    runtime.run().unwrap();
}

Note that this does not have proper error handling, and will only handle a single connection at a time.

from quinn.

djc avatar djc commented on July 20, 2024 1

No, let's not close this! I think wanting to use Quinn with whatever futures compatibility layer today is perfectly reasonable and we should see if/how we can get it to work.

from quinn.

ysimonson avatar ysimonson commented on July 20, 2024 1

With those tips, the proof of concept is getting dangerously close to working:

#![feature(await_macro, async_await, futures_api, pin)]

extern crate quinn;
extern crate tokio;

use futures_preview::{TryFutureExt, StreamExt};
use futures_preview::compat::Stream01CompatExt;
use tokio::runtime::current_thread as current;

async fn listener() -> Result<(), ()> {
    let builder = quinn::Endpoint::new();
    let (_, _, incoming) = builder.bind("0.0.0.0:9393").unwrap();
    let mut incoming = incoming.compat();

    while let Some(Ok(conn)) = await!(incoming.next()) {
        let mut incoming = conn.incoming.compat();

        while let Some(byte_stream) = await!(incoming.next()) {
            match byte_stream.unwrap() {
                quinn::NewStream::Bi(_) => {
                    println!("byte stream!");
                },
                quinn::NewStream::Uni(_) => {
                    // config.max_remote_uni_streams is defaulted to 0
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}

fn main() {
    let mut runtime = current::Runtime::new().unwrap();
    runtime.spawn(Box::pinned(listener()).compat());
    runtime.run().unwrap();
}

It compiles, but the first await!(incoming.next()) immediately returns None, when it should block until a connection is ready. This causes the program to immediately exit without error. Any idea why this would be? My assumption would be that a Compat01As03 stream's next() would behave the same as any other 0.3 stream's next().

from quinn.

Ralith avatar Ralith commented on July 20, 2024 1

EndpointBuilder::bind() returns three things: the Endpoint itself, a Driver future that performs the actual I/O, and a stream of incoming NewConnections. You're dropping the Driver, so no packets can be received (or sent), so no new connections will ever be accepted. You just need to spawn the Driver (the second element of the tuple) onto your runtime using e.g. tokio_current_thread::spawn.

e: note that constructing an Endpoint is synchronous, so usually it makes sense to do that and spawn off the Driver in main so you don't have to worry about it elsewhere.

from quinn.

Ralith avatar Ralith commented on July 20, 2024

Hey, thanks for your interest! We have plans to make Quinn more friendly to sharing an Endpoint between multiple threads in the future, but for now you can use an executor that supports !Send futures, such as the one defined in tokio::runtime::current_thread.

It looks like you're using tokio-async-await, and I'm not sure how exactly that works. async/await syntax itself supports !Send futures just fine, but I don't know what's necessary to use that with tokio-async-await. Instead, you could try using the futures 0.3 compat tools to convert Quinn's 0.1 futures to 0.3, use async/await syntax as desired, and then convert back to 0.1 futures again to pass to the tokio current-thread runtime.

Ultimately, we'll update things to use futures 0.3 directly, but for now our focus is on updating to the latest IETF drafts.

from quinn.

ysimonson avatar ysimonson commented on July 20, 2024

Sadly, I think run_async and spawn_async are the only ways to schedule async blocks (which is necessary because await! can only be called inside of an async block), and they don't offer control over which thread they're run on.

I'll dig in the tokio source code and see if there's some way to mash everything together.

from quinn.

Ralith avatar Ralith commented on July 20, 2024

To be clear, you can do this by using the futures 0.3 compatibility tools instead of tokio-async-await, because you'll end up with regular futures-0.1 futures that you can spawn like any other future.

from quinn.

ysimonson avatar ysimonson commented on July 20, 2024

This is getting outside my comfort zone because it's building on a lot of unstable work, and there isn't much prior art. But unless I'm misunderstanding you, I think that's not possible because futures produced by the compatibility layer are pinned, and I guess tokio requires that they not be pinned. e.g., let's simplify and not even worry about quinn for now:

#![feature(await_macro, async_await, futures_api, pin)]

extern crate quinn;
extern crate tokio;
extern crate futures_preview;

use futures_preview::compat;
use tokio::runtime::current_thread as current;

fn main() {
    let f = compat::Compat::new(async move {
        Ok(())
    });

    let runtime = current::Runtime::new().unwrap();
    runtime.spawn(f);
    runtime.run().unwrap();
}

This will produce an error:

error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:11:44: 13:6 _]>: std::pin::Unpin` is not satisfied in `impl futures_preview::Future`
  --> src/main.rs:16:13
   |
16 |     runtime.spawn(f);
   |             ^^^^^ within `impl futures_preview::Future`, the trait `std::pin::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:11:44: 13:6 _]>`
   |
   = help: the following implementations were found:
             <std::future::GenFuture<T> as std::pin::Unpin>
   = note: required because it appears within the type `impl futures_preview::Future`
   = note: required because of the requirements on the impl of `tokio::prelude::Future` for `futures_preview::compat::Compat<impl futures_preview::Future>`

For referencing, I'm pulling in futures-preview in Cargo.toml like so:

[dependencies]
futures_preview = { version = "0.3.0-alpha.10", package = "futures-preview", features = ["compat", "tokio-compat"] }

Which seems pretty hack-ey because I have essentially two copies of futures 0.1 - those from the futures 0.1 crate, as pulled in by tokio, and those from the futures 0.3 compatibility layer.

Anyway, I get that this is veering pretty off-course from quinn and the project's (at least short-term) roadmap. Would you like me to close the issue?

from quinn.

carllerche avatar carllerche commented on July 20, 2024

All the misc Tokio types need to have additional _async fns added to support futures 0.3.

So, the current_thread runtime can have fns added similar to the main runtime and it should work.

from quinn.

Ralith avatar Ralith commented on July 20, 2024

Which seems pretty hack-ey because I have essentially two copies of futures 0.1 - those from the futures 0.1 crate, as pulled in by tokio, and those from the futures 0.3 compatibility layer.

These should be using the same copy of futures 0.1; futures 0.3 depends on futures 0.1 to provide this functionality.

from quinn.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.