mowshon / bounded_pool_executor
Bounded Process&Thread Pool Executor
License: MIT License
Hello,
How do I use BoundedPoolExecutor in combination with a decorator and concurrent.futures.as_completed?
See SF here for more details.
Thank you!
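A minimal sketch of combining a bounded executor with `as_completed`, assuming (not verified against the library source) that the bounding works by making `submit()` wait on a semaphore until a running task finishes. It does not import `bounded_pool_executor`; `SemaphoreBoundedThreadPool`, `square`, and `run` are hypothetical names. The point it illustrates is that `submit()` still returns ordinary `Future` objects, so they can be collected in a dict and passed to `as_completed` exactly as with a plain `ThreadPoolExecutor`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


class SemaphoreBoundedThreadPool(ThreadPoolExecutor):
    """Hypothetical stand-in: submit() blocks once max_workers tasks are in flight."""

    def __init__(self, max_workers):
        super().__init__(max_workers=max_workers)
        # One permit per task that may be queued or running at any moment.
        self._slots = threading.BoundedSemaphore(max_workers)

    def submit(self, fn, /, *args, **kwargs):
        self._slots.acquire()  # blocks the caller while all slots are taken
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()  # do not leak a permit if submission fails
            raise
        # Free the slot when the task finishes, whether it succeeded or raised.
        future.add_done_callback(lambda _f: self._slots.release())
        return future


def square(n):
    return n * n


def run(numbers, max_workers=4):
    results = {}
    with SemaphoreBoundedThreadPool(max_workers=max_workers) as pool:
        future_to_n = {pool.submit(square, n): n for n in numbers}
        # as_completed works unchanged: these are ordinary Future objects.
        for future in as_completed(future_to_n):
            results[future_to_n[future]] = future.result()
    return results


if __name__ == "__main__":
    print(run(range(10)))
```

If `BoundedThreadPoolExecutor.submit()` also returns standard futures, the same dict-plus-`as_completed` loop should carry over without changes.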
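Hi,
do you have any idea why BoundedThreadPoolExecutor is slower than ThreadPoolExecutor?
The machine has 4 CPUs, and ThreadPoolExecutor uses 5 × CPU count = 20 workers by default (that is the default on Python 3.5 to 3.7; from Python 3.8 on it is min(32, os.cpu_count() + 4)), which equals the max_workers I pass to BoundedThreadPoolExecutor.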
Before:
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_feature = {executor.submit(extract_features, filename): filename for filename in files}
    for future in concurrent.futures.as_completed(future_to_feature):
        ...
After:
from bounded_pool_executor import BoundedThreadPoolExecutor
with BoundedThreadPoolExecutor(max_workers=20) as worker:
    for file in files:
        ...
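Without the elided loop body it is hard to say where the time goes. One thing worth checking (an assumption, not a diagnosis) is whether the "After" loop does anything per file besides `submit()`: with a bounded executor, each `submit()` beyond the limit waits for a running task to finish, so any per-file work in that loop is serialized with task completion. For comparison, the sketch below keeps the "Before" style of consuming results as they finish while limiting in-flight tasks with only the standard library (`concurrent.futures.wait` with `FIRST_COMPLETED`), which is a different technique from the library's. `extract_features` is a hypothetical placeholder and `bounded_as_completed` is not a library function.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, as_completed, wait


def extract_features(filename):
    # Hypothetical placeholder for the real work; sleeps to mimic I/O.
    time.sleep(0.01)
    return len(filename)


def bounded_as_completed(executor, fn, items, max_in_flight):
    """Yield (item, result) pairs in completion order, keeping at most
    max_in_flight submitted-but-unfinished tasks at any time."""
    in_flight = {}
    for item in items:
        if len(in_flight) >= max_in_flight:
            # Block until at least one task finishes, then hand its result back.
            done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED)
            for future in done:
                yield in_flight.pop(future), future.result()
        in_flight[executor.submit(fn, item)] = item
    # No more items: drain the remaining tasks as they complete.
    for future in as_completed(list(in_flight)):
        yield in_flight.pop(future), future.result()


def main(files, max_workers=20):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return dict(bounded_as_completed(executor, extract_features, files, max_workers))


if __name__ == "__main__":
    files = [f"file_{i}.txt" for i in range(100)]
    start = time.perf_counter()
    features = main(files)
    print(f"{len(features)} files in {time.perf_counter() - start:.2f}s")
```

Timing both versions with the same `max_workers` and the same workload, as in the `__main__` block, is the fairest way to see whether the bounding itself adds measurable overhead.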
Using a ThreadPoolExecutor (in my case inside Django, which is why I don't use a process pool: I would need to initialize Django in each process, which is not optimal) and setting max_workers to something like 100 doesn't limit the number of workers spawned. My task requires about 3851 workers, which are all created within a second (the task is heavy and each worker should take at least 10 to 30 seconds), so the results are wrong and incomplete. I don't know whether it's just my usage being wrong, but I figured I would report it here.
Here's an example:
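from bounded_pool_executor import BoundedThreadPoolExecutor
with BoundedThreadPoolExecutor(max_workers=100) as worker:
    for chunk in range(chunks):
        worker.submit(chunk_method, chunk, query)
    worker.shutdown()  # redundant: leaving the with block already calls shutdown(wait=True)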
I got this error when I ran the test script:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
#0 Worker initialization
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 114, in _main
prepare(preparation_data)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "C:\Program Files\Python37\lib\runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "C:\Program Files\Python37\lib\runpy.py", line 96, in _run_module_code
#0 Worker initialization
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 114, in _main
prepare(preparation_data)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "C:\Program Files\Python37\lib\runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "C:\Program Files\Python37\lib\runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "C:\Program Files\Python37\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "C:\Users\Administrator\Downloads\Compressed\ProExp\0\jomla\test.py", line 13, in <module>
worker.submit(do_job, num)
File "C:\Program Files\Python37\lib\site-packages\bounded_pool_executor\__init__.py", line 20, in submit
future = super().submit(fn, *args, **kwargs)
File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 641, in submit
self._start_queue_management_thread()
File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 583, in _start_queue_management_thread
self._adjust_process_count()
File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 607, in _adjust_process_count
p.start()
File "C:\Program Files\Python37\lib\multiprocessing\process.py", line 112, in start
self._popen = self._Popen(self)
File "C:\Program Files\Python37\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\Program Files\Python37\lib\multiprocessing\popen_spawn_win32.py", line 46, in __init__
Traceback (most recent call last):
File "<string>", line 1, in <module>
mod_name, mod_spec, pkg_name, script_name)
#0 Worker initialization
prep_data = spawn.get_preparation_data(process_obj._name)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 143, in get_preparation_data
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 114, in _main
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
_check_not_importing_main()
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 136, in _check_not_importing_main
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
prepare(preparation_data)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
File "C:\Program Files\Python37\lib\runpy.py", line 85, in _run_code
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
    freeze_support()
    ...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "C:\Program Files\Python37\lib\runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "C:\Program Files\Python37\lib\runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "C:\Program Files\Python37\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "C:\Users\Administrator\Downloads\Compressed\ProExp\0\jomla\test.py", line 13, in <module>
worker.submit(do_job, num)
File "C:\Program Files\Python37\lib\site-packages\bounded_pool_executor\__init__.py", line 20, in submit
future = super().submit(fn, *args, **kwargs)
File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 641, in submit
self._start_queue_management_thread()
File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 583, in _start_queue_management_thread
self._adjust_process_count()
File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 607, in _adjust_process_count
exitcode = _main(fd)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 114, in _main
prepare(preparation_data)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "C:\Program Files\Python37\lib\runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "C:\Program Files\Python37\lib\runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "C:\Program Files\Python37\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "C:\Users\Administrator\Downloads\Compressed\ProExp\0\jomla\test.py", line 13, in <module>
worker.submit(do_job, num)
File "C:\Program Files\Python37\lib\site-packages\bounded_pool_executor\__init__.py", line 20, in submit
exec(code, run_globals)
Traceback (most recent call last):
File "test.py", line 13, in <module>
worker.submit(do_job, num)
File "C:\Program Files\Python37\lib\site-packages\bounded_pool_executor\__init__.py", line 20, in submit
future = super().submit(fn, *args, **kwargs)
File "C:\Program Files\Python37\lib\concurrent\futures\process.py", line 625, in submit
raise BrokenProcessPool(self._broken)
concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
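The RuntimeError buried in the interleaved output names the cause: on Windows, child processes are started with the spawn method, which re-imports the main script, so module-level code that calls `worker.submit()` runs again inside every child (the repeated "#0 Worker initialization" lines fit that picture). The error message's own suggestion is to put that code under an `if __name__ == '__main__':` guard. A minimal sketch with the standard `ProcessPoolExecutor` follows; the same guard should apply to `BoundedProcessPoolExecutor`. The body of `do_job`, the worker count, and the input range are hypothetical, since the original test.py is not shown.

```python
from concurrent.futures import ProcessPoolExecutor


def do_job(num):
    # Hypothetical task body. It must live at module level so that
    # child processes can import it by name.
    return num * num


def main():
    print("#0 Worker initialization")
    with ProcessPoolExecutor(max_workers=4) as worker:
        futures = [worker.submit(do_job, num) for num in range(10)]
        return [future.result() for future in futures]


# With the spawn start method each child re-imports this module.
# The guard keeps that re-import from submitting jobs again.
if __name__ == "__main__":
    print(main())
```

Because importing this module defines functions but starts no processes, the children can safely re-import it to find `do_job`.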
It says "BoundedProcessPoolExecutor will put a new worker in queue only when another worker has finished his work." But what happens when there are no free workers? Does the code block on this call?
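If the library bounds submissions with a semaphore (an assumption based on the sentence quoted above, not something verified here), then yes: `submit()` would block the calling thread until a running task finishes and frees a slot. The sketch below shows that behaviour with a plain `threading.BoundedSemaphore` and adds a hypothetical non-blocking `try_submit()` that returns `None` instead of waiting; neither `TrySubmitPool` nor `try_submit` is part of `bounded_pool_executor`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class TrySubmitPool:
    """Hypothetical wrapper: at most `limit` tasks queued or running at once."""

    def __init__(self, limit):
        self._executor = ThreadPoolExecutor(max_workers=limit)
        self._slots = threading.BoundedSemaphore(limit)

    def _submit_with_slot(self, fn, args, kwargs):
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()  # give the slot back if submission fails
            raise
        # Release the slot once the task is done (success or exception).
        future.add_done_callback(lambda _f: self._slots.release())
        return future

    def submit(self, fn, *args, **kwargs):
        # Blocking: waits here until a slot is free.
        self._slots.acquire()
        return self._submit_with_slot(fn, args, kwargs)

    def try_submit(self, fn, *args, timeout=0, **kwargs):
        # Non-blocking by default: returns None if no slot frees up within
        # `timeout` seconds. Note: `timeout` is consumed here, not passed to fn.
        if not self._slots.acquire(timeout=timeout):
            return None
        return self._submit_with_slot(fn, args, kwargs)

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)
```

With `limit=1`, a first `try_submit()` of a long task takes the only slot and a second returns `None` immediately, while `submit()` would simply wait for the slot.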
It would be great if you could push a release with the new merge to PyPI.
I have defined a simple decorator to wrap my functions as threads, using this implementation:
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
_DEFAULT_POOL = ThreadPoolExecutor(max_workers=100)
def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
    return wrap
I then refactored my implementation to use your BoundedPoolExecutor, like this:
from bounded_pool_executor import BoundedThreadPoolExecutor
from functools import wraps
_DEFAULT_POOL = BoundedThreadPoolExecutor(max_workers=100)
def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
    return wrap
Is this correct?
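As far as can be told from the snippet, the refactoring keeps the same behaviour, with one difference if the library bounds submissions with a semaphore (an assumption): once 100 tasks are in flight, calling a decorated function blocks until one finishes. A limitation both versions share is that, when used as a bare `@threadpool`, there is no way to pass `executor`. The sketch below is a hypothetical decorator-factory variant that supports both `@threadpool` and `@threadpool(executor=...)`; it uses the standard `ThreadPoolExecutor` so it runs without the library, and `fetch` is a hypothetical example task.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor, as_completed
from functools import wraps
from typing import Callable, Optional

_DEFAULT_POOL = ThreadPoolExecutor(max_workers=100)


def threadpool(f: Optional[Callable] = None, *, executor: Optional[Executor] = None):
    """Run the decorated function in an executor and return a Future.

    Usable both as @threadpool and as @threadpool(executor=some_pool).
    """
    def decorate(func):
        @wraps(func)
        def wrap(*args, **kwargs) -> Future:
            return (executor or _DEFAULT_POOL).submit(func, *args, **kwargs)
        return wrap

    if f is None:
        return decorate  # called as @threadpool(executor=...)
    return decorate(f)   # called as bare @threadpool


@threadpool
def fetch(n):
    # Hypothetical task.
    return n * 2


if __name__ == "__main__":
    futures = [fetch(n) for n in range(5)]
    print(sorted(f.result() for f in as_completed(futures)))
```

Since each call returns a `Future`, the decorated functions combine directly with `as_completed`, which also covers the first question on this page.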