Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit size of presence EDUs #17371

Merged
merged 5 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17371.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Limit size of presence EDUs to 50 entries.
31 changes: 20 additions & 11 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#
import datetime
import logging
from collections import OrderedDict
from types import TracebackType
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type

Expand Down Expand Up @@ -68,6 +69,10 @@
# If the retry interval is larger than this then we enter "catchup" mode
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000

# Limit how many presence states we add to each presence EDU, to ensure that
# they are bounded in size.
MAX_PRESENCE_STATES_PER_EDU = 50


class PerDestinationQueue:
"""
Expand Down Expand Up @@ -144,7 +149,7 @@ def __init__(

# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
self._pending_presence: Dict[str, UserPresenceState] = {}
self._pending_presence: OrderedDict[str, UserPresenceState] = OrderedDict()

# List of room_id -> receipt_type -> user_id -> receipt_dict,
#
Expand Down Expand Up @@ -399,7 +404,7 @@ async def _transaction_transmission_loop(self) -> None:
# through another mechanism, because this is all volatile!
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
self._pending_presence.clear()
self._pending_receipt_edus = []

self._start_catching_up()
Expand Down Expand Up @@ -721,22 +726,26 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:

# Add presence EDU.
if self.queue._pending_presence:
# Only send max 50 presence entries in the EDU, to bound the amount
# of data we're sending.
presence_to_add: List[JsonDict] = []
while (
self.queue._pending_presence
and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU
):
_, presence = self.queue._pending_presence.popitem(last=False)
presence_to_add.append(
format_user_presence_state(presence, self.queue._clock.time_msec())
)

pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type=EduTypes.PRESENCE,
content={
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in self.queue._pending_presence.values()
]
},
content={"push": presence_to_add},
)
)
self.queue._pending_presence = {}

# Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
Expand Down
119 changes: 119 additions & 0 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms
from synapse.api.presence import UserPresenceState
from synapse.federation.sender.per_destination_queue import MAX_PRESENCE_STATES_PER_EDU
from synapse.federation.units import Transaction
from synapse.handlers.device import DeviceHandler
from synapse.rest import admin
Expand Down Expand Up @@ -266,6 +268,123 @@ def test_send_receipts_with_backoff(self) -> None:
)


class FederationSenderPresenceTestCases(HomeserverTestCase):
"""
Test federation sending for presence updates.
"""

def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.federation_transport_client = Mock(spec=["send_transaction"])
self.federation_transport_client.send_transaction = AsyncMock()
hs = self.setup_test_homeserver(
federation_transport_client=self.federation_transport_client,
)

return hs

def default_config(self) -> JsonDict:
config = super().default_config()
config["federation_sender_instances"] = None
return config

def test_presence_simple(self) -> None:
"Test that sending a single presence update works"

mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}

sender = self.hs.get_federation_sender()
self.get_success(
sender.send_presence_to_destinations(
[UserPresenceState.default("@user:test")],
["server"],
)
)

self.pump()

# expect a call to send_transaction
mock_send_transaction.assert_awaited_once()

json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
self.assertEqual(
data["edus"],
[
{
"edu_type": EduTypes.PRESENCE,
"content": {
"push": [
{
"presence": "offline",
"user_id": "@user:test",
}
]
},
}
],
)

def test_presence_batched(self) -> None:
"""Test that sending lots of presence updates to a destination are
batched, rather than having them all sent in one EDU."""

mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}

sender = self.hs.get_federation_sender()

# We now send lots of presence updates to force the federation sender to
# batch the mup.
number_presence_updates_to_send = MAX_PRESENCE_STATES_PER_EDU * 2
self.get_success(
sender.send_presence_to_destinations(
[
UserPresenceState.default(f"@user{i}:test")
for i in range(number_presence_updates_to_send)
],
["server"],
)
)

self.pump()

# We should have seen at least one transcation be sent by now.
mock_send_transaction.assert_called()

# We don't want to specify exactly how the presence EDUs get sent out,
# could be one per transaction or multiple per transaction. We just want
# to assert that a) each presence EDU has bounded number of updates, and
# b) that all updates get sent out.
presence_edus = []
for transaction_call in mock_send_transaction.call_args_list:
json_cb = transaction_call[0][1]
data = json_cb()

for edu in data["edus"]:
self.assertEqual(edu.get("edu_type"), EduTypes.PRESENCE)
presence_edus.append(edu)

# A set of all user presence we see, this should end up matching the
# number we sent out above.
seen_users: Set[str] = set()

for edu in presence_edus:
presence_states = edu["content"]["push"]

# This is where we actually check that the number of presence
# updates is bounded.
self.assertLessEqual(len(presence_states), MAX_PRESENCE_STATES_PER_EDU)

seen_users.update(p["user_id"] for p in presence_states)

self.assertEqual(len(seen_users), number_presence_updates_to_send)


class FederationSenderDevicesTestCases(HomeserverTestCase):
"""
Test federation sending to update devices.
Expand Down
Loading