Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Properly handle unknown results for the stream change cache. (#14592)
Browse files Browse the repository at this point in the history
StreamChangeCache.get_all_changed_entities can return None to signify
it does not have information at the given stream position. Two callers (related
to device lists and presence) were treating this response the same as an empty
list (i.e. there being no updates).
  • Loading branch information
clokep committed Dec 2, 2022
1 parent 6acb6d7 commit fac8a38
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
1 change: 1 addition & 0 deletions changelog.d/14592.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1764,14 +1764,14 @@ async def _filter_all_presence_updates_for_user(
Returns:
A list of presence states for the given user to receive.
"""
updated_users = None
if from_key:
# Only return updates since the last sync
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
from_key
)
if not updated_users:
updated_users = []

if updated_users is not None:
# Get the actual presence update for each change
users_to_state = await self.get_presence_handler().current_state_for_users(
updated_users
Expand Down
33 changes: 19 additions & 14 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,12 +842,11 @@ async def get_users_whose_devices_changed(
user_ids, from_key
)

if not user_ids_to_check:
# If an empty set was returned, there's nothing to do.
if user_ids_to_check is not None and not user_ids_to_check:
return set()

def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
changes: Set[str] = set()

stream_id_where_clause = "stream_id > ?"
sql_args = [from_key]

Expand All @@ -858,19 +857,25 @@ def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
sql = f"""
SELECT DISTINCT user_id FROM device_lists_stream
WHERE {stream_id_where_clause}
AND
"""

# Query device changes with a batch of users at a time
# Assertion for mypy's benefit; see also
# https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
assert user_ids_to_check is not None
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + clause, sql_args + args)
changes.update(user_id for user_id, in txn)
# If the stream change cache gave us no information, fetch *all*
# users between the stream IDs.
if user_ids_to_check is None:
txn.execute(sql, sql_args)
return {user_id for user_id, in txn}

# Otherwise, fetch changes for the given users.
else:
changes: Set[str] = set()

# Query device changes with a batch of users at a time
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + " AND " + clause, sql_args + args)
changes.update(user_id for user_id, in txn)

return changes

Expand Down

0 comments on commit fac8a38

Please sign in to comment.