
Comments (3)

Psykopear avatar Psykopear commented on June 6, 2024

Hi, that's a good observation, and in the specific case you posted, it's also the expected behavior.

Bytewax can parallelize your dataflow, but multiple Python threads in the same process still share the same GIL.

There are a couple of things I'd correct in the input source as well, though they won't change the result much. As a piece of advice: if you want the number of partitions to match the dynamic number of workers, you should use DynamicSource as a base class. Something like this:

import threading

from bytewax.inputs import DynamicSource, StatelessSourcePartition


class TestingPartition(StatelessSourcePartition):
    def __init__(self, it, batch_size):
        self.it = it
        self.batch_size = batch_size

    def next_batch(self):
        return [next(self.it) for _ in range(self.batch_size)]


class TestingSource(DynamicSource):
    def __init__(self, num_records, batch_size=1):
        print(f"Created source for input: {threading.get_ident()}")
        self._num_records = num_records
        self._batch_size = batch_size

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        print(f"Created part for input: [thread: {threading.get_ident()}, worker_index: {worker_index}, part_records: {records_per_part}]")
        return TestingPartition(iter(range(records_per_part)), self._batch_size)

The point is that if each partition runs on a different thread within the same Python process, the next_batch calls can't run concurrently, because they will all be fighting to acquire the same lock.
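You can see this contention with plain threading, outside of bytewax entirely. This is a minimal sketch (the workload and record counts are arbitrary) of CPU-bound work that holds the GIL, so adding threads doesn't reduce wall time:

```python
# CPU-bound Python work holds the GIL while executing bytecode,
# so splitting it across threads in one process gives no speedup.
import threading
import time


def cpu_work(n: int) -> None:
    # Pure-Python loop: holds the GIL the whole time.
    total = 0
    for i in range(n):
        total += i


def timed(num_threads: int, n: int) -> float:
    """Split n iterations across num_threads and return wall time."""
    threads = [
        threading.Thread(target=cpu_work, args=(n // num_threads,))
        for _ in range(num_threads)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 5_000_000
    # Despite 8x the threads, wall time stays roughly the same,
    # because only one thread can run Python bytecode at a time.
    print(f"1 thread:  {timed(1, n):.2f}s")
    print(f"8 threads: {timed(8, n):.2f}s")
```

Running this, the 8-thread version takes about as long as (often slightly longer than) the single-threaded one.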

Now, if instead you were doing an operation that releases the GIL while it's waiting, like making a web request, you would see a real increase in performance. Try something like this:

import bytewax.operators as op
import requests
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicSource, StatelessSourcePartition
from bytewax.outputs import DynamicSink, StatelessSinkPartition


class _NullSink(StatelessSinkPartition):
    def write_batch(self, items):
        pass


class NullOutput(DynamicSink):
    def build(self, worker_index, worker_count):
        return _NullSink()


class TestingPartition(StatelessSourcePartition):
    def __init__(self, num):
        self.num = num
        self.counter = 0

    def next_batch(self):
        self.counter += 1
        if self.counter > self.num:
            raise StopIteration()
        # Add your URL here; beware of how many calls you make
        return [requests.get("<an-url>")]


class TestingSource(DynamicSource):
    def __init__(self, num_records):
        self._num_records = num_records

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        return TestingPartition(records_per_part)


def run(records=10):
    flow = Dataflow('dataflow')
    stream = op.input('in', flow, TestingSource(records))
    op.output('out', stream, NullOutput())
    return flow

Running it with a single worker takes ~3s, with 8 workers ~0.8s, and with 16 workers only ~0.1s.
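The same effect can be reproduced without any network access, since time.sleep also releases the GIL while waiting. This is a plain-threading sketch (not bytewax, and the sleep is just a stand-in for a web request):

```python
# Blocking calls that release the GIL, like socket I/O or
# time.sleep, do run concurrently across threads in one process.
import threading
import time


def io_work(delay: float) -> None:
    # time.sleep releases the GIL, standing in for a web request.
    time.sleep(delay)


def timed(num_threads: int, delay: float) -> float:
    """Run num_threads sleeps in parallel and return wall time."""
    threads = [
        threading.Thread(target=io_work, args=(delay,))
        for _ in range(num_threads)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    # 8 sleeps of 0.3s each finish in roughly 0.3s when threaded,
    # versus ~2.4s if they ran one after another.
    print(f"8 threads, 0.3s sleep each: {timed(8, 0.3):.2f}s")
```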

But as you noticed, not all workloads release the GIL for most of their runtime.
When that's the case, you should use bytewax's ability to run separate processes instead of multiple workers in the same process. We have a testing runner to try this easily, but follow the guide in the documentation once you want a production setup.

Running multiple processes can incur an initial delay while the workers connect to each other, so a short-running task might take longer than in a single process. You can still measure the improvement by, for example, timing the interval between the first and last message each worker processes. Let's modify your code to show that:

import time

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicSource, StatelessSourcePartition
from bytewax.outputs import DynamicSink, StatelessSinkPartition


class _NullSink(StatelessSinkPartition):
    def write_batch(self, items):
        pass

class NullOutput(DynamicSink):
    def __init__(self, **config):
        self._config = config

    def build(self, worker_index, worker_count):
        return _NullSink()


class TestingPartition(StatelessSourcePartition):
    def __init__(self, worker_index, it, batch_size):
        self.worker_index = worker_index
        self.it = it
        self.batch_size = batch_size
        self.start = None

    def next_batch(self):
        # The first time this is called we set self.start
        if self.start is None:
            self.start = time.time()
        # If `next(self.it)` raises `StopIteration`, it
        # means we consumed our iterator. Print the elapsed
        # time and reraise so the dataflow can stop.
        try:
            return [next(self.it) for _ in range(self.batch_size)]
        except StopIteration:
            print(f"Worker {self.worker_index}: {time.time() - self.start:.2f}s")
            raise


class TestingSource(DynamicSource):
    def __init__(self, num_records, batch_size=1):
        self._num_records = num_records
        self._batch_size = batch_size

    def build(self, worker_index, worker_count):
        records_per_part = self._num_records // worker_count
        return TestingPartition(worker_index, iter(range(records_per_part)), self._batch_size)


def run(records=1000000, batch=1):
    flow = Dataflow('dataflow')
    stream = op.input('in', flow, TestingSource(records, batch))
    op.output('out', stream, NullOutput())
    return flow

And now try to run it.

This is the baseline: a single worker in a single process, 100M records, batch size 10k:

❯ time python -m bytewax.run "examples.scale:run(records=100_000_000, batch=10_000)"
Worker 0: 3.21s

________________________________________________________
Executed in    3.28 secs    fish           external
   usr time    3.26 secs    0.00 millis    3.26 secs
   sys time    0.02 secs    4.86 millis    0.02 secs

And here it is running with 8 workers. You can see that some workers manage to finish early, but the total time spent is very similar:

❯ time python -m bytewax.run "examples.scale:run(records=100_000_000, batch=10000)" -w8
Worker 7: 1.53s
Worker 5: 1.87s
Worker 2: 2.44s
Worker 0: 2.29s
Worker 3: 2.93s
Worker 6: 2.95s
Worker 1: 3.18s
Worker 4: 3.39s

________________________________________________________
Executed in    3.47 secs    fish           external
   usr time    3.39 secs  263.00 micros    3.39 secs
   sys time    0.23 secs  246.00 micros    0.23 secs

Finally, try the testing runner with 1 worker per process, but 8 separate processes:

❯ time python -m bytewax.testing "examples.scale:run(records=100_000_000, batch=10_000)" -w1 -p8
Worker 0: 0.54s
Worker 6: 0.55s
Worker 3: 0.55s
Worker 1: 0.55s
Worker 2: 0.56s
Worker 7: 0.58s
Worker 5: 0.59s
Worker 4: 0.60s

________________________________________________________
Executed in  798.50 millis    fish           external
   usr time    5.19 secs    568.00 micros    5.19 secs
   sys time    0.35 secs    530.00 micros    0.35 secs

This last one was a lucky run, because all workers could connect to each other immediately; in other cases you might see some delay introduced by this:

❯ time python -m bytewax.testing "examples.scale:run(records=100_000_000, batch=10_000)" -w1 -p8
worker 4:	error connecting to worker 0: Connection refused (os error 111); retrying
worker 3:	error connecting to worker 0: Connection refused (os error 111); retrying
worker 5:	error connecting to worker 0: Connection refused (os error 111); retrying
worker 2:	error connecting to worker 0: Connection refused (os error 111); retrying
worker 1:	error connecting to worker 0: Connection refused (os error 111); retrying
worker 7:	error connecting to worker 6: Connection refused (os error 111); retrying
Worker 0: 0.53s
Worker 1: 0.54s
Worker 6: 0.56s
Worker 4: 0.56s
Worker 2: 0.57s
Worker 5: 0.57s
Worker 7: 0.58s
Worker 3: 0.61s

________________________________________________________
Executed in    1.80 secs    fish           external
   usr time    5.19 secs    0.00 micros    5.19 secs
   sys time    0.39 secs  716.00 micros    0.39 secs

But you can see that the running time of each worker is consistently lower than in the original run.

I hope this answers your doubts, but let me know if you want me to expand on anything here.


szhem avatar szhem commented on June 6, 2024

Hi @Psykopear!

Thanks a lot!
Now it's much clearer.


Psykopear avatar Psykopear commented on June 6, 2024

Happy to know it was useful. I'll close this issue then, but feel free to comment here if you need anything else.

