csurfer / pypette

Ridiculously simple flow controller for building complex pipelines

Home Page: https://csurfer.github.io/pypette

License: MIT License

Language: Python 100.00%
Topics: python, threads, multiprocessing, multithreading

pypette's Introduction



pypette (to be read as pipette) is a module which makes building pipelines ridiculously simple, allowing users to control the flow with minimal instructions.

Features

  • Ridiculously simple interface.
  • Ability to view pipeline structure within the comfort of a terminal.
  • Run pipelines in an exception-resilient way when needed.
  • Create dependencies between pipelines easily.
  • Generate an easy-to-read report within the comfort of a terminal.

Setup

Using pip

pip install pypette

Directly from the repository

git clone https://github.com/csurfer/pypette.git
cd pypette
python setup.py install

Documentation

Detailed documentation can be found at https://csurfer.github.io/pypette

Structures

Job

The basic unit of execution: a Python function or any other callable.

from pypette import Job

def print_hello():
    print("Hello!")

def print_hello_msg(msg):
    print("Hello " + msg + "!")

# Job without arguments
j1 = Job(print_hello)

# Job with arguments specified as argument list
j2 = Job(print_hello_msg, args=("pypette is simple",))

# Job with arguments specified as keyword arguments
j3 = Job(print_hello_msg, kwargs={"msg": "pypette is simple"})
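Conceptually, a Job just stores a callable together with its arguments and defers the call until the pipeline runs it. A minimal sketch of that idea in plain Python (illustrative only; this is not pypette's actual implementation, and the names are hypothetical):

```python
# Illustrative sketch of deferred invocation, similar in spirit to Job.
# NOT pypette's implementation; DeferredCall is a hypothetical name.
class DeferredCall:
    def __init__(self, function, args=(), kwargs=None):
        self.function = function
        self.args = args
        self.kwargs = kwargs or {}

    def run(self):
        # Invoke the stored callable with the stored arguments.
        return self.function(*self.args, **self.kwargs)

def greet(msg):
    return "Hello " + msg + "!"

call = DeferredCall(greet, kwargs={"msg": "pypette is simple"})
print(call.run())  # Hello pypette is simple!
```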

BashJob

The basic unit of execution for running a bash command.

from pypette import BashJob

# Job with bash commands
b1 = BashJob(['ls', '-l'])
b2 = BashJob(['pwd'])

Pipe

Structure to specify the flow in which the jobs need to be executed. The whole interface consists of only 4 methods.

from pypette import Pipe

# 1. Create a new Pipe
p = Pipe('TestPipe')

# 2. Add jobs to execute. (Assuming job_list is a list of python/bash jobs)

# To run the jobs in job_list in order one after the other where each job waits
# for the job before it to finish.
p.add_jobs(job_list)

# To run the jobs in job_list in parallel, moving to the next step only after
# all jobs in job_list finish.
p.add_jobs(job_list, run_in_parallel=True)

# Add jobs in a builder format.
p.add_stage(job1).add_stage(job2) # To add jobs in series.
p.add_stage(job1, job2) # To add jobs in parallel.
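The stage semantics described above (serial stages, with optionally parallel jobs inside a stage) can be sketched with the standard library; this is only a conceptual model using hypothetical names, not pypette's internal scheduler:

```python
from concurrent.futures import ThreadPoolExecutor

def run_stages(stages):
    """Run each stage in order; jobs inside a stage run in parallel.

    `stages` is a list of lists of callables. The next stage starts only
    after every job in the current stage has finished. Illustrative
    sketch only; pypette's real scheduler differs.
    """
    for stage in stages:
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(job) for job in stage]
            for f in futures:
                f.result()  # wait for (and re-raise errors from) each job

results = []
run_stages([
    [lambda: results.append("a")],                               # stage 1
    [lambda: results.append("b"), lambda: results.append("c")],  # stage 2, parallel
])
```

Because stage 2's jobs run in parallel, "b" and "c" may land in either order, but both always come after "a".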

Building complex pipelines

Jobs submitted to a pipeline should be callables, i.e. objects that can be invoked. Python functions, lambdas, etc. qualify.

What about Pipe itself?

Of course, it is a callable and you can submit a pipe object to be run along with regular jobs. This way you can build small pipelines which achieve a specific task and then combine them to create more complex pipelines.

from pypette import BashJob, Job, Pipe

def welcome():
    print("Welcome user!")

def havefun():
    print("Have fun!")

def goodbye():
    print("Goodbye!")

# Build a simple pipeline
p1 = Pipe('Fun')
p1.add_jobs([
    Job(havefun),
])

# Include simple pipeline into a complicated pipeline
p2 = Pipe('Overall')
p2.add_jobs([
    Job(welcome),
    p1,
    Job(goodbye),
    BashJob(['ls', '-l']),
    BashJob(['pwd'])
])

p2.run() # Runs welcome, then pipeline p1, then goodbye, then the two bash jobs.

Example pipeline

An example pipeline and its code are included in the examples folder.

Visualizing the pipeline using graph()

Pipe objects have a method called graph() which helps visualize the pipeline within the comfort of your terminal. The graph is recursive: it visualizes everything that will be run when run() is called on the pipe object.
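The recursive rendering idea can be illustrated with a toy printer for nested pipelines. This is purely illustrative (the function name and tree encoding are made up here) and does not reproduce pypette's actual graph() output format:

```python
def draw(node, indent=0):
    """Recursively render a nested pipeline as indented text lines.

    `node` is either a string (a job name) or a ("PipeName", children)
    tuple. Toy illustration of recursive terminal rendering; NOT
    pypette's actual graph() format.
    """
    pad = "  " * indent
    if isinstance(node, str):
        return [pad + "- " + node]
    name, children = node
    lines = [pad + name]
    for child in children:
        lines.extend(draw(child, indent + 1))
    return lines

tree = ("Overall", ["welcome", ("Fun", ["havefun"]), "goodbye"])
print("\n".join(draw(tree)))
```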

Visualizing the top-level pipeline in examples/basic.py produces the visualization shown in the project documentation.

Running the entire pipeline.

To run the entire pipeline, simply call run() on your pipeline object.

Reporting the entire pipeline.

To get a report of the entire pipeline, simply call report() on your pipeline object.

Contributing

Bug Reports and Feature Requests

Please use the issue tracker for reporting bugs or requesting features.

Development

  1. Check out the repository.
  2. Make your changes and add/update relevant tests.
  3. Install poetry using pip install poetry.
  4. Run poetry install to create the project's virtual environment.
  5. Run tests using poetry run tox (any Python versions you don't have installed will fail). Fix failing tests and repeat.
  6. Make any relevant documentation changes.
  7. Install pre-commit using pip install pre-commit and run pre-commit run --all-files to do lint checks.
  8. Generate documentation using poetry run sphinx-build -b html docs/ docs/_build/html.
  9. Generate requirements.txt for automated testing using poetry export --dev --without-hashes -f requirements.txt > requirements.txt.
  10. Commit the changes and raise a pull request.

Buy the developer a cup of coffee!

If you found the utility helpful, you can buy me a cup of coffee using

Donate

pypette's People

Contributors: csurfer, dependabot[bot]

pypette's Issues

Job should accept bound methods, too

pypette Jobs currently insist that their function be an actual FunctionType object. However, bound methods are perfectly good callables that work just fine if you lie to a Job about what they are:

from pypette import Job, Pipe

class Foo:
    def method(self):
        print('In method')

f = Foo()
p1 = Pipe('Method pipe is sad')
p1.add_jobs([Job(f.method)])  # Fails: AssertionError: Python function expected

p2 = Pipe('Function pipe is happy')
p2.add_jobs([Job(lambda: f.method())])
p2.run()

That's a problem, though, because

  • lambda is kind of an ugly hack
  • it throws away information about the method (like its name, so that .graph() displays just <lambda> instead), and
  • it doesn't seem actually functionally necessary.

Please broaden Job's typechecking to accept methods, too. Thanks!
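A broadened check along these lines could accept anything callable while keeping a useful name for graph() display. The following is a hypothetical sketch of such validation (the function name is invented here; this is not pypette's actual code):

```python
def validate_job_target(function):
    """Accept plain functions, bound methods, lambdas, and other callables.

    Hypothetical replacement for a strict FunctionType check; NOT the
    actual pypette validation code.
    """
    assert callable(function), "Python callable expected"
    # Bound methods carry their own __name__, so graph() could still
    # display "method" instead of "<lambda>".
    return getattr(function, "__name__", repr(function))

class Foo:
    def method(self):
        print("In method")

print(validate_job_target(Foo().method))  # method
```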

Job results to be passed across the pipeline

Hi, one thing I would like to see in this awesome lib is the ability to pass data or job results across the pipeline.

Wikipedia's definition of a pipeline reads:

In computing, a pipeline is a set of data processing elements connected in series, where the output of one element is the input of the next one

Maybe I could help with this feature if you agree...
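One possible shape for such a feature, where each element's output becomes the next element's input, can be sketched in plain Python. This is a hypothetical design sketch only; pypette has no such API today:

```python
from functools import reduce

def run_chained(jobs, initial=None):
    """Feed each job's return value into the next job.

    Hypothetical sketch of result passing across a pipeline;
    NOT an existing pypette API.
    """
    return reduce(lambda value, job: job(value), jobs, initial)

result = run_chained([
    lambda _: 3,      # produce a value
    lambda x: x * 2,  # transform it
    lambda x: x + 1,  # transform it again
])
print(result)  # 7
```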

Pipes should fail after an exception

In my opinion, pipelines should fail when a Job's function raises an exception. For example, imagine a Pipe with [Job(build_a_thing), Job(test_a_thing), Job(deploy_a_thing)]. If the test fails, we should never get to the deploy step. That's not how it works now, though:

from pypette import Pipe, Job

def func1():
    1/0

def func2():
    print('in func2')

pipe = Pipe('Simple pipe')
pipe.add_jobs([
    Job(func1),
    Job(func2),
])

pipe.run()

In this case, func2 will be executed even though func1 explodes. If the two run in parallel, then func2 should be allowed to finish (that is, don't say "oh, func1 died! time to kill all the other Jobs!"). But given this scenario:

def func3():
    print('in func3')

second_pipe = Pipe('Simple pipe')
second_pipe.add_jobs([
    Job(func1),
    Job(func2),
], run_in_parallel=True)
second_pipe.add_jobs([Job(func3)])

second_pipe.run()

I don't think func3 should ever be called because one of the Jobs in the first step of the pipeline failed.
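The proposed semantics (let a stage's jobs finish, but skip all later stages once anything has failed) could be modelled like this. A plain-Python sketch of the proposal only, not current pypette behaviour:

```python
def run_fail_fast(stages):
    """Run stages in order; if any job in a stage raises, let the
    stage's remaining jobs finish, then skip all later stages.

    Sketch of the proposed semantics only; jobs here run sequentially
    within a stage for simplicity.
    """
    for stage in stages:
        errors = []
        for job in stage:
            try:
                job()
            except Exception as exc:
                errors.append(exc)   # record, but let the stage finish
        if errors:
            raise RuntimeError(f"stage failed with {len(errors)} error(s)")

calls = []
try:
    run_fail_fast([
        [lambda: 1 / 0, lambda: calls.append("func2")],  # func2 still runs
        [lambda: calls.append("func3")],                 # never reached
    ])
except RuntimeError:
    pass
print(calls)  # ['func2']
```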

Job dependency a little complex

@csurfer thanks for your code. After reading the README docs, I found it's a good project.

However, I think it can be better. For example, job dependency is a little complex. I have tried Airflow, and its way of describing job dependencies is interesting; in Airflow, you can define a dependency like this:

job1 << job2  # job1 depends on job2
job3 >> job4  # job4 depends on job3

Maybe it can be implemented in this project. 😄
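Operator-based dependency declaration like Airflow's could be layered on by overloading __rshift__ and __lshift__ on a job-like class. A hypothetical sketch of the idea, not an existing pypette feature:

```python
class Task:
    """Toy task supporting Airflow-style >> and << dependency syntax.

    Hypothetical sketch; pypette's Job does not implement these
    operators.
    """
    def __init__(self, name):
        self.name = name
        self.upstream = []          # tasks this task depends on

    def __rshift__(self, other):    # self >> other: other depends on self
        other.upstream.append(self)
        return other

    def __lshift__(self, other):    # self << other: self depends on other
        self.upstream.append(other)
        return self

job1, job2 = Task("job1"), Task("job2")
job3, job4 = Task("job3"), Task("job4")

job1 << job2   # job1 depends on job2
job3 >> job4   # job4 depends on job3

print([t.name for t in job1.upstream])  # ['job2']
print([t.name for t in job4.upstream])  # ['job3']
```

Returning the downstream task from each operator also allows chains like `a >> b >> c`.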
