
pipes's Introduction


Pipes are small components for writing expressive code when working on collections. Pipes chain together into a pipeline that receives data from a source, operates on that data, and sends the results to a destination.

This is a header-only library, implemented in C++14.

The library is under development and subject to change. Contributions are welcome. You can also log an issue if you would like an enhancement or if you spot a bug.

A First Example

Here is a simple example of a pipeline made of two pipes: transform and filter:

auto const source = std::vector<int>{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
auto destination = std::vector<int>{};

source >>= pipes::filter([](int i){ return i % 2 == 0; })
       >>= pipes::transform([](int i){ return i * 2; })
       >>= pipes::push_back(destination);

// destination contains {0, 4, 8, 12, 16};

What's going on here:

  • Each element of source is sent to filter.
  • Every time filter receives a piece of data, it sends it to the next pipe (here, transform) only if that piece of data satisfies filter's predicate.
  • transform then applies its function to the data it gets and sends the result to the next pipe (here, pipes::push_back).
  • pipes::push_back push_backs the data it receives into its vector (here, destination).

A Second Example

Here is a more elaborate example with a pipeline that branches out in several directions:

A >>= pipes::transform(f)
  >>= pipes::filter(p)
  >>= pipes::unzip(pipes::push_back(B),
                   pipes::fork(pipes::push_back(C),
                               pipes::filter(q) >>= pipes::push_back(D),
                                pipes::filter(r) >>= pipes::push_back(E)));

Here, unzip takes the std::pairs or std::tuples it receives and breaks them down into individual elements. It sends each element to the pipes it takes (here pipes::push_back and fork).

fork takes any number of pipes and sends the data it receives to each of them.

Since data circulates through pipes, real-life pipes and plumbing provide a nice analogy (which gave its name to the library). For example, the above pipeline can be represented graphically like this:

Doesn't it look like ranges?

Pipes sort of look like range adaptors from afar, but the two libraries have very different designs.

Range views are about adapting ranges with view layers, and reading through those layers in lazy mode. Ranges are "pull based", in that components ask for the next value. Pipes are about sending pieces of data as they come along in a collection through a pipeline, and let them land in a destination. Pipes are "push based", in that components wait for the next value.

Ranges and pipes have overlapping components such as transform and filter. But pipes do things that ranges can't do, such as pipes::mux, pipes::fork and pipes::unzip, and ranges do things that pipes can't do, such as infinite ranges.

It is possible to use ranges and pipes in the same expression though:

ranges::view::zip(dadChromosome, momChromosome)
    >>= pipes::transform(crossover) // crossover takes and returns a tuple of 2 elements
    >>= pipes::unzip(pipes::push_back(gameteChromosome1),
                     pipes::push_back(gameteChromosome2));

Operating on several collections

The pipes library makes it possible to manipulate several collections at the same time, with the pipes::mux helper. Note that contrary to ranges::view::zip, pipes::mux doesn't require using tuples:

auto const input1 = std::vector<int>{1, 2, 3, 4, 5};
auto const input2 = std::vector<int>{10, 20, 30, 40, 50};

auto results = std::vector<int>{};

pipes::mux(input1, input2) >>= pipes::filter   ([](int a, int b){ return a + b < 40; })
                           >>= pipes::transform([](int a, int b) { return a * b; })
                           >>= pipes::push_back(results);

// results contains {10, 40, 90}

Operating on all the possible combinations between several collections

pipes::cartesian_product takes any number of collections, and generates all the possible combinations between the elements of those collections. It sends each combination successively to the next pipe after it.

Like pipes::mux, pipes::cartesian_product doesn't use tuples but sends the values directly to the next pipe:

auto const inputs1 = std::vector<int>{1, 2, 3};
auto const inputs2 = std::vector<std::string>{"up", "down"};

auto results = std::vector<std::string>{};

pipes::cartesian_product(inputs1, inputs2)
    >>= pipes::transform([](int i, std::string const& s){ return std::to_string(i) + '-' + s; })
    >>= pipes::push_back(results);

// results contains {"1-up", "1-down", "2-up", "2-down", "3-up", "3-down"}

Operating on adjacent elements of a collection

pipes::adjacent sends each pair of adjacent elements of a range to a pipeline:

auto const input = std::vector<int>{1, 2, 4, 7, 11, 16};

auto results = std::vector<int>{};

pipes::adjacent(input)
    >>= pipes::transform([](int a, int b){ return b - a; })
    >>= pipes::push_back(results);

// results contains {1, 2, 3, 4, 5};

Operating on all combinations of elements of one collection

pipes::combinations sends every possible pair of distinct elements of a range to a pipeline:

auto const inputs = std::vector<int>{ 1, 2, 3, 4, 5 };

auto results = std::vector<std::pair<int, int>>{};

pipes::combinations(inputs)
    >>= pipes::transform([](int i, int j){ return std::make_pair(i, j); })
    >>= pipes::push_back(results);

/*
results contains:
{
   { 1, 2 }, { 1, 3 }, { 1, 4 }, { 1, 5 },
             { 2, 3 }, { 2, 4 }, { 2, 5 },
                       { 3, 4 }, { 3, 5 },
                                 { 4, 5 }
}
*/

End pipes

This library also provides end pipes, which are components that send data to a collection in an elaborate way. For example, the map_aggregator pipe receives std::pair<Key, Value>s and adds them to a map with the following rule:

  • if its key is not already in the map, insert the incoming pair in the map,
  • otherwise, aggregate the value of the incoming pair with the existing one in the map.

Example:

std::map<int, std::string> entries = { {1, "a"}, {2, "b"}, {3, "c"}, {4, "d"} };
std::map<int, std::string> entries2 = { {2, "b"}, {3, "c"}, {4, "d"}, {5, "e"} };
std::map<int, std::string> results;

// results is empty

entries >>= pipes::map_aggregator(results, concatenateStrings);

// the elements of entries have been inserted into results

entries2 >>= pipes::map_aggregator(results, concatenateStrings);

// the new elements of entries2 have been inserted into results; the existing ones have been concatenated with the new values
// results contains { {1, "a"}, {2, "bb"}, {3, "cc"}, {4, "dd"}, {5, "e"} }

All components are located in the namespace pipes.

Easy integration with STL algorithms

All pipes can be used as output iterators of STL algorithms:

std::set_difference(begin(setA), end(setA),
                    begin(setB), end(setB),
                    transform(f) >>= filter(p) >>= map_aggregator(results, addValues));
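
For instance, here is a self-contained sketch of this integration (the data and the doubling lambda are made up for illustration):

std::set<int> setA = {1, 2, 3, 4, 5};
std::set<int> setB = {3, 4, 5};
std::vector<int> results;

// set_difference outputs {1, 2} into the pipeline, which doubles each value
std::set_difference(begin(setA), end(setA),
                    begin(setB), end(setB),
                    pipes::transform([](int i){ return i * 2; })
                        >>= pipes::push_back(results));

// results contains {2, 4}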

Streams support

The contents of an input stream can be sent to a pipe by using read_in_stream. The end pipe to_out_stream sends data to an output stream.

The following example reads strings from the standard input, transforms them to upper case, and sends them to the standard output:

std::cin >>= pipes::read_in_stream<std::string>{}
         >>= pipes::transform(toUpper)
         >>= pipes::to_out_stream(std::cout);

List of available pipes

General pipes

dev_null

dev_null is a pipe that doesn't do anything with the values it receives. It is useful for selecting only some of the data coming out of an algorithm that has several outputs. An example of such an algorithm is set_segregate:

std::set<int> setA = {1, 2, 3, 4, 5};
std::set<int> setB = {3, 4, 5, 6, 7};

std::vector<int> inAOnly;
std::vector<int> inBoth;

sets::set_segregate(setA, setB,
                    pipes::push_back(inAOnly),
                    pipes::push_back(inBoth),
                    pipes::dev_null{});

// inAOnly contains {1, 2}
// inBoth contains {3, 4, 5}
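
dev_null can also be used directly inside a pipes expression. Here is a minimal sketch (with made-up data) that keeps only the values of a map, discarding its keys:

std::map<int, std::string> entries = { {1, "one"}, {2, "two"} };
std::vector<std::string> values;

entries >>= pipes::unzip(pipes::dev_null{},
                         pipes::push_back(values));

// values contains {"one", "two"}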

drop

drop is a pipe that ignores the first N incoming values, and sends on the values after them to the next pipe:

auto const input = std::vector<int>{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

auto result = std::vector<int>{};

input >>= pipes::drop(5)
      >>= pipes::push_back(result);

// result contains { 6, 7, 8, 9, 10 }

drop_while

drop_while is a pipe that ignores the incoming values until they stop satisfying a predicate, and sends on the values after them to the next pipe:

auto const input = std::vector<int>{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

auto result = std::vector<int>{};

input >>= pipes::drop_while([](int i){ return i != 6; })
      >>= pipes::push_back(result);

// result contains { 6, 7, 8, 9, 10 }

filter

filter is a pipe that takes a predicate p and, when it receives a value x, sends it on to the next pipe if and only if p(x) is true.

std::vector<int> input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
std::vector<int> results;

input >>= pipes::filter([](int i){ return i % 2 == 0; })
      >>= pipes::push_back(results);

// results contains {2, 4, 6, 8, 10}

fork

fork is a pipe that takes any number of pipes, and sends a copy of the values it receives to each of those pipes.

std::vector<int> input = {1, 2, 3, 4, 5};
std::vector<int> results1;
std::vector<int> results2;
std::vector<int> results3;

input >>= pipes::fork(pipes::push_back(results1),
                       pipes::push_back(results2),
                       pipes::push_back(results3));

// results1 contains {1, 2, 3, 4, 5}
// results2 contains {1, 2, 3, 4, 5}
// results3 contains {1, 2, 3, 4, 5}

join

The join pipe receives collections and sends each element of each of those collections to the next pipe:

auto const input = std::vector<std::vector<int>>{ {1, 2}, {3, 4}, {5, 6} };
auto results = std::vector<int>{};

input >>= pipes::join >>= pipes::push_back(results);

// results contains {1, 2, 3, 4, 5, 6}

partition

partition is a pipe that takes a predicate p and two other pipes. When it receives a value x, it sends x on to the first pipe if p(x) is true, and to the second pipe if p(x) is false.

std::vector<int> input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
std::vector<int> evens;
std::vector<int> odds;

input >>= pipes::partition([](int n){ return n % 2 == 0; },
                           pipes::push_back(evens),
                           pipes::push_back(odds));

// evens contains {2, 4, 6, 8, 10}
// odds contains {1, 3, 5, 7, 9}

read_in_stream

read_in_stream is a template pipe that reads from an input stream. The template parameter indicates what type of data to request from the stream:

auto const input = std::string{"1.1 2.2 3.3"};

auto results = std::vector<double>{};

std::istringstream(input) >>= pipes::read_in_stream<double>{}
                          >>= pipes::transform([](double d){ return d * 10; })
                          >>= pipes::push_back(results);

// results contains {11, 22, 33};

switch

switch_ is a pipe that takes several case_ branches. Each branch contains a predicate and a pipe. When switch_ receives a value, it tests it successively against the predicate of each branch, and sends the value on to the pipe of the first branch whose predicate returns true. The default_ branch is equivalent to a branch whose predicate always returns true. Having a default_ branch is not mandatory.

std::vector<int> numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
std::vector<int> multiplesOf4;
std::vector<int> multiplesOf3;
std::vector<int> rest;

numbers >>= pipes::switch_(pipes::case_([](int n){ return n % 4 == 0; }) >>= pipes::push_back(multiplesOf4),
                           pipes::case_([](int n){ return n % 3 == 0; }) >>= pipes::push_back(multiplesOf3),
                           pipes::default_ >>= pipes::push_back(rest));

// multiplesOf4 contains {4, 8};
// multiplesOf3 contains {3, 6, 9};
// rest contains {1, 2, 5, 7, 10};

stride

stride is a pipe that sends every Nth element, starting from the first one. The N-1 elements after each element sent are ignored:

auto const input = std::vector<int>{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

auto result = std::vector<int>{};

input >>= pipes::stride(3)
      >>= pipes::push_back(result);
      
// result contains {1, 4, 7, 10}

take

take takes a number N and sends to the next pipe the first N elements it receives. The elements after them are ignored:

auto const input = std::vector<int>{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

auto result = std::vector<int>{};

input >>= pipes::take(6)
      >>= pipes::push_back(result);
      
// result contains {1, 2, 3, 4, 5, 6}

take_while

take_while takes a predicate and sends incoming values on to the next pipe as long as they satisfy the predicate. It stops sending at the first value that doesn't satisfy it:

auto const input = std::vector<int>{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

auto result = std::vector<int>{};

input >>= pipes::take_while([](int i){ return i != 7; })
      >>= pipes::push_back(result);

// result contains {1, 2, 3, 4, 5, 6}

tee

tee is a pipe that takes one other pipe, and sends a copy of each value it receives to that pipe before sending the value on to the next pipe. Like the tee command on UNIX, this pipe is useful for taking a peek at intermediary results.

auto const inputs = std::vector<int>{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
auto intermediaryResults = std::vector<int>{};
auto results = std::vector<int>{};

inputs
    >>= pipes::transform([](int i) { return i * 2; })
    >>= pipes::tee(pipes::push_back(intermediaryResults))
    >>= pipes::filter([](int i){ return i >= 12; })
    >>= pipes::push_back(results);

// intermediaryResults contains {2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
// results contains {12, 14, 16, 18, 20}

transform

transform is a pipe that takes a function f and, when it receives a value, applies f on it and sends the result on to the next pipe.

std::vector<int> input = {1, 2, 3, 4, 5};
std::vector<int> results;

input >>= pipes::transform([](int i) { return i*2; })
      >>= pipes::push_back(results);

// results contains {2, 4, 6, 8, 10}

unzip

unzip is a pipe that takes N other pipes. When it receives a std::pair or std::tuple of size N (for std::pair N is 2), it sends each of its components to the corresponding output pipe:

std::map<int, std::string> entries = { {1, "one"}, {2, "two"}, {3, "three"}, {4, "four"}, {5, "five"} };
std::vector<int> keys;
std::vector<std::string> values;

entries >>= pipes::unzip(pipes::push_back(keys),
                         pipes::push_back(values));

// keys contains {1, 2, 3, 4, 5};
// values contains {"one", "two", "three", "four", "five"};

End pipes

for_each

for_each takes a function (or function object) and sends the data it receives to that function. One of its uses is to give legacy code that does not use STL containers access to STL algorithms:

std::vector<int> input = {1, 2, 3, 4, 5, 6, 7 ,8, 9, 10};

void legacyInsert(int number, DarkLegacyStructure const& thing); // this function inserts into the old non-STL container

DarkLegacyStructure legacyStructure = // ...

std::copy(begin(input), end(input), pipes::for_each([&legacyStructure](int number){ legacyInsert(number, legacyStructure); }));

Read the full story about making legacy code compatible with the STL.

Note that for_each goes along with a helper function object, do_, that makes it possible to perform several actions sequentially on the output of the algorithm:

std::copy(begin(input), end(input), pipes::for_each(pipes::do_([&](int i){ results1.push_back(i*2);}).
                                                           then_([&](int i){ results2.push_back(i+1);}).
                                                           then_([&](int i){ results3.push_back(-i);})));

insert

In the majority of cases where it is used in algorithms, std::inserter forces its user to provide a position. This makes sense for unsorted containers such as std::vector, but for sorted containers such as std::set we end up passing begin or end by default, which doesn't make sense:

std::vector<int> v = {1, 3, -4, 2, 7, 10, 8};
std::set<int> results;
std::copy(begin(v), end(v), std::inserter(results, end(results)));

insert removes this constraint by making the position optional. If no hint is passed, the container is left to determine the correct position to insert:

std::vector<int> v = {1, 3, -4, 2, 7, 10, 8};
std::set<int> results;
std::copy(begin(v), end(v), insert(results));

//results contains { -4, 1, 2, 3, 7, 8, 10 }

Read the full story about insert.

map_aggregator

map_aggregator provides the possibility to embed an aggregator function in the inserter iterator, so that new elements whose key is already present in the map can be merged with the existing one (e.g. have their values added together).

std::vector<std::pair<int, std::string>> entries = { {1, "a"}, {2, "b"}, {3, "c"}, {4, "d"} };
std::vector<std::pair<int, std::string>> entries2 = { {2, "b"}, {3, "c"}, {4, "d"}, {5, "e"} };
std::map<int, std::string> results;

std::copy(entries.begin(), entries.end(), map_aggregator(results, concatenateStrings));
std::copy(entries2.begin(), entries2.end(), map_aggregator(results, concatenateStrings));

// results contains { {1, "a"}, {2, "bb"}, {3, "cc"}, {4, "dd"}, {5, "e"} }

set_aggregator provides similar functionality for aggregating elements into sets.

Read the full story about map_aggregator and set_aggregator.

override

override is the pipe equivalent of writing through begin on an existing collection. The data that override receives overwrites the first element of the container, then the second, and so on:

std::vector<int> input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
std::vector<int> results = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

input >>= pipes::filter([](int i){ return i % 2 == 0; })
      >>= pipes::override(results);

// results contains {2, 4, 6, 8, 10, 0, 0, 0, 0, 0};

override can also write to a specific data member instead of overwriting the complete structure in the outputs:

struct P
{
    int x = 0;
    int y = 0;
};

auto const xs = std::vector<int>{1, 2, 3, 4, 5};
auto results = std::vector<P>(5);

xs >>= pipes::override(results, &P::x);

// results now contains { {1,0}, {2,0}, {3,0}, {4,0}, {5,0} }

override can also send data to a specific setter function of the outputs:

struct P
{
    int x = 0;
    int y = 0;
    
    void setX(int aX){ x = aX; }
};

auto const xs = std::vector<int>{1, 2, 3, 4, 5};
auto results = std::vector<P>(5);

xs >>= pipes::override(results, &P::setX);

// results now contains { {1,0}, {2,0}, {3,0}, {4,0}, {5,0} }

push_back

push_back is a pipe that is equivalent to std::back_inserter. It takes a collection that has a push_back member function, such as a std::vector, and push_backs the values it receives into that collection.
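
For instance, a minimal sketch:

auto const input = std::vector<int>{1, 2, 3};
auto destination = std::vector<int>{};

input >>= pipes::push_back(destination);

// destination contains {1, 2, 3}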

set_aggregator

Like map_aggregator, but inserting/aggregating into std::sets. Since std::set values are const, this pipe erases the element and re-inserts the aggregated value into the std::set.

struct Value
{
    int i;
    std::string s;
};

bool operator==(Value const& value1, Value const& value2)
{
    return value1.i == value2.i && value1.s == value2.s;
}

bool operator<(Value const& value1, Value const& value2)
{
    if (value1.i < value2.i) return true;
    if (value2.i < value1.i) return false;
    return value1.s < value2.s;
}

Value concatenateValues(Value const& value1, Value const& value2)
{
    if (value1.i != value2.i) throw std::runtime_error("Incompatible values");
    return { value1.i, value1.s + value2.s };
}

int main()
{
    std::vector<Value> entries = { Value{1, "a"}, Value{2, "b"}, Value{3, "c"}, Value{4, "d"} };
    std::vector<Value> entries2 = { Value{2, "b"}, Value{3, "c"}, Value{4, "d"}, Value{5, "e"} };
    std::set<Value> results;

    std::copy(entries.begin(), entries.end(), pipes::set_aggregator(results, concatenateValues));
    std::copy(entries2.begin(), entries2.end(), pipes::set_aggregator(results, concatenateValues));

    // results contains { Value{1, "a"}, Value{2, "bb"}, Value{3, "cc"}, Value{4, "dd"}, Value{5, "e"} }
}

to_out_stream

to_out_stream takes an output stream and sends the values it receives to that stream:

auto const input = std::vector<std::string>{"word1", "word2", "word3"};

input >>= pipes::transform(toUpper)
      >>= pipes::to_out_stream(std::cout);

// sends "WORD1WORD2WORD3" to the standard output



pipes's Issues

Tests do not build on Visual Studio 2019 version 16.2 with /permissive-

Error:

  C:\PROGRA~2\MICROS~4\2019\ENTERP~1\VC\Tools\MSVC\1422~1.279\bin\HostX64\x64\cl.exe  /nologo /TP  -I..\..\..\include /permissive- /DWIN32 /D_WINDOWS /W3 /GR /EHsc /MDd /ZI /Ob0 /Od /RTC1 /JMC /showIncludes /FoCMakeFiles\tests.dir\tests\demux.cpp.obj /FdCMakeFiles\tests.dir\ /FS -c ..\..\..\tests\demux.cpp
C:\Users\Administrator\source\repos\pipes\include\helpers\meta.hpp(16): error C2672: 'get': no matching overloaded function found
  C:\Users\Administrator\source\repos\pipes\include\helpers\meta.hpp(29): note: see reference to function template instantiation 'F pipes::detail::for_each_impl<_Ty,std::tuple<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>>&,0,1>(F &&,Tuple,std::integer_sequence<unsigned __int64,0,1>)' being compiled
          with
          [
              F=pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>>::onReceive::<lambda_bcc8ffa6a8da3120d18a205209e637ed>,
              _Ty=pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>>::onReceive::<lambda_bcc8ffa6a8da3120d18a205209e637ed>,
              Tuple=std::tuple<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>> &
          ]
  C:\Users\Administrator\source\repos\pipes\include\demux.hpp(20): note: see reference to function template instantiation 'F pipes::detail::for_each<pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<_Ty>>>,std::back_insert_iterator<std::vector<_Ty,std::allocator<_Ty>>>>::onReceive::<lambda_bcc8ffa6a8da3120d18a205209e637ed>,std::tuple<std::back_insert_iterator<std::vector<_Ty,std::allocator<_Ty>>>,std::back_insert_iterator<std::vector<_Ty,std::allocator<_Ty>>>>&>(F &&,Tuple)' being compiled
          with
          [
              F=pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>>::onReceive::<lambda_bcc8ffa6a8da3120d18a205209e637ed>,
              _Ty=int,
              Tuple=std::tuple<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>> &
          ]
  C:\Users\Administrator\source\repos\pipes\include\output_iterator.hpp(33): note: see reference to function template instantiation 'void pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<_Ty>>>,std::back_insert_iterator<std::vector<_Ty,std::allocator<_Ty>>>>::onReceive<T>(T &&)' being compiled
          with
          [
              _Ty=int,
              T=int
          ]
  C:\Users\Administrator\source\repos\pipes\include\output_iterator.hpp(33): note: see reference to function template instantiation 'void pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<_Ty>>>,std::back_insert_iterator<std::vector<_Ty,std::allocator<_Ty>>>>::onReceive<T>(T &&)' being compiled
          with
          [
              _Ty=int,
              T=int
          ]
  C:\Users\Administrator\source\repos\pipes\include\output_iterator.hpp(13): note: see reference to function template instantiation 'Derived &pipes::OutputIteratorBase<Derived>::operator =<T>(T &&)' being compiled
          with
          [
              Derived=pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>>,
              T=int
          ]
  C:\Users\Administrator\source\repos\pipes\include\output_iterator.hpp(13): note: see reference to function template instantiation 'Derived &pipes::OutputIteratorBase<Derived>::operator =<T>(T &&)' being compiled
          with
          [
              Derived=pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>>,
              T=int
          ]
  C:\Users\Administrator\source\repos\pipes\tests\demux.cpp(98): note: see reference to function template instantiation 'void pipes::send<pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<_Ty>>>,std::back_insert_iterator<std::vector<_Ty,std::allocator<_Ty>>>>,int>(OutputIterator &,T &&)' being compiled
          with
          [
              _Ty=int,
              OutputIterator=pipes::demux_pipe<std::back_insert_iterator<std::vector<int,std::allocator<int>>>,std::back_insert_iterator<std::vector<int,std::allocator<int>>>>,
              T=int
          ]
C:\Users\Administrator\source\repos\pipes\include\helpers\meta.hpp(15): error C2784: 'tuple_element<_Idx,std::pair<_Ty1,_Ty2>>::type &std::get(std::pair<_Ty1,_Ty2> &) noexcept': could not deduce template argument for 'std::pair<_Ty1,_Ty2> &' from 'std::tuple<std::back_insert_iterator<std::vector<int,std::allocator<_Ty>>>,std::back_insert_iterator<std::vector<_Ty,std::allocator<_Ty>>>>'
          with
          [
              _Ty=int
          ]
  C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Tools\MSVC\14.22.27905\include\utility(452): note: see declaration of 'std::get'
C:\Users\Administrator\source\repos\pipes\include\helpers\meta.hpp(16): error C2672: 'operator __surrogate_func': no matching overloaded function found

Seems like the simplest case isn't working here!

I love the idea of this library. I would prefer to use it, but the following simple use case fails in unexpected ways:

pipes::mux(tags, res) >>= pipes::filter([&](auto t, auto r) { return (t == 'b');})
       >>= pipes::unzip(pipes::push_back(bs), pipes::push_back(rs));

Godbolt Link here

`optional` storage may be misaligned

The implementation of optional in helpers stores objects in a character buffer, but the buffer may not meet the alignment requirements for types stored in it.

Move-only type support

It doesn't look like move-only types or move semantics are supported by the library. As a minimal example, I would have expected this to work:

  std::vector<std::unique_ptr<int>> input{ std::make_unique<int>(0), std::make_unique<int>(1), std::make_unique<int>(2) };
  std::vector<std::unique_ptr<int>> results;

  std::move(input) >>= pipes::funnel >>= std::back_inserter(results);

But it tries to copy the std::unique_ptrs and gives a compiler error.

element-wise transform on mux'd pipes

It would be nice if element-wise transformations like this worked on mux'd streams.

std::vector<int> a{ 1, 2, 3, 4 };
std::vector<int> b{ 11, 22, 33, 44 };

mux(a, b) >>= transform([](auto i, auto j) {
        return std::make_tuple(i * 10, j - 5);
    })
    >>= filter([](auto i, auto j) { return i + j > 30.0; })
    >>= to_out_stream(std::cout);

Since this does something different than passing on the returned value, it might be desirable to make a new/different pipe and to keep the existing behavior under the existing name.

Pointer to member support

Pointers to members are not supported, so given a type s:

struct s {
  int get_42() { return 42; }
};
std::vector<s> input (10);
std::vector<int> results;

I have to write:

  input >>= pipes::funnel
    >>= pipes::transform([](auto&& x) { return x.get_42(); })
    >>= back_inserter(results);

instead of:

  input >>= pipes::funnel
    >>= pipes::transform(&s::get_42)
    >>= back_inserter(results);

Pipes

Hi there.
Is there a way to use another data type and not a vector?
Thanks!

tests: missing Makefile (or equivalent)

It should greatly assist on testing the code with the inclusion of something like a Makefile for building the test cases. Direct benefits include:

  1. Making sure the code compiles correctly on supported platforms;
  2. Making sure the code is functioning (i.e., test cases run as intended);
  3. Catching regressions early on.

CPPCON video

Hi,

Really great library; I just finished watching your CppCon presentation. Really nice.

A while back, I toyed with the push model and pipes myself:
https://github.com/RPeschke/ArggLib

Since I never planned on really publishing it, it's not very well documented. But anyway, I came across some of the same problems that you are mentioning on your limitations slide.

  • reverse etc.

operations like this require that you have knowledge about when the stream is over and then act on this knowledge.

To achieve this I implemented my processors with "onStart"/"onEnd" functionality. The mechanism which calls the processor/pipe just checks if an onStart/onEnd function exists. It starts the last processor in the pipe first and moves to the front.
It ends by calling the onEnd function of each processor front to back.

example:
These are example processors which each implement only one part of the processor interface: OnStart only implements the on-start function, OnEnd only implements the on-end function, and Evaluate only implements the on-eval function.

  param() | proc() 
	  >> OnStart([&]  { out << "1\n"; }) 
	  >> OnEnd([&] ( ) { out << "2\n"; }) 
	  >> OnStart([&] {out << "3\n"; }) 
	  >> OnEnd([&]() {out << "4\n"; })
	  >> Evaluate([&] { out << "eval\n"; return 1; });

// "3\n1\neval\n2\n4\n");

with this model, you can buffer the input stream until you reach the end of the stream and then run it in reverse.

I am using the right-shift operator to create pipes and then run the pipe by using the | operator. If the input is void, I pipe in the param() helper.

  • end pipe early.

Each processor in my pipeline has the possibility to send a stop signal back to the pipeline. Therefore each operator() of the processor has to return an enumeration which signals success, skip, or stop.

  • split
    I am not entirely sure I got it correctly, but basically I see two alternatives. Either you want to process all parts of the split simultaneously.

example:

"a string with multiple words" | magic_split | use_parts

where use_parts has an operator() which takes an "almost" arbitrary number of input parameters, and it receives

use_parts("a", "string", "with", "multiple", "words")
which is an exercise in function templates. I can point you to my implementation.

Or you want to split it by time.

example:
"a string with multiple words" | magic_split2 | use_parts

use_parts("a")
use_parts("string")
use_parts("with")
use_parts("multiple")
use_parts("words")

which would be just calling the output pipe multiple times.

magic_split2(str, next)
{
    auto sp = split(str);
    for (auto s : sp) {
        next(s);
    }
}
  • fragments

inputs >>= transform(f)

this can be done by building an adapter which is basically a reference to the input and then calling it with a special void input type which just tells the machinery to start working.

auto fragment = ref_input(input);

void_input >>=  fragment >>= Long_and_complicated_pipeline

In my library, I generally do it by separating the creation of a pipeline from its invocation. Therefore I build the pipeline up with the right-shift operator and execute it with the bitwise-or operator.

auto x1 = param() | Import_CSV()->fileName("fileNames.txt") >> out_stream();

this example stores the string "fileName.txt" and is executed once "param() | " is applied to it.

  • creating the result of the pipe

For this, again, I am using the onEnd function. The result of this function will be the result of the entire pipeline. In my implementation, the last thing returned from one of the onEnd functions will be the return value of the entire pipe.

	auto x1 = param() | Import_CSV()->fileName("fileNames.txt") >> out_stream();

in this example, x1 is a smart pointer to the string streamer which is created in out_stream.

Let me know if this is useful for you. I am sorry that the library is not more organized, but so far it has only been my personal sandbox. Also, I could not use new features since it had to compile with CERN ROOT.

Again great talk. Cheers,
Richard

could using macro FWD be removed?

I am investigating integrating pipes into a production code base. One of the concerns that came up when reviewing the library is its use/definition of the macro FWD. Because it's such a common name, there seems to be a good level of risk that including pipes header files might cause problems in some other code area that's completely unrelated.

It seems that most of the time it's used, it can be replaced with something like std::forward<T>(value) (or FWD's current definition, std::forward<decltype(value)>(value)). Could that be done? I could submit a pull request some time later if you are busy.

Thanks.

tee example

Something seems to be missing in the tee description, probably the transform and filter statements:

auto const inputs = std::vector{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
auto intermediaryResults = std::vector{};
auto results = std::vector{};

inputs >>= pipes::tee(pipes::push_back(intermediaryResults))
>>= pipes::push_back(results);

// intermediaryResults contains {2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
// results contains {12, 14, 16, 18, 20}

fifo

Could this be used to implement a FIFO? If so, is it thread-safe or thread-unsafe?

error: no viable overloaded '>>='

My simple test code is not compiling:

#include <string>
#include <iostream>

// first problem: WHY DO I NEED OPTIONAL HERE?
#include <optional>

#include "pipes/transform.hpp"
#include "pipes/to_out_stream.hpp"

int main()
{
    std::string test{"ABC"};
    test 
        >>= pipes::transform([](char c){return c + 2; })
        >>= pipes::to_out_stream(std::cout);
}

https://godbolt.org/z/PsM9Yf

For the first problem see #64

For the actual problem:

Compiling this gives an error message:

<source>:15:9: error: no viable overloaded '>>='
        >>= pipes::to_out_stream(std::cout);
        ^   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1 error generated.
Compiler returned: 1

What am I missing here?

Question: What would be a good demonstration of pipeline stages where each stage is multi-threaded?

Thanks for a great library! It really got me thinking!

I'm working on my own version that has the capability for each pipeline stage to be multi-threaded.

What would be a good demonstration of a pipeline where each stage would be running in its own thread?

I'm hoping it will be very easy to accomplish; I'm just looking for something that would show off that capability, either in an unusual way or in a way that couldn't be done easily without multi-threaded pipeline stages.

Thanks for any ideas!

add count end pipe

This is an end pipe which captures an integral type and returns in it the number of values that have gone through the pipe. It is useful, for example, to filter a vector, override the values directly in place, and remove the unfiltered values.

    auto v1Andv2 = ranges::views::zip(v1, v2);
    ranges::actions::sort(v1Andv2);
    int count = 0;
    ranges::views::unique(v1Andv2) >>= pipes::fork(pipes::unzip(pipes::override(v1),   //
                                                                pipes::override(v2)),  //
                                                   pipes::for_each([&count](const auto &v) { count++; }));
    v1.erase(std::begin(v1) + count, std::end(v1));
    v2.erase(std::begin(v2) + count, std::end(v2));

Instead of a for_each performing the count, a dedicated component would be less verbose.

Error in send.hpp: 'forward_as_tuple' is not a member of 'std'

Hello, I have been exploring the pipes library for the last 2 days.
My toolchain is: Qt 5.13.0 with MinGW 7.3.0, CMake 3.15.4
System: Windows
When I tried to add #include <pipes/pipes.hpp>,
it actually gave me a parsing error as below:

In file included from C:/PROGRA~2/pipes/include/pipes/base.hpp:4:0,
                 from C:/PROGRA~2/pipes/include/pipes/impl/concepts.hpp:7,
                 from C:/PROGRA~2/pipes/include/pipes/adjacent.hpp:4,
                 from C:/PROGRA~2/pipes/include/pipes/pipes.hpp:4,
                 from E:\Languages\QT-Programs\Pipes_Cplusplus\PipesInCplusplus\main.cpp:2:
C:/PROGRA~2/pipes/include/pipes/send.hpp: In function 'void pipes::send(ValuesThenPipeline&& ...)':
C:/PROGRA~2/pipes/include/pipes/send.hpp:38:50: error: 'forward_as_tuple' is not a member of 'std'
detail::send(detail::send_tag<0>{}, std::forward_as_tuple(FWD(valuesThenPipeline)...), std::make_index_sequence<sizeof...(ValuesThenPipeline) - 1>{});
^~~~~~~~~~~~~~~~
mingw32-make.exe[2]: *** [CMakeFiles\PipesInCplusplus.dir\build.make:77: CMakeFiles/PipesInCplusplus.dir/main.cpp.obj] Error 1
mingw32-make.exe[1]: *** [CMakeFiles\Makefile2:76: CMakeFiles/PipesInCplusplus.dir/all] Error 2
mingw32-make.exe: *** [Makefile:83: all] Error 2

And when I added two headers to send.hpp,
#include <stddef.h> and #include <tuple>,

it solved the above error.
The stddef.h header was added for size_t, and tuple for the forward_as_tuple function.

I am not sure whether it's an issue with the library or a problem with my system?

[discussion] Implementing `drop_last` for pipes

If a pipe is allowed to store the values locally in a circular queue, it can accumulate the first N values in it, and then release the rest.

Assuming an interface like Boost.CircularBuffer for the circular queue

class drop_last
{
  circular_buffer<std::any> queue_;

public:
  template <typename... Values, typename TailPipeline>
  void onReceive(Values&&... values, TailPipeline&& tailPipeline)
  {
    using DataStore = std::tuple<Values...>;
    if(queue_.full()) {
      send(SOMEHOW_FORWARD_OR_MOVE(std::any_cast<DataStore>(queue_.front())), FWD(tailPipeline));
    }
    queue_.push_back(std::make_any<DataStore>(values...));
  }

  explicit drop_last(std::size_t nbToDrop) : queue_(nbToDrop) {}
};

Can it be made more efficient?

  • If the size is 1, we can use std::exchange to efficiently move the values. If the size is constant, we could use a circular buffer on stack (eg: with a backend on std::array instead of std::vector)
  • If the circular buffer implementation supports a std::exchange like operation, queue_.front() and queue_.push_back could be made one single operation
  • If drop_last itself was templated instead of onReceive, there would be no need to use std::any

Big issue: SOMEHOW_FORWARD_OR_MOVE

Utility of `funnel` unclear

I don't see any documentation about funnel. It seems like the library should be implementable without it, and that it adds unnecessary syntactic overhead.

Make pipes available on godbolt.org

Hi,

I like to compile my code on godbolt.org since it has different compilers (and versions) I can quickly test with.

I would like to see pipes on godbolt. Is there any work being done regarding this?

Cheers,
Serkan

pipes::custom not found

I really enjoy the library! Very nice.

I just wondered: where is the pipes::custom object?

fold / reduce missing

Maybe I missed some way to combine things, but to me it looks as if something like fold/reduce is missing.
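
In the meantime, one workaround is to accumulate into a variable captured by the for_each end pipe. A minimal sketch (assuming only the documented pipes::for_each, with made-up data):

auto const input = std::vector<int>{1, 2, 3, 4, 5};
auto sum = 0;

input >>= pipes::for_each([&sum](int i){ sum += i; });

// sum == 15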

non exhaustive pipes

It would be really nice if pipes could communicate that they are done. Take for example take: when there is some expensive transformation downstream, it would be sweet if take(n) would only 'trigger' n calculations. So maybe something like a done() method could be used to communicate upstream that the pushing can stop. For example, the code for onReceive in transform looks like this:

    template<typename... Values, typename TailPipeline>
    void onReceive(Values&&... values, TailPipeline&& tailPipeline)
    {
        send(detail::invoke(function_.get(), FWD(values)...), tailPipeline);
    }

It does not check whether the tailPipeline wants the result. So if pipes had a way to check, then we could rewrite this as

    template<typename... Values, typename TailPipeline>
    void onReceive(Values&&... values, TailPipeline&& tailPipeline)
    {
        if( not tailPipeline.done() )
        {
            send(detail::invoke(function_.get(), FWD(values)...), tailPipeline);
        } 
        else 
        {
           done_ = true;
        }
    }

Some pipes (e.g. filter) can cause their input value to be moved more than once.

The onReceive member function of the filter pipe forwards its input value both to the predicate_ and to the tailPipeline. This can lead to the value being moved more than once if both take the argument by value or by &&.
Some other pipes also forward their input value more than once.

See below for a test you can add to filter to detect the issue.

I think the fix for filter is to not forward to the predicate, as predicates are likely to take their arguments by const&.
For other pipes, you may want to think about where forwarding is more likely to be useful.

class TwoMoveDetector
{
public:
	TwoMoveDetector() = default;
	TwoMoveDetector(const TwoMoveDetector& other) = default;
	TwoMoveDetector(TwoMoveDetector&& other)
	{
		REQUIRE_FALSE(other.m_movedFrom);
		other.m_movedFrom = true;
	}

private:
	bool m_movedFrom = false;
};

TEST_CASE("filter moves value at most once")
{
    std::vector<TwoMoveDetector> input(1);
    auto const passthrough = pipes::filter([](TwoMoveDetector){return true;});
        
    std::vector<TwoMoveDetector> results;
    std::copy(std::make_move_iterator(begin(input)), std::make_move_iterator(end(input)), passthrough >>= pipes::push_back(results));
}

[review] possible bug

In include/pipes/base.hpp there is:

struct pipeline_base : detail::crtp<Derived, pipeline_base>
{
    ...
    Derived& operator++() { return this->derived(); } // should this be ++this->derived(); ?
    ...
};

Clarify the difference between Ranges and Pipes

In .NET (C#) the difference is clear. They have IEnumerable and IObservable. Roughly speaking:

Ranges are IEnumerable ( interactive )

and

Pipes are IObservable ( reactive )

The basic interfaces of each type have the same shape with the difference described succinctly.

IEnumerable allows you to write queries over space
IObservable allows you to write queries over time

or another way to see it is that

IEnumerable is pull based. The consumer asks for the next value
IObservable is push based. The consumer waits for the next value

That being said, there exists a library called ReactiveExtensions that has a C++ port following the same reactive model that Pipes uses: https://github.com/ReactiveX/RxCpp. Maybe you can get some inspiration from that code. There are loads of operators for dealing with time that space-based query engines such as range-v3 don't need to worry about: async and threads and scheduling etc.

Regards

Brad

Allow pipelines to have a result (sink?) object

I'm wondering if it's possible to give pipelines a T sink() && function to return an arbitrary object upon completion. This would solve a number of issues, such as allowing a collection to be created on a single line, and implementing sink queries like count/contains/find/accumulate. This would require plumbing the return value through all the pipes at the end, as well as having the final pipeline essentially be an accumulator that holds some state.

At a first glance the easiest way to do this would be have the >>= operator for ranges and pipelines return sink() instead of void and adding a sink() overload to the pipelines.

// pipeline_base
auto sink() && { return void_t{}; } //  too bad we can't actually return void
  
// generic_pipeline
auto sink() && { return std::move(tailPipeline_).sink(); }

// fork
auto sink() && { return std::make_tuple(...); }

Then creating a count or to_vector sink pipeline would be fairly straightforward. One could also make something more generic, similar to what's being done with inserter.

template <typename Ele>
class vector_sink : public pipeline_base<vector_sink<Ele>>
{
public:
    template<typename T>
    void onReceive(T&& value) { vector_.push_back(FWD(value)); }

    auto sink() && { return std::move(vector_); }
    
private:
    std::vector<Ele> vector_;
};

I have no idea if any of the above compiles or there is some fundamental flaw in this approach; if not, I can try to come up with a PR to fully implement it when I have some time.

Using ptr to member as pipes field?

Let's say I have the following code:

struct vertex
    {
        transform::position_2d pos{transform::position_2d::scalar(0.f)};
        transform::position_2d texture_pos{transform::position_2d::scalar(0.f)};
        graphics::color pixel_color{graphics::white};
    };

    struct vertex_array
    {
        std::vector<vertex> vertices;
        vertex_geometry_type geometry_type;
    };

// logic code
 for (auto &v : array_cmp.vertices) v.pixel_color = clr_winner;

What I would like to achieve with pipes is something like:

struct vertex
    {
        transform::position_2d pos{transform::position_2d::scalar(0.f)};
        transform::position_2d texture_pos{transform::position_2d::scalar(0.f)};
        graphics::color pixel_color{graphics::white};
    };

    struct vertex_array
    {
        std::vector<vertex> vertices;
        vertex_geometry_type geometry_type;
    };

//logic code
array_cmp.vertices >>= pipes::member(&geometry::vertex::pixel_color) >>= pipes::fill(clr_winner) >>= pipes::override(array_cmp.vertices);

// or
array_cmp.vertices >>=  pipes::fill(&geometry::vertex::pixel_color, clr_winner) >>= pipes::override(array_cmp.vertices);

Do you think it will be possible to achieve?

Add CMake support

It would be good to have CMake support so that users can easily add the library to their projects, tests can be built and run, and Vcpkg/Conan can support it. Ideally one could install the library through CMake and then use it like:

find_package(pipes REQUIRED)
target_link_libraries(my_target PRIVATE joboccara::pipes)

Suggestion: Move dead_end_iterator out of tests

Loved your C++Now talk on this subject.

I don't understand why fluent::dead_end_iterator is implemented (and tested) only in tests.
Why not move tests/dead_end_iterator.hpp to either the root directory, or output?

[New pipes] pipes::values and pipes::keys for std::map ; pipes::erase

Hey,
In the range-v3 library, there are ranges::values and ranges::keys that can be used to only send the keys or the values of a map.
We can use pipes::unzip for that, but it is kinda ugly to write
map >>= pipes::unzip(pipes::dev_null(), /*Do something*/);
So I think pipes::values and pipes::keys would be a useful addition to this library.
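
In the meantime, another workaround is a transform that takes each map entry as a pair. A minimal sketch (with made-up data):

std::map<int, std::string> entries = { {1, "one"}, {2, "two"} };
std::vector<int> keys;

entries >>= pipes::transform([](auto const& entry){ return entry.first; })
        >>= pipes::push_back(keys);

// keys contains {1, 2}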

Edit : Just got a new idea : pipes::erase
It could be used to replace std::erase like this:
cont >>= pipes::erase(predicate);

Code formatting

Upload _clang-format or at least rules to configure formatting tools.

Start marking releases

Currently there are no releases for this library (and I assume we are supposed to make a copy of it in our own projects?). It would be nice if you could mark releases; that way it can be added to package managers such as Conan.

Create a single pipe for multiple data

I'm trying to produce a pipeline that will operate on multiple data sets.
For example I have a container that contains two vectors. One for rectangles and one for trapezoids.

I can now create a pipeline that operates on just the rectangles and another that operates on the trapezoids.
This method will work, but the pipeline needs to be coded twice, with the risk of errors.

Would it be possible to define the pipeline just once but iterate over each of the individual entries of both vectors?
So conceptually something like this:

struct Container
{
    std::vector<Rectangle> rects;
    std::vector<Trapezoid> traps;
};

Container inputs;
Container output;

inputs >>= pipes::transform(scale)
       >>= pipes::transform(mirror)
       >>= pipes::transform(clip)
       >>= pipes::push_back(output);
It would also be very nice if the pipeline could change over from one shape type to another.
For example a trapezoid could morph into a rectangle after a certain operation.

Execution policy

This is an enhancement suggestion.

Do you plan to implement any execution policy?
My underlying question is: are you willing to stick to C++14, or do you plan to move on to C++20?

I understand that dealing with unsequenced policies may be tricky given the library design,
but the following would be nice:

inputs
   >>= pipes::transform(std::execution::par, [](auto arg){ return arg * 2; }) // parallel execution
   >>= pipes::filter([](auto arg){ return arg < 42; })
   >>= pipes::push_back(results)
;

or maybe :

inputs
   >>= pipes::transform<std::execution::parallel_policy>([](auto arg){ return arg * 2; }) // parallel execution
   >>= pipes::filter([](auto arg){ return arg < 42; })
   >>= pipes::push_back(results)
;

PS: Keep up the good work 👍
