Hi, that's a good observation, and in the specific case you posted, it's also the expected behavior.
Bytewax can parallelize your dataflow, but multiple Python threads in the same process still share the same GIL.
There are a couple of things I'd correct in the input source as well, though they won't change the result much. As a general piece of advice, if you want to create a number of partitions that matches the dynamic number of workers, you should probably use DynamicSource as a base class. Something like this:
```python
import threading

from bytewax.inputs import DynamicSource, StatelessSourcePartition


class TestingPartition(StatelessSourcePartition):
    def __init__(self, it, batch_size):
        self.it = it
        self.batch_size = batch_size

    def next_batch(self):
        return [next(self.it) for _ in range(self.batch_size)]


class TestingSource(DynamicSource):
    def __init__(self, num_records, batch_size=1):
        print(f"Created source for input: {threading.get_ident()}")
        self._num_records = num_records
        self._batch_size = batch_size

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        print(
            f"Created part for input: [thread: {threading.get_ident()}, "
            f"worker_index: {worker_index}, part_records: {records_per_part}]"
        )
        return TestingPartition(iter(range(records_per_part)), self._batch_size)
```
The point is that if each partition runs on a different thread within the same Python process, the next_batch
calls won't be able to run concurrently, because they'll be fighting over the same lock.
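You can see this contention outside of bytewax too. Here is a minimal, hypothetical sketch (plain `threading`, no bytewax involved): splitting pure-Python, CPU-bound counting across four threads doesn't beat a single thread on CPython, because only one thread can hold the GIL at a time. The function names and the record count here are made up for illustration.

```python
import threading
import time


def count(n):
    # Pure-Python, CPU-bound work: it holds the GIL the whole time.
    total = 0
    for i in range(n):
        total += i
    return total


N = 2_000_000

# Do all the work on a single thread.
start = time.perf_counter()
count(N)
single_s = time.perf_counter() - start

# Split the same total amount of work across four threads.
results = []


def work():
    results.append(count(N // 4))


threads = [threading.Thread(target=work) for _ in range(4)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_s = time.perf_counter() - start

print(f"single thread: {single_s:.2f}s, four threads: {threaded_s:.2f}s")
```

On a standard CPython build the two timings come out roughly the same, since the threads just take turns on the GIL.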
Now, if instead you were doing an operation that releases the GIL while it's ongoing, like making a web request, you would see a real increase in performance. Try something like this:
```python
import bytewax.operators as op
import requests
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicSource, StatelessSourcePartition
from bytewax.outputs import DynamicSink, StatelessSinkPartition


class _NullSink(StatelessSinkPartition):
    def write_batch(self, items):
        pass


class NullOutput(DynamicSink):
    def build(self, worker_index, worker_count):
        return _NullSink()


class TestingPartition(StatelessSourcePartition):
    def __init__(self, num):
        self.num = num
        self.counter = 0

    def next_batch(self):
        self.counter += 1
        if self.counter > self.num:
            raise StopIteration()
        # Add your url here, beware of the number of calls you make
        return [requests.get("<an-url>")]


class TestingSource(DynamicSource):
    def __init__(self, num_records):
        self._num_records = num_records

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        return TestingPartition(records_per_part)


def run(records=10):
    flow = Dataflow("dataflow")
    stream = op.input("in", flow, TestingSource(records))
    op.output("out", stream, NullOutput())
    return flow
```
Running it with a single worker I get ~3s, running with 8 workers takes ~0.8s, and with 16 workers only ~0.1s.
But as you noticed, not all work can run free of the GIL for most of the time.
When that's the case, you should use bytewax's ability to run separate processes instead of multiple workers in the same process. We have a testing runner to try this easily, but follow the guide in the documentation when you want a production setup.
Running multiple processes can incur an initial delay while all the workers connect to each other, so a short-running task might take more time than a single process. But you can gauge the improvement by measuring, for example, the time between the first and last message processed. Let's modify your code to show that:
```python
import time

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicSource, StatelessSourcePartition
from bytewax.outputs import DynamicSink, StatelessSinkPartition


class _NullSink(StatelessSinkPartition):
    def write_batch(self, items):
        pass


class NullOutput(DynamicSink):
    def __init__(self, **config):
        self._config = config

    def build(self, worker_index, worker_count):
        return _NullSink()


class TestingPartition(StatelessSourcePartition):
    def __init__(self, it, batch_size):
        self.it = it
        self.batch_size = batch_size
        self.start = None

    def next_batch(self):
        # First time this is called we set self.start
        if self.start is None:
            self.start = time.time()
        # If `next(self.it)` raises `StopIteration`, it
        # means we consumed our iterator. Print elapsed
        # time and reraise so the dataflow can stop.
        try:
            return [next(self.it) for _ in range(self.batch_size)]
        except StopIteration:
            print(f"Elapsed: {time.time() - self.start:.2f}s")
            raise


class TestingSource(DynamicSource):
    def __init__(self, num_records, batch_size=1):
        self._num_records = num_records
        self._batch_size = batch_size

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        return TestingPartition(iter(range(records_per_part)), self._batch_size)


def run(records=1000000, batch=1):
    flow = Dataflow("dataflow")
    stream = op.input("in", flow, TestingSource(records, batch))
    op.output("out", stream, NullOutput())
    return flow
```
Now let's run it. This is the baseline: a single worker in a single process, 100M records, batches of 10k:
```
❯ time python -m bytewax.run "examples.scale:run(records=100_000_000, batch=10_000)"
Worker 0: 3.21s
________________________________________________________
Executed in    3.28 secs      fish           external
   usr time    3.26 secs      0.00 millis    3.26 secs
   sys time    0.02 secs      4.86 millis    0.02 secs
```
And here it is running with 8 workers. You can see that some workers manage to finish early, but overall the total time spent is very similar:
```
❯ time python -m bytewax.run "examples.scale:run(records=100_000_000, batch=10000)" -w8
Worker 7: 1.53s
Worker 5: 1.87s
Worker 2: 2.44s
Worker 0: 2.29s
Worker 3: 2.93s
Worker 6: 2.95s
Worker 1: 3.18s
Worker 4: 3.39s
________________________________________________________
Executed in    3.47 secs      fish             external
   usr time    3.39 secs    263.00 micros    3.39 secs
   sys time    0.23 secs    246.00 micros    0.23 secs
```
Finally, try the testing runner with 1 worker per process, but 8 separate processes:
```
❯ time python -m bytewax.testing "examples.scale:run(records=100_000_000, batch=10_000)" -w1 -p8
Worker 0: 0.54s
Worker 6: 0.55s
Worker 3: 0.55s
Worker 1: 0.55s
Worker 2: 0.56s
Worker 7: 0.58s
Worker 5: 0.59s
Worker 4: 0.60s
________________________________________________________
Executed in  798.50 millis    fish             external
   usr time    5.19 secs    568.00 micros    5.19 secs
   sys time    0.35 secs    530.00 micros    0.35 secs
```
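For a rough sense of the gain, we can divide the baseline wall-clock time by the multi-process one (numbers taken from the runs above):

```python
baseline_s = 3.28      # single worker, single process
multiproc_s = 0.7985   # 8 processes, 1 worker each (798.50 millis)

speedup = baseline_s / multiproc_s
print(f"{speedup:.1f}x")  # roughly 4.1x
```

Note that the `usr time` of the multi-process run (~5.2 s of total CPU) is actually higher than the baseline's 3.26 s: the same work plus coordination overhead is being done, but it now happens in parallel, so the wall clock drops.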
This last one was a lucky run, because all workers could connect to each other immediately. In other runs you might see some delay introduced by the connection retries:
```
❯ time python -m bytewax.testing "examples.scale:run(records=100_000_000, batch=10_000)" -w1 -p8
worker 4: error connecting to worker 0: Connection refused (os error 111); retrying
worker 3: error connecting to worker 0: Connection refused (os error 111); retrying
worker 5: error connecting to worker 0: Connection refused (os error 111); retrying
worker 2: error connecting to worker 0: Connection refused (os error 111); retrying
worker 1: error connecting to worker 0: Connection refused (os error 111); retrying
worker 7: error connecting to worker 6: Connection refused (os error 111); retrying
Worker 0: 0.53s
Worker 1: 0.54s
Worker 6: 0.56s
Worker 4: 0.56s
Worker 2: 0.57s
Worker 5: 0.57s
Worker 7: 0.58s
Worker 3: 0.61s
________________________________________________________
Executed in    1.80 secs      fish           external
   usr time    5.19 secs      0.00 micros    5.19 secs
   sys time    0.39 secs    716.00 micros    0.39 secs
```
But you can see that the running time of each worker is consistently lower than in the baseline.
I hope this answers your questions, but let me know if you want me to expand on anything here.
Hi @Psykopear!
Thanks a lot!
Now it's much clearer.
Happy to know it was useful. I'll close this issue then, but feel free to comment here if you need anything else.