
ccia_code_samples's People

Contributors

anthonywilliams


ccia_code_samples's Issues

Listing 4.5: Double lock on mutex?

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=data_queue.front();
        data_queue.pop();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }

Class threadsafe_queue's wait_and_pop function locks the mutex and waits for the predicate empty() to be false. However, calling empty() requires locking the mutex once more.

Does that lead to undefined behaviour due to double-locking of one mutex?
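
A reduced sketch of the class may help here. It assumes the member layout shown in the excerpt above; push_then_pop is a hypothetical helper added only for illustration. The point it tries to show is that the predicate calls data_queue.empty(), which is std::queue's own member and takes no lock, rather than threadsafe_queue::empty().

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Reduced version of the class from the excerpt, for illustration only.
template <typename T>
class threadsafe_queue {
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;

public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lk(mut);
            data_queue.push(std::move(value));
        }
        data_cond.notify_one();
    }

    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lk(mut);
        // data_queue.empty() is std::queue<T>::empty(): no locking involved.
        // Writing `return !empty();` instead would call threadsafe_queue::empty()
        // and try to lock `mut` a second time on the same thread.
        data_cond.wait(lk, [this] { return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    bool empty() const {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

// Hypothetical helper: push one value, pop it back, and check the queue is empty.
int push_then_pop(int v) {
    threadsafe_queue<int> q;
    q.push(v);
    int out = 0;
    q.wait_and_pop(out);
    return q.empty() ? out : -1;
}
```

If that reading is right, the mutex is only ever locked once per call, so no double lock occurs in the code as posted.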

Why does the code in listing_5.1.cpp work?

For the spinlock code, the lock logic is as follows:

void lock()
{
    while(flag.test_and_set(std::memory_order_acquire));
}

However, I don't understand why it works. memory_order_acquire only ensures that later memory operations cannot be moved before it. But flag.test_and_set also performs a write after the read, and I think that write has relaxed memory order in the code above. That means that if 2 threads call lock() simultaneously:

  • Firstly, thread1's flag.test_and_set returns false and writes true into flag, but with relaxed memory order.
  • Then, thread2's flag.test_and_set uses acquire to read the data in flag, but it may not read the true set by thread 1, since thread 1's write uses relaxed memory order. Then thread2 may get false too, and it can also pass the lock and enter the critical section.

So I think it should use std::memory_order_acq_rel to make the logic correct.

Can you tell me what's wrong with my understanding?

Thanks
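
A small sketch may help with this question. The memory order argument controls how other memory accesses are ordered around the operation; it does not make the read-modify-write itself any less atomic. The C++ standard requires an atomic read-modify-write to read the last value in the flag's modification order, so two threads cannot both get false from the same cleared state. The class below follows the shape of the listing; count_with_spinlock is a hypothetical helper added for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

class spinlock_mutex {
    std::atomic_flag flag = ATOMIC_FLAG_INIT;

public:
    void lock() {
        // test_and_set is one indivisible read-modify-write, whatever the ordering.
        while (flag.test_and_set(std::memory_order_acquire)) {
        }
    }
    void unlock() { flag.clear(std::memory_order_release); }
};

// Hypothetical helper: several threads increment a plain int under the spinlock.
int count_with_spinlock(int threads, int iterations) {
    spinlock_mutex m;
    int counter = 0;  // deliberately not atomic; only the lock protects it
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&m, &counter, iterations] {
            for (int i = 0; i < iterations; ++i) {
                m.lock();
                ++counter;
                m.unlock();
            }
        });
    }
    for (auto& w : workers) {
        w.join();
    }
    return counter;
}
```

The acquire on lock() pairs with the release on unlock(), which is what makes the previous holder's writes to counter visible to the next holder.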

Listing 2.8 is Listing 2.9 in the book

I have the Second Edition of C++ Concurrency (ISBN ending in 4693) and I'm very confused about why this repo's numbering differs from the book's and why the code doesn't match. For example, pages 31-32 of the book show listing 2.9, but that code is found in listing 2.8 in the repo. The code in the repo also does not match the book: at the end of the listing in the book, a standard for loop is used, whereas the repo's listing uses std::for_each.

Why does the repo not match the book?

Listing 9.12: a question about locking

Line 61 is commented out in the repository (compared to the printed book):
// self->set_clear_mutex.unlock();
Why?
It is also suspicious that the locking order of set_clear_mutex and lk changes: initially lk is locked first (it must be locked before the call to interruptible_wait) and set_clear_mutex second (in the constructor of custom_lock), but the order is then reversed in custom_lock::lock().
Is this really correct?

assert in listing 5.5 never fires

Thanks for your book and code.
I tested code in listing 5.5.
The book says that the assert in Listing 5.5 can fire, but when I tested it the assert never fired:

mi.U18|icefire|2021-12-29 13:06:30[muice@ listings]g++ listing_5.5.cpp -pthread
mi.U18|icefire|2021-12-29 13:07:18[muice@ listings]./a.out 
mi.U18|icefire|2021-12-29 13:07:20[muice@ listings]echo $?
0
mi.U18|icefire|2021-12-29 13:07:24[muice@ listings]./a.out 
mi.U18|icefire|2021-12-29 13:09:27[muice@ listings]echo $?
0

Test environment:
CPU: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
g++ version: g++ (Ubuntu 7.5.0-3ubuntu1~18.04) 7.5.0
os version: Ubuntu 18.04.4

Appendix C, Listing C9

Reading Appendix C, A message-passing framework and complete ATM example, I could not find where the mutex iom is created, which is used in Listing C9, for example here

            incoming.wait()
                .handle<issue_money>(
                    [&](issue_money const& msg)
                    {
                        {
                            std::lock_guard<std::mutex> lk(iom);
                            std::cout<<"Issuing "
                                     <<msg.amount<<std::endl;
                        }
                    }
                    )

Am I missing something?

missing pieces of code in examples 9.6 - 9.8 ?

I've been trying to get these examples working:

9.6 Thread Local Queue - https://github.com/anthonywilliams/ccia_code_samples/blob/main/listings/listing_9.6.cpp#L6-L7

9.7 Work Stealing Queue example - https://github.com/anthonywilliams/ccia_code_samples/blob/main/listings/listing_9.7.cpp#L41

I just seem to be failing in one specific area: the static thread_local members.

https://github.com/anthonywilliams/ccia_code_samples/blob/main/listings/listing_9.8.cpp#L4-L15

How do I get this to compile? I believe I need to add some sort of instantiation in the thread_pool, but my attempts to do that made no progress.

Has anyone gotten these to compile and run? Thank you.

% sh compile.sh                                        
-- Conan: Adjusting output directories
-- Conan: Using cmake targets configuration
-- Conan: Adjusting default RPATHs Conan policies
-- Conan: Adjusting language standard
-- Current conanbuildinfo.cmake directory: /ch09_07_work_stealing_queues/build
-- Configuring done
-- Generating done
-- Build files have been written to: /ch09_07_work_stealing_queues/build
/usr/local/Cellar/cmake/3.18.4/bin/cmake -S/ch09_07_work_stealing_queues -B/ch09_07_work_stealing_queues/build --check-build-system CMakeFiles/Makefile.cmake 0
/usr/local/Cellar/cmake/3.18.4/bin/cmake -E cmake_progress_start /ch09_07_work_stealing_queues/build/CMakeFiles /ch09_07_work_stealing_queues/build//CMakeFiles/progress.marks
/Applications/Xcode.app/Contents/Developer/usr/bin/make  -f CMakeFiles/Makefile2 all
/Applications/Xcode.app/Contents/Developer/usr/bin/make  -f CMakeFiles/test_cpp_multi.dir/build.make CMakeFiles/test_cpp_multi.dir/depend
cd /ch09_07_work_stealing_queues/build && /usr/local/Cellar/cmake/3.18.4/bin/cmake -E cmake_depends "Unix Makefiles" /ch09_07_work_stealing_queues /ch09_07_work_stealing_queues /ch09_07_work_stealing_queues/build /ch09_07_work_stealing_queues/build /ch09_07_work_stealing_queues/build/CMakeFiles/test_cpp_multi.dir/DependInfo.cmake --color=
/Applications/Xcode.app/Contents/Developer/usr/bin/make  -f CMakeFiles/test_cpp_multi.dir/build.make CMakeFiles/test_cpp_multi.dir/build
[ 33%] Linking CXX executable bin/test_cpp_multi
/usr/local/Cellar/cmake/3.18.4/bin/cmake -E cmake_link_script CMakeFiles/test_cpp_multi.dir/link.txt --verbose=1
/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/c++ -g -isysroot /Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX11.1.sdk -Wl,-search_paths_first -Wl,-headerpad_max_install_names CMakeFiles/test_cpp_multi.dir/src/thread_safe_queue.cpp.o CMakeFiles/test_cpp_multi.dir/src/helloworld.cpp.o -o bin/test_cpp_multi  -lpthread 
Undefined symbols for architecture x86_64:
  "thread-local wrapper routine for thread_pool::local_work_queue", referenced from:
      thread_pool::worker_thread(unsigned int) in helloworld.cpp.o
      thread_pool::pop_task_from_local_queue(function_wrapper&) in helloworld.cpp.o
      std::__1::future<std::__1::result_of<std::__1::__bind<std::__1::list<int, std::__1::allocator<int> > (sorter<int>::*)(std::__1::list<int, std::__1::allocator<int> >&), sorter<int>*, std::__1::list<int, std::__1::allocator<int> > > ()>::type> thread_pool::submit<std::__1::__bind<std::__1::list<int, std::__1::allocator<int> > (sorter<int>::*)(std::__1::list<int, std::__1::allocator<int> >&), sorter<int>*, std::__1::list<int, std::__1::allocator<int> > > >(std::__1::__bind<std::__1::list<int, std::__1::allocator<int> > (sorter<int>::*)(std::__1::list<int, std::__1::allocator<int> >&), sorter<int>*, std::__1::list<int, std::__1::allocator<int> > >) in helloworld.cpp.o
  "thread-local wrapper routine for thread_pool::my_index", referenced from:
      thread_pool::worker_thread(unsigned int) in helloworld.cpp.o
      thread_pool::pop_task_from_other_thread_queue(function_wrapper&) in helloworld.cpp.o
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[2]: *** [bin/test_cpp_multi] Error 1
make[1]: *** [CMakeFiles/test_cpp_multi.dir/all] Error 2
make: *** [all] Error 2
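
The undefined symbols in the log above are the usual sign of a static data member that is declared inside the class but never defined. A minimal sketch of the pattern follows. It assumes the members are declared static thread_local inside the class, as the linker names suggest; pool_sketch, the simplified member types, and index_seen_by_worker are hypothetical stand-ins, not the book's code.

```cpp
#include <cassert>
#include <thread>

// Hypothetical, cut-down pool: only the two static thread_local members matter here.
class pool_sketch {
public:
    // In-class lines are declarations only; they do not create the variables.
    static thread_local unsigned my_index;
    static thread_local int* local_work_queue;  // simplified type for the sketch

    // Runs on a fresh thread, stores an index there, and reports what that thread saw.
    static unsigned index_seen_by_worker(unsigned index) {
        unsigned seen = 0;
        std::thread worker([&seen, index] {
            my_index = index;
            seen = my_index;
        });
        worker.join();
        return seen;
    }
};

// Out-of-class definitions, placed in exactly one .cpp file. Without them the
// linker has nothing to resolve the members to.
thread_local unsigned pool_sketch::my_index = 0;
thread_local int* pool_sketch::local_work_queue = nullptr;
```

With C++17 or later, declaring the members as inline static thread_local inside the class should also work and avoids the separate definitions.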

Indefinite waiting in the fine-grained thread-safe queue (Listings 6.7 to 6.10)

Hi there,

it seems that the fine-grained thread-safe queue of Listings 6.7 to 6.10 can block indefinitely. I took the code from the book, with a minor fix (see here for my bug report of that fix).

Here is a complete reproduction of the class from those listings:

template <typename T>
class threadsafe_queue {
private:
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;

public:
    threadsafe_queue()
        : head(new node)
        , tail(head.get())
    {
    }
    threadsafe_queue(const threadsafe_queue& other) = delete;
    threadsafe_queue& operator=(const threadsafe_queue& other) = delete;

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> const old_head = wait_pop_head();
        return old_head->data;
    }
    void wait_and_pop(T& value)
    {
        std::unique_ptr<node> const old_head = wait_pop_head(value);
    }

    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> old_head = try_pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();
    }
    bool try_pop(T& value)
    {
        std::unique_ptr<node> const old_head = try_pop_head(value);
        return old_head;
    }
    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get() == get_tail());
    }
    void push(T new_value)
    {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        data_cond.notify_one();
    }

private:
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail()) {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }
    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if (head.get() == get_tail()) {
            return std::unique_ptr<node>();
        }
        value = std::move(*head->data);
        return pop_head();
    }
    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }
    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);

        // ================================================
        // The following line can block indefinitely:
        data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
        return head_lock;
    }
    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }
    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        value = std::move(*head->data);
        return pop_head();
    }
};

For testing, I used the code from Listing 11.1:

void test_concurrent_push_and_pop_on_empty_queue()
{
    threadsafe_queue<int> q;
    std::promise<void> go, push_ready, pop_ready;
    std::shared_future<void> ready(go.get_future());
    std::future<void> push_done;
    std::future<int> pop_done;
    try {
        push_done = std::async(std::launch::async,
            [&q, ready, &push_ready]() {
                push_ready.set_value();
                ready.wait();
                q.push(42);
            });
        pop_done = std::async(std::launch::async,
            [&q, ready, &pop_ready]() {
                pop_ready.set_value();
                ready.wait();
                // return q.pop();
                return *q.wait_and_pop();
            });
        push_ready.get_future().wait();
        pop_ready.get_future().wait();
        go.set_value();
        push_done.get();
        assert(pop_done.get() == 42);
        assert(q.empty());
    } catch (...) {
        go.set_value();
        throw;
    }
}

The only change to the original test is the replacement of q.pop(); with return *q.wait_and_pop();, to match the interface of the queue. (NB: I find it noteworthy that that test uses pop() in the first place, after so many chapters talking about correctly defining concurrent interfaces...).

Running that test function in a loop of ~100 iterations is enough on my setup to reliably reach a blocking state. The line

data_cond.wait(...)

seems to block, and not get woken up any more. I played around with this a bit, and interestingly, it seems that this is despite the data_cond.notify_one(); happening after the wait() in the cases where this blocks...

To test a bit more, I replaced the wait with a loop

while( head.get() == get_tail() ) {
    data_cond.wait_for(
        head_lock,
        std::chrono::milliseconds(1)
    );
}

and this seems to work, but that is of course less than ideal, and in the end slower than the thread-safe queue with a mutex on the whole data structure from Listing 4.5...

Anyway, what is going on here, and can anyone confirm that this is happening? It's weird, as the example is coming straight from the book, and I had hoped that this is a correct implementation.

My system is Ubuntu 20.04.6 LTS, and the problem occurs both with Clang 11 and GCC 10.5.0.

Cheers and thanks
Lucas
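
One possible explanation, offered as an assumption rather than a confirmed diagnosis: push() never touches head_mutex, so its notify_one() can land after the waiter has evaluated the predicate (seeing the old tail) but before the waiter has actually gone to sleep inside wait(). That notification is then lost. The sketch below is a cut-down queue that keeps the two-mutex layout and adds one step to push(): it passes through head_mutex before notifying. two_lock_queue and stress are hypothetical names for this illustration, and this is not the book's fix.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

template <typename T>
class two_lock_queue {
    struct node {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;

    node* get_tail() {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

public:
    two_lock_queue() : head(new node), tail(head.get()) {}

    void push(T new_value) {
        std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data = new_data;
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        {
            // Assumed fix: a waiter holds head_mutex from its predicate check until
            // it blocks, so acquiring it here delays the notify until the waiter
            // is either asleep or about to re-check and see the new tail.
            std::lock_guard<std::mutex> head_lock(head_mutex);
        }
        data_cond.notify_one();
    }

    std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock, [&] { return head.get() != get_tail(); });
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head->data;
    }
};

// Hypothetical stress helper: one producer and one consumer per round.
int stress(int rounds) {
    int ok = 0;
    for (int i = 0; i < rounds; ++i) {
        two_lock_queue<int> q;
        std::shared_ptr<int> got;
        std::thread consumer([&] { got = q.wait_and_pop(); });
        std::thread producer([&] { q.push(i); });
        producer.join();
        consumer.join();
        if (got && *got == i) {
            ++ok;
        }
    }
    return ok;
}
```

The extra lock is taken only after tail_mutex has been released, so the lock order used by the waiter (head_mutex, then tail_mutex) is never reversed.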

Listing 9.3 parallel_accumulate using a thread pool with waitable tasks does not compile

Hi, I wonder if there is anyone who has managed to get it working.

Firstly, errors on submit() function like:

:9749:52: required from here
:9678:18: error: no matching function for call to
'cxx_thread_pool::listing_9_2::thread_pool::submit(
cxx_thread_pool::accumulate_block<__gnu_cxx::__normal_iterator<int*, std::vector >, int>&)'

9678 | futures[i] = pool.submit(ac);
:9618:55: note: candidate: 'template std::future<typename std::result_of<F()>::type> cxx_thread_pool::listing_9_2::thread_pool::submit(F)'
9618 | std::future<typename std::result_of<F()>::type> submit(F f)
| ^~~~~~
:9618:55: note: template argument deduction/substitution failed:
: In substitution of 'template std::future<typename std::result_of<F()>::type> cxx_thread_pool::listing_9_2::thread_pool::submit(F) [with F = cxx_thread_pool::accumulate_block<__gnu_cxx::__normal_iterator<int*, std::vector >, int>]':
:9678:18: required from 'T cxx_thread_pool::parallel_accumulate(Iterator, Iterator, T) [with Iterator = __gnu_cxx::__normal_iterator<int*, std::vector >; T = int]'
:9749:52: required from here
:9618:55: error: no type named 'type' in 'class std::result_of<cxx_thread_pool::accumulate_block<__gnu_cxx::__normal_iterator<int*, std::vector >, int>()>'

Secondly, not sure how the task submitted gets the range of block to work on since submit() expects F that has no arguments.

Any help or suggestions?

Many thanks
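
Both points seem to come down to the same thing: submit() wants a callable that takes no arguments, while accumulate_block's call operator takes the block's iterators. One way around it, sketched below under stated assumptions, is to wrap each block in a lambda that captures its range. Here submit is a hypothetical stand-in backed by std::async rather than the book's thread_pool, block_size is an arbitrary value, and sum_one_to is a helper added for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <iterator>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical stand-in for thread_pool::submit: same shape (takes a callable
// that needs no arguments, returns a future), but backed by std::async.
template <typename F>
auto submit(F f) -> std::future<decltype(f())> {
    return std::async(std::launch::async, std::move(f));
}

template <typename Iterator, typename T>
struct accumulate_block {
    T operator()(Iterator first, Iterator last) const {
        return std::accumulate(first, last, T());
    }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    std::ptrdiff_t const block_size = 25;  // arbitrary for this sketch
    std::vector<std::future<T>> futures;
    Iterator block_start = first;
    while (std::distance(block_start, last) > block_size) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        // The lambda carries the range, so the submitted task is callable as task().
        futures.push_back(submit([block_start, block_end] {
            return accumulate_block<Iterator, T>()(block_start, block_end);
        }));
        block_start = block_end;
    }
    // The calling thread handles the final (possibly short) block itself.
    T result = init + accumulate_block<Iterator, T>()(block_start, last);
    for (auto& f : futures) {
        result += f.get();
    }
    return result;
}

// Hypothetical helper: sums 1..n through parallel_accumulate.
int sum_one_to(int n) {
    std::vector<int> values(n);
    std::iota(values.begin(), values.end(), 1);
    return parallel_accumulate(values.begin(), values.end(), 0);
}
```

The same lambda-wrapping should carry over to a pool whose submit() has the signature shown in the error output, since the lambda satisfies the F() requirement.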

Possible bug in listings 7.4-5

I could be wrong about this, but I thought it better to ask a stupid question & learn from it.

If one thread gets suspended (or there's a cache miss or something) in pop() between testing that old_head isn't nullptr and the compare/exchange with head/old_head/old_head->next, and another thread succeeds in its compare/exchange in chain_pending_nodes() (as called by chain_pending_node(), as called by try_reclaim() when threads_in_pop != 1), then couldn't the stack get replaced by the list of to-be-deleted nodes?

If the compare/exchange in pop() doesn't begin until the thread resumes, then I don't think it will detect that old_head->next has changed.

The solution, I believe, would be to have a separate node* next_free in struct node.

Listing 9.6: the local_work_queue is always nullptr

In the function worker_thread(), reset() is called on local_work_queue, so it is no longer null there.
However, in the function submit(), this variable is always nullptr, so the tasks are pushed onto the global queue.
Why?
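
A likely explanation, assuming local_work_queue is declared thread_local as in the listing: every thread gets its own copy of the pointer. reset() inside worker_thread() only changes the worker's copy, so a submit() called from a thread that is not a pool worker (for example main) still sees its own untouched nullptr. The sketch below shows that behaviour in isolation; visibility and check_local_queue are hypothetical names for this illustration.

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <thread>

// Stand-in for the pool's per-thread queue; the variable name is borrowed from the listing.
thread_local std::unique_ptr<std::queue<int>> local_work_queue;

struct visibility {
    bool in_worker;  // what the worker thread sees after reset()
    bool in_caller;  // what the calling thread sees afterwards
};

visibility check_local_queue() {
    visibility v{false, false};
    std::thread worker([&v] {
        local_work_queue.reset(new std::queue<int>);  // affects this thread's copy only
        v.in_worker = (local_work_queue != nullptr);
    });
    worker.join();
    v.in_caller = (local_work_queue != nullptr);  // the caller's own copy was never set
    return v;
}
```

Under that reading, tasks only reach a local queue when submit() is itself called from one of the pool's worker threads, such as a task that submits further tasks.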

6.11 doesn't compile

typedef typename bucket_data::iterator bucket_iterator;
bucket_iterator find_entry_for(Key const& key) const
{
  return std::find_if(data.begin(), data.end(), [&](bucket_value const& item) {
    return item.first == key;
  });
}

This returns a const_iterator, the compiler issues an error because it can't convert between the two.
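
One way to make this compile, sketched here with the locking left out for brevity, is to give the const member a const_iterator return type and add a non-const overload for the code that needs to modify entries. The typedef names follow the snippet above; bucket_const_iterator and the two public members are assumptions made for the sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <list>
#include <string>
#include <utility>

template <typename Key, typename Value>
class bucket_type {
    typedef std::pair<Key, Value> bucket_value;
    typedef std::list<bucket_value> bucket_data;
    typedef typename bucket_data::iterator bucket_iterator;
    typedef typename bucket_data::const_iterator bucket_const_iterator;  // added

    bucket_data data;

    // Const lookup: data.begin() yields const_iterator here, and so does find_if.
    bucket_const_iterator find_entry_for(Key const& key) const {
        return std::find_if(data.begin(), data.end(),
                            [&](bucket_value const& item) { return item.first == key; });
    }
    // Non-const lookup for callers that modify or erase the entry.
    bucket_iterator find_entry_for(Key const& key) {
        return std::find_if(data.begin(), data.end(),
                            [&](bucket_value const& item) { return item.first == key; });
    }

public:
    Value value_for(Key const& key, Value const& default_value) const {
        bucket_const_iterator const found = find_entry_for(key);
        return (found == data.end()) ? default_value : found->second;
    }
    void add_or_update_mapping(Key const& key, Value const& value) {
        bucket_iterator const found = find_entry_for(key);
        if (found == data.end()) {
            data.push_back(bucket_value(key, value));
        } else {
            found->second = value;
        }
    }
};
```

Dropping const from find_entry_for would also compile, but then value_for could no longer be a const member.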

Listing 9.5 crashes

void operator() () { impl_->call(); } // Thread 4: EXC_BAD_ACCESS (code=1, address=0x0)

The code crashes when run normally and doesn't sort when stepping through with a debugger.

Don't have time to keep debugging this author's code - please fix.

Code used:
#include <atomic>
#include <memory>
#include <functional>
#include <vector>
#include <thread>
#include <numeric>
#include <iostream>
#include <future>
#include <list>

namespace mt {
template <typename T>
class queue {
public:
    queue() : head_(std::make_unique<node>()), tail_(head_.get()) { }
    
    queue(const queue&) = delete;
    queue& operator=(const queue&) = delete;
    
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> old_head = try_pop_head();
        return old_head ? old_head->data_ : std::shared_ptr<T>();
    }
    
    bool try_pop(T &val)
    {
        std::unique_ptr<node> old_head = try_pop_head(val);
        return old_head.get();
    }
    
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> old_head = wait_pop_head();
        return old_head->data_;
    }
    
    void wait_and_pop(T &val)
    {
        std::unique_ptr<node> old_head = wait_pop_head(val);
    }
    
    template <typename V>
    void push(V &&val)
    {
        auto new_data = std::make_shared<T>(std::forward<V>(val));
        auto p = std::make_unique<node>();
        
        {
            std::lock_guard lock(tail_m);
            tail_->data_ = new_data;
            node *new_tail = p.get();
            tail_->next_ = std::move(p);
            tail_ = new_tail;
        }
        
        cv.notify_one();
    }
    
    bool empty() const
    {
        std::lock_guard lock(head_m);
        return head_.get() == get_tail();
    }
    
private:
    struct node
    {
        std::shared_ptr<T> data_;
        std::unique_ptr<node> next_;
    };
    
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> old_head = std::move(head_);
        head_ = std::move(old_head->next_);
        return old_head;
    }
    
    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> lock(head_m);
        cv.wait(lock, [&] () { return head_.get() != get_tail(); } );
        return lock;
    }
    
    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> lock(wait_for_data());
        return pop_head();
    }
    
    std::unique_ptr<node> wait_pop_head(T &val)
    {
        std::unique_lock<std::mutex> lock(wait_for_data());
        val = std::move(*head_->data_);
        return pop_head();;
    }
    
    node* get_tail() const
    {
        std::lock_guard lock(tail_m);
        return tail_;
    }
    
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard lock(head_m);
        if (head_.get() == get_tail()) { return std::unique_ptr<node>(); }
        return pop_head();
    }
    
    std::unique_ptr<node> try_pop_head(T &val)
    {
        std::lock_guard lock(head_m);
        if (head_.get() == get_tail()) { return std::unique_ptr<node>(); }
        val = std::move(*head_->data_);
        return pop_head();
    }
    
    std::unique_ptr<node> head_;
    node *tail_;
    
    mutable std::mutex head_m;
    mutable std::mutex tail_m;
    
    std::condition_variable cv;
};
} // namespace mt (multi-threaded)

// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

class join_threads {
public:
    explicit join_threads(std::vector<std::thread> &threads) : threads_(threads) { }
    
    ~join_threads()
    {
        for (auto &t : threads_)
            if (t.joinable()) { t.join(); }
    }
    
private:
    std::vector<std::thread> &threads_;
};

// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

class function_wrapper {
public:
    function_wrapper() = default;
    
    function_wrapper(const function_wrapper&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
    
    function_wrapper(function_wrapper &&other) noexcept : impl_(std::move(other.impl_)) { }
    
    function_wrapper& operator=(function_wrapper &&rhs) noexcept
    {
        impl_ = std::move(rhs.impl_);
        return *this;
    }
    
    template <typename Func>
    function_wrapper(Func &&f) noexcept : impl_(std::make_unique<impl_type<Func>>(std::move(f))) { }
    
    void operator() () { impl_->call(); }
    
    
private:
    struct impl_base {
        // abstract base class
        virtual void call() = 0;
        virtual ~impl_base() { }
    };
    
    std::unique_ptr<impl_base> impl_;
    
    template <typename Func>
    struct impl_type : impl_base {
        Func f_;
        
        impl_type(Func &&f) : f_(std::move(f)) { }
        void call() { f_(); }
    };
};

// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

class thread_pool {
public:
    thread_pool() : done_(false), joiner_(threads_)
    {
        std::size_t thread_count = std::thread::hardware_concurrency();
        
        try {
            for (int i = 0; i <= thread_count; ++i)
                threads_.push_back(std::thread(&thread_pool::worker_thread, this));
        } catch (...) {
            done_ = true;
            throw;
        }
    }
    
    ~thread_pool() { done_ = true; }
    
    template <typename Func>
    std::future<std::invoke_result_t<Func&&>> submit(Func f) // std::result_of is deprecated
    {
        typedef std::invoke_result_t<Func&&> T;
        
        std::packaged_task<T()> task(std::move(f));
        std::future<T> result(task.get_future());
        workq_.push(std::move(task));
        
        return result;
    }
    
    void run_pending_task()
    {
        function_wrapper task;
        
        if (workq_.try_pop()) {
            task();
        } else {
            std::this_thread::yield();
        }
    }
    
private:
    // std::atomic_bool done;
    // https://comp.std.cpp.narkive.com/nwMakWji/std-atomic-bool-vs-std-atomic-bool
    std::atomic<bool> done_;
    
    mt::queue<function_wrapper> workq_;
    
    std::vector<std::thread> threads_;
    join_threads joiner_;
    
    void worker_thread()
    {
        while (!done_) {
            function_wrapper task;
            
            if (workq_.try_pop(task)) {
                task();
            } else {
                std::this_thread::yield();
            }
            
        }
    }
};

// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

namespace par {
template <typename T>
struct sorter {
    thread_pool pool;
    
    std::list<T> do_sort(std::list<T> &chunk_data) {
        if (chunk_data.empty()) { return chunk_data; }
        
        std::list<T> result;
        result.splice(result.begin(), chunk_data, chunk_data.begin());
        
        const T &partition_val = *result.begin();
        
        auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(), [&] (const T &val) {
            return val < partition_val;
        });
        
        std::list<T> new_lower_chunk;
        new_lower_chunk.splice(new_lower_chunk.end(), chunk_data, chunk_data.begin(), divide_point);
        
        std::future<std::list<T>> new_lower = pool.submit(std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));
        // std::future<std::list<T>> new_lower = pool.submit([this, &new_lower_chunk] () { do_sort(std::move(new_lower_chunk)); } );
        
        std::list<T> new_higher(do_sort(chunk_data));
        result.splice(result.end(), new_higher);
        
        while (new_lower.wait_for(std::chrono::seconds(0)) == std::future_status::timeout) {
            pool.run_pending_task();
        }
        
        result.splice(result.begin(), new_lower.get());
        
        return result;
    }
};

template <typename T>
std::list<T> quicksort(std::list<T> input) {
    if (input.empty()) { return input; }
    sorter<T> s;
    return s.do_sort(input);
}

} // namespace par

// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

void printList(const std::list<int> &ivec) {
    for (int i : ivec) {
        std::cout << i << ' ';
    } std::cout << '\n';
}

// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

int main()
{
    std::list<int> ilist = { 9, 8, 7, 6, 5, 4, 3, 2, 1 };
    std::cout << "ilist:  ";
    printList(ilist);
    
    std::list<int> result = par::quicksort(ilist);
    
    std::cout << "result: ";
    printList(ilist);
    
    return 0;
}
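
Judging only from the code as posted, one likely cause of the null dereference is run_pending_task(): it calls workq_.try_pop() with no argument, which returns a shared_ptr and leaves the local task empty, so task() then goes through a null impl_. worker_thread() uses try_pop(task) and does not have this problem. Separately, main() prints ilist both times, so the sorted result is never shown. The sketch below reproduces just the overload mix-up; tiny_queue is a hypothetical single-threaded stand-in for mt::queue, and std::function stands in for function_wrapper so the empty state can be inspected safely.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical single-threaded stand-in with the same two try_pop overloads.
template <typename T>
class tiny_queue {
    std::queue<T> items;

public:
    void push(T v) { items.push(std::move(v)); }

    std::shared_ptr<T> try_pop() {
        if (items.empty()) return std::shared_ptr<T>();
        std::shared_ptr<T> res = std::make_shared<T>(std::move(items.front()));
        items.pop();
        return res;
    }
    bool try_pop(T& out) {
        if (items.empty()) return false;
        out = std::move(items.front());
        items.pop();
        return true;
    }
};

// Same shape as run_pending_task() in the post: pops, but never fills `task`.
bool task_filled_without_out_param() {
    tiny_queue<std::function<void()>> q;
    q.push([] {});
    std::function<void()> task;
    if (q.try_pop()) {
        return static_cast<bool>(task);  // still empty: calling it would fail
    }
    return false;
}

// Same shape as worker_thread() in the post: the popped item lands in `task`.
bool task_filled_with_out_param() {
    tiny_queue<std::function<void()>> q;
    q.push([] {});
    std::function<void()> task;
    if (q.try_pop(task)) {
        task();
        return static_cast<bool>(task);
    }
    return false;
}
```

If this is the cause, passing task to try_pop() in run_pending_task() and printing result instead of ilist would be the first things to try.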

Function f() in listing_2.6 needs extra parentheses

Hello,
It does not work.
I think a function declaration and an object creation are being confused in function f(). This line needs extra parentheses:

scoped_thread t(std::thread(func(some_local_state)));
should be
scoped_thread t((std::thread(func(some_local_state))));
or
scoped_thread t{std::thread(func(some_local_state))};

Bug in listings 6.9 and 6.10

Hi there!

Listing 6.9 and 6.10 contain a comparison of a unique_ptr with a raw pointer:

head!=get_tail()

See here and here, which fails to compile.

This is done correctly in other places in Listing 6.10 though, where head.get() is used instead.

Furthermore, Listing 6.9 declares a unique_ptr const

std::unique_ptr<node> const old_head=std::move(head);

This fails to compile as well, see here, as the const pointer cannot be returned any more.

Interestingly, none of these occur in the book. It might hence be better to use the code from the book instead. I wonder why those two are out of sync though :-)

Lastly, in Listing 6.9, a local object is moved, see here

return std::move(head_lock);

This prevents copy elision, and leads to an error with my compiler as well. A simple return would do here. This is in the Listing here and in the book.

Cheers
Lucas

fix listing 3.8

    void unlock()
    {
        if(this_thread_hierarchy_value!=hierarchy_value)
            throw std::logic_error("mutex hierarchy violated"); // missing
        this_thread_hierarchy_value=previous_hierarchy_value;
        internal_mutex.unlock();
    }

Listing C.8 does not compile

Class bank_machine has a member variable called balance. The comparison if(balance>=msg.amount) and the decrement balance-=msg.amount; do not compile. I changed the name from balance to bank_balance and everything compiles correctly in VS2019 16.11.2 with C++20. I believe the compiler is confusing the balance message with the balance member variable.

threadpool example crashes in my attempt to use it via apple clang

First off thanks for writing this wonderful book. You may want to report that you are the copyright holder to github for a repo that contains a copy stored on it https://github.com/KnightofDawn/book-1/blob/master/C%2B%2B%20Concurrency%20in%20Action%2C%202nd%20Edition.pdf

my question though is regarding my attempt to use your book (the real one, not the pdf) .. and it is crashing

https://stackoverflow.com/questions/65427621/example-c-threadpool-is-crashing

edit: https://stackoverflow.com/questions/65637282/c-no-instance-of-overloaded-function-but-using-template-typename/65637408#65637408 was my rewording of the issue which got me unblocked

void find_the_answer_to_ltuae(){
    std::cout << "About to sleep 10 seconds...";
    sleep(10);
    std::cout << 100 << std::endl;
}

int main()
{
    std::cout << "About to create my_threadpool...." << std::endl;
    thread_pool my_threadpool;
    std::cout << "Done creating my_threadpool...." << std::endl << "Submitting first job now";
    my_threadpool.submit(find_the_answer_to_ltuae);
    std::cout<<"The answer is " << 42 << std::endl;
    sleep(100);
}
% ./build/bin/test_cpp_multi
About to create my_threadpool....
thread_pool()... hardware_concurrency() = 2
push_back!push_back!Done!
Done creating my_threadpool....
zsh: illegal hardware instruction  ./build/bin/test_cpp_multi

When I tried to use all 12 threads according to the hardware concurrency setting, that crashed too, so I hardcoded it back down to 2 threads in the pool. It then crashed somewhere else in the program. I'm unsure why, or how to get it working on Apple clang.

thanks!

Possible broken pop in listing 7.17

Reading the following pop code, I think there may be something wrong with res=ptr->data.exchange(nullptr), which should be replaced with something like res=ptr->data.load().

    std::unique_ptr<T> pop()
    {
        counted_node_ptr old_head=head.load(std::memory_order_relaxed);
        for(;;)
        {
            increase_external_count(head,old_head);
            node* const ptr=old_head.ptr;
            if(ptr==tail.load().ptr)
            {
                ptr->release_ref();
                return std::unique_ptr<T>();
            }
            if(head.compare_exchange_strong(old_head,ptr->next))
            {
                T* const res=ptr->data.exchange(nullptr);
                free_external_counter(old_head);
                return std::unique_ptr<T>(res);
            }
            ptr->release_ref();
        }
    }

`std::partial_sum` example - pg. 290

"...onto the elements in the third and final chunk to get the final result: {1, 3, 6}, {10, 15, 21}, {28, 36, 55}." - pg. 290

The final element in the std::partial_sum calculation should be 45, not 55 (24 + 21).
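
The arithmetic can be checked directly with std::partial_sum. This sketch assumes the input sequence on that page is 1 through 9, which is what the quoted chunks imply; running_totals is a hypothetical helper name.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Running totals of 1..9, computed in place.
std::vector<int> running_totals() {
    std::vector<int> values = {1, 2, 3, 4, 5, 6, 7, 8, 9};
    std::partial_sum(values.begin(), values.end(), values.begin());
    return values;  // 1, 3, 6, 10, 15, 21, 28, 36, 45
}
```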

It appears as 45 further down the page.

"..., and finally after round four you have 1, 3, 6, 10, 15, 21, 28, 36, 45, which is the final answer." - pg. 290

Listing 8.1 Runtime Error

Combining Listing 6.1 and Listing 8.1 we get the following runtime error:

#include <exception>
#include <stack>
#include <mutex>
#include <memory>
struct empty_stack: std::exception
{
    const char* what() const throw()
    {
        return "empty stack";
    }
};
template<typename T>
class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack(){}
    threadsafe_stack(const threadsafe_stack& other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data=other.data;
    }
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));
    }
    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if(data.empty()) throw empty_stack();
        std::shared_ptr<T> const res(
            std::make_shared<T>(std::move(data.top())));
        data.pop();
        return res;
    }
    void pop(T& value)
    {
        std::lock_guard<std::mutex> lock(m);
        if(data.empty()) throw empty_stack();
        value=std::move(data.top());
        data.pop();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};

    #include <thread>
    #include <vector>
    #include <future>
    #include <list>
    template<typename T>
    struct sorter
    {
        struct chunk_to_sort
        {
            std::list<T> data;
            std::promise<std::list<T> > promise;
        };
        thread_safe_stack<chunk_to_sort> chunks;
        std::vector<std::thread> threads;
        unsigned const max_thread_count;
        std::atomic<bool> end_of_data;
        sorter():
            max_thread_count(std::thread::hardware_concurrency()-1),
            end_of_data(false)
        {}
        ~sorter()
        {
            end_of_data=true;
            for(unsigned i=0;i<threads.size();++i)
            {
                threads[i].join();
            }
        }
        void try_sort_chunk()
        {
            boost::shared_ptr<chunk_to_sort > chunk=chunks.pop();
            if(chunk)
            {
                sort_chunk(chunk);
            }
        }
        std::list<T> do_sort(std::list<T>& chunk_data)
        {
            if(chunk_data.empty())
            {
                return chunk_data;
            }
            std::list<T> result;
            result.splice(result.begin(),chunk_data,chunk_data.begin());
            T const& partition_val=*result.begin();
            typename std::list<T>::iterator divide_point=
                std::partition(chunk_data.begin(),chunk_data.end(),
                               [&](T const& val){return val<partition_val;});
            chunk_to_sort new_lower_chunk;
            new_lower_chunk.data.splice(new_lower_chunk.data.end(),
                                        chunk_data,chunk_data.begin(),
                                        divide_point);
            std::future<std::list<T> > new_lower=
                new_lower_chunk.promise.get_future();
            chunks.push(std::move(new_lower_chunk));
            if(threads.size()<max_thread_count)
            {
                threads.push_back(std::thread(&sorter<T>::sort_thread,this));
            }
            std::list<T> new_higher(do_sort(chunk_data));
            result.splice(result.end(),new_higher);
            while(new_lower.wait_for(std::chrono::seconds(0)) !=
                  std::future_status::ready)
            {
                try_sort_chunk();
            }
            result.splice(result.begin(),new_lower.get());
            return result;
        }
        void sort_chunk(boost::shared_ptr<chunk_to_sort > const& chunk)
        {
            chunk->promise.set_value(do_sort(chunk->data));
        }
        void sort_thread()
        {
            while(!end_of_data)
            {
                try_sort_chunk();
                std::this_thread::yield();
            }
        }
    };
    template<typename T>
    std::list<T> parallel_quick_sort(std::list<T> input)
    {
        if(input.empty())
        {
            return input;
        }
        sorter<T> s;
        return s.do_sort(input);
    }

    libc++abi: terminating due to uncaught exception of type empty_stack: empty stack
    Message from debugger: killed
    Program ended with exit code: 9

This is after making a few amendments to allow the code to compile.

    // thread_safe_stack<chunk_to_sort> chunks;
    threadsafe_stack<chunk_to_sort> chunks;

    // std::shared_ptr<T> pop()
    boost::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if(data.empty()) throw empty_stack();
        // std::shared_ptr<T> const res( std::make_shared<T>(std::move(data.top()))); 
        boost::shared_ptr<T> const res(new T(std::move(data.top())));
        data.pop();
        return res;
    }

The same error shows with both boost::shared_ptr<T> and std::shared_ptr<T>.
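
Judging only from the code shown above, one plausible cause is that pop() throws empty_stack when the stack is empty, whereas try_sort_chunk() tests the returned pointer with if(chunk) as though an empty stack produced a null pointer. sort_thread() calls try_sort_chunk() in a loop, so an exception thrown on an empty stack would escape the thread function and terminate the program. A minimal sketch of a non-throwing alternative follows; the class name nonthrowing_stack and the member try_pop() are hypothetical and not taken from the book's listings.

    #include <memory>
    #include <mutex>
    #include <stack>
    #include <utility>

    // Hypothetical variant of the stack above: try_pop() reports "empty" with
    // a null pointer instead of throwing empty_stack.
    template<typename T>
    class nonthrowing_stack
    {
    private:
        std::stack<T> data;
        mutable std::mutex m;
    public:
        void push(T new_value)
        {
            std::lock_guard<std::mutex> lock(m);
            data.push(std::move(new_value));
        }
        std::shared_ptr<T> try_pop()
        {
            std::lock_guard<std::mutex> lock(m);
            // An empty stack is an expected state for the sorter, so return
            // a null pointer rather than throwing.
            if(data.empty()) return std::shared_ptr<T>();
            std::shared_ptr<T> const res(
                std::make_shared<T>(std::move(data.top())));
            data.pop();
            return res;
        }
    };

With something like this, try_sort_chunk() could call chunks.try_pop() and keep its existing if(chunk) test. Whether this matches the author's intended fix is an assumption.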


I will link a PR once I find a solution.


On a personal note, please...please test your code...

Whilst I appreciate the following from page 255:

"As with most of the examples, this is intended to demonstrate an idea rather than being production-ready code."

it's extremely unhelpful as a reader to be frequently exposed to code that doesn't compile, doesn't run, or exhibits data races or undefined behaviour.

It would also be nice if this repo could be rejigged to include a combination of listing excerpts (that focus on the changes) and full listings (that demonstrate functionality).
