ebonnal / streamable

fluent iteration

License: Apache License 2.0

streamable: fluent iteration

1. install

pip install streamable

2. import

from streamable import Stream

3. init

integers: Stream[int] = Stream(lambda: range(10))

Instantiate a Stream[T] by providing a function that returns a fresh Iterable[T] (the data source).
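
To see why the source is a function rather than an iterable, here is a minimal stand-alone sketch in plain Python. FreshSource is a hypothetical helper written for this illustration, not part of streamable: a one-shot iterator is empty on its second pass, while a factory provides a fresh iterable for every iteration.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


class FreshSource(Iterable[T]):
    """Hypothetical helper (not streamable's API): wraps a zero-argument factory."""

    def __init__(self, source: Callable[[], Iterable[T]]) -> None:
        self._source = source

    def __iter__(self) -> Iterator[T]:
        # Call the factory on every iteration to get a fresh iterable.
        return iter(self._source())


# A plain iterator is consumed by its first pass.
one_shot = iter(range(3))
first_pass = list(one_shot)   # [0, 1, 2]
second_pass = list(one_shot)  # []

# A factory-backed source can be iterated again and again.
fresh = FreshSource(lambda: range(3))
fresh_first = list(fresh)     # [0, 1, 2]
fresh_second = list(fresh)    # [0, 1, 2]
```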

4. operate

odd_integer_strings: Stream[str] = (
    integers
    .filter(lambda n: n % 2)
    .map(str)
)

Stream instances are immutable: operations return a new stream.

Operations are lazy: they do not iterate over the source.
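
Laziness can be illustrated with Python's built-in filter and map, which are lazy as well. This is an analogy, not streamable's implementation; traced_str is a hypothetical helper that records each call it receives.

```python
calls = []


def traced_str(n: int) -> str:
    """Hypothetical helper: records each call, then converts to str."""
    calls.append(n)
    return str(n)


# Building the pipeline calls nothing yet.
lazy_pipeline = map(traced_str, filter(lambda n: n % 2, range(10)))
calls_before_iteration = list(calls)  # []

# The functions only run once the pipeline is iterated.
result = list(lazy_pipeline)          # ['1', '3', '5', '7', '9']
```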

5. iterate

Stream[T] extends Iterable[T], so a stream can be passed anywhere an iterable is expected:

>>> list(odd_integer_strings)
['1', '3', '5', '7', '9']
>>> set(odd_integer_strings)
{'9', '1', '5', '3', '7'}
>>> from functools import reduce
>>> from operator import mul
>>> reduce(mul, integers.filter(lambda n: n % 2))
945
>>> for odd_integer_string in odd_integer_strings: ...

📒 Operations

.map

Applies a function to each element.

integer_strings: Stream[str] = integers.map(str)

It has an optional concurrency: int parameter to execute the function concurrently (threads) while preserving the order.
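
Order-preserving concurrency can be sketched with the standard library alone. The snippet below is not streamable's implementation; it uses concurrent.futures.ThreadPoolExecutor, whose map returns results in input order even when later calls finish first. slow_square is a hypothetical function made up for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n: int) -> int:
    """Hypothetical workload: smaller inputs sleep longer, so they finish later."""
    time.sleep(0.01 * (5 - n))
    return n * n


with ThreadPoolExecutor(max_workers=3) as executor:
    # Results come back in input order, not in completion order.
    squares = list(executor.map(slow_square, range(5)))  # [0, 1, 4, 9, 16]
```

Note that Executor.map collects its inputs eagerly, so this sketch shows only the ordering guarantee, not lazy evaluation.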

.foreach

Applies a function to each element like .map, but yields the original elements instead of the results.

printed_integers: Stream[int] = integers.foreach(print)

It has an optional concurrency: int parameter to execute the function concurrently (threads) while preserving the order.
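
The difference with .map can be sketched as a plain generator. foreach_sketch is a hypothetical name and ignores the concurrency parameter; it only shows that the side effect runs while the element itself is passed through.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def foreach_sketch(func: Callable[[T], object], elements: Iterable[T]) -> Iterator[T]:
    """Hypothetical sketch: call func for its side effect, yield the element."""
    for elem in elements:
        func(elem)
        yield elem


seen = []
passed_through = list(foreach_sketch(seen.append, range(3)))  # [0, 1, 2]
mapped = list(map(str, range(3)))                             # ['0', '1', '2']
```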

.filter

Filters elements based on a predicate function.

even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)

.group

Groups elements.

parity_groups: Stream[List[int]] = integers.group(size=100, seconds=4, by=lambda i: i % 2)

A group is a list of size elements for which by returns the same value, but it may contain fewer elements in these cases:

  • seconds have elapsed since the last yield of a group
  • upstream is exhausted
  • upstream raises an exception

All the parameters are optional.
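
The size and by semantics can be sketched in plain Python as follows. group_sketch is a hypothetical function, not streamable's implementation: it ignores the seconds parameter and exception handling, and the order in which leftover groups are flushed at exhaustion is an assumption.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def group_sketch(
    elements: Iterable[T], size: int, by: Callable[[T], Hashable]
) -> Iterator[List[T]]:
    """Hypothetical sketch: one bucket per key, yielded when it reaches size."""
    buckets: Dict[Hashable, List[T]] = defaultdict(list)
    for elem in elements:
        key = by(elem)
        buckets[key].append(elem)
        if len(buckets[key]) == size:
            yield buckets.pop(key)
    # Upstream is exhausted: flush the incomplete groups.
    for bucket in buckets.values():
        yield bucket


groups = list(group_sketch(range(10), size=3, by=lambda i: i % 2))
# [[0, 2, 4], [1, 3, 5], [6, 8], [7, 9]]
```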

.flatten

Ungroups elements assuming that they are Iterables.

integers: Stream[int] = parity_groups.flatten()

It has an optional concurrency parameter to flatten several iterables concurrently (threads).
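
For comparison, sequential flattening of plain iterables is what itertools.chain.from_iterable does in the standard library. This shows the non-concurrent behavior only and is not streamable's implementation.

```python
from itertools import chain

groups = [[0, 2, 4], [1, 3, 5], [6, 8]]

# Each inner iterable is consumed fully before moving on to the next one.
flat = list(chain.from_iterable(groups))  # [0, 2, 4, 1, 3, 5, 6, 8]
```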

.slow

Limits the rate at which elements are yielded up to a maximum frequency (elements per second).

slow_integers: Stream[int] = integers.slow(frequency=2)
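
Rate limiting of this kind can be sketched with a generator that sleeps between yields. slow_sketch is a hypothetical function, not streamable's implementation; it assumes the first element is yielded immediately and each later one at least 1 / frequency seconds after the previous yield.

```python
import time
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def slow_sketch(elements: Iterable[T], frequency: float) -> Iterator[T]:
    """Hypothetical sketch: yield at most `frequency` elements per second."""
    period = 1 / frequency
    last_yield: Optional[float] = None
    for elem in elements:
        if last_yield is not None:
            # Wait for whatever remains of the period since the last yield.
            remaining = period - (time.monotonic() - last_yield)
            if remaining > 0:
                time.sleep(remaining)
        last_yield = time.monotonic()
        yield elem


slowed = list(slow_sketch(range(3), frequency=100))  # [0, 1, 2]
```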

.catch

Catches exceptions that satisfy a predicate function.

safe_inverse_floats: Stream[float] = (
    integers
    .map(lambda n: 1 / n)
    .catch(lambda ex: isinstance(ex, ZeroDivisionError))
)

It has an optional raise_at_exhaustion parameter to raise the first caught exception when iteration ends.
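
The catching behavior can be sketched in plain Python by calling next manually. catch_sketch is a hypothetical function, not streamable's implementation, and it leaves out raise_at_exhaustion. It assumes the upstream iterator can resume after raising, which holds for the built-in map used here but not for a generator function.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def catch_sketch(
    elements: Iterable[T], predicate: Callable[[Exception], bool]
) -> Iterator[T]:
    """Hypothetical sketch: skip upstream exceptions that satisfy predicate."""
    iterator = iter(elements)
    while True:
        try:
            elem = next(iterator)
        except StopIteration:
            return
        except Exception as ex:
            if predicate(ex):
                continue  # swallow the exception and ask for the next element
            raise
        yield elem


inverses = list(
    catch_sketch(
        map(lambda n: 1 / n, range(3)),
        lambda ex: isinstance(ex, ZeroDivisionError),
    )
)  # [1.0, 0.5]
```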

.observe

Logs the progress of iterations over this stream.

With

observed_slow_integers: Stream[int] = slow_integers.observe(what="integers from 0 to 9")

you should get:

INFO: iteration over 'integers from 0 to 9' will be observed.
INFO: after 0:00:00.000283, 0 error and 1 `integers from 0 to 9` yielded.
INFO: after 0:00:00.501373, 0 error and 2 `integers from 0 to 9` yielded.
INFO: after 0:00:01.501346, 0 error and 4 `integers from 0 to 9` yielded.
INFO: after 0:00:03.500864, 0 error and 8 `integers from 0 to 9` yielded.
INFO: after 0:00:04.500547, 0 error and 10 `integers from 0 to 9` yielded.

The number of logs will never be overwhelming because they are produced logarithmically, e.g. the 11th log will be produced when the iteration reaches the 1024th element.
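
The logarithmic cadence can be sketched by logging only when the count reaches a power of two, plus once at exhaustion. observe_sketch is a hypothetical function, not streamable's implementation; it reports counts to a callback instead of using the logging module and does not track errors or elapsed time.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def observe_sketch(elements: Iterable[T], log: Callable[[int], None]) -> Iterator[T]:
    """Hypothetical sketch: report the count at 1, 2, 4, 8, ... and at the end."""
    count = 0
    next_log_at = 1
    last_logged = 0
    for elem in elements:
        count += 1
        if count == next_log_at:
            log(count)
            last_logged = count
            next_log_at *= 2  # the next log is due at twice the current count
        yield elem
    if count != last_logged:
        log(count)  # final report when the total is not a power of two


short_run: List[int] = []
for _ in observe_sketch(range(10), short_run.append):
    pass
# short_run == [1, 2, 4, 8, 10]

long_run: List[int] = []
for _ in observe_sketch(range(1024), long_run.append):
    pass
# len(long_run) == 11 and long_run[-1] == 1024
```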

.limit

Limits the number of elements yielded.

five_first_integers: Stream[int] = integers.limit(count=5)

📦 Notes Box

Extract-Transform-Load

One can leverage this module to write readable custom ETL jobs, especially those dealing with third-party APIs.

Check the README dedicated to ETL.

typing

This is a typed module: code that uses it can be checked with mypy.

supported Python versions

Compatible with Python 3.7 or newer (unit-tested on 3.7.17, 3.8.18, 3.9.18, 3.10.13, 3.11.7, 3.12.1).

line breaks

Tip: enclose operations in parentheses to avoid trailing backslashes \.

stream: Stream[str] = (
    Stream(lambda: range(10))
    .map(str)
    .group(2)
    .foreach(print)
    .flatten()
    .filter()
    .catch()
)

functions

The Stream's methods are also exposed as functions:

from typing import Iterator

from streamable.functions import slow

iterator: Iterator[int] = ...
slow_iterator: Iterator[int] = slow(iterator)

set logging level

import logging

logging.getLogger("streamable").setLevel(logging.WARNING)

visitor pattern

The Stream class exposes an .accept method and you can implement a visitor by extending the streamable.visitor.Visitor class:

from streamable.visitor import Visitor

class DepthVisitor(Visitor[int]):
    def visit_stream(self, stream: Stream) -> int:
        if not stream.upstream:
            return 1
        return 1 + stream.upstream.accept(self)

def stream_depth(stream: Stream) -> int:
    return stream.accept(DepthVisitor())

>>> stream_depth(odd_integer_strings)
3
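
The mechanics behind .accept can be sketched without the library. Node, NodeVisitor and DepthNodeVisitor are hypothetical stand-ins written for this illustration, not streamable's classes: accept hands the node to the visitor, and the visitor recurses through upstream.

```python
from typing import Generic, Optional, TypeVar

V = TypeVar("V")


class Node:
    """Hypothetical stand-in for a stream: it only knows its upstream."""

    def __init__(self, upstream: Optional["Node"] = None) -> None:
        self.upstream = upstream

    def accept(self, visitor: "NodeVisitor[V]") -> V:
        # Double dispatch: the node calls back into the visitor.
        return visitor.visit_node(self)


class NodeVisitor(Generic[V]):
    """Hypothetical visitor base class."""

    def visit_node(self, node: Node) -> V:
        raise NotImplementedError


class DepthNodeVisitor(NodeVisitor[int]):
    def visit_node(self, node: Node) -> int:
        if node.upstream is None:
            return 1
        return 1 + node.upstream.accept(self)


source = Node()
pipeline = Node(upstream=Node(upstream=source))
depth = pipeline.accept(DepthNodeVisitor())  # 3
```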
