
Comments (10)

ibraheemdev commented on July 18, 2024

std::sync::mpsc and crossbeam also implement the same type of rendezvous channel. A successful send only guarantees that recv has been called, not completed. The standard documentation is also a bit misleading about this distinction:

If the buffer size is 0, however, the channel becomes a rendezvous channel and it guarantees that the receiver has indeed received the data if this function returns success.

crossbeam is a bit clearer:

If called on a zero-capacity channel, this method will wait for a receive operation to appear on the other side of the channel.
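
For illustration, here is a minimal sketch (not from the thread, using std's sync_channel rather than flume) of what the quoted documentation means in practice: the send blocks until a receiver reaches the rendezvous, but a successful return says nothing about how far the receiving thread has progressed afterwards.

use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = sync_channel::<&str>(0);
    let receiver = thread::spawn(move || {
        // Delay so the sender clearly reaches the rendezvous first.
        thread::sleep(Duration::from_millis(100));
        println!("received: {}", rx.recv().unwrap());
    });
    // Blocks until the receiver calls recv(); a successful return only means
    // the handoff happened, not that the receiver has finished its println!.
    tx.send("hello").unwrap();
    println!("send returned");
    receiver.join().unwrap();
}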

zesterer commented on July 18, 2024

For my purpose it is important that the sender does not wake up before the receiver received its value in a rendezvous channel.

What do you mean by 'wake up'? In the extreme case, pessimistic scheduler behaviour could certainly mean that the sender could start running before the receiver starts running, and there's pretty much nothing you can do to stop that.

it is allowed to print ho, hey instead of hey, ho

That's entirely governed by the behaviour of the async runtime. Theoretically, flume won't wake up the sender until it knows that a receiver is currently ready and waiting to receive a message (note that this is not the same thing as the receiver having woken up!), but there's no telling what order the async runtime's scheduler will actually decide to wake the tasks up.

I strongly recommend not building your system around very strict assumptions like this. If you want to guarantee ordering, use the synchronisation primitives available to you - like mutexes, signals, etc. - so that you're not relying on changeable behaviour of the runtime.
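
To make this concrete, here is a rough sketch (not the original issue code; tokio is assumed purely for illustration) of the kind of program in question. Once the rendezvous completes, both tasks are ready, and which of "hey" and "ho" prints first is up to the executor:

// Assumed Cargo dependencies: flume, and tokio with its macro and runtime features.
#[tokio::main]
async fn main() {
    let (tx, rx) = flume::bounded::<()>(0);
    let receiver = tokio::spawn(async move {
        rx.recv_async().await.unwrap();
        println!("hey");
    });
    let sender = tokio::spawn(async move {
        tx.send_async(()).await.unwrap();
        println!("ho");
    });
    // flume only guarantees that the send completes once a receiver is waiting;
    // which task the scheduler runs first after that is not guaranteed, so the
    // output may be "hey, ho" or "ho, hey".
    let _ = tokio::join!(sender, receiver);
}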

orlp commented on July 18, 2024

What do you mean by 'wake up'?

Unblocking is probably the better word.

I mean that send.send_async(()).await does not unblock until the value it tried to send has been received, that is that an associated recv.recv_async().await has successfully unblocked.

it is allowed to print ho, hey instead of hey, ho

That's entirely governed by the behaviour of the async runtime.

In a single-threaded executor that is literally impossible, at least with how it's implemented right now.

In the extreme case, pessimistic scheduler behaviour could certainly mean that the sender could start running before the receiver starts running, and there's pretty much nothing you can do to stop that.

You absolutely could, and flume already does. It returns Poll::Pending unless the value has been picked up by a receiver. At least if I read the poll function correctly.

use the synchronisation primitives available to you

I would consider a rendezvous channel a synchronization primitive.

so that you're not relying on changeable behaviour of the runtime

What I'm describing is runtime-agnostic.

zesterer commented on July 18, 2024

Unblocking is probably the better word.

I think you're getting confused here. There are two distinct concepts at play that are not the same. An async task being placed into a ready state first does not necessarily imply that it will be the first task to be run when the scheduler next gets to make such a decision.

Consider the following timeline:

  1. Task 1 calls recv(). The task is switched from a ready state to a waiting state.

  2. Task 1 .awaits the future returned by recv(). Doing so causes the task to yield to the runtime, at which point the runtime's scheduler decides to run Task 2 (since it is the only one of the two tasks currently in a ready state).

  3. Task 2 calls send(()). Because there is a receiver (Task 1) already waiting, it does not need to switch itself into a waiting state and so it simply places the message on the queue. Placing the message on the queue causes Task 1 to be switched to a ready state.

  4. Task 2 .awaits the future returned by send(()). Await still causes the task to yield to the runtime (yes, even if the future is immediately ready: see here for documentation demonstrating that Tokio does this).

  5. The runtime needs to make a decision about which task to run next. Because Task 1 and Task 2 are both in a ready state, the choice as to which gets run first is arbitrary.

Note that all of this applies for a single-threaded async runtime too.

You absolutely could, and flume already does.

As demonstrated, it does not. What it does guarantee is that for rendezvous queues (i.e. those with a bound of 0), a call to .send will place the task into a waiting state unless there is already a receiver waiting to receive an item from the queue.

What I'm describing is runtime-agnostic.

It is not. One could feasibly imagine reasonable scheduler algorithms that result in either Task 1 or Task 2 being run first after the send occurs (again, this applies to single-threaded runtimes too).

I would consider a rendezvous channel a synchronization primitive.

A rendezvous channel is not a synchronisation primitive in the sense that it does not guarantee a canonical post-rendezvous ordering for sender and receiver. Why? Because to guarantee such an ordering you need a critical section, and there is no critical section between the two. The instant the send(()) call occurs, both tasks immediately lose whatever relationship with one another they may have had, and that includes order of execution.

orlp commented on July 18, 2024

I understand the confusion now, I misread the code. You are right that flume::Sender immediately returns Poll::Ready if there is a reader waiting.

Why? Because to guarantee such an ordering, you need a critical section: and there is no critical section between the two. The instant the send(()) call occurs, both tasks immediately lose whatever relationship with one-another they may have had, and that includes order of execution.

I assumed an implementation where the sender is blocked until the receiver unblocks it. An implementation I wrote does have this feature, and I need it so I guess I can't use flume's bounded(0) channel. I thought I recognized a similar pattern in flume's code but I was wrong.

orlp commented on July 18, 2024

For the record, this does mean that flume's bounded(0) is effectively a strange 1-capacity channel. It can hold ownership of data indefinitely, and it can even 'lose' data, in that Drop gets called on the data when the channel dies while it still holds ownership.

Even though it is a rendezvous channel where the sender returns Err(T) "if all receivers have been dropped", values can get stuck in the channel if a receiver gets dropped between a send and a receive. I would expect a 'true' rendezvous channel to provide atomic ownership transfer from sender to receiver, but this is not what happens.

Consider this example:

#![feature(noop_waker)]

use std::pin::Pin;
use std::task::{Waker, Context};
use std::future::Future;
use futures::FutureExt; // for .boxed()

#[derive(Debug)]
struct MayNeverDrop;
impl Drop for MayNeverDrop {
    fn drop(&mut self) {
        panic!("dropped");
    }
}

async fn produces_neverdrops(s: flume::Sender<MayNeverDrop>) {
    loop {
        let p = MayNeverDrop;
        println!("produced");
        match s.send_async(p).await {
            Ok(()) => continue,
            Err(x) => {
                core::mem::forget(x);
                break;
            }
        }
    }
}

async fn consumes_neverdrops(r: flume::Receiver<MayNeverDrop>) {
    while let Ok(x) = r.recv_async().await {
        println!("consumed");
        core::mem::forget(x);
    }
}

fn main() {
    let (send, recv) = flume::bounded(0);
    let mut producer = produces_neverdrops(send).boxed();
    let mut consumer = consumes_neverdrops(recv).boxed();
    
    let dummy_waker = Waker::noop();
    let mut cx = Context::from_waker(&dummy_waker);
    
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut consumer), &mut cx));
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    println!("cancelling consumer");
    drop(consumer);
    println!("simulate join() on producer");
    dbg!(Future::poll(Pin::new(&mut producer), &mut cx));
    println!("leaving scope")
}

One might expect that as long as you join() on the producer, you may cancel consumes_neverdrops at any time, and the producer will clean up by taking back the last value from the Err and forgetting it. Furthermore, you might expect the log to say

produced
consumed
produced
consumed

since our channel has zero capacity.

Instead we see the following:

[src/main.rs:44:5] Future::poll(Pin::new(&mut consumer), &mut cx) = Pending
produced
produced
[src/main.rs:45:5] Future::poll(Pin::new(&mut producer), &mut cx) = Pending
consumed
consumed
[src/main.rs:46:5] Future::poll(Pin::new(&mut consumer), &mut cx) = Pending
produced
produced
[src/main.rs:47:5] Future::poll(Pin::new(&mut producer), &mut cx) = Pending
consumed
consumed
[src/main.rs:48:5] Future::poll(Pin::new(&mut consumer), &mut cx) = Pending
produced
produced
[src/main.rs:49:5] Future::poll(Pin::new(&mut producer), &mut cx) = Pending
cancelling consumer
simulate join() on producer
thread 'main' panicked at src/main.rs:11:9:
dropped

One might wonder how we managed to pass TWO values per poll through a zero-capacity channel. That really looks like a 1-capacity channel to me.

orlp commented on July 18, 2024

To further support the idea that the current design of bounded(0) isn't a true bounded(0) channel but a strange bounded(1) channel, consider this example:

#![feature(noop_waker)]

use std::pin::Pin;
use std::task::{Waker, Context};
use std::future::Future;
use futures::FutureExt; // for .boxed()

async fn producer(s: flume::Sender<()>) {
    loop {
        println!("produced");
        s.send_async(()).await.ok();
    }
}

async fn consumer(r: flume::Receiver<()>) {
    loop {
        r.recv_async().await.ok();
        println!("consumed");
    }
}

fn main() {
    for capacity in [3, 2, 1, 0] {
        let (send, recv) = flume::bounded(capacity);
        let mut producer = producer(send).boxed();
        let mut consumer = consumer(recv).boxed();
        
        let dummy_waker = Waker::noop();
        let mut cx = Context::from_waker(&dummy_waker);
        
        println!("capacity: {capacity}");
        let _ = Future::poll(Pin::new(&mut producer), &mut cx);
        let _ = Future::poll(Pin::new(&mut consumer), &mut cx);
        let _ = Future::poll(Pin::new(&mut producer), &mut cx);
        let _ = Future::poll(Pin::new(&mut consumer), &mut cx);
        println!("");
    }
}

This prints:

capacity: 3
produced
produced
produced
produced
consumed
consumed
consumed
consumed
produced
produced
produced
produced
consumed
consumed
consumed
consumed

capacity: 2
produced
produced
produced
consumed
consumed
consumed
produced
produced
produced
consumed
consumed
consumed

capacity: 1
produced
produced
consumed
consumed
produced
produced
consumed
consumed

capacity: 0
produced
consumed
produced
produced
consumed
consumed

As you would expect, for bounded(n) we produce n + 1 values and then block because the channel is full when sending the (n + 1)th value. The receiver then accepts n + 1 values: n from the channel until it is empty, plus one taken directly from the blocked sender, before blocking until more values are available.

This pattern holds for all n, except 0, where we mysteriously start producing and consuming two at a time, passing more than one value through the channel at once even though the channel is supposed to have 0 capacity.

zesterer commented on July 18, 2024

values can get stuck in the channel if a receiver gets dropped between a send and a receive

This is actually a broader issue in the async ecosystem, that of future cancellation. It's a very difficult problem to engineer around (even more so in the case of streams), and is the main thing that makes having flume support both sync and async in the same implementation quite difficult. The sync implementations don't need to deal with this since you can't remotely cancel a thread (or, at least, it's reasonable to expect lost values if this is done).

To further support the idea that the current design of bounded(0) isn't a true bounded(0) channel but a strange bounded(1) channel, consider this example

I don't think that this example demonstrates what you think it's demonstrating.

It is entirely possible for both produced and consumed to be printed twice in a row due to the scheduling issue (not really an issue) that I mentioned previously.

As I said, send(()).await on a rendezvous channel does not guarantee that the sending task will not do work until the receiving task has started doing work, nor should it: the only guarantee is that the sending task will not do work until there is a receiving task waiting to receive the message.

If you've got the impression that this is a guarantee made by flume, then I apologise: that's not what it's designed to do, and this isn't a bug.

If you want that sort of behaviour, then the thing to do is to send some sort of sync primitive to the receiving task. For example, the sending task might send a oneshot::Sender<()> to the receiving task, such that the receiving task can signal to the sending task when the value has been processed, allowing the sending task to wait until the right time to continue doing work.
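
A rough sketch of that pattern (assuming tokio's oneshot channel; the Job type and task names are illustrative, not flume API):

use tokio::sync::oneshot;

struct Job {
    payload: u32,
    done: oneshot::Sender<()>, // receiver signals completion back to the sender
}

async fn sender_task(tx: flume::Sender<Job>) {
    for payload in 0..3 {
        let (done_tx, done_rx) = oneshot::channel();
        tx.send_async(Job { payload, done: done_tx }).await.unwrap();
        // Wait until the receiving task has actually processed the value,
        // rather than merely accepted it at the rendezvous.
        done_rx.await.unwrap();
        println!("payload {payload} acknowledged");
    }
}

async fn receiver_task(rx: flume::Receiver<Job>) {
    while let Ok(job) = rx.recv_async().await {
        println!("processing {}", job.payload);
        let _ = job.done.send(());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = flume::bounded(0);
    let consumer = tokio::spawn(receiver_task(rx));
    sender_task(tx).await;
    // All senders are now dropped, so the receiver's loop ends and it exits.
    let _ = consumer.await;
}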

orlp commented on July 18, 2024

As I said, send(()).await on a rendezvous channel does not guarantee that the sending task will not do work until the receiving task has started doing work, nor should it: the only guarantee is that the sending task will not do work until there is a receiving task waiting to receive the message.

My initial opening of the issue was based on a misunderstanding of the current behavior, but I've since come to feel more strongly that my expected behavior is actually the correct behavior for a zero-capacity channel. A zero-capacity channel should never be able to end up in a position where it owns data, in my opinion, as it models something with no capacity whatsoever.

However, if your interpretation is that this is not incorrect and thus not a bug, we can close this.

zesterer commented on July 18, 2024

I don't see this as the channel 'owning' data: rather, the receiving task is being given ownership of data by the sending task, but is not yet in a state to process that change in ownership. It's an interesting and subtle distinction though, so it might be worth further documentation.
