bytewax / bytewax

Python Stream Processing

Home Page: https://docs.bytewax.io/

License: Apache License 2.0

Dockerfile 0.17% Shell 0.07% Python 56.68% Rust 42.16% Just 0.93%
python stream-processing rust data-engineering data-processing data-science dataflow machine-learning streaming-data

bytewax's Introduction


Bytewax

Python Stateful Stream Processing Framework

Bytewax is a Python framework that simplifies event and stream processing. Because Bytewax couples the stream and event processing capabilities of Flink, Spark, and Kafka Streams with the friendly and familiar interface of Python, you can re-use the Python libraries you already know and love. Connect data sources, run stateful transformations, and write to various downstream systems with built-in connectors or existing Python libraries.

How it all works

Bytewax is a Python framework and Rust-based distributed processing engine that uses a dataflow computational model to provide parallelizable stream and event processing capabilities similar to Flink, Spark, and Kafka Streams. You can use Bytewax for a variety of workloads, from moving data à la Kafka Connect all the way to advanced online machine learning workloads. Bytewax is not limited to streaming applications; it excels anywhere that data can be distributed at the input and output.

Bytewax has an accompanying command line interface, waxctl, which supports the deployment of dataflows on cloud servers or Kubernetes. You can download it here.


Getting Started with Bytewax

pip install bytewax

Install waxctl

Dataflow, Input and Operators

A Bytewax dataflow is Python code that represents an input, a series of processing steps, and an output. The input could range from a Kafka stream to a WebSocket, and the output could be anything from a data lake to a key-value store.

import json

from bytewax import operators as op
from bytewax.connectors.kafka import KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

Bytewax has input and output helpers for common input and output data sources but you can also create your own with the Sink and Source API.

At a high level, the dataflow compute model is one in which program execution is conceptualized as data flowing through a series of operator-based steps. Operators like map and filter are the processing primitives of Bytewax. Each of them gives you a “shape” of data transformation, and you give them regular Python functions to customize them to the specific task you need. See the documentation for a list of the available operators.
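
As a minimal, self-contained sketch of that model (separate from the Kafka example that continues below; the input values and step names here are made up for illustration):

from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

sketch_flow = Dataflow("operator_sketch")
# Feed a small in-memory list of numbers into the dataflow.
nums = op.input("inp", sketch_flow, TestingSource([1, 2, 3, 4, 5]))
# map: transform each item.
doubled = op.map("double", nums, lambda x: x * 2)
# filter: keep only items matching a predicate.
big = op.filter("keep_big", doubled, lambda x: x > 4)
# Write each remaining item to standard output.
op.output("out", big, StdOutSink())

The full Kafka example below picks up from the imports at the top of this section.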

BROKERS = ["localhost:19092"]
IN_TOPICS = ["in_topic"]
OUT_TOPIC = "out_topic"
ERR_TOPIC = "errors"


def deserialize(kafka_message):
    return json.loads(kafka_message.value)


def anonymize_email(event_data):
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return event_data


def remove_bytewax(event_data):
    return "bytewax" not in event_data["email"]


flow = Dataflow("kafka_in_out")
stream = kop.input("inp", flow, brokers=BROKERS, topics=IN_TOPICS)
# we can inspect the stream coming from the kafka topic to view the items within on std out for debugging
op.inspect("inspect-oks", stream.oks)
# we can also inspect kafka errors as a separate stream and raise an exception when one is encountered
errs = op.inspect("errors", stream.errs).then(op.raises, "crash-on-err")
deser_msgs = op.map("deserialize", stream.oks, deserialize)
anon_msgs = op.map("anon", deser_msgs, anonymize_email)
filtered_msgs = op.filter("filter_employees", anon_msgs, remove_bytewax)
processed = op.map("map", filtered_msgs, lambda m: KafkaSinkMessage(None, json.dumps(m)))
# and finally output the cleaned data to a new topic
kop.output("out1", processed, brokers=BROKERS, topic=OUT_TOPIC)
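
Assuming the example above lives in a file named dataflow.py (a name chosen just for this illustration), you can run it locally with a single worker; see the Execution section below for multi-worker and cluster options.

python -m bytewax.run dataflow:flow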

Windowing, Reducing and Aggregating

Bytewax is a stateful stream processing framework, which means that some operations remember information across multiple events. Windows and aggregations are also stateful, and can be reconstructed in the event of failure. Bytewax can be configured with different state recovery mechanisms to durably persist state in order to recover from failure.
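
As an operational sketch of how recovery fits in (assuming a recent Bytewax version; check the recovery documentation for the exact commands and flags), you first create a set of recovery partitions and then point the dataflow at them when running it:

# Create a directory of recovery partitions (here, 4 of them).
python -m bytewax.recovery recovery_db/ 4

# Run the dataflow, persisting state snapshots to those partitions.
python -m bytewax.run my_dataflow:flow -r recovery_db/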

There are multiple stateful operators available, like reduce, stateful_map, and fold_window; the complete list can be found in the operator API documentation. Below we use the collect_window operator with a tumbling window based on event time to gather events into five-second buckets and then calculate the average value on a per-user basis.

from datetime import datetime, timedelta, timezone

from bytewax import operators as op
from bytewax.dataflow import Dataflow
import bytewax.operators.window as win
from bytewax.operators.window import EventClock, TumblingWindower
from bytewax.testing import TestingSource

flow = Dataflow("window_eg")

src = [
    {"user_id": "123", "value": 5, "time": "2023-1-1T00:00:00Z"},
    {"user_id": "123", "value": 7, "time": "2023-1-1T00:00:01Z"},
    {"user_id": "123", "value": 2, "time": "2023-1-1T00:00:07Z"},
]
inp = op.input("inp", flow, TestingSource(src))
keyed_inp = op.key_on("keyed_inp", inp, lambda x: x["user_id"])


# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
# Note that the datetime MUST be UTC. If the datetime is using a different
# representation, we would have to convert it here.
def get_event_time(event):
    return datetime.fromisoformat(event["time"])


# Configure the windowing operator to use event time via this clock.
clock = EventClock(get_event_time, wait_for_system_duration=timedelta(seconds=10))

# And a 5-second tumbling window
align_to = datetime(2023, 1, 1, tzinfo=timezone.utc)
windower = TumblingWindower(align_to=align_to, length=timedelta(seconds=5))

five_sec_buckets_win_out = win.collect_window(
    "five_sec_buckets", keyed_inp, clock, windower
)


def calc_avg(bucket):
    values = [event["value"] for event in bucket]
    if len(values) > 0:
        return sum(values) / len(values)
    else:
        return None


five_sec_avgs = op.map_value("avg_in_bucket", five_sec_buckets_win_out.down, calc_avg)
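
To see the per-user averages while developing, you can inspect the resulting stream (the step name below is arbitrary) and run the flow as described in the Execution section:

op.inspect("inspect_avg", five_sec_avgs)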

Merges and Joins

Merging or joining multiple input streams is a common task in stream processing. Bytewax enables different types of joins to facilitate different patterns.

Merging Streams

Merging streams is like concatenation: no join logic is applied, and the resulting stream may contain heterogeneous records.

from bytewax import operators as op

from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("merge")

src_1 = [
    {"user_id": "123", "name": "Bumble"},
]
inp1 = op.input("inp1", flow, TestingSource(src_1))

src_2 = [
    {"user_id": "123", "email": "[email protected]"},
    {"user_id": "456", "email": "[email protected]"},
]
inp2 = op.input("inp2", flow, TestingSource(src_2))
merged_stream = op.merge("merge", inp1, inp2)
op.inspect("debug", merged_stream)
Joining Streams

Joining streams is different from merging because it uses logic to join the records in the streams together. Joins in Bytewax can be regular or running. A regular streaming join is most closely related to an inner join in SQL, in that the dataflow emits data downstream from the join only when all sides of the join have matched on the key.

from bytewax import operators as op

from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("join")

src_1 = [
    {"user_id": "123", "name": "Bumble"},
]
inp1 = op.input("inp1", flow, TestingSource(src_1))
keyed_inp_1 = op.key_on("key_stream_1", inp1, lambda x: x["user_id"])
src_2 = [
    {"user_id": "123", "email": "[email protected]"},
    {"user_id": "456", "email": "[email protected]"},
]
inp2 = op.input("inp2", flow, TestingSource(src_2))
keyed_inp_2 = op.key_on("key_stream_2", inp2, lambda x: x["user_id"])

merged_stream = op.join("join", keyed_inp_1, keyed_inp_2)
op.inspect("debug", merged_stream)

Output

Output in Bytewax is written to a sink, and sinks are grouped into connectors. There are a number of basic connectors in the bytewax repo to help you during development. In addition to the built-in connectors, it is possible to use the Sink and Source APIs to build custom sinks and sources. There is also a hub for connectors built by the community, partners, and Bytewax. Below is an example of a custom connector for Postgres using the psycopg2 library.


import psycopg2
from psycopg2.extras import execute_values

from bytewax import operators as op
from bytewax.outputs import FixedPartitionedSink, StatefulSinkPartition


class PsqlSink(StatefulSinkPartition):
    def __init__(self):
        self.conn = psycopg2.connect("dbname=website user=bytewax")
        self.conn.set_session(autocommit=True)
        self.cur = self.conn.cursor()

    def write_batch(self, values):
        # `values` is assumed to be a batch of (user_id, data) pairs.
        query_string = """
            INSERT INTO events (user_id, data)
            VALUES %s
            ON CONFLICT (user_id)
            DO UPDATE SET data = EXCLUDED.data;
        """
        execute_values(self.cur, query_string, values)

    def snapshot(self):
        pass

    def close(self):
        self.conn.close()


class PsqlOutput(FixedPartitionedSink):
    def list_parts(self):
        return ["single"]

    def build_part(self, step_id, for_part, resume_state):
        return PsqlSink()
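
A minimal usage sketch, assuming keyed_stream is a keyed stream of (user_id, data) pairs defined elsewhere in your dataflow:

op.output("psql_out", keyed_stream, PsqlOutput())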

Execution

Bytewax dataflows can be executed in a single Python process, or on multiple processes on multiple hosts with multiple worker threads. When processing data in a distributed fashion, Bytewax uses routing keys to ensure your state is updated correctly and automatically.

# Run a single worker locally.
python -m bytewax.run my_dataflow:flow

# Start two worker threads in a single process.
python -m bytewax.run my_dataflow -w 2

# Start a process on two separate machines to form a Bytewax cluster.
# Start the first process with two worker threads on `machine_one`.
machine_one$ python -m bytewax.run my_dataflow -w 2 -i0 -a "machine_one:2101;machine_two:2101"

# Then start the second process with three worker threads on `machine_two`.
machine_two$ python -m bytewax.run my_dataflow -w 3 -i1 -a "machine_one:2101;machine_two:2101"

It can also be run in a Docker container as described further in the documentation.
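
As a minimal sketch of packaging a dataflow yourself (this is not the official image; the file and attribute names are assumptions for this example):

FROM python:3.11-slim

WORKDIR /app
RUN pip install --no-cache-dir bytewax
COPY my_dataflow.py .

# Run a single worker inside the container.
CMD ["python", "-m", "bytewax.run", "my_dataflow:flow"]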

Kubernetes

The recommended way to run dataflows at scale is to leverage the Kubernetes ecosystem. To help manage deployment, we built waxctl, which allows you to easily deploy dataflows that run at scale across multiple compute nodes.

waxctl df deploy my_dataflow.py --name my-dataflow

Why Bytewax?

At a high level, Bytewax provides a few major benefits:

  • The operators in Bytewax are largely “data-parallel”, meaning they can operate on independent parts of the data concurrently.
  • Bytewax offers the ability to express higher-level control constructs, like iteration.
  • Bytewax allows you to develop and run your code locally, and then easily scale that code to multiple workers or processes without changes.
  • Bytewax can be used in both a streaming and a batch context.
  • Bytewax lets you leverage the Python ecosystem directly.

Community

Slack is the main forum for communication and discussion.

GitHub Issues is reserved only for actual issues. Please use the community Slack for discussions.

Code of Conduct

More Examples

For a more complete example, and documentation on the available operators, check out the User Guide and the /examples folder.

License

Bytewax is licensed under the Apache-2.0 license.

Contributing

Contributions are welcome! This community and project would not be what it is without the contributors. All contributions, from bug reports to new features, are welcome and encouraged.

Please view the Contribution Guide for how to get started.



With ❤️ Bytewax

bytewax's People

Contributors

awmatheson, blakestier, cra, davidselassie, galenward, jhanninen, jonasbest, kasun, konradsienkowski, lfunderburk, miccioest, mttcnnff, piepra, psykopear, rabernat, rcdewit, thebits, whoahbot, zzl221000


bytewax's Issues

[FEATURE] Apache Pulsar Connector

Is your feature request related to a problem? Please describe.
Add Pulsar as a source, with all the features of the Kafka connector. Pulsar also supports AMQP, MQTT, WebSockets, and Kafka as protocols.

Describe the solution you'd like
Add Pulsar support in the same way as Kafka.

Describe alternatives you've considered
We can use Pulsar's other protocols, but native is the fastest and easiest.

Additional context
Contact me for assistance. https://github.com/tspannhw/SpeakerProfile

[FEATURE] Work with Jupyter notebook?

Bug Description

I tried to run the simple examples from a Jupyter notebook but got the following error message.
I didn't have time to look through the source code, but I assume this is some missing default.



Python version (python -V)

3.8

Bytewax version (pip list | grep bytewax)

0.7.0

Relevant log output

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-1-ded9d2dc59fa> in <module>
     30 flow.inspect(peek)
     31 
---> 32 ec.build_and_run()

Exception: Unrecognized option: 'm'

Steps to Reproduce

Start a jupyter notebook
Run the basic.py code in a notebook, removing the 'if main' condition.

KafkaInputConfig with TLS

Allow KafkaInputConfig to connect to TLS enabled Kafka clusters.

Args
tls-enabled
    Enable TLS for the Kafka API (not necessary if specifying custom certs)
tls-cert
    The certificate to be used for TLS authentication with the broker
tls-key
    The certificate key to be used for TLS authentication with the broker
tls-truststore
    The truststore to be used for TLS communication with the broker

Deadlocks when raising Python logging level

Bug Description

Increasing the default Python logging level causes deadlocks when executing dataflows.

My hunch is that this has to do with pyo3-log causing all Rust debug! calls to thread through to Python logging, so any of our Rust code, or libraries we call into, that spawn background threads might be trying to obtain the GIL on those threads. That is an issue I've run into in the past, but it might not be the case again!

If it is logging and the GIL, we might need to thoroughly diagram out our GIL boundaries in our worker threads so we can solve this more generically.

Python version (python -V)

Python 3.10.4

Bytewax version (pip list | grep bytewax)

0.10.0

Relevant log output

No response

Steps to Reproduce

Run pytest --log-level=DEBUG and you'll get deadlocks.

Python code formatting

Is your feature request related to a problem? Please describe.
I see some code format inconsistencies in Python source files. Is the team favourable towards enforcing a styling guide for Python code so future contributions from multiple contributors can follow a single style guide easily?

Describe the solution you'd like

  • python-black
    I see that we are using python-black.
Skipping .ipynb files as Jupyter dependencies are not installed.
You can fix this by running ``pip install black[jupyter]``
All done! ✨ 🍰 ✨
37 files left unchanged.

Shall we introduce the pre-commit [1] tool to the project so we can run python-black at git commit time?

  • python-isort
    I see some minor import statement inconsistencies and these can be easily fixed using python-isort [2]
    isort can automatically sort different import types into groups (standard lib, third party, bytewax local imports) and also sort imports alphabetically, so it's easier to read import statements.

  • pep8

pytests/test_inputs.py:5:1: F401 'bytewax.Dataflow' imported but unused
pytests/test_inputs.py:5:1: F401 'bytewax.Emit' imported but unused
pytests/test_inputs.py:5:1: F401 'bytewax.run' imported but unused
pytests/test_operators.py:1:1: F401 'os' imported but unused
pytests/test_operators.py:7:1: F401 'pytest.mark' imported but unused
pytests/test_execution.py:3:1: F401 'threading' imported but unused
pytests/test_execution.py:62:89: E501 line too long (128 > 88 characters)
pytests/test_execution.py:82:89: E501 line too long (138 > 88 characters)
pytests/test_execution.py:118:89: E501 line too long (138 > 88 characters)
pytests/test_execution.py:159:89: E501 line too long (138 > 88 characters)

We can use a tool like flake8 to make sure our code complies with pep8.

[1] https://pre-commit.com/
[2] https://github.com/PyCQA/isort

Describe alternatives you've considered

Additional context
If the team agrees to this, we can further discuss the rules we need to follow, and I would be happy to submit a PR for this.

Logging in dataflow functions

Bug Description

The bytes in the Kafka topic I am trying to read use a proprietary serialization format. The Python logging I added to a function called by flow.map() does not log to stdout, for example:

import logging

logger = logging.getLogger(__name__)

...

def deserialize(key_bytes__payload_bytes):
    key_bytes, payload_bytes = key_bytes__payload_bytes
   
    # this doesn't log to stdout
    logger.info(type(key_bytes))
    
    key = json.loads(key_bytes.decode("utf-8")) if key_bytes else None
    payload = json.loads(payload_bytes.decode("utf-8")) if payload_bytes else None
    return key, payload

...

How can I go about enabling the logging within the python functions in the dataflow?

Python version (python -V)

3.9.7

Bytewax version (pip list | grep bytewax)

0.11.0

Relevant log output

No response

Steps to Reproduce

Run the following dataflow:

import logging

logger = logging.getLogger(__name__)

flow = Dataflow()
flow.input(
    "input",
    KafkaInputConfig(
        brokers=[f"{os.environ['bootstrap_servers']}"],
        topic=f"{os.environ['topic']}",
        starting_offset="beginning",
        tail=False,
        **additional_configs,
    ),
)

def deserialize(key_bytes__payload_bytes):
    key_bytes, payload_bytes = key_bytes__payload_bytes
   
    # this doesn't log to stdout
    logger.info(type(key_bytes))
    
    key = json.loads(key_bytes.decode("utf-8")) if key_bytes else None
    payload = json.loads(payload_bytes.decode("utf-8")) if payload_bytes else None
    return key, payload


flow.map(deserialize)

flow.capture(StdOutputConfig())

run_main(flow)

Multi-worker stateful dataflow crash recovery

We have some in-progress bare-bones state recovery in #73 and #81, although it is currently not released as of v0.9.0. That allows a running Bytewax dataflow to persist the state in stateful_map and reduce operators every epoch into either Kafka or Sqlite as backing storage, and to resume execution of the dataflow automatically from the epoch it failed on. Unfortunately, resuming the dataflow does not work correctly with multi-worker execution.

Work on getting crash recovery working in multiple-worker environments.

[FEATURE] More types of windowing

Is your feature request related to a problem? Please describe.

Tumbling windows are only one sort of window. We need additional window types, for example:

  • Hopping window #172
  • Sliding window #173
  • Session window #174

These window types are as described here: https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-window-functions

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Include `additional_properties` option for KafkaRecoveryConfig

Is your feature request related to a problem? Please describe.
I want to store the state on a Kafka broker that is reachable only through SASL_SSL authentication. Currently the KafkaRecoveryConfig only allows brokers that are reachable without authentication.

Describe the solution you'd like
Similar to the KafkaIn/Outputconfig I'd like to be able to include additional properties so I can define SASL username, password, mechanism, etc.

Describe alternatives you've considered
Alternative is to spin up a broker separate from my datasource/sink broker and available without authentication. In our case that option is however unfeasible and would lead to additional overhead that needs to be managed.

Convert window tests to use event time

Bug Description

As alluded to in this comment #137 (comment), system time unit tests are very finicky to set up and aren't working yet, for two reasons:

  1. Part of system time windowing is operators scheduling wake ups. The order of those wake ups affects behavior (that's ok, because the desired behavior is based on time). We currently mock out Clock in the WindowLogic via TestingClock to allow us to arbitrarily change the time in a deterministic manner, but we don't do the same for the time internally to StatefulUnary used to schedule the wake ups. Thus when Timely decides to wake up the operator and we decide "wakeup time is past: run the logic!" is based on an inconsistent sense of system time compared to the windowing system.

  2. Even if we could synchronize both times to a TestingClock, the deterministic modification of that clock via clock.now = whatever is very challenging to schedule at just the right moment. Timely is a dataflow framework, so if you try to do something out-of-band (like modifying a shared global state of the current time), you have to really understand the behavior of the Timely scheduler and activators to know when code will run. This means that unit tests that change the clock will have to rely on very specific activation orders and are thus hard to write.

For example, I wrote the following test with the following input:

    def gen():
        clock.now = start_at  # +0 sec; reset on recover
        yield {"user": "a", "type": "login"}  # Epoch 0
        clock.now += timedelta(seconds=4)  # +4 sec
        yield {"user": "a", "type": "post"}  # Epoch 1
        clock.now += timedelta(seconds=4)  # +8 sec
        yield {"user": "a", "type": "post"}  # Epoch 2, Instant A
        clock.now += timedelta(seconds=4)  # +12 sec
        # First 10 sec window closes during processing this input.
        # Instant B
        yield {"user": "b", "type": "login"}  # Epoch 3, 
        yield "BOOM"  # Epoch 4
        clock.now += timedelta(seconds=4)  # +16 sec
        yield {"user": "a", "type": "post"}
        clock.now += timedelta(seconds=4)  # +20 sec
        # Second 10 sec window closes during processing this input.
        yield {"user": "b", "type": "post"}
        clock.now += timedelta(seconds=4)  # +24 sec
        yield {"user": "b", "type": "post"}
        clock.now += timedelta(seconds=4)  # +28 sec

Note that instant A and instant B are different times, and Timely could decide to run operators in these interleaving spaces. And since the order of events affects system time, you need to know the intricacies of the scheduler to make this test work.

Personally, I think the best way to deal with this is to minimize reliance on system time in unit tests. We should write as many of our unit tests as possible using just event time and infinite late windows (since that also uses system time), since that means that the ordering of the input is the only determinant of the output. In very specific instances where we need to test a system time thing, we should be careful about it and perhaps find a way to test the component Rust-side at a lower level.

Python version (python -V)

Python 3.10.4

Bytewax version (pip list | grep bytewax)

bytewax 0.11.2

Relevant log output

No response

Steps to Reproduce

See example above. Or run the test in

def test_fold_window(recovery_config):
    start_at = datetime(2022, 1, 1, tzinfo=timezone.utc)
    clock = TestingClock(start_at)

    flow = Dataflow()

    def gen():
        clock.now = start_at  # +0 sec; reset on recover
        yield {"user": "a", "type": "login"}
        clock.now += timedelta(seconds=4)  # +4 sec
        yield {"user": "a", "type": "post"}
        clock.now += timedelta(seconds=4)  # +8 sec
        yield {"user": "a", "type": "post"}
        clock.now += timedelta(seconds=4)  # +12 sec
        # First 10 sec window closes during processing this input.
        yield {"user": "b", "type": "login"}
        yield "BOOM"
        clock.now += timedelta(seconds=4)  # +16 sec
        yield {"user": "a", "type": "post"}
        clock.now += timedelta(seconds=4)  # +20 sec
        # Second 10 sec window closes during processing this input.
        yield {"user": "b", "type": "post"}
        clock.now += timedelta(seconds=4)  # +24 sec
        yield {"user": "b", "type": "post"}
        clock.now += timedelta(seconds=4)  # +28 sec

    flow.input("inp", TestingBuilderInputConfig(gen))

    armed = Event()
    armed.set()

    def trigger(item):
        if item == "BOOM":
            if armed.is_set():
                raise RuntimeError("BOOM")
            else:
                return []
        else:
            return [item]

    flow.flat_map(trigger)

    def key_off_user(event):
        return (event["user"], event["type"])

    flow.map(key_off_user)

    clock_config = TestingClockConfig(clock)
    window_config = TumblingWindowConfig(
        length=timedelta(seconds=10), start_at=start_at
    )

    def count(counts, typ):
        if typ not in counts:
            counts[typ] = 0
        counts[typ] += 1
        return counts

    flow.fold_window("count", clock_config, window_config, dict, count)

    out = []
    flow.capture(TestingOutputConfig(out))

    with raises(RuntimeError):
        run_main(flow, epoch_config=epoch_config, recovery_config=recovery_config)

    assert out == [
        ("a", {"login": 1, "post": 2}),
    ]

    # Disable bomb
    armed.clear()
    out.clear()

    # Recover
    run_main(flow, epoch_config=epoch_config, recovery_config=recovery_config)

    assert out == [
        ("b", {"login": 1}),
        ("a", {"post": 1}),
        ("b", {"post": 2}),
    ]

Install the latest version of bytewax with pip.

Bug Description

I am unable to install the latest version of bytewax with pip.

When I install bytewax with pip install bytewax, the version it installs is 0.11.0.

I tried to force version 0.16.1 manually and got the following error:

ERROR: Could not find a version that satisfies the requirement bytewax==0.16.1 (from versions: 0.6.1, 0.7.0, 0.7.1, 0.8.0b0, 0.8.0b1, 0.8.0b2, 0.8.0, 0.9.0, 0.10.0, 0.10.1, 0.11.0)
ERROR: No matching distribution found for bytewax==0.16.1

I tried using another environment with python 3.8.10 and got the same problem.

pip version: 23.1.2

Python version (python -V)

Python 3.10.11

Bytewax version (pip list | grep bytewax)

bytewax 0.11.0

Operating System version (uname -morp)

Microsoft Windows [10.0.22.621.1702]

Relevant log output

No response

Steps to Reproduce

  1. pip install bytewax

No error from KafkaInputConfig when topic doesn't exist

Bug Description

When you start a dataflow and the topic does not exist, the dataflow will not share any details about that and will continue to run in silence.

Python version (python -V)

3.9

Bytewax version (pip list | grep bytewax)

0.11.0

Relevant log output

No response

Steps to Reproduce

start a dataflow with a topic that doesn't exist

KafkaInputConfig with SASL_SCRAM

Allow KafkaInputConfig to authenticate over SASL_SCRAM.

Args
sasl-mechanism
    The authentication mechanism to use: SCRAM-SHA-256 or SCRAM-SHA-512
user
    SASL user to be used for authentication
password
    SASL password to be used for authentication

[FEATURE] Publish package for Python 3.11

Is your feature request related to a problem? Please describe.
We currently publish packages for python 3.7-3.10

Describe the solution you'd like
Add Python 3.11 to the CI matrix

[FEATURE] Support hopping window

Is your feature request related to a problem? Please describe.

https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-window-functions#hopping-window

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

[FEATURE] Support stream/table joins

Is it possible to use bytewax for joining the content of different Kafka topics (similar to what ksqlDB is doing)?

Doing this would be an example of integrating "persistent queries" (permanent background processes that never stop). Is this a good use case for bytewax?

Also, comparing to ksqlDB: what happens if the bytewax workers are killed? Will those persistent queries restart automatically when workers come back up, and will they use the latest/earliest committed offset on the Kafka topics? Or is the restart manual?

cheers

Bytewax 0.13.1 not found in colab environment

Bug Description

Bytewax 0.13.1 cannot be found in some instances of running https://colab.research.google.com/gist/awmatheson/cfb97c520aaf8aada80607085bc8901f/pydata-2022.ipynb, despite the same OS and Python version as instances where the install succeeds.

Python version (python -V)

Python 3.8.15

Bytewax version (pip list | grep bytewax)

Bytewax 0.13.1

Relevant log output

`pip install bytewax==0.13.1` output

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
ERROR: Could not find a version that satisfies the requirement bytewax==0.13.1 (from versions: 0.6.1, 0.7.0, 0.7.1, 0.8.0b0, 0.8.0b1, 0.8.0b2, 0.8.0, 0.9.0, 0.10.0, 0.10.1, 0.11.0)
ERROR: No matching distribution found for bytewax==0.13.1

cat /etc/os-release output

NAME="Ubuntu"
VERSION="18.04.6 LTS (Bionic Beaver)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 18.04.6 LTS"
VERSION_ID="18.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=bionic
UBUNTU_CODENAME=bionic


Steps to Reproduce

  1. Click https://colab.research.google.com/gist/awmatheson/cfb97c520aaf8aada80607085bc8901f/pydata-2022.ipynb
  2. pip install bytewax==0.13.1

[FEATURE] Add support for environment variables in waxctl

Problem

I am trying to deploy a dataflow to EC2 using waxctl. My Python code is a package, that depends on a couple of environment variables to run.

I tried to pass them to my EC2 runtime by defining them in a setup-ec2.sh script, as follows

export HOPSWORKS_PROJECT_NAME=my_project_name
export HOPSWORKS_API_KEY=my_api_key

and then pass this script as --system-setup-file-name parameter.

$ waxctl aws deploy project-files.tar \
		-f src/dataflow.py \
		--system-setup-file-name ./setup-ec2.sh \
		--region us-east-1 \
		-t t2.micro \
		--name "bytewax" \
		--python-package \
		--debug

However, when I deploy the dataflow, it crashes as soon as I try to fetch these env variables in my Python code using os.environ['HOPSWORKS_PROJECT_NAME']

Ideal solution

Can you make these export statements in my setup-ec2.sh work?

KafkaInputConfig pickling error

Bug Description

When using spawn_cluster with KafkaInputConfig, you will get a pickling error, as in the following example:

https://github.com/bytewax/real-time-data-enrichment-with-bytewax/blob/main/src/dataflow.py

TypeError: argument 'auto_commit': 'int' object cannot be converted to 'PyBool'

Python version (python -V)

Python 3.x

Bytewax version (pip list | grep bytewax)

v0.10.0

Relevant log output

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/home/whoahbot/.pyenv/versions/bytewax/lib/python3.9/site-packages/multiprocess/process.py", line 315, in _bootstrap
    self.run()
  File "/home/whoahbot/.pyenv/versions/bytewax/lib/python3.9/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/whoahbot/.pyenv/versions/bytewax/lib/python3.9/site-packages/multiprocess/pool.py", line 114, in worker
    task = get()
  File "/home/whoahbot/.pyenv/versions/bytewax/lib/python3.9/site-packages/multiprocess/queues.py", line 370, in get
    return _ForkingPickler.loads(res)
  File "/home/whoahbot/.pyenv/versions/bytewax/lib/python3.9/site-packages/dill/_dill.py", line 387, in loads
    return load(file, ignore, **kwds)
  File "/home/whoahbot/.pyenv/versions/bytewax/lib/python3.9/site-packages/dill/_dill.py", line 373, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/home/whoahbot/.pyenv/versions/bytewax/lib/python3.9/site-packages/dill/_dill.py", line 646, in load
    obj = StockUnpickler.load(self)
TypeError: argument 'auto_commit': 'int' object cannot be converted to 'PyBool'

Steps to Reproduce

Enable at-least-once delivery for ACK-based input sources via GC callback

Right now, if you want to use ACK-based input sources (e.g. SQS, RabbitMQ), you have to ACK immediately upon message receipt (because otherwise you can never ACK unless you do some very hacky and input specific things like send the raw message type through the dataflow and ACK on output in a custom connector). This means that those input systems are limited to at-most-once delivery.

A way we could facilitate at-least-once delivery is by adding a StatefulSource.garbage_collect(self, state) callback that is called when an epoch is older than the cluster frontier (meaning all outputs and backups for it have completed) with the snapshot that was taken at the end of that epoch.

Then, a SQS input source could do something like (made up SQS Python API):

class _SQSSource(StatefulSource):
    def __init__(self, queue_name, resume_state):
        self._queue = SQSQueueConsumer(queue_name)
        self._in_flight_msg_ids = resume_state or set()

    def next(self):
        msg = self._queue.poll(0.001)
        if msg is None:
            return
        else:
            self._in_flight_msg_ids.insert(msg.id())
            return msg.payload()

    def snapshot(self):
        return self._in_flight_msg_ids

    def garbage_collect(self, state):
        closed_in_flight_msg_ids = state
        for msg_id in closed_in_flight_msg_ids:
            self._queue.ack(msg_id)
            self._in_flight_msg_ids.remove(msg_id)

    def close(self):
        self._queue.close()

I believe this will work because any messages consumed during an epoch that is "rolled back" during recovery will eventually visibility timeout and be delivered again. This requires some coordination with the SQS visibility timeout value; it'd have to be longer than the epoch interval so that you don't redeliver before GC could even be called.

Returning the snapshot state on GC instead of baking in the concept of message IDs directly is cool because then you can support other input sources that have "dynamic retention". The one that comes to mind is if you wanted to build a Postgres logical replication input. You need to be periodically "ACKing" up to an LSN so the replication slot does not retain changes forever.

Something like (made up Postgres Python API, but inspired by psycopg2's replication API):

class _PGLRSource(StatefulSource):
    def __init__(self, conn, resume_state):
        self._lsn = resume_state
        self._cursor = conn.start_replication(start_lsn=self._lsn)
        
    def next(self):
        msg = self._cursor.read_message()
        if msg is None:
            return
        elif msg.data_start > self._lsn:  # Skip the msg at the recovery LSN.
            self._lsn = msg.data_start
            return msg.payload

    def snapshot(self):
        return self._lsn

    def garbage_collect(self, state):
        lsn = state
        self._cursor.send_feedback(flush_lsn=lsn)

    def close(self):
        self._cursor.close()

Unable to build on windows platform

Bug Description

Bytewax cannot be compiled and built on the Windows platform due to the rdkafka library. Compiling it conditionally for Windows makes it work fine.

Python version (python -V)

rust issue

Bytewax version (pip list | grep bytewax)

bytewax 0.16.1

Relevant log output

No response

Steps to Reproduce

cargo build

Recovery state will be incorrect if any worker crashes during the initial epoch

Bug Description

During dataflow execution, we write out whenever a worker completes an epoch. We use these written epochs to re-derive the progress of the entire cluster. Unfortunately, though, on resume we do not know beforehand how many worker's progress messages we're waiting for. Thus if the cluster crashes after worker 1 has written progress but worker 2 has not, then upon resume, it'll look like you're resuming from just a cluster of size 1 and will silently skip data.

We need to have some way of synchronously waiting for all workers to confirm they've written a marker of their existence to the recovery store before each execution (resume or not).

Python version (python -V)

Python 3.10.6

Bytewax version (pip list | grep bytewax)

0.13.1

Relevant log output

No response

Steps to Reproduce

I don't have some example code yet, but this should theoretically be possible.

CPU usage of workers jumps to 100 percent after input source finishes

Bug Description

When using spawn_cluster and you create more workers than 'distribute' is able to allocate work to, the workers that finish early immediately jump to 100 percent cpu usage until the whole job finishes.

This is possibly related to #161 but seems different, because in this case we know for a fact there is no more input.

Python version (python -V)

3.10

Bytewax version (pip list | grep bytewax)

0.15.1

Relevant log output

No response

Steps to Reproduce

  • Spawn a cluster with many more workers than work available to them.
  • Have some of your manual input functions exit as they have no more work.
  • The cpu usage of those workers with no more input will spike to 100 percent while waiting for other workers to finish.

TypeError: KafkaInputConfig.__new__() got an unexpected keyword argument 'sasl.username'

Bug Description

Trying to use the SASL config with KafkaInputConfig on Bytewax 0.11.1, which was introduced in #119; however, I am receiving an input validation error:

TypeError: KafkaInputConfig.__new__() got an unexpected keyword argument 'sasl.username'

Python version (python -V)

3.9.7

Bytewax version (pip list | grep bytewax)

0.11.1

Relevant log output

Traceback (most recent call last):
  File "/Users/shahnewazkhan/projects/dapper/data-platform/wranglers/stream_processors/auth0_logs/bytewax/sasl.py", line 24, in <module>
    KafkaInputConfig(
TypeError: KafkaInputConfig.__new__() got an unexpected keyword argument 'sasl.username'


Steps to Reproduce

from bytewax.dataflow import Dataflow
from bytewax.inputs import KafkaInputConfig
from bytewax.execution import run_main
from bytewax.outputs import StdOutputConfig
import os
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

additional_configs = {
        "sasl.username":os.environ['kafkaReaderUsername'],
        "sasl.password":os.environ['kafkaReaderPassword'],
        "sasl.mechanisms":"PLAIN",
        "security.protocol":"SASL_SSL"
    }

flow = Dataflow()
flow.input(
    "inp",
    KafkaInputConfig(
        brokers=[f"{os.environ['kafkaURL']}"],
        topic=f"{os.environ['topic']}",
        starting_offset="beginning",
        tail=False,
        **additional_configs,
    ),
)

def deserialize(key_bytes__payload_bytes):
    key_bytes, payload_bytes = key_bytes__payload_bytes
    logger.info(f'TYPE OF BYTES {type(key_bytes)}')
    return key_bytes__payload_bytes


flow.map(deserialize)

flow.capture(StdOutputConfig())

run_main(flow)

Support Schema Registry

I'm using KafkaInputConfig and I need to use a schema registry to consume and produce Avro messages in Kafka. I'm having a lot of difficulty with it. Is schema registry supported? Do we have an example of how to use it?

Output of window operators is very delayed when dataflow is waiting for input

Bug Description

If you have a dataflow with an input source that is currently waiting on new input, the downstream output of a windowing operator will only be emitted on the epoch boundary (which defaults to every 10 seconds), even though it should be emitted as soon as possible after the closure of the window. This makes it appear that the sliding/tumbling window "length" argument has little effect, and that the "epoch length" argument of the entry point is actually what controls how often windows are closed.

Take note: the epoch length should have no effect on the semantics of the dataflow; it is supposed to be solely an operational knob to adjust load due to the recovery system.

Fix Implementation: This bug is due to the fact that we only query next_awake during the snapshotting process. This should happen after every logic activation.

Python version (python -V)

Python 3.10.6

Bytewax version (pip list | grep bytewax)

bytewax 0.15.1

Relevant log output

No response

Steps to Reproduce

This would happen if you e.g. have input from a Kafka topic that only gets data on occasion.

[FEATURE] Support sliding window

Is your feature request related to a problem? Please describe.

https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-window-functions#hopping-window

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Broken links in simple-example page

Bug Description

https://docs.bytewax.io/getting-started/simple-example

On the above page, the links to operator docs are broken. Below are some of the links. They seem to refer to the "operators/operators" page, and I guess they should refer to the "apidocs" page instead.

https://docs.bytewax.io/operators/operators/#map (correct link - https://docs.bytewax.io/apidocs#bytewax.Dataflow.map)
https://docs.bytewax.io/operators/operators/#flat-map
https://docs.bytewax.io/operators/operators#reduce-epoch

Python version (python -V)

3.10.0

Bytewax version (pip list | grep bytewax)

latest

Relevant log output

No response

Steps to Reproduce

https://docs.bytewax.io/getting-started/simple-example has links to docs that are broken.

Installing Bytewax for Python 3.8.9 on MacOS Fails

Bug Description

When installing Bytewax for Python 3.8.9 on macOS, pip tries to build Bytewax and then requires Maturin and Rust. This is an older 3.8 version, but it is still used. When I tried to simulate this in a Docker container I couldn't replicate the behavior. I could also not replicate the behavior with Conda, since Conda skips 3.8.9.

Python version (python -V)

Python 3.8.9

Bytewax version (pip list | grep bytewax)

bytewax 0.8.0

Relevant log output

Collecting bytewax
  Downloading bytewax-0.8.0.tar.gz (3.2 MB)
     |████████████████████████████████| 3.2 MB 4.0 MB/s 
  Installing build dependencies ... error
  ERROR: Command errored out with exit status 1:
   command: /Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/bin/python3.8 /Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/lib/python3.8/site-packages/pip install --ignore-installed --no-user --prefix /private/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/pip-build-env-bll3g_fv/overlay --no-warn-script-location --no-binary :none: --only-binary :none: -i https://pypi.org/simple -- 'maturin>=0.12,<0.13'
       cwd: None
  Complete output (59 lines):
  Collecting maturin<0.13,>=0.12
    Downloading maturin-0.12.11.tar.gz (141 kB)
    Installing build dependencies: started
    Installing build dependencies: finished with status 'done'
    Getting requirements to build wheel: started
    Getting requirements to build wheel: finished with status 'done'
      Preparing wheel metadata: started
      Preparing wheel metadata: finished with status 'done'
  Collecting tomli>=1.1.0; python_version < "3.11"
    Using cached tomli-2.0.1-py3-none-any.whl (12 kB)
  Building wheels for collected packages: maturin
    Building wheel for maturin (PEP 517): started
    Building wheel for maturin (PEP 517): finished with status 'error'
    ERROR: Command errored out with exit status 1:
     command: /Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/bin/python3.8 /Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/lib/python3.8/site-packages/pip/_vendor/pep517/_in_process.py build_wheel /var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/tmppmkzoa2r
         cwd: /private/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/pip-install-mzpj84mt/maturin
    Complete output (36 lines):
    running bdist_wheel
    running build
    installing to build/bdist.macosx-12.1-x86_64/wheel
    running install
    Traceback (most recent call last):
      File "/Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/lib/python3.8/site-packages/pip/_vendor/pep517/_in_process.py", line 280, in <module>
        main()
      File "/Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/lib/python3.8/site-packages/pip/_vendor/pep517/_in_process.py", line 263, in main
        json_out['return_val'] = hook(**hook_input['kwargs'])
      File "/Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/lib/python3.8/site-packages/pip/_vendor/pep517/_in_process.py", line 204, in build_wheel
        return _build_backend().build_wheel(wheel_directory, config_settings,
      File "/private/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/pip-build-env-etccmpel/overlay/lib/python3.8/site-packages/setuptools/build_meta.py", line 216, in build_wheel
        return self._build_with_temp_dir(['bdist_wheel'], '.whl',
      File "/private/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/pip-build-env-etccmpel/overlay/lib/python3.8/site-packages/setuptools/build_meta.py", line 202, in _build_with_temp_dir
        self.run_setup()
      File "/private/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/pip-build-env-etccmpel/overlay/lib/python3.8/site-packages/setuptools/build_meta.py", line 145, in run_setup
        exec(compile(code, __file__, 'exec'), locals())
      File "setup.py", line 118, in <module>
        setup(
      File "/private/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/pip-build-env-etccmpel/overlay/lib/python3.8/site-packages/setuptools/__init__.py", line 153, in setup
        return distutils.core.setup(**attrs)
      File "/Users/awmatheson/.pyenv/versions/3.8.9/lib/python3.8/distutils/core.py", line 148, in setup
        dist.run_commands()
      File "/Users/awmatheson/.pyenv/versions/3.8.9/lib/python3.8/distutils/dist.py", line 966, in run_commands
        self.run_command(cmd)
      File "/Users/awmatheson/.pyenv/versions/3.8.9/lib/python3.8/distutils/dist.py", line 985, in run_command
        cmd_obj.run()
      File "/private/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/pip-build-env-etccmpel/overlay/lib/python3.8/site-packages/wheel/bdist_wheel.py", line 335, in run
        self.run_command('install')
      File "/Users/awmatheson/.pyenv/versions/3.8.9/lib/python3.8/distutils/cmd.py", line 313, in run_command
        self.distribution.run_command(command)
      File "/Users/awmatheson/.pyenv/versions/3.8.9/lib/python3.8/distutils/dist.py", line 985, in run_command
        cmd_obj.run()
      File "setup.py", line 60, in run
        raise RuntimeError(
    RuntimeError: cargo not found in PATH. Please install rust (https://www.rust-lang.org/tools/install) and try again
    ----------------------------------------
    ERROR: Failed building wheel for maturin
  Failed to build maturin
  ERROR: Could not build wheels for maturin which use PEP 517 and cannot be installed directly
  WARNING: You are using pip version 20.2.3; however, version 22.0.4 is available.
  You should consider upgrading via the '/Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/bin/python3.8 -m pip install --upgrade pip' command.
  ----------------------------------------
ERROR: Command errored out with exit status 1: /Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/bin/python3.8 /Users/awmatheson/.pyenv/versions/3.8.9/envs/py389/lib/python3.8/site-packages/pip install --ignore-installed --no-user --prefix /private/var/folders/lx/0qxsj_495jl6_8vg_0wpj24h0000gn/T/pip-build-env-bll3g_fv/overlay --no-warn-script-location --no-binary :none: --only-binary :none: -i https://pypi.org/simple -- 'maturin>=0.12,<0.13' Check the logs for full command output.

Steps to Reproduce

  1. brew install pyenv pyenv-virtualenv
  2. eval "$(pyenv init --path)"
  3. eval "$(pyenv init -)"
  4. pyenv install 3.8.9
  5. pyenv virtualenv 3.8.9 py389
  6. pyenv local py389
  7. pyenv activate py389

bug, due to a serialization pass on the dataflow

Bug Description

Hi there,
I am trying to adjust the example here: https://github.com/bytewax/bytewax/blob/main/examples/event_time_processing.py to my project needs. At the moment it is still pretty much identical, but I already get an error when running the dataflow. I think this is the critical part:

def acc_values(acc, event):
    acc.append(event['bid'])
    return acc

flow.fold_window("running_average", cc, wc, list, acc_values)

It throws the following error:

thread 'webserver-threads' panicked at 'called `Result::unwrap()` on an `Err` value: PyErr { type: <class 'TypeError'>, value: TypeError("<class 'list'> can not be JSON encoded: Object of type type is not JSON serializable"), traceback: Some(<traceback object at 0x108fccd00>) }', src/webserver/mod.rs:29:35
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Python version (python -V)

Python 3.10.9

Bytewax version (pip list | grep bytewax)

bytewax==0.14.0

Relevant log output

No response

Steps to Reproduce

  1. Checkout and run https://github.com/bytewax/bytewax/blob/main/examples/event_time_processing.py

Stateful input sources

Find a way to have our input sources backup their state into the recovery system. This will allow us to properly resume.

[FEATURE] Provide the option to include Kafka headers with Kafka InputConfig

Is your feature request related to a problem? Please describe.
Currently it is not possible to parse Kafka headers using the Kafka input config. Kafka headers might contain valuable information. E.g. CloudEvents propagate various fields like event_id and timestamp in kafka headers.

Describe the solution you'd like
I would like to be able to optionally retrieve Kafka Headers alongside with the message to use the header information in my dataflow.

Describe alternatives you've considered
The alternative is to customize the message to include all necessary information in the body

Scheduled awake times should be part of recovery state

Bug Description

Currently StatefulUnary does not persist the "awake times". This means that if you're doing a windowing operation and you crash while you're waiting for a timeout to fire so a window closes, upon recovery that window will not close until you hit that key again. This means that the behavior across resume is not identical.

We'll need to think through any details on what that means for system time windows. I think it just means do everything the same, as any scheduled wakeups that were scheduled during downtime will be past and thus triggered.

Python version (python -V)

Python 3.10.4

Bytewax version (pip list | grep bytewax)

bytewax 0.11.1

Relevant log output

No response

Steps to Reproduce

I think this will do it:

def test_fold_window(recovery_config):
    flow = Dataflow()

    # Remember that clocks are built per-key so the `TestingClock` in
    # this 2 key input has sort of strange behavior.
    inp = [
        {"user": "a", "type": "login_a"},  # +0
        {"user": "a", "type": "post_a"},  # +4
        {"user": "a", "type": "post_a"},  # +8
        {"user": "b", "type": "login_b"},  # +0
        # Close window for "a" at +10.
        "BOOM",
        {"user": "a", "type": "post_a"},  # +12
        {"user": "b", "type": "post_b"},  # +4
        {"user": "b", "type": "post_b"},  # +8
        # Close window for "a" and "b" b/c EOF.
    ]
    flow.input("inp", TestingInputConfig(inp))

    armed = Event()
    armed.set()

    def trigger(item):
        if item == "BOOM":
            if armed.is_set():
                raise RuntimeError("BOOM")
            else:
                return []
        else:
            return [item]

    flow.flat_map(trigger)

    def key_off_user(event):
        return event["user"], event["type"]

    flow.map(key_off_user)

    def count(counts, typ):
        if typ not in counts:
            counts[typ] = 0
        counts[typ] += 1
        return counts

    start_at = datetime(2022, 1, 1)
    clock_config = TestingClockConfig(item_incr=timedelta(seconds=4), start_at=start_at)
    window_config = TumblingWindowConfig(
        length=timedelta(seconds=10), start_at=start_at
    )
    flow.fold_window("count", clock_config, window_config, dict, count)

    out = []
    flow.capture(TestingOutputConfig(out))

    with raises(RuntimeError):
        run_main(flow, epoch_config=epoch_config, recovery_config=recovery_config)

    assert out == [
        ("a", {"login_a": 1, "post_a": 2}),
    ]

    # Disable bomb
    armed.clear()
    out.clear()

    # Recover
    run_main(flow, epoch_config=epoch_config, recovery_config=recovery_config)

    assert out == [
        ("b", {"login_b": 1, "post_b": 2}),
        ("a", {"post_a": 1}),
    ]

[FEATURE] Discussion about the use of multiprocessing

Is your feature request related to a problem? Please describe.

To spawn a cluster we use the spawn_cluster function, which is defined in python and does this:

def _gen_addresses(proc_count: int) -> Iterable[str]:
    return [f"localhost:{proc_id + 2101}" for proc_id in range(proc_count)]


def spawn_cluster(
    flow: Dataflow,
    *,
    epoch_config: Optional[EpochConfig] = None,
    recovery_config: Optional[RecoveryConfig] = None,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
    mp_ctx=get_context("spawn"),
) -> List[Tuple[int, Any]]:

    addresses = _gen_addresses(proc_count)
    with mp_ctx.Pool(processes=proc_count) as pool:
        futures = [
            pool.apply_async(
                cluster_main,
                (flow,),
                {
                    "epoch_config": epoch_config,
                    "recovery_config": recovery_config,
                    "addresses": addresses,
                    "proc_id": proc_id,
                    "worker_count_per_proc": worker_count_per_proc,
                },
            )
            for proc_id in range(proc_count)
        ]
        pool.close()

        for future in futures:
            # Will re-raise exceptions from subprocesses.
            future.get()

        pool.join()

It spawns proc_count processes using multiprocess.
This works fine, but it requires all the arguments of apply_async to be picklable, which is limiting us in some cases (for example, we can't use decorators that return generators), and it requires some boilerplate on the Rust side (see the "Egregious hack").

The point is that we don't use any of the multiprocessing utilities, except in a couple of tests, so if we can find a way to avoid using multiprocessing we could remove a lot of the boilerplate, remove a dependency, and avoid the pitfalls of requiring everything to be picklable.

Describe the solution you'd like

Since we don't need to share state between the processes (not in python at least), we could make spawn_cluster spawn subprocesses in the system, so that the python file is evaluated anew for each process.

Something like this:

def spawn_cluster(
    flow: Dataflow,
    *,
    epoch_config: Optional[EpochConfig] = None,
    recovery_config: Optional[RecoveryConfig] = None,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
) -> List[Tuple[int, Any]]:
    addresses = _gen_addresses(proc_count)

    proc_id = os.getenv("BYTEWAX_PROC_ID", None)
    if proc_id is not None:
        cluster_main(
            flow,
            addresses,
            int(proc_id),
            epoch_config=epoch_config,
            recovery_config=recovery_config,
            worker_count_per_proc=worker_count_per_proc,
        )
    else:
        processes = []
        for proc_id in range(proc_count):
            env = os.environ.copy()
            env["BYTEWAX_PROC_ID"] = f"{proc_id}"
            processes.append(subprocess.Popen(["python", *sys.argv], env=env))

        # Busy-wait until every subprocess has exited.
        while True:
            if all(process.poll() is not None for process in processes):
                break

Describe alternatives you've considered
I don't particularly like this solution, but it's one way to solve the problem.
I opened this issue to discuss possible alternatives.

Another thing we could do is make spawn_cluster an "external" script, so it is not called from Python directly.

We could remove the spawn_cluster function from the Python API and let users call just cluster_main. cluster_main could read the other parameters (proc_id, worker_count_per_proc, and addresses) from args or another env var, and the spawn_cluster script (which could be written in Rust, a shell script, or another Python script; it doesn't really matter) would set the variables similarly to what I proposed above:

if __name__ == '__main__':
    cluster_main(flow, epoch_config=epoch_config, recovery_config=recovery_config)

You would then run it with:

spawn_cluster dataflow.py -p 2 -w 3
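For concreteness, a minimal sketch of what such an external launcher could look like as another Python script; the -p/-w flags come from the invocation above, while BYTEWAX_PROC_ID follows the earlier proposal and the other environment variable names are assumptions:

#!/usr/bin/env python
"""Hypothetical spawn_cluster launcher: one OS process per proc_id."""
import argparse
import os
import subprocess
import sys

parser = argparse.ArgumentParser(prog="spawn_cluster")
parser.add_argument("dataflow", help="path to the dataflow script")
parser.add_argument("-p", "--proc-count", type=int, default=1)
parser.add_argument("-w", "--worker-count-per-proc", type=int, default=1)
args = parser.parse_args()

processes = []
for proc_id in range(args.proc_count):
    env = os.environ.copy()
    env["BYTEWAX_PROC_ID"] = str(proc_id)
    env["BYTEWAX_PROC_COUNT"] = str(args.proc_count)
    env["BYTEWAX_WORKERS_PER_PROC"] = str(args.worker_count_per_proc)
    processes.append(subprocess.Popen([sys.executable, args.dataflow], env=env))

# Wait for every subprocess and propagate the first non-zero exit code.
exit_code = 0
for process in processes:
    code = process.wait()
    if code != 0 and exit_code == 0:
        exit_code = code
sys.exit(exit_code)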

I still haven't properly explored this solution, though; it might hide other problems.

Additional context
Let me know what you think. If you have other ideas, I can spend some time building a proof of concept to see if it could work.

[FEATURE] Waxctl to install non-Python dependencies on EC2, like gcc

I am trying to run a dataflow on EC2, where one of my Python dependencies (twofish) requires gcc to install. As gcc is not installed by default on an EC2 instance, the dataflow cannot run.

It would be great if I could pass some kind of setup.sh script as a parameter to waxctl, where I would add something like:

sudo yum update
sudo yum groupinstall "Development Tools"

Waxctl could run this script before the pip install of the requirements file, ensuring that the build process completes.

Missing Autocomplete in VSCode / PyCharm

Bug Description

Hello everyone,

I use bytewax within a virtual environment. The environment was created under Python 3.10.9 using the command
python -m venv .venv
and then activated. I installed bytewax via pip and can import from it:

from bytewax.dataflow import Dataflow

The problem is that "Dataflow" is not found within the IDE, which means that autocomplete, etc. does not work.

However, I can execute and use Dataflow without any problems; it is only annoying within the IDEs.

Python version (python -V)

Python 3.10.9

Bytewax version (pip list | grep bytewax)

0.14.0

Relevant log output

Cannot find reference 'Dataflow' in 'dataflow.py'

Steps to Reproduce

  1. Create new project in PyCharm / VSCode (under MacOS 13.1)
  2. Create and activate venv
  3. pip install bytewax
  4. create new .py file with
    from bytewax.dataflow import Dataflow

Issue using KafkaInputConfig with spawn_cluster

Bug Description

When running the dataflow in this repo that uses the KafkaInputConfig, the dataflow works correctly with cluster_main, but when substituting cluster_main with spawn_cluster it fails with the error shown in the log output below.

Python version (python -V)

Python 3.9

Bytewax version (pip list | grep bytewax)

bytewax 0.10.0

Relevant log output

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/multiprocess/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/multiprocess/pool.py", line 114, in worker
    task = get()
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/multiprocess/queues.py", line 371, in get
    return _ForkingPickler.loads(res)
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/dill/_dill.py", line 327, in loads
    return load(file, ignore, **kwds)
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/dill/_dill.py", line 313, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/dill/_dill.py", line 525, in load
    obj = StockUnpickler.load(self)
TypeError: argument 'auto_commit': 'int' object cannot be converted to 'PyBool'
^CProcess SpawnPoolWorker-2:
Traceback (most recent call last):
  File "/Users/awmatheson/bytewax/real-time-data-enrichment-with-bytewax/src/dataflow.py", line 55, in <module>
    spawn_cluster(flow, input_config, output_builder)
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/bytewax/execution.py", line 168, in spawn_cluster
    future.get()
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/multiprocess/pool.py", line 765, in get
    self.wait(timeout)
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/site-packages/multiprocess/pool.py", line 762, in wait
    self._event.wait(timeout)
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/threading.py", line 574, in wait
    signaled = self._cond.wait(timeout)
  File "/Users/awmatheson/anaconda3/envs/bytewax39/lib/python3.9/threading.py", line 312, in wait
    waiter.acquire()
KeyboardInterrupt

Steps to Reproduce

Follow the steps in the README.md and run the dataflow with spawn_cluster instead of cluster_main

[FEATURE] Support session window

Is your feature request related to a problem? Please describe.

https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-window-functions#session-window

Describe the solution you'd like

Support session windows: per-key windows that close after a configurable gap with no new events, as described in the linked Azure Stream Analytics documentation above.
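For illustration only, a sketch assuming a SessionWindowConfig with a gap parameter, modeled on the TumblingWindowConfig used in the recovery test earlier in this document; this is not current Bytewax API:

from datetime import timedelta

# Hypothetical API: a per-key session closes once no event arrives within `gap`.
window_config = SessionWindowConfig(gap=timedelta(minutes=15))

# Reusing the fold_window pattern from the recovery test above; clock_config,
# dict, and count are as defined there.
flow.fold_window("session_count", clock_config, window_config, dict, count)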

Unhelpful error when missing AdvanceTo and Emit

Bug Description

When the yield_epochs decorator is missing, or AdvanceTo and Emit are not used, the error that is raised is not particularly helpful. It points to src/execution/mod.rs:94:29 with the message Unknown input action, but does not point to the missing decorator or the missing AdvanceTo and Emit calls.

Python version (python -V)

Python 3.9

Bytewax version (pip list | grep bytewax)

bytewax 0.9.0

Relevant log output

thread 'timely:work-0' panicked at 'Unknown input action', src/execution/mod.rs:94:29
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Steps to Reproduce

Run the Dataflow below and you will get the error.

import json

from websocket import create_connection

from bytewax import Dataflow, inputs, spawn_cluster

ws = create_connection("wss://ws-feed.pro.coinbase.com")
ws.send(
    json.dumps(
        {
            "type": "subscribe",
            "product_ids": ['BTC-USD'],
            "channels": ["level2"],
        }
    )
)
print(ws.recv())

def input_builder(worker_index, worker_count):
    return inputs.fully_ordered(ws_input())


def ws_input():
    while True:
        yield (ws.recv())


def output_builder(worker_index, worker_count):
    return print


flow = Dataflow()
flow.capture()

if __name__ == "__main__":
    spawn_cluster(flow, input_builder, output_builder)

Add input config for websockets

Is your feature request related to a problem? Please describe.
There is a lot of repetitive code involved in connecting to various data sources like websockets, SSE, Kafka topics, etc. It would be really nice to have helpers that eliminate the need to re-write this code. This issue scopes adding a single helper for websockets.

Describe the solution you'd like
A first pass could be similar to inputs.py: a sources.py file with the common patterns for using a websocket in your dataflow (connect, send, recv, keep_alive, etc.); a sketch of such a helper follows the snippet below.

def input_builder(worker_index, worker_count):
    return websocket_source('ws://my-ws', my_params)
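A hedged sketch of what such a helper could look like; websocket_source is hypothetical and uses create_connection from the websocket-client package, as in the reproduction snippet elsewhere in this document:

import json

from websocket import create_connection


def websocket_source(url, subscribe_msg=None):
    """Hypothetical helper: connect, optionally subscribe, then yield messages."""
    ws = create_connection(url)
    if subscribe_msg is not None:
        ws.send(json.dumps(subscribe_msg))
    try:
        while True:
            yield ws.recv()
    finally:
        ws.close()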

Describe alternatives you've considered
Currently it's defined in the dataflow Python file and placed in an input builder. Another alternative would ...

Explicit error messages on malformed `(key, value)` items

We have a few places where we have to cast Python types to specific Rust types to interface with Timely correctly. One big example: if you use stateful_map or reduce, your input needs to be Python (key, value) 2-tuples, and we attempt to parse them apart. If the items at that point in the dataflow aren't 2-tuples, we fail with a very generic error about types. We should instead report something explicit like "items at this point in the dataflow should be (key, value) 2-tuples, but you have XXX".
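For illustration, a minimal sketch of the expected shape at that point in the dataflow; the field name and step function are hypothetical:

def key_by_user(event):
    # Stateful steps such as stateful_map and reduce expect each upstream item
    # to be a (key, value) 2-tuple with a string key.
    return (event["user_id"], event)  # "user_id" is a hypothetical field

flow.map(key_by_user)
# A bare event (not a 2-tuple) reaching the stateful step is what currently
# produces the generic type error described above.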

[FEATURE] Fluent API for Dataflow methods

Is your feature request related to a problem? Please describe.
I've worked with a lot of APIs over the years and have become a proponent of Fluent style for this kind of library. IMHO it would improve the developer experience of using the Bytewax library. In particular, I am proposing changes to the Dataflow class.

Current Style

flow = Dataflow()
flow.input("inp", FileInput("wordcount.txt"))
flow.map(lower)
flow.flat_map(tokenize)
flow.map(initial_count)
flow.reduce_window("sum", clock_config, window_config, add)
flow.output("out", StdOutput())

Describe the solution you'd like

Proposed Style

flow = Dataflow() \
	.input("inp", FileInput("wordcount.txt")) \
	.map(lower) \
	.flat_map(tokenize) \
	.map(initial_count) \
	.reduce_window("sum", clock_config, window_config, add) \
	.output("out", StdOutput())

This would support method chaining with the Dataflow class. Implementing it only requires that methods which currently return None return a reference to self instead, as in the minimal sketch below; IMHO it should be easy to do. Happy to discuss further if it helps.
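A minimal sketch of the idea using a pure-Python stand-in for Dataflow (the real class is implemented in Rust; _add_step and the steps list are hypothetical internals):

class Dataflow:
    def __init__(self):
        self.steps = []

    def _add_step(self, name, *args):
        self.steps.append((name, args))

    def map(self, mapper):
        self._add_step("map", mapper)
        return self  # return self instead of None so calls can be chained

    def output(self, step_id, sink):
        self._add_step("output", step_id, sink)
        return self


flow = Dataflow().map(str.lower).output("out", print)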

Describe alternatives you've considered
The alternative is that I have to create a wrapper around the Dataflow class and delegate every method call.


Try to make idle workers not spin

As referenced in this comment, worker threads will "spin" on Worker::step if there is no work. This means unnecessarily high CPU usage for an "idle" dataflow.

We should see if we can find a way to introduce some waiting to slow down this spin.

There are a few mechanisms here we need to be aware of when thinking about a solution:

  1. Timely gives us Worker::step_or_park, which will cause the worker thread to "park" for up to a specified timeout if there is no work. Timely will automatically wake the thread when there is new work for the operators, though.

  2. This timeout controls how long it takes for the worker's main code loop to cycle, which we need in order to communicate interrupt flags, so we can't set it to "forever" and have Timely do everything.

  3. We also never know when an input operator will get new data, so currently we call activator.activate() during each awakening to try again ASAP. This itself might need a different timeout depending on the data source. It might be possible to use a SyncActivator and pass it out of the input source so that some other thread can monitor the readability of sockets, files, etc. and wake that operator without timeouts, but that seems tricky.
