Comments (11)
I have found a workaround, but it requires an allocation via Box.
#![feature(await_macro, async_await, futures_api, pin)]

use futures::TryFutureExt;
use tokio::runtime::current_thread as current;

fn main() {
    // Create a very simple 0.3 Future via an async block
    let f = async move {
        Ok(())
    };

    // Or via an async fn
    // f2 and f are the same type both `impl Future<Output = Result<(), ()>>`
    let f2 = example();

    let mut runtime = current::Runtime::new().unwrap();

    // Pin this 0.3 future so that the values do not move
    let fut = Box::pinned(f);

    // Translate this 0.3 pinned future to a 0.1 future
    let fut = fut.compat();

    // Spawn the 0.1 future we created via an async block
    runtime.spawn(fut);

    // Let's spawn the second future we created via an async fn
    runtime.spawn(Box::pinned(f2).compat());

    runtime.run().unwrap();
}

async fn example() -> Result<(), ()> {
    Ok(())
}
and my Cargo.toml looks like:
[dependencies]
futures-preview = { git = "https://github.com/LucioFranco/futures-rs", features = ["tokio-compat"] }
tokio = { version = "0.1", features = ["async-await-preview"]}
This works for me on nightly-x86_64-apple-darwin unchanged - rustc 1.33.0-nightly (790f4c566 2018-12-19).
So the key here is that you must pin the async block before passing it to the compat layer. Another thing to note is that the 0.3 future must be a TryFuture, which is a regular 0.3 future with the additional bound that its associated type Output must be a Result. Only TryFutures have the compat method. To illustrate this further, the only import from futures 0.3 is TryFutureExt, since all the nice future combinators now exist as extension traits.
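To make the two requirements concrete, here is a minimal sketch on stable Rust that uses only the standard library, not the futures 0.3 compat layer. The helper require_try_unpin is hypothetical: it only mimics the kind of bounds described above (an output that is a Result, plus Unpin) and is not the real signature of compat. block_on is a toy executor written for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for the bounds described above: the output must be
/// a `Result` and the future must be `Unpin`. Not the real `compat` signature.
fn require_try_unpin<F, T, E>(fut: F) -> F
where
    F: Future<Output = Result<T, E>> + Unpin,
{
    fut
}

/// Toy executor: polls in a loop until the future is ready (illustration only).
fn block_on<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}

async fn example() -> Result<u32, ()> {
    Ok(7)
}

fn main() {
    // `require_try_unpin(example())` would not compile: the future returned
    // by an async fn is not `Unpin`. Boxing and pinning it fixes that,
    // because `Pin<Box<F>>` is `Unpin` for any `F`.
    let pinned = require_try_unpin(Box::pin(example()));
    let out = block_on(pinned);
    assert_eq!(out, Ok(7));
    println!("{:?}", out);
}
```

The heap allocation is the price of this approach: the async state machine stays at a fixed address inside the Box, so the pinned handle itself can be moved freely.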
If you'd like more information, there is a post in futures-rs that is still sitting in a PR, but it can be found here.
Hope this helps!
from quinn.
Thanks, that did it!
#![feature(await_macro, async_await, futures_api, pin)]

extern crate quinn;
extern crate tokio;

use tokio::prelude::*;
use futures_preview::{TryFutureExt, StreamExt};
use futures_preview::compat::Stream01CompatExt;
use tokio::runtime::current_thread as current;

async fn listener(incoming: quinn::Incoming) -> Result<(), ()> {
    let mut incoming = incoming.compat();
    while let Some(Ok(conn)) = await!(incoming.next()) {
        let mut incoming = conn.incoming.compat();
        while let Some(byte_stream) = await!(incoming.next()) {
            match byte_stream.unwrap() {
                quinn::NewStream::Bi(_) => {
                    println!("byte stream!");
                },
                quinn::NewStream::Uni(_) => {
                    // config.max_remote_uni_streams is defaulted to 0
                    unreachable!();
                }
            }
        }
    }
    Ok(())
}

fn main() {
    let builder = quinn::Endpoint::new();
    let (_, driver, incoming) = builder.bind("0.0.0.0:9393").unwrap();
    let mut runtime = current::Runtime::new().unwrap();
    runtime.spawn(driver.map_err(|err| panic!(err)));
    runtime.spawn(Box::pinned(listener(incoming)).compat());
    runtime.run().unwrap();
}
Note that this does not have proper error handling, and will only handle a single connection at a time.
from quinn.
No, let's not close this! I think wanting to use Quinn with whatever futures compatibility layer today is perfectly reasonable and we should see if/how we can get it to work.
from quinn.
With those tips, the proof of concept is getting dangerously close to working:
#![feature(await_macro, async_await, futures_api, pin)]

extern crate quinn;
extern crate tokio;

use futures_preview::{TryFutureExt, StreamExt};
use futures_preview::compat::Stream01CompatExt;
use tokio::runtime::current_thread as current;

async fn listener() -> Result<(), ()> {
    let builder = quinn::Endpoint::new();
    let (_, _, incoming) = builder.bind("0.0.0.0:9393").unwrap();
    let mut incoming = incoming.compat();
    while let Some(Ok(conn)) = await!(incoming.next()) {
        let mut incoming = conn.incoming.compat();
        while let Some(byte_stream) = await!(incoming.next()) {
            match byte_stream.unwrap() {
                quinn::NewStream::Bi(_) => {
                    println!("byte stream!");
                },
                quinn::NewStream::Uni(_) => {
                    // config.max_remote_uni_streams is defaulted to 0
                    unreachable!();
                }
            }
        }
    }
    Ok(())
}

fn main() {
    let mut runtime = current::Runtime::new().unwrap();
    runtime.spawn(Box::pinned(listener()).compat());
    runtime.run().unwrap();
}
It compiles, but the first await!(incoming.next()) immediately returns None, when it should block until a connection is ready. This causes the program to immediately exit without error. Any idea why this would be? My assumption would be that a Compat01As03 stream's next() would behave the same as any other 0.3 stream's next().
from quinn.
EndpointBuilder::bind() returns three things: the Endpoint itself, a Driver future that performs the actual I/O, and a stream of incoming NewConnections. You're dropping the Driver, so no packets can be received (or sent), so no new connections will ever be accepted. You just need to spawn the Driver (the second element of the tuple) onto your runtime using e.g. tokio_current_thread::spawn.
Edit: note that constructing an Endpoint is synchronous, so usually it makes sense to do that and spawn off the Driver in main so you don't have to worry about it elsewhere.
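As an analogy only (this uses std channels and is not a description of Quinn's internals), the sketch below shows how a consumer's stream can end immediately when the producing side has been dropped, and how it yields items once the producer is kept alive on its own thread. The function collect_incoming and the keep_driver flag are hypothetical names chosen for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Whoever owns the `Sender` plays the role of the driver here;
/// the `Receiver` plays the role of the incoming stream.
fn collect_incoming(keep_driver: bool) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();
    if keep_driver {
        // "Spawn" the driver: it stays alive long enough to deliver items.
        thread::spawn(move || {
            for id in 0..3 {
                tx.send(id).unwrap();
            }
        });
    } else {
        // Drop the driver right away, like binding it to `_` in the tuple.
        drop(tx);
    }
    // Iteration ends as soon as every sender is gone.
    rx.iter().collect()
}

fn main() {
    assert_eq!(collect_incoming(false), Vec::<u32>::new());
    assert_eq!(collect_incoming(true), vec![0, 1, 2]);
}
```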
from quinn.
Hey, thanks for your interest! We have plans to make Quinn more friendly to sharing an Endpoint between multiple threads in the future, but for now you can use an executor that supports !Send futures, such as the one defined in tokio::runtime::current_thread.
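For readers unfamiliar with why a current-thread executor helps, here is a minimal standard-library sketch, not tokio's implementation. block_on_local, bump and run_demo are hypothetical names for this example. Because the executor never moves the future to another thread, it needs no Send bound, so a future holding an Rc is accepted.

```rust
use std::cell::Cell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy current-thread executor. Note the absence of a `Send` bound on `F`:
/// the future is polled on the calling thread and never leaves it.
fn block_on_local<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Holds an `Rc` across an await point, which makes this future `!Send`.
async fn bump(counter: Rc<Cell<u32>>) -> u32 {
    std::future::ready(()).await;
    counter.set(counter.get() + 1);
    counter.get()
}

fn run_demo() -> u32 {
    let counter = Rc::new(Cell::new(41));
    block_on_local(bump(counter.clone()))
}

fn main() {
    // Handing the same future to `std::thread::spawn` would be rejected
    // at compile time, because `Rc` is not `Send`.
    assert_eq!(run_demo(), 42);
}
```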
It looks like you're using tokio-async-await, and I'm not sure how exactly that works. async/await syntax itself supports !Send
futures just fine, but I don't know what's necessary to use that with tokio-async-await. Instead, you could try using the futures 0.3 compat tools to convert Quinn's 0.1 futures to 0.3, use async/await syntax as desired, and then convert back to 0.1 futures again to pass to the tokio current-thread runtime.
Ultimately, we'll update things to use futures 0.3 directly, but for now our focus is on updating to the latest IETF drafts.
from quinn.
Sadly, I think run_async and spawn_async are the only ways to schedule async blocks (which is necessary because await! can only be called inside of an async block), and they don't offer control over which thread they're run on.
I'll dig in the tokio source code and see if there's some way to mash everything together.
from quinn.
To be clear, you can do this by using the futures 0.3 compatibility tools instead of tokio-async-await, because you'll end up with regular futures-0.1 futures that you can spawn like any other future.
from quinn.
This is getting outside my comfort zone because it's building on a lot of unstable work, and there isn't much prior art. But unless I'm misunderstanding you, I think that's not possible because futures produced by the compatibility layer are pinned, and I guess tokio requires that they not be pinned. e.g., let's simplify and not even worry about quinn for now:
#![feature(await_macro, async_await, futures_api, pin)]

extern crate quinn;
extern crate tokio;
extern crate futures_preview;

use futures_preview::compat;
use tokio::runtime::current_thread as current;

fn main() {
    let f = compat::Compat::new(async move {
        Ok(())
    });
    let runtime = current::Runtime::new().unwrap();
    runtime.spawn(f);
    runtime.run().unwrap();
}
This will produce an error:
error[E0277]: the trait bound `std::future::GenFuture<[static generator@src/main.rs:11:44: 13:6 _]>: std::pin::Unpin` is not satisfied in `impl futures_preview::Future`
  --> src/main.rs:16:13
   |
16 |     runtime.spawn(f);
   |             ^^^^^ within `impl futures_preview::Future`, the trait `std::pin::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:11:44: 13:6 _]>`
   |
   = help: the following implementations were found:
             <std::future::GenFuture<T> as std::pin::Unpin>
   = note: required because it appears within the type `impl futures_preview::Future`
   = note: required because of the requirements on the impl of `tokio::prelude::Future` for `futures_preview::compat::Compat<impl futures_preview::Future>`
For reference, I'm pulling in futures-preview in Cargo.toml like so:
[dependencies]
futures_preview = { version = "0.3.0-alpha.10", package = "futures-preview", features = ["compat", "tokio-compat"] }
Which seems pretty hack-ey because I have essentially two copies of futures 0.1 - those from the futures 0.1 crate, as pulled in by tokio, and those from the futures 0.3 compatibility layer.
Anyway, I get that this is veering pretty off-course from quinn and the project's (at least short-term) roadmap. Would you like me to close the issue?
from quinn.
All the misc Tokio types need to have additional _async fns added to support futures 0.3.
So, the current_thread runtime can have fns added similar to the main runtime and it should work.
from quinn.
> Which seems pretty hack-ey because I have essentially two copies of futures 0.1 - those from the futures 0.1 crate, as pulled in by tokio, and those from the futures 0.3 compatibility layer.
These should be using the same copy of futures 0.1; futures 0.3 depends on futures 0.1 to provide this functionality.
from quinn.