Comments (14)
Oh dear, that's not good. Thanks for opening an issue! I'll look into this as soon as I can.
from concurrentqueue.
Here is a test that replicates the issue. Note that test3 and test4 are essentially your unit tests, and test1 and test2 are my "adaptations" for OpenMP. This appears to be an OpenMP-related issue: test3 and test4 pass, but test1 and test2 do not.
/**
 * @file    testconcurrentqueue.cpp
 * @ingroup
 * @author  tpan
 * @brief
 * @details
 *
 * Copyright (c) 2015 Georgia Institute of Technology. All Rights Reserved.
 *
 * TODO add License
 */
#include "concurrentqueue/concurrentqueue.h"

#include <cassert>  // assert
#include <cstddef>  // std::size_t
#include <cstdio>   // printf
#include <thread>   // std::thread (test3, test4)
#include <vector>

#include <omp.h>

using namespace moodycamel;
void test1() {
    // Implicit
    const int MAX_THREADS = 48;
    ConcurrentQueue<int> q(4096 * (MAX_THREADS + 1));

    // Enqueue and dequeue in separate parallel regions.
    #pragma omp parallel num_threads(4) shared(q)
    {
        q.enqueue(omp_get_thread_num());
    }
    #pragma omp parallel num_threads(4) shared(q)
    {
        int v;
        assert(q.try_dequeue(v));
        printf("tid %d dequeued %d\n", omp_get_thread_num(), v);
    }

    // Enqueue and dequeue in the same parallel region.
    #pragma omp parallel num_threads(4) shared(q)
    {
        q.enqueue(omp_get_thread_num());
        #pragma omp barrier
        int v;
        assert(q.try_dequeue(v));
        printf("tid %d dequeued %d\n", omp_get_thread_num(), v);
    }

    // Every thread enqueues its own id once, with short delay loops around the call.
    #pragma omp parallel num_threads(MAX_THREADS) shared(q)
    {
        for (volatile int i = 0; i != 4096; ++i) {
            continue;
        }
        q.enqueue(omp_get_thread_num());
        printf("tid %d enqueue.\n", omp_get_thread_num());
        for (volatile int i = 0; i != 4096; ++i) {
            continue;
        }
        #pragma omp barrier
    }

    // Each thread id should come back out exactly once.
    std::vector<bool> seenIds(MAX_THREADS, false);
    int v;
    for (std::size_t i = 0; i != MAX_THREADS; ++i) {
        assert(q.try_dequeue(v));
        if (seenIds[v]) printf("already seen %d\n", v);
        else printf("haven't seen %d\n", v);
        seenIds[v] = true;
    }
    for (std::size_t i = 0; i != MAX_THREADS; ++i) {
        assert(seenIds[i]);
    }
}
void test2() {
    // Test many threads and implicit queues being created and destroyed concurrently
    int nThreads = 32;
    std::vector<bool> success(nThreads, true);
    #pragma omp parallel num_threads(nThreads) shared(success)
    {
        for (int i = 0; i != 5; ++i) {
            ConcurrentQueue<int> q(1);
            q.enqueue(i);
        }
        ConcurrentQueue<int> q(15);
        for (int i = 0; i != 100; ++i) {
            q.enqueue(i);
        }
        int item;
        for (int i = 0; i != 100; ++i) {
            if (!q.try_dequeue(item) || item != i) {
                success[omp_get_thread_num()] = false;
            }
        }
        if (q.size_approx() != 0) {
            success[omp_get_thread_num()] = false;
        }
        #pragma omp barrier
    }
    for (int tid = 0; tid != nThreads; ++tid) {
        assert(success[tid]);
    }
}
// This one is not OpenMP based.
void test3() {
    // Implicit
    const int MAX_THREADS = 48;
    ConcurrentQueue<int> q(4096 * (MAX_THREADS + 1));

    std::thread t0([&]() { q.enqueue(0); });
    t0.join();
    std::thread t1([&]() { q.enqueue(1); });
    t1.join();
    std::thread t2([&]() { q.enqueue(2); });
    t2.join();
    q.enqueue(3);

    int item;
    int i = 0;
    while (q.try_dequeue(item)) {
        assert(item == i);
        ++i;
    }
    assert(i == 4);

    std::vector<std::thread> threads(MAX_THREADS);
    for (int rep = 0; rep != 2; ++rep) {
        for (std::size_t tid = 0; tid != threads.size(); ++tid) {
            threads[tid] = std::thread([&](std::size_t tid) {
                for (volatile int i = 0; i != 4096; ++i) {
                    continue;
                }
                q.enqueue((int)tid);
                for (volatile int i = 0; i != 4096; ++i) {
                    continue;
                }
            }, tid);
        }
        for (std::size_t tid = 0; tid != threads.size(); ++tid) {
            threads[tid].join();
        }
        std::vector<bool> seenIds(threads.size());
        for (std::size_t i = 0; i != threads.size(); ++i) {
            assert(q.try_dequeue(item));
            assert(!seenIds[item]);
            seenIds[item] = true;
        }
        for (std::size_t i = 0; i != seenIds.size(); ++i) {
            assert(seenIds[i]);
        }
    }
}
void test4() {
    // Test many threads and implicit queues being created and destroyed concurrently
    std::vector<std::thread> threads(32);
    std::vector<bool> success(threads.size(), true);
    for (std::size_t tid = 0; tid != threads.size(); ++tid) {
        threads[tid] = std::thread([&](std::size_t tid) {
            for (int i = 0; i != 5; ++i) {
                ConcurrentQueue<int> q(1);
                q.enqueue(i);
            }
            ConcurrentQueue<int> q(15);
            for (int i = 0; i != 100; ++i) {
                q.enqueue(i);
            }
            int item;
            for (int i = 0; i != 100; ++i) {
                if (!q.try_dequeue(item) || item != i) {
                    success[tid] = false;
                }
            }
            if (q.size_approx() != 0) {
                success[tid] = false;
            }
        }, tid);
    }
    for (std::size_t tid = 0; tid != threads.size(); ++tid) {
        threads[tid].join();
        assert(success[tid]);
    }
}
int main(int argc, char** argv) {
    test1();
    test2();
}
from concurrentqueue.
BTW, thanks for the very prompt response!
from concurrentqueue.
ThreadSanitizer encounters the following error which causes a core dump. This is with the test above.
test_conc_queue: /home/tpan/src/bliss/ext/concurrentqueue/concurrentqueue.h:3209: void moodycamel::ConcurrentQueue<T, Traits>::implicit_producer_thread_exited(moodycamel::ConcurrentQueue<T, Traits>::ImplicitProducer*) [with T = int; Traits = moodycamel::ConcurrentQueueDefaultTraits]: Assertion `hash != nullptr' failed.
Aborted (core dumped)
from concurrentqueue.
I'll try a couple things on my lunch break.
That assert(hash != nullptr); should never, ever fail with the default traits. The variable is initialized in the constructor to something non-null, and is only ever assigned non-null values thereafter.
To clarify, is the ThreadSanitizer error showing up only with the OpenMP tests, or with all of them?
from concurrentqueue.
Sorry, just saw your message.
The ThreadSanitizer error is showing up only for the OpenMP tests, not all of them.
Tony
from concurrentqueue.
Interesting. If I run either test1 or test2 in isolation (with ASan), I get no errors. If I run test1 followed by test2, then ASan finds an error. Neither test itself fails. I'll have to look into this further.
from concurrentqueue.
Thank you very much!
from concurrentqueue.
This may not be an OpenMP-only bug. I still have to hunt it down, but I suspect it's due to the same bug as the one rob-p is seeing here (turning off implicit producer recycling fixes the crash for him).
from concurrentqueue.
Hi, Cameron, I just saw your message and also looked through rob-p's report. I'll give #undef a try.
Thanks.
from concurrentqueue.
I've managed to run an integration test with the entire queue as a whole under Relacy, and it found a bug (with explicit producers -- I haven't done the same for implicit ones yet). There's a chance these bugs are related. I'll continue working with Relacy until I can identify and fix all the bugs it finds, and then revisit this code (which, thankfully, I can reproduce on my end too) to see if that was the issue. I'll keep you posted!
from concurrentqueue.
Please be aware that I did something stupid in my test code above:
assert(q.try_dequeue(item));
Calling a function with side effects inside an assert statement is not a good thing: the call disappears entirely when NDEBUG is defined.
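For anyone copying the test, here is a minimal sketch of the safer pattern. It assumes a hypothetical `try_pop` helper over `std::queue` as a stand-in for `try_dequeue` (it is not the concurrentqueue API), and `pop_checked` is likewise just an illustrative name. The call runs unconditionally and only the check sits inside `assert`, so defining `NDEBUG` removes the check but not the dequeue.

```cpp
#include <cassert>
#include <queue>

// Hypothetical stand-in for a queue's try_dequeue; NOT the moodycamel API.
// Returns false and leaves `out` untouched if the queue is empty.
static bool try_pop(std::queue<int>& q, int& out) {
    if (q.empty()) return false;
    out = q.front();
    q.pop();
    return true;
}

// Removes one item, keeping the side effect outside the assert.
static int pop_checked(std::queue<int>& q) {
    int v = -1;
    bool ok = try_pop(q, v);  // always executed, with or without NDEBUG
    assert(ok);               // only this check is compiled out under NDEBUG
    (void)ok;                 // avoids an unused-variable warning under NDEBUG
    return v;
}
```

In the test above, the same shape would be `bool ok = q.try_dequeue(v); assert(ok);` in place of `assert(q.try_dequeue(v));`.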
Thanks!
from concurrentqueue.
Heh, I missed that, thanks for the heads up :-) I wasn't defining NDEBUG anyway, though (figured it'd be good to leave in as many checks as possible).
I don't have much spare time, and these types of bugs require lots of time to investigate, so a fix might take a while, sorry. I just wanted to say that I am looking into all the reported bugs, and will fix them as soon as possible.
from concurrentqueue.
Closing as there have been many fixes in the queue over the years. Feel free to reopen if this is still an issue.
from concurrentqueue.