Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve docstrings for methods related to sending EDUs to application services #11138

Merged
merged 16 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11138.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add docstrings and comments to the application service ephemeral event sending code.
99 changes: 87 additions & 12 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,26 @@ def notify_interested_services_ephemeral(
new_token: Optional[int],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""This is called by the notifier in the background
when a ephemeral event handled by the homeserver.

This will determine which appservices
are interested in the event, and submit them.
"""
This is called by the notifier in the background when an ephemeral event is handled
by the homeserver.

Events will only be pushed to appservices
that have opted into ephemeral events
This will determine which appservices are interested in the event, and submit them.

Args:
stream_key: The stream the event came from.
new_token: The latest stream token
users: The user(s) involved with the event.

`stream_key` can "typing_key", "receipt_key" or "presence_key". Any other
value for `stream_key` will cause this function to return early.
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

Ephemeral events will only be pushed to appservices that have opted into
them.

Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.

new_token: The latest stream token.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
return
Expand Down Expand Up @@ -232,40 +239,91 @@ async def _notify_interested_services_ephemeral(
for service in services:
# Only handle typing if we have the latest token
if stream_key == "typing_key" and new_token is not None:
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
#
# Instead we simply grab the latest typing updates in _handle_typing
# and, if they apply to this application service, send it off.
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
# We don't persist the token for typing_key for performance reasons

elif stream_key == "receipt_key":
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
# TODO: We update the stream token for each appservice, even
# if sending the ephemeral events to the appservice failed.
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)

elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
# TODO: We update the stream token for each appservice, even
# if sending the ephemeral events to the appservice failed.
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)

async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
"""
Return the typing events since the given stream token that the given application
service should receive.

First fetch all typing events between the given typing stream token (non-inclusive)
and the latest typing event stream token (inclusive). Then return only those typing
events that the given application service may be interested in.

Args:
service: The application service to check for which events it should receive.
new_token: A typing event stream token.

Returns:
A list of JSON dictionaries containing data derived from the typing events that
should be sent to the given application service.
"""
typing_source = self.event_sources.sources.typing
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
service=service,
# For performance reasons, we don't persist the previous
# token in the DB and instead fetch the latest typing information
# token in the DB and instead fetch the latest typing event
# for appservices.
# TODO: It'd likely be more efficient to simply fetch the
# typing event with the given 'new_token' stream token and
# check if the given service was interested, rather than
# iterating over all typing events and only grabbing the
# latest few.
from_key=new_token - 1,
)
return typing

async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.

First fetch all read receipts between the last receipt stream token that this
application service should have previously received (non-inclusive) and the
latest read receipt stream token (inclusive). Then from that set, return only
those read receipts that the given application service may be interested in.

Args:
service: The application service to check for which events it should receive.

Returns:
A list of JSON dictionaries containing data derived from the read receipts that
should be sent to the given application service.
"""
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
Expand All @@ -278,6 +336,22 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.

First, filter the given users list to those that the application service is
interested in. Then retrieve the latest presence updates since the
the last-known previously received presence stream token for the given
application service. Return those presence updates.

Args:
service: The application service that ephemeral events are being sent to.
users: The users that should receive the presence update.

Returns:
A list of json dictionaries containing data derived from the presence events
that should be sent to the given application service.
"""
events: List[JsonDict] = []
presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
Expand All @@ -290,9 +364,10 @@ async def _handle_presence(
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue

# TODO: Make presence_events a Set. These presence events should be de-duplicated.
presence_events, _ = await presence_source.get_new_events(
user=user,
service=service,
from_key=from_key,
)
time_now = self.clock.time_msec()
Expand Down
4 changes: 4 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,10 @@ async def notify_device_update(
) -> None:
"""Notify that a user's device(s) has changed. Pokes the notifier, and
remote servers if the user is local.

Args:
user_id: The Matrix ID of the user who's device list has been updated.
device_ids: The device IDs that have changed.
"""
if not device_ids:
# No changes to notify about, so this is a no-op.
Expand Down
34 changes: 29 additions & 5 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
Expand Down Expand Up @@ -1483,11 +1482,37 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) ->
def format_user_presence_state(
state: UserPresenceState, now: int, include_user_id: bool = True
) -> JsonDict:
"""Convert UserPresenceState to a format that can be sent down to clients
"""Convert UserPresenceState to a JSON format that can be sent down to clients
and to other servers.

The "user_id" is optional so that this function can be used to format presence
updates for client /sync responses and for federation /send requests.
Args:
state: The user presence state to format.
now: The current timestamp since the epoch in ms.
include_user_id: Whether to include `user_id` in the returned dictionary.
As this function can be used both to format presence updates for client /sync
responses and for federation /send requests, only the latter needs the include
the `user_id` field.

Returns:
A JSON dictionary with the following keys:
* presence: The presence state as a str.
* user_id: Optional. Included if `include_user_id` is truthy. The canonical
Matrix ID of the user.
* last_active_ago: Optional. Included if `last_active_ts` is set on `state`.
The timestamp that the user was last active.
* status_msg: Optional. Included if `status_msg` is set on `state`. The user's
status.
* currently_active: Optional. Included only if `state.state` is "online".

Example:

{
"presence": "online",
"user_id": "@alice:example.com",
"last_active_ago": 16783813918,
"status_msg": "Hello world!",
"currently_active": True
}
"""
content: JsonDict = {"presence": state.state}
if include_user_id:
Expand Down Expand Up @@ -1526,7 +1551,6 @@ async def get_new_events(
is_guest: bool = False,
explicit_room_id: Optional[str] = None,
include_offline: bool = True,
service: Optional[ApplicationService] = None,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
Expand Down
12 changes: 11 additions & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,18 @@ async def get_new_events(
async def get_new_events_as(
self, from_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new receipt events that an appservice
"""Returns a set of new read receipt events that an appservice
may be interested in.

Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested

Returns:
A two-tuple containing the following:
* A list of json dictionaries derived from read receipts that the
appservice may be interested in.
* The current read receipt stream token.
"""
from_key = int(from_key)
to_key = self.get_current_key()
Expand All @@ -261,6 +267,10 @@ async def get_new_events_as(
)

# Then filter down to rooms that the AS can read
# TODO: This doesn't honour an appservice's registration of room or namespace
# aliases. For instance, if an appservice registered a room namespace that
# matched this room, but it didn't have any members in the room, then that
# appservice wouldn't receive the read receipt.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't and should is the implication of this?

events = []
for room_id, event in rooms_to_events.items():
if not await service.matches_user_in_member_list(room_id, self.store):
Expand Down
16 changes: 13 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,17 +465,27 @@ async def get_new_events_as(
may be interested in.

Args:
from_key: the stream position at which events should be fetched from
service: The appservice which may be interested
from_key: the stream position at which events should be fetched from.
service: The appservice which may be interested.

Returns:
A two-tuple containing the following:
* A list of json dictionaries derived from typing events that the
appservice may be interested in.
* The latest known room serial.
"""
with Measure(self.clock, "typing.get_new_events_as"):
from_key = int(from_key)
handler = self.get_typing_handler()

events = []
for room_id in handler._room_serials.keys():
if handler._room_serials[room_id] <= from_key:
continue

# TODO: This doesn't to honour an appservice's registration of room or namespace
# aliases. For instance, if an appservice registered a room namespace that
# matched this room, but it didn't have any members in the room, then that
# appservice wouldn't receive the typing event.
if not await service.matches_user_in_member_list(
room_id, handler.store
):
Expand Down
18 changes: 16 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,14 @@ def _notify_app_services_ephemeral(
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
):
) -> None:
"""Notify application services of ephemeral event activity.

Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event, if any.
"""
try:
stream_token = None
if isinstance(new_token, int):
Expand All @@ -402,10 +409,17 @@ def on_new_event(
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[Collection[str]] = None,
):
) -> None:
"""Used to inform listeners that something has happened event wise.

Will wake up all listeners for the given users and rooms.

Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event.
rooms: A collection of room IDs for which each joined member will be
informed of the new event.
"""
users = users or []
rooms = rooms or []
Expand Down