Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Stagger send presence to remotes (#10398)
Browse files Browse the repository at this point in the history
This is to help with performance, where trying to connect to thousands
of hosts at once can consume a lot of CPU (due to TLS etc).

Co-authored-by: Brendan Abolivier <babolivier@matrix.org>
  • Loading branch information
erikjohnston and babolivier committed Jul 15, 2021
1 parent 5ecad4e commit ac5c221
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/10398.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stagger the sending of presence updates to remote servers, reducing CPU spikes caused by starting many connections to remote servers at once.
96 changes: 94 additions & 2 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

import abc
import logging
from collections import OrderedDict
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple

import attr
from prometheus_client import Counter
from typing_extensions import Literal

from twisted.internet import defer

Expand All @@ -33,8 +36,12 @@
event_processing_loop_room_count,
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util import Clock
from synapse.util.metrics import Measure

if TYPE_CHECKING:
Expand Down Expand Up @@ -137,6 +144,84 @@ async def get_replication_rows(
raise NotImplementedError()


@attr.s
class _PresenceQueue:
    """Tracks destinations whose per-destination queues must be woken up
    because new presence updates have been queued for them.

    Wake-ups are spread out over time instead of being issued all at once, so
    that we do not try to open TLS connections to many hosts simultaneously
    and pin the CPU.
    """

    # Target ceiling, in seconds, on how long a destination waits in the queue
    # before being woken up.
    _MAX_TIME_IN_QUEUE = 30.0

    # Ceiling, in seconds, on the pause between waking up two consecutive
    # destination queues.
    _MAX_DELAY = 0.1

    sender: "FederationSender" = attr.ib()
    clock: Clock = attr.ib()
    # Used as an ordered set of destinations: the values are always None.
    queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
    # True while `_handle` is busy draining `queue`.
    processing: bool = attr.ib(default=False)

    def add_to_queue(self, destination: str) -> None:
        """Queue `destination` for wake-up, starting the drain loop if idle.

        Args:
            destination: The remote server whose per-destination queue should
                be woken up.
        """
        self.queue[destination] = None

        if self.processing:
            # A drain loop is already running and will pick this entry up.
            return

        self._handle()

    @wrap_as_background_process("_PresenceQueue.handle")
    async def _handle(self) -> None:
        """Background process that drains the queue, pausing between wake-ups."""
        if not self.queue:
            return

        assert not self.processing
        self.processing = True

        try:
            # Choose an initial pause short enough that everything currently
            # queued is handled within _MAX_TIME_IN_QUEUE seconds, but never
            # longer than _MAX_DELAY, so that a queue holding only a few
            # entries is still drained promptly.
            delay = min(self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue))

            while self.queue:
                # Take the oldest entry first (FIFO order).
                destination, _ = self.queue.popitem(last=False)
                per_destination_queue = self.sender._get_per_destination_queue(
                    destination
                )

                if per_destination_queue._new_data_to_send:
                    per_destination_queue.attempt_new_transaction()

                    await self.clock.sleep(delay)

                    if self.queue:
                        # Destinations may have been added while we slept, so
                        # the pause may need to shrink (it never grows) to
                        # keep everything within _MAX_TIME_IN_QUEUE seconds.
                        delay = min(
                            delay, self._MAX_TIME_IN_QUEUE / len(self.queue)
                        )
                # Otherwise the per-destination queue has already been woken
                # up, so move straight on to the next entry without pausing.
        finally:
            self.processing = False


class FederationSender(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.hs = hs
Expand Down Expand Up @@ -208,6 +293,8 @@ def __init__(self, hs: "HomeServer"):

self._external_cache = hs.get_external_cache()

self._presence_queue = _PresenceQueue(self, self.clock)

def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
Expand Down Expand Up @@ -517,7 +604,12 @@ def send_presence_to_destinations(
self._instance_name, destination
):
continue
self._get_per_destination_queue(destination).send_presence(states)

self._get_per_destination_queue(destination).send_presence(
states, start_loop=False
)

self._presence_queue.add_to_queue(destination)

def build_and_send_edu(
self,
Expand Down
16 changes: 13 additions & 3 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,24 @@ def send_pdu(self, pdu: EventBase) -> None:

self.attempt_new_transaction()

def send_presence(self, states: Iterable[UserPresenceState]) -> None:
"""Add presence updates to the queue. Start the transmission loop if necessary.
def send_presence(
self, states: Iterable[UserPresenceState], start_loop: bool = True
) -> None:
"""Add presence updates to the queue.

Pending updates are keyed by user ID, so a newer state for a user replaces
any state already pending for that user.

Args:
states: Presence updates to send.
start_loop: Whether to start the transmission loop if not already
running.
"""
self._pending_presence.update({state.user_id: state for state in states})
self.attempt_new_transaction()
self._new_data_to_send = True

if start_loop:
self.attempt_new_transaction()

def queue_read_receipt(self, receipt: ReadReceipt) -> None:
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
Expand Down
8 changes: 8 additions & 0 deletions tests/events/test_presence_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ def test_send_local_online_presence_to_with_module(self):
presence_updates, _ = sync_presence(self, self.presence_receiving_user_two_id)
self.assertEqual(len(presence_updates), 3)

# We stagger sending of presence, so we need to wait a bit for them to
# get sent out.
self.reactor.advance(60)

# Test that sending to a remote user works
remote_user_id = "@far_away_person:island"

Expand All @@ -301,6 +305,10 @@ def test_send_local_online_presence_to_with_module(self):
self.module_api.send_local_online_presence_to([remote_user_id])
)

# We stagger sending of presence, so we need to wait a bit for them to
# get sent out.
self.reactor.advance(60)

# Check that the expected presence updates were sent
# We explicitly compare using sets as we expect that calling
# module_api.send_local_online_presence_to will create a presence
Expand Down

0 comments on commit ac5c221

Please sign in to comment.