From 51c282c81312310ea2b0271321812db1a1bf24d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Aug 2024 10:56:36 +0100 Subject: [PATCH 1/6] SSS: Implement PREVIOUSLY room tracking Implement tracking of rooms that have had updates that have not been sent down to clients. --- synapse/handlers/sliding_sync.py | 39 +++++++++- .../sliding_sync/test_connection_tracking.py | 72 ------------------- 2 files changed, 37 insertions(+), 74 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 1db96ad41c..5c3af74095 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -540,6 +540,9 @@ async def current_sync_for_user( lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} # Keep track of the rooms that we can display and need to fetch more info about relevant_room_map: Dict[str, RoomSyncConfig] = {} + # The set of room IDs of all rooms that could appear in any list. These + # include rooms that are outside the list ranges. + all_rooms: Set[str] = set() if has_lists and sync_config.lists is not None: with start_active_span("assemble_sliding_window_lists"): sync_room_map = await self.filter_rooms_relevant_for_sync( @@ -558,6 +561,8 @@ async def current_sync_for_user( to_token, ) + all_rooms.update(filtered_sync_room_map) + # Sort the list sorted_room_info = await self.sort_rooms( filtered_sync_room_map, to_token @@ -661,6 +666,8 @@ async def current_sync_for_user( if not room_membership_for_user_at_to_token: continue + all_rooms.add(room_id) + room_membership_for_user_map[room_id] = ( room_membership_for_user_at_to_token ) @@ -768,12 +775,40 @@ async def handle_room(room_id: str) -> None: ) if has_lists or has_room_subscriptions: + # We now calculate if any rooms outside the range have had updates, + # which we are not sending down. + # + # We *must* record rooms that have had updates, but it is also fine + # to record rooms as having updates even if there might not actually + # be anything new for the user (e.g. due to event filters, events + # having happened after the user left, etc). + unsent_room_ids = [] + if from_token: + # The set of rooms that the client (may) care about, but aren't + # in any list range (or subscribed to). + missing_rooms = all_rooms - relevant_room_map.keys() + + # We now just go and try fetching any events in the above rooms + # to see if anything has happened since the `from_token`. + # + # TODO: Replace this with something faster. When we land the + # sliding sync tables that record the most recent event + # positions we can use that. + missing_event_map_by_room = ( + await self.store.get_room_events_stream_for_rooms( + missing_rooms, + from_key=from_token.stream_token.room_key, + to_key=to_token.room_key, + limit=1, + ) + ) + unsent_room_ids = list(missing_event_map_by_room) + connection_position = await self.connection_store.record_rooms( sync_config=sync_config, from_token=from_token, sent_room_ids=relevant_rooms_to_send_map.keys(), - # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids` - unsent_room_ids=[], + unsent_room_ids=unsent_room_ids, ) elif from_token: connection_position = from_token.connection_position diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index 4d8866b30a..6863c32f7c 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -21,8 +21,6 @@ from synapse.api.constants import EventTypes from synapse.rest.client import login, room, sync from synapse.server import HomeServer -from synapse.types import SlidingSyncStreamToken -from synapse.types.handlers import SlidingSyncConfig from synapse.util import Clock from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase @@ -130,7 +128,6 @@ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None self.helper.send(room_id1, "msg", tok=user1_tok) timeline_limit = 5 - conn_id = "conn_id" sync_body = { "lists": { "foo-list": { @@ -170,40 +167,6 @@ def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None response_body["rooms"].keys(), {room_id2}, response_body["rooms"] ) - # FIXME: This is a hack to record that the first room wasn't sent down - # sync, as we don't implement that currently. - sliding_sync_handler = self.hs.get_sliding_sync_handler() - requester = self.get_success( - self.hs.get_auth().get_user_by_access_token(user1_tok) - ) - sync_config = SlidingSyncConfig( - user=requester.user, - requester=requester, - conn_id=conn_id, - ) - - parsed_initial_from_token = self.get_success( - SlidingSyncStreamToken.from_string(self.store, initial_from_token) - ) - connection_position = self.get_success( - sliding_sync_handler.connection_store.record_rooms( - sync_config, - parsed_initial_from_token, - sent_room_ids=[], - unsent_room_ids=[room_id1], - ) - ) - - # FIXME: Now fix up `from_token` with new connect position above. - parsed_from_token = self.get_success( - SlidingSyncStreamToken.from_string(self.store, from_token) - ) - parsed_from_token = SlidingSyncStreamToken( - stream_token=parsed_from_token.stream_token, - connection_position=connection_position, - ) - from_token = self.get_success(parsed_from_token.to_string(self.store)) - # We now send another event to room1, so we should sync all the missing events. resp = self.helper.send(room_id1, "msg2", tok=user1_tok) expected_events.append(resp["event_id"]) @@ -238,7 +201,6 @@ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None: self.helper.send(room_id1, "msg", tok=user1_tok) - conn_id = "conn_id" sync_body = { "lists": { "foo-list": { @@ -279,40 +241,6 @@ def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None: response_body["rooms"].keys(), {room_id2}, response_body["rooms"] ) - # FIXME: This is a hack to record that the first room wasn't sent down - # sync, as we don't implement that currently. - sliding_sync_handler = self.hs.get_sliding_sync_handler() - requester = self.get_success( - self.hs.get_auth().get_user_by_access_token(user1_tok) - ) - sync_config = SlidingSyncConfig( - user=requester.user, - requester=requester, - conn_id=conn_id, - ) - - parsed_initial_from_token = self.get_success( - SlidingSyncStreamToken.from_string(self.store, initial_from_token) - ) - connection_position = self.get_success( - sliding_sync_handler.connection_store.record_rooms( - sync_config, - parsed_initial_from_token, - sent_room_ids=[], - unsent_room_ids=[room_id1], - ) - ) - - # FIXME: Now fix up `from_token` with new connect position above. - parsed_from_token = self.get_success( - SlidingSyncStreamToken.from_string(self.store, from_token) - ) - parsed_from_token = SlidingSyncStreamToken( - stream_token=parsed_from_token.stream_token, - connection_position=connection_position, - ) - from_token = self.get_success(parsed_from_token.to_string(self.store)) - # We now send another event to room1, so we should sync all the missing state. self.helper.send(room_id1, "msg", tok=user1_tok) From 50f47345c14fd20a38df8afd27e62fb18f42308c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Aug 2024 10:58:33 +0100 Subject: [PATCH 2/6] Newsfile --- changelog.d/17535.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17535.bugfix diff --git a/changelog.d/17535.bugfix b/changelog.d/17535.bugfix new file mode 100644 index 0000000000..c5b5da0485 --- /dev/null +++ b/changelog.d/17535.bugfix @@ -0,0 +1 @@ +Fix experimental sliding sync implementation to remember any updates in rooms that were not sent down immediately. From 2226ef079015273478394f3bbead9c8d3baa1407 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Aug 2024 17:26:45 +0100 Subject: [PATCH 3/6] Filter out lazy loading --- synapse/handlers/sliding_sync.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 5c3af74095..503b8f36ea 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -561,8 +561,6 @@ async def current_sync_for_user( to_token, ) - all_rooms.update(filtered_sync_room_map) - # Sort the list sorted_room_info = await self.sort_rooms( filtered_sync_room_map, to_token @@ -588,6 +586,18 @@ async def current_sync_for_user( and StateValues.LAZY in membership_state_keys ) + if lazy_loading: + # Exclude partially-stated rooms unless the `required_state` + # only has `["m.room.member", "$LAZY"]` for membership + # (lazy-loading room members). + filtered_sync_room_map = { + room_id: room + for room_id, room in filtered_sync_room_map.items() + if room_id not in partial_state_room_map + } + + all_rooms.update(filtered_sync_room_map) + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: for range in list_config.ranges: @@ -605,15 +615,6 @@ async def current_sync_for_user( if len(room_ids_in_list) >= max_num_rooms: break - # Exclude partially-stated rooms unless the `required_state` - # only has `["m.room.member", "$LAZY"]` for membership - # (lazy-loading room members). - if ( - partial_state_room_map.get(room_id) - and not lazy_loading - ): - continue - # Take the superset of the `RoomSyncConfig` for each room. # # Update our `relevant_room_map` with the room we're going From aa8cda95741aaee92ff503cf441938fe1cb93d85 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Aug 2024 17:38:48 +0100 Subject: [PATCH 4/6] D'oh --- synapse/handlers/sliding_sync.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 503b8f36ea..45739ccd35 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -561,11 +561,6 @@ async def current_sync_for_user( to_token, ) - # Sort the list - sorted_room_info = await self.sort_rooms( - filtered_sync_room_map, to_token - ) - # Find which rooms are partially stated and may need to be filtered out # depending on the `required_state` requested (see below). partial_state_room_map = ( @@ -586,18 +581,23 @@ async def current_sync_for_user( and StateValues.LAZY in membership_state_keys ) - if lazy_loading: + if not lazy_loading: # Exclude partially-stated rooms unless the `required_state` # only has `["m.room.member", "$LAZY"]` for membership # (lazy-loading room members). filtered_sync_room_map = { room_id: room for room_id, room in filtered_sync_room_map.items() - if room_id not in partial_state_room_map + if not partial_state_room_map.get(room_id) } all_rooms.update(filtered_sync_room_map) + # Sort the list + sorted_room_info = await self.sort_rooms( + filtered_sync_room_map, to_token + ) + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: for range in list_config.ranges: From 45332f7b62fd0f12e1cb80ca45716c4ec7dac270 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Aug 2024 09:26:00 +0100 Subject: [PATCH 5/6] Fix merge --- synapse/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c78dfa79db..678ebad037 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -800,7 +800,7 @@ async def handle_room(room_id: str) -> None: # positions we can use that. missing_event_map_by_room = ( await self.store.get_room_events_stream_for_rooms( - missing_rooms, + room_ids=missing_rooms, from_key=from_token.stream_token.room_key, to_key=to_token.room_key, limit=1, From 1f1130864508810494bc75923250a80d47fe4c80 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Aug 2024 09:54:30 +0100 Subject: [PATCH 6/6] Fix merge --- synapse/handlers/sliding_sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 678ebad037..18a96843be 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -801,8 +801,8 @@ async def handle_room(room_id: str) -> None: missing_event_map_by_room = ( await self.store.get_room_events_stream_for_rooms( room_ids=missing_rooms, - from_key=from_token.stream_token.room_key, - to_key=to_token.room_key, + from_key=to_token.room_key, + to_key=from_token.stream_token.room_key, limit=1, ) )