Use fully-qualified PersistedEventPosition when returning RoomsForUser #17265

Merged
merged 7 commits into from
Jun 4, 2024
Changes from 5 commits
4 changes: 2 additions & 2 deletions synapse/federation/federation_server.py
@@ -674,7 +674,7 @@ async def on_make_join_request(
# This is in addition to the HS-level rate limiting applied by
# BaseFederationServlet.
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
requester=None,
key=room_id,
update=False,
@@ -717,7 +717,7 @@ async def on_send_join_request(
SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
caller_supports_partial_state,
)
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
requester=None,
key=room_id,
update=False,
10 changes: 2 additions & 8 deletions synapse/handlers/admin.py
@@ -126,13 +126,7 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
# Get all rooms the user is in or has been in
rooms = await self._store.get_rooms_for_local_user_where_membership_is(
user_id,
membership_list=(
Membership.JOIN,
Membership.LEAVE,
Membership.BAN,
Membership.INVITE,
Membership.KNOCK,
),
membership_list=Membership.LIST,
)

# We only try and fetch events for rooms the user has been in. If
@@ -179,7 +173,7 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
if room.membership == Membership.JOIN:
stream_ordering = self._store.get_room_max_stream_ordering()
else:
stream_ordering = room.stream_ordering
stream_ordering = room.event_pos.stream

from_key = RoomStreamToken(topological=0, stream=0)
to_key = RoomStreamToken(stream=stream_ordering)

@MadLittleMods (Contributor Author) commented on Jun 3, 2024:

On matrix.org, my /sync token looks like s4983244229_757284974_13038521_2867177461_3105561071_197466693_1334422313_10818115877_0_293162. I assume we use multiple event persisters on matrix.org, so I would expect to see the m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}. ... variant of RoomStreamToken at the front. What's happening there?

Relevant docs:

There is also a third mode for live tokens where the token starts with "m",
which is sometimes used when using sharded event persisters. In this case
the events stream is considered to be a set of streams (one for each writer)
and the token encodes the vector clock of positions of each writer in their
respective streams.
The format of the token in such case is an initial integer min position,
followed by the mapping of instance ID to position separated by '.' and '~':
m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}. ...
The `min_pos` corresponds to the minimum position all writers have persisted
up to, and then only writers that are ahead of that position need to be
encoded. An example token is:
m56~2.58~3.59
Which corresponds to a set of three (or more writers) where instances 2 and
3 (these are instance IDs that can be looked up in the DB to fetch the more
commonly used instance names) are at positions 58 and 59 respectively, and
all other instances are at position 56.
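
To make the quoted format concrete, here is a minimal sketch of parsing an `m`-style token in plain Python. It is not Synapse's own parser: the names `parse_multi_writer_token` and `position_for` are hypothetical and only follow the description in the docstring quoted above.

```python
from typing import Dict, Tuple


def parse_multi_writer_token(token: str) -> Tuple[int, Dict[int, int]]:
    """Split "m{min_pos}~{writer1}.{pos1}~..." into (min_pos, {instance_id: pos}).

    Hypothetical helper for illustration; assumes instance IDs are integers,
    as in the example token from the quoted docs.
    """
    if not token.startswith("m"):
        raise ValueError(f"not a multi-writer token: {token!r}")

    # The first "~"-separated field is the minimum position; the remaining
    # fields are "{instance_id}.{position}" pairs.
    parts = token[1:].split("~")
    min_pos = int(parts[0])

    positions: Dict[int, int] = {}
    for entry in parts[1:]:
        instance_id, pos = entry.split(".")
        positions[int(instance_id)] = int(pos)

    return min_pos, positions


def position_for(min_pos: int, positions: Dict[int, int], instance_id: int) -> int:
    """Writers that are not listed explicitly are taken to be at min_pos."""
    return positions.get(instance_id, min_pos)


min_pos, positions = parse_multi_writer_token("m56~2.58~3.59")
print(min_pos, positions)  # 56 {2: 58, 3: 59}
```

With the example token from the docs, instances 2 and 3 resolve to 58 and 59, and any other instance falls back to 56.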

@MadLittleMods (Contributor Author) commented on Jun 3, 2024:

hmm, actually I see both used:

Ex.
m4982500302~37.4982500309~3.4982500309~2.4982500309_757284974_11788273_2866394521_3104790542_197391253_1334263147_10815910062_0_293003

Perhaps this just depends on whether the now_token in sync v2 was updated in place, although I see rooms data in the response whichever type of token we get 🤷

This isn't an initial vs incremental sync thing. It changes back and forth throughout the lifetime of my sync loop.
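
As a quick way to check which variant a given /sync token carries, the sketch below looks only at the first `_`-separated field of the two tokens quoted above. That the room part comes first is taken from "at the front" in the earlier comment; the helper name `room_part_kind` is hypothetical, and any prefix other than `s` or `m` is rejected rather than guessed at.

```python
from typing import Tuple


def room_part_kind(sync_token: str) -> Tuple[str, str]:
    """Return (kind, room_part) for the first "_"-separated field of a token.

    Hypothetical helper: only the "s..." and "m..." forms seen in this thread
    are recognised.
    """
    room_part = sync_token.split("_", 1)[0]
    if room_part.startswith("m"):
        return "multi-writer", room_part
    if room_part.startswith("s"):
        return "stream", room_part
    raise ValueError(f"unrecognised room token prefix: {room_part!r}")


first = "s4983244229_757284974_13038521_2867177461_3105561071_197466693_1334422313_10818115877_0_293162"
second = "m4982500302~37.4982500309~3.4982500309~2.4982500309_757284974_11788273_2866394521_3104790542_197391253_1334263147_10815910062_0_293003"

print(room_part_kind(first))
print(room_part_kind(second))
```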

@MadLittleMods (Contributor Author) commented on Jun 3, 2024:

Do we need to (should we) be more careful about crafting the RoomStreamToken in all of the places here? (do they need instance_map?)

I know in #17187, we talked about constructing the RoomStreamToken using the instance_map to be ultra correct.

I'm racking my brain on each of these locations. My hunch is that we should pay closer attention and add the instance_map, i.e. RoomStreamToken(stream=room.event_pos.stream, instance_map={ <max-stream-of-each-instance-in-get_rooms_for_local_user_where_membership_is-results> })

Is this something to push off to a broader, holistic update to how we craft stream tokens everywhere? It's at least as good as it was before in any case.
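
For reference, the "max stream of each instance" placeholder could be computed along these lines. This is only a sketch under stated assumptions: `EventPos` is a hypothetical stand-in for `PersistedEventPosition` (same two fields as used in this PR), and whether the resulting mapping can be passed straight to `RoomStreamToken(instance_map=...)` is exactly the open question here, so the example stops at building the dict.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class EventPos:
    """Hypothetical stand-in for PersistedEventPosition (instance_name, stream)."""

    instance_name: str
    stream: int


def max_stream_per_instance(positions: Iterable[EventPos]) -> Dict[str, int]:
    """Map each writer instance name to the highest stream position seen for it."""
    result: Dict[str, int] = {}
    for pos in positions:
        current = result.get(pos.instance_name)
        if current is None or pos.stream > current:
            result[pos.instance_name] = pos.stream
    return result


# Example positions, as might be collected from each room's `event_pos`.
example_positions = [
    EventPos("writer1", 5),
    EventPos("writer2", 9),
    EventPos("writer1", 7),
]
print(max_stream_per_instance(example_positions))  # {'writer1': 7, 'writer2': 9}
```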

@MadLittleMods (Contributor Author) commented on Jun 3, 2024:

I know there was a recent change to move all of the streams to MultiWriterIdGenerator for some guarantee (they don't go backwards, or something) (edit: see #17226). I just want to check whether it's OK to define RoomStreamToken(..., instance_map={...}) even if we're not using multiple event persisters. Is there any kind of check we need before adding instance_map? The closest thing I can find is in synapse/storage/databases/main/stream.py#L565, for example.

2 changes: 1 addition & 1 deletion synapse/handlers/initial_sync.py
@@ -199,7 +199,7 @@ async def handle_room(event: RoomsForUser) -> None:
)
elif event.membership == Membership.LEAVE:
room_end_token = RoomStreamToken(
stream=event.stream_ordering,
stream=event.event_pos.stream,
)
deferred_room_state = run_in_background(
self._state_storage_controller.get_state_for_events,
3 changes: 2 additions & 1 deletion synapse/handlers/pagination.py
@@ -27,7 +27,6 @@
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -38,6 +37,8 @@
JsonMapping,
Requester,
ScheduledTask,
ShutdownRoomParams,
ShutdownRoomResponse,
StreamKeyType,
TaskStatus,
)
60 changes: 2 additions & 58 deletions synapse/handlers/room.py
@@ -40,7 +40,6 @@
)

import attr
from typing_extensions import TypedDict

import synapse.events.snapshot
from synapse.api.constants import (
@@ -81,6 +80,8 @@
RoomAlias,
RoomID,
RoomStreamToken,
ShutdownRoomParams,
ShutdownRoomResponse,
StateMap,
StrCollection,
StreamKeyType,
@@ -1780,63 +1781,6 @@ def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
return self.store.get_current_room_stream_token_for_room_id(room_id)


class ShutdownRoomParams(TypedDict):

Contributor Author:

Moving ShutdownRoomParams and ShutdownRoomResponse in this PR is an unrelated change, but I wasn't able to run the tests otherwise because of the circular import here.

SYNAPSE_POSTGRES=1 SYNAPSE_POSTGRES_USER=postgres SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.replication.storage.test_events
[...]
Traceback (most recent call last):
  File "pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.12/lib/python3.12/site-packages/twisted/trial/runner.py", line 711, in loadByName
    return self.suiteFactory([self.findByName(name, recurse=recurse)])
  File "pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.12/lib/python3.12/site-packages/twisted/trial/runner.py", line 474, in findByName
    obj = reflect.namedModule(searchName)
  File "pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.12/lib/python3.12/site-packages/twisted/python/reflect.py", line 156, in namedModule
    topLevel = __import__(name)
  File "synapse/tests/replication/storage/test_events.py", line 33, in <module>
    from synapse.handlers.room import RoomEventSource
  File "synapse/synapse/handlers/room.py", line 74, in <module>
    from synapse.rest.admin._base import assert_user_is_admin
  File "synapse/synapse/rest/__init__.py", line 24, in <module>
    from synapse.rest import admin
  File "synapse/synapse/rest/admin/__init__.py", line 41, in <module>
    from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME
  File "synapse/synapse/handlers/pagination.py", line 30, in <module>
    from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
builtins.ImportError: cannot import name 'ShutdownRoomParams' from partially initialized module 'synapse.handlers.room' (most likely due to a circular import) (synapse/synapse/handlers/room.py)

"""
Attributes:
requester_user_id:
User who requested the action. Will be recorded as putting the room on the
blocking list.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
moved into that room. If not set, no new room will be created
and the users will just be removed from the old room.
new_room_name:
A string representing the name of the room that new users will
be invited to. Defaults to `Content Violation Notification`
message:
A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly
convey why the original room was shut down.
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Defaults to `false`.
purge:
If set to `true`, purge the given room from the database.
force_purge:
If set to `true`, the room will be purged from database
even if there are still users joined to the room.
"""

requester_user_id: Optional[str]
new_room_user_id: Optional[str]
new_room_name: Optional[str]
message: Optional[str]
block: bool
purge: bool
force_purge: bool


class ShutdownRoomResponse(TypedDict):
"""
Attributes:
kicked_users: An array of users (`user_id`) that were kicked.
failed_to_kick_users:
An array of users (`user_id`) that that were not kicked.
local_aliases:
An array of strings representing the local aliases that were
migrated from the old room to the new.
new_room_id: A string representing the room ID of the new room.
"""

kicked_users: List[str]
failed_to_kick_users: List[str]
local_aliases: List[str]
new_room_id: Optional[str]


class RoomShutdownHandler:
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
2 changes: 1 addition & 1 deletion synapse/handlers/sync.py
@@ -2805,7 +2805,7 @@ async def _get_room_changes_for_initial_sync(
continue

leave_token = now_token.copy_and_replace(
StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
StreamKeyType.ROOM, RoomStreamToken(stream=event.event_pos.stream)
)
room_entries.append(
RoomSyncResultBuilder(
14 changes: 12 additions & 2 deletions synapse/storage/databases/main/roommember.py
@@ -476,7 +476,7 @@ def _get_rooms_for_local_user_where_membership_is_txn(
)

sql = """
SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version
SELECT room_id, e.sender, c.membership, event_id, e.instance_name, e.stream_ordering, r.room_version
FROM local_current_membership AS c
INNER JOIN events AS e USING (room_id, event_id)
INNER JOIN rooms AS r USING (room_id)
@@ -488,7 +488,17 @@ def _get_rooms_for_local_user_where_membership_is_txn(
)

txn.execute(sql, (user_id, *args))
results = [RoomsForUser(*r) for r in txn]
results = [
RoomsForUser(
room_id,
sender,
membership,
event_id,
PersistedEventPosition(instance_name, stream_ordering),
room_version,
)
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
]

return results

2 changes: 1 addition & 1 deletion synapse/storage/roommember.py
@@ -35,7 +35,7 @@ class RoomsForUser:
sender: str
membership: str
event_id: str
stream_ordering: int
event_pos: PersistedEventPosition
room_version_id: str


57 changes: 57 additions & 0 deletions synapse/types/__init__.py
@@ -1279,3 +1279,60 @@ class ScheduledTask:
result: Optional[JsonMapping]
# Optional error that should be assigned a value when the status is FAILED
error: Optional[str]


class ShutdownRoomParams(TypedDict):
"""
Attributes:
requester_user_id:
User who requested the action. Will be recorded as putting the room on the
blocking list.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
moved into that room. If not set, no new room will be created
and the users will just be removed from the old room.
new_room_name:
A string representing the name of the room that new users will
be invited to. Defaults to `Content Violation Notification`
message:
A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly
convey why the original room was shut down.
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Defaults to `false`.
purge:
If set to `true`, purge the given room from the database.
force_purge:
If set to `true`, the room will be purged from database
even if there are still users joined to the room.
"""

requester_user_id: Optional[str]
new_room_user_id: Optional[str]
new_room_name: Optional[str]
message: Optional[str]
block: bool
purge: bool
force_purge: bool


class ShutdownRoomResponse(TypedDict):
"""
Attributes:
kicked_users: An array of users (`user_id`) that were kicked.
failed_to_kick_users:
An array of users (`user_id`) that that were not kicked.
local_aliases:
An array of strings representing the local aliases that were
migrated from the old room to the new.
new_room_id: A string representing the room ID of the new room.
"""

kicked_users: List[str]
failed_to_kick_users: List[str]
local_aliases: List[str]
new_room_id: Optional[str]
5 changes: 4 additions & 1 deletion tests/replication/storage/test_events.py
@@ -154,7 +154,10 @@ def test_invites(self) -> None:
USER_ID,
"invite",
event.event_id,
event.internal_metadata.stream_ordering,
PersistedEventPosition(
self.hs.get_instance_name(),
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier,
)
],