Coder Social home page Coder Social logo

async-service's People

Contributors

carver avatar cburgdorf avatar davesque avatar gsalgado avatar njgheorghita avatar pipermerriam avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

async-service's Issues

Should `ManagerAPI.cancel()` raise an error if the service is already stopping or finished

from: #31 (comment)

What is wrong

Currently, #31 introduces the rule that if a service is already stopping or finished that calling ManagerAPI.cancel() will return early without doing anything.

Should this instead raise a LifecycleError?

The counter argument to this would be a case where we want to be able to blindly call manager.cancel(). If we choose to raise an exception then the call site would have to do this conditionally which adds boilerplate.

The argument for this is that removes the ambiguity where ManagerAPI.is_cancelled will not always be True after a call to Manager.cancel().

How can it be fixed

Decide which of the above is more important.

Need `external_api` decorator for trio service.

What is wrong

We have an external_api decorator that can be used within an asyncio context.

We need an external_api decorator that can be used with in a trio context

How to fix

Either write one that works only in trio or better, write one that works in either context.

Child services behavior with respect to cancellations.

What is wrong

Currently when you use Manager.run_child_service or Manager.run_child_daemon_service it simply delegates to Manager.run_task to do this actual service running.

The cancellation semantics for services are that they will only register as is_cancelled if their cancel() method was called.

The desired behavior would be for all actively running child services to also register as is_cancelled if their parent service gets cancelled. Currently this is not the case as they will just be cancelled by an injected cancellation exception.

How to fix

We need to alter how child services are run such that when cancel() is called on the parent, the child service is cancelled via calling cancel() on the respective manager.

We also need to ensure that the child service is cancelled when it normally would based on the task DAG.

Race condition between local errors and child service errors

What is wrong.

Here is a test case that will fail on average 50% of the time.

@pytest.mark.trio
async def test_trio_service_with_weird_alex_error():
    ready_daemon_exit = trio.Event()
    daemon_exit = trio.Event()

    class ChildService(Service):
        async def run(self):
            await ready_daemon_exit.wait()
            daemon_exit.set()
            raise RuntimeError('arst')

    child = ChildService()

    class MainService(Service):
        async def run(self) -> None:
            self.manager.run_child_service(child)
            ready_daemon_exit.set()
            await daemon_exit.wait()
            self.manager.cancel()

    service = MainService()
    with pytest.raises(RuntimeError):
        await TrioManager.run_service(service)

The reason for the failure is a race condition between the child service's internal mechanisms registering the error and reporting it, and the parent service cancelling the child service and exiting before the child service is able to report the error.

in my tests, the child service does reliably observe the error in the child task, however it gets cancelled before it has a chance to report it.

How can it be fixed

This is related to #41

My rough experimentation suggests there are two things that need to be done for this to be fixed.

  1. The DaemonTaskExit exception should be reported via the same mechanism as task errors.
try:
    raise DaemonTaskExit
except DaemonTaskExit:
    self._errors.append(...)

This change keeps the exception from directly triggering trio's cancellation mechanisms which is needed to prevent the cancellation from immediately being pushed to everything running under the service that the DaemonTaskExit happened in.

  1. When cancelling tasks, if the task represents the running of a child service, then we need to first do an await manager.stop() on the child service's manager before triggering the task's cancel scope. This ensures that the child service doesn't get interrupted before being able to report errors from it's child tasks.

Require use of `external_api` decorator

What was wrong?

Right now we have a version of external_api for both asyncio and trio. These decorators only apply to coroutine functions and there is no enforcement that would require implementers of a service to use this decorator.

Similarly, if an external method schedules a task, we put this new task as a new root node in the task DAG. Such a method could be either synchronous or async since run_task is a synchronous function.

There is no decorator equivalent to external_api that can be applied to synchronous functions.

This leads to the API having a set of "best practices" but little to no enforcement of those best practices.

How can it be fixed.

I think the ideal API is one that:

  • can be applied to both synchronous and asynchronous APIs.
  • one that restricts calls to ManagerAPI.run_task to only internal calls within the service and external API calls which have the decorator.
  • still allows external methods that are not decorated to access things like self.manager.is_running, etc for convenience functions.

I think this can be done but I'm also worried that there will be edge cases we can't cover (but they may not be a very big deal).

External service call that runs a task throws an exception

What is wrong

If a Service class exposes an API that calls self.manager.run_task the call will fail.

class MyService(Service):
    def schedule_thing(self):
        self.manager.run_task(self.do_thing)

This is because there is a check that the current task is part of the set of tasks which are already known to the service. In the above example, the current task will be external to the service and should not be cancelled upon service cancellation.

How can it be fixed.

I think that it makes the most sense to place these tasks in a separate container since they are detattched from the task DAG. I think it's appropriate to treat them as new root nodes in the DAG.

Beta release

Once we've got Trinity fully migrated onto these APIs I'll feel comfortable considering a beta release.

The only significant API change I'm considering is consolidating the InternalManagerAPI and ManagerAPI to lift the restriction that external callers are not allowed to directly schedule child tasks and services. The original motivation for this restriction was based on a concept of an alternate manager that ran the service in a separate process. I've since come to think this isn't a good idea, and with this lifted, there's no good argument I can make to maintain the restriction.

Decide correct behavior when a task fails to exit.

What was wrong?

In both ManagerAPI implementations a user could write code like the following.

class MyService(Service):
    async def run(self):
        try:
            ...  # main logic
        finally:
            ...  # do something blocking that doesn't exit

In this case the service will hang on shutdown. you can also make some trivial modifications to the code above to have it catch the cancellation exception and not re-raise which would result in roughly the same hanging during shutdown behavior.

How can we fix this?

  1. Add a timeout and exit the service anyways after the timeout
  2. Add a timeout and log a warning if something fails due to the timeout.
  3. Do nothing and just hang...
  4. other ideas?

I think option 1 is a no-go since there are legitimate use cases where you would want to hang during shutdown to do some cleanup.

I think option 2 is possible, but it results in a warning being omitted during legitimate use cases.

I'm inclined towards option 3 because a user does have to do some extra work to get themselves into this situation. We could potentially mitigate the user impact via good documentation on how to do cleanup correctly, patterns that are likely to result in unclean shutdowns, absolute no-no's like not re-raising cancellation exceptions, and debugging advice for services that are hanging.

Stable release?

What was wrong?

The library currently only has alpha releases.

How can it be fixed?

Should we move to stable? Plan would be to cut a beta, shake the tree for a few weeks and then move to a stable 0.x.x release.

Ensure trio service shutdown follows DAG cancellation logic

What is wrong

See: https://github.com/ethereum/async-service/blob/04bbca368b0dfdeb87e8f9ff871ffb23a27b01ee/tests-asyncio/test_task_dag_cancellation_order.py

The asyncio service builds a DAG of tasks. The idea is that all of the tasks a service runs can be modeled as a dag

class MyService(Service):
    async def run():
        with open_socket() as socket:
            self.run_task(self.consume_socket, socket)
            await self.manager.wait_finished()
    async def consume_socket(self, socket):
        while True:
            try:
                do_thing_with_socket(socket)
            except CANCELLATION:
                do_socket_cleanup(socket)

In the example above we have a DAG of run() -> consume_socket(). The consume_socket task needs the socket to be open. The run() method will close the socket when it gets cancelled.

IF run() is cancelled before consume_socket() then when consume_socket get's cancelled it will try to do do_socket_cleanup which will error because the socket will have already been closed.

In order to address this we need to ensure that we unroll our task DAG by cancelling them from the leaf's back towards the root.

How can it be fixed

The test for the asyncio version should be replicated in the trio version and then if trio doesn't do this for us automatically we will need to do something like lazily spawning new nurseries for each task to house it's sub-tasks and then doing the nursery cancellations in the proper order.

RuntimeError: dictionary changed size during iteration in _handle_cancelled()

This happened when one my service's daemon tasks terminated by raising an exception. The exception from the daemon task is lost and this was the only thing I got in the logs

13:26:13 DEBUG: starting lookup; initial neighbours: [<Node(0x15ac307a470b411745a6f10544ed54c0a14ad640b21f04f523e736e732bf709d4e28c2f06526ecabc03eed226b6d9bee8e433883cd20ab6cbd114bab77a8775d@52.176.7.10:30303)>, <Node(0x1836710284935d21d3c8ec8735ded61057b3b6700415c9a17643c0fbb79c0cc65acf478374efcba027e21549e845ad2a4bfc9a695579676ac5ffd974f4941fb7@58.243.201.60:42445)>, <Node(0x865a63255b3bb68023b6bffd5095118fcc13e79dcf014fe4e47e065c350c7cc72af2e53eff895f11ba1bbb6a2b33271c1116ee870f266618eadfc2e78aa7349c@52.176.100.77:30303)>]
Traceback (most recent call last):
  File "scripts/discovery.py", line 67, in <module>
    trio.run(main)
  File "/home/salgado/virtualenvs/trinity-trio/lib/python3.7/site-packages/trio/_core/_run.py", line 1804, in run
    raise runner.main_task_outcome.error
  File "scripts/discovery.py", line 63, in main
    await service.manager.wait_finished()
  File "/home/salgado/virtualenvs/trinity-trio/lib/python3.7/site-packages/async_generator/_util.py", line 42, in __aexit__
    await self._agen.asend(None)
  File "/home/salgado/src/async-service/async_service/trio.py", line 413, in background_trio_service
    await manager.stop()
  File "/home/salgado/virtualenvs/trinity-trio/lib/python3.7/site-packages/trio/_core/_run.py", line 730, in __aexit__
    raise combined_error_from_nursery
  File "/home/salgado/src/async-service/async_service/trio.py", line 140, in run
    system_nursery.cancel_scope.cancel()
  File "/home/salgado/virtualenvs/trinity-trio/lib/python3.7/site-packages/trio/_core/_run.py", line 730, in __aexit__
    raise combined_error_from_nursery
  File "/home/salgado/src/async-service/async_service/trio.py", line 65, in _handle_cancelled
    for task in iter_dag(self._service_task_dag):
  File "/home/salgado/src/async-service/async_service/_utils.py", line 56, in iter_dag
    for item in dag.keys():
RuntimeError: dictionary changed size during iteration

Ensure consistence of service not exiting until all run_task's exit (unless cancelled)

What was wrong?

I believe this is a deficiency in the async-service APIs. I think there are edge cases whre you have a run method like this which schedules some background things and then immediately returns. I believe that the internals don't necessarily always handle this case correctly and sometimes the service actually exits and cancels itself before the background tasks/services actually get started. I ran into this recently as well and used this pattern to fix it.

ethereum/trinity#1707 (comment)

Code that produced the error

https://github.com/ethereum/trinity/pull/1707/files#diff-d51f89d0a2c5a88e0ba91bc1a594851aR68

A run method like the above ^ causes the service to exit prematurely, even though child tasks/services should be running.

Expected Result

Even after run() exits, the service should not terminate until one of:

  • all child tasks/services terminate
  • the service is canceled

How can it be fixed?

Maybe keep a counter of scheduled but unlaunched tasks/services to block execution at the end of the service's run() method? I don't really know the internals, so it's just a guess.

Deduplicate test suite with something generic.

What is wrong

Currently, the test suite is basically duplicated between the tests-asyncio and tests-trio directories.

How can it be fixed

Some sort of framework agnostic test suite that can be run within either asyncio or trio.

Probably depends on #18

Maybe use `sniffio` to implement generic `run_service` and `backgorund_service` methods.

What is wrong

The Service API is currently totally agnostic to whether things are trio/asyncio. This is good.

The Manager API is also totally agnostic to whether things are trio/asyncio, however you do have to use the right manager for any given async framework. Basically, ignoring how you acquire a manager instance, the same code can work with both asyncio/trio.

The running of services is not agnostic. You're required to pick run_asyncio_service or run_trio_service. Same with backgorund_asyncio_service and background_trio_service.

It would be ideal to expose simple run_service and background_service APIs that could be used in either async framework.

How can it be fixed

I think we can pretty easily use sniffio to create these generic apis. I'm not seeing an obvious downside and sniffio is an incredibly simple and small library.

No handling of KeyboardInterrupts in asyncio services

When a running service's process gets sent a SIGINT, that will cause a KeyboardInterrupt to be triggered by whatever code is executing at that moment, and that exception goes uncaught, causing the manager's run() method to exit without cancelling the service or waiting for its cleanup. Once that happens, any code that calls await manager.stop() or await manager.wait_finished() will hang forever, obviously. This seems to be the cause of most of the timeout errors we get when shutting down trinity components.

IIUC, one complicating factor here is the fact that the KeyboardInterrupt may come from different places, depending on what code is executing at the moment. For example:

  1. Any task started by the service via run_task() & co
  2. AsyncioManager._wait_all_tasks_done() -- although less likely as it usually yields shortly after being given control
  3. Any asyncio APIs

Ability to reliably run code AFTER all tasks have stopped but BEFORE service is registered as finished.

What is wrong

The previous API this library was based on had a _cleanup() method that you could implement that would allow you to implement logic that would run after all tasks have been stopped but before the service registers as finished.

At present, the asyncio service runs all of the background tasks as a DAG with the Service.run() method at the root of the dag. Upon cancellation, any background task which is cancelled is guaranteed to have had all of it's children cancelled first. This means that at the point where the Service.run() method is cancelled, it is guaranteed that all other background tasks have completed. Because of this, the following is a reliable way to run code in the manner specified above.

class MyService(Service):
    async def run(self):
        self.manager.run_task(do_background_thing_a)
        self.manager.run_task(do_background_thing_b)

        try:
            await self.manager.wait_finished()
        finally:
            ...  # do cleanup

While this pattern accomplishes the goal, it requires intimate knowledge of how the service lifecycle functions to understand why this works. Is this ok?

How can it be fixed.

  1. We could just document this well. Good documentation examples and guides as well as the explanation above to help explain why it works.
  2. Create a new API where people can embed this logic.

With respect to option 1: I'm inclined to do it this way to keep the library as simple as possible. I like that a service is just a function... that you can wrap up in a class or put in a plain function and use the as_service decorator.

With respect to option 2: This would be pretty simple to implement. I see two obvious options.

class MyService(Service):
    async def run(self):
        self.manager.run_task(do_background_thing_a)
        self.manager.run_task(do_background_thing_b)

        await self.manager.wait_finished()

    async def cleanup(self):
            ...  # do whatever cleanup and then re-raise

#
# OR MAYBE....
#

await run_service(service, cleanup_fn=...)

Both of these suffer from not having an easy way to get context into how/why the service exited where as the try/finally can be implemented as try/except or even if any(sys.exc_info()) to check if an exception is being handled. In fact, it even allows easy implementation of something like this pattern when it's justified.

class MyService(Service):
    async def run(self):
        self.manager.run_task(do_background_thing_a)
        self.manager.run_task(do_background_thing_b)

        try:
            await self.manager.wait_finished()
        except:
            await self.on_error()
        finally:
            await self.on_exit()
        else:
            await self.on_success()

My inclination is to leave it simple and document these patterns well with good examples that are easy to copy/paste from.

We could even explore a metaclass based solution that keeps the manager unaware of this and implements the pattern above by manipulating the run method on class construction.

Type hints do not prevent external access to internal manager

What is wrong

The type hints on Service were supposed to disallow external service consumers to call service.manager and hence prevent access to task scheduling functionality.

Instead external parties should be forced to gain access via service.get_manager() which does not expose task scheduling functionality.

Further described here:

@property
def manager(self) -> "InternalManagerAPI":
"""
Expose the manager as a property here intead of
:class:`async_service.abc.ServiceAPI` to ensure that anyone using
proper type hints will not have access to this property since it isn't
part of that API, while still allowing all subclasses of the
:class:`async_service.base.Service` to access this property directly.
"""
return self._manager

It doesn't seem to be working as demonstrated here: https://github.com/ethereum/trinity/pull/1792/files#r442016399

Haven't looked into it yet.

DaemonTaskExit in trio on cancellation

Currently this is undiagnosed but it appears that there can be some race conditions where shutting down a service results in a DaemonTaskExit being incorrectly raised when the child task exits.

The condition needed to trigger this seems to be having something like a nursery running in the task where the task hangs waiting for the nursery to exit. The cancellation semantics of trio potentially trigger the cancellation immediately which results in the task exiting immediately and then the service treats that as an error... but this is not confirmed.

Unbounded memory growth for task tracking.

What was wrong?

Currently, both the trio and asyncio service appear to have unbounded memory growth when running background tasks.

How can it be fixed?

We need logic that for a given task, will wait until all of it's children have completed and been cleaned up, and then remove the task from the DAG.

`as_service` decorator needs fancy type hints.

What is wrong?

The as_service decorator returns a ServiceAPI. If the function takes any arguments, those are expected to be passed into the constructor of the returned class. However, since it specifies it returns a ServiceAPI, mypy thinks that the __init__ method takes no arguments.

How can it be fixed.

I think we need to use the mypy plugin API to inform mypy that the returned type has a constructor that matches that of the function arguments (minus the manager)

Race condition causing Lifecycle error during cancellation

If this is a bug report, please fill in the following sections.
If this is a feature request, delete and describe what you would like with examples.

What was wrong?

In the trinity codebase this warning happens occasionally

<Manager[ETHProxyPeerPool] flags=SRCfE>: unexpected error when cancelling tasks, service may not terminate
Traceback (most recent call last):
  File "/home/piper/python-environments/trinity/lib/python3.8/site-packages/async_service/asyncio.py", line 122, in _handle_cancelled
    await self._real_handle_cancelled()
  File "/home/piper/python-environments/trinity/lib/python3.8/site-packages/async_service/asyncio.py", line 170, in _real_handle_cancelled
    raise LifecycleError(f"Should have already been completed: {asyncio_task}")
async_service.exceptions.LifecycleError: Should have already been completed: <Task pending name='Task-889' coro=<BaseManager._run_and_manage_task() running at /home/piper/python-environments/trinity/lib/python3.8/site-packages/async_service/base.py:300> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f035d06b280>()]>>

This appears to be the result of a new task being scheduled while cancellation is in progress.

How can it be fixed?

Look into how to reproduce this and then figure out how to prevent it? This could be as simple as a small wait to allow any tasks that are in the process of being scheduled to land into the data structures before proceeding with task cancellation, but ideally we can land on a mechanism that is more precise.

New API: manager.wait_child_tasks_complete()

What was wrong?

This is heavily inspired by and related to #66

Currently the standard pattern we typically use in many places is:

def run(self):
    ... do sub-task things
    await self.manager.wait_finished()

This pattern doesn't have to be present in the run method, but it is most commonly found here.

In many cases, we may actually not want to wait indefinitely, but rather, until all of the child tasks are finished.

How can it be fixed?

Since the services manage a DAG for all tasks, we should be able to implement a method that blocks until all child tasks have completed. Considering the case where a service runs child tasks or services and then has a natural exit condition, the above pattern would likely turn into:

def run(self):
    ... do sub-task things
    await self.manager.wait_all_child_tasks_complete()

Once #66 has been addressed, this pattern would not be explicitly required, I believe it would still impove the UX of the library since it would expose a more refined API for waiting for the actual condition that the code intends to block until rather than having to resort to an API that blocks indefinitely absent a call to manager.cancel().

cleanup via local `TaskAPI` abstraction.

I think that we can cleanup and de-duplicate some code in the respective trio and asyncio implementations by defining a common TaskAPI.

We would need an implementation for both trio and asyncio.

I believe that this would allow us to embed all of the framework specific logic in the class itself, allowing much of the business logic to move into the common BaseManager implementation.

  • TaskAPI.__call__ can potentially replace _run_and_manager_task
  • await TaskAPI.cancel() can encapsulate the differences in cleanup logic.
  • await TaskAPI.done() can handle waiting for a task to be fully complete.
  • TaskAPI.dependencies can handle tracking of task children.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.