andreiavrammsd / cpp-channel Goto Github PK
View Code? Open in Web Editor NEWThread-safe container for sharing data between threads
Home Page: https://blog.andreiavram.ro/cpp-channel-thread-safe-container-share-data-threads/
License: MIT License
Thread-safe container for sharing data between threads
Home Page: https://blog.andreiavram.ro/cpp-channel-thread-safe-container-share-data-threads/
License: MIT License
Would it be possible to support batch reads/writes in some form? If you want to write all elements in a container to a channel it seems a bit inefficient to acquire the lock for every item.
This would be nice:
msd::channel<int> chan{10};
std::vector<int> test {1, 2, 3, 4, 5};
std::array<int, 2> test2;
chan << test;
int a, b, c;
chan >> a; //1
chan >> b; //2
chan >> test2; //{3, 4}
chan >> c; //5
The following code encountered an error while compiling streaming.cpp under VS2022:
template
struct std::iterator_traits<msd::blocking_iterator> {
using value_type = typename msd::blocking_iterator::value_type;
using iterator_category = std::output_iterator_tag;
};
The error message is as follows:
C2794 “reference”: not “std::iterator_traits<_Iter>” ....
following improvements be made?
template
struct std::iterator_traits<Yuanling::ThreadSafeChannelIterator> {
using value_type = typename Yuanling::ThreadSafeChannelIterator::ValueType;
using iterator_category = std::output_iterator_tag;
using reference = std::reference_wrapper;
};
It seems that msd::channel
does not support function pointer.
[]() {} >> chan;
( []() {} ) >> chan;
Currently, the channel iterators can be used only as output iterators.
Integrates well with STL algorithms.
If I want to pipe two channels, there is no support for the STL algorithms:
msd::channel<int> one;
msd::channel<int> two;
std::copy(one.begin(), one.end(), two.end());
I like the idea, and the execution. This would make transferring data between threads so much easier. I hope this gets added into C++23 (C++20 is finalized).
This line will fail if the type of the class is not default constructible.
It is a limitation which not mentioned in the readme.
In the push implementation, closed
is checked before the mutex to the channel is locked. This means a could check that the queue is not closed, but have it be closed while it acquires the mutex, pushing to a closed channel.
Because the close method does not lock and the read method does not check the closed value after locking, it's possible for a deadlock to occur under the following series of events:
Thread 1 (reader): read last element from the channel
Thread 1: call operator>>
Thread 1: check closed(), which is false
Thread 1: lock mutex
Thread 1: check the predicate in wait()
Thread 2 (writer): call close()
Thread 2: set closed=true
Thread 2: call cnd_.notify_all()
Thread 1: calls wait() without predicate (in the implementation of the wait with predicate method)
At this point, Thread 1 waits forever because the notify_all occurred before the actual call to wait().
Hi, I'm looking into adding this library into Conan Center index,
and a few questions came up during the review process of conan-io/conan-center-index#23136
My main question is if you expect to ever use targets, and if so, what do you expect it to be.
/cc @toge
Currently, the channel is on the right side of the operator, which is backwards from the C++ streams syntax, and makes chaining impossible (or at least very annoying) because <<
and >>
are left associative. I suggest swapping the 2 operators in order to match C++ streams syntax.
Thanks for the library! Just to let you know, while playing with the code I encountered some problems regarding closing when using the channel in a for range loop:
A for range loop iterates once on an empty channel when that channel is closed in another thread.
More general: If you rely on the end iterator to finish a loop it comes one iteration too late.
The reason for this behaviour is that blocking while waiting for data happens in operator*
. When the block is released upon closing of the channel (not because data has arrived), an undefined value is retrieved and returned.
This can be avoided when the range loop checks for channel validity like this:
1 for (auto in : incoming) {
2 // Check if we are done
3 if (incoming.empty() && incoming.closed()) {
4 break;
5 }
6 ...
7 }
But that a) looks ugly and b) can still cause errors (data not processed) when closing happens between line 1 and 3 in a different thread.
this is code, threadpool.h
is from https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h, ProcessV2 works fine, but Process never end
#include <cstdio>
#include <fstream>
#include <memory>
#include <string>
#include "msd/channel.hpp"
#include "threadpool.h"
using namespace std;
ThreadPool pool(10);
class BaseConverter {
protected:
msd::channel<std::string> lines;
public:
auto Process(const std::string &filename) -> shared_ptr<msd::channel<string>> {
pool.enqueue([&filename, this]() {
std::ifstream input(filename);
if (!input.is_open()) {
printf("open failed");
return -1;
}
string line;
while (getline(input, line)) {
// fmt::println("read line:{}", line);
line >> lines;
}
printf("read done");
lines.close();
return 0;
});
auto thread_num = 10;
msd::channel<int> worker_done(thread_num);
auto output = make_shared<msd::channel<string>>();
for (int i = 0; i < thread_num; i++) {
i >> worker_done;
pool.enqueue([this, &worker_done, &output]() {
for (const auto &e : lines) {
e >> *output;
}
int t;
t << worker_done;
if (worker_done.empty()) {
output->close();
}
});
}
return output;
}
auto ProcessV2(const std::string &filename) -> int {
pool.enqueue([&filename, this]() {
std::ifstream input(filename);
if (!input.is_open()) {
printf("open failed");
return -1;
}
string line;
while (getline(input, line)) {
// fmt::println("read line:{}", line);
line >> lines;
}
printf("read done");
lines.close();
return 0;
});
auto thread_num = 10;
msd::channel<int> worker_done(thread_num);
msd::channel<string> output;
for (int i = 0; i < thread_num; i++) {
i >> worker_done;
pool.enqueue([this, &worker_done, &output]() {
for (const auto &e : lines) {
e >> output;
}
int t;
t << worker_done;
if (worker_done.empty()) {
output.close();
}
});
}
for(const auto &e : output) {
printf("%s", e.c_str());
}
return 0;
}
};
auto main(int argc, char *argv[]) -> int {
BaseConverter().ProcessV2("./threadpool.h");
auto ret = BaseConverter().Process("./threadpool.h");
for (const auto &e : *ret) {
printf("%s", e.c_str());
}
return 0;
}
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.