From 75e2c17d2a4ebe9e8841e1f7229f9d12ea1c3999 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Sep 2024 08:12:56 +0100 Subject: [PATCH] Speed up sorting of sliding sync rooms in initial request (#17734) We do this by using the event stream cache. --------- Co-authored-by: Devon Hudson --- changelog.d/17734.misc | 1 + synapse/handlers/sliding_sync/room_lists.py | 71 ++++++++++++++++++++- 2 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 changelog.d/17734.misc diff --git a/changelog.d/17734.misc b/changelog.d/17734.misc new file mode 100644 index 0000000000..0bb5533ab6 --- /dev/null +++ b/changelog.d/17734.misc @@ -0,0 +1 @@ +Minor speed up of initial sliding sync requests. diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index bf19eb735b..0c9722021a 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -420,6 +420,10 @@ async def _compute_interested_rooms_new_tables( Dict[str, RoomsForUserType], filtered_sync_room_map ), to_token, + # We only need to sort the rooms up to the end + # of the largest range. Both sides of range are + # inclusive so we `+ 1`. + limit=max(range[1] + 1 for range in list_config.ranges), ) for range in list_config.ranges: @@ -462,7 +466,7 @@ async def _compute_interested_rooms_new_tables( ) lists[list_key] = SlidingSyncResult.SlidingWindowList( - count=len(sorted_room_info), + count=len(filtered_sync_room_map), ops=ops, ) @@ -1980,15 +1984,21 @@ async def sort_rooms( self, sync_room_map: Dict[str, RoomsForUserType], to_token: StreamToken, + limit: Optional[int] = None, ) -> List[RoomsForUserType]: """ Sort by `stream_ordering` of the last event that the user should see in the room. `stream_ordering` is unique so we get a stable sort. + If `limit` is specified then sort may return fewer entries, but will + always return at least the top N rooms. This is useful as we don't always + need to sort the full list, but are just interested in the top N. + Args: sync_room_map: Dictionary of room IDs to sort along with membership information in the room at the time of `to_token`. to_token: We sort based on the events in the room at this token (<= `to_token`) + limit: The number of rooms that we need to return from the top of the list. Returns: A sorted list of room IDs by `stream_ordering` along with membership information. @@ -1998,8 +2008,23 @@ async def sort_rooms( # user should see in the room (<= `to_token`) last_activity_in_room_map: Dict[str, int] = {} + # Same as above, except for positions that we know are in the event + # stream cache. + cached_positions: Dict[str, int] = {} + + earliest_cache_position = ( + self.store._events_stream_cache.get_earliest_known_position() + ) + for room_id, room_for_user in sync_room_map.items(): - if room_for_user.membership != Membership.JOIN: + if room_for_user.membership == Membership.JOIN: + # For joined rooms check the stream change cache. + cached_position = ( + self.store._events_stream_cache.get_max_pos_of_last_change(room_id) + ) + if cached_position is not None: + cached_positions[room_id] = cached_position + else: # If the user has left/been invited/knocked/been banned from a # room, they shouldn't see anything past that point. # @@ -2009,6 +2034,48 @@ async def sort_rooms( # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + # If the stream position is in range of the stream change cache + # we can include it. + if room_for_user.event_pos.stream > earliest_cache_position: + cached_positions[room_id] = room_for_user.event_pos.stream + + # If we are only asked for the top N rooms, and we have enough from + # looking in the stream change cache, then we can return early. This + # is because the cache must include all entries above + # `.get_earliest_known_position()`. + if limit is not None and len(cached_positions) >= limit: + # ... but first we need to handle the case where the cached max + # position is greater than the to_token, in which case we do + # actually query the DB. This should happen rarely, so can do it in + # a loop. + for room_id, position in list(cached_positions.items()): + if position > to_token.room_key.stream: + result = await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, to_token.room_key + ) + if ( + result is not None + and result[1].stream > earliest_cache_position + ): + # We have a stream position in the cached range. + cached_positions[room_id] = result[1].stream + else: + # No position in the range, so we remove the entry. + cached_positions.pop(room_id) + + if limit is not None and len(cached_positions) >= limit: + return sorted( + ( + room + for room in sync_room_map.values() + if room.room_id in cached_positions + ), + # Sort by the last activity (stream_ordering) in the room + key=lambda room_info: cached_positions[room_info.room_id], + # We want descending order + reverse=True, + ) + # For fully-joined rooms, we find the latest activity at/before the # `to_token`. joined_room_positions = (