melvinkcx / fastapi-events
Asynchronous event dispatching/handling library for FastAPI and Starlette
License: MIT License
fastapi-events currently uses ASGI middleware for event management. The middleware creates an event store for each request; the store is filled with the events dispatched while the request is being processed, and the collected events are executed after the response has been sent to the client. While this may be a sufficient solution, the architecture has some disadvantages, and there may be an even simpler one.
Initial problem
First of all, I am really thankful for this library and the great work put into it. One limitation of the current architecture, and the reason I had to stick with a custom solution, is that it is currently not possible to dispatch an event from a registered handler (handlers cannot be chained by events). Since a `dispatch()` call made from a registered handler runs after the response has already been sent, there is no middleware left to execute the additional events.
Custom simplistic solution
It took me some time to work out how to create a custom self-executing event queue that would execute events no matter when and where they were dispatched, as long as the `dispatch()` function has access to the event queue itself. Then I realized that, since FastAPI is built on top of asyncio, it should be possible to dispatch tasks/events to the event loop so that they are queued together with other FastAPI tasks (mainly request handlers?). The following very simple code change dispatches events into the asyncio event loop, so there is no need for an event store, a middleware executor, or handling more than one task at a time.
def _dispatch(event_name: Union[str, Enum], payload: Optional[Any] = None) -> None:
    async def task():
        await local_handler.handle((event_name, payload))

    asyncio.create_task(task())  # we don't await the task, we only schedule it
Differences between task management architectures
Property | Middleware | Event loop (asyncio) |
---|---|---|
Executes after the response | Yes | Depends on usage |
Doesn't block response | Yes | Yes |
Dispatch must be called from a request context | Yes | Yes (anywhere from async context, so asyncio is accessible) |
Dispatch can be used within registered handler | No | Yes |
There are some key points to consider from the table above. While neither strategy blocks the response, the asyncio event loop strategy can execute an event sooner than the response is sent to the client. This can happen when `dispatch(event)` is followed by `await promise`. The event is dispatched to the event loop, but since there is an await after the dispatch, the event loop might start executing the dispatched event. From the user's perspective I would say this is acceptable, even preferable, behavior: I have already dispatched the event but am still awaiting another resource, so other tasks can be executed in the meantime. If dispatch is called and no await follows, it is guaranteed that the event will be executed after the current handler (request) finishes its execution.
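The ordering described above can be reproduced with plain asyncio. This is a minimal sketch, not fastapi-events code: `handler`, `dispatch` and `demo` are hypothetical stand-ins, and `asyncio.sleep(0)` plays the role of any `await` that yields to the event loop.

```python
import asyncio


async def demo():
    order = []

    async def handler(name):
        order.append(f"handled {name}")

    def dispatch(name):
        # Schedule the handler on the running loop without awaiting it.
        # Real code should keep a reference to the task so that it cannot be
        # garbage-collected before it runs.
        asyncio.create_task(handler(name))

    # Case 1: no await between dispatch and the end of the "request".
    dispatch("a")
    order.append("response a")
    await asyncio.sleep(0)  # the request is over; the loop now runs the handler

    # Case 2: an await follows the dispatch, so the handler may run first.
    dispatch("b")
    await asyncio.sleep(0)
    order.append("response b")
    return order


print(asyncio.run(demo()))
```

In the first case the handler runs only after the surrounding coroutine has finished its work; in the second it runs during the `await`, before the "response" is produced.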
While this change in architecture might break behavior that users are used to, I would say that detaching event execution to the asyncio event loop is preferable and more stable for the future. Instead of executing an event right after the request/response in which it was dispatched, the event is sent to a queue and scheduled for execution alongside every other request/response and event that FastAPI creates. The new architecture still allows the old behavior in case anyone needs to schedule event execution after the response. Moreover, it lets users define their preferred behavior instead of forcing strict rules on them.
The SQS handler is a good start, but it would be great if I could use fastapi events with SNS.
I know I could write a local handler or write my own handler, but it would be cool to see one included with fastapi-events.
Hi, after migrating to pydantic v2 I have noticed a lot of warnings popping up.
pydantic.warnings.PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.1.1/migration/
This is caused by dispatch (fastapi_events/dispatcher.py:119: in dispatch):
payload = payload_schema_cls(**(payload or {})).dict(**payload_schema_cls_dict_args)
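One way to silence the warning while still supporting both pydantic major versions is to prefer `model_dump` when it exists. The helper below is a sketch only: `dump_model` is a hypothetical name, not part of fastapi-events, and the two `Fake…` classes are stand-ins so the snippet runs without pydantic installed.

```python
from typing import Any, Dict


def dump_model(model: Any, **kwargs: Any) -> Dict[str, Any]:
    """Serialize a pydantic-style model, preferring the V2 API when available."""
    dump = getattr(model, "model_dump", None)  # present on pydantic V2 models
    if callable(dump):
        return dump(**kwargs)
    return model.dict(**kwargs)  # pydantic V1 fallback


class FakeV1Model:
    """Stand-in for a pydantic V1 model: only .dict() exists."""

    def dict(self, **kwargs: Any) -> Dict[str, Any]:
        return {"api": "v1"}


class FakeV2Model:
    """Stand-in for a pydantic V2 model: .dict() still exists but is deprecated."""

    def dict(self, **kwargs: Any) -> Dict[str, Any]:
        raise AssertionError("deprecated path should not be used")

    def model_dump(self, **kwargs: Any) -> Dict[str, Any]:
        return {"api": "v2"}


print(dump_model(FakeV1Model()), dump_model(FakeV2Model()))
```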
When trying to upgrade to pydantic V2, there is the following import error:
  File "/app/src/services/mail/events.py", line 1, in <module>
    from fastapi_events.handlers.local import local_handler
  File "/usr/local/lib/python3.11/site-packages/fastapi_events/handlers/local.py", line 10, in <module>
    from pydantic.error_wrappers import ErrorWrapper
  File "/usr/local/lib/python3.11/site-packages/pydantic/_migration.py", line 295, in wrapper
    raise PydanticImportError(f'`{import_path}` has been removed in V2.')
pydantic.errors.PydanticImportError: `pydantic.error_wrappers:ErrorWrapper` has been removed in V2.
For further information visit https://errors.pydantic.dev/2.1.1/u/import-error
There does not seem to be a direct replacement for "ErrorWrapper":
https://github.com/pydantic/pydantic/blob/main/pydantic/errors.py
Event pattern matching expects event names to be of string type due to the usage of `fnmatch`.
Errors occur when an Enum type is used for event names with local_handler.
Suggestion:
- Apply `str()` if the event name is not of string type

Today, with the local handler, it is not possible to decorate a function/coroutine with multiple `@local_handler.register`:
# this will result in errors
@local_handler.register(event_name="USER_CREATED")
@local_handler.register(event_name="GUESS_USER_CREATED")
async def handle_user_creation(event: Event):
    pass
The workaround is:
async def handle_user_creation(event: Event):
    pass

local_handler.register(event_name="USER_CREATED")(handle_user_creation)
local_handler.register(event_name="GUESS_USER_CREATED")(handle_user_creation)
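Stacking decorators works whenever each decorator hands back the original function unchanged. The sketch below shows that property on a hypothetical `MiniRegistry`. It is not the fastapi-events implementation, and I have not verified why stacking fails in the library itself.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class MiniRegistry:
    """Hypothetical, minimal handler registry used only for illustration."""

    def __init__(self) -> None:
        self.handlers: DefaultDict[str, List[Callable]] = defaultdict(list)

    def register(self, event_name: str) -> Callable[[Callable], Callable]:
        def decorator(func: Callable) -> Callable:
            self.handlers[event_name].append(func)
            # Returning func itself (not a wrapper, not None) lets the next
            # decorator in the stack receive the same callable.
            return func

        return decorator


registry = MiniRegistry()


@registry.register(event_name="USER_CREATED")
@registry.register(event_name="GUESS_USER_CREATED")
async def handle_user_creation(event):
    pass


print(sorted(registry.handlers))
```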
This is a common mistake when registering handlers with `local_handler`:
✅ Correct usage:
@local_handler.register(event_name="abc")
async def handle_abc_event(event):
    pass
❌ Incorrect usage:
@local_handler.handle("abc")
async def handle_abc_event(event):
    pass
Consider emitting a warning when `local_handler.handle()` is used to decorate functions/coroutines.
I've encountered a TypeError when a synchronous function is used as a dependency with the local event handler. It seems the dependency function is expected to receive the event object leading to a mismatch in arguments passed. Is this indeed intended?
Here's a snippet that reproduces the possible issue:
@router.get("")
def status():
    dispatch("status")
    return {"status": "ok"}

def get_dependency():
    return "dependency"

@local_handler.register(event_name="*")
async def handle_all_events(event: Event, dependency=fastapi.Depends(get_dependency)):
    pass
This results in the following error:
TypeError: get_dependency() takes 0 positional arguments but 1 was given
Changing the `get_dependency` signature to `async def get_dependency` makes it run successfully.
Thanks for this awesome library!
Sometimes generating the payload of an event might involve some serialization overhead which would be better deferred until after the response has been sent. An example of this might be converting a `numpy` array to a type which is compatible with `json.dumps`.
I'm imagining something like:
@app.route("/")
async def root(request: Request) -> JSONResponse:
    arr = np.random.random(512)
    dispatch("new event", payload=lambda: {'arr': arr.tolist()})
    return JSONResponse()
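On the handling side, deferring the work could be as small as calling the payload if it is callable. This is a sketch of the idea only: `resolve_payload` is a hypothetical helper, not an existing fastapi-events function, and a plain list stands in for the numpy array.

```python
from typing import Any

calls = []


def expensive_payload() -> dict:
    # Stands in for the costly conversion (e.g. arr.tolist()).
    calls.append("serialized")
    return {"arr": [0.1, 0.2]}


def resolve_payload(payload: Any) -> Any:
    """Evaluate a deferred payload; pass ordinary payloads through untouched."""
    return payload() if callable(payload) else payload


# At dispatch time only the callable is stored; nothing is serialized yet.
event = ("new event", expensive_payload)
calls_at_dispatch = list(calls)

# Later, when the event is processed (after the response), the work happens.
name, payload = event
resolved = resolve_payload(payload)
print(calls_at_dispatch, resolved)
```

One design caveat: a payload that is itself legitimately callable would be invoked by this check, so a real implementation might prefer an explicit wrapper type over `callable()`.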
This was fixed in #49 but one call to `dict` was missed.
I am not able to use Amazon credentials with SQSForwardHandler. Can anyone help with a code snippet for this?
Add documentation on how to suppress event dispatching in tests.
I am instrumenting a FastAPI application where events are Pydantic models, but the OTEL implementation expects them to be `dict`. This causes the following error on each event:
  File "/path/.venv/lib/python3.11/site-packages/fastapi_events/dispatcher.py", line 52, in task
    await asyncio.gather(*[handler.handle((event_name, payload)) for handler in handlers])
  File "/path/.venv/lib/python3.11/site-packages/fastapi_events/handlers/local.py", line 195, in handle
    with create_span_for_handle_fn(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/.venv/lib/python3.11/site-packages/fastapi_events/otel/utils.py", line 47, in create_span_for_handle_fn
    remote_ctx = propagate.extract(payload)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/.venv/lib/python3.11/site-packages/opentelemetry/propagate/__init__.py", line 101, in extract
    return get_global_textmap().extract(carrier, context, getter=getter)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/.venv/lib/python3.11/site-packages/opentelemetry/propagators/composite.py", line 52, in extract
    context = propagator.extract(carrier, context, getter=getter)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/.venv/lib/python3.11/site-packages/opentelemetry/trace/propagation/tracecontext.py", line 49, in extract
    header = getter.get(carrier, self._TRACEPARENT_HEADER_NAME)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/path/.venv/lib/python3.11/site-packages/opentelemetry/propagators/textmap.py", line 87, in get
    val = carrier.get(key, None)
    ^^^^^^^^^^^
  File "/path/.venv/lib/python3.11/site-packages/pydantic/main.py", line 811, in __getattr__
    raise AttributeError(f'{type(self).__name__!r} object has no attribute {item!r}')
AttributeError: 'QueueItemStatusChangedEvent' object has no attribute 'get'
`payload` (a Pydantic model) does not have a `.get()` method, so this exception is raised. I cannot bypass this either, because when the opentelemetry packages are installed in the venv, `fastapi-events` imports them and attempts to instrument itself automatically.
Is there any way to either:
- provide a custom `getter` method or somehow configure the OpenTelemetry propagator differently to handle Pydantic models (I know this is beyond the scope of `fastapi-events`, but I couldn't find any docs on this)
- disable the OTEL instrumentation in `fastapi-events` altogether, so that I can use the OpenTelemetry FastAPI contrib libraries to instrument the application?

Thanks for any help.
(ETA: it's definitely possible that i have no idea what i'm doing)
I banged my head against the wall with this one for a while before I found the problem.
Steps to reproduce:
1. Within a request, start an `asyncio.Task` which then dispatches an "X process is complete" event afterwards.
2. Events dispatched from within the `Task` are never received.

What is actually happening:
The issue is on dispatcher.py line 57. Starting in Python 3.7, `asyncio.Task` copies the current context from `contextvars` into the `Task`. When line 57 is reached, the code is told the event will be dispatched in the middleware as part of the request, since the request was active at the time the `Task` was created. In actuality, these events end up in the void, as they should have been dispatched via `_dispatch_as_task`.
For now, anywhere an event needs to be dispatched within a `Task`, I import `fastapi_events.in_req_res_cycle` into the code and run `in_req_res_cycle.set(None)`. This forces `_dispatch()` to process these events via `_dispatch_as_task`.
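The context copying described above, and why the workaround is safe, can be shown with the standard library alone. This is a minimal sketch: `in_request` is a hypothetical stand-in for the library's `in_req_res_cycle` variable, and `fake_request` simulates a request that ends before its background task runs.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical stand-in for the "are we inside a request?" flag.
in_request = ContextVar("in_request", default=None)


async def background(clear_flag):
    if clear_flag:
        # The workaround: this only changes the task's own copy of the context.
        in_request.set(None)
    await asyncio.sleep(0)  # by now the simulated request has finished
    return in_request.get()


async def fake_request(clear_flag):
    token = in_request.set(True)
    # create_task() snapshots the current context, including in_request=True.
    task = asyncio.create_task(background(clear_flag))
    in_request.reset(token)  # the request ends
    return task


async def main():
    stale = await (await fake_request(clear_flag=False))
    cleared = await (await fake_request(clear_flag=True))
    return stale, cleared, in_request.get()


print(asyncio.run(main()))
```

Without the workaround the task still sees the flag from the request that created it, even though that request is over. With it, only the task's copy is cleared and the outer context is unaffected.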
Edit: updated the link to include the specific commit revision
fastapi-events/fastapi_events/middleware.py
Lines 23 to 28 in a5a2823
token: Token = event_store.set(deque())
try:
    await self.app(scope, receive, send)
finally:
    await self._process_events()
    event_store.reset(token)
If `await self._process_events()` raises an exception, then `event_store.reset(token)` does not happen.
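One way to guarantee the reset is to nest a second `try`/`finally` around the processing step. The following is a sketch under simplified assumptions rather than a patch for the library: `middleware_call`, `process_events` and `app` are hypothetical stand-ins, and `process_events` always fails so that the cleanup path is exercised.

```python
import asyncio
from collections import deque
from contextvars import ContextVar

event_store = ContextVar("event_store")  # deliberately has no default


async def process_events():
    raise RuntimeError("a handler failed")


async def app():
    event_store.get().append("some event")


async def middleware_call():
    token = event_store.set(deque())
    try:
        await app()
    finally:
        try:
            await process_events()
        finally:
            # Runs even when process_events() raises.
            event_store.reset(token)


async def main():
    try:
        await middleware_call()
    except RuntimeError:
        pass
    try:
        event_store.get()
    except LookupError:
        return "reset"  # the variable is back to its unset state
    return "leaked"


print(asyncio.run(main()))
```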
https://github.com/samuelcolvin/aioaws - is an asyncio friendly alternative to boto3
async def handle(self, event: Event) -> None:
    for handler in self._get_handlers_for_event(event_name=event[0]):
        if inspect.iscoroutinefunction(handler):
            await handler(event)
        else:
            handler(event)
As titled, a referenced PR can be found here: open-telemetry/opentelemetry-python-contrib#1229
As titled, need to ensure all tests pass in Python 3.12
On the head of the dev branch, this test is failing:
tests/middleware/test_starlette.py::test_event_handling_without_request
This is a false negative. The library remains working properly, but the test case needs to be updated. Please see the section below for more details.
The middleware stack is now lazily built by Starlette with version 0.24 or later.
Ref: https://github.com/encode/starlette/releases/tag/0.24.0
Is there a way to trigger an event at the startup of the application? I've been trying to do so with variations of the following piece of code:
app = FastAPI()
app.add_middleware(EventHandlerASGIMiddleware,
                   handlers=[local_handler])
app.include_router(example.router)

@app.on_event("startup")
async def startup_event():
    dispatch("STARTING")
And I'm getting this error at the application startup:
ERROR: Traceback (most recent call last):
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/starlette/routing.py", line 635, in lifespan
    async with self.lifespan_context(app):
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/starlette/routing.py", line 530, in __aenter__
    await self._router.startup()
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/starlette/routing.py", line 612, in startup
    await handler()
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/main.py", line 39, in startup_event
    dispatch("STARTING")
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/fastapi_events/dispatcher.py", line 94, in dispatch
    return _dispatch(event_name=event_name, payload=payload)
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/fastapi_events/dispatcher.py", line 61, in _dispatch
    _dispatch_as_task(event_name, payload)
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/fastapi_events/dispatcher.py", line 37, in _dispatch_as_task
    handlers = _list_handlers()
  File "/home/acassimiro/Documents/tools/fastapi/eventDrivenApp/env/lib/python3.8/site-packages/fastapi_events/dispatcher.py", line 28, in _list_handlers
    middleware_id: int = middleware_identifier.get()
LookupError: <ContextVar name='fastapi_middleware_identifier' at 0x7fdbec69dd60>
ERROR: Application startup failed. Exiting.
The task that I want to run with this starting event is supposed to dispatch events periodically, so even if I initially run it as a background task, I get the same error sometime after the application finishes starting. My current workaround is to manually call an endpoint that dispatches this event once the application is up and running, but I'd like to get rid of this step.
By the way, thanks for the effort and work in this project.
Hello, I want to ask why the event callback is executed twice?
Today, multiple local handlers registered for the same event can interfere with each other, as all handlers are given the same shared copy of the event.
For instance,
# anywhere in code
dispatch(Events.USER_CREATED, {"key_in_payload": 123})

# in handlers.py
@local_handler.register(event_name=Events.USER_CREATED)
async def handle_user_created(event: Event):
    _, payload = event
    payload.pop("key_in_payload")

@local_handler.register(event_name=Events.USER_CREATED)
async def handle_user_created_2(event: Event):
    _, payload = event
    payload["key_in_payload"]  # KeyError
A copy of the payload should be passed to the handlers.
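A minimal sketch of that suggestion, using `copy.deepcopy` so that each handler receives its own payload. The `handle` function here is a hypothetical stand-in for the library's handling loop, not its actual code.

```python
import asyncio
import copy


async def pops_key(event):
    _, payload = event
    payload.pop("key_in_payload")  # mutates only this handler's copy


async def reads_key(event):
    _, payload = event
    return payload["key_in_payload"]


async def handle(event, handlers):
    name, payload = event
    # Each handler gets an independent deep copy of the payload.
    return await asyncio.gather(
        *(handler((name, copy.deepcopy(payload))) for handler in handlers)
    )


original = {"key_in_payload": 123}
results = asyncio.run(handle(("USER_CREATED", original), [pops_key, reads_key]))
print(results, original)
```

Deep copies cost time and memory for large payloads, so a shallow copy or an immutable payload type may be a reasonable alternative depending on how handlers use the data.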
By looking at the docs and the source code to me it seems that usage of FastAPI dependency injection system (via Depends) is not supported. If it is really so, it will be a must have feature for many projects. E.g. I might have a service which injects a couple of other services which also use FastAPI's DI system. It will be very inconvenient to set up the whole hierarchy manually.
Document the public API in README:
dispatch
BaseEventPayloadSchemaRegistry.register
LocalHandler.register
Traces gathered by sentry.io:
ValueError: <Token var=<ContextVar name='fastapi_context' at 0x7f256cfd1bd0> at 0x7f255dd9e800> was created in a different Context
  File "fastapi_events/middleware.py", line 29, in __call__
    self._teardown_event_store()
  File "fastapi_events/middleware.py", line 40, in _teardown_event_store
    event_store.reset(self._token)
RuntimeError: <Token used var=<ContextVar name='fastapi_context' at 0x7f256cfd1bd0> at 0x7f255c3a6dc0> has already been used once
  File "fastapi_events/middleware.py", line 29, in __call__
    self._teardown_event_store()
  File "fastapi_events/middleware.py", line 40, in _teardown_event_store
    event_store.reset(self._token)
Trace I'm seeing in logs
Oct 19 01:16:53 PM Traceback (most recent call last):
  File "/app/.venv/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 375, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/app/.venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "/app/.venv/lib/python3.8/site-packages/fastapi/applications.py", line 208, in __call__
    await super().__call__(scope, receive, send)
  File "/app/.venv/lib/python3.8/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/app/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc
  File "/app/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/app/.venv/lib/python3.8/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/app/.venv/lib/python3.8/site-packages/sentry_sdk/integrations/asgi.py", line 106, in _run_asgi3
    return await self._run_app(scope, lambda: self.app(scope, receive, send))
  File "/app/.venv/lib/python3.8/site-packages/sentry_sdk/integrations/asgi.py", line 152, in _run_app
    raise exc from None
  File "/app/.venv/lib/python3.8/site-packages/sentry_sdk/integrations/asgi.py", line 149, in _run_app
    return await callback()
  File "/app/.venv/lib/python3.8/site-packages/fastapi_events/middleware.py", line 29, in __call__
    self._teardown_event_store()
  File "/app/.venv/lib/python3.8/site-packages/fastapi_events/middleware.py", line 40, in _teardown_event_store
    event_store.reset(self._token)
RuntimeError: <Token used var=<ContextVar name='fastapi_context' at 0x7f256cfd1bd0> at 0x7f255d943340> has already been used once
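One plausible reading of these traces, offered as an assumption rather than a confirmed diagnosis, is that the token is stored on the middleware instance (`self._token`), which concurrent requests share. The sketch below reproduces that failure mode with hypothetical classes and shows that keeping the token in a local variable avoids it. Both classes are illustrations, not the library's code.

```python
import asyncio
from collections import deque
from contextvars import ContextVar

event_store = ContextVar("event_store")


class SharedTokenMiddleware:
    """Keeps the token on the instance, which every request shares."""

    async def __call__(self, delay):
        self._token = event_store.set(deque())
        try:
            await asyncio.sleep(delay)  # stands in for handling the request
        finally:
            # By now self._token may belong to another, overlapping request.
            event_store.reset(self._token)


class LocalTokenMiddleware:
    """Keeps the token in a local variable, private to each call."""

    async def __call__(self, delay):
        token = event_store.set(deque())
        try:
            await asyncio.sleep(delay)
        finally:
            event_store.reset(token)


async def run_two_requests(middleware):
    # gather() runs each call in its own Task, and so in its own Context.
    return await asyncio.gather(
        middleware(0.02), middleware(0.01), return_exceptions=True
    )


shared = asyncio.run(run_two_requests(SharedTokenMiddleware()))
local = asyncio.run(run_two_requests(LocalTokenMiddleware()))
print(shared, local)
```

The slower request in the shared-token variant ends up resetting with the faster request's token, which `ContextVar.reset` rejects. The local-token variant completes both requests cleanly.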
When using pydantic models for events, they are dumped to dict by `fastapi-events`.
This is convenient but restrictive. A pydantic model is often more useful for a handler than a dict:
- You can use `isinstance` on it. This is particularly useful for events where one handler callback handles multiple event types.

Add arg `payload_schema_dump: bool = True` to `dispatch`.
The default behaviour is unchanged. When this is set to `False`, handlers are called with the pydantic model as the payload.
If `validate_payload` is `True`, we still do the round-trip to re-validate the payload, but skip the final dump.
[Suggestion]
The chances of things going out of whack are high if event names are maintained manually as strings. My suggestion is to do something like what SQLAlchemy does for its models with `__tablename__`, but on an (event model) pydantic model.
Changing `fastapi_events/registry/base.py` to something like the following (untested) might be an easy first step:
def register(self, _schema=None, event_name=None):
    if not event_name:
        if BaseModel and issubclass(_schema, BaseModel) and hasattr(_schema, "_event_name"):
            event_name = _schema._event_name
    if not event_name:
        raise ValueError("'event_name' must be provided when registering a schema")
"private fields" as per pydantic https://docs.pydantic.dev/latest/concepts/models/#private-model-attributes
(It's a Monday morning; I haven't woken up enough to get this right yet.)
This way users can define the model and the event name in one place and then just register it with something like the following (although I would suggest having an upstream EventModel that subclasses BaseModel, for possible future extension):
class MySuperEvent(BaseModel):
    _event_name = "my.super.event"
    user: User = Field(default_factory=User)

main.py
from fastapi_events import whatever
whatever.register_event(MySuperEvent)

and for dispatch, do something like dispatch(MySuperEvent(user=user))
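The idea can be sketched without pydantic. Everything below is hypothetical: `SchemaRegistry`, `event_for` and the `event_name` class attribute are illustrative names, and a dataclass stands in for the pydantic model so the snippet is self-contained. A plain `ClassVar` is used instead of an underscore-prefixed attribute, on the assumption that this sidesteps any special treatment a given pydantic version applies to private attributes.

```python
from dataclasses import dataclass
from typing import Any, ClassVar, Dict, Optional, Tuple


class SchemaRegistry:
    """Hypothetical registry that can take the event name from the schema."""

    def __init__(self) -> None:
        self.schemas: Dict[str, type] = {}

    def register(self, schema: type, event_name: Optional[str] = None) -> type:
        # Fall back to the name declared on the class itself.
        name = event_name or getattr(schema, "event_name", None)
        if not name:
            raise ValueError("'event_name' must be provided when registering a schema")
        self.schemas[name] = schema
        return schema


@dataclass
class MySuperEvent:
    event_name: ClassVar[str] = "my.super.event"  # not a field, shared by all instances
    user: str = "anonymous"


def event_for(instance: Any) -> Tuple[str, Any]:
    """Build the (name, payload) pair a dispatcher would need from an instance."""
    return type(instance).event_name, instance


registry = SchemaRegistry()
registry.register(MySuperEvent)
print(event_for(MySuperEvent(user="alice")))
```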
I cannot find a way to catch Exceptions properly using the SQSForwardHandler with a classic try catch inside my api.