
concurrent-queue's Introduction

concurrent-queue


A concurrent multi-producer multi-consumer queue.

There are two kinds of queues:

  1. Bounded queue with limited capacity.
  2. Unbounded queue with unlimited capacity.

A queue can also be closed at any point. Once closed, no more items can be pushed into the queue, although the remaining items can still be popped.

These features make it easy to build channels similar to std::sync::mpsc on top of this crate.

Examples

use concurrent_queue::ConcurrentQueue;

let q = ConcurrentQueue::unbounded();
q.push(1).unwrap();
q.push(2).unwrap();

assert_eq!(q.pop(), Ok(1));
assert_eq!(q.pop(), Ok(2));
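
A short sketch of the close behaviour described above, using the crate's PushError and PopError types:

use concurrent_queue::{ConcurrentQueue, PopError, PushError};

let q = ConcurrentQueue::bounded(2);
q.push(1).unwrap();

// Close the queue: further pushes fail and hand the rejected item back.
assert!(q.close());
assert_eq!(q.push(2), Err(PushError::Closed(2)));

// Items that were already enqueued can still be popped.
assert_eq!(q.pop(), Ok(1));

// Once the closed queue is empty, pop reports that it is closed.
assert_eq!(q.pop(), Err(PopError::Closed));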

License

Licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

concurrent-queue's People

Contributors

dependabot[bot], garious, gmorenz, james7132, mamaicode, mreconomical, notgull, taiki-e


concurrent-queue's Issues

Improve documentation of internals of `bounded` implementation

see also #58 (comment) and a few related comments.

The logic in large parts of the bounded implementation is somewhat non-obvious and partially stems from other code bases, so it might be a good idea to write down how it works and why it is correct, or to find a document that already does this properly and link to it. (The comments that already exist partially duplicate each other without necessarily adding to an understanding of how all the components interact.)

RUSTSEC-2021-0145: Potential unaligned read

Potential unaligned read

Details
  • Status: unsound
  • Package: atty
  • Version: 0.2.14
  • URL: softprops/atty#50
  • Date: 2021-07-04

On Windows, atty dereferences a potentially unaligned pointer.

In practice, however, the pointer won't be unaligned unless a custom global allocator is used.

In particular, the System allocator on Windows uses HeapAlloc, which guarantees a large enough alignment.

atty is Unmaintained

A pull request with a fix was provided over a year ago, but the maintainer seems to be unreachable.

The last release of atty was almost 3 years ago.

Possible Alternative(s)

The list below has not been vetted in any way and may or may not contain suitable alternatives:

  • is-terminal
  • std::io::IsTerminal (nightly-only, experimental; a usage sketch follows below)

See advisory page for additional details.
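
For illustration, an atty check can be replaced with the IsTerminal alternative listed above; a minimal sketch, assuming a toolchain where std::io::IsTerminal is available (it was nightly-only when this advisory was filed):

use std::io::IsTerminal;

// Roughly equivalent to `atty::is(atty::Stream::Stdout)`.
fn stdout_is_tty() -> bool {
    std::io::stdout().is_terminal()
}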

weird test failures on s390x-unknown-linux-gnu with rustc + llvm 13 only

For context: I'm the primary package maintainer for Rust crates in Fedora Linux. We're running test suites for all crates, where possible, in an effort to improve the quality of the packages that we ship.

I came across a very weird test failure in concurrent-queue 1.2.3 and 1.2.4:

running 10 tests
test close ... ok
test capacity ... ok
test len ... ok
test len_empty_full ... ok
test linearizable ... ok
test mpmc ... FAILED
test smoke ... ok
test drops ... ok
test zero_capacity - should panic ... ok
test spsc has been running for over 60 seconds

And then the test runner is stuck forever.

This problem with the mpmc and spsc tests is limited to s390x, and only when the crate is compiled with Rust on LLVM 13 (such as on Fedora 35, where LLVM 14 is not available).

I cannot determine for certain which change might have started to cause this problem, because we don't include s390x in our CI due to limited builder capacity, but I can provide these data points:

Last known successful build:

  • concurrent-queue 1.2.2
  • rust version: 1.58.1 built against llvm 13
  • build host: Fedora 35 running Linux 5.15.6

First known unsuccessful build:

  • concurrent-queue 1.2.3
  • rust version: 1.62.0 built against llvm 13
  • build host: Fedora 36 running Linux 5.18.9

But this is also successful:

  • concurrent-queue 1.2.3
  • rust version: 1.62.0 built against llvm 14
  • build host: Fedora 36 running Linux 5.18.9

Fenced load in bounded queue / v1.2.3

I have commented on the relevant commit (#16), but too late, as the commit had already been merged, so I thought I would open an issue so that this can be tracked.

Admittedly, I don't know exactly what role the fence plays here. This fence does not exist in Dmitry Vyukov's original implementation of the queue, so I guess it was added as part of the modifications that make this queue linearisable (unlike the original queue).

That being said, if the cross-platform solution is indeed to place the load before the fence (this, I do not know), then I am pretty sure that the Intel specialization that uses a lock operation instead of an mfence should also keep the load before the fence.

I did look at https://www.cl.cam.ac.uk/~pes20/cpp/cpp0xmappings.html but could not see where it states that lock + mov (in this order) is equivalent to mov + mfence. In fact, the latest GCC does use the lock optimization and definitely preserves the order, i.e. mov + lock (see this godbolt: https://godbolt.org/z/o3rYdTvYv).
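
For reference, the two shapes being compared can be written as follows; this is only an illustrative sketch, not the crate's actual code:

use std::sync::atomic::{fence, AtomicUsize, Ordering};

// Full fence followed by a relaxed load; on x86 the fence may be lowered
// to either an mfence or a lock-prefixed read-modify-write.
fn fence_then_load(head: &AtomicUsize) -> usize {
    fence(Ordering::SeqCst);
    head.load(Ordering::Relaxed)
}

// Relaxed load followed by a full fence; the question above is whether the
// lock-based specialization also keeps the load in this position.
fn load_then_fence(head: &AtomicUsize) -> usize {
    let value = head.load(Ordering::Relaxed);
    fence(Ordering::SeqCst);
    value
}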

Investigate ways to make `steal` from `async-executor` more efficient

It appears that a large amount of time in async-executor is spent shuffling tasks between different queues. It would be interesting to see whether there are ways to make such stealing of half of a queue faster, perhaps by making such a function part of this crate and utilizing knowledge of the internals to speed up such "mass moves".

Remove the heap allocation from non-single queues

This appears to have been added in ee83323, possibly in response to the Clippy warning that shows up otherwise:

Bounded(Box<Bounded<T>>),

There is a useless heap allocation here that only serves to equalize the sizes of the ConcurrentQueue enum arms. As this queue is usually shoved into a heap allocation for another structure anyway, it seems like a waste of a heap allocation, especially since only around four words of memory are wasted in the worst case, assuming a T with the size of a word.
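
To make the trade-off concrete, here is a standalone sketch with hypothetical stand-in types (not the crate's actual definitions): boxing the large variant keeps the enum pointer-sized plus a tag, while inlining it grows the enum to the size of its largest variant.

use std::mem::size_of;

struct Single(usize);
struct Bounded([usize; 4]); // stand-in for the larger bounded-queue state

enum Boxed {
    Single(Single),
    Bounded(Box<Bounded>),
}

enum Inline {
    Single(Single),
    Bounded(Bounded),
}

fn main() {
    // The inline form wastes the size difference between the variants for
    // every Single queue, but avoids the extra heap allocation for Bounded.
    println!("boxed:  {} bytes", size_of::<Boxed>());
    println!("inline: {} bytes", size_of::<Inline>());
}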

How to use concurrent-queue with tokio and lambda?

Hello, I want to use concurrent-queue in my module:

#[cfg(test)]
mod test
{
	use std::borrow::Borrow;
	use std::fmt::Display;
	use std::sync::Arc;
	use std::sync::atomic::{AtomicBool, Ordering};
	use std::thread::sleep;
	use std::time::Duration;
	use concurrent_queue::ConcurrentQueue;
	use tokio::runtime::Runtime;


	pub fn runConcurrentBy<T,ActProd, ActCons>( threads:i32, actProd:ActProd, actCons:ActCons ) -> (Arc<AtomicBool>,Runtime)
	where ActProd: Fn( &ConcurrentQueue<T> ),
		ActCons: Fn( &T ) + Send + Sync + 'static,
		T: Display + Send + 'static
	{
		let mut q = ConcurrentQueue::unbounded();

		let rtm = Runtime::new().unwrap();
		//
		let end = AtomicBool::new(false);

		let stopAll = Arc::new(end);

		let queue = Arc::new(q);

		for _ in 0..threads
		{
			let stop = stopAll.clone();
			let q = queue.clone();

			rtm.spawn(async move
			{
				while !stop.load( Ordering::Relaxed )
				{
					if let Ok( val ) = q.pop()
					{
						actCons( &val );
						//println!( "item: {}", val );
					}
				}
			});
		}

		actProd( &queue );


		(stopAll,rtm)
	}


	#[tokio::test]
	async fn test_concurrentQueue()
	{
		let data = vec![ 1,2,3,4,5,6,7,8,9,10,11 ];

		let (stop,rtm) = runConcurrentBy( 10,
		|q|
		{
			for val in &data
			{
				q.push(val.to_owned()).unwrap();
			}
		},
		|v|
		{
			println!( "item: {}", v );
		}
		);

		sleep( Duration::from_secs_f64(3.0) );

		stop.swap( true, Ordering::Relaxed );
		rtm.shutdown_background();
	}
}

But I always get an error like this:

17 |       pub fn runConcurrentBy<T,ActProd, ActCons>( threads:i32, actProd:ActProd, actCons:ActCons ) -> (Arc<AtomicBool>,Runtime)
   |                                                                                 ------- move occurs because `actCons` has type `ActCons`, which does not implement the `Copy` trait
...
38 | /             {
39 | |                 while !stop.load( Ordering::Relaxed )
40 | |                 {
41 | |                     if let Ok( val ) = q.pop()
42 | |                     {
43 | |                         actCons( &val );
   | |                         ------- use occurs due to use in generator
...  |
46 | |                 }
47 | |             });
   | |_____________^ value moved here, in previous iteration of loop
   |
help: consider borrowing `actCons`
   |
38 |             &{
   |             +

Please teach me how to fix this. Thank you very much!
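
One way to resolve the move error (a sketch against the code above, untested): wrap actCons in an Arc before the loop, so each spawned task moves a clone of the Arc rather than the closure itself.

	// Share the consumer closure instead of moving it into every task.
	let actCons = Arc::new(actCons);

	for _ in 0..threads
	{
		let stop = stopAll.clone();
		let q = queue.clone();
		let actCons = actCons.clone();

		rtm.spawn(async move
		{
			while !stop.load( Ordering::Relaxed )
			{
				if let Ok( val ) = q.pop()
				{
					// Call the closure through the Arc.
					(*actCons)( &val );
				}
			}
		});
	}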

Cannot build --no-default-features --features loom --cfg loom

Discovered while working on smol-rs/event-listener#126

RUSTFLAGS="--cfg=loom" cargo build --no-default-features --features loom
   Compiling concurrent-queue v2.4.0 (/Users/jbr/code/concurrent-queue)
error[E0425]: cannot find function `spin_loop` in this scope
  --> src/sync.rs:51:5
   |
51 |     spin_loop();
   |     ^^^^^^^^^ not found in this scope
   |
help: consider importing one of these items
   |
42 + use core::hint::spin_loop;
   |
42 + use loom::hint::spin_loop;
   |

warning: unused import: `loom::thread::yield_now`
  --> src/sync.rs:39:20
   |
39 |     pub(crate) use loom::thread::yield_now;
   |                    ^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: `#[warn(unused_imports)]` on by default

For more information about this error, try `rustc --explain E0425`.
warning: `concurrent-queue` (lib) generated 1 warning
error: could not compile `concurrent-queue` (lib) due to 1 previous error; 1 warning emitted
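
The error suggests that the loom configuration of src/sync.rs re-exports yield_now but not spin_loop. A likely shape of the fix, sketched from the compiler's own suggestions (the actual module layout may differ):

// In src/sync.rs: re-export spin_loop for both configurations.
#[cfg(loom)]
pub(crate) use loom::hint::spin_loop;

#[cfg(not(loom))]
pub(crate) use core::hint::spin_loop;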
                                                                                            

Merge implementation with crossbeam-queue

From a brief glance at both crates, it seems to me that there is very little noticeable difference between the algorithm used in concurrent-queue and the one used in crossbeam-queue. It would be nice to re-implement concurrent-queue in terms of crossbeam-queue if there are few or no significant differences between the two implementations, as it would deduplicate code in the Rust ecosystem.

See value stored in head/tail of queue

Not sure if this is an anti-pattern relative to what you have implemented, but I'd like to know whether there is a reason why there are no functions such as get_head and get_tail for the Bounded and Unbounded queues.

I have a use case where I do not want to push an item onto the queue if said item was the last thing to be added. Hence the question.

Thanks!
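
Since the crate exposes no peek, get_head, or get_tail, one workaround for the use case above is to track the most recently pushed item next to the queue; a rough sketch (not a crate API):

use std::sync::Mutex;
use concurrent_queue::ConcurrentQueue;

struct DedupQueue<T> {
    queue: ConcurrentQueue<T>,
    last: Mutex<Option<T>>,
}

impl<T: Clone + PartialEq> DedupQueue<T> {
    // Push only if the item differs from the most recently pushed one.
    fn push_if_new(&self, item: T) {
        let mut last = self.last.lock().unwrap();
        if last.as_ref() != Some(&item) {
            // Full/Closed errors are ignored for the purpose of this sketch.
            let _ = self.queue.push(item.clone());
            *last = Some(item);
        }
    }
}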

Implement UnwindSafe?

Can ConcurrentQueue implement the UnwindSafe trait? Currently, it can't be used within panic::catch_unwind:

the type std::cell::UnsafeCell<std::mem::MaybeUninit<std::sync::Arc<pala_types::message::Message<pala_types::transaction::LedgerTx>>>> may contain interior mutability and a reference may not be safely transferrable across a catch_unwind boundary
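
Until the crate implements UnwindSafe, a common workaround is to assert unwind safety at the call site with std::panic::AssertUnwindSafe; a minimal sketch (whether this is appropriate depends on how the queue is used inside the closure):

use std::panic::{self, AssertUnwindSafe};
use concurrent_queue::ConcurrentQueue;

let q: ConcurrentQueue<i32> = ConcurrentQueue::unbounded();

// AssertUnwindSafe opts the captured queue out of the UnwindSafe check.
let result = panic::catch_unwind(AssertUnwindSafe(|| {
    q.push(1).unwrap();
}));
assert!(result.is_ok());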
