smol-rs / async-channel
Async multi-producer multi-consumer channel
License: Apache License 2.0
Lines 204 to 208 in a0ba218
It's unclear to me why all streams are woken up instead of a single one when sending a message. (That is, why don't streams and recv behave in the same way, when the stream is typically essentially "polling recv forever").
Would it be possible to document that further?
Thanks,
Hello, how's it going?
I've been doing some benchmarking today to figure out where I can improve the performance of async_backplane (which is pretty cool btw, take a look :) ) and the bulk of the time is spent in channel creation/destruction (I presume creation). Is there any way some of this work could be delayed?
On my crappy 2015 macbook pro, it's ~1.3us to create and drop, which seems a little steep vs the ~180ns for try_send + try_recv. I realise there are two ends of it so it may not be entirely possible, but I thought I'd ask :)
I was surprised to find that cloning a Receiver and sending a message on the connected Sender results in the message showing up at only one of the downstream Receivers, but not both. Sending more messages results in a round-robin of sorts, where the Receiver next in line awaiting recv will get the next message.
This doesn't seem to be the implied MPMC behavior, though I'm not an authority on the matter.
Here's a minimal example in the form of a failing test:
#[test]
fn channel_sanity() {
    let (tx, rx1) = async_channel::unbounded::<u32>();
    let rx2 = rx1.clone();
    let t1 = smol::spawn(async move {
        let n = rx2.recv().await.unwrap();
        assert_eq!(n, 666);
    });
    let t2 = smol::spawn(async move {
        let n = rx1.recv().await.unwrap();
        assert_eq!(n, 666);
    });
    smol::block_on(async move {
        tx.send(666).await.unwrap();
        tx.send(123).await.unwrap();
        let ((), ()) = futures::future::join(t1, t2).await;
    });
}
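For comparison, here is a minimal sketch of what "every receiver sees every message" would need: one queue per subscriber and a clone of the value for each. It uses only std::sync::mpsc as a stand-in, not async-channel, and FanOut, subscribe and send_all are hypothetical names made up for this illustration. In a work-queue style MPMC channel, by contrast, each message is handed to exactly one receiver, which matches the behaviour observed in the test above.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out hub: every subscriber gets its own queue.
pub struct FanOut<T> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    pub fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Registers a new subscriber and returns its receiving end.
    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of `value` to every live subscriber.
    /// Subscribers whose receiver was dropped are removed.
    /// Returns the number of subscribers that got the value.
    pub fn send_all(&mut self, value: T) -> usize {
        self.subscribers.retain(|tx| tx.send(value.clone()).is_ok());
        self.subscribers.len()
    }
}
```

Note the T: Clone bound: delivering the same message to several receivers requires copying it, which a single shared queue does not.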
Hello!
Are you guys planning to implement no_std support for this channel?
I know that it is currently blocked by smol-rs/event-listener#61 but assume that it will be resolved soon.
I've checked the implementation and there are a few decisions to be made (e.g. what to replace std::abort with, a panic?) but this crate seems to be almost no_std ready.
So my question is: did you plan to make such a move?
Contrary to the try_send method, the channel will discard the oldest value in the channel when it's full.
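To make the difference concrete, here is a small sketch of the two policies on a plain VecDeque. It is only an illustration of the semantics described in the sentence above, not the crate's implementation; reject_when_full and evict_when_full are hypothetical helper names.

```rust
use std::collections::VecDeque;

/// try_send-like policy: a full queue rejects the new value and hands it back.
pub fn reject_when_full<T>(queue: &mut VecDeque<T>, cap: usize, value: T) -> Result<(), T> {
    if queue.len() >= cap {
        Err(value)
    } else {
        queue.push_back(value);
        Ok(())
    }
}

/// Overflow policy: a full queue drops its oldest value to make room.
/// Returns the evicted value, if any.
pub fn evict_when_full<T>(queue: &mut VecDeque<T>, cap: usize, value: T) -> Option<T> {
    assert!(cap > 0, "capacity must be positive");
    let evicted = if queue.len() >= cap {
        queue.pop_front()
    } else {
        None
    };
    queue.push_back(value);
    evicted
}
```

With a capacity of 2 and the queue holding [1, 2], reject_when_full returns the new value to the caller, while evict_when_full returns 1 and leaves [2, 3] in the queue.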
As Receiver can detect if the channel is closed, it probably can implement FusedStream...
see also concurrent_queue::ConcurrentQueue::is_closed
Hey guys,
I am writing an MQTT client that heavily uses channels for internal communication across Network <-> Handler <-> Client.
In some cases I saw that when the client places a message into the channel to the Handler, the message is visible if the channel length is checked, but the Handler is not always woken to process the client request.
I included an image here with some explanation.
This is a test function inside a library which runs on tokio multithreaded.
What could be causing the receiver not to awake?
Thanks in advance
It would be nice to have a oneshot module with efficient oneshot channels.
Would it be possible to have overflow mode (similar to one in async_broadcast) in this implementation? Would be nice to have this mode for a channel that doesn't require Clone for messages.
From my initial inspection, it looks like Sender::try_send is safe to use inside signal handlers. If this is the case, it might be worth documenting since it's a compelling alternative to signal_hook::iterator::Signals.
I believe Sender::send() is not cancel safe in the sense that it takes ownership of the value to be sent. If the future is cancelled, the value is gone with no way to recover it. This makes it inconvenient to use within a select! loop.
tokio's mpsc::Sender suggests using reserve() and Permit::send() to resolve this problem. reserve() returns a future, but it does not consume any value by itself. Permit::send() can then be used to send the value without any blocking.
Can a similar API be implemented for async-channel? Is that feasible?
[1] https://docs.rs/tokio/latest/tokio/macro.select.html
[2] https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html#cancel-safety
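The idea behind a reserve/permit split can be sketched without any async machinery: first claim a slot, and only hand over the value once the slot is guaranteed. The following is a synchronous, std-only illustration under that assumption. SlotQueue, Permit and try_reserve are hypothetical names, and this is neither tokio's nor async-channel's implementation.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

struct State<T> {
    queue: VecDeque<T>,
    reserved: usize, // slots promised to outstanding permits
    cap: usize,
}

/// Hypothetical bounded queue with a reserve-then-send API.
pub struct SlotQueue<T> {
    state: Arc<Mutex<State<T>>>,
}

/// Proof that one slot is held for the owner of this permit.
pub struct Permit<T> {
    state: Arc<Mutex<State<T>>>,
}

impl<T> SlotQueue<T> {
    pub fn new(cap: usize) -> Self {
        let state = State { queue: VecDeque::new(), reserved: 0, cap };
        SlotQueue { state: Arc::new(Mutex::new(state)) }
    }

    /// Claims a slot without consuming any value. Giving up on a
    /// reservation therefore cannot lose a message.
    pub fn try_reserve(&self) -> Option<Permit<T>> {
        let mut s = self.state.lock().unwrap();
        if s.queue.len() + s.reserved < s.cap {
            s.reserved += 1;
            Some(Permit { state: Arc::clone(&self.state) })
        } else {
            None
        }
    }

    pub fn try_pop(&self) -> Option<T> {
        self.state.lock().unwrap().queue.pop_front()
    }
}

impl<T> Permit<T> {
    /// Cannot fail for lack of space: the slot was reserved up front.
    pub fn send(self, value: T) {
        let mut s = self.state.lock().unwrap();
        s.queue.push_back(value);
        // The reservation itself is released by `Drop` when `self` goes away.
    }
}

impl<T> Drop for Permit<T> {
    fn drop(&mut self) {
        // Runs for used and unused permits alike.
        self.state.lock().unwrap().reserved -= 1;
    }
}
```

In a select!-style loop, the cancellable step would be the reservation, and the value would only be moved into send() after that step has completed.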
Hi,
The selected version of concurrent-queue is incorrect in the Cargo.toml and should be at least 2.5 since #89.
async-channel version 2.3.0 uses ForcePushError, which is only available since concurrent-queue version 2.5.0.
Thus, if you have any version prior to 2.5.0 in your Cargo.lock, it will not compile. (This actually fails only if you have a version of concurrent-queue previously set to 2.4.0; the other versions are not compatible with event-listener, which is used by concurrent-queue).
Simple steps to reproduce the error:
cargo new ac-bug
cd ac-bug
cargo add async-channel@2.3.0
# Everything works fine because `cargo` will pull the last version of `concurrent-queue` with 2.x.y.
cargo b
# This is valid because `concurrent-queue` should be `^2.4.0` thanks to `event-listener`.
cargo update concurrent-queue --precise 2.4.0
# Note: This will be rejected
cargo update concurrent-queue --precise 2.3.0
# Kaboom
cargo b
I think piper::pipe is very useful when you need AsyncRead/AsyncWrite on just bytes.
Are you planning to move its implementation here?
I have a rust program using gtk4. My cargo file is the following:
[package]
name = "mycode"
version = "0.1.0"
edition = "2024"
[dependencies]
async-channel = "2.3.0"
gtk4 = { version = "0.8.1", features = ["v4_6"] }
pollster = "0.3.0"
reqwest = { version = "0.12.2", features = ["blocking"] }
rfd = "0.14.1"
serde = "1.0.197"
serde_json = "1.0.115"
yup-oauth2 = "8.3.3"
And I get the following error.
cargo build
Compiling async-channel v2.3.0
error[E0432]: unresolved import concurrent_queue::ForcePushError
--> /home/vania/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-channel-2.3.0/src/lib.rs:50:41
|
50 | ...rentQueue, ForcePushError, PopError, PushError};
| ^^^^^^^^^^^^^^ no ForcePushError in the root
error[E0599]: no method named force_push found for struct ConcurrentQueue in the current scope
--> /home/vania/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-channel-2.3.0/src/lib.rs:313:34
|
313 | match self.channel.queue.force_push(msg) {
| ^^^^^^^^^^ method not found in ConcurrentQueue<T>
Is it a version problem ??
A blocked sender losing interest (i.e. kept alive but no longer polling) leads to a full deadlock: other blocked senders are never re-scheduled when messages are dequeued. This behaviour is surprising and quite the footgun since there is theoretically no obligation to poll a future to completion. AFAIK this is also at odds with what other channels do (tokio::mpsc even goes further and prevents a deadlock if more senders are forgotten than the nominal capacity -- but this is arguably a less likely scenario).
Fortunately the fix looks trivial: use Event::notify_additional instead of Event::notify and get rid of the extra notification currently sent when a sender is unblocked.
I encountered this while testing stuff on Wasm (cleaned stacktrace):
TypeError: waiting is not allowed on this thread
std::sync::mutex::Mutex<T>::lock
event_listener::sys::<impl event_listener::Inner<T>>::lock
event_listener::sys::<impl event_listener::Inner<T>>::remove
event_listener::_::<impl core::ops::drop::Drop for event_listener::InnerListener<T,B>>::drop::__drop_inner
event_listener::_::<impl core::ops::drop::Drop for event_listener::InnerListener<T,B>>
core::ptr::drop_in_place<event_listener::InnerListener<(),alloc::sync::Arc<event_listener::Inner<()>>>>
core::ptr::drop_in_place<alloc::boxed::Box<event_listener::InnerListener<(),alloc::sync::Arc<event_listener::Inner<()>>>>>
core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<event_listener::InnerListener<(),alloc::sync::Arc<event_listener::Inner<()>>>>>>
core::ptr::drop_in_place<event_listener::EventListener>
core::ptr::drop_in_place<core::option::Option<event_listener::EventListener>>
core::ptr::drop_in_place<async_channel::RecvInner<()>>
core::ptr::drop_in_place<event_listener_strategy::FutureWrapper<async_channel::RecvInner<()>>>
Which I think boils down to:
Recv holding RecvInner
RecvInner holding Option<EventListener>
EventListener holding InnerListener
InnerListener drop implementation calling std::Inner::remove()
std::Inner::remove() calling std::inner::lock()
std::inner::lock() calling Mutex::lock()
I think in this case spinlooping makes sense when running on Wasm?
Let me know if this issue should be moved to event-listener instead.
I believe this can be done without polluting the public API of this crate. Something like this would be very convenient:
// async
s.send(msg).await;
r.recv().await;
// blocking
s.send(msg).wait();
r.recv().wait();
Should also be quite easy to implement:
#[must_use]
pub struct Send<'a, T> {
    sender: &'a Sender<T>,
    listener: Option<EventListener>,
    msg: Option<T>,
}
impl<'a, T: Unpin> Future for Send<'a, T> {
    type Output = Result<(), SendError<T>>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let msg = self.msg.take().unwrap();
            match self.sender.try_send(msg) {
                Ok(_) => return Poll::Ready(Ok(())),
                Err(PushError::Closed(m)) => return Poll::Ready(Err(SendError(m))),
                Err(PushError::Full(m)) => self.msg = Some(m),
            }
            match self.listener.as_mut() {
                Some(listener) => match Pin::new(listener).poll(cx) {
                    Poll::Ready(()) => self.listener = None,
                    Poll::Pending => return Poll::Pending,
                },
                None => self.listener = Some(self.sender.channel.send_ops.listen()),
            }
        }
    }
}
impl<'a, T> Send<'a, T> {
    pub fn wait(mut self) -> Result<(), SendError<T>> {
        loop {
            let msg = self.msg.take().unwrap();
            match self.sender.try_send(msg) {
                Ok(_) => return Ok(()),
                Err(PushError::Closed(m)) => return Err(SendError(m)),
                Err(PushError::Full(m)) => self.msg = Some(m),
            }
            match self.listener.take() {
                Some(listener) => listener.wait(),
                None => self.listener = Some(self.sender.channel.send_ops.listen()),
            }
        }
    }
}
From some super basic benchmarking (spsc), this implementation is ~30% faster than doing block_on(s.send(msg)) and block_on(r.recv()). It was also slightly faster than crossbeam_channel::bounded for my use case.
Hi, what is the best way to get a reply from a message sent to a channel?
The only way I found to achieve it is by creating a second temporary channel for each call and pushing the response into it.
For example:
pub struct ReplyRequest {
    pub message: String,
    pub responder: async_channel::Sender<String>,
}

async fn send_a_message(request_tx: async_channel::Sender<ReplyRequest>) {
    // Temporary channel that carries the single reply for this request.
    let (reply_tx, reply_rx) = async_channel::bounded(1);
    let message = "Hello world".to_owned();
    request_tx.send(ReplyRequest { message, responder: reply_tx }).await.unwrap();
    let received_response = reply_rx.recv().await.unwrap();
    println!("Replied: {}", received_response);
}

// `your_executor` is a placeholder for whatever executor is in use.
async fn run() {
    let (request_tx, request_rx) = async_channel::bounded(1);
    your_executor::spawn(async move {
        loop {
            match request_rx.recv().await {
                Ok(message) => {
                    println!("Received: {}", message.message);
                    message.responder.send("A new World!".to_owned()).await.unwrap();
                }
                Err(err) => panic!("{}", err),
            }
        }
    });
    send_a_message(request_tx.clone()).await;
}
Is this the correct way of achieving it?
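The pattern in the question (a request that carries its own reply channel) is a common one. Below is a minimal synchronous sketch of the same idea using std::sync::mpsc and a thread in place of async-channel and an executor, so that it stays self-contained. Request, spawn_worker and ask are hypothetical names, and the "echo" reply is made up for the example.

```rust
use std::sync::mpsc;
use std::thread;

/// A request that carries the channel on which its reply should be sent.
pub struct Request {
    pub message: String,
    pub responder: mpsc::Sender<String>,
}

/// Starts a worker thread that answers every request it receives.
/// The worker stops once all request senders are dropped.
pub fn spawn_worker() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        for req in rx {
            // Ignore the error: the requester may have stopped waiting.
            let _ = req.responder.send(format!("echo: {}", req.message));
        }
    });
    tx
}

/// Sends one request and waits for its reply.
/// Returns `None` if the worker is gone.
pub fn ask(requests: &mpsc::Sender<Request>, message: &str) -> Option<String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    let request = Request { message: message.to_owned(), responder: reply_tx };
    requests.send(request).ok()?;
    reply_rx.recv().ok()
}
```

Calling ask(&spawn_worker(), "Hello world") yields the worker's reply for that one request. Because the reply sender travels inside the request, replies cannot be mixed up between concurrent callers.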
I'm looking to migrate a piece of code from a custom channel implementation to async-channel, but one important need is the ability to have statistics about the channels (including sender and receiver count). I see the Channel struct has that data, but it doesn't appear to be exposed on either Sender or Receiver.
Would you be amenable to a PR which adds:
pub fn sender_count(&self) -> usize;
pub fn receiver_count(&self) -> usize;
to both Sender and Receiver?
Sometimes, I need to communicate between a synchronous thread and asynchronous code. I'd like to have synchronous methods for Sender and Receiver.
The flume crate offers both synchronous and asynchronous methods for its channels. (One downside of flume compared to async-channel is that async-channel guarantees in the documentation that the only SendError is "the receiver was closed", and the only RecvError is "the sender was closed". I think that's true for flume as well, but it isn't documented.)
The following does not compile from v2.0.0 and after:
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
struct RecvU32(async_channel::Receiver<u32>);
impl Stream for RecvU32 {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        Pin::new(&mut self.0).poll_next(cx)
    }
}
This is the error message:
error[E0277]: `PhantomPinned` cannot be unpinned
--> src/main.rs:11:18
|
11 | Pin::new(&mut self.0).poll_next(cx)
| -------- ^^^^^^^^^^^ within `(PhantomData<&()>, PhantomPinned)`, the trait `Unpin` is not implemented for `PhantomPinned`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required because it appears within the type `(PhantomData<&()>, PhantomPinned)`
= note: required for `event_listener::EventListener` to implement `Unpin`
note: required because it appears within the type `__Origin<'_, u32>`
--> /home/oblique/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-channel-2.1.0/src/lib.rs:480:1
|
480 | / pin_project_lite::pin_project! {
481 | | /// The receiving side of a channel.
482 | | ///
483 | | /// Receivers can be cloned and shared among threads. When all receivers associated with a channel
... |
507 | | }
508 | | }
| |_^
= note: required for `async_channel::Receiver<u32>` to implement `Unpin`
note: required by a bound in `Pin::<P>::new`
--> /home/oblique/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:484:23
|
484 | impl<P: Deref<Target: Unpin>> Pin<P> {
| ^^^^^ required by this bound in `Pin::<P>::new`
...
503 | pub const fn new(pointer: P) -> Pin<P> {
| --- required by a bound in this associated function
= note: this error originates in the macro `$crate::__pin_project_make_unpin_impl` which comes from the expansion of the macro `pin_project_lite::pin_project` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0277`.
error: could not compile `tt` (bin "tt") due to previous error
Was this a design decision?
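As the compiler note about Box::pin hints, one generic workaround for wrapping a type that is not Unpin is to store it behind Pin<Box<...>>, at the cost of an allocation. The sketch below shows that technique with the standard library only. NotUnpin is a hypothetical stand-in for a !Unpin type (it is not the real Receiver), and Wrapper and poll_once are made-up names for this illustration.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a type that does not implement `Unpin`.
pub struct NotUnpin {
    value: u32,
    _pin: PhantomPinned,
}

impl Future for NotUnpin {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Ready(self.value)
    }
}

/// `Pin<Box<T>>` is `Unpin` for every `T`, so this wrapper is `Unpin` too.
pub struct Wrapper {
    inner: Pin<Box<NotUnpin>>,
}

impl Wrapper {
    pub fn new(value: u32) -> Self {
        Wrapper { inner: Box::pin(NotUnpin { value, _pin: PhantomPinned }) }
    }
}

impl Future for Wrapper {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // `as_mut` re-borrows the pinned box; the inner type need not be `Unpin`.
        self.inner.as_mut().poll(cx)
    }
}

/// Waker that does nothing, just enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls an `Unpin` future exactly once.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Whether the extra boxing is acceptable depends on the use case. Projecting the pin onto the field instead (for example with a pin-projection macro) avoids the allocation but makes the wrapper itself !Unpin.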
In some workloads, the current implementation can lead to a CPU core being maxed out, even though there is not much work going on. I've managed to reproduce this behavior in a small example with one producer and multiple consumers, where the producer is slower than the consumers. Here is a gist with the code:
https://gist.github.com/mvucenovic/12221d23211ab7e22989de517a799b7f
With this code the issue is reproducible with the unbounded channel and with bounded channels of capacity larger than 1.
Note that this is not a correctness bug: the system will not deadlock or starve, but it will use 100% of CPU, presumably because of the unrestricted spin loop in the recv method.
It is possible, though I have not tried, that the issue could also occur when there are multiple producers that are faster than the consumer.
This project declares a dependency on crate blocking which is unused since git commit c7b259c.
A difference between a zero-capacity and a one-capacity bounded channel is that, on a zero-capacity channel, try_send() returns Ok only when a receiver is already waiting on it. This is useful in some cases.
Golang and crossbeam-channel both have zero-capacity bounded channels; maybe we should support them.
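The standard library's synchronous channel already behaves this way, which makes the difference easy to observe. This small sketch uses std::sync::mpsc::sync_channel rather than async-channel; try_send_succeeds_when_idle is a hypothetical helper name.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Reports whether `try_send` succeeds on a fresh channel of the given
/// capacity while no receiver is currently blocked in `recv`.
pub fn try_send_succeeds_when_idle(cap: usize) -> bool {
    // `_rx` is kept alive so the channel is not disconnected.
    let (tx, _rx) = sync_channel::<u8>(cap);
    match tx.try_send(1) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => false,
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```

With capacity 0 (a rendezvous channel) the call reports false because nobody is waiting to take the value; with capacity 1 it reports true because the value fits in the buffer.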
When compiling on Linux arm64, I get an error message:
Compiling roxmltree v0.19.0
Compiling async-task v4.7.1
Compiling futures-lite v1.13.0
Compiling enumflags2 v0.7.10
Compiling piper v0.2.3
Compiling fontconfig-parser v0.5.6
Compiling crossbeam-epoch v0.9.18
Compiling async-channel v2.3.0
error[E0432]: unresolved import concurrent_queue::ForcePushError
--> /root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-channel-2.3.0/src/lib.rs:50:41
|
50 | use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
| ^^^^^^^^^^^^^^ no ForcePushError in the root
error[E0599]: no method named force_push found for struct ConcurrentQueue in the current scope
--> /root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-channel-2.3.0/src/lib.rs:313:34
|
313 | match self.channel.queue.force_push(msg) {
| ^^^^^^^^^^ method not found in ConcurrentQueue
Some errors have detailed explanations: E0432, E0599.
For more information about an error, try rustc --explain E0432.
error: could not compile async-channel (lib) due to 2 previous errors
warning: build failed, waiting for other jobs to finish...
root@iot-OptiPlex-3020:/data/gitlab/app#
In SQLx we originally tried using async-std's MPMC channels to implement our connection pool but found that particular implementation preferred high throughput over fairness, which is undesirable for a connection pool meant for use in a webserver; at high load some clients would time out waiting for a connection while other clients were served successfully. We pretty quickly switched to our own ad-hoc implementation, but that still suffers from unfairness: launchbadge/sqlx#453
Though I don't think that's impossible to remedy in our current implementation, I think using a ready-made, fair MPMC channel would simplify things a lot.
I don't see any documentation about what guarantees this crate can make regarding fairness, is that something that's been considered/characterized yet?
It would be really great if it was possible to have methods like send_all and try_send_all, so that it is easy to send messages to all Receivers, not just a single one.
Hello!
I've just written this in my project to work around the send method being async
#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))]
#[cfg(not(feature = "nightly"))]
type Quiet = Pin<Box<dyn Future<Output = ()> + 'static + Send>>;
#[cfg(not(feature = "nightly"))]
pin_project! {
    pub struct Sending<T> {
        #[pin]
        inner: Quiet,
        _phantom: PhantomData<T>,
    }
}
#[cfg(not(feature = "nightly"))]
impl<T: 'static + Send> Sending<T> {
    #[allow(unused_must_use)]
    pub fn new(sender: Sender<T>, value: T) -> Sending<T> {
        Sending {
            inner: Box::pin(async move {
                sender.send(value).await;
            }),
            _phantom: PhantomData,
        }
    }
}
#[cfg(feature = "nightly")]
type Quiet<T> = impl Future<Output=()>;
#[allow(unused_must_use)]
#[cfg(feature = "nightly")]
fn quiet<T: 'static + Send>(sender: Sender<T>, value: T) -> Quiet<T> {
    async move { sender.send(value).await; }
}
#[cfg(feature = "nightly")]
pin_project! {
    pub struct Sending<T> {
        #[pin]
        inner: Quiet<T>,
    }
}
#[cfg(feature = "nightly")]
impl<T: 'static + Send> Sending<T> {
    pub fn new(sender: Sender<T>, value: T) -> Sending<T> {
        Sending { inner: quiet(sender, value) }
    }
}
impl<T: 'static + Send> Future for Sending<T> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<()> {
        let this = self.project();
        Quiet::poll(this.inner, ctx)
    }
}
Is this (or a version that returns the send result, doesn't use nightly and always boxes) something you might be interested in a PR for?
The documentation states:
Capacity must be a positive number. If cap is zero, this function will panic.
This could be fixed by taking a NonZeroUsize instead.
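As a rough illustration of the suggestion, a constructor that takes NonZeroUsize moves the zero check to the caller, so the constructor itself has nothing to panic about. BoundedQueue below is a hypothetical std-only type for this sketch, not the crate's channel.

```rust
use std::collections::VecDeque;
use std::num::NonZeroUsize;

/// Hypothetical bounded container whose capacity cannot be zero by construction.
pub struct BoundedQueue<T> {
    items: VecDeque<T>,
    cap: NonZeroUsize,
}

impl<T> BoundedQueue<T> {
    /// No runtime check and no panic: a zero capacity is unrepresentable.
    pub fn new(cap: NonZeroUsize) -> Self {
        BoundedQueue { items: VecDeque::with_capacity(cap.get()), cap }
    }

    pub fn capacity(&self) -> usize {
        self.cap.get()
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }
}
```

The caller then decides how to handle zero, since NonZeroUsize::new(0) returns None. The trade-off is a slightly noisier call site for the common case of a literal capacity.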
When we call try_send on a sender, here's what I think is happening:
ConcurrentQueue::push is called here
Unbounded::push is called here
busy_wait is called here
I find it weird that we end up yielding the thread in async code.
If this is expected, I think it should be documented.
I have a dev server with high concurrency (32 cores). I ran the async-channel tests on this server, and it appears that the mpmc and mpmc_stream bounded tests missed a notification and ran forever. I cancelled the tests and ran them again, but I haven't been able to replicate this issue, even after running cargo test 100 times.
Not sure what I missed, that repo is a dependency of this one.. has it moved somewhere?
Hello! Not an issue here per-se, but for those looking for an async channels crate that works with wasm (a niche community I'm sure), I just wanted to flag that this one does! I'm not sure if support here is intentional--since it's not mentioned in the docs, I have to assume it's unintentional, but, regardless, it's certainly appreciated.
I tried std and crossbeam, which, in general, are not appropriate for wasm, since to do anything useful most of the time, you have to block the main thread (which actually causes a panic at runtime), I then tried tokio, which fails to compile at all for wasm. However, I added this dependency, and it worked right out of the box! Thanks for your work creating this crate.
Hi,
I would like to know whether the Recv future is cancel safe, so that it can be used in a select! macro.
I read the other issue about a select! and know that a select can create some annoying issues.
Thank you!
After the last receiver is dropped, all the items in the channel are effectively abandoned and can never be accessed. In this scenario, I think it's appropriate to drain the channel and drop all the items. One use case where this is relevant is a dispatch mechanism that uses oneshot channels:
let (req_s, req_r) = bounded(10);
// dispatch
for i in 0.. {
    let (s, r) = oneshot();
    req_s.send((i, s)).await.unwrap();
    let res = r.recv().await.unwrap();
}
// worker
while let Ok((i, s)) = req_r.recv().await {
    s.send(do_work(i));
}
Here, if worker terminates while there are still requests in the channel, dispatch can end up waiting for the response forever. This can be addressed with timeouts, but I think automatically draining the channel is a better solution.
The bevy project is currently waiting for a release of this project supporting concurrent-queue 2.0.0 due to the release of async-executor 1.5.0 also supporting concurrent-queue 2.0.0.
The blocking methods on channels got restricted to non-wasm targets, and while it's a bad idea to use them on the main thread, they can be quite useful when you want to run blocking code on a web worker.
my current use is with https://crates.io/crates/wasm_thread
https://github.com/smol-rs/async-fs/runs/7738662148?check_suite_focus=true
error: failed to parse manifest at `/home/runner/.cargo/registry/src/github.com-1ecc6299db9ec823/async-channel-1.7.0/Cargo.toml`
Caused by:
feature `resolver` is required
this Cargo does not support nightly features, but if you
switch to nightly channel you can add
`cargo-features = ["resolver"]` to enable this feature
This is due to a cargo bug: rust-lang/cargo#10954
Building crate async-channel v2.3.1 fails tests mpmc_stream and mpmc on s390x, like this:
Running `CARGO=/usr/bin/cargo CARGO_MANIFEST_DIR=/usr/share/cargo/registry/async-channel-2.3.1 CARGO_PKG_AUTHORS='Stjepan Glavina <[email protected]>' CARGO_PKG_DESCRIPTION='Async multi-producer multi-consumer channel' CARGO_PKG_HOMEPAGE='' CARGO_PKG_LICENSE='Apache-2.0 OR MIT' CARGO_PKG_LICENSE_FILE='' CARGO_PKG_NAME=async-channel CARGO_PKG_README=README.md CARGO_PKG_REPOSITORY='https://github.com/smol-rs/async-channel' CARGO_PKG_RUST_VERSION=1.60 CARGO_PKG_VERSION=2.3.1 CARGO_PKG_VERSION_MAJOR=2 CARGO_PKG_VERSION_MINOR=3 CARGO_PKG_VERSION_PATCH=1 CARGO_PKG_VERSION_PRE='' LD_LIBRARY_PATH='/tmp/tmp.5edrX768DR/target/s390x-unknown-linux-gnu/release/deps:/tmp/tmp.5edrX768DR/target/s390x-unknown-linux-gnu/release:/usr/lib/rustlib/s390x-unknown-linux-gnu/lib' /tmp/tmp.5edrX768DR/target/s390x-unknown-linux-gnu/release/deps/bounded-9d69132e446b210a`
104s
104s running 22 tests
104s test capacity ... ok
104s test len_empty_full ... ok
104s test receiver_count ... ok
104s test recv_after_close ... ok
104s test len ... ok
104s test send_after_close ... ok
104s test sender_count ... ok
104s test smoke ... ok
104s test smoke_blocking ... ok
104s test mpmc_stream ... FAILED
105s test mpmc ... FAILED
105s test spsc ... ok
105s test weak ... ok
105s test forget_blocked_sender ... ok
105s test forget_blocked_receiver ... ok
105s test close_wakes_sender ... ok
105s test close_wakes_receiver ... ok
106s test try_recv ... ok
107s test try_send ... ok
107s test force_send ... ok
108s test recv ... ok
108s test send ... ok
108s
108s failures:
108s
108s ---- mpmc_stream stdout ----
108s thread 'mpmc_stream' panicked at tests/bounded.rs:501:9:
108s assertion `left == right` failed
108s left: 3
108s right: 4
108s stack backtrace:
108s 0: rust_begin_unwind
108s 1: core::panicking::panic_fmt
108s 2: core::panicking::assert_failed_inner
108s 3: core::panicking::assert_failed
108s 4: core::ops::function::FnOnce::call_once
108s note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
108s
108s ---- mpmc stdout ----
108s thread 'mpmc' panicked at tests/bounded.rs:468:9:
108s assertion `left == right` failed
108s left: 5
108s right: 4
108s stack backtrace:
108s 0: rust_begin_unwind
108s 1: core::panicking::panic_fmt
108s 2: core::panicking::assert_failed_inner
108s 3: core::panicking::assert_failed
108s 4: core::ops::function::FnOnce::call_once
108s note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
108s
108s
108s failures:
108s mpmc
108s mpmc_stream
108s
108s test result: FAILED. 20 passed; 2 failed; 0 ignored; 0 measured; 0 filtered out; finished in 3.50s
Curiously, a previous build from virtually identical source (only change was specific to armel) succeeded.
The build environment had changed slightly:
systemd 256.2 -> 256-4
libc6 2.38 -> 2.39
linux-libc-dev 6.9.9 -> 6.9.10
Perhaps this is related to #78 - except here the failure has persisted in 5 contiguous rebuilds.
Hi there - I'm testing out a scheme for message-passing to a tokio websocket server from another running thread. The receiving end looks like this:
async fn gather_connections(
    socket: TcpListener,
    push_sender: Sender<i32>,
) {
    while let Ok((stream, _)) = socket.accept().await {
        println!("got new TCP connection");
    }
}
async fn poll_state_updates(
    mut state_receiver: async_channel::Receiver<i32>,
) {
    while let Some(val) = state_receiver.next().await {
        println!("{}", val)
    }
}
pub async fn server_main(
    push_sender: Sender<i32>,
    state_receiver: async_channel::Receiver<i32>,
) {
    let addr = "localhost:8080";
    let socket = TcpListener::bind(&addr)
        .await
        .expect("unable to bind to socket");
    println!("listening...");
    let (_, _) = tokio::join!(
        gather_connections(socket, push_sender),
        poll_state_updates(state_receiver),
    );
}
What I'm seeing here is a significant delay in the activity of socket.accept().await - about a 1-second delay each time I attempt to open a socket with the server - when I'm also running the polling task on the receiver stream. This delay doesn't occur when the receiver stream task is not run. I've poked at the code and I have no idea why this is happening, if I've done something fundamentally wrong (I'm new to Rust and its async implementations generally), or if this is a tokio problem. There doesn't seem to be any problem or delay in sending messages across the channel.
Running 1.57.0 MSVC on:
[dependencies]
tokio-tungstenite = "*"
tokio = { version = "1.15.0", features = ["full"] }
num_cpus = "1.13.1"
futures-util = "0.3"
async-channel = "1.6.1"
Any idea what I've done wrong here? Thanks.
Hi,
How would a user replace the select! macro of Crossbeam when using this library?
I'm wrapping Receiver::recv and Sender::send in structs. Because they're async fn instead of manual Future impls they require boxing when stored. It'd be nice if we could expose two new types:
RecvFuture
SendFuture
These would not require boxing, which means they can be inlined in implementations and would increase overall perf when wrapping async-channel. Given the simplicity of send and recv I suspect the implementation to be rather straightforward.
Thanks!
I'm not sure if I can post this as an issue, but here it goes:
I'm trying to use Receiver::recv() together with a timeout, similar to this example.
However, as Receiver::recv() returns RecvError, not std::io::Error, things got complicated. I ended up using anyhow::Error, but it really feels clumsy (see code below).
Does RecvError itself support converting from std::io::ErrorKind? Or is there a plan to let users create a RecvError for timeouts?
Here is my function, complicated by error handling:
async fn chan_recv_with_timeout(chan: &Receiver<Vec<u8>>, timeout: Option<Duration>) -> anyhow::Result<Vec<u8>> {
    let chan_f = chan.recv();
    if let Some(duration) = timeout {
        let time_f = Timer::after(duration);
        futures::pin_mut!(chan_f);
        match future::select(chan_f, time_f).await {
            Either::Left((c, _)) => match c {
                Ok(t) => Ok(t),
                Err(r) => Err(anyhow::Error::new(r)),
            },
            Either::Right(_) => Err(anyhow::Error::new::<io::Error>(io::ErrorKind::TimedOut.into())),
        }
    } else {
        match chan_f.await {
            Ok(t) => Ok(t),
            Err(r) => Err(anyhow::Error::new(r)),
        }
    }
}
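One way to avoid both anyhow and std::io::Error here is a small dedicated error enum with one variant per failure cause. The sketch below shows that shape using std::sync::mpsc and its recv_timeout as a synchronous stand-in for the async channel plus timer. ChanRecvError and recv_with_timeout are hypothetical names, not part of async-channel.

```rust
use std::fmt;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type: one variant per way the receive can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum ChanRecvError {
    /// All senders are gone and the channel is empty.
    Closed,
    /// No message arrived within the requested duration.
    TimedOut,
}

impl fmt::Display for ChanRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChanRecvError::Closed => write!(f, "channel is closed"),
            ChanRecvError::TimedOut => write!(f, "timed out waiting for a message"),
        }
    }
}

impl std::error::Error for ChanRecvError {}

/// Receives one message, optionally giving up after `timeout`.
pub fn recv_with_timeout<T>(
    chan: &Receiver<T>,
    timeout: Option<Duration>,
) -> Result<T, ChanRecvError> {
    match timeout {
        Some(duration) => chan.recv_timeout(duration).map_err(|e| match e {
            RecvTimeoutError::Timeout => ChanRecvError::TimedOut,
            RecvTimeoutError::Disconnected => ChanRecvError::Closed,
        }),
        None => chan.recv().map_err(|_| ChanRecvError::Closed),
    }
}
```

Callers can then match on the two variants directly instead of downcasting, and the type still implements std::error::Error, so it converts into anyhow::Error or a boxed error where that is wanted.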