datapipes is an asynchronous multi streaming library.
datapipes encourages to handle multi streamings asynchronously. datapipes has a few objects sparated by its responsibility.
- Source Produces resources and emits the resource to pipe.
- Tube Effector for resources. Processes resource in the middle of pipe.
- Sink Consumer for resources. Do something with processed resources.
- Pipe Resources pass through the pipe. Handles resources asynchronously.
Source
| โ data flow
|
Tube
| pipe is '|'
|
Sink
To handle multi streamings, datapipes offers composabiliy. Source, Tube and Sink are composable individually. So the diagram above will be:
Composed Source works concurrently.
[Source Source Source]
|
| pipe handles asynchronous.
|
Tube
Tube Composed Tube has individual tube in series.
Tube
Tube
|
|
|
[Sink Sink Sink]
Composed Sink works concurrently.
Add this line to your application's Gemfile:
gem 'datapipes'
And then execute:
$ bundle
Or install it yourself as:
$ gem install datapipes
You have to define your own Source, Tube and Sink.
A basic source is list type. it produces a value in several times.
Use produce
method to emit data to pipe.
class List < Datapipes::Source
def run
(1..10).each {|i| produce(i) }
end
end
Next is tube. Tube processes piped data. A example tube recieve Integer value then increase amount of the value.
Define accept?
to recieve the data or skip this.
class Triple < Datapipes::Tube
def run(data)
if accept? data
[data, data, data]
else
data
end
end
def accept?(data)
data.is_a? Integer and data > 3
end
end
Sink consumes piped data. A typical sink is printing data.
class Print < Datapipes::Sink
def run(data)
puts data if accept? data
end
def accept?(data)
data.is_a? Array and data[0] < 7
end
end
You can make your own datapipe with your objects.
datapipe = Datapipes.new(
List.new, # A source
Print.new, # A sink
tube: Triple.new,
)
Then just run everything with run_resource
.
datapipe.run_resource
The output will be:
4
4
4
5
5
5
6
6
6
Congratulation!!
TODO...
- Fork it ( http://github.com/taiki45/datapipes/fork )
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request