asynccpu's Introduction

Asynchronous CPU


Supports async / await pattern for CPU-bound operations.

Advantage

  1. Support async / await pattern for CPU-bound operations
  2. Free from handling event loop

1. Support async / await pattern for CPU-bound operations

The async / await syntax makes asynchronous code:

  • Simple
  • Readable

This syntax is useful not only for I/O-bound but also for CPU-bound operations. This package runs a coroutine function in a ProcessPoolExecutor and returns an awaitable object.

2. Free from handling event loop

asyncio focuses on I/O-bound rather than CPU-bound operations. The high-level APIs of asyncio don't support CPU-bound operations since they are based on ThreadPoolExecutor rather than ProcessPoolExecutor. When we want to run CPU-bound operations concurrently with asyncio, we need to use the low-level APIs, which require finer control over event loop behavior.

Application developers should typically use the High-level asyncio functions, such as asyncio.run(), and should rarely need to reference Low-level APIs, such as the Event Loop object or call its methods.

See: Event Loop — Python 3 documentation
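For comparison, running a CPU-bound function with the standard library alone requires touching the low-level loop.run_in_executor() API, which asynccpu is designed to hide. A minimal sketch (names like cpu_bound are illustrative, not part of any API):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_bound(n):
    # Must be a plain picklable function; run_in_executor can't run coroutine functions.
    return sum(i * i for i in range(n))


async def main():
    loop = asyncio.get_running_loop()  # Low-level API: we handle the loop ourselves.
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound, 10 ** 6)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```

Note that run_in_executor only accepts a plain function, so an async function with internal awaits cannot be offloaded this way; that is the gap create_process_task() fills.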

Quickstart

1. Install

pip install asynccpu

2. Implement

This package provides ProcessTaskPoolExecutor, which extends ProcessPoolExecutor, and its instance has the method create_process_task().

Ex:

import asyncio
from asynccpu import ProcessTaskPoolExecutor


async def process_cpu_bound(task_id):
    """
    CPU-bound operations will block the event loop:
    in general it is preferable to run them in a process pool.
    """
    return f"task_id: {task_id}, result: {sum(i * i for i in range(10 ** 7))}"


async def main():
    with ProcessTaskPoolExecutor(max_workers=3, cancel_tasks_when_shutdown=True) as executor:
        awaitables = {executor.create_process_task(process_cpu_bound, x) for x in range(10)}
        results = await asyncio.gather(*awaitables)
        for result in results:
            print(result)


if __name__ == "__main__":
    asyncio.run(main())

Note

The coroutine argument must be a coroutine function, not a raw coroutine object, since a raw coroutine object is not picklable.

This constraint comes from the Python multiprocessing package:

multiprocessing — Process-based parallelism

Note When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.

See: Answer: Python multiprocessing PicklingError: Can't pickle <type 'function'> - Stack Overflow
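The difference can be checked directly with pickle: the coroutine function pickles fine (pickle stores its qualified name), while the coroutine object produced by calling it does not. A minimal sketch:

```python
import pickle


async def coroutine_function():
    return 42


# The coroutine *function* is picklable: pickle records its qualified name.
pickle.dumps(coroutine_function)

# The raw coroutine *object* returned by calling it is not picklable.
coroutine_object = coroutine_function()
try:
    pickle.dumps(coroutine_object)
except TypeError as error:
    print(f"not picklable: {error}")
coroutine_object.close()  # Avoid a "coroutine was never awaited" warning.
```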

How do I...

Capture log from subprocess?

Ex:

import asyncio
import multiprocessing
from logging import DEBUG, Formatter, StreamHandler, getLogger, handlers
from asynccpu import ProcessTaskPoolExecutor


def listener_configurer(queue):
    console_handler = StreamHandler()
    console_handler.setFormatter(Formatter("[%(levelname)s/%(processName)s] %(message)s"))
    # Key Point 4
    return handlers.QueueListener(queue, console_handler)


def worker_configurer():
    root_logger = getLogger()
    root_logger.setLevel(DEBUG)


async def process_cpu_bound(task_id):
    return f"task_id: {task_id}, result: {sum(i * i for i in range(10 ** 7))}"


async def main():
    with multiprocessing.Manager() as manager:
        # Key Point 2
        queue = manager.Queue()
        listener = listener_configurer(queue)
        listener.start()
        try:
            with ProcessTaskPoolExecutor(
                max_workers=3,
                cancel_tasks_when_shutdown=True,
                # Key Point 1
                queue=queue,
                # Key Point 3
                configurer=worker_configurer,
            ) as executor:
                futures = {executor.create_process_task(process_cpu_bound, x) for x in range(10)}
                return await asyncio.gather(*futures)
        finally:
            listener.stop()


if __name__ == "__main__":
    asyncio.run(main())

This implementation is based on following document:

Logging to a single file from multiple processes | Logging Cookbook — Python 3 documentation

Key Points

  1. Inject special queue.Queue object into subprocess
  2. Create special queue.Queue object via multiprocessing.Manager
  3. Inject configurer to configure logger for Windows
  4. Consider using logging.handlers.QueueListener

1. Inject special queue.Queue object into subprocess

We can capture logs from a subprocess via a queue.Queue object. logging.handlers.QueueHandler is often used for logging in multi-threaded and multi-process code.

See: Logging Cookbook — Python 3 documentation

If the queue argument is set, ProcessTaskPoolExecutor automatically attaches it to the root logger in each subprocess as a logging.handlers.QueueHandler.

2. Create special queue.Queue object via multiprocessing.Manager

We have to create the queue.Queue object via multiprocessing.Manager due to a limitation of the ProcessPoolExecutor running inside; otherwise, the following error is raised when the queue argument is referenced in a child process:

RuntimeError: Queue objects should only be shared between processes through inheritance

multiprocessing.Manager instantiates a special queue.Queue object (a proxy object).
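The proxy's advantage can be sketched with the standard library alone: unlike a plain multiprocessing.Queue, which triggers the RuntimeError above when passed as an argument to a pool worker, a Manager-backed queue proxy passes through cleanly. A minimal sketch (not asynccpu's internals; put_value is illustrative):

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def put_value(queue, value):
    # The Manager proxy can be passed to the worker as a plain argument.
    queue.put(value)
    return value


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        queue = manager.Queue()  # Proxy to a queue living in the manager's server process.
        with ProcessPoolExecutor(max_workers=2) as executor:
            for future in [executor.submit(put_value, queue, x) for x in range(3)]:
                future.result()
        while not queue.empty():
            print(queue.get())
```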

3. Inject configurer to configure logger for Windows

On POSIX, subprocesses share the logging configuration with the parent process thanks to fork semantics. On Windows you can't rely on fork semantics, so each process needs to run the logging configuration code when it starts.

ProcessTaskPoolExecutor automatically executes the configurer argument before starting the coroutine function.

This design is based on following document:

Logging to a single file from multiple processes | Logging Cookbook — Python 3 documentation

For instance, this allows us to set the log level in subprocesses on Windows.

Note that configuring the root logger in a subprocess seems to affect the parent process on POSIX.

We don't have to implement the listener process from scratch; we can use logging.handlers.QueueListener right away.
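The listener wiring of Key Point 4 can be sketched with the standard library alone, single-process here just to show the data flow (the logger name "demo" is illustrative):

```python
import logging
import queue
from logging import handlers

log_queue = queue.Queue()

# Handlers attached to the listener do the real output work.
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
listener = handlers.QueueListener(log_queue, console_handler)
listener.start()

# Producers only enqueue records via QueueHandler; they never touch the console.
logger = logging.getLogger("demo")
logger.setLevel(logging.DEBUG)
logger.addHandler(handlers.QueueHandler(log_queue))
logger.info("routed through the queue")

listener.stop()  # Drains remaining records before returning.
```

In the multi-process case the in-process queue.Queue is replaced by the Manager-backed queue from Key Point 2, but the listener side stays the same.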

Credits

This package was created with Cookiecutter and the yukihiko-shinoda/cookiecutter-pypackage project template.

asynccpu's People

Contributors

yukihiko-shinoda

asynccpu's Issues

Running the example

The example does not work

Environment

  • Asynchronous CPU version: 1.2.2
  • Python version: 3.10.5
  • Operating System: Windows 10

Description

I tried to execute the first example in the README:

import asyncio
from asynccpu import ProcessTaskPoolExecutor

async def process_cpu_bound(task_id):
    """
    CPU-bound operations will block the event loop:
    in general it is preferable to run them in a process pool.
    """
    return f"task_id: {task_id}, result: {sum(i * i for i in range(10 ** 7))}"

with ProcessTaskPoolExecutor(max_workers=3, cancel_tasks_when_shutdown=True) as executor:
    awaitables = {executor.create_process_task(process_cpu_bound, x) for x in range(10)}
    results = await asyncio.gather(*awaitables)
    for result in results:
        print(result)

Yet it fails with:

    results = await asyncio.gather(*awaitables)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
SyntaxError: 'await' outside function

I also get an error if I enclose the with block of code in a synchronous or asynchronous function. In a synchronous function:

def main():
    with ProcessTaskPoolExecutor(max_workers=3, cancel_tasks_when_shutdown=True) as executor:
        awaitables = {executor.create_process_task(process_cpu_bound, x) for x in range(10)}
        results = await asyncio.gather(*awaitables)
        for result in results:
            print(result)

if __name__ == '__main__':
    main()

it gives:

     results = await asyncio.gather(*awaitables)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
SyntaxError: 'await' outside async function

In an asynchronous function:

async def main():
    with ProcessTaskPoolExecutor(max_workers=3, cancel_tasks_when_shutdown=True) as executor:
        awaitables = {executor.create_process_task(process_cpu_bound, x) for x in range(10)}
        results = await asyncio.gather(*awaitables)
        for result in results:
            print(result)

if __name__ == '__main__':
    main()

it gives:

RuntimeWarning: coroutine 'main' was never awaited
  main()

and no results are displayed.

Initial Update

The bot created this issue to inform you that pyup.io has been set up on this repo.
Once you have closed it, the bot will open pull requests for updates as soon as they are available.

SyntaxError: 'return' outside function

Environment

  • Asynchronous CPU version: 1.2.2
  • Python version: 3.10.12
  • Operating System: ubuntu
  • environment: google colab

Description

File "<ipython-input-10-c39c2bfb9b8e>", line 30
    return await asyncio.gather(*futures)
    ^
SyntaxError: 'return' outside function

What I Did

test code from pypi

import asyncio
import multiprocessing
from logging import DEBUG, Formatter, StreamHandler, getLogger, handlers
from asynccpu import ProcessTaskPoolExecutor

def listener_configurer():
    console_handler = StreamHandler()
    console_handler.setFormatter(Formatter("[%(levelname)s/%(processName)s] %(message)s"))
    # Key Point 4
    return handlers.QueueListener(queue, console_handler)

def worker_configurer():
    root_logger = getLogger()
    root_logger.setLevel(DEBUG)

with multiprocessing.Manager() as manager:
    # Key Point 2
    queue = manager.Queue()
    listener = listener_configurer()
    listener.start()
    with ProcessTaskPoolExecutor(
        max_workers=3,
        cancel_tasks_when_shutdown=True,
        # Key Point 1
        queue=queue,
        # Key Point 3
        configurer=worker_configurer
    ) as executor:
        futures = {executor.create_process_task(process_cpu_bound, x) for x in range(10)}
        return await asyncio.gather(*futures)
    listener.stop()
