This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor on_receive_pdu code #10615

Merged
Changes from all commits
1 change: 1 addition & 0 deletions changelog.d/10615.misc
@@ -0,0 +1 @@
+Clean up some of the federation event authentication code for clarity.
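
The changelog entry is deliberately terse; the substance of the change is that on_receive_pdu now handles the two ways an event can reach us — pushed to us directly by the origin server, or pulled by us because it is a missing prev_event of something newer — in two explicit branches rather than interleaved checks. As orientation, here is a minimal sketch of that control flow. It is illustrative only: every name in it (handle_pdu, fetch_missing, Rejected, and so on) is a hypothetical stand-in, not Synapse's real API.

from dataclasses import dataclass, field
from typing import Callable, Set


@dataclass
class Pdu:
    event_id: str
    prev_event_ids: Set[str] = field(default_factory=set)


class Rejected(Exception):
    pass


def handle_pdu(
    pdu: Pdu,
    sent_to_us_directly: bool,
    seen_events: Set[str],
    fetch_missing: Callable[[Set[str]], Set[str]],
) -> str:
    """Returns a label for the path taken; the real code processes the event."""
    missing_prevs = pdu.prev_event_ids - seen_events

    if not missing_prevs:
        return "fast path: all prev_events already known"

    if sent_to_us_directly:
        # Push path: try to fill the gap, then reject if anything is still
        # missing -- an accepted event with unknown prev_events could become
        # the room's only forward extremity, letting its sender dictate the
        # room state.
        seen_events |= fetch_missing(missing_prevs)
        if pdu.prev_event_ids - seen_events:
            raise Rejected("failed to fetch prev_events for %s" % pdu.event_id)
        return "push path: gap filled, processing event"

    # Pull path: we asked for this event ourselves (it is a prev_event of
    # something newer), so it cannot become the only forward extremity; it
    # is safe to treat it as a backwards extremity and resolve its state
    # against the rest of the DAG.
    return "pull path: calculating state for a backwards extremity"


print(handle_pdu(Pdu("$evt", {"$a", "$b"}), True, {"$a"}, lambda m: set(m)))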
271 changes: 137 additions & 134 deletions synapse/handlers/federation.py
@@ -285,169 +285,172 @@ async def on_receive_pdu(
         # - Fetching any missing prev events to fill in gaps in the graph
         # - Fetching state if we have a hole in the graph
         if not pdu.internal_metadata.is_outlier():
-            # We only backfill backwards to the min depth.
-            min_depth = await self.get_min_depth_for_context(pdu.room_id)
-
-            logger.debug("min_depth: %d", min_depth)
-
             prevs = set(pdu.prev_event_ids())
             seen = await self.store.have_events_in_timeline(prevs)
+            missing_prevs = prevs - seen

-            if min_depth is not None and pdu.depth > min_depth:
-                missing_prevs = prevs - seen
-                if sent_to_us_directly and missing_prevs:
-                    # If we're missing stuff, ensure we only fetch stuff one
-                    # at a time.
-                    logger.info(
-                        "Acquiring room lock to fetch %d missing prev_events: %s",
-                        len(missing_prevs),
-                        shortstr(missing_prevs),
-                    )
-                    with (await self._room_pdu_linearizer.queue(pdu.room_id)):
-                        logger.info(
-                            "Acquired room lock to fetch %d missing prev_events",
-                            len(missing_prevs),
-                        )
-
-                        try:
-                            await self._get_missing_events_for_pdu(
-                                origin, pdu, prevs, min_depth
-                            )
-                        except Exception as e:
-                            raise Exception(
-                                "Error fetching missing prev_events for %s: %s"
-                                % (event_id, e)
-                            ) from e
+            if missing_prevs:
+                if sent_to_us_directly:
+                    # We only backfill backwards to the min depth.
+                    min_depth = await self.get_min_depth_for_context(pdu.room_id)
+                    logger.debug("min_depth: %d", min_depth)
+
+                    if min_depth is not None and pdu.depth > min_depth:
+                        # If we're missing stuff, ensure we only fetch stuff one
+                        # at a time.
+                        logger.info(
+                            "Acquiring room lock to fetch %d missing prev_events: %s",
+                            len(missing_prevs),
+                            shortstr(missing_prevs),
+                        )
+                        with (await self._room_pdu_linearizer.queue(pdu.room_id)):
+                            logger.info(
+                                "Acquired room lock to fetch %d missing prev_events",
+                                len(missing_prevs),
+                            )
+
+                            try:
+                                await self._get_missing_events_for_pdu(
+                                    origin, pdu, prevs, min_depth
+                                )
+                            except Exception as e:
+                                raise Exception(
+                                    "Error fetching missing prev_events for %s: %s"
                                    % (event_id, e)
+                                ) from e

                         # Update the set of things we've seen after trying to
                         # fetch the missing stuff
                         seen = await self.store.have_events_in_timeline(prevs)
+                        missing_prevs = prevs - seen

-                    if not prevs - seen:
-                        logger.info(
-                            "Found all missing prev_events",
-                        )
+                        if not missing_prevs:
+                            logger.info("Found all missing prev_events")

-            missing_prevs = prevs - seen
-            if missing_prevs:
-                # We've still not been able to get all of the prev_events for this event.
-                #
-                # In this case, we need to fall back to asking another server in the
-                # federation for the state at this event. That's ok provided we then
-                # resolve the state against other bits of the DAG before using it (which
-                # will ensure that you can't just take over a room by sending an event,
-                # withholding its prev_events, and declaring yourself to be an admin in
-                # the subsequent state request).
-                #
-                # Now, if we're pulling this event as a missing prev_event, then clearly
-                # this event is not going to become the only forward-extremity and we are
-                # guaranteed to resolve its state against our existing forward
-                # extremities, so that should be fine.
-                #
-                # On the other hand, if this event was pushed to us, it is possible for
-                # it to become the only forward-extremity in the room, and we would then
-                # trust its state to be the state for the whole room. This is very bad.
-                # Further, if the event was pushed to us, there is no excuse for us not to
-                # have all the prev_events. We therefore reject any such events.
-                #
-                # XXX this really feels like it could/should be merged with the above,
-                # but there is an interaction with min_depth that I'm not really
-                # following.
-
-                if sent_to_us_directly:
-                    logger.warning(
-                        "Rejecting: failed to fetch %d prev events: %s",
-                        len(missing_prevs),
-                        shortstr(missing_prevs),
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        (
-                            "Your server isn't divulging details about prev_events "
-                            "referenced in this event."
-                        ),
-                        affected=pdu.event_id,
-                    )
-
-                logger.info(
-                    "Event %s is missing prev_events %s: calculating state for a "
-                    "backwards extremity",
-                    event_id,
-                    shortstr(missing_prevs),
-                )
+                    if missing_prevs:
+                        # since this event was pushed to us, it is possible for it to
+                        # become the only forward-extremity in the room, and we would then
+                        # trust its state to be the state for the whole room. This is very
+                        # bad. Further, if the event was pushed to us, there is no excuse
+                        # for us not to have all the prev_events. (XXX: apart from
+                        # min_depth?)
+                        #
+                        # We therefore reject any such events.
+                        logger.warning(
+                            "Rejecting: failed to fetch %d prev events: %s",
+                            len(missing_prevs),
+                            shortstr(missing_prevs),
+                        )
+                        raise FederationError(
+                            "ERROR",
+                            403,
+                            (
+                                "Your server isn't divulging details about prev_events "
+                                "referenced in this event."
+                            ),
+                            affected=pdu.event_id,
+                        )
+                else:
+                    # We don't have all of the prev_events for this event.
+                    #
+                    # In this case, we need to fall back to asking another server in the
+                    # federation for the state at this event. That's ok provided we then
+                    # resolve the state against other bits of the DAG before using it (which
+                    # will ensure that you can't just take over a room by sending an event,
+                    # withholding its prev_events, and declaring yourself to be an admin in
+                    # the subsequent state request).
+                    #
+                    # Since we're pulling this event as a missing prev_event, then clearly
+                    # this event is not going to become the only forward-extremity and we are
+                    # guaranteed to resolve its state against our existing forward
+                    # extremities, so that should be fine.
+                    #
+                    # XXX this really feels like it could/should be merged with the above,
+                    # but there is an interaction with min_depth that I'm not really
+                    # following.
+                    logger.info(
+                        "Event %s is missing prev_events %s: calculating state for a "
+                        "backwards extremity",
+                        event_id,
+                        shortstr(missing_prevs),
+                    )

-                # Calculate the state after each of the previous events, and
-                # resolve them to find the correct state at the current event.
-                event_map = {event_id: pdu}
-                try:
-                    # Get the state of the events we know about
-                    ours = await self.state_store.get_state_groups_ids(room_id, seen)
-
-                    # state_maps is a list of mappings from (type, state_key) to event_id
-                    state_maps: List[StateMap[str]] = list(ours.values())
-
-                    # we don't need this any more, let's delete it.
-                    del ours
-
-                    # Ask the remote server for the states we don't
-                    # know about
-                    for p in missing_prevs:
-                        logger.info("Requesting state after missing prev_event %s", p)
-
-                        with nested_logging_context(p):
-                            # note that if any of the missing prevs share missing state or
-                            # auth events, the requests to fetch those events are deduped
-                            # by the get_pdu_cache in federation_client.
-                            remote_state = (
-                                await self._get_state_after_missing_prev_event(
-                                    origin, room_id, p
-                                )
-                            )
-
-                            remote_state_map = {
-                                (x.type, x.state_key): x.event_id for x in remote_state
-                            }
-                            state_maps.append(remote_state_map)
-
-                            for x in remote_state:
-                                event_map[x.event_id] = x
-
-                    room_version = await self.store.get_room_version_id(room_id)
-                    state_map = (
-                        await self._state_resolution_handler.resolve_events_with_store(
-                            room_id,
-                            room_version,
-                            state_maps,
-                            event_map,
-                            state_res_store=StateResolutionStore(self.store),
-                        )
-                    )
-
-                    # We need to give _process_received_pdu the actual state events
-                    # rather than event ids, so generate that now.
-
-                    # First though we need to fetch all the events that are in
-                    # state_map, so we can build up the state below.
-                    evs = await self.store.get_events(
-                        list(state_map.values()),
-                        get_prev_content=False,
-                        redact_behaviour=EventRedactBehaviour.AS_IS,
-                    )
-                    event_map.update(evs)
-
-                    state = [event_map[e] for e in state_map.values()]
-                except Exception:
-                    logger.warning(
-                        "Error attempting to resolve state at missing " "prev_events",
-                        exc_info=True,
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        "We can't get valid state history.",
-                        affected=event_id,
-                    )
+                    # Calculate the state after each of the previous events, and
+                    # resolve them to find the correct state at the current event.
+                    event_map = {event_id: pdu}
+                    try:
+                        # Get the state of the events we know about
+                        ours = await self.state_store.get_state_groups_ids(
+                            room_id, seen
+                        )
+
+                        # state_maps is a list of mappings from (type, state_key) to event_id
+                        state_maps: List[StateMap[str]] = list(ours.values())
+
+                        # we don't need this any more, let's delete it.
+                        del ours
+
+                        # Ask the remote server for the states we don't
+                        # know about
+                        for p in missing_prevs:
+                            logger.info(
+                                "Requesting state after missing prev_event %s", p
+                            )
+
+                            with nested_logging_context(p):
+                                # note that if any of the missing prevs share missing state or
+                                # auth events, the requests to fetch those events are deduped
+                                # by the get_pdu_cache in federation_client.
+                                remote_state = (
+                                    await self._get_state_after_missing_prev_event(
+                                        origin, room_id, p
+                                    )
+                                )
+
+                                remote_state_map = {
+                                    (x.type, x.state_key): x.event_id
+                                    for x in remote_state
+                                }
+                                state_maps.append(remote_state_map)
+
+                                for x in remote_state:
+                                    event_map[x.event_id] = x
+
+                        room_version = await self.store.get_room_version_id(room_id)
+                        state_map = await self._state_resolution_handler.resolve_events_with_store(
+                            room_id,
+                            room_version,
+                            state_maps,
+                            event_map,
+                            state_res_store=StateResolutionStore(self.store),
+                        )
+
+                        # We need to give _process_received_pdu the actual state events
+                        # rather than event ids, so generate that now.
+
+                        # First though we need to fetch all the events that are in
+                        # state_map, so we can build up the state below.
+                        evs = await self.store.get_events(
+                            list(state_map.values()),
+                            get_prev_content=False,
+                            redact_behaviour=EventRedactBehaviour.AS_IS,
+                        )
+                        event_map.update(evs)
+
+                        state = [event_map[e] for e in state_map.values()]
+                    except Exception:
+                        logger.warning(
+                            "Error attempting to resolve state at missing "
+                            "prev_events",
+                            exc_info=True,
+                        )
+                        raise FederationError(
+                            "ERROR",
+                            403,
+                            "We can't get valid state history.",
+                            affected=event_id,
+                        )

         # A second round of checks for all events. Check that the event passes auth
         # based on `auth_events`, this allows us to assert that the event would
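
A note on the locking in the push branch: fetches of missing prev_events are serialised per room via self._room_pdu_linearizer.queue(room_id), so concurrent PDUs for the same room don't all chase the same gap. Synapse's Linearizer lives in synapse.util.async_helpers and is Twisted-based; the following is only a rough asyncio analogue of the idea, not the real implementation.

import asyncio
from collections import defaultdict
from typing import DefaultDict


class KeyedLinearizer:
    """At most one critical section per key at a time (cf. Synapse's Linearizer)."""

    def __init__(self) -> None:
        self._locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    def queue(self, key: str) -> asyncio.Lock:
        # asyncio.Lock is an async context manager, so callers can write
        # `async with linearizer.queue(room_id): ...`
        return self._locks[key]


async def fetch_missing_events(lin: KeyedLinearizer, room_id: str, n: int) -> None:
    async with lin.queue(room_id):
        # Only one coroutine per room reaches this point at a time, so
        # concurrent PDUs for the same room don't all re-request the same gap.
        await asyncio.sleep(0.01)  # stand-in for the network round-trip
        print(f"room {room_id}: fetch #{n} done")


async def main() -> None:
    lin = KeyedLinearizer()
    await asyncio.gather(
        *(fetch_missing_events(lin, "!room:example", i) for i in range(3))
    )


asyncio.run(main())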
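And a note on the data shape in the pull branch: the state at the backwards extremity is computed by collecting one StateMap per prev_event — from our own state groups where we have the state, from the remote server where we don't — and resolving them into a single map via resolve_events_with_store. The toy below shows only that shape; the naive last-writer-wins merge is a placeholder for Synapse's real room-version-specific state resolution.

from typing import Dict, List, Tuple

StateMap = Dict[Tuple[str, str], str]  # (event type, state_key) -> event_id


def naive_resolve(state_maps: List[StateMap]) -> StateMap:
    """Placeholder resolver: later maps win conflicts.

    Real state resolution orders conflicting events by auth rules and
    topology, not by position in a list.
    """
    resolved: StateMap = {}
    for state_map in state_maps:
        resolved.update(state_map)
    return resolved


# One map per prev_event whose state we already know locally...
ours: StateMap = {("m.room.member", "@alice:example.org"): "$join1"}
# ...plus one per missing prev_event, fetched from the remote server.
remote: StateMap = {
    ("m.room.member", "@alice:example.org"): "$join2",
    ("m.room.power_levels", ""): "$pl",
}

print(naive_resolve([ours, remote]))
# -> {('m.room.member', '@alice:example.org'): '$join2', ('m.room.power_levels', ''): '$pl'}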