pipd's Introduction

⚙️ Install

pip install pipd

Basics

Create a pipeline

from pipd import Pipe
pipe = Pipe(1, 2, 3, 4, 5)
# or
pipe = Pipe([1, 2, 3, 4, 5])

Chaining (dot-based)

pipe = Pipe(1, 2, 3, 4, 5).map(lambda x: x + 1).map(lambda x: x * 2)
list(pipe) == [4, 6, 8, 10, 12]

Chaining (pipe-based)

from pipd import Pipe, Map
pipe = Pipe(1, 2, 3, 4, 5) | Map(lambda x: x + 1) | Map(lambda x: x * 2)
list(pipe) == [4, 6, 8, 10, 12]

Use a meta pipeline (i.e. functional only)

pipe = Pipe.map(lambda x: x + 1).map(lambda x: x * 2)
# or
pipe = Map(lambda x: x + 1) | Map(lambda x: x * 2)

list(pipe([1, 2, 3, 4, 5])) == [4, 6, 8, 10, 12]
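
To illustrate the idea behind a functional-only pipeline, here is a minimal plain-Python sketch of composing steps with `|`. The `Stage` class and the `mapping` helper are hypothetical names used only for this illustration; this is not pipd's implementation.

```python
from typing import Callable, Iterable, Iterator


class Stage:
    """A reusable pipeline step: a function from an iterable to an iterator."""

    def __init__(self, fn: Callable[[Iterable], Iterator]):
        self.fn = fn

    def __call__(self, items: Iterable) -> Iterator:
        return self.fn(items)

    def __or__(self, other: "Stage") -> "Stage":
        # Compose left to right: the output of self feeds other.
        return Stage(lambda items: other(self(items)))


def mapping(f: Callable) -> Stage:
    """Hypothetical helper: wrap a per-item function as a Stage."""
    return Stage(lambda items: map(f, items))


plus_one_times_two = mapping(lambda x: x + 1) | mapping(lambda x: x * 2)
result = list(plus_one_times_two([1, 2, 3, 4, 5]))
reused = list(plus_one_times_two([10]))  # the same pipeline applied to new data
```

Because the composed object holds no data of its own, it can be applied to any number of inputs.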

Iterate over the pipeline

for item in pipe:
    print(item)  # do something with item

Iterate over the pipeline one item at a time

it = iter(pipe)
next(it)
next(it)

Functions

map

from pipd import Pipe

pipe = Pipe(1, 2, 3).map(lambda x: x * 2)
list(pipe) == [2, 4, 6]

Map items to parallel workers

from pipd import Pipe

pipe = Pipe(1, 2, 3).map(lambda x: x * 2, num_workers=2) # parallel map (note: order is not guaranteed)
sorted(pipe) == [2, 4, 6] # sort before comparing, since the order may vary

filter

from pipd import Pipe

pipe = Pipe(1, 2, 3).filter(lambda x: x != 1)
list(pipe) == [2, 3]

side

Applies a function on each item in the pipeline without changing the item, useful for logging, saving state, etc.

from pipd import Pipe

pipe = Pipe(1, 2, 3).side(lambda x: print('side', x))
it = iter(pipe)
print(next(it))
print(next(it))
Output:
side 1
1
side 2
2
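
The interleaved output above is what lazy evaluation produces: an item passes through the side function only when the consumer asks for it. A minimal plain-Python sketch of that behaviour follows; the `side` generator here is a hypothetical stand-in, not pipd's implementation.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def side(items: Iterable[T], fn: Callable[[T], object]) -> Iterator[T]:
    """Call fn on each item for its side effect, then yield the item unchanged."""
    for item in items:
        fn(item)  # the return value is ignored
        yield item


seen = []
it = side([1, 2, 3], seen.append)
first = next(it)               # only the first item has been processed so far
seen_after_first = list(seen)  # snapshot of the side effects at this point
rest = list(it)                # drains the remaining items
```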

batch

from pipd import Pipe

pipe = Pipe(1, 2, 3, 4, 5).batch(2)
list(pipe) == [[1, 2], [3, 4], [5]]

unbatch

from pipd import Pipe

pipe = Pipe([1, 2], [3], [4, 5]).unbatch()
list(pipe) == [1, 2, 3, 4, 5]
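
As a rough sketch of what batch and unbatch do, the two plain-Python generators below reproduce the results shown above using only the standard library. The function names mirror the pipd methods for readability, but the code is an illustration and not pipd's implementation.

```python
from itertools import chain, islice
from typing import Iterable, Iterator, List


def batch(items: Iterable, size: int) -> Iterator[List]:
    """Group items into lists of at most `size` elements; the last may be shorter."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def unbatch(batches: Iterable[Iterable]) -> Iterator:
    """Flatten one level of nesting."""
    return chain.from_iterable(batches)


batches = list(batch([1, 2, 3, 4, 5], 2))
flat = list(unbatch(batches))
```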

log

from pipd import Pipe

pipe = Pipe(range(10)).log()
list(pipe) # runs the pipeline
Output:
0
1
2
3
4
5
6
7
8
9

limit

from pipd import Pipe

pipe = Pipe(range(10)).limit(5).log()
list(pipe) == [0, 1, 2, 3, 4]

repeat

from pipd import Pipe

pipe = Pipe(1, 2, 3).repeat(2)
list(pipe) == [1, 2, 3, 1, 2, 3]

sleep

Useful for debugging a pipeline that runs too fast.

from pipd import Pipe

pipe = Pipe(range(5)).sleep(0.1).log()
list(pipe) # runs the pipeline
Output:
0 # sleep for 0.1 seconds
1 # sleep for 0.1 seconds
2 # sleep for 0.1 seconds
3 # sleep for 0.1 seconds
4 # sleep for 0.1 seconds

tqdm

Shows a progress bar for the pipeline, which is useful for checking its throughput (seconds per item, or items per second). Requires tqdm to be installed.

from pipd import Pipe

pipe = Pipe(range(5)).sleep(1).tqdm()
print(list(pipe))
Output:
0it [00:00, ?it/s]
1it [00:01,  1.01s/it]
2it [00:02,  1.01s/it]
3it [00:03,  1.01s/it]
4it [00:04,  1.00s/it]
5it [00:05,  1.01s/it]

[0, 1, 2, 3, 4]

mix

Mixes iterators or pipes together, either interleaving them or picking the source of each item at random.

from pipd import Pipe, Mix

pipe = Pipe.mix([1, 2, 3], ['a', 'b', 'c'])
# or
pipe = Mix([1, 2, 3], ['a', 'b', 'c'])
list(pipe) == [1, 'a', 2, 'b', 3, 'c']

from pipd import Pipe

pipe = Pipe.mix(
    Pipe(1, 2, 3),
    Pipe('a', 'b', 'c'),
    random=True # randomize from which iterator/pipe to take the next item
)
list(pipe) == [1, 2, 'a', 'b', 3, 'c'] # one possible result; the order varies between runs
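
The two mixing modes can be sketched in plain Python as follows. This is an illustration under assumptions, not pipd's implementation: the flag is called `shuffle` here (rather than `random`) to avoid shadowing the standard-library module, `seed` is added only to make the example repeatable, and the sketch assumes mixing continues until every source is exhausted.

```python
import random
from typing import Iterable, Iterator, Optional


def mix(*sources: Iterable, shuffle: bool = False, seed: Optional[int] = None) -> Iterator:
    """Yield items from several iterables until all of them are exhausted.

    shuffle=False: take from the sources in round-robin order (interleaved).
    shuffle=True:  pick the source of each item at random.
    """
    rng = random.Random(seed)
    iterators = [iter(s) for s in sources]
    position = 0
    while iterators:
        index = rng.randrange(len(iterators)) if shuffle else position % len(iterators)
        try:
            item = next(iterators[index])
        except StopIteration:
            # Drop the exhausted source and carry on with the rest.
            del iterators[index]
            position = index
            continue
        position = index + 1
        yield item


interleaved = list(mix([1, 2, 3], ["a", "b", "c"]))
randomized = list(mix([1, 2, 3], ["a", "b", "c"], shuffle=True, seed=0))
```

In both modes each source keeps its own internal order; only the choice of which source to draw from next differs.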

map_key

from pipd import Pipe

pipe = Pipe([{'a': 1, 'b': 2}, {'a': 2, 'b': 4}]).map_key('a', lambda x: x + 1)
list(pipe) == [{'a': 2, 'b': 2}, {'a': 3, 'b': 4}]

shuffle

from pipd import Pipe

pipe = Pipe(range(10)).shuffle(size=5) # Shuffle buffer has size 5
list(pipe) == [3, 5, 6, 0, 2, 4, 1, 7, 8, 9] # one possible result; the order varies between runs
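
A shuffle buffer trades randomness for memory: only `size` items are held at once, so an item can only move a limited distance forward from its original position. The sketch below shows one common way to implement this in plain Python. The name `buffer_shuffle` and the `seed` parameter are hypothetical, and pipd's own strategy may differ.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def buffer_shuffle(items: Iterable[T], size: int, seed: Optional[int] = None) -> Iterator[T]:
    """Approximately shuffle a stream while holding at most `size` items in memory."""
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in items:
        buffer.append(item)
        if len(buffer) >= size:
            # Buffer is full: emit one randomly chosen item to make room.
            yield buffer.pop(rng.randrange(len(buffer)))
    # Input exhausted: emit the leftovers in random order.
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(buffer_shuffle(range(10), size=5, seed=0))
```

With this scheme the item emitted at position k always comes from the first k + size inputs, which is why small buffers give only a local shuffle.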

read_files

from pipd import Pipe

pipe = Pipe(['*.md']).read_files()
list(pipe) == ['README.md']

read_lines

from pipd import Pipe

pipe = Pipe(['.gitignore', 'setup.py']).read_lines()
list(pipe)
Output:
['__pycache__',
 '.mypy_cache',
 '.DS_Store',
 'TODO.md',
 '*.ipynb',
 'from setuptools import find_packages, setup',
 '',
 'setup(',
 'name="pipd",',
 'packages=find_packages(exclude=[]),',
 'version="0.2.1",',
 'description="Utility functions for python data pipelines.",',
 'long_description_content_type="text/markdown",',
 'author="ElevenLabs",',
 'url="https://github.com/elevenlabs/pipd",',
 'keywords=["data processing", "pipeline"],',
 'install_requires=[],',
 'classifiers=[],',
 ')']

write_lines

from pipd import Pipe

pipe = Pipe([0, 1, 2, 3]).write_lines('test.txt')
list(pipe) == [0, 1, 2, 3]
Contents of test.txt:
0
1
2
3

filter_cached

Saves the key of each item to the cache file at filepath, so that when the pipeline is run again, items already in the cache are filtered out. The optional key function can be used to specify a custom key for the cache.

from pipd import Pipe

pipe = Pipe(range(5)).filter_cached(filepath='./cache.txt', key=lambda x: x)

list(pipe) == [0, 1, 2, 3, 4] # first run: nothing is cached yet
list(pipe) == [] # second run: every item is already in the cache
Contents of cache.txt:
0
1
2
3
4
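
The caching behaviour described above can be sketched in plain Python as below. This is a hypothetical stand-in rather than pipd's implementation; it assumes one key per line in the cache file and converts keys to strings before comparing them.

```python
import os
import tempfile
from typing import Callable, Iterable, Iterator


def filter_cached(items: Iterable, filepath: str, key: Callable = str) -> Iterator:
    """Yield only items whose key is not yet recorded in the cache file."""
    seen = set()
    if os.path.exists(filepath):
        with open(filepath) as f:
            seen = {line.rstrip("\n") for line in f}
    with open(filepath, "a") as f:
        for item in items:
            k = str(key(item))
            if k in seen:
                continue  # already processed in an earlier run
            seen.add(k)
            f.write(k + "\n")
            f.flush()  # persist progress even if the run is interrupted
            yield item


with tempfile.TemporaryDirectory() as tmp:
    cache = os.path.join(tmp, "cache.txt")
    first_run = list(filter_cached(range(5), cache))
    second_run = list(filter_cached(range(5), cache))
    with open(cache) as f:
        cache_lines = f.read().splitlines()
```

Writing each key as soon as its item is yielded means an interrupted run can resume without reprocessing the items it already handled.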

read_csv

write_csv

Create a custom Pipe object

from pipd import Pipe

class PlusOne(Pipe):

    def __call__(self, items):
        for item in items:
            yield item + 1

# Usage

pipe = Pipe([0,1,2]).plus_one()
list(pipe) == [1,2,3]

# or

pipe = Pipe([0,1,2]) | PlusOne()
list(pipe) == [1,2,3]

# or

pipe = PlusOne()
list(pipe([0,1,2])) == [1,2,3]

# multi

pipe = Pipe([0,1,2]).plus_one().plus_one()
list(pipe) == [2,3,4]

pipd's People

Contributors

flavioschneider
