diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index 84df5e8590ca..b9af16acd4fa 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -43,7 +43,7 @@ def __init__(self, hs: "HomeServer"): super().__init__( hs.get_instance_name(), # TODO(faster_joins, multiple writers): we need to account for instance names - current_token_without_instance(store.get_un_partial_stated_rooms_token), + store.get_un_partial_stated_rooms_token, store.get_un_partial_stated_rooms_from_stream, ) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 78906a5e1d9e..4408b77a059e 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1277,13 +1277,10 @@ async def get_join_event_id_and_device_lists_stream_id_for_partial_state( ) return result["join_event_id"], result["device_lists_stream_id"] - def get_un_partial_stated_rooms_token(self) -> int: - # TODO(faster_joins, multiple writers): This is inappropriate if there - # are multiple writers because workers that don't write often will - # hold all readers up. - # (See `MultiWriterIdGenerator.get_persisted_upto_position` for an - # explanation.) - return self._un_partial_stated_rooms_stream_id_gen.get_current_token() + def get_un_partial_stated_rooms_token(self, instance_name: str) -> int: + return self._un_partial_stated_rooms_stream_id_gen.get_current_token( + instance_name + ) async def get_un_partial_stated_rooms_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int