Hi, I have a case where I want to cancel the main starting coroutine (and await its cancellation) before all the other pending tasks of the loop are cancelled.
Perhaps this is not an issue of the `aiorun` package itself, but let me ask for advice on how to deal with such a case (if it can be solved using `aiorun`).
The case can be reproduced using the code below:
import argparse
import asyncio
import logging
import signal

from aiorun import run


class SomeThirdPartyRunner:
    def __init__(self):
        self._queue = list(range(5))
        # actually, I can't use `shutdown_waits_for` below because it's a 3rd-party package
        self._work_task = asyncio.create_task(self._work())
        self._work_task_done = asyncio.Future()

    async def _work(self):
        try:
            while self._queue:
                logging.info(f'processing queue={self._queue}')
                self._queue.pop()
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            logging.info(' *** we are here only when aiorun is used *** ')
        finally:
            self._work_task_done.set_result(None)

    async def wait_done(self):
        await self._work_task_done


async def corofn():
    runner = SomeThirdPartyRunner()
    try:
        await asyncio.sleep(2)
    except asyncio.CancelledError:
        pass
    finally:
        logging.info('stopping runner...')
        await runner.wait_done()
        logging.info('runner stopped')


def run_through_aiorun():
    run(_aiorun_main())


async def _aiorun_main():
    await corofn()


def run_through_asyncio():
    asyncio.run(_asyncio_main())


async def _asyncio_main():
    loop = asyncio.get_event_loop()
    task = loop.create_task(corofn())
    task.add_done_callback(lambda _: loop.stop())
    loop.add_signal_handler(signal.SIGINT, task.cancel)
    loop.add_signal_handler(signal.SIGTERM, task.cancel)
    await task


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s - [%(levelname)s] - [%(name)s] "
               "- %(filename)s:%(lineno)d - %(message)s",
    )
    parser = argparse.ArgumentParser()
    parser.add_argument('--aiorun',
                        action='store_true',
                        help='Run script using aiorun package')
    args = parser.parse_args()
    if args.aiorun:
        logging.info('Running using aiorun')
        run_through_aiorun()
    else:
        logging.info('Running using asyncio')
        run_through_asyncio()
Starting the script using `aiorun` and pressing Ctrl+C after the 2nd `processing queue` message:
(env_3_8) MacBook-Pro-2:~ fedir$ python script.py --aiorun
[INFO] - [root] - script.py:83 - Running using aiorun
[DEBUG] - [aiorun] - aiorun.py:155 - Entering run()
[DEBUG] - [asyncio] - selector_events.py:59 - Using selector: KqueueSelector
[DEBUG] - [aiorun] - aiorun.py:236 - Creating default executor
[INFO] - [root] - script.py:18 - processing queue=[0, 1, 2, 3, 4]
[INFO] - [root] - script.py:18 - processing queue=[0, 1, 2, 3]
^C[DEBUG] - [aiorun] - aiorun.py:304 - Entering shutdown handler
[CRITICAL] - [aiorun] - aiorun.py:317 - Stopping the loop
[INFO] - [aiorun] - aiorun.py:249 - Entering shutdown phase.
[INFO] - [aiorun] - aiorun.py:262 - Cancelling pending tasks.
[DEBUG] - [aiorun] - aiorun.py:264 - Cancelling task: <Task pending name='Task-1' coro=<run.<locals>.new_coro() running at /Users/fedir/env/env_3_8/lib/python3.8/site-packages/aiorun.py:206> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10a00fdf0>()]>>
[DEBUG] - [aiorun] - aiorun.py:264 - Cancelling task: <Task pending name='Task-2' coro=<SomeThirdPartyRunner._work() running at script.py:20> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10a00fe50>()]>>
[INFO] - [aiorun] - aiorun.py:276 - Running pending tasks till complete
[INFO] - [root] - script.py:40 - stopping runner...
[INFO] - [root] - script.py:22 - *** we are here only when aiorun is used ***
[INFO] - [root] - script.py:42 - runner stopped
[INFO] - [aiorun] - aiorun.py:281 - Waiting for executor shutdown.
[INFO] - [aiorun] - aiorun.py:286 - Shutting down async generators
[INFO] - [aiorun] - aiorun.py:288 - Closing the loop.
[INFO] - [aiorun] - aiorun.py:290 - Leaving. Bye!
The desired result for me in this case is that `corofn()` is not cancelled at the same time as `SomeThirdPartyRunner._work()`.
Starting the script using `asyncio` and pressing Ctrl+C after the same log message:
(env_3_8) MacBook-Pro-2:~ fedir$ python script.py
[INFO] - [root] - script.py:86 - Running using asyncio
[DEBUG] - [asyncio] - selector_events.py:59 - Using selector: KqueueSelector
[INFO] - [root] - script.py:18 - processing queue=[0, 1, 2, 3, 4]
[INFO] - [root] - script.py:18 - processing queue=[0, 1, 2, 3]
^C[INFO] - [root] - script.py:40 - stopping runner...
[INFO] - [root] - script.py:18 - processing queue=[0, 1, 2]
[INFO] - [root] - script.py:18 - processing queue=[0, 1]
[INFO] - [root] - script.py:18 - processing queue=[0]
[INFO] - [root] - script.py:42 - runner stopped
I've made some changes to `aiorun` to get the expected result:
https://github.com/cjrh/aiorun/blob/master/aiorun.py#L210
_origin_coro_task = loop.create_task(new_coro())
and then, at the start of the "Entering shutdown phase" step:
https://github.com/cjrh/aiorun/blob/master/aiorun.py#L249
if _origin_coro_task is not None:
    logger.debug("Cancelling origin coro task: %s", _origin_coro_task)
    _origin_coro_task.cancel()
    loop.run_until_complete(_origin_coro_task)
With these changes I got the expected result:
(env_3_8) MacBook-Pro-2:~ fedir$ python script.py --aiorun
[INFO] - [root] - script.py:83 - Running using aiorun
[DEBUG] - [aiorun] - aiorun.py:155 - Entering run()
[DEBUG] - [asyncio] - selector_events.py:59 - Using selector: KqueueSelector
[DEBUG] - [aiorun] - aiorun.py:237 - Creating default executor
[INFO] - [root] - script.py:18 - processing queue=[0, 1, 2, 3, 4]
[INFO] - [root] - script.py:18 - processing queue=[0, 1, 2, 3]
^C[DEBUG] - [aiorun] - aiorun.py:309 - Entering shutdown handler
[CRITICAL] - [aiorun] - aiorun.py:322 - Stopping the loop
[INFO] - [aiorun] - aiorun.py:250 - Entering shutdown phase.
[DEBUG] - [aiorun] - aiorun.py:252 - Cancelling origin coro task: <Task pending name='Task-1' coro=<run.<locals>.new_coro() running at /Users/fedir/env/env_3_8/lib/python3.8/site-packages/aiorun.py:207> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x105197d90>()]>>
[INFO] - [root] - script.py:40 - stopping runner...
[INFO] - [root] - script.py:18 - processing queue=[0, 1, 2]
[INFO] - [root] - script.py:18 - processing queue=[0, 1]
[INFO] - [root] - script.py:18 - processing queue=[0]
[INFO] - [root] - script.py:42 - runner stopped
[INFO] - [aiorun] - aiorun.py:267 - Cancelling pending tasks.
[INFO] - [aiorun] - aiorun.py:281 - Running pending tasks till complete
[INFO] - [aiorun] - aiorun.py:286 - Waiting for executor shutdown.
[INFO] - [aiorun] - aiorun.py:291 - Shutting down async generators
[INFO] - [aiorun] - aiorun.py:293 - Closing the loop.
[INFO] - [aiorun] - aiorun.py:295 - Leaving. Bye!
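To make the difference in shutdown order easier to see without `aiorun` or a real signal, here is a minimal sketch in plain `asyncio`. The names `worker`, `main_coro` and `shutdown` are hypothetical and exist only for this illustration; they are not part of `aiorun`. Ctrl+C is simulated by stopping the loop with `loop.call_later`, and the sleep intervals are arbitrary.

```python
import asyncio


async def worker(queue, events):
    # Hypothetical stand-in for the third-party background task.
    try:
        while queue:
            events.append(f"processing {queue.pop()}")
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        events.append("worker cancelled")


async def main_coro(events):
    # Hypothetical stand-in for the main coroutine passed to the runner.
    work_task = asyncio.create_task(worker([3, 2, 1], events))
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        pass
    finally:
        # Wait for the worker to finish before returning.
        await work_task
        events.append("main done")


def shutdown(main_first):
    events = []
    loop = asyncio.new_event_loop()
    try:
        main_task = loop.create_task(main_coro(events))
        # Simulate Ctrl+C: stop the loop shortly after the first item.
        loop.call_later(0.01, loop.stop)
        loop.run_forever()
        if main_first:
            # Cancel only the main task and wait for it, so that it can
            # let the worker drain its queue before anything else is cancelled.
            main_task.cancel()
            loop.run_until_complete(
                asyncio.gather(main_task, return_exceptions=True)
            )
        # Cancel whatever is still pending at this point.
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
    finally:
        loop.close()
    return events
```

With `shutdown(main_first=False)` both tasks are cancelled together, so the worker should record `worker cancelled` before its queue is empty. With `shutdown(main_first=True)` the worker should process all three items and never see a cancellation, which is the behaviour I am after.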
It would be great to get the expected result using the current `aiorun` functionality. Thanks in advance for any feedback.
(env versions)
(env_3_8) MacBook-Pro-2:~ fedir$ python -V
Python 3.8.1
(env_3_8) MacBook-Pro-2:~ fedir$ pip freeze | grep aiorun
aiorun==2020.1.3
P.S. I can provide a more practical example of this behaviour if needed (using the combination of the `aiorun` and `aiokafka` packages).