Coder Social home page Coder Social logo

fastapi-events's People

Contributors

bvdcreations avatar deepsourcebot avatar ediskandarov avatar melvinkcx avatar mohsin-ul-islam avatar psychedelicious avatar smithk86 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

fastapi-events's Issues

Prevent event loop blocking by sync handler

    async def handle(self, event: Event) -> None:
        for handler in self._get_handlers_for_event(event_name=event[0]):
            if inspect.iscoroutinefunction(handler):
                await handler(event)
            else:
                handler(event)

Potential memory leak

token: Token = event_store.set(deque())
try:
await self.app(scope, receive, send)
finally:
await self._process_events()
event_store.reset(token)

        token: Token = event_store.set(deque())
        try:
            await self.app(scope, receive, send)
        finally:
            await self._process_events()
        event_store.reset(token)

If await self._process_events() raises an exception then event_store.reset(token) does not happen.

Events With Multiple Local Handlers Can Be Interfered By Each Other

Today, an event with multiple local handlers registered can interfere with each other as all handlers are given the same shared copy of events.

For instance,

# anywhere in code

dispatch(Events.USER_CREATED, {"key_in_payload": 123})
# in handlers.py

@local_handler.register(event_name=Events.USER_CREATED)
async def handle_user_created(event: Event):
    _, payload = event
    payload.pop("key_in_payload")


@local_handler.register(event_name=Events.USER_CREATED)
async def handle_user_created_2(event: Event)
    _, payload = event
    payload["key_in_payload"]  # KeyError

Proposal

A copy of the payload should be passed to the handlers.

Proposal - use asyncio for events management (Supporting Event Chaining)

fastapi-events currently uses ASGI middle-ware for the events management. This middle-ware creates event store for each specific request which is filled by events dispatched while computing the request and then used to execute collected events after the response was sent to a client. While this might be sufficient solution, this architecture has some disadvantages and there might be even more simplistic solution.

  • Initial problem
    First of all I am really thankful for this library and great work put into it. One of the limitations of currently used architecture and also reason why I had to stick with custom solution was the fact that currently its not possible to dispatch event from a registered handler (not possible to chain handlers by event). Since dispatch() function called from registered handler is called after the response was already sent there is no middle-ware which would execute additional events.

  • Custom simplistic solution
    It took me some time to find out how would I create custom self-executing event queue which would execute events no matter when and where dispatched as long as dispatch() function has access to event queue itself. Then I got an idea that if FastAPI is built on top of asyncio it should definitely be possible to dispatch tasks/events to the event loop so it will be queued together with other FastAPI tasks (mainly request handlers?). Following is very simple code change that allows to dispatch events into asyncio event loop and therefore there is not any requirement for event store, middle-ware executor and handling more than one task at a time.

def _dispatch(event_name: Union[str, Enum], payload: Optional[Any] = None) -> None:
    async def task():
          await local_handler.handle((event_name, payload))
    asyncio.create_task(task()) # we don't await for task execution, only register it

Differences between task management architectures

Property Middle-ware Event-loop (asyncio)
Executes after the response Yes Depends on usage
Doesn't block response Yes Yes
Dispatch must be called from a request context Yes Yes (anywhere from async context, so asyncio is accesible)
Dispatch can be used within registered handler No Yes

There are some key points to consider from the table above. While both strategies don't block the response, strategy with asyncio event loop can execute event sooner then the response is sent to client. This might happen when we do dispatch(event) with consecutive await promise. The event is dispatched to the event loop but since there is await after event has been dispatched the event loop might start executing dispatched event. From user perspective I would say this is acceptable/preferable behavior - I have already dispatched event but still awaiting for other resource and therefore other tasks can be executed in mean time. If dispatch is called and there is no consecutive await its guaranteed that it will be executed after the current event(request) finishes its execution.

While this change in architecture might break behavior users are used to I would say that strategy of detaching events execution to the asyncio event pool is more preferred and stable for the future. Instead of executing event right after the request/response where it was dispatched, event is sent to a queue and scheduled for execution with every other request/response and events that Fastapi creates. New architecture still allows old behavior to be used in case anyone needs to schedule event execution after the response. Moreover this architecture allows users to define preferred behavior instead of forcing them with strict rules.

Consider emitting warnings when annotating function/coroutine with `local_handler.handle()`

This is a common mistake when registering handlers with local_handler:

โœ… Correct usage:

    @local_handler.register(event_name="abc")
    async def handle_abc_event(event):
        pass

โŒ Incorrect usage:

   @local_handler.handle("abc")
   async def handle_abc_event(event):
       pass

Consider adding emitting a warning when local_handler.handle() is used when annotating functions/coroutines.

Add Python 3.11

  • update CI/CD pipelines to run tests in Python 3.11
  • update setup.py, if necessary

Trigger an event at startup

Is there a way to trigger an event at the startup of the application? I've been trying to do so with variations of the following piece of code:

app = FastAPI()
app.add_middleware(EventHandlerASGIMiddleware, 
                  handlers=[local_handler])
app.include_router(example.router)

@app.on_event("startup")
async def startup_event():
   dispatch("STARTING")

And I'm getting this error at the application startup:

ERROR:    Traceback (most recent call last):
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/starlette/routing.py", line 635, in lifespan
    async with self.lifespan_context(app):
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/starlette/routing.py", line 530, in __aenter__
    await self._router.startup()
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/starlette/routing.py", line 612, in startup
    await handler()
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/main.py", line 39, in startup_event
    dispatch("STARTING")
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/fastapi_events/dispatcher.py", line 94, in dispatch
    return _dispatch(event_name=event_name, payload=payload)
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/fastapi_events/dispatcher.py", line 61, in _dispatch
    _dispatch_as_task(event_name, payload)
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/fastapi_events/dispatcher.py", line 37, in _dispatch_as_task
    handlers = _list_handlers()
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/fastapi_events/dispatcher.py", line 28, in _list_handlers
    middleware_id: int = middleware_identifier.get()
LookupError: <ContextVar name='fastapi_middleware_identifier' at 0x7fdbec69dd60>

ERROR:    Application startup failed. Exiting.

The task that I want to run with this starting event is supposed to dispatch events periodically, so even if I initially run it as a background task, I get the same error sometime after the application finishes starting. My current workaround is to manually call an endpoint that dispatches this event once the application is up and running, but I'd like to get rid of this step.

By the way, thanks for the effort and work in this project.

Do not always dump pydantic models to dict

Request:

When using pydantic models for events, they are dumped to dict by fastapi-events.

Motivation

This is convenient but restrictive. A pydantic model is often more useful for a handler than a dict:

  • You can call isinstance on it. This is particularly useful for events where one handler callback handles multiple event types.
  • The model may have methods or other logic/data that isn't captured by dumping to dict.
  • You can get type hints in event handlers.

Proposed API

Add arg payload_schema_dump: bool = True to dispatch.

The default behaviour is unchanged. When this is set to False, handlers are called with the pydantic model as the payload.

If validate_payload is True, we still do the round-trip to re-validate the payload, but skip the final dump.

pydantic v2 Warnings: The `dict` method is deprecated; use `model_dump` instead

Hi, after migrating to pydantic v2 I have notice a lot of warnings popping up.

pydantic.warnings.PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2
 Migration Guide at https://errors.pydantic.dev/2.1.1/migration/

This is because of the dispatch fastapi_events/dispatcher.py:119: in dispatch
payload = payload_schema_cls(**(payload or {})).dict(**payload_schema_cls_dict_args)

pydantic v2 import errors

When trying to upgrade to pydantic V2. there is the following import error

  File "/app/src/services/mail/events.py", line 1, in <module>
    from fastapi_events.handlers.local import local_handler
  File "/usr/local/lib/python3.11/site-packages/fastapi_events/handlers/local.py", line 10, in <module>
    from pydantic.error_wrappers import ErrorWrapper
  File "/usr/local/lib/python3.11/site-packages/pydantic/_migration.py", line 295, in wrapper
    raise PydanticImportError(f'`{import_path}` has been removed in V2.')
pydantic.errors.PydanticImportError: `pydantic.error_wrappers:ErrorWrapper` has been removed in V2.

For further information visit https://errors.pydantic.dev/2.1.1/u/import-error

it does not seem there is a direct replacement for "ErrorWrapper"
https://github.com/pydantic/pydantic/blob/main/pydantic/errors.py

Events are not received when dispatch() occurs in an asyncio.Task

I banged my head against the wall with this one for a while before I found the problem.

Steps to reproduce:

  1. Request is made to FastAPI which dispatches a starting X process. The request immediately returns a payload letting the client know the asynchronous process has started.
  2. The actual work is then done in an asyncio.Task which then dispatches a X process is complete afterwords.
  3. Any dispatched events in the Task are never received

What is actually happening:

The issue is on dispatcher.py line 57. Starting in Python 3.7, asyncio.Task copies the current context from contextvars into the Task. When line 57 is reached, the code is told the event will be dispatched in the middleware as part of the request since the request was active at the time the Task was created. In actuality, these events end up in the void as they should have been dispatched via _dispatch_as_task.

For now, anywhere an event needs to be dispatch within a Task, I import fastapi_events.in_req_res_cycle into the code and run in_req_res_cycle.set(None). This forces _dispatch() to process these events via _dispatch_as_task.

Edit: updated the link to include the specific commit revision

Add a SNS Handler

The SQS handler is a good start, but it would be great if I could use fastapi events with SNS.

I know I could write a local handler or write my own handler, but it would be cool to see one included with fastapi-events.

Support for Depends()

By looking at the docs and the source code to me it seems that usage of FastAPI dependency injection system (via Depends) is not supported. If it is really so, it will be a must have feature for many projects. E.g. I might have a service which injects a couple of other services which also use FastAPI's DI system. It will be very inconvenient to set up the whole hierarchy manually.

Register event payload class and event in 1

[Suggestion]

The chance of things going out of wack if you keep the event names manually with a string setup are high. suggestion is to do something like sqlalchemy uses with its models with __tablename__ on a (eventmodel) pydantic model

changing fastapi_events/registry/base.py to something like (untested) might be an easy first step:

    def register(self, _schema=None, event_name=None):
        if not event_name:
            if BaseModel and issubclass(_schema, BaseModel) and hasattr(_schema,"_event_name"):
                event_name = _schema._event_name
        if not event_name:
            raise ValueError("'event_name' must be provided when registering a schema")

"private fields" as per pydantic https://docs.pydantic.dev/latest/concepts/models/#private-model-attributes

(its a monday morning., i havent woken up enough to get this right yet)

this way the users can define a model and event name in 1 model and then just register it with something like (altho i would suggest having an upstream EventModel that upstreams BaseModel for possible future amazeballs

class MySuperEvent(BaseModel):
    _event_name="my.super.event"
    user: User = Field(default_factory=User))

main.py

import fastapi_events.whatever
whatever.register_event(MySuperEvent)

and for dispatch do something like dispatch(MySuperEvent(user=user)

Enhance Registration of Multiple Events Of Local Handler

Today, with local handler, it is not possible to decorate a function/coroutine with multiple @local_handler.register:

# this will result in errors
@local_handler.register(event_name="USER_CREATED")
@local_handler.register(event_name="GUESS_USER_CREATED")
async def handle_user_creation(event: Event):
     pass

The workaround is:

async def handle_user_creation(event: Event):
    pass

local_handler.register(event_name="USER_CREATED")(handle_user_creation)
local_handler.register(event_name="GUESS_USER_CREATED")(handle_user_creation)

TypeError with Synchronous Function in Dependencies When Using Local Handler

I've encountered a TypeError when a synchronous function is used as a dependency with the local event handler. It seems the dependency function is expected to receive the event object leading to a mismatch in arguments passed. Is this indeed intended?

Here's a snippet that reproduces the possible issue:

@router.get("")
def status():
    dispatch("status")
    return {"status": "ok"}


def get_dependency():
    return "dependency"


@local_handler.register(event_name="*")
async def handle_all_events(event: Event, dependency=fastapi.Depends(get_dependency)):
    pass

This results in the following error:
TypeError: get_dependency() takes 0 positional arguments but 1 was given

Changing get_dependency signature to async def get_dependency runs successfully.

Thanks for this awesome library!

๐Ÿ› Awkward errors after fastapi_events setup

Traces gathered by sentry.io:

ValueError: <Token var=<ContextVar name='fastapi_context' at 0x7f256cfd1bd0> at 0x7f255dd9e800> was created in a different Context
  File "fastapi_events/middleware.py", line 29, in __call__
    self._teardown_event_store()
  File "fastapi_events/middleware.py", line 40, in _teardown_event_store
    event_store.reset(self._token)
RuntimeError: <Token used var=<ContextVar name='fastapi_context' at 0x7f256cfd1bd0> at 0x7f255c3a6dc0> has already been used once
  File "fastapi_events/middleware.py", line 29, in __call__
    self._teardown_event_store()
  File "fastapi_events/middleware.py", line 40, in _teardown_event_store
    event_store.reset(self._token)

Trace I'm seeing in logs

Oct 19 01:16:53 PM  Traceback (most recent call last):
  File "/app/.venv/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 375, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/app/.venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "/app/.venv/lib/python3.8/site-packages/fastapi/applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "/app/.venv/lib/python3.8/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/app/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc
  File "/app/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/app/.venv/lib/python3.8/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/app/.venv/lib/python3.8/site-packages/sentry_sdk/integrations/asgi.py", line 106, in _run_asgi3
    return await self._run_app(scope, lambda: self.app(scope, receive, send))
  File "/app/.venv/lib/python3.8/site-packages/sentry_sdk/integrations/asgi.py", line 152, in _run_app
    raise exc from None
  File "/app/.venv/lib/python3.8/site-packages/sentry_sdk/integrations/asgi.py", line 149, in _run_app
    return await callback()
  File "/app/.venv/lib/python3.8/site-packages/fastapi_events/middleware.py", line 29, in __call__
    self._teardown_event_store()
  File "/app/.venv/lib/python3.8/site-packages/fastapi_events/middleware.py", line 40, in _teardown_event_store
    event_store.reset(self._token)
RuntimeError: <Token used var=<ContextVar name='fastapi_context' at 0x7f256cfd1bd0> at 0x7f255d943340> has already been used once

Providing AWS Credentials

Not able to use Amazon credentials for using SQSForwardHandler. Can anyone help with a code snippet for this.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.