bounded_pool_executor's People

Contributors

caindrac, cbenz, gabrielbigardi, mowshon


bounded_pool_executor's Issues

Speed (bounded_pool_executor slower)

Hi,

Do you have any idea why BoundedThreadPoolExecutor is slower than ThreadPoolExecutor?
The machine has 4 CPUs; ThreadPoolExecutor uses 5 × CPUs = 20 workers by default, which equals the BoundedThreadPoolExecutor max_workers.

Before

with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_feature = {executor.submit(extract_features, filename): filename for filename in files}
    for future in concurrent.futures.as_completed(future_to_feature):
        ...

After

with BoundedThreadPoolExecutor(max_workers=20) as worker:
    for file in files:
        ...
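A likely explanation: bounded pools are typically built by gating submit() with a semaphore, so the submitting thread blocks whenever max_workers tasks are already in flight instead of racing ahead and filling an unbounded queue. That pacing (plus a per-task semaphore handshake) adds overhead compared with the plain ThreadPoolExecutor. A minimal sketch of that design (the class and names here are mine, not the library's):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hedged sketch of the semaphore-gated design: submit() blocks whenever
# max_workers tasks are already pending, so the submitting thread is paced
# by the workers instead of queueing far ahead of them.
class BoundedExecutorSketch:
    def __init__(self, max_workers):
        self._semaphore = threading.BoundedSemaphore(max_workers)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()  # blocks while the pool is full
        future = self._executor.submit(fn, *args, **kwargs)
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

# Count how many tasks ever run at once to confirm the bound holds.
lock = threading.Lock()
running = 0
peak = 0

def task():
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.02)
    with lock:
        running -= 1

pool = BoundedExecutorSketch(2)
futures = [pool.submit(task) for _ in range(8)]
for f in futures:
    f.result()
pool.shutdown()
```

Under this design the bound is enforced at submission time, which is exactly what costs throughput relative to an unbounded queue.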

BoundedThreadPoolExecutor doesn't limit workers and only returns part of the results

I'm using a ThreadPoolExecutor (in my context I run it inside Django, which is why I don't use a process pool: I would have to initialize Django for each process, which is not optimal). Setting max_workers to something like 100 doesn't limit the number of workers spawned: my task requires about 3851 workers, and they are all generated within a second (the task is heavy and should take at least 10 to 30 seconds per worker), so the results are wrong and incomplete. I don't know if it's just my usage being wrong, but I figured I would report it here.

Here's an example:

with BoundedThreadPoolExecutor(max_workers=100) as worker:
    for chunk in range(chunks):
        worker.submit(chunk_method, chunk, query)

    worker.shutdown()
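One way to rule out dropped results is to keep every future and wait on all of them explicitly, rather than relying on submit-and-forget. A stdlib-only sketch (chunk_method here is a simplified stand-in for the real task, which also takes a query argument):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

chunks = 20

def chunk_method(chunk):
    # Stand-in for the real heavy task; returns its input so completeness
    # of the result set can be checked afterwards.
    time.sleep(0.01)
    return chunk

with ThreadPoolExecutor(max_workers=4) as worker:
    # Holding the futures lets us detect any task that never completed
    # or raised, instead of silently losing its result.
    futures = [worker.submit(chunk_method, chunk) for chunk in range(chunks)]
    results = sorted(f.result() for f in as_completed(futures))
```

If results comes back complete with this pattern but not with the original code, the problem is in how results are collected rather than in the pool's bound.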

Is Windows not supported?

I get this error when I run the test script:

#0 Worker initialization
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 114, in _main
    prepare(preparation_data)
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
    run_name="__mp_main__")
  File "C:\Program Files\Python37\lib\runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "C:\Program Files\Python37\lib\runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "C:\Program Files\Python37\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Users\Administrator\Downloads\Compressed\ProExp\0\jomla\test.py", line 13, in <module>
    worker.submit(do_job, num)
  File "C:\Program Files\Python37\lib\site-packages\bounded_pool_executor\__init__.py", line 20, in submit
    future = super().submit(fn, *args, **kwargs)
  File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 641, in submit
    self._start_queue_management_thread()
  File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 583, in _start_queue_management_thread
    self._adjust_process_count()
  File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 607, in _adjust_process_count
    p.start()
  File "C:\Program Files\Python37\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python37\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python37\lib\multiprocessing\popen_spawn_win32.py", line 46, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError:
    An attempt has been made to start a new process before the
    current process has finished its bootstrapping phase.

    This probably means that you are not using fork to start your
    child processes and you have forgotten to use the proper idiom
    in the main module:

        if __name__ == '__main__':
            freeze_support()
            ...

    The "freeze_support()" line can be omitted if the program
    is not going to be frozen to produce an executable.

Traceback (most recent call last):
  File "test.py", line 13, in <module>
    worker.submit(do_job, num)
  File "C:\Program Files\Python37\lib\site-packages\bounded_pool_executor\__init__.py", line 20, in submit
    future = super().submit(fn, *args, **kwargs)
  File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 625, in submit
    raise BrokenProcessPool(self._broken)
concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
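Windows is supported, but the "spawn" start method used on Windows re-imports your script in every child process, so any code that creates the pool must sit behind the main-module guard; the RuntimeError in the traceback names exactly this fix. A stdlib sketch of the corrected shape (the same guard applies when using BoundedProcessPoolExecutor; do_job here is a placeholder):

```python
from concurrent.futures import ProcessPoolExecutor

def do_job(num):
    # Placeholder task; must be defined at module level so spawned
    # children can import it.
    return num * num

# On Windows, "spawn" re-imports this module in every child process;
# without the guard below each child would try to create the pool again,
# raising the RuntimeError shown in the traceback.
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as worker:
        results = [worker.submit(do_job, num).result() for num in range(5)]
```

Everything that submits work goes inside the guard; only definitions (functions, classes, constants) stay at module level.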

Docs -

The docs say "BoundedProcessPoolExecutor will put a new worker in queue only when another worker has finished its work." But what happens when there are no free workers? Does the code block on that call?
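Assuming the usual semaphore-gated design for this kind of bounded pool, the answer is yes: submit() blocks the calling thread until a worker frees up, rather than queueing the task. A small stdlib demonstration of that behavior (a pool of one worker behind a BoundedSemaphore; names are illustrative):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

semaphore = threading.BoundedSemaphore(1)   # pool "size" of one
executor = ThreadPoolExecutor(max_workers=1)

def bounded_submit(fn, *args):
    semaphore.acquire()                     # waits here if the worker is busy
    future = executor.submit(fn, *args)
    future.add_done_callback(lambda _: semaphore.release())
    return future

start = time.monotonic()
bounded_submit(time.sleep, 0.2)             # occupies the only worker
bounded_submit(time.sleep, 0)               # blocks until the first finishes
blocked_for = time.monotonic() - start      # roughly 0.2 s, not ~0 s
executor.shutdown()
```

So the submitting loop advances at the pace of the workers; nothing is dropped, but submission itself becomes a blocking operation.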

Using BoundedPoolExecutor with decorator

I have defined a simple decorator that wraps my functions to run in a thread pool, using this implementation:

from concurrent.futures import ThreadPoolExecutor
from functools import wraps

_DEFAULT_POOL = ThreadPoolExecutor(max_workers=100)
def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

I have then refactored my implementation with your BoundedThreadPoolExecutor in this way:

from bounded_pool_executor import BoundedThreadPoolExecutor
from functools import wraps
_DEFAULT_POOL = BoundedThreadPoolExecutor(max_workers=100)
def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

Is this correct?
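The pattern should work as written, since BoundedThreadPoolExecutor exposes the same submit() signature and returns a Future. The one behavioral difference to expect (assuming the semaphore-gated design) is that calling the decorated function blocks once 100 tasks are in flight, instead of returning immediately. A runnable stdlib version of the same decorator for illustration:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

# Stdlib stand-in; swap in BoundedThreadPoolExecutor for the bounded variant.
_DEFAULT_POOL = ThreadPoolExecutor(max_workers=4)

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        # With a bounded pool, this call blocks while the pool is full.
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
    return wrap

@threadpool
def square(x):
    return x * x

result = square(5).result()   # the decorated call returns a Future
_DEFAULT_POOL.shutdown()
```

In other words: correct, with the caveat that the call site of a decorated function inherits the bounded pool's blocking behavior.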
