event-listener's People

Contributors

antonsol919, atouchet, beckend, cynecx, dependabot[bot], fogti, jbr, kalcutter, mamaicode, notgull, r3v2d0g, taiki-e, tomaka, withoutboats, zeenix

event-listener's Issues

Possible race condition

I have made a simple channel, trying to use this library:
https://github.com/beckend/footgun-event-listener

recv_wait often hangs:

#[derive(Debug)]
struct Channel<T> {
    closed: AtomicBool,
    events_recv: Event,
    events_queue_is_empty: Event,
    queue: Mutex<VecDeque<T>>,
}

impl<T> Receiver<T> {
    pub fn recv_wait(&self) -> Option<T> {
        loop {
            if self.channel.is_closed() {
                return None;
            }

            let mut listener = self.channel.events_recv.listen();
            let lis = listener.as_mut();
            let mut queue = self.channel.queue.lock().unwrap();

            if let Some(payload) = queue.pop_front() {
                return Some(payload);
            } else {
                self.channel.events_queue_is_empty.notify(usize::MAX);
            }

            drop(queue);

            lis.wait();
        }
    }
}

It's not supposed to hang, since I use close_notify:

impl<T> Sender<T> {
    // ...
    pub fn close(&self) {
        self.channel.closed.store(true, Ordering::SeqCst);
    }

    pub fn close_notify(&self) {
        self.close();
        self.channel.events_recv.notify(usize::MAX);
    }
}

It's a minimal channel example with a test.
https://github.com/beckend/footgun-event-listener/blob/main/src/__tests__/channel.spec.rs
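
Judging only from the snippet above, one plausible cause is that is_closed() is checked before listen() is called: if close_notify() runs between those two steps, the notification is sent before the listener exists and wait() never returns. The sketch below is not the reporter's code and does not use event-listener. It is a std-only stand-in (Mutex plus Condvar, with the hypothetical names Channel, State and demo) that shows a wait loop in which the closed flag and the queue are examined in the same critical section that leads into the wait, so no notification can slip through.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Queue and closed flag live under one lock so they are always observed together.
struct State<T> {
    queue: VecDeque<T>,
    closed: bool,
}

/// Hypothetical stand-in channel; a Condvar plays the role of the event.
pub struct Channel<T> {
    state: Mutex<State<T>>,
    changed: Condvar,
}

impl<T> Channel<T> {
    pub fn new() -> Self {
        Channel {
            state: Mutex::new(State { queue: VecDeque::new(), closed: false }),
            changed: Condvar::new(),
        }
    }

    pub fn send(&self, value: T) {
        self.state.lock().unwrap().queue.push_back(value);
        self.changed.notify_one();
    }

    pub fn close_notify(&self) {
        self.state.lock().unwrap().closed = true;
        self.changed.notify_all();
    }

    pub fn recv_wait(&self) -> Option<T> {
        let mut state = self.state.lock().unwrap();
        loop {
            // Drain pending items first, even if the channel is already closed.
            if let Some(value) = state.queue.pop_front() {
                return Some(value);
            }
            // The closed check happens while the lock is held, and `wait`
            // releases the lock atomically, so a close cannot be missed.
            if state.closed {
                return None;
            }
            state = self.changed.wait(state).unwrap();
        }
    }
}

/// Sends one value from another thread, closes, and receives twice.
pub fn demo() -> (Option<u32>, Option<u32>) {
    let channel = Arc::new(Channel::new());
    let producer = {
        let channel = channel.clone();
        thread::spawn(move || {
            channel.send(7);
            channel.close_notify();
        })
    };
    let first = channel.recv_wait();
    let second = channel.recv_wait();
    producer.join().unwrap();
    (first, second)
}
```

With event-listener itself, the analogous change would be to re-check is_closed() and the queue after listen() and before wait().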

Why is the mutex always acquired early in EventListener::poll?

event-listener/src/lib.rs

Lines 665 to 699 in 42a1cac

let mut list = self.inner.lock();
let entry = match self.entry {
    None => unreachable!("cannot poll a completed `EventListener` future"),
    Some(entry) => entry,
};
let state = unsafe { &entry.as_ref().state };
// Do a dummy replace operation in order to take out the state.
match state.replace(State::Notified(false)) {
    State::Notified(_) => {
        // If this listener has been notified, remove it from the list and return.
        list.remove(entry, self.inner.cache_ptr());
        drop(list);
        self.entry = None;
        return Poll::Ready(());
    }
    State::Created => {
        // If the listener was just created, put it in the `Polling` state.
        state.set(State::Polling(cx.waker().clone()));
    }
    State::Polling(w) => {
        // If the listener was in the `Polling` state, update the waker.
        if w.will_wake(cx.waker()) {
            state.set(State::Polling(w));
        } else {
            state.set(State::Polling(cx.waker().clone()));
        }
    }
    State::Waiting(_) => {
        unreachable!("cannot poll and wait on `EventListener` at the same time")
    }
}
Poll::Pending

Currently, the mutex is always acquired on entering the EventListener::poll method. This doesn't seem necessary to me, as it is only used in the State::Notified case.

Edit: ok, it is needed as we modify the linked list. I think it could still be moved after the first match statement, as the pointer isn't dereferenced yet.

Notifying additional listeners

I'm pretty sure my use case for Event::notify wants to notify n additional listeners. You can see the code here: https://github.com/withoutboats/access-queue/blob/master/src/lib.rs

It's like a generalization of a Mutex that, rather than using an AtomicBool, allows up to count concurrent accesses. These are tracked by a counter that decrements toward zero as accesses are given out and increments back up as they are released. Whenever accesses are released, I want to increase the number of notified listeners until I hit the number of active listeners.

My understanding of the code is that if accesses are released before all of the previously notified listeners have awoken, some listeners can end up dropped, as they are not notified, but there is now free space in the queue which new attempts to access will take.

Is it possible to provide a version of Event::notify which will notify additional active listeners (up to the number of active listeners in total)?
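
To make the requested difference concrete, here is a toy model of the two semantics. It is only a sketch: ListenerQueue and its methods are hypothetical names, the model tracks a single "notified" flag per active listener, and it says nothing about how event-listener implements its list internally. notify_at_least mirrors the "make sure at least n listeners are notified" behaviour described above, while notify_additional always tries to notify n more, capped by the number of active listeners.

```rust
/// Toy model of a listener queue: one flag per active listener, in queue order.
pub struct ListenerQueue {
    notified: Vec<bool>,
}

impl ListenerQueue {
    pub fn new(active: usize) -> Self {
        ListenerQueue { notified: vec![false; active] }
    }

    /// Number of listeners that have been notified but have not yet woken up.
    pub fn notified_count(&self) -> usize {
        self.notified.iter().filter(|n| **n).count()
    }

    /// Notifies up to `n` listeners that were not notified before.
    /// Returns how many were newly notified.
    pub fn notify_additional(&mut self, n: usize) -> usize {
        let mut woken = 0;
        for slot in self.notified.iter_mut().filter(|slot| !**slot).take(n) {
            *slot = true;
            woken += 1;
        }
        woken
    }

    /// Makes sure at least `n` listeners are notified in total.
    /// Listeners notified earlier count toward `n`.
    pub fn notify_at_least(&mut self, n: usize) -> usize {
        let already = self.notified_count();
        self.notify_additional(n.saturating_sub(already))
    }
}
```

In this model a second notify_at_least(2) is a no-op while the first two listeners are still pending, which is the situation the access-queue use case runs into; notify_additional(2) would notify two more.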

Remove EventListener::new footgun

As discussed in #90 and #89, EventListener::new has a big footgun: it takes &Event ref but doesn't add itself to listen to the event passed. This is highly unintuitive IMO and if possible, we should remedy this. Two options here:

  1. Add a new method that doesn't take an Event ref and deprecate new in favor of that. OR
  2. (if possible) make listen() implicit in the first poll of EventListener and deprecate EventListener::listen.

I'd very much prefer option 2, since it removes the footgun completely.

Upgrading v4 to v5 panic: Listener was never inserted into the list

I'm unsure if I'm holding the new interface wrong somehow or if I've encountered a bug.

Given this diff that attempts to track the event-listener interface changes, I'm encountering a deterministic panic like:

event-listener-5.2.0/src/lib.rs:1251:36:
listener was never inserted into the list

and as far as I can tell the code reduces to

let n = AtomicUsize::from(1);
let event = Event::new();
let mut listener = event.listen();
loop {
    if 0 == n.load(Ordering::SeqCst) {
        return Poll::Ready(());
    };
    ready!(Pin::new(&mut listener).poll(cx));
}

Do I need to replace the listener every time I poll it? Am I holding this wrong? Is this a bug?

If I'm using it wrong, I'll happily contribute documentation to help others avoid this error.

RUSTSEC-2021-0145: Potential unaligned read

Potential unaligned read

Details
Status unsound
Package atty
Version 0.2.14
URL softprops/atty#50
Date 2021-07-04

On windows, atty dereferences a potentially unaligned pointer.

In practice however, the pointer won't be unaligned unless a custom global allocator is used.

In particular, the System allocator on windows uses HeapAlloc, which guarantees a large enough alignment.

atty is Unmaintained

A Pull Request with a fix has been provided over a year ago but the maintainer seems to be unreachable.

Last release of atty was almost 3 years ago.

Possible Alternative(s)

The below list has not been vetted in any way and may or may not contain alternatives;

  • is-terminal
  • std::io::IsTerminal nightly-only experimental

See advisory page for additional details.

poll not waking when Pending is returned

This is a follow-up to async-rs/async-std#841. I tracked the poll that isn't waking in the chain down to the EventListener.

It seems that when Pending is returned

cx.waker().wake_by_ref();

needs to be called; otherwise, having more event listeners active than executor threads seems to lead to them spinning indefinitely.

I'll add a PR for this.

Thread local, `!Send + !Sync` version of `Event`

Does it make sense to have a !Send + !Sync version of Event, to be used with LocalExecutor? Feels silly to pay the cost of thread synchronization when everything is happening on a single thread.

(and also a thread local version of Mutex and RwLock, which IIUC is based on event_listener?)
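
As a rough illustration of what such a type could look like, here is a std-only sketch. It is not an existing event-listener API: LocalEvent, LocalListener, CountWake and demo are hypothetical names, and the design (a generation counter plus a list of wakers behind Rc<RefCell<..>>) is just one simple way to get a !Send + !Sync event with no atomics or locks on the notification path.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Inner {
    generation: u64,    // bumped on every notify_all
    wakers: Vec<Waker>, // tasks waiting for the next notification
}

/// Hypothetical single-threaded event; `Rc` and `RefCell` make it `!Send + !Sync`.
#[derive(Clone, Default)]
pub struct LocalEvent {
    inner: Rc<RefCell<Inner>>,
}

/// Future that completes once the event is notified after `listen()` was called.
pub struct LocalListener {
    inner: Rc<RefCell<Inner>>,
    seen: u64,
}

impl LocalEvent {
    pub fn listen(&self) -> LocalListener {
        let seen = self.inner.borrow().generation;
        LocalListener { inner: self.inner.clone(), seen }
    }

    pub fn notify_all(&self) {
        // Take the wakers out before waking so no RefCell borrow is held
        // while arbitrary waker code runs.
        let wakers = {
            let mut inner = self.inner.borrow_mut();
            inner.generation += 1;
            std::mem::take(&mut inner.wakers)
        };
        for waker in wakers {
            waker.wake();
        }
    }
}

impl Future for LocalListener {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut inner = self.inner.borrow_mut();
        if inner.generation != self.seen {
            Poll::Ready(())
        } else {
            // Simplification: repeated polls push duplicate wakers.
            inner.wakers.push(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Test waker that counts how often it was woken.
struct CountWake(AtomicUsize);

impl Wake for CountWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}

/// Polls a listener before and after a notification.
pub fn demo() -> (bool, bool, usize) {
    let counter = Arc::new(CountWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let event = LocalEvent::default();
    let mut listener = event.listen();
    let first_pending = Pin::new(&mut listener).poll(&mut cx).is_pending();
    event.notify_all();
    let then_ready = Pin::new(&mut listener).poll(&mut cx).is_ready();
    (first_pending, then_ready, counter.0.load(Ordering::Relaxed))
}
```

The atomic counter appears only in the test waker, because std's Wake trait requires Send + Sync; the event itself uses plain Rc and RefCell.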

`EventListener` doesn't implement `UnwindSafe`

I think it's a pretty normal use case to create an EventListener, then send the EventListener somewhere else, but this somewhere else panics and we want to continue running the program anyway.

Since Event itself implements RefUnwindSafe, it seems that EventListener not implementing UnwindSafe is just an oversight?
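
Until the trait is implemented, the usual workaround is std::panic::AssertUnwindSafe. The sketch below uses a hypothetical stand-in type named Listener (it is not event-listener's EventListener) that is not UnwindSafe because it holds an Rc<Cell<..>>, and shows both a static check for the trait and a catch_unwind call that keeps the program running when the consumer panics.

```rust
use std::cell::Cell;
use std::panic::{self, AssertUnwindSafe, UnwindSafe};
use std::rc::Rc;

/// Hypothetical stand-in for a listener type that is not `UnwindSafe`
/// (`Cell` has interior mutability, so `Rc<Cell<u32>>` is not `UnwindSafe`).
pub struct Listener {
    state: Rc<Cell<u32>>,
}

impl Listener {
    pub fn new(value: u32) -> Self {
        Listener { state: Rc::new(Cell::new(value)) }
    }
}

/// Compiles only when `T: UnwindSafe`; useful as a static check in tests.
pub fn assert_unwind_safe<T: UnwindSafe>() {}

/// Runs a consumer of the listener and survives a panic inside it.
/// The whole closure is wrapped in `AssertUnwindSafe`, which asserts that
/// nothing observable is left in a broken state if the closure panics.
pub fn run_isolated(listener: Listener, should_panic: bool) -> Option<u32> {
    panic::catch_unwind(AssertUnwindSafe(move || {
        if should_panic {
            panic!("consumer failed");
        }
        listener.state.get()
    }))
    .ok()
}
```

Wrapping the closure rather than the value avoids surprises with closures that capture individual fields.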

Double checking and memory ordering in the example can be relaxed?

In the crate example, two checks per loop are used

let flag = Arc::new(AtomicBool::new(false));
let event = Arc::new(Event::new());

// Spawn a thread that will set the flag after 1 second.
thread::spawn({
    let flag = flag.clone();
    let event = event.clone();
    move || {
        // Wait for a second.
        thread::sleep(Duration::from_secs(1));

        // Set the flag.
        flag.store(true, Ordering::SeqCst);

        // Notify all listeners that the flag has been set.
        // <-- implicit SeqCst fence here
        event.notify(usize::MAX); // <-- access to inner list mutex
    }
});

// Wait until the flag is set.
loop {
    // Check the flag.
    if flag.load(Ordering::SeqCst) {
        break;
    }

    // Start listening for events.
    let listener = event.listen(); // <-- access to inner list mutex
    // <-- implicit SeqCst fence here

    // Check the flag again after creating the listener.
    if flag.load(Ordering::SeqCst) {
        break;
    }

    // Wait for a notification and continue the loop.
    listener.wait(); // <-- access to inner list mutex
}

But is the first check and the ordering really necessary?

The only possibility for this to hang is when, during a single loop iteration, both of the following conditions are satisfied:

  1. The listen mutex access is not visible from notify
  2. The second load returns false

The reasoning

  1. If condition 1 is true, the Mutex guarantees notify synchronizes-with listen
  2. notify synchronizes-with listen + listen sequences-before load -> notify inter-thread happens-before load (Inter-thread happens-before Rule No.3)
  3. store sequences-before notify + notify inter-thread happens-before load -> store inter-thread happens-before load (Inter-thread happens-before Rule No.4)
  4. store happens-before load -> store is a visible side effect for load, so condition 2 is false.

So the correctness is guaranteed. As we can see, this guarantee has nothing to do with the first load or with the memory ordering of the loads. I think the code below works as well:

let flag = Arc::new(AtomicBool::new(false));
let event = Arc::new(Event::new());

// Spawn a thread that will set the flag after 1 second.
thread::spawn({
    let flag = flag.clone();
    let event = event.clone();
    move || {
        // Wait for a second.
        thread::sleep(Duration::from_secs(1));

        // Set the flag.
        flag.store(true, Ordering::Relaxed);

        // Notify all listeners that the flag has been set.
        // <-- implicit SeqCst fence here
        event.notify(usize::MAX); // <-- access to inner list mutex
    }
});

// Check the flag and see whether we can opportunistically bypass listener creation
if !flag.load(Ordering::Relaxed) {
    // Wait until the flag is set.
    loop {
    
        // Start listening for events.
        let listener = event.listen(); // <-- access to inner list mutex
        // <-- implicit SeqCst fence here
    
        // Check the flag again after creating the listener.
        if flag.load(Ordering::Relaxed) {
            break;
        }
    
        // Wait for a notification and continue the loop.
        listener.wait(); // <-- access to inner list mutex
    }
}

Perhaps I'm missing something. I'm actually not too sure about some of my assertions above. Or are there additional API and engineering concerns in this matter?


Edit: I realize different usage may impose a different ordering requirement. I'm only using this crate to do grouped task backoff, so not hanging is everything I need. However, if people are doing initialization before notify and processing result after the loop, it seems the above code with Relaxed still works. (The different notify circumstances can cause two different exit routes from the loop, but both routes seem to guarantee visibility)
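
The argument can be tried out without the crate. The sketch below is a std-only analogue, not event-listener code: a Mutex plus Condvar stand in for the event's inner list mutex, the flag uses only Relaxed accesses, and the name wait_for_flag_relaxed is hypothetical. The store is sequenced before the setter's lock acquisition, and the waiter's load happens while it holds the same lock, so either the waiter's lock acquisition follows the setter's unlock (and the store is visible) or the waiter is already parked and receives the notification.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Waits for a flag that is written and read with `Relaxed` only.
/// Visibility comes from the mutex, not from the atomic orderings.
pub fn wait_for_flag_relaxed() -> bool {
    let flag = Arc::new(AtomicBool::new(false));
    let gate = Arc::new((Mutex::new(()), Condvar::new()));

    let setter = {
        let flag = flag.clone();
        let gate = gate.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            // Relaxed store, sequenced before the lock acquisition below.
            flag.store(true, Ordering::Relaxed);
            // Stand-in for the mutex access inside `notify`.
            let _guard = gate.0.lock().unwrap();
            gate.1.notify_all();
        })
    };

    let (lock, cvar) = &*gate;
    // Stand-in for the mutex access inside `listen`.
    let mut guard = lock.lock().unwrap();
    // Check the flag only after "registering", i.e. while holding the lock.
    while !flag.load(Ordering::Relaxed) {
        // `wait` releases the lock atomically and re-acquires it on wake-up.
        guard = cvar.wait(guard).unwrap();
    }
    drop(guard);

    setter.join().unwrap();
    flag.load(Ordering::Relaxed)
}
```

This only demonstrates the mutex-based reasoning; whether event-listener's lock-free fast paths give the same guarantee is a separate question that the sketch does not answer.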

expose listener count

Using this as a base to create libraries, it's very useful to know the listener count from time to time.

[Question] Integration with loom or shuttle?

Hi,

This package has been a lifesaver. I really would have been unable to implement https://github.com/rmcgibbo/async-priority-channel (whose API is basically copied completely from smol-rs/async-channel) without it.

One question: would it be straightforward/possible for event-listener to provide a hook to use loom or shuttle? I'd like to be able to test my code which depends on event-listener using these techniques if possible. tokio-rs/loom#220 seems to indicate that the latest release of loom now handles SeqCst, which is probably necessary.

no_std impl tries to collect std::usize::MAX flags when notified

Hit this bug while trying to integrate no_std into async-channel.

// Collect every tag we need.
let tags = {
    let count = notify.count(Internal::new());
    let mut tags = Vec::with_capacity(count);
    for _ in 0..count {
        tags.push(notify.next_tag(Internal::new()));
    }
    // Convert into an iterator.
    tags.into_iter()
};

It is a common pattern for the number of listeners woken up to be std::usize::MAX. If the mutex is under contention and the program falls back to the node queue, it then tries to fill up a Vec with MAX entries, which immediately causes the program to freeze up.

I'm not sure what the best solution to this is. We could just disable tagged events on no_std, since as far as I know there is no way to implement it without blocking mutexes.
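
One possible mitigation, sketched here with std only, is to produce tags lazily instead of pre-allocating a Vec of count entries. This is not the crate's code and not necessarily the fix that was adopted: lazy_tags is a hypothetical helper, and it assumes the tag generator can be called on demand at the point where listeners are actually notified.

```rust
/// Yields up to `count` tags on demand. Nothing is allocated up front,
/// so `count == usize::MAX` is harmless as long as the consumer stops
/// once it runs out of listeners.
pub fn lazy_tags<T>(count: usize, next_tag: impl FnMut() -> T) -> impl Iterator<Item = T> {
    std::iter::repeat_with(next_tag).take(count)
}
```

A caller that has only three listeners to wake would pull three tags and drop the iterator, regardless of how large count was.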

Incorrect mutex example

The mutex example compiles but runs incorrectly on my computer. I did some fixup and now it works.

impl<T> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
        self.0.locked.store(false, Ordering::Release);
        self.0.lock_ops.notify(1);
    }
}

fmt::Debug should produce actually useful output

At the moment, the fmt::Debug implementation for Event and EventListener just writes Event { .. }, giving no useful information. It would be better if it produced actual descriptive information regarding the contents of the Event.

For Event, if it can lock the mutex, I'd like it to produce:

  • Number of notified listeners.
  • Total number of listeners.

For EventListener, it should indicate whether it's registered in the Event and, if it is, its current state (listening, notified, etc etc).
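
A minimal sketch of the Event half of this proposal, using std only. The Event and ListState types here are hypothetical stand-ins with just two counters, and the field names in the output are assumptions rather than anything the crate defines. The point is the try_lock pattern: Debug must never block, so the counters are printed only when the lock is free.

```rust
use std::fmt;
use std::sync::Mutex;

/// Hypothetical stand-in for the locked part of an event.
struct ListState {
    notified: usize,
    len: usize,
}

/// Hypothetical stand-in for `Event`.
pub struct Event {
    list: Mutex<ListState>,
}

impl Event {
    pub fn with_counts(notified: usize, len: usize) -> Self {
        Event { list: Mutex::new(ListState { notified, len }) }
    }
}

impl fmt::Debug for Event {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Event");
        match self.list.try_lock() {
            Ok(list) => {
                out.field("listeners_notified", &list.notified);
                out.field("listeners_total", &list.len);
            }
            // Locked elsewhere (or poisoned): report that instead of blocking.
            Err(_) => {
                out.field("listeners", &format_args!("<locked>"));
            }
        }
        out.finish()
    }
}
```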

Unsoundness

event-listener in its current state (531c106) is unsound. It's possible to trigger a use-after-free bug completely with safe Rust.

PoC:

use event_listener::{Event, EventListener};

fn main() {
    let event = Event::new();
    let event2 = Event::new();

    let mut listener = Box::pin(EventListener::<()>::new());
    listener.as_mut().listen(&event);
    listener.as_mut().listen(&event2);

    drop(listener);
    event.notify(1);
}

cargo miri run:

paul@Pauls-MBP ~/dev/event-listener-test (git)-[master] % cargo miri run
Preparing a sysroot for Miri (target: aarch64-apple-darwin)... done
    Finished dev [unoptimized + debuginfo] target(s) in 0.00s
     Running `/Users/paul/.rustup/toolchains/nightly-aarch64-apple-darwin/bin/cargo-miri runner target/miri/aarch64-apple-darwin/debug/event-listener-test`
error: Undefined Behavior: out-of-bounds pointer use: alloc846 has been freed, so this pointer is dangling
   --> /Users/paul/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ptr/non_null.rs:399:18
    |
399 |         unsafe { &*self.as_ptr().cast_const() }
    |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ out-of-bounds pointer use: alloc846 has been freed, so this pointer is dangling
    |
    = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
    = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
help: alloc846 was allocated here:
   --> src/main.rs:7:24
    |
7   |     let mut listener = Box::pin(EventListener::<()>::new());
    |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: alloc846 was deallocated here:
   --> src/main.rs:11:5
    |
11  |     drop(listener);
    |     ^^^^^^^^^^^^^^
    = note: BACKTRACE (of the first span):
    = note: inside `std::ptr::NonNull::<event_listener::sys::Link<()>>::as_ref::<'_>` at /Users/paul/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/ptr/non_null.rs:399:18: 399:46
    = note: inside `event_listener::sys::Inner::<()>::notify::<event_listener::notify::Notify>` at /Users/paul/.cargo/registry/src/index.crates.io-6f17d22bba15001f/event-listener-4.0.0/src/std.rs:263:42: 263:52
    = note: inside `event_listener::sys::<impl event_listener::Inner<()>>::notify::<event_listener::notify::Notify>` at /Users/paul/.cargo/registry/src/index.crates.io-6f17d22bba15001f/event-listener-4.0.0/src/std.rs:119:9: 119:35
    = note: inside `event_listener::Event::notify::<i32>` at /Users/paul/.cargo/registry/src/index.crates.io-6f17d22bba15001f/event-listener-4.0.0/src/lib.rs:432:24: 432:44
note: inside `main`
   --> src/main.rs:12:5
    |
12  |     event.notify(1);
    |     ^^^^^^^^^^^^^^^

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to 1 previous error

The issue is that EventListener::listen(mut self: Pin<&mut Self>, event: &Event<T>) doesn't deal with the case where the EventListener is already linked to another (or the same) Event. It just unconditionally overwrites the content of the EventListener's Listener.

In my opinion, the general api design of Event/EventListener is rather unnecessarily complex and introduces more overhead than needed. Tokio's Notify api is much "better" in that regard. 🤷‍♂️

Hang since v2.3.2

This commit d9a315d seems to have triggered a regression in async-channel, specifically the following code succeeds for me with v2.3.1 but hangs with v2.3.2:

use async_channel::{bounded, Sender};
use futures::{future::RemoteHandle, FutureExt};
use std::{any::Any, future::Future, mem, panic::AssertUnwindSafe, pin::Pin};
use tokio::{
    runtime::Handle, task::{JoinError, LocalSet}
};

type Request = Box<dyn FnOnce() -> Box<dyn Future<Output = Response>> + Send>;
type Response = Result<Box<dyn Any + Send>, Box<dyn Any + Send>>;

#[derive(Debug)]
pub(super) struct Pool {
    sender: Sender<(Request, Sender<RemoteHandle<Response>>)>,
}
impl Pool {
    pub(super) fn new(threads: usize) -> Self {
        let handle = Handle::current();
        let handle1 = handle.clone();
        let (sender, receiver) = bounded::<(Request, Sender<RemoteHandle<Response>>)>(1);
        for _ in 0..threads {
            let receiver = receiver.clone();
            let handle = handle.clone();
            let _ = handle1.spawn_blocking(move || {
                let local = LocalSet::new();
                handle.block_on(local.run_until(async {
                    while let Ok((task, sender)) = receiver.recv().await {
                        let _ = local.spawn_local(async move {
                            let (remote, remote_handle) = Pin::from(task()).remote_handle();
                            let _ = sender.send(remote_handle).await;
                            remote.await;
                        });
                    }
                }))
            });
        }
        Self { sender }
    }
    pub(super) fn spawn_pinned<F, Fut, T>(
        &self, task: F,
    ) -> impl Future<Output = Result<T, JoinError>> + Send
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = T> + 'static,
        T: Send + 'static,
    {
        let sender = self.sender.clone();
        async move {
            let task: Request = Box::new(|| {
                Box::new(
                    AssertUnwindSafe(task().map(|t| Box::new(t) as Box<dyn Any + Send>))
                        .catch_unwind(),
                )
            });
            let (sender_, receiver) = bounded::<RemoteHandle<Response>>(1);
            sender.send((task, sender_)).await.unwrap();
            let res = receiver.recv().await;
            let res = res.unwrap().await;
            #[allow(deprecated)]
            res.map(|x| *Box::<dyn Any + Send>::downcast(x).unwrap())
                .map_err(JoinError::panic)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use futures::future::join_all;
    use std::sync::{
        atomic::{AtomicUsize, Ordering}, Arc
    };

    #[tokio::test]
    async fn spawn_pinned() {
        const TASKS: usize = 1000;
        const ITERS: usize = 10;
        const THREADS: usize = 4;
        let pool = Pool::new(THREADS);
        let count = Arc::new(AtomicUsize::new((1..TASKS).sum()));
        for i in 0..ITERS {
            println!("iter {}", i);
            join_all((0..TASKS).map(|i| {
                let count = count.clone();
                pool.spawn_pinned(move || async move {
                    let _ = count.fetch_sub(i, Ordering::Relaxed);
                })
            }))
            .await
            .into_iter()
            .collect::<Result<(), _>>()
            .unwrap();
            assert_eq!(count.load(Ordering::Relaxed), 0);
            count.store((1..TASKS).sum(), Ordering::Relaxed);
        }
    }
}

(this is a horrible and hopefully temporary hack in lieu of tokio-rs/tokio#2545)

Remove concurrent-queue dependency

concurrent-queue is a pretty heavy dependency for a crate as small as event-listener. It would be nice if we were able to avoid using it.

Miri test failure with --no-default-features

https://github.com/smol-rs/event-listener/actions/runs/6367362187/job/17285871726

failures:

---- sys::tests::listener_slab_notify_prop stdout ----
thread 'sys::tests::listener_slab_notify_prop' panicked at src/no_std.rs:1270:9:
assertion `left == right` failed
  left: Listener { state: Task(_), prev: Cell { value: None }, next: Cell { value: Some(3) } }
 right: Listener { state: Task(_), prev: Cell { value: None }, next: Cell { value: Some(3) } }
stack backtrace:
   0: std::panicking::begin_panic_handler
             at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:597:5
   1: core::panicking::panic_fmt
             at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/panicking.rs:72:14
   2: core::panicking::assert_failed_inner
             at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/panicking.rs:315:17
   3: core::panicking::assert_failed::<sys::Entry<()>, sys::Entry<()>>
             at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/panicking.rs:270:5
   4: sys::tests::listener_slab_notify_prop
             at src/no_std.rs:1270:9
   5: sys::tests::listener_slab_notify_prop::{closure#0}
             at src/no_std.rs:1119:36
   6: <{closure@src/no_std.rs:1119:5: 1361:6} as std::ops::FnOnce<()>>::call_once - shim
             at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:250:5
   7: <fn() -> std::result::Result<(), std::string::String> as std::ops::FnOnce<()>>::call_once - shim(fn() -> std::result::Result<(), std::string::String>)
             at /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.


failures:
    sys::tests::listener_slab_notify_prop

Failed assertion is:

event-listener/src/no_std.rs

Lines 1270 to 1277 in d2ed2cd

assert_eq!(
    listeners.listeners[2],
    Entry::Listener {
        state: Cell::new(State::Task(Task::Waker(waker))),
        prev: Cell::new(None),
        next: Cell::new(Some(key3)),
    }
);

This reminded me of rust-lang/rust#66281, but I have not looked if it is related.

cc @notgull

Question about new API

With the previous version I was able to do something like this:

  async fn recv(&self) -> Vec<ParsedMetric> {
    let listener = EventListener::new();
    tokio::pin!(listener);

    loop {
      if let Some(metrics) = self.lifo.lock().pop_back() {
        return metrics;
      }

      if listener.is_listening() {
        listener.as_mut().await;
      } else {
        listener.as_mut().listen(&self.event);
      }
    }
  }

In the new version this seems to roughly translate to:

  async fn recv(&self) -> Vec<ParsedMetric> {
    loop {
      if let Some(metrics) = self.lifo.lock().pop_back() {
        return metrics;
      }

      listener!(self.event => listener);

      if let Some(metrics) = self.lifo.lock().pop_back() {
        return metrics;
      }

      listener.await
    }
  }

However, this fails to compile with:

error: future cannot be sent between threads safely
   --> pulse-metrics/src/pipeline/processor/buffer/mod.rs:67:7
    |
67  |       tokio::spawn(async move {
    |       ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `event_listener::sys::Inner<()>`, the trait `std::marker::Send` is not implemented for `NonNull<event_listener::sys::Link<()>>`
note: future is not `Send` as this value is used across an await
   --> pulse-metrics/src/pipeline/processor/buffer/mod.rs:87:16
    |
81  |       listener!(self.event => listener);
    |       --------------------------------- has type `StackSlot<'_, ()>` which is not `Send`
...
87  |       listener.await
    |                ^^^^^ await occurs here, with `mut $listener` maybe used later
note: required by a bound in `tokio::spawn`
   --> /Users/mklein/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/task/spawn.rs:166:21
    |
164 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
165 |     where
166 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`

I haven't dug into the guts but it seems like awaiting the future should somehow consume it such that this would not fail the Send check? Am I doing something wrong? Thank you.

missing_docs warning in easy_wrapper implementation

warning: missing documentation for a method
    --> src/lib.rs:1102:1
     |
1102 | / easy_wrapper! {
1103 | |     /// A future returned by [`Receiver::recv()`].
1104 | |     #[derive(Debug)]
1105 | |     #[must_use = "futures do nothing unless you `.await` or poll them"]
1106 | |     pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
1107 | |     pub(crate) wait();
1108 | | }
     | |_^
     |
     = note: this warning originates in the macro `$crate::__pin_project_struct_make_proj_method` which comes from the expansion of the macro `easy_wrapper` (in Nightly builds, run with -Z macro-backtrace for more info)

It looks like this comes from pin_project_lite, but I'm not sure from where, exactly. Since this issue hasn't been observed in other crates that use pin_project_lite I don't think it's an issue with that crate.

Split event-listener-strategy out into a new crate

I generally like having only one crate per repo for this organization, since it makes the CI/release pipeline a lot cleaner and more helpful. So it would be nice to split out event-listener-strategy into a new crate.

UB due to lack of a full fence in full_fence (on x86)

I was recently made aware of this code:

// HACK(stjepang): On x86 architectures there are two different ways of executing
// a `SeqCst` fence.
//
// 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
// 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
//
// Both instructions have the effect of a full barrier, but empirical benchmarks have shown
// that the second one is sometimes a bit faster.
//
// The ideal solution here would be to use inline assembly, but we're instead creating a
// temporary atomic variable and compare-and-exchanging its value. No sane compiler to
// x86 platforms is going to optimize this away.
atomic::compiler_fence(Ordering::SeqCst);
let a = AtomicUsize::new(0);
let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
atomic::compiler_fence(Ordering::SeqCst);

As the comment says, this is UB, and the idea that no sane compiler is going to optimize atomics is a myth. However, I admit I don't know enough about this specific case to say whether this is a risk for this crate.

What I am wondering is why this doesn't use inline assembly. That is the intended mechanism to force the compiler to generate code in a particular way, and it is clearly sound here. The alternative is even mentioned in the comment, but unfortunately without an explanation of why it was discarded.
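
For comparison, the portable option mentioned in the comment is simply atomic::fence(SeqCst). The sketch below is not the crate's implementation and makes no performance claim; full_fence, store_buffering_once and never_both_zero are hypothetical names. It pairs the portable fence with the classic store-buffering litmus test: each thread stores to one variable, issues a full fence, then loads the other variable. With a SeqCst fence on both sides, the memory model forbids the outcome where both loads return 0.

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Portable full fence: no temporary atomic and no inline assembly.
#[inline]
pub fn full_fence() {
    fence(Ordering::SeqCst);
}

/// One run of the store-buffering litmus test; returns what each thread loaded.
pub fn store_buffering_once() -> (usize, usize) {
    let x = Arc::new(AtomicUsize::new(0));
    let y = Arc::new(AtomicUsize::new(0));

    let t1 = {
        let (x, y) = (x.clone(), y.clone());
        thread::spawn(move || {
            x.store(1, Ordering::Relaxed);
            full_fence();
            y.load(Ordering::Relaxed)
        })
    };
    let t2 = {
        let (x, y) = (x.clone(), y.clone());
        thread::spawn(move || {
            y.store(1, Ordering::Relaxed);
            full_fence();
            x.load(Ordering::Relaxed)
        })
    };

    (t1.join().unwrap(), t2.join().unwrap())
}

/// Repeats the litmus test and checks that at least one thread
/// always observes the other thread's store.
pub fn never_both_zero(iterations: usize) -> bool {
    (0..iterations).all(|_| {
        let (r1, r2) = store_buffering_once();
        r1 == 1 || r2 == 1
    })
}
```

A passing run cannot prove the fence is correct; the value of the test is that a failure would show a broken fence.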

Add a poll_listen() function

By embedding a singular Entry into the Event/Inner structure itself, we could probably add a poll_listen() function that allows a user to poll for an Event notification without needing to use an EventListener. Such a function would probably have the same caveat that poll_read()/poll_write() does in async_io::Async, where if it's called by different tasks it will cause a busy loop where one task wakes up another.

wasm test failed with "index out of bounds: the len is 0 but the index is 4"

https://github.com/smol-rs/event-listener/actions/runs/7155060118/job/19483256324

Running tests/notify.rs (target/wasm32-unknown-unknown/debug/deps/notify-0ef5209d8abd9615.wasm)
Set timeout to 20 seconds...
Executing bindgen...                              
                                                  
running 9 tests

panicked at /home/runner/work/event-listener/event-listener/src/no_std.rs:639:48:
index out of bounds: the len is 0 but the index is 4

Stack:

Error
    at /home/runner/work/event-listener/event-listener/target/wasm32-unknown-unknown/wbg-tmp-notify-0ef5209d8abd9615.wasm/wasm-bindgen-test.js:562:17
    at logError (/home/runner/work/event-listener/event-listener/target/wasm32-unknown-unknown/wbg-tmp-notify-0ef5209d8abd9615.wasm/wasm-bindgen-test.js:223:18)
    at module.exports.__wbg_new_abda76e883ba8a5f (/home/runner/work/event-listener/event-listener/target/wasm32-unknown-unknown/wbg-tmp-notify-0ef5209d8abd9615.wasm/wasm-bindgen-test.js:561:65)
    at console_error_panic_hook::Error::new::h7d6b2a2d770f64c0 (wasm://wasm/0039f982:wasm-function[1462]:0x9b9fe)
    at console_error_panic_hook::hook_impl::hd5f26b63473b4819 (wasm://wasm/0039f982:wasm-function[405]:0x68b55)
    at console_error_panic_hook::hook::hedca37fbd9551172 (wasm://wasm/0039f982:wasm-function[2033]:0xa4f45)
    at wasm_bindgen_test::__rt::Context::new::{{closure}}::{{closure}}::h4947e30e1bc66a88 (wasm://wasm/0039f982:wasm-function[1124]:0x92beb)
    at std::panicking::rust_panic_with_hook::hbf46ef0245cc9589 (wasm://wasm/0039f982:wasm-function[781]:0x83b7b)
    at std::panicking::begin_panic_handler::{{closure}}::hc07db454214d2c87 (wasm://wasm/0039f982:wasm-function[1003]:0x8e49c)
    at std::sys_common::backtrace::__rust_end_short_backtrace::hf9e2f055fb5ef672 (wasm://wasm/0039f982:wasm-function[2533]:0xa90e6)


test notify::notify_all_fair ... FAIL

...

The panic seems to be happening on this line:

let entry = &self.listeners[e.get()];

cc @notgull

Code coverage

In my most recent PRs, I've added a significant amount of code that isn't exercised by tests in the common case. To make sure our tests actually cover it, it would be nice to have an automatic code coverage utility that checks that every branch of our code is tested.

It may also be a good idea to implement this for other crates in this organization.

[Feature] Getting the number of notified listeners back

Hello!

I've just built an RwLock based around a single atomic and now I'm trying to make an async version with event-listener. For my purposes, simply knowing whether any listeners were notified would suffice, but knowing the count seems a logical extension of that.

What do you think?
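
To make the request concrete, here is a minimal sketch of a notify call that reports how many listeners it woke. ToyEvent and ToyListener are hypothetical, std-only stand-ins built on Mutex and Condvar. They are not the event-listener implementation, and the return type is only an assumption about what such an API could look like.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical toy event (NOT the event-listener implementation).
#[derive(Default)]
struct ToyEvent {
    // Each registered listener is a "notified" flag plus a condvar to wake it.
    listeners: Mutex<VecDeque<Arc<(Mutex<bool>, Condvar)>>>,
}

/// Hypothetical toy listener holding its shared entry.
struct ToyListener(Arc<(Mutex<bool>, Condvar)>);

impl ToyEvent {
    /// Registers a new listener at the back of the queue.
    fn listen(&self) -> ToyListener {
        let entry = Arc::new((Mutex::new(false), Condvar::new()));
        self.listeners.lock().unwrap().push_back(entry.clone());
        ToyListener(entry)
    }

    /// Wakes up to `n` listeners and returns how many were actually woken.
    fn notify(&self, n: usize) -> usize {
        let mut list = self.listeners.lock().unwrap();
        let mut woken = 0;
        while woken < n {
            let Some(entry) = list.pop_front() else { break };
            *entry.0.lock().unwrap() = true;
            entry.1.notify_one();
            woken += 1;
        }
        woken
    }
}

impl ToyListener {
    /// Blocks the current thread until this listener has been notified.
    fn wait(self) {
        let (flag, cv) = &*self.0;
        let mut notified = flag.lock().unwrap();
        while !*notified {
            notified = cv.wait(notified).unwrap();
        }
    }
}

fn main() {
    let event = ToyEvent::default();
    assert_eq!(event.notify(1), 0); // nobody is listening yet

    let a = event.listen();
    let b = event.listen();
    assert_eq!(event.notify(5), 2); // only two listeners existed

    // Both were already notified, so these return immediately.
    a.wait();
    b.wait();
}
```

With a count like this, a lock built on top of the event could tell "nobody was waiting" apart from "someone was woken" by comparing the result to zero.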

Make this crate `no_std`

If this crate were no_std, it would allow crates like async-lock and async-channel to be used on no_std as well. I envision an API with a default std feature, and without it this crate would not depend on parking (and the wait API would disappear as well).

However, the current version of the code uses a Mutex to lock the linked list containing event listeners. Theoretically, we could replace this with an atomic linked list similar to the one used in concurrent-queue.

This would help smol be able to be used in embedded systems. Any thoughts on this?

RFC: Less complex, footgun free API

Continued from the discussion in #100

The APIs of v3.x and v4.x add complexity and, along with it, more footguns. Originally I'd wanted to keep the API small and simple, but this appears to have introduced a handful of different ways to mess up (e.g. panicking when creating a new EventListener without listening on it). In addition, v4.0 (#94) introduced a not-insignificant amount of overhead over the v3.x API. Therefore, as much as it pains me to have three breaking changes in this crate in such a short span of time, I think we need a better API.

Note: I am bad at naming things. None of these names are final.

Reference-Level Explanation

The idea I proposed in #100 is as follows: add an API based around a Listener trait that would look like this:

pub trait Listener<T>: Future<Output = T> + Sealed {
    fn wait(self) -> T;
    fn wait_timeout(self, timeout: Duration) -> Option<T>;
    fn wait_deadline(self, deadline: Instant) -> Option<T>;

    fn discard(self) -> Option<T>;
    fn listens_to(&self, event: &Event<T>) -> bool;
    fn same_event(&self, other: &(impl Listener<T> + ?Sized)) -> bool;
}

This Listener trait can do anything that the current EventListener type can do. It can be awaited or it can be used to block the current thread. Alternatively, it can be discarded.
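
The Sealed supertrait in the signature above is what keeps downstream crates from adding their own Listener implementations. A minimal sketch of that pattern follows. The module name, ToyListener, and discard_any are hypothetical, and the Future supertrait and waiting methods are left out to keep the sketch std-only and short.

```rust
// Hypothetical names throughout; only the sealing pattern itself is the point.
mod sealed {
    // A public trait inside a private module: code in this crate can name it,
    // but other crates cannot, so they cannot implement it.
    pub trait Sealed {}
}

/// Simplified stand-in for the proposed trait (no Future supertrait here).
pub trait Listener<T>: sealed::Sealed {
    /// Gives up on listening, returning a value if one was already delivered.
    fn discard(self) -> Option<T>;
}

/// Hypothetical listener type that may already hold a delivered value.
pub struct ToyListener<T> {
    pending: Option<T>,
}

// Only this crate can write the Sealed impl, so only this crate can
// provide Listener implementations.
impl<T> sealed::Sealed for ToyListener<T> {}

impl<T> Listener<T> for ToyListener<T> {
    fn discard(self) -> Option<T> {
        self.pending
    }
}

/// Generic code can still accept any listener type the crate provides.
pub fn discard_any<T>(listener: impl Listener<T>) -> Option<T> {
    listener.discard()
}

fn main() {
    assert_eq!(discard_any(ToyListener { pending: Some(7) }), Some(7));
    assert_eq!(discard_any(ToyListener::<u8> { pending: None }), None);
}
```

Sealing the trait would leave the crate free to add methods to Listener later without breaking outside implementors, since there cannot be any.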

To emulate the previous 2.x API, we would have an EventListener type. This is a heap-allocated event listener that can be moved freely.

impl<T> Event<T> {
    pub fn listen(&self) -> EventListener<T>;
}

pub struct EventListener<T = ()>(/* ... */);

impl<T> Listener<T> for EventListener<T> { /* ... */ }

It would be used like this:

struct Flag {
    flag: AtomicBool,
    change_ops: Event<()>
}

impl Flag {
    async fn wait(&self) {
        loop {
            // Check the flag.
            if self.flag.load(Ordering::Relaxed) {
                return;
            }

            // Create a listener. By default it is already inserted into the list.
            let listener = self.change_ops.listen();

            // Check the flag again.
            if self.flag.load(Ordering::SeqCst) {
                return;
            }

            // Wait on the listener.
            listener.await;
        }
    }

    fn notify(&self) {
        self.flag.store(true, Ordering::Release);
        self.change_ops.notify(1);
    }
}

However, this is inefficient, as it performs a heap allocation every time listen() is called. We also provide an API that allows one to use the stack instead of the heap, at a slightly higher complexity cost. To start, you create a StackSlot, which contains all of the state of the EventListener but stored on the stack. After being pinned, it can be transformed into a StackListener.

impl<T> Event<T> {
    pub fn stack_slot(&self) -> StackSlot<'_, T> { /* ... */ }
}

pub struct StackSlot<'ev, T>(/* ... */);
pub struct StackListener<'ev, 'listener, T>(/* ... */);

impl<'ev, T> StackSlot<'ev, T> {
    pub fn listen<'this>(self: Pin<&'this mut Self>) -> StackListener<'ev, 'this, T> { /* ... */ }
}

impl<T> Listener<T> for StackListener<'_, '_, T> { /* ... */ }

In addition, a macro will be provided that creates a StackSlot from an Event and automatically pins it to the stack.

macro_rules! listener {
    ($name:ident, $event:expr) => { /* ... */ }
}

/*
listener!(l, ev);
Expands to:
let mut l = ev.stack_slot();
let mut l = unsafe { Pin::new_unchecked(&mut l) };
*/
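
As a rough illustration of the slot-then-listen flow, the sketch below pins a value to the stack with the standard library's std::pin::pin! macro (stable since Rust 1.68) instead of a hand-written Pin::new_unchecked. ToySlot and ToyStackListener are hypothetical stand-ins for the proposed StackSlot and StackListener. They carry a single bool rather than real listener state, so this only shows the pinning and borrowing shape, not the proposal's implementation.

```rust
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};

/// Hypothetical stand-in for the proposed StackSlot.
struct ToySlot {
    listening: bool,
    // Opt out of Unpin so the slot cannot be moved once it is pinned.
    _pin: PhantomPinned,
}

/// Hypothetical stand-in for the proposed StackListener; borrows the slot.
struct ToyStackListener<'a> {
    slot: Pin<&'a mut ToySlot>,
}

impl ToySlot {
    fn new() -> Self {
        ToySlot { listening: false, _pin: PhantomPinned }
    }

    /// Starts listening. Only callable on a pinned slot.
    fn listen<'a>(mut self: Pin<&'a mut Self>) -> ToyStackListener<'a> {
        // SAFETY: we only write a plain bool field and never move the slot.
        unsafe { self.as_mut().get_unchecked_mut().listening = true };
        ToyStackListener { slot: self }
    }
}

impl ToyStackListener<'_> {
    fn is_listening(&self) -> bool {
        self.slot.listening
    }
}

fn main() {
    // Pin the slot to this stack frame; no heap allocation is involved.
    let mut slot = pin!(ToySlot::new());

    // `as_mut` reborrows the pinned slot, so it can be reused afterwards.
    let first = slot.as_mut().listen();
    assert!(first.is_listening());
    drop(first);

    let second = slot.as_mut().listen();
    assert!(second.is_listening());
}
```

Because listen takes the pinned reference by value, reusing one slot across loop iterations needs a reborrow such as as_mut() each time.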

The example from above can be modified to look like this:

struct Flag {
    flag: AtomicBool,
    change_ops: Event<()>
}

impl Flag {
    async fn wait(&self) {
        listener!(slot, &self.change_ops);

        loop {
            // Check the flag.
            if self.flag.load(Ordering::Relaxed) {
                return;
            }

            // Insert the listener into the list. Reborrow the pinned slot
            // so it can be reused on the next loop iteration.
            let listener = slot.as_mut().listen();

            // Check the flag again.
            if self.flag.load(Ordering::SeqCst) {
                return;
            }

            // Wait on the listener.
            listener.await;
        }
    }

    fn notify(&self) {
        self.flag.store(true, Ordering::Release);
        self.change_ops.notify(1);
    }
}

@smol-rs/admins Thoughts on this?

Potential no-alloc version of the event listeners

While reviewing other versions of synchronization primitives, I realized that the version of this crate that exists in the tokio ecosystem doesn't allocate at all. It stores the event listener entry on the stack in the particular entry, using pin guarantees to erase any potential unsoundness. We may be able to emulate this approach in this crate as well.

The main advantage of this is that we could avoid allocation altogether. This could lead to faster code, and could also allow this crate (and its dependents) to be run virtually anywhere. The main downsides are that it would become impossible to make this crate forbid(unsafe_code), the API would need to be radically changed, and the crate would be less simple.

Any thoughts on these trade-offs? #26 looks like it tries this.

EventListener::new footgun

Copying and pasting from the Matrix channel to avoid repeating myself:

zeenix: this method takes an Event ref but doesn't actually set up the listener to listen to this event. This seems like a pretty big footgun
Saltbag: The only other option is to have it insert itself into the list on await, which has its own footguns. It can’t be set up in new since it needs to be pinned.
zeenix: it's just a very strange and unintuitive api now. I see multiple issues here:

  • EventListener existing w/o listening is just strange. The name implies something that's listening.
  • new takes a Event ref but doesn't listen to the given event.
  • The footgun of awaiting a listener w/o calling listen on it. That state should not be allowed to ever exist.

If listening requires pinning, that should always be implied then. That's the whole point of the EventListener type. At the very least, the new and EventListener docs should document all these caveats well.
