brunocodutra / ring-channel

Bounded MPMC channel abstraction on top of a ring buffer

Home Page: https://crates.io/crates/ring-channel

License: MIT License

Language: Rust 100.00%
Topics: ring-buffer, channel, mpmc, asynchronous

ring-channel's Introduction

RingChannel

Bounded MPMC channel abstraction on top of a ring buffer.

Using RingChannel

RingChannel is available on crates.io. To use it, add it as a dependency in your Cargo.toml:

[dependencies]
ring-channel = "0.12"

The full API documentation is available on docs.rs.

Contribution

RingChannel is an open source project, and you're very welcome to contribute by opening issues and/or pull requests. See CONTRIBUTING for general guidelines.

License

RingChannel is distributed under the terms of the MIT license. See LICENSE for details.

ring-channel's People

Contributors

brunocodutra, dependabot[bot], dylan-dpc, github-actions[bot]

ring-channel's Issues

Can't use ring-channel in async context: `NonNull<ControlBlock>` is not `Send`

Hey, thanks for this crate.

However, I'm trying to use a ring channel that is created in a background thread, and when I try to spawn futures on the runtime I get:

99  |                     let task = rt.spawn(async move {
    |                                   ^^^^^ future created by async block is not `Send`
    |
    = help: within `Foo`, the trait `Sync` is not implemented for `NonNull<ring_channel::control::ControlBlock<Bar>>`
note: captured value is not `Send`

Is it reasonable to replace `NonNull` with `AtomicPtr`?
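
For context, `NonNull<T>` is neither `Send` nor `Sync`, so any type that stores one inherits that unless it opts back in explicitly, as `std::sync::Arc` does for its own internal pointer. The sketch below is only an illustration of that pattern: `Shared` and `Handle` are hypothetical names, not the crate's types, and the bounds simply mirror the ones `Arc<T>` uses. Whether the crate's real control block can soundly make the same promise is for its author to judge.

```rust
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for a shared control block; not the crate's actual type.
struct Shared<T> {
    refs: AtomicUsize,
    value: T,
}

// Hypothetical reference-counted handle that stores a raw `NonNull` pointer.
struct Handle<T> {
    ptr: NonNull<Shared<T>>,
}

// Without these two impls `Handle<T>` is neither `Send` nor `Sync`, because
// `NonNull` opts out of both. The bounds mirror those of `Arc<T>`.
unsafe impl<T: Send + Sync> Send for Handle<T> {}
unsafe impl<T: Send + Sync> Sync for Handle<T> {}

impl<T> Handle<T> {
    fn new(value: T) -> Self {
        let boxed = Box::new(Shared { refs: AtomicUsize::new(1), value });
        // Leak the box; the last handle to be dropped reclaims it.
        Handle { ptr: NonNull::from(Box::leak(boxed)) }
    }

    fn get(&self) -> &T {
        // Safe to dereference: the allocation lives while any handle exists.
        unsafe { &self.ptr.as_ref().value }
    }
}

impl<T> Clone for Handle<T> {
    fn clone(&self) -> Self {
        unsafe { self.ptr.as_ref() }.refs.fetch_add(1, Ordering::Relaxed);
        Handle { ptr: self.ptr }
    }
}

impl<T> Drop for Handle<T> {
    fn drop(&mut self) {
        // The handle that brings the count to zero frees the allocation.
        if unsafe { self.ptr.as_ref() }.refs.fetch_sub(1, Ordering::AcqRel) == 1 {
            drop(unsafe { Box::from_raw(self.ptr.as_ptr()) });
        }
    }
}

// Compile-time check: this only type-checks if `T` is `Send + Sync`.
fn assert_send_sync<T: Send + Sync>() {}
```

With the two `unsafe impl` lines removed, `assert_send_sync::<Handle<u32>>()` fails to compile with an error of the same shape as the one quoted above.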

Cargo audit warning: spin is no longer actively maintained

Not necessarily a bug, but something that showed up in our CI when we added support for this terrific crate.

Warning from cargo audit:

warning: 1 warning found
Crate:  spin
Title:  spin is no longer actively maintained
Date:   2019-11-21
URL:    https://rustsec.org/advisories/RUSTSEC-2019-0031
Dependency tree: 
spin 0.5.2
└── ring-channel 0.8.1

Asynchronous?

Thanks for making this. I was looking for a kind of unbounded drop channel. However, I compose a lot with Streams and Sinks from futures-rs 0.3.

Since this crate is listed under the asynchronous category on crates.io, I wondered how to use it that way, as these traits don't seem to be implemented for RingReceiver and RingSender...

I think there's an error in impl<T> Stream for RingReceiver<T>

Err(RecvError::Empty) => {
   // Keep polling thread awake.
   ctx.waker().wake_by_ref();
   Poll::Pending
}

AFAICT you should just return Pending here and store the waker. When new data comes in, then you call wake (I suppose that means your ringbuffer needs to wake the waker).
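
A minimal sketch of that suggestion, using only the standard library: the receiver stores its waker when it finds nothing to read, and the sender wakes it after enqueuing. `Inner`, `Slot`, `poll_recv` and `send` are hypothetical names chosen for illustration, and the `Mutex<VecDeque>` is a stand-in for the ring buffer, not the crate's implementation.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Hypothetical shared state: a queue plus the waker of a parked receiver.
struct Inner<T> {
    queue: VecDeque<T>,
    waker: Option<Waker>,
}

// Hypothetical handle shared by the sending and receiving sides.
struct Slot<T>(Arc<Mutex<Inner<T>>>);

impl<T> Clone for Slot<T> {
    fn clone(&self) -> Self {
        Slot(Arc::clone(&self.0))
    }
}

impl<T> Slot<T> {
    fn new() -> Self {
        Slot(Arc::new(Mutex::new(Inner { queue: VecDeque::new(), waker: None })))
    }

    // Receiver side: return an item if one is ready; otherwise store the
    // waker and return `Pending` without waking ourselves.
    fn poll_recv(&self, ctx: &mut Context<'_>) -> Poll<T> {
        let mut inner = self.0.lock().unwrap();
        match inner.queue.pop_front() {
            Some(item) => Poll::Ready(item),
            None => {
                inner.waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }

    // Sender side: enqueue the item, then wake the stored waker (if any)
    // after the lock has been released.
    fn send(&self, item: T) {
        let waker = {
            let mut inner = self.0.lock().unwrap();
            inner.queue.push_back(item);
            inner.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}
```

Because the waker is stored and checked under the same lock as the queue, a `send` that races with `poll_recv` either finds the item already taken or finds the waker to wake, so the receiver is not left parked with data available.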

Weird issue

Consider this code:

pub async fn tst_nested(mut tx: ring_channel::RingSender<std::time::Instant>) {
    tx.send(std::time::Instant::now());
}

pub async fn tst(mut tx: ring_channel::RingSender<std::time::Instant>) {
    tx.send(std::time::Instant::now());
    tst_nested(tx.clone()).await;
}

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    let (mut tx, mut rx) = ring_channel::ring_channel(std::num::NonZeroUsize::new(100).unwrap());
    rt.spawn(tst(tx));
    let msg = rx.recv().unwrap();
    let msg = rx.recv().unwrap();
}

It fails to compile with the following error:

error[E0277]: `std::ptr::NonNull<ring_channel::control::ControlBlock<std::time::Instant>>` cannot be shared between threads safely
   --> src/main.rs:147:8
    |
147 |     rt.spawn(tst(tx));
    |        ^^^^^ `std::ptr::NonNull<ring_channel::control::ControlBlock<std::time::Instant>>` cannot be shared between threads safely
    |
    = help: within `ring_channel::channel::RingSender<std::time::Instant>`, the trait `std::marker::Sync` is not implemented for `std::ptr::NonNull<ring_channel::control::ControlBlock<std::time::Instant>>`
    = note: required because it appears within the type `ring_channel::control::ControlBlockRef<std::time::Instant>`

I omitted the rest of the error description as it is long and scary.

The workarounds I found:

  1. Do not call tst_nested() :)
  2. Pass the RingSender by mutable reference. I'm not sure it is always possible/desirable though.
  3. Just do not use tx.clone() when calling tst_nested() - for this tiny example it is OK, but generally not always possible.

I'm not 100% sure that it is a bug, but it looks highly suspicious. Note that crossbeam_channel works fine in this scenario.

Can't get consistent behavior

Hi, I'm running some integration tests with ring_channel in it, and I can't get a consistent result here unless I add a sleep:

use ring_channel::*;
use futures::{prelude::*};
use std::{num::NonZeroUsize, thread, sync::{ Arc, Mutex } };

#[async_std::main]
//
async fn main()
{
   let lock  = Arc::new(Mutex::new( () ));
   let lock2 = lock.clone();

   let (mut tx, mut rx) = ring_channel( NonZeroUsize::new(1).unwrap() );

   thread::spawn( move ||
   {
      async_std::task::block_on( async
      {
         let _guard = lock.lock();

         for i in 0..1000
         {
            tx.send( i ).unwrap();
         }
      });
   });

   // std::thread::sleep( std::time::Duration::from_millis(100) );

   let mut count = 0;

   while let Some(num) = rx.next().await
   {
      let _guard = lock2.lock();
      count += num;
   }

   assert_eq!( 999, count );
}

Without the sleep, the count is often over 1000. I don't understand why, because as I see it the sequence of events is:

    • If rx.next().await runs first, it should block, as the channel is empty.
    • If the sender runs first, it will send everything at once, protected by the mutex. If in the meantime rx has pulled the first item out and is waiting on the mutex, it should only have the first value (0), then block waiting for the sender to finish, and then pull 999 out of the buffer.
  1. The sender gets the lock first.
  2. The sender sends all numbers up to 999.
  3. Now rx gets to run and should only see 999, as the channel has a bounded size of 1.

What do you make of it? Maybe it's nothing to do with ringchannel but I'm missing some subtle timing issue of the mutexes. Just trying to understand exactly how this is possible.
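
One possible reading, assuming the channel overwrites its oldest item when full (which the expectation of seeing only 999 implies): `rx.next().await` runs outside the mutex, so the receiver can take whatever value happens to be in the buffer while the sender is mid-loop, and only then block on the lock. The single-threaded sketch below replays that interleaving deterministically. `OverwriteRing` and `replay` are hypothetical names built on `VecDeque`, not the crate's types.

```rust
use std::collections::VecDeque;

// Hypothetical bounded buffer that drops its oldest item when full.
// Assumes `cap >= 1`.
struct OverwriteRing<T> {
    cap: usize,
    items: VecDeque<T>,
}

impl<T> OverwriteRing<T> {
    fn new(cap: usize) -> Self {
        OverwriteRing { cap, items: VecDeque::with_capacity(cap) }
    }

    fn send(&mut self, item: T) {
        if self.items.len() == self.cap {
            self.items.pop_front(); // overwrite: discard the oldest item
        }
        self.items.push_back(item);
    }

    fn recv(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

// Replays the scenario: the sender sends 0..1000 into a ring of capacity 1.
// If `steal_at` is `Some(i)`, the receiver takes one item right after `i`
// was sent (before it would block on the mutex); it then drains whatever
// is left once the sender has finished.
fn replay(steal_at: Option<u32>) -> u32 {
    let mut ring = OverwriteRing::new(1);
    let mut count = 0;
    for i in 0..1000 {
        ring.send(i);
        if steal_at == Some(i) {
            count += ring.recv().unwrap();
        }
    }
    while let Some(n) = ring.recv() {
        count += n;
    }
    count
}
```

Under these assumptions, `replay(None)` and `replay(Some(0))` both give 999, while a receive that lands mid-loop, such as `replay(Some(500))`, gives 500 + 999. The sleep would make the first case the likely one by letting the sender finish before the receiver polls.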
