
andreiavrammsd / cpp-channel

Thread-safe container for sharing data between threads

Home Page: https://blog.andreiavram.ro/cpp-channel-thread-safe-container-share-data-threads/

License: MIT License

CMake 40.76% C++ 59.24%
cpp multithreading thread-safe channel concurrent-queue queue thread-safe-queue synchronized-queue

cpp-channel's People

Contributors

andreiavrammsd, fwflunky, tenheadedlion

cpp-channel's Issues

Batch operations?

Would it be possible to support batch reads/writes in some form? If you want to write all elements in a container to a channel it seems a bit inefficient to acquire the lock for every item.
This would be nice:

msd::channel<int> chan{10};
std::vector<int> test {1, 2, 3, 4, 5};
std::array<int, 2> test2;
chan << test;

int a, b, c;
chan >> a; //1
chan >> b; //2
chan >> test2; //{3, 4}
chan >> c; //5
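
The request above can be sketched as follows. This is a minimal, non-blocking illustration of taking the lock once per batch. The class `batch_queue` and its members `push_all` and `pop_up_to` are hypothetical names made up for this sketch and are not part of the msd::channel API.

```cpp
#include <cstddef>
#include <mutex>
#include <queue>
#include <vector>

// Hypothetical stand-in for a channel, used only to illustrate batching.
template <typename T>
class batch_queue {
public:
    // Push every element of [first, last) under a single lock acquisition.
    template <typename It>
    void push_all(It first, It last) {
        std::lock_guard<std::mutex> lock(mtx_);
        for (; first != last; ++first) {
            items_.push(*first);
        }
    }

    // Pop up to max_count elements under a single lock acquisition.
    // Returns fewer elements if the queue runs empty; it never blocks.
    std::vector<T> pop_up_to(std::size_t max_count) {
        std::lock_guard<std::mutex> lock(mtx_);
        std::vector<T> out;
        while (out.size() < max_count && !items_.empty()) {
            out.push_back(items_.front());
            items_.pop();
        }
        return out;
    }

private:
    std::mutex mtx_;
    std::queue<T> items_;
};
```

A real channel would also have to decide how a batch interacts with capacity and blocking; the sketch leaves that out.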

Error compiling example streaming.cpp under VS2022

The following code produces an error when compiling streaming.cpp under VS2022:

template <typename T>
struct std::iterator_traits<msd::blocking_iterator<T>> {
    using value_type = typename msd::blocking_iterator<T>::value_type;
    using iterator_category = std::output_iterator_tag;
};

The error message is as follows:
C2794 “reference”: not “std::iterator_traits<_Iter>” ....

Could the following improvement be made?

template <typename T>
struct std::iterator_traits<Yuanling::ThreadSafeChannelIterator<T>> {
    using value_type = typename Yuanling::ThreadSafeChannelIterator<T>::ValueType;
    using iterator_category = std::output_iterator_tag;
    using reference = std::reference_wrapper;
};

Use channel as an input_iterator

Currently, the channel iterators can be used only as output iterators.

Integrates well with STL algorithms.

If I want to pipe two channels, there is no support for the STL algorithms:

msd::channel<int> one;
msd::channel<int> two;
std::copy(one.begin(), one.end(), two.end());

Hope

I like the idea, and the execution. This would make transferring data between threads so much easier. I hope this gets added into C++23 (C++20 is finalized).

Possible Race Condition in push

In the push implementation, closed is checked before the channel's mutex is locked. This means a thread could see that the channel is not closed, have the channel closed by another thread while it is acquiring the mutex, and then push to a closed channel.
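
One way to avoid this kind of check-then-act race is to test the closed flag while holding the same lock that guards the push. The sketch below shows the idea on a hypothetical `closable_queue`; it is an illustration under that assumption, not the library's implementation.

```cpp
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical queue used only to illustrate checking "closed" under the lock.
template <typename T>
class closable_queue {
public:
    // The closed check and the push share one critical section,
    // so close() cannot take effect between them.
    // Returns false when the queue is already closed.
    bool push(const T& value) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (closed_) {
            return false;
        }
        items_.push(value);
        return true;
    }

    // close() takes the same mutex before changing the flag.
    void close() {
        std::lock_guard<std::mutex> lock(mtx_);
        closed_ = true;
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mtx_);
        return items_.size();
    }

private:
    std::mutex mtx_;
    std::queue<T> items_;
    bool closed_ = false;
};
```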

Race condition between close/read leads to deadlock

Because the close method does not lock and the read method does not check the closed value after locking, it's possible for a deadlock to occur under the following series of events:

Thread 1 (reader): read last element from the channel
Thread 1: call operator>>
Thread 1: check closed(), which is false
Thread 1: lock mutex
Thread 1: check the predicate in wait()
Thread 2 (writer): call close()
Thread 2: set closed=true
Thread 2: call cnd_.notify_all()
Thread 1: call wait() without predicate (inside the implementation of the wait-with-predicate overload)

At this point, Thread 1 waits forever because the notify_all occurred before the actual call to wait().
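
A common remedy for a lost wakeup like the one described above is to change the closed flag while holding the mutex and to include it in the wait predicate. The sketch below shows this on a hypothetical `waitable_queue` (all names are assumptions made for illustration, not the msd::channel API).

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical blocking queue illustrating a close() that cannot be missed.
template <typename T>
class waitable_queue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            items_.push(std::move(value));
        }
        cv_.notify_one();
    }

    // The flag is set under the mutex, so a reader is either not yet
    // waiting (and will see closed_ in the predicate) or already waiting
    // (and will be woken by notify_all).
    void close() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until an item is available or the queue is closed.
    // Returns false only when the queue is closed and drained.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return !items_.empty() || closed_; });
        if (items_.empty()) {
            return false;
        }
        out = std::move(items_.front());
        items_.pop();
        return true;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<T> items_;
    bool closed_ = false;
};
```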

Swap `operator<<` and `operator>>` to be similar to C++ streams

Currently, the channel is on the right side of the operator, which is backwards from the C++ streams syntax, and makes chaining impossible (or at least very annoying) because << and >> are left associative. I suggest swapping the 2 operators in order to match C++ streams syntax.
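
To show what the proposed syntax would look like, here is a single-threaded toy with the channel on the left of both operators. `toy_channel` is a hypothetical type with no locking or blocking; it only illustrates how returning the channel by reference makes chaining work.

```cpp
#include <queue>

// Hypothetical, single-threaded toy: illustrates operator shape only.
template <typename T>
class toy_channel {
public:
    // Channel on the left, value on the right, like std::ostream.
    friend toy_channel& operator<<(toy_channel& ch, const T& value) {
        ch.items_.push(value);
        return ch;  // returning the channel enables ch << a << b
    }

    // Channel on the left, destination on the right, like std::istream.
    // Precondition in this toy: the channel is not empty.
    friend toy_channel& operator>>(toy_channel& ch, T& out) {
        out = ch.items_.front();
        ch.items_.pop();
        return ch;  // returning the channel enables ch >> a >> b
    }

private:
    std::queue<T> items_;
};
```

With this shape, `ch << 1 << 2 << 3;` followed by `ch >> a >> b >> c;` reads the values back in order.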

Range-based for loop iterates one time more than it should

Thanks for the library! Just to let you know, while playing with the code I encountered some problems with closing when using the channel in a range-based for loop:

A range-based for loop iterates once on an empty channel when that channel is closed in another thread.

More generally: if you rely on the end iterator to finish a loop, the loop ends one iteration too late.

The reason for this behaviour is that blocking while waiting for data happens in operator*. When the block is released because the channel was closed (not because data has arrived), an undefined value is retrieved and returned.

This can be avoided when the range loop checks for channel validity like this:

1  for (auto in : incoming) {
2      // Check if we are done
3      if (incoming.empty() && incoming.closed()) {
4          break;
5      }
6      ...
7  }

But that a) looks ugly and b) can still cause errors (data not processed) when closing happens between line 1 and 3 in a different thread.
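
One possible direction, sketched here on a hypothetical `ranged_queue` rather than on the library's code, is to block when the iterator is created or incremented instead of in operator*. The iterator then already knows whether it holds a value or has reached the end before the loop compares it with end(). The sketch assumes T is default-constructible.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical queue whose iterator fetches in its constructor and operator++.
template <typename T>
class ranged_queue {
public:
    class iterator {
    public:
        // A null queue pointer marks the end iterator.
        explicit iterator(ranged_queue* q) : q_(q) { advance(); }
        T& operator*() { return current_; }  // never blocks
        iterator& operator++() { advance(); return *this; }
        bool operator!=(const iterator& other) const { return q_ != other.q_; }

    private:
        // Block here: fetch the next value, or turn into the end iterator
        // when the queue is closed and drained.
        void advance() {
            if (q_ != nullptr && !q_->pop(current_)) {
                q_ = nullptr;
            }
        }
        ranged_queue* q_;
        T current_{};
    };

    iterator begin() { return iterator(this); }  // blocks for the first item
    iterator end() { return iterator(nullptr); }

    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            items_.push(std::move(value));
        }
        cv_.notify_one();
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    // Returns false only when the queue is closed and empty.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return !items_.empty() || closed_; });
        if (items_.empty()) {
            return false;
        }
        out = std::move(items_.front());
        items_.pop();
        return true;
    }

    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<T> items_;
    bool closed_ = false;
};
```

With this design the loop body never sees a value that was not actually received, and no extra empty()/closed() check is needed inside the loop.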

Deadlock, never ends

Here is the code. threadpool.h is from https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h. ProcessV2 works fine, but Process never ends.

#include <cstdio>
#include <fstream>
#include <memory>
#include <string>
#include "msd/channel.hpp"
#include "threadpool.h"

using namespace std;
ThreadPool pool(10);

class BaseConverter {
 protected:
    msd::channel<std::string> lines;

 public:
    auto Process(const std::string &filename) -> shared_ptr<msd::channel<string>> {
        pool.enqueue([&filename, this]() {
            std::ifstream input(filename);
            if (!input.is_open()) {
                printf("open failed");
                return -1;
            }

            string line;
            while (getline(input, line)) {
                // fmt::println("read line:{}", line);
                line >> lines;
            }
            printf("read done");
            lines.close();
            return 0;
        });

        auto thread_num = 10;
        msd::channel<int> worker_done(thread_num);
        auto output = make_shared<msd::channel<string>>();
        for (int i = 0; i < thread_num; i++) {
            i >> worker_done;
            pool.enqueue([this, &worker_done, &output]() {
                for (const auto &e : lines) {
                    e >> *output;
                }
                int t;
                t << worker_done;
                if (worker_done.empty()) {
                    output->close();
                }
            });
        }
        return output;
    }
    auto ProcessV2(const std::string &filename) -> int {
        pool.enqueue([&filename, this]() {
            std::ifstream input(filename);
            if (!input.is_open()) {
                printf("open failed");
                return -1;
            }

            string line;
            while (getline(input, line)) {
                // fmt::println("read line:{}", line);
                line >> lines;
            }
            printf("read done");
            lines.close();
            return 0;
        });

        auto thread_num = 10;
        msd::channel<int> worker_done(thread_num);
        msd::channel<string> output;
        for (int i = 0; i < thread_num; i++) {
            i >> worker_done;
            pool.enqueue([this, &worker_done, &output]() {
                for (const auto &e : lines) {
                    e >> output;
                }
                int t;
                t << worker_done;
                if (worker_done.empty()) {
                    output.close();
                }
            });
        }
        for(const auto &e : output) {
            printf("%s", e.c_str());
        }
        return 0;
    }
};
auto main(int argc, char *argv[]) -> int {
    BaseConverter().ProcessV2("./threadpool.h");
    auto ret = BaseConverter().Process("./threadpool.h");
    for (const auto &e : *ret) {
        printf("%s", e.c_str());
    }
    return 0;
}
