From 271a196121beac9f05c65c69859e6efdd273c1f1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 3 Jun 2024 16:07:56 -0500 Subject: [PATCH 1/7] Use fully-qualified `PersistedEventPosition` when returning membership for user Spawning from https://github.com/element-hq/synapse/pull/17187 --- synapse/storage/databases/main/roommember.py | 14 ++++++++++++-- synapse/storage/roommember.py | 2 +- tests/replication/storage/test_events.py | 4 +++- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 9fddbb2caf..642cf96af8 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -476,7 +476,7 @@ def _get_rooms_for_local_user_where_membership_is_txn( ) sql = """ - SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version + SELECT room_id, e.sender, c.membership, event_id, e.instance_name, e.stream_ordering, r.room_version FROM local_current_membership AS c INNER JOIN events AS e USING (room_id, event_id) INNER JOIN rooms AS r USING (room_id) @@ -488,7 +488,17 @@ def _get_rooms_for_local_user_where_membership_is_txn( ) txn.execute(sql, (user_id, *args)) - results = [RoomsForUser(*r) for r in txn] + results = [ + RoomsForUser( + room_id, + sender, + membership, + event_id, + PersistedEventPosition(instance_name, stream_ordering), + room_version, + ) + for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn + ] return results diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7471f81a19..80c9630867 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ class RoomsForUser: sender: str membership: str event_id: str - stream_ordering: int + event_pos: PersistedEventPosition room_version_id: str diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index 86c8f14d1b..3df7fb89b5 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -154,7 +154,9 @@ def test_invites(self) -> None: USER_ID, "invite", event.event_id, - event.internal_metadata.stream_ordering, + PersistedEventPosition( + "master", event.internal_metadata.stream_ordering + ), RoomVersions.V1.identifier, ) ], From 4155e18d76bf45ea3b10214eae4054088585ef25 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 3 Jun 2024 16:18:16 -0500 Subject: [PATCH 2/7] Fix circular imports when running specific tests Before: ``` $ SYNAPSE_POSTGRES=1 SYNAPSE_POSTGRES_USER=postgres SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.replication.storage.test_events [...] Traceback (most recent call last): File "pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.12/lib/python3.12/site-packages/twisted/trial/runner.py", line 711, in loadByName return self.suiteFactory([self.findByName(name, recurse=recurse)]) File "pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.12/lib/python3.12/site-packages/twisted/trial/runner.py", line 474, in findByName obj = reflect.namedModule(searchName) File "pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.12/lib/python3.12/site-packages/twisted/python/reflect.py", line 156, in namedModule topLevel = __import__(name) File "synapse/tests/replication/storage/test_events.py", line 33, in from synapse.handlers.room import RoomEventSource File "synapse/synapse/handlers/room.py", line 74, in from synapse.rest.admin._base import assert_user_is_admin File "synapse/synapse/rest/__init__.py", line 24, in from synapse.rest import admin File "synapse/synapse/rest/admin/__init__.py", line 41, in from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME File "synapse/synapse/handlers/pagination.py", line 30, in from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse builtins.ImportError: cannot import name 'ShutdownRoomParams' from partially initialized module 'synapse.handlers.room' (most likely due to a circular import) (synapse/synapse/handlers/room.py) ``` --- synapse/handlers/pagination.py | 3 +- synapse/handlers/room.py | 59 ++-------------------------------- synapse/types/__init__.py | 57 ++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 58 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 6617105cdb..f7447b8ba5 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -27,7 +27,6 @@ from synapse.api.errors import SynapseError from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig -from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process @@ -38,6 +37,8 @@ JsonMapping, Requester, ScheduledTask, + ShutdownRoomParams, + ShutdownRoomResponse, StreamKeyType, TaskStatus, ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 51739a2653..eab400f79f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -81,6 +81,8 @@ RoomAlias, RoomID, RoomStreamToken, + ShutdownRoomParams, + ShutdownRoomResponse, StateMap, StrCollection, StreamKeyType, @@ -1780,63 +1782,6 @@ def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]: return self.store.get_current_room_stream_token_for_room_id(room_id) -class ShutdownRoomParams(TypedDict): - """ - Attributes: - requester_user_id: - User who requested the action. Will be recorded as putting the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - even if there are still users joined to the room. - """ - - requester_user_id: Optional[str] - new_room_user_id: Optional[str] - new_room_name: Optional[str] - message: Optional[str] - block: bool - purge: bool - force_purge: bool - - -class ShutdownRoomResponse(TypedDict): - """ - Attributes: - kicked_users: An array of users (`user_id`) that were kicked. - failed_to_kick_users: - An array of users (`user_id`) that that were not kicked. - local_aliases: - An array of strings representing the local aliases that were - migrated from the old room to the new. - new_room_id: A string representing the room ID of the new room. - """ - - kicked_users: List[str] - failed_to_kick_users: List[str] - local_aliases: List[str] - new_room_id: Optional[str] - - class RoomShutdownHandler: DEFAULT_MESSAGE = ( "Sharing illegal content on this server is not permitted and rooms in" diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 151658df53..3a89787cab 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1279,3 +1279,60 @@ class ScheduledTask: result: Optional[JsonMapping] # Optional error that should be assigned a value when the status is FAILED error: Optional[str] + + +class ShutdownRoomParams(TypedDict): + """ + Attributes: + requester_user_id: + User who requested the action. Will be recorded as putting the room on the + blocking list. + new_room_user_id: + If set, a new room will be created with this user ID + as the creator and admin, and all users in the old room will be + moved into that room. If not set, no new room will be created + and the users will just be removed from the old room. + new_room_name: + A string representing the name of the room that new users will + be invited to. Defaults to `Content Violation Notification` + message: + A string containing the first message that will be sent as + `new_room_user_id` in the new room. Ideally this will clearly + convey why the original room was shut down. + Defaults to `Sharing illegal content on this server is not + permitted and rooms in violation will be blocked.` + block: + If set to `true`, this room will be added to a blocking list, + preventing future attempts to join the room. Defaults to `false`. + purge: + If set to `true`, purge the given room from the database. + force_purge: + If set to `true`, the room will be purged from database + even if there are still users joined to the room. + """ + + requester_user_id: Optional[str] + new_room_user_id: Optional[str] + new_room_name: Optional[str] + message: Optional[str] + block: bool + purge: bool + force_purge: bool + + +class ShutdownRoomResponse(TypedDict): + """ + Attributes: + kicked_users: An array of users (`user_id`) that were kicked. + failed_to_kick_users: + An array of users (`user_id`) that that were not kicked. + local_aliases: + An array of strings representing the local aliases that were + migrated from the old room to the new. + new_room_id: A string representing the room ID of the new room. + """ + + kicked_users: List[str] + failed_to_kick_users: List[str] + local_aliases: List[str] + new_room_id: Optional[str] From 939695dbb550a40ca9c5e65415c978f6e3946854 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 3 Jun 2024 16:41:19 -0500 Subject: [PATCH 3/7] Update usage --- synapse/handlers/admin.py | 10 ++-------- synapse/handlers/initial_sync.py | 2 +- synapse/handlers/room.py | 1 - synapse/handlers/sync.py | 2 +- 4 files changed, 4 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 702d40332c..21d3bb37f3 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -126,13 +126,7 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> # Get all rooms the user is in or has been in rooms = await self._store.get_rooms_for_local_user_where_membership_is( user_id, - membership_list=( - Membership.JOIN, - Membership.LEAVE, - Membership.BAN, - Membership.INVITE, - Membership.KNOCK, - ), + membership_list=Membership.LIST, ) # We only try and fetch events for rooms the user has been in. If @@ -179,7 +173,7 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> if room.membership == Membership.JOIN: stream_ordering = self._store.get_room_max_stream_ordering() else: - stream_ordering = room.stream_ordering + stream_ordering = room.event_pos.stream from_key = RoomStreamToken(topological=0, stream=0) to_key = RoomStreamToken(stream=stream_ordering) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index d99fc4bec0..84d6fecf31 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -199,7 +199,7 @@ async def handle_room(event: RoomsForUser) -> None: ) elif event.membership == Membership.LEAVE: room_end_token = RoomStreamToken( - stream=event.stream_ordering, + stream=event.event_pos.stream, ) deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index eab400f79f..7f1b674d10 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -40,7 +40,6 @@ ) import attr -from typing_extensions import TypedDict import synapse.events.snapshot from synapse.api.constants import ( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1d7d9dfdd0..e815e0ea7f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2805,7 +2805,7 @@ async def _get_room_changes_for_initial_sync( continue leave_token = now_token.copy_and_replace( - StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(stream=event.event_pos.stream) ) room_entries.append( RoomSyncResultBuilder( From 73c20d961fa964d6c9dc466c7c34eebbd6cc993f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 3 Jun 2024 16:45:08 -0500 Subject: [PATCH 4/7] Use method to get instance name in tests --- tests/replication/storage/test_events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index 3df7fb89b5..4e41a1c912 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -155,7 +155,8 @@ def test_invites(self) -> None: "invite", event.event_id, PersistedEventPosition( - "master", event.internal_metadata.stream_ordering + self.hs.get_instance_name(), + event.internal_metadata.stream_ordering, ), RoomVersions.V1.identifier, ) From 7b41f412c67a781827088a6345223573d996fe32 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 3 Jun 2024 16:46:34 -0500 Subject: [PATCH 5/7] Fix random lints --- synapse/federation/federation_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7ffc650aa1..1932fa82a4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -674,7 +674,7 @@ async def on_make_join_request( # This is in addition to the HS-level rate limiting applied by # BaseFederationServlet. # type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?) - await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] + await self._room_member_handler._join_rate_per_room_limiter.ratelimit( requester=None, key=room_id, update=False, @@ -717,7 +717,7 @@ async def on_send_join_request( SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE, caller_supports_partial_state, ) - await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] + await self._room_member_handler._join_rate_per_room_limiter.ratelimit( requester=None, key=room_id, update=False, From 09638ac31dc7635462f8c9f41ac5da9698c2e5d9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 3 Jun 2024 17:51:03 -0500 Subject: [PATCH 6/7] Add changelog --- changelog.d/17265.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17265.misc diff --git a/changelog.d/17265.misc b/changelog.d/17265.misc new file mode 100644 index 0000000000..e6d4d8b4ee --- /dev/null +++ b/changelog.d/17265.misc @@ -0,0 +1 @@ +Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation. From cc35e423ff640463f6842cf921dcd6bc06c90b26 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 4 Jun 2024 12:04:28 -0500 Subject: [PATCH 7/7] Use keyword args (too many args) Co-authored-by: Erik Johnston --- synapse/storage/databases/main/roommember.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 642cf96af8..d8b54dc4e3 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -490,12 +490,12 @@ def _get_rooms_for_local_user_where_membership_is_txn( txn.execute(sql, (user_id, *args)) results = [ RoomsForUser( - room_id, - sender, - membership, - event_id, - PersistedEventPosition(instance_name, stream_ordering), - room_version, + room_id=room_id, + sender=sender, + membership=membership, + event_id=event_id, + event_pos=PersistedEventPosition(instance_name, stream_ordering), + room_version_id=room_version, ) for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn ]