Supports the async / await pattern for CPU-bound operations.
- Supports the async / await pattern for CPU-bound operations
- Frees you from handling the event loop
The async / await syntax makes asynchronous code:
- Simple
- Readable
This syntax is useful not only for I/O-bound but also for CPU-bound operations.
This package runs a coroutine function in a `ProcessPoolExecutor` and returns an `Awaitable` object.
`asyncio` focuses on I/O-bound rather than CPU-bound operations. Its high-level APIs don't support CPU-bound operations well, since they are built on `ThreadPoolExecutor` rather than `ProcessPoolExecutor`, and in the standard CPython build threads cannot execute Python code in parallel because of the global interpreter lock (GIL).
To run CPU-bound operations concurrently with `asyncio`, we need to use its low-level APIs, which require finer control over the event loop's behavior. The documentation, however, advises:
> Application developers should typically use the High-level asyncio functions, such as asyncio.run(), and should rarely need to reference Low-level APIs, such as the Event Loop object or call its methods.
See: Event Loop — Python 3 documentation
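For comparison, the low-level route with only the standard library looks roughly like the sketch below: fetch the running loop and hand a plain (synchronous) function to `loop.run_in_executor()` together with a `ProcessPoolExecutor`. This is an illustration of the boilerplate `asynccpu` aims to hide, not its implementation; `sum_of_squares` and `run_all` are hypothetical names. `run_all` accepts any `concurrent.futures.Executor`, so the same coroutine can also be exercised with a thread pool.

```python
import asyncio
import concurrent.futures


def sum_of_squares(n: int) -> int:
    # A plain (non-async) CPU-bound function. It lives at module level so that
    # ProcessPoolExecutor can pickle it by reference.
    return sum(i * i for i in range(n))


async def run_all(executor: concurrent.futures.Executor, sizes: list[int]) -> list[int]:
    # Low-level API: fetch the running event loop explicitly and schedule each
    # call on the given executor. run_in_executor() returns an asyncio Future.
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, sum_of_squares, n) for n in sizes]
    # gather() returns results in the order its arguments were passed.
    return await asyncio.gather(*futures)


async def main() -> None:
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        for result in await run_all(executor, [10**6] * 3):
            print(result)


if __name__ == "__main__":
    # The guard keeps spawned worker processes from re-running main().
    asyncio.run(main())
```

Note that the CPU-bound work has to be a regular function here; `run_in_executor()` does not accept coroutine functions.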
pip install asynccpu
This package provides `ProcessTaskPoolExecutor`, which extends `ProcessPoolExecutor`. Its instances have the method `create_process_task()`.
Example:
```python
import asyncio

from asynccpu import ProcessTaskPoolExecutor


async def process_cpu_bound(task_id):
    """
    CPU-bound operations will block the event loop:
    in general it is preferable to run them in a process pool.
    """
    return f"task_id: {task_id}, result: {sum(i * i for i in range(10 ** 7))}"


async def main():
    with ProcessTaskPoolExecutor(max_workers=3, cancel_tasks_when_shutdown=True) as executor:
        # Pass the coroutine function and its argument, not a coroutine object.
        awaitables = [executor.create_process_task(process_cpu_bound, x) for x in range(10)]
        results = await asyncio.gather(*awaitables)
        for result in results:
            print(result)


if __name__ == "__main__":
    # The guard keeps spawned worker processes from re-running main().
    asyncio.run(main())
```
`create_process_task()` takes a coroutine function, not a raw coroutine object, because coroutine objects are not picklable. This restriction comes from Python's `multiprocessing` package:
multiprocessing — Process-based parallelism
> Note: When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.
See: Answer: Python multiprocessing PicklingError: Can't pickle <type 'function'> - Stack Overflow
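The difference can be checked directly with `pickle`: a module-level coroutine function is pickled by reference (module plus qualified name), while the coroutine object created by calling it cannot be pickled at all. The names `coroutine_function` and `is_picklable` are hypothetical, used only for this sketch.

```python
import pickle


async def coroutine_function(task_id):
    # Defined at module level, so pickle stores it by reference (module + name).
    return task_id * 2


def is_picklable(obj) -> bool:
    # True if pickle.dumps() succeeds, False if the object cannot be pickled.
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


if __name__ == "__main__":
    print(is_picklable(coroutine_function))  # True
    coroutine_object = coroutine_function(1)  # creates a coroutine object; nothing runs yet
    print(is_picklable(coroutine_object))  # False: pickling raises TypeError
    coroutine_object.close()  # avoids the "coroutine was never awaited" RuntimeWarning
```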
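Example: capturing logs from subprocesses
```python
import asyncio
import multiprocessing
from logging import DEBUG, Formatter, StreamHandler, getLogger, handlers

from asynccpu import ProcessTaskPoolExecutor


async def process_cpu_bound(task_id):
    # Logged in the subprocess; reaches the parent through the queue.
    getLogger(__name__).debug("task_id: %s started", task_id)
    return f"task_id: {task_id}, result: {sum(i * i for i in range(10 ** 7))}"


def listener_configurer(queue):
    console_handler = StreamHandler()
    console_handler.setFormatter(Formatter("[%(levelname)s/%(processName)s] %(message)s"))
    # Key Point 4
    return handlers.QueueListener(queue, console_handler)


def worker_configurer():
    # Runs in the subprocess; lowers the root level so the DEBUG log above is emitted.
    root_logger = getLogger()
    root_logger.setLevel(DEBUG)


async def main():
    with multiprocessing.Manager() as manager:
        # Key Point 2
        queue = manager.Queue()
        listener = listener_configurer(queue)
        listener.start()
        try:
            with ProcessTaskPoolExecutor(
                max_workers=3,
                cancel_tasks_when_shutdown=True,
                # Key Point 1
                queue=queue,
                # Key Point 3
                configurer=worker_configurer,
            ) as executor:
                futures = [executor.create_process_task(process_cpu_bound, x) for x in range(10)]
                return await asyncio.gather(*futures)
        finally:
            # Always stop the listener, even though we return from inside the block.
            listener.stop()


if __name__ == "__main__":
    for result in asyncio.run(main()):
        print(result)
```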
This implementation is based on the following document:
Logging to a single file from multiple processes | Logging Cookbook — Python 3 documentation
Key points:
1. Inject a special `queue.Queue` object into subprocesses
2. Create a special `queue.Queue` object via `multiprocessing.Manager`
3. Inject a configurer to configure the logger on Windows
4. Consider using `logging.handlers.QueueListener`
1. Inject a special `queue.Queue` object into subprocesses
We can capture logs from subprocesses via a `queue.Queue` object. `logging.handlers.QueueHandler` is often used for logging in multi-threaded and multi-process code.
See: Logging Cookbook — Python 3 documentation
If the `queue` argument is set, `ProcessTaskPoolExecutor` automatically attaches it to the root logger in each subprocess as a `logging.handlers.QueueHandler`.
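What "attaching the queue to the root logger as a `QueueHandler`" means can be seen in a single process. The sketch below uses a plain `queue.Queue` only to show the wiring; across processes the queue has to be a `multiprocessing.Manager` proxy, as explained in the next point. `attach_queue_handler` is a hypothetical helper, not part of `asynccpu`.

```python
import logging
import logging.handlers
import queue


def attach_queue_handler(log_queue) -> None:
    # Hypothetical helper: route every record logged in this process to
    # log_queue by adding a QueueHandler to the root logger.
    root_logger = logging.getLogger()
    root_logger.addHandler(logging.handlers.QueueHandler(log_queue))


if __name__ == "__main__":
    log_queue = queue.Queue()  # a subprocess would need a Manager queue instead
    attach_queue_handler(log_queue)
    logging.getLogger().setLevel(logging.INFO)
    logging.getLogger("worker").info("task %d finished", 3)
    record = log_queue.get_nowait()
    # QueueHandler has already merged the arguments into the message.
    print(record.name, record.getMessage())  # worker task 3 finished
```

Any logger that propagates to the root, such as `logging.getLogger("worker")` here, ends up on the queue without further configuration.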
2. Create a special `queue.Queue` object via `multiprocessing.Manager`
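We have to create the `queue.Queue` object via `multiprocessing.Manager` because of a limitation of the `ProcessPoolExecutor` running inside `ProcessTaskPoolExecutor`: a plain `multiprocessing.Queue` can only be handed to a process when that process is created, not pickled along with a task. Otherwise, the following error is raised when the child process refers to the `queue` argument:
RuntimeError: Queue objects should only be shared between processes through inheritance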
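`multiprocessing.Manager` instantiates a special `queue.Queue` object (a proxy object). The proxy can be pickled and sent to worker processes, and it forwards method calls to the real queue held by the manager's server process.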
See:
- Using concurrent.futures.ProcessPoolExecutor | Logging Cookbook — Python 3 documentation
- Proxy Objects | multiprocessing — Process-based parallelism — Python 3 documentation
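Because task arguments travel to `ProcessPoolExecutor` workers in pickled form, the limitation can be reproduced by calling `pickle.dumps()` directly: a plain `multiprocessing.Queue` raises exactly the `RuntimeError` quoted above, while a Manager proxy pickles fine. `pickling_error` is a hypothetical helper for this sketch.

```python
import multiprocessing
import pickle


def pickling_error(obj):
    # Task arguments sent to ProcessPoolExecutor workers are pickled, so trying
    # pickle.dumps() directly reproduces the check. Returns the RuntimeError
    # message, or None if the object pickles fine.
    try:
        pickle.dumps(obj)
    except RuntimeError as error:
        return str(error)
    return None


if __name__ == "__main__":
    # A plain multiprocessing.Queue refuses to be pickled outside process start-up.
    print(pickling_error(multiprocessing.Queue()))
    # Queue objects should only be shared between processes through inheritance
    with multiprocessing.Manager() as manager:
        # The Manager hands out a picklable proxy that forwards calls to a
        # queue.Queue living in the manager's server process.
        print(pickling_error(manager.Queue()))  # None
```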
3. Inject a configurer to configure the logger on Windows
On POSIX systems where worker processes are created by forking, subprocesses share the logging configuration with the parent process through fork semantics. On Windows you can't rely on fork semantics, so each process has to run the logging configuration code when it starts.
`ProcessTaskPoolExecutor` automatically executes the `configurer` argument before starting the coroutine function.
This design is based on the following document:
Logging to a single file from multiple processes | Logging Cookbook — Python 3 documentation
For instance, this allows us to set the log level in subprocesses on Windows.
Note that configuring the root logger in a subprocess seems to affect the parent process on POSIX.
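The standard library offers a comparable hook: `ProcessPoolExecutor(initializer=...)` runs a callable in each worker process when it starts. The sketch below uses it as a rough analogue of the `configurer` argument to show that an explicitly configured level holds in the worker whether it was forked or spawned. It is not how `asynccpu` is implemented, and the initializer runs once per worker, which may differ from when `asynccpu` calls `configurer`. `effective_root_level` is a hypothetical name.

```python
import logging
from concurrent.futures import ProcessPoolExecutor


def worker_configurer() -> None:
    # Runs once in each worker process when it starts.
    logging.getLogger().setLevel(logging.DEBUG)


def effective_root_level() -> int:
    # Executed inside a worker: reports the root logger level seen there.
    return logging.getLogger().getEffectiveLevel()


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2, initializer=worker_configurer) as executor:
        level = executor.submit(effective_root_level).result()
    # The parent's root logger keeps its default level; the worker reports DEBUG.
    print(logging.getLevelName(level))  # DEBUG
```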
4. Consider using `logging.handlers.QueueListener`
We don't have to implement the listener from scratch; we can use `logging.handlers.QueueListener` right away.
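To show what `QueueListener` takes off our hands, here is a single-process sketch: a background thread pulls records off the queue and passes them to ordinary handlers. A stand-in logger plays the role of the subprocesses' `QueueHandler`; `ListHandler` and `collect_via_listener` are hypothetical names chosen for this example.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    # Minimal handler that keeps formatted messages in memory.
    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(self.format(record))


def collect_via_listener(messages: list[tuple[int, str]]) -> list[str]:
    log_queue = queue.Queue()
    collector = ListHandler()
    collector.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    # QueueListener runs a background thread that takes records off the queue
    # and hands them to the given handlers.
    listener = logging.handlers.QueueListener(log_queue, collector)
    listener.start()
    # Stand-in producer: in the asynccpu setup, records would come from the
    # QueueHandler in each subprocess instead.
    producer = logging.getLogger("producer")
    producer.propagate = False
    producer.setLevel(logging.DEBUG)
    handler = logging.handlers.QueueHandler(log_queue)
    producer.addHandler(handler)
    try:
        for level, text in messages:
            producer.log(level, text)
    finally:
        producer.removeHandler(handler)
        listener.stop()  # handles everything still queued, then joins the thread
    return collector.messages


if __name__ == "__main__":
    print(collect_via_listener([(logging.INFO, "first"), (logging.WARNING, "second")]))
    # ['[INFO] first', '[WARNING] second']
```

Calling `listener.stop()` before reading the results matters: it waits until the thread has drained the queue.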
This package was created with Cookiecutter and the yukihiko-shinoda/cookiecutter-pypackage project template.