Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dispatching Pydantic model; Support implicit event name #53

Merged
merged 9 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 100 additions & 65 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@ An event dispatching/handling library for FastAPI, and Starlette.

Features:

* straightforward API to emit events anywhere in your code
* events are handled after responses are returned (doesn't affect response time)
* supports event piping to remote queues
* powerful built-in handlers to handle events locally and remotely
* coroutine functions (`async def`) are the first-class citizen
* write your handlers, never be limited to just what `fastapi_events` provides
* (__>=0.3.0__) supports event payload validation via Pydantic (See [here](#event-payload-validation-with-pydantic))
* (__>=0.4.0__) supports event chaining: dispatching events within handlers (thank [@ndopj](https://github.com/ndopj)
* Straightforward API for emitting events anywhere in your code.
* Events are handled after responses are returned, ensuring no impact on response time.
* Supports event piping to remote queues.
* Powerful built-in handlers for local and remote event handling
* Coroutine functions (`async def`) are treated as first-class citizens
* Write your own handlers; don't be limited to just what `fastapi_events` provides
* (__>=0.3.0__) Supports event payload validation via Pydantic (See [here](#event-payload-validation-with-pydantic))
* (__>=0.4.0__) Supports event chaining: dispatching events within handlers (thanks to [@ndopj](https://github.com/ndopj)
for contributing to the idea)
* (__>=0.7.0__) supports OpenTelemetry: see [this section](#opentelemetry-otel-support) for details
* (__>=0.9.0__) supports dependencies in local handlers: see [this section](#using-dependencies-in-local-handler) for details
* (__>=0.9.1__) supports Pydantic v2
* (__>=0.7.0__) Supports OpenTelemetry. See [this section](#opentelemetry-otel-support) for details
* (__>=0.9.0__) Adds support for [FastAPI dependencies](https://fastapi.tiangolo.com/tutorial/dependencies/) in local handlers. See [this section](#using-dependencies-in-local-handler) for
details
* (__>=0.9.1__) Now supports Pydantic v2
* (__>=0.10.0__) Enables dispatching Pydantic models as events (thanks to [@WilliamStam](https://github.com/WilliamStam) for contributing to this idea)

If you use or like this project, please consider giving it a star so it can reach more developers. Thanks =)
If you use or appreciate this project, please consider giving it a star to help it reach more developers. Thanks =)

## Installation

Expand Down Expand Up @@ -123,7 +125,9 @@ pip install fastapi-events[otel]

## Dispatching events

Events can be dispatched anywhere in the code, as long as they are dispatched before a response is made.
Events can be dispatched anywhere in the code, provided that they are dispatched before a response is generated.

### Option 1 - using dict

```python
# anywhere in code
Expand All @@ -138,11 +142,37 @@ dispatch(
dispatch("a_cat_is_spotted") # This works too!
```

### Event Payload Validation With Pydantic
### Option 2 - using Pydantic model

> New feature since version 0.10.0

It is now possible to dispatch pydantic model as events. A special thanks to
[@WilliamStam](https://github.com/WilliamStam) for introducing this remarkable idea.

```python
# anywhere in code
import pydantic
from fastapi_events.dispatcher import dispatch


class CatRequestedAFishEvent(pydantic.BaseModel):
__event_name__ = "cat-requested-a-fish"

cat_id: pydantic.UUID4


# Option 2 - dispatching event with pydantic model
dispatch(CatRequestedAFishEvent(cat_id="fd375d23-b0c9-4271-a9e0-e028c4cd7230"))

# which is equivalent to:
dispatch("cat-requested-a-fish", payload={"cat_id": "fd375d23-b0c9-4271-a9e0-e028c4cd7230"})
```

## Event Payload Validation With Pydantic

Event payload validation is possible since version 0.3.0. To enable, simply register
a [Pydantic models](https://pydantic-docs.helpmanual.io/usage/models/) with the corresponding event name.
Since version 0.3.0, event payload validation is possible. To enable this feature, register a Pydantic model with the corresponding event name.

> __>=0.10.0__: Event name can now be defined as a part of the payload schema as `__event_name__`
```python
import uuid
from enum import Enum
Expand All @@ -162,34 +192,44 @@ class UserEvents(Enum):
class SignUpPayload(BaseModel):
user_id: uuid.UUID
created_at: datetime

# which is also equivalent to
@payload_schema.register
class SignUpPayload(BaseModel):
__event_name__ = "USER_SIGNED_UP"

user_id: uuid.UUID
created_at: datetime
```

> Wildcard in event name is currently not supported

Payload will be validated automatically without any changes made while invoking the dispatcher.
The payload will be validated automatically without any changes required when invoking the dispatcher.

```python
# Events with payload schema registered
dispatch(UserEvents.SIGNED_UP) # raises ValidationError, missing payload
dispatch(UserEvents.SIGNED_UP,
{"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a"}) # raises ValidationError, missing `created_at`
dispatch(UserEvents.SIGNED_UP,
{"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a", created_at: datetime.utcnow()}) # OK!
{"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a", "created_at": datetime.utcnow()}) # OK!

# Events without payload schema -> No validation will be performed
dispatch(UserEvents.ACTIVATED,
{"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a"}) # OK! no validation will be performed

# Events dispatched with Pydantic model (>=0.10.0) -> Validation will be skipped since it would have been already validated
# If you choose to do this, you must ensure __event_name__ is defined in SignUpPayload
dispatch(SignUpPayload(user_id="9e79cdbb-b216-40f7-9a05-20d223dee89a", created_at=datetime.utcnow()))
```

> Reminder: payload validation is optional.
> Payload of events without its schema registered will not be validated.
> Payload validation is optional. Payload of events without its schema registered will not be validated.

## Handling Events

### Handle events locally

The flexibility of `fastapi-events` allows us to customise how the events should be handled. For starters, you might
want to handle your events locally.
The flexibility of `fastapi-events` enales customisation of how events should be handled. To begin, you may want to handle your events locally.

```python
# ex: in handlers.py
Expand Down Expand Up @@ -224,11 +264,10 @@ async def handle_all_events(event: Event):
#### Using Dependencies in Local Handler

> new feature in fastapi-events>=0.9.0
>
Dependencies can now be utilized with local handlers, and sub-dependencies are also supported.

Dependencies can now be used with local handler. Sub-dependencies are also supported.

However, dependencies using generator (with `yield` keyword) is not supported yet. I have the intention to support it in the future.

As of now, dependencies utilizing a generator (with the `yield` keyword) are not yet supported.

```python
# ex: in handlers.py
Expand All @@ -237,20 +276,20 @@ from fastapi import Depends
from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event


async def get_db_conn():
pass # return a DB conn
pass # return a DB conn


async def get_db_session(
db_conn=Depends(get_db_conn)
):
pass # return a DB session created from `db_conn`

pass # return a DB session created from `db_conn`


@local_handler.register(event_name="*")
async def handle_all_events(
event: Event,
event: Event,
db_session=Depends(get_db_session)
):
# use the `db_session` here
Expand All @@ -259,9 +298,8 @@ async def handle_all_events(

### Piping Events To Remote Queues

For larger projects, you might have services dedicated to handling events separately.

For instance, `fastapi-events` comes with AWS SQS forwarder to forward events to a remote queue.
In larger projects, it's common to have dedicated services for handling events separately.
For example, `fastapi-events` includes an AWS SQS forwarder, allowing you to forward events to a remote queue.

1. Register `SQSForwardHandler` as handlers:
```python
Expand All @@ -271,7 +309,7 @@ For instance, `fastapi-events` comes with AWS SQS forwarder to forward events to
region_name="eu-central-1")]) # registering handler(s)
```

2. Start dispatching events! Events will be serialised into JSON format by default:
2. Start dispatching events! By default, events will be serialised into JSON format:
```python
["event name", {"payload": "here is the payload"}]
```
Expand Down Expand Up @@ -299,18 +337,18 @@ Here is a list of built-in event handlers:
* import from `fastapi_events.handlers.gcp`
* to publish events to a single pubsub topic

# Creating your own handler
# Creating Custom Handlers

Creating your own handler is nothing more than inheriting from the `BaseEventHandler` class
Creating your own handler is as simple as inheriting from the `BaseEventHandler` class
in `fastapi_events.handlers.base`.

To handle events, `fastapi_events` calls one of these methods, in the following priority order:
To handle events, `fastapi_events` calls one of these methods, following this priority order:

1. `handle_many(events)`:
The coroutine function should expect the backlog of the events collected.

2. `handle(event)`:
In cases where `handle_many()` weren't defined in your custom handler, `handle()`
If `handle_many()` is not defined in your custom handler, `handle()`
will be called by iterating through the events in the backlog.

```python
Expand Down Expand Up @@ -338,7 +376,7 @@ class MyOwnEventHandler(BaseEventHandler):

Since version 0.7.0, OpenTelemetry support has been added as an optional feature.

To enable it, make sure you install the optional modules:
To enable it, make sure you install the following optional modules:

```shell
pip install fastapi-events[otel]
Expand All @@ -357,14 +395,13 @@ Support for other handlers will be added in the future.

## 1) Suppressing Events / Disabling `dispatch()` Globally

In case you want to suppress events globally especially during testing, you can do so without having to mock or patch
the `dispatch()` function. Simple set the environment variable `FASTAPI_EVENTS_DISABLE_DISPATCH` to `1`, `True` or any
truthy values.
If you wish to globally suppress events, especially during testing, you can achieve this without having to mock or patch the dispatch() function.
Simply set the environment variable FASTAPI_EVENTS_DISABLE_DISPATCH to 1, True, or any truthy values.

## 2) Validating Event Payload During Dispatch

> Requires Pydantic, which comes with FastAPI.
> If you're using Starlette, you might need to install Pydantic
> This feature requires Pydantic, which is included with FastAPI.
> If you're using Starlette, ensure that Pydantic is installed separately.

See [Event Payload Validation With Pydantic](#event-payload-validation-with-pydantic)

Expand All @@ -383,22 +420,20 @@ Comparison between events dispatched within the request-response cycle and event

## 4) Dispatching events outside of a request

One goal of `fastapi-events` is to dispatch events without having to manage which instance
of `EventHandlerASGIMiddleware` is being targeted. By default, this is handled using `ContextVars`. There are occasions
when a user may want to dispatch events outside of the standard request sequence though. This can be accomplished by
generating a custom identifier for the middleware.
One of the goals of `fastapi-events` is to dispatch events without the need to manage specific instance of `EventHandlerASGIMiddleware`.
By default, this is handled using `ContextVars`.
However, there are scenarios where users may want to dispatch events outside the standard request sequence.
This can be achieved by generating a custom identifier for the middleware.

By default, the middleware identifier is generated from the object id of the `EventHandlerASGIMiddleware` instance and
is managed internally without need for user intervention. If the user needs to dispatch events outside of a
request-response lifecycle, a custom `middleware_id` value can be generated and passed to `EventHandlerASGIMiddleware`
during its creation. This value can then be used with `dispatch()` to ensure the correct `EventHandlerASGIMiddleware`
instance is selected.
By default, the middleware identifier is generated from the object ID of the `EventHandlerASGIMiddleware` instance and is managed internally without user intervention.
If a user needs to dispatch events outside of a request-response lifecycle, they can generate a custom `middleware_id` value and passed it to `EventHandlerASGIMiddleware` during its creation.
This value can then be used with `dispatch()` to ensure the correct `EventHandlerASGIMiddleware` instance is selected.

Dispatching events during a request does ***not*** require the `middleware_id`. These will continue to automatically
discover the event handler.
It's important to note that dispatching events during a request does not require the middleware_id.
The dispatcher will automatically discover the appropriate event handler.

In the following example, the id is being generated using the object id of the `FastAPI` instance. The middleware
identifier must be unique `int` but there are no other restrictions.
In the following example, the ID is generated using the object ID of the `FastAPI` instance.
The middleware identifier must be a unique `int`, but there are no other restrictions.

```python
import asyncio
Expand Down Expand Up @@ -449,22 +484,22 @@ def index(request: Request) -> JSONResponse:

Answer:

`dispatch()` relies on [ContextVars](https://docs.python.org/3/library/contextvars.html) to work properly. There are
many reasons why `LookupError` can occur. A common reason is `dispatch()` is called outside the request-response
lifecycle of FastAPI/Starlette, such as calling `dispatch()` after a response has been returned.
The proper functioning of `dispatch()` relies on [ContextVars](https://docs.python.org/3/library/contextvars.html).
Various factors can lead to a LookupError, with a common cause being the invocation of `dispatch()` outside the request-response lifecycle of FastAPI/Starlette, such as calling `dispatch()` after a response has been returned.

[This can be worked around by using a user-defined middleware_id.](#4-dispatching-events-outside-of-a-request)
If you encounter this issue, a workaround is available by using a user-defined middleware_id.
Refer to [Dispatching Events Outside of a Request](#4-dispatching-events-outside-of-a-request) for details.

If you're getting this during testing, you may consider disabling `dispatch()` during testing.
See [Suppressing Events / Disabling `dispatch()` Globally](#suppressing-events--disabling-dispatch-globally) for
If you're encountering this during testing, consider disabling `dispatch()` for testing purposes.
Refer to [Suppressing Events / Disabling `dispatch()` Globally](#suppressing-events--disabling-dispatch-globally) for
details.

2. My event handlers are not registered / Local handlers are not being executed:

Answer:

Make sure the module where your local event handlers are defined is loaded during runtime. A simple fix is to import
the module in your `__init__.py`. This will ensure the modules are properly loaded during runtime.
To ensure that the module where your local event handlers are defined is loaded during runtime, make sure to import the module in your __init__.py.
This straightforward fix guarantees the proper loading of modules during runtime.

# Feedback, Questions?

Expand Down
2 changes: 1 addition & 1 deletion fastapi_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# handlers keeps track of all handlers registered via EventHandlerASGIMiddleware
handler_store: Dict[int, Iterable[BaseEventHandler]] = defaultdict(list)

# event_store keeps track of all events dispatched the request-response cycle
# event_store keeps track of all events dispatched in a request-response cycle
event_store: ContextVar = ContextVar("fastapi_event_store")

# in_req_res_cycle is set to allow dispatch() to work in event handlers
Expand Down
Loading