Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prevent memory leak from reoccurring when presence is disabled. #12656

Merged
merged 5 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12656.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent memory leak from reoccurring when presence is disabled.
42 changes: 27 additions & 15 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,27 +659,28 @@ def __init__(self, hs: "HomeServer"):
)

now = self.clock.time_msec()
for state in self.user_to_current_state.values():
self.wheel_timer.insert(
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
self.wheel_timer.insert(
now=now,
obj=state.user_id,
then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)
if self.is_mine_id(state.user_id):
if self._presence_enabled:
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
for state in self.user_to_current_state.values():
self.wheel_timer.insert(
now=now,
obj=state.user_id,
then=state.last_federation_update_ts + FEDERATION_PING_INTERVAL,
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
else:
self.wheel_timer.insert(
now=now,
obj=state.user_id,
then=state.last_federation_update_ts + FEDERATION_TIMEOUT,
then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)
if self.is_mine_id(state.user_id):
self.wheel_timer.insert(
now=now,
obj=state.user_id,
then=state.last_federation_update_ts + FEDERATION_PING_INTERVAL,
)
else:
self.wheel_timer.insert(
now=now,
obj=state.user_id,
then=state.last_federation_update_ts + FEDERATION_TIMEOUT,
)

# Set of users who have presence in the `user_to_current_state` that
# have not yet been persisted
Expand Down Expand Up @@ -804,6 +805,13 @@ async def _update_states(
This is currently used to bump the max presence stream ID without changing any
user's presence (see PresenceHandler.add_users_to_send_full_presence_to).
"""
if not self._presence_enabled:
# We shouldn't get here if presence is disabled, but we check anyway
# to ensure that we don't a) send out presence federation and b)
# don't add things to the wheel timer that will never be handled.
logger.warning("Tried to update presence states when presence is disabled")
return

now = self.clock.time_msec()

with Measure(self.clock, "presence_update_states"):
Expand Down Expand Up @@ -1229,6 +1237,10 @@ async def set_state(
):
raise SynapseError(400, "Invalid presence state")

# If presence is disabled, no-op
if not self.hs.config.server.use_presence:
return

user_id = target_user.to_string()

prev_state = await self.current_state_for_user(user_id)
Expand Down
30 changes: 23 additions & 7 deletions synapse/util/wheel_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Generic, List, TypeVar
import logging
from typing import Generic, Hashable, List, Set, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)

T = TypeVar("T", bound=Hashable)


class _Entry(Generic[T]):
__slots__ = ["end_key", "queue"]

def __init__(self, end_key: int) -> None:
self.end_key: int = end_key
self.queue: List[T] = []

# We use a set here as otherwise we can end up with a lot of duplicate
# entries.
self.queue: Set[T] = set()
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved


class WheelTimer(Generic[T]):
Expand All @@ -48,17 +54,27 @@ def insert(self, now: int, obj: T, then: int) -> None:
then: When to return the object strictly after.
"""
then_key = int(then / self.bucket_size) + 1
now_key = int(now / self.bucket_size)

if self.entries:
min_key = self.entries[0].end_key
max_key = self.entries[-1].end_key

if min_key < now_key - 10:
# If we have ten buckets that are due and still nothing has
# called `fetch()` then we likely have a bug that is causing a
# memory leak.
logger.warning(
"Inserting into a wheel timer that hasn't been read from recently. Item: %s",
obj,
)

if then_key <= max_key:
# The max here is to protect against inserts for times in the past
self.entries[max(min_key, then_key) - min_key].queue.append(obj)
self.entries[max(min_key, then_key) - min_key].queue.add(obj)
return

next_key = int(now / self.bucket_size) + 1
next_key = now_key + 1
if self.entries:
last_key = self.entries[-1].end_key
else:
Expand All @@ -71,7 +87,7 @@ def insert(self, now: int, obj: T, then: int) -> None:
# to insert. This ensures there are no gaps.
self.entries.extend(_Entry(key) for key in range(last_key, then_key + 1))

self.entries[-1].queue.append(obj)
self.entries[-1].queue.add(obj)

def fetch(self, now: int) -> List[T]:
"""Fetch any objects that have timed out
Expand All @@ -84,7 +100,7 @@ def fetch(self, now: int) -> List[T]:
"""
now_key = int(now / self.bucket_size)

ret = []
ret: List[T] = []
while self.entries and self.entries[0].end_key <= now_key:
ret.extend(self.entries.pop(0).queue)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

Expand Down