smol-rs / event-listener
Notify async tasks or threads
License: Apache License 2.0
I have made a simple channel, trying to use this library:
https://github.com/beckend/footgun-event-listener
recv_wait often hangs:
#[derive(Debug)]
struct Channel<T> {
    closed: AtomicBool,
    events_recv: Event,
    events_queue_is_empty: Event,
    queue: Mutex<VecDeque<T>>,
}

impl<T> Receiver<T> {
    pub fn recv_wait(&self) -> Option<T> {
        loop {
            if self.channel.is_closed() {
                return None;
            }
            let mut listener = self.channel.events_recv.listen();
            let lis = listener.as_mut();
            let mut queue = self.channel.queue.lock().unwrap();
            if let Some(payload) = queue.pop_front() {
                return Some(payload);
            } else {
                self.channel.events_queue_is_empty.notify(usize::MAX);
            }
            drop(queue);
            lis.wait();
        }
    }
}
It's not supposed to hang, since I use close_notify:
impl<T> Sender<T> {
    // ...
    pub fn close(&self) {
        self.channel.closed.store(true, Ordering::SeqCst);
    }

    pub fn close_notify(&self) {
        self.close();
        self.channel.events_recv.notify(usize::MAX);
    }
}
It's a minimal channel example with a test.
https://github.com/beckend/footgun-event-listener/blob/main/src/__tests__/channel.spec.rs
Lines 665 to 699 in 42a1cac
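One possible cause of the hang, judging only from the snippet above: the closed flag is checked before the listener is created, so a close_notify() that runs between the is_closed() check and listen() is never seen, and wait() then blocks forever. The sketch below shows the same channel shape with that window removed by a different technique, a std Mutex plus Condvar instead of event-listener, so the closed flag and the queue are inspected under one lock. All names are illustrative and not taken from the linked repository; unlike the original, it also drains queued items before reporting closure.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

/// Queue contents and the closed flag live under one mutex, so a waiter can
/// never observe "open and empty" and then miss the close notification.
struct State<T> {
    queue: VecDeque<T>,
    closed: bool,
}

/// Illustrative channel, not the one from the linked repository.
pub struct Channel<T> {
    state: Mutex<State<T>>,
    ready: Condvar,
}

impl<T> Channel<T> {
    pub fn new() -> Arc<Self> {
        Arc::new(Channel {
            state: Mutex::new(State { queue: VecDeque::new(), closed: false }),
            ready: Condvar::new(),
        })
    }

    pub fn send(&self, value: T) {
        self.state.lock().unwrap().queue.push_back(value);
        self.ready.notify_one();
    }

    pub fn close_notify(&self) {
        // The flag is set while holding the lock, then every waiter is woken.
        self.state.lock().unwrap().closed = true;
        self.ready.notify_all();
    }

    /// Blocks until an item arrives, or returns None once the channel is
    /// closed and empty.
    pub fn recv_wait(&self) -> Option<T> {
        let mut state = self.state.lock().unwrap();
        loop {
            if let Some(item) = state.queue.pop_front() {
                return Some(item);
            }
            if state.closed {
                return None;
            }
            // Atomically releases the lock and sleeps; re-checks on wake-up.
            state = self.ready.wait(state).unwrap();
        }
    }
}
```

With event-listener itself, the analogous change would presumably be to call listen() first and only then check is_closed() and the queue, so that a notification sent after the check still reaches the registered listener.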
Currently, the mutex is always acquired on entering the EventListener::poll method. This doesn't seem to be necessary to me, as it is only used in the State::Notified case.
Edit: ok, it is needed as we modify the linked list. I think it could still be moved after the first match statement, as the pointer isn't dereferenced yet.
I'm pretty sure my use case for Event::notify wants to notify n additional listeners. You can see the code here: https://github.com/withoutboats/access-queue/blob/master/src/lib.rs
It's like a generalization of a Mutex which, rather than using an AtomicBool, allows up to count concurrent accesses, using a counter which decrements toward zero as accesses are given out, and increments back up as they are released. Whenever accesses are released, I want to increase the number of notified listeners until I hit the number of active listeners.
My understanding of the code is that if accesses are released before all of the previously notified listeners have awoken, some listeners can end up dropped, as they are not notified, but there is now free space in the queue which new attempts to access will take.
Is it possible to provide a version of Event::notify which will notify additional active listeners (up to the number of active listeners in total)?
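To make the distinction in this request concrete, here is a toy model of the two behaviours: "make sure at least n listeners are notified" versus "notify n more". It is only a counter-based sketch under the assumption that notified listeners stay at the front of the list until they wake; the type and method names are illustrative and this is not the crate's implementation.

```rust
/// Toy model of a listener list: `total` registered listeners, of which
/// `notified` have already been notified but have not yet woken up.
pub struct ListenerList {
    pub total: usize,
    pub notified: usize,
}

impl ListenerList {
    /// "At least n" semantics: listeners that are already notified count
    /// towards `n`. Returns how many listeners were newly notified.
    pub fn notify(&mut self, n: usize) -> usize {
        let target = n.min(self.total);
        let newly = target.saturating_sub(self.notified);
        self.notified += newly;
        newly
    }

    /// "n more" semantics: always tries to notify `n` listeners that were
    /// not notified before, capped by the number of remaining listeners.
    pub fn notify_additional(&mut self, n: usize) -> usize {
        let newly = n.min(self.total - self.notified);
        self.notified += newly;
        newly
    }
}
```

In this model, releasing two accesses twice in a row with the "at least n" variant wakes only two listeners in total, which is the lost-wakeup scenario described above; the "n more" variant wakes four.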
As discussed in #90 and #89, EventListener::new has a big footgun: it takes an &Event ref but doesn't add itself to listen to the event passed. This is highly unintuitive IMO and, if possible, we should remedy this. Two options here:
1. ... new in favor of that. OR
2. Make listen() implicit in the first poll of EventListener and deprecate EventListener::listen.
I'd very much prefer option 2, since it removes the footgun completely.
I'm unsure if I'm holding the new interface wrong somehow or if I've encountered a bug.
Given this diff that attempts to track the event-listener interface changes, I'm encountering a deterministic panic like:
event-listener-5.2.0/src/lib.rs:1251:36:
listener was never inserted into the list
and as far as I can tell the code reduces to
let n = AtomicUsize::from(1);
let event = Event::new();
let mut listener = event.listen();
loop {
    if 0 == n.load(Ordering::SeqCst) {
        return Poll::Ready(());
    };
    ready!(Pin::new(&mut listener).poll(cx));
}
Do I need to replace the listener every time I poll it? Am I holding this wrong? Is this a bug?
If I'm using it wrong I'll happily contribute documentation to help others avoid this error
Potential unaligned read
Details | |
---|---|
Status | unsound |
Package | atty |
Version | 0.2.14 |
URL | softprops/atty#50 |
Date | 2021-07-04 |
On windows, atty dereferences a potentially unaligned pointer.
In practice however, the pointer won't be unaligned unless a custom global allocator is used.
In particular, the System allocator on windows uses HeapAlloc, which guarantees a large enough alignment.
A Pull Request with a fix has been provided over a year ago but the maintainer seems to be unreachable.
Last release of atty was almost 3 years ago.
The below list has not been vetted in any way and may or may not contain alternatives;
See advisory page for additional details.
This is a follow-up to async-rs/async-std#841. I tracked the poll that isn't waking in the chain down to the EventListener.
It seems that when Pending is returned, cx.waker().wake_by_ref(); needs to be called; otherwise, having more event listeners active than executor threads seems to lead to them spinning indefinitely.
I'll add a PR for this.
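For readers unfamiliar with the pattern mentioned here, this is a minimal std-only sketch of a future that calls cx.waker().wake_by_ref() every time it returns Pending. It is not the EventListener code and not the proposed PR; SelfWaking, CountingWaker and drive are hypothetical names used only to show what the call does. Note that a future written this way asks to be polled again immediately, so it trades missed wake-ups for busy polling.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that returns Pending `remaining` times and re-wakes itself
/// each time, so the executor knows it should be polled again.
pub struct SelfWaking {
    pub remaining: usize,
}

impl Future for SelfWaking {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        // Without this call nothing would ever schedule another poll.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that only counts how often it was woken.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls the future to completion in a loop and returns (polls, wake-ups).
pub fn drive(mut fut: SelfWaking) -> (usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return (polls, counter.0.load(Ordering::SeqCst));
        }
    }
}
```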
Does it make sense to have a !Send + !Sync version of Event, to be used with LocalExecutor? Feels silly to pay the cost of thread synchronization when everything is happening on a single thread.
(and also a thread local version of Mutex and RwLock, which IIUC is based on event_listener?)
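As a rough illustration of what such a single-threaded variant could look like, here is a sketch that keeps the waiting wakers in an Rc<RefCell<..>>, which makes the type !Send and !Sync automatically and avoids atomics and locks. LocalEvent, register and the counting waker helper are hypothetical names for this sketch only; nothing here is an existing API of the crate.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Single-threaded event: `Rc` + `RefCell` make it !Send and !Sync.
#[derive(Clone, Default)]
pub struct LocalEvent {
    waiting: Rc<RefCell<VecDeque<Waker>>>,
}

impl LocalEvent {
    /// Registers a waker to be woken by a later `notify`.
    pub fn register(&self, waker: &Waker) {
        self.waiting.borrow_mut().push_back(waker.clone());
    }

    /// Wakes up to `n` registered wakers in FIFO order; returns the count.
    pub fn notify(&self, n: usize) -> usize {
        let mut woken = 0;
        while woken < n {
            // Pop first and wake afterwards, so the RefCell is not borrowed
            // while arbitrary waker code runs.
            let next = self.waiting.borrow_mut().pop_front();
            match next {
                Some(waker) => {
                    waker.wake();
                    woken += 1;
                }
                None => break,
            }
        }
        woken
    }

    /// Number of wakers still waiting.
    pub fn waiting_count(&self) -> usize {
        self.waiting.borrow().len()
    }
}

/// Helper for trying the sketch out: a waker that counts its wake-ups.
pub struct Counter(pub AtomicUsize);

impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}

pub fn counting_waker() -> (Waker, Arc<Counter>) {
    let counter = Arc::new(Counter(AtomicUsize::new(0)));
    (Waker::from(counter.clone()), counter)
}
```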
smol-rs/async-lock#80 (comment)
Given that this only cropped up after the v5.0 update, it's probably this crate's fault.
I think it's a pretty normal use case to create an EventListener, then send the EventListener somewhere else, but this somewhere else panics and we want to continue running the program anyway.
Since Event itself implements RefUnwindSafe, it seems that EventListener not implementing UnwindSafe is just an oversight?
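For context, a small std-only sketch of the two tools involved: a compile-time check that a type implements UnwindSafe, and the AssertUnwindSafe wrapper that callers typically reach for when a type lacks the auto trait. The helper names assert_unwind_safe and run_contained are made up for this illustration.

```rust
use std::panic::{self, AssertUnwindSafe, UnwindSafe};

/// Compile-time check: this call only compiles if `T: UnwindSafe`.
pub fn assert_unwind_safe<T: UnwindSafe>() {}

/// Runs `f(value)` and contains a panic instead of propagating it.
/// `AssertUnwindSafe` is the workaround needed whenever `T` (or the
/// closure) does not implement `UnwindSafe` on its own.
pub fn run_contained<T, R>(value: T, f: impl FnOnce(T) -> R) -> Result<R, ()> {
    panic::catch_unwind(AssertUnwindSafe(move || f(value))).map_err(|_| ())
}
```

If EventListener implemented UnwindSafe, a check like assert_unwind_safe::<EventListener>() would compile and the wrapper would not be needed at the call site.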
In the crate example, two checks per loop are used
let flag = Arc::new(AtomicBool::new(false));
let event = Arc::new(Event::new());

// Spawn a thread that will set the flag after 1 second.
thread::spawn({
    let flag = flag.clone();
    let event = event.clone();
    move || {
        // Wait for a second.
        thread::sleep(Duration::from_secs(1));
        // Set the flag.
        flag.store(true, Ordering::SeqCst);
        // Notify all listeners that the flag has been set.
        // <-- implicit SeqCst fence here
        event.notify(usize::MAX); // <-- access to inner list mutex
    }
});

// Wait until the flag is set.
loop {
    // Check the flag.
    if flag.load(Ordering::SeqCst) {
        break;
    }
    // Start listening for events.
    let listener = event.listen(); // <-- access to inner list mutex
    // <-- implicit SeqCst fence here
    // Check the flag again after creating the listener.
    if flag.load(Ordering::SeqCst) {
        break;
    }
    // Wait for a notification and continue the loop.
    listener.wait(); // <-- access to inner list mutex
}
But is the first check and the ordering really necessary?
The only possibility for this to hang is when, during a single loop, both of these conditions are satisfied:
1. listen mutex access is not visible from notify
2. load returns false
The reasoning:
1. Mutex guarantees notify synchronizes-with listen
2. notify synchronizes-with listen + listen sequences-before load -> notify inter-thread happens-before load (Inter-thread happens-before Rule No.3)
3. store sequences-before notify + notify inter-thread happens-before load -> store inter-thread happens-before load (Inter-thread happens-before Rule No.4)
4. store happens-before load -> store is visible side-effect load, condition 2 is false.
So the correctness is guaranteed. As we can see, this correctness guarantee has nothing to do with the first load and the memory ordering of the load. I think the below code works as well
let flag = Arc::new(AtomicBool::new(false));
let event = Arc::new(Event::new());

// Spawn a thread that will set the flag after 1 second.
thread::spawn({
    let flag = flag.clone();
    let event = event.clone();
    move || {
        // Wait for a second.
        thread::sleep(Duration::from_secs(1));
        // Set the flag.
        flag.store(true, Ordering::Relaxed);
        // Notify all listeners that the flag has been set.
        // <-- implicit SeqCst fence here
        event.notify(usize::MAX); // <-- access to inner list mutex
    }
});

// Check the flag and see whether we can opportunistically bypass listener creation
if !flag.load(Ordering::Relaxed) {
    // Wait until the flag is set.
    loop {
        // Start listening for events.
        let listener = event.listen(); // <-- access to inner list mutex
        // <-- implicit SeqCst fence here
        // Check the flag again after creating the listener.
        if flag.load(Ordering::Relaxed) {
            break;
        }
        // Wait for a notification and continue the loop.
        listener.wait(); // <-- access to inner list mutex
    }
}
Perhaps I'm missing something. I'm actually not too sure about some of my assertions above. Or are there more API and engineering concerns to this matter?
Edit: I realize different usage may impose a different ordering requirement. I'm only using this crate to do grouped task backoff, so not hanging is everything I need. However, if people are doing initialization before notify and processing the result after the loop, it seems the above code with Relaxed still works. (The different notify circumstances can cause two different exit routes from the loop, but both routes seem to guarantee visibility)
Using this as a base to create libraries, it's very useful to know the listener count from time to time.
Hi,
This package has been a lifesaver. I really would have been unable to implement https://github.com/rmcgibbo/async-priority-channel (whose API is basically copied completely from smol-rs/async-channel) without it.
One question: would it be straightforward/possible for event-listener to provide a hook to use loom or shuttle? I'd like to be able to test my code which depends on event-listener using these techniques if possible. tokio-rs/loom#220 seems to indicate that the latest release of loom now handles SeqCst, which is probably necessary.
Hit this bug while trying to integrate no_std into async-channel.
Lines 126 to 136 in 7ce2634
It is a common pattern for the number of listeners woken up to be std::usize::MAX. If the mutex is under contention and the program falls back to the node queue, it then tries to fill up a Vec with MAX entries, which immediately causes the program to freeze up.
I'm not sure what the best solution to this is. We could just disable tagged events on no_std, since as far as I know there is no way to implement it without blocking mutexes.
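One direction that might avoid the freeze, sketched here purely as an assumption and not as the crate's fix: size the buffer by the number of registered listeners rather than by the requested count, so that a request for usize::MAX never drives the allocation. The function name and the idea that the listener count is available at that point (for example from an atomic counter) are both hypothetical.

```rust
/// Hypothetical helper: returns the indices of the listeners to wake.
/// The buffer is sized by `registered`, never by the raw `requested`
/// value, so `requested == usize::MAX` stays cheap.
pub fn slots_to_notify(requested: usize, registered: usize) -> Vec<usize> {
    let count = requested.min(registered);
    let mut slots = Vec::with_capacity(count);
    slots.extend(0..count);
    slots
}
```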
The mutex example compiles but runs incorrectly on my computer. I did some fixup and it works.
impl<T> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
        self.0.locked.store(false, Ordering::Release);
        self.0.lock_ops.notify(1);
    }
}
At the moment, the fmt::Debug implementation for Event and EventListener just writes Event { .. }, giving no useful information. It would be better if it produced actual descriptive information regarding the contents of the Event.
For Event, if it can lock the mutex, I'd like it to produce:
For EventListener, it should indicate whether it's registered in the Event and, if it is, its current state (listening, notified, etc etc).
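A minimal sketch of the "print details only if the mutex can be locked" idea, using std's Mutex::try_lock. The struct here is a stand-in, not the crate's Event, and the two fields (listeners, notified) are hypothetical placeholders, since the requested field list is not part of this text.

```rust
use std::fmt;
use std::sync::Mutex;

/// Stand-in for the internal list state; both fields are hypothetical.
pub struct Inner {
    pub listeners: usize,
    pub notified: usize,
}

/// Stand-in for `Event`, holding its state behind a mutex.
pub struct Event {
    pub inner: Mutex<Inner>,
}

impl Event {
    pub fn new(listeners: usize, notified: usize) -> Self {
        Event { inner: Mutex::new(Inner { listeners, notified }) }
    }
}

impl fmt::Debug for Event {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.inner.try_lock() {
            // Lock acquired without blocking: print the real contents.
            Ok(inner) => f
                .debug_struct("Event")
                .field("listeners", &inner.listeners)
                .field("notified", &inner.notified)
                .finish(),
            // Locked elsewhere (or poisoned): fall back to `Event { .. }`.
            Err(_) => f.debug_struct("Event").finish_non_exhaustive(),
        }
    }
}
```

Using try_lock rather than lock keeps Debug formatting from blocking or deadlocking when the formatting thread already holds the lock.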
event-listener in its current state (531c106) is unsound. It's possible to trigger a use-after-free bug completely with safe Rust.
PoC:
use event_listener::{Event, EventListener};

fn main() {
    let event = Event::new();
    let event2 = Event::new();
    let mut listener = Box::pin(EventListener::<()>::new());
    listener.as_mut().listen(&event);
    listener.as_mut().listen(&event2);
    drop(listener);
    event.notify(1);
}
cargo miri run:
paul@Pauls-MBP ~/dev/event-listener-test (git)-[master] % cargo miri run
Preparing a sysroot for Miri (target: aarch64-apple-darwin)... done
Finished dev [unoptimized + debuginfo] target(s) in 0.00s
Running `/Users/paul/.rustup/toolchains/nightly-aarch64-apple-darwin/bin/cargo-miri runner target/miri/aarch64-apple-darwin/debug/event-listener-test`
error: Undefined Behavior: out-of-bounds pointer use: alloc846 has been freed, so this pointer is dangling
--> /Users/paul/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ptr/non_null.rs:399:18
|
399 | unsafe { &*self.as_ptr().cast_const() }
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ out-of-bounds pointer use: alloc846 has been freed, so this pointer is dangling
|
= help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
= help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
help: alloc846 was allocated here:
--> src/main.rs:7:24
|
7 | let mut listener = Box::pin(EventListener::<()>::new());
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: alloc846 was deallocated here:
--> src/main.rs:11:5
|
11 | drop(listener);
| ^^^^^^^^^^^^^^
= note: BACKTRACE (of the first span):
= note: inside `std::ptr::NonNull::<event_listener::sys::Link<()>>::as_ref::<'_>` at /Users/paul/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ptr/non_null.rs:399:18: 399:46
= note: inside `event_listener::sys::Inner::<()>::notify::<event_listener::notify::Notify>` at /Users/paul/.cargo/registry/src/index.crates.io-6f17d22bba15001f/event-listener-4.0.0/src/std.rs:263:42: 263:52
= note: inside `event_listener::sys::<impl event_listener::Inner<()>>::notify::<event_listener::notify::Notify>` at /Users/paul/.cargo/registry/src/index.crates.io-6f17d22bba15001f/event-listener-4.0.0/src/std.rs:119:9: 119:35
= note: inside `event_listener::Event::notify::<i32>` at /Users/paul/.cargo/registry/src/index.crates.io-6f17d22bba15001f/event-listener-4.0.0/src/lib.rs:432:24: 432:44
note: inside `main`
--> src/main.rs:12:5
|
12 | event.notify(1);
| ^^^^^^^^^^^^^^^
note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace
error: aborting due to 1 previous error
The issue is that EventListener::listen(mut self: Pin<&mut Self>, event: &Event<T>) doesn't deal with the case when the EventListener is already currently linked/associated with another/or same Event. It just unconditionally overwrites the content of the EventListener's Listener.
In my opinion, the general api design of Event/EventListener is rather unnecessarily complex and introduces more overhead than needed. Tokio's Notify api is much "better" in that regard. 🤷♂️
This commit d9a315d seems to have triggered a regression in async-channel, specifically the following code succeeds for me with v2.3.1 but hangs with v2.3.2:
use async_channel::{bounded, Sender};
use futures::{future::RemoteHandle, FutureExt};
use std::{any::Any, future::Future, mem, panic::AssertUnwindSafe, pin::Pin};
use tokio::{
    runtime::Handle, task::{JoinError, LocalSet}
};

type Request = Box<dyn FnOnce() -> Box<dyn Future<Output = Response>> + Send>;
type Response = Result<Box<dyn Any + Send>, Box<dyn Any + Send>>;

#[derive(Debug)]
pub(super) struct Pool {
    sender: Sender<(Request, Sender<RemoteHandle<Response>>)>,
}

impl Pool {
    pub(super) fn new(threads: usize) -> Self {
        let handle = Handle::current();
        let handle1 = handle.clone();
        let (sender, receiver) = bounded::<(Request, Sender<RemoteHandle<Response>>)>(1);
        for _ in 0..threads {
            let receiver = receiver.clone();
            let handle = handle.clone();
            let _ = handle1.spawn_blocking(move || {
                let local = LocalSet::new();
                handle.block_on(local.run_until(async {
                    while let Ok((task, sender)) = receiver.recv().await {
                        let _ = local.spawn_local(async move {
                            let (remote, remote_handle) = Pin::from(task()).remote_handle();
                            let _ = sender.send(remote_handle).await;
                            remote.await;
                        });
                    }
                }))
            });
        }
        Self { sender }
    }

    pub(super) fn spawn_pinned<F, Fut, T>(
        &self, task: F,
    ) -> impl Future<Output = Result<T, JoinError>> + Send
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = T> + 'static,
        T: Send + 'static,
    {
        let sender = self.sender.clone();
        async move {
            let task: Request = Box::new(|| {
                Box::new(
                    AssertUnwindSafe(task().map(|t| Box::new(t) as Box<dyn Any + Send>))
                        .catch_unwind(),
                )
            });
            let (sender_, receiver) = bounded::<RemoteHandle<Response>>(1);
            sender.send((task, sender_)).await.unwrap();
            let res = receiver.recv().await;
            let res = res.unwrap().await;
            #[allow(deprecated)]
            res.map(|x| *Box::<dyn Any + Send>::downcast(x).unwrap())
                .map_err(JoinError::panic)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::join_all;
    use std::sync::{
        atomic::{AtomicUsize, Ordering}, Arc
    };

    #[tokio::test]
    async fn spawn_pinned() {
        const TASKS: usize = 1000;
        const ITERS: usize = 10;
        const THREADS: usize = 4;
        let pool = Pool::new(THREADS);
        let count = Arc::new(AtomicUsize::new((1..TASKS).sum()));
        for i in 0..ITERS {
            println!("iter {}", i);
            join_all((0..TASKS).map(|i| {
                let count = count.clone();
                pool.spawn_pinned(move || async move {
                    let _ = count.fetch_sub(i, Ordering::Relaxed);
                })
            }))
            .await
            .into_iter()
            .collect::<Result<(), _>>()
            .unwrap();
            assert_eq!(count.load(Ordering::Relaxed), 0);
            count.store((1..TASKS).sum(), Ordering::Relaxed);
        }
    }
}
(this is a horrible and hopefully temporary hack in lieu of tokio-rs/tokio#2545)
concurrent-queue is a pretty heavy dependency for a crate as small as event-listener. It would be nice if we were able to avoid using it.
https://github.com/smol-rs/event-listener/actions/runs/6367362187/job/17285871726
failures:
---- sys::tests::listener_slab_notify_prop stdout ----
thread 'sys::tests::listener_slab_notify_prop' panicked at src/no_std.rs:1270:9:
assertion `left == right` failed
left: Listener { state: Task(_), prev: Cell { value: None }, next: Cell { value: Some(3) } }
right: Listener { state: Task(_), prev: Cell { value: None }, next: Cell { value: Some(3) } }
stack backtrace:
0: std::panicking::begin_panic_handler
at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:597:5
1: core::panicking::panic_fmt
at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/panicking.rs:72:14
2: core::panicking::assert_failed_inner
at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/panicking.rs:315:17
3: core::panicking::assert_failed::<sys::Entry<()>, sys::Entry<()>>
at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/panicking.rs:270:5
4: sys::tests::listener_slab_notify_prop
at src/no_std.rs:1270:9
5: sys::tests::listener_slab_notify_prop::{closure#0}
at src/no_std.rs:1119:36
6: <{closure@src/no_std.rs:1119:5: 1361:6} as std::ops::FnOnce<()>>::call_once - shim
at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:250:5
7: <fn() -> std::result::Result<(), std::string::String> as std::ops::FnOnce<()>>::call_once - shim(fn() -> std::result::Result<(), std::string::String>)
at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
failures:
sys::tests::listener_slab_notify_prop
Failed assertion is:
Lines 1270 to 1277 in d2ed2cd
This reminded me of rust-lang/rust#66281, but I have not looked if it is related.
cc @notgull
With the previous version I was able to do something like this:
async fn recv(&self) -> Vec<ParsedMetric> {
    let listener = EventListener::new();
    tokio::pin!(listener);
    loop {
        if let Some(metrics) = self.lifo.lock().pop_back() {
            return metrics;
        }
        if listener.is_listening() {
            listener.as_mut().await;
        } else {
            listener.as_mut().listen(&self.event);
        }
    }
}
In the new version this seems to roughly translate to:
async fn recv(&self) -> Vec<ParsedMetric> {
    loop {
        if let Some(metrics) = self.lifo.lock().pop_back() {
            return metrics;
        }
        listener!(self.event => listener);
        if let Some(metrics) = self.lifo.lock().pop_back() {
            return metrics;
        }
        listener.await
    }
}
However, this fails to compile with:
error: future cannot be sent between threads safely
--> pulse-metrics/src/pipeline/processor/buffer/mod.rs:67:7
|
67 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `event_listener::sys::Inner<()>`, the trait `std::marker::Send` is not implemented for `NonNull<event_listener::sys::Link<()>>`
note: future is not `Send` as this value is used across an await
--> pulse-metrics/src/pipeline/processor/buffer/mod.rs:87:16
|
81 | listener!(self.event => listener);
| --------------------------------- has type `StackSlot<'_, ()>` which is not `Send`
...
87 | listener.await
| ^^^^^ await occurs here, with `mut $listener` maybe used later
note: required by a bound in `tokio::spawn`
--> /Users/mklein/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/task/spawn.rs:166:21
|
164 | pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
| ----- required by a bound in this function
165 | where
166 | F: Future + Send + 'static,
| ^^^^ required by this bound in `spawn`
I haven't dug into the guts but it seems like awaiting the future should somehow consume it such that this would not fail the Send check? Am I doing something wrong? Thank you.
warning: missing documentation for a method
--> src/lib.rs:1102:1
|
1102 | / easy_wrapper! {
1103 | | /// A future returned by [`Receiver::recv()`].
1104 | | #[derive(Debug)]
1105 | | #[must_use = "futures do nothing unless you `.await` or poll them"]
1106 | | pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
1107 | | pub(crate) wait();
1108 | | }
| |_^
|
= note: this warning originates in the macro `$crate::__pin_project_struct_make_proj_method` which comes from the expansion of the macro `easy_wrapper` (in Nightly builds, run with -Z macro-backtrace for more info)
It looks like this comes from pin_project_lite, but I'm not sure from where, exactly. Since this issue hasn't been observed in other crates that use pin_project_lite I don't think it's an issue with that crate.
I generally like having only one crate per repo for this organization, since it makes the CI/release pipeline a lot cleaner and more helpful. So it would be nice to split out event-listener-strategy into a new crate.
Swap would return the previous value, so line 43 should be changed or the two values in line 44 and 46 should be swapped.
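As a quick reminder of the semantics this report relies on: AtomicBool::swap stores the new value and returns the value that was there before. The sketch below assumes the flag in question is a lock flag, which is a guess since the referenced lines are not shown here; try_acquire is a made-up helper name.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Tries to take a lock flag. Returns true if this call acquired it.
pub fn try_acquire(locked: &AtomicBool) -> bool {
    // `swap` writes `true` and hands back the PREVIOUS value:
    // a previous `false` means the flag was free and is now ours.
    let previously_locked = locked.swap(true, Ordering::Acquire);
    !previously_locked
}
```

So code that branches on the result of swap(true, ..) has to treat false as "acquired" and true as "already held", not the other way around.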
I was recently made aware of this code:
Lines 562 to 577 in 0ea4641
As the comment says, this is UB -- and the claim that no sane compiler is going to optimize atomics is a myth. However, I admit I don't know enough about this specific case to say whether this is a risk for this crate.
What I am wondering is: why doesn't this use inline assembly? That is the intended mechanism to force the compiler to generate code in a particular way, and it is clearly sound here. The alternative is even mentioned in the comment, but unfortunately without an explanation of why it was discarded.
serde_cbor is unmaintained
Details | |
---|---|
Status | unmaintained |
Package | serde_cbor |
Version | 0.11.2 |
URL | https://github.com/pyfisch/cbor |
Date | 2021-08-15 |
The serde_cbor crate is unmaintained. The author has archived the github repository.
Alternatives proposed by the author:
See advisory page for additional details.
Hi.
In https://docs.rs/event-listener-strategy/latest/event_listener_strategy/struct.NonBlocking.html there are auto-trait implementations of !Send and !Sync. Is this now the intended behaviour?
This causes an issue for using async_lock::OnceCell between threads. It used to work before smol-2.0.0 was released.
By embedding a singular Entry into the Event/Inner structure itself, we could probably add a poll_listen() function that allows a user to poll for an Event notification without needing to use an EventListener. Such a function would probably have the same caveat that poll_read()/poll_write() does in async_io::Async, where if it's called by different tasks it will cause a busy loop where one task wakes up another.
https://github.com/smol-rs/event-listener/actions/runs/7155060118/job/19483256324
Running tests/notify.rs (target/wasm32-unknown-unknown/debug/deps/notify-0ef5209d8abd9615.wasm)
Set timeout to 20 seconds...
Executing bindgen...
running 9 tests
panicked at /home/runner/work/event-listener/event-listener/src/no_std.rs:639:48:
index out of bounds: the len is 0 but the index is 4
Stack:
Error
at /home/runner/work/event-listener/event-listener/target/wasm32-unknown-unknown/wbg-tmp-notify-0ef5209d8abd9615.wasm/wasm-bindgen-test.js:562:17
at logError (/home/runner/work/event-listener/event-listener/target/wasm32-unknown-unknown/wbg-tmp-notify-0ef5209d8abd9615.wasm/wasm-bindgen-test.js:223:18)
at module.exports.__wbg_new_abda76e883ba8a5f (/home/runner/work/event-listener/event-listener/target/wasm32-unknown-unknown/wbg-tmp-notify-0ef5209d8abd9615.wasm/wasm-bindgen-test.js:561:65)
at console_error_panic_hook::Error::new::h7d6b2a2d770f64c0 (wasm://wasm/0039f982:wasm-function[1462]:0x9b9fe)
at console_error_panic_hook::hook_impl::hd5f26b63473b4819 (wasm://wasm/0039f982:wasm-function[405]:0x68b55)
at console_error_panic_hook::hook::hedca37fbd9551172 (wasm://wasm/0039f982:wasm-function[2033]:0xa4f45)
at wasm_bindgen_test::__rt::Context::new::{{closure}}::{{closure}}::h4947e30e1bc66a88 (wasm://wasm/0039f982:wasm-function[1124]:0x92beb)
at std::panicking::rust_panic_with_hook::hbf46ef0245cc9589 (wasm://wasm/0039f982:wasm-function[781]:0x83b7b)
at std::panicking::begin_panic_handler::{{closure}}::hc07db454214d2c87 (wasm://wasm/0039f982:wasm-function[1003]:0x8e49c)
at std::sys_common::backtrace::__rust_end_short_backtrace::hf9e2f055fb5ef672 (wasm://wasm/0039f982:wasm-function[2533]:0xa90e6)
test notify::notify_all_fair ... FAIL
...
panic seems to be happening on this line:
Line 639 in 531c106
cc @notgull
Previously, there was a notify_all method available. This method was dropped in version 2.0.0. How should previously existing usages of this method be replaced?
In my most recent PRs, I've added a significant amount of code that doesn't end up being tested on the common case. In order to make sure our tests function properly, it would be nice to have an automatic code coverage utility in order to make sure every branch of our code is tested properly.
It may also be a good idea to implement this for other crates in this organization.
Hello!
I've just built an RWLock based around a single atomic and now I'm trying to make an async version with eventlistener. For my purposes, simply knowing if any listeners were notified would suffice, but knowing the count seems a logical extension of that.
What do you think?
If this crate were no_std, it would allow crates like async-lock and async-channel to be used on no_std as well. I envision an API with a default std feature, and without it this crate would not depend on parking (and the wait API would disappear as well).
However, the current version of the code uses a Mutex to lock the linked list containing event listeners. Theoretically, we could replace this with an atomic linked list similar to the one used in concurrent-queue.
This would help smol be able to be used in embedded systems. Any thoughts on this?
Continued from the discussion in #100
The APIs of v3.x and v4.x increase complexity at the cost of adding more footguns. Originally I'd wanted to keep the API small and simple, but this appears to have introduced a handful of different ways to mess up (e.g. panicking when creating a new EventListener without listening on it). In addition, v4.0 (#94) introduced a not-insignificant amount of overhead over the v3.x API. Therefore, as much as it pains me to have three breaking changes in this crate in such a short span of time, I think we need a better API.
Note: I am bad at naming things. None of these names are final.
The idea I proposed in #100 is as follows: add an API based around a Listener trait that would look like this:
pub trait Listener<T>: Future<Output = T> + Sealed {
    fn wait(self) -> T;
    fn wait_timeout(self, timeout: Duration) -> Option<T>;
    fn wait_deadline(self, deadline: Instant) -> Option<T>;
    fn discard(self) -> Option<T>;
    fn listens_to(&self, event: &Event<T>) -> bool;
    fn same_event(&self, other: &(impl Listener<T> + ?Sized)) -> bool;
}
This Listener trait can do anything that the current EventListener type can do. It can be awaited or it can be used to block the current thread. Alternatively, it can be discarded.
To emulate the previous 2.x API, we would have an EventListener type. This is a heap-allocated event listener that can be moved freely.
impl<T> Event<T> {
    pub fn listen(&self) -> EventListener<T>;
}

pub struct EventListener<T = ()>(/* ... */);

impl<T> Listener<T> for EventListener<T> { /* ... */ }
It would be used like this:
struct Flag {
    flag: AtomicBool,
    change_ops: Event<()>
}

impl Flag {
    async fn wait(&self) {
        loop {
            // Check the flag.
            if self.flag.load(Ordering::Relaxed) {
                return;
            }
            // Create a listener. By default it is already inserted into the list.
            let listener = self.change_ops.listen();
            // Check the flag again.
            if self.flag.load(Ordering::SeqCst) {
                return;
            }
            // Wait on the listener.
            listener.await;
        }
    }

    fn notify(&self) {
        self.flag.store(true, Ordering::Release);
        self.change_ops.notify(1);
    }
}
However, this is inefficient, as it performs a heap allocation every time listen() is called. We also provide an API that allows one to use the stack instead of the heap, at a slightly higher complexity cost. To start, you create a StackSlot, which contains all of the state of the EventListener but stored on the stack. After being pinned, it can be transformed into a StackListener.
impl<T> Event<T> {
    pub fn stack_slot(&self) -> StackSlot<'_, T> { /* ... */ }
}

pub struct StackSlot<'ev, T>(/* ... */);
pub struct StackListener<'ev, 'listener, T>(/* ... */);

impl<'ev, T> StackSlot<'ev, T> {
    pub fn listen<'this>(self: Pin<&'this mut Self>) -> StackListener<'ev, 'this, T> { /* ... */ }
}

impl<T> Listener<T> for StackListener<'_, '_, T> { /* ... */ }
In addition, a macro will be provided that creates a StackSlot from an Event and automatically pins it to the stack.
macro_rules! listener {
    ($name:ident, $event:expr) => { /* ... */ }
}

/*
listener!(l, ev);

Expands to:

let mut l = ev.stack_slot();
let mut l = Pin::new_unchecked(&mut l);
*/
The example from above can be modified to look like this:
struct Flag {
    flag: AtomicBool,
    change_ops: Event<()>
}

impl Flag {
    async fn wait(&self) {
        listener!(slot, &self.change_ops);
        loop {
            // Check the flag.
            if self.flag.load(Ordering::Relaxed) {
                return;
            }
            // Insert the listener into the list.
            let listener = slot.listen();
            // Check the flag again.
            if self.flag.load(Ordering::SeqCst) {
                return;
            }
            // Wait on the listener.
            listener.await;
        }
    }

    fn notify(&self) {
        self.flag.store(true, Ordering::Release);
        self.change_ops.notify(1);
    }
}
@smol-rs/admins Thoughts on this?
While reviewing other versions of synchronization primitives, I realized that the version of this crate that exists in the tokio ecosystem doesn't allocate at all. It stores the event listener entry on the stack in the particular entry, using pin guarantees to erase any potential unsoundness. We may be able to emulate this approach in this crate as well.
The main advantage of this is that we could avoid allocation altogether. This could lead to faster code, but could also allow this crate (and its dependents) to be run virtually anywhere. The main downside is that it would make it impossible to make this crate forbid(unsafe_code), the API would need to be radically changed and it would make the crate less simple.
Any thoughts on these trade-offs? #26 looks like it tries this.
Copy&pasting from the matrix channel to avoid repeating:
zeenix: this method takes an Event ref but doesn't actually setup the listener to listen to this event. This seems like a pretty big footgun
Saltbag: The only other option is to have it insert itself into the list on await, which has its own footguns. It can’t be set up in new since it needs to be pinned.
zeenix: it's just a very strange and unintuitive api now. I see multiple issues here:
- EventListener existing w/o listening is just strange. The name implies something that's listening.
- new takes an Event ref but doesn't listen to the given event.
- The footgun of awaiting a listener w/o calling listen on it. That state should not be allowed to ever exist.
If listening requires pinning, that should always be implied then. That's the whole point of the EventListener type. At the very least, new and EventListener docs should document all these caveats well.