Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Derive current_state_events from state groups
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Jan 20, 2017
1 parent 97efe99 commit 09eb08f
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 99 deletions.
1 change: 0 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,6 @@ def _persist_auth_tree(self, origin, auth_events, state, event):

event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
current_state=state,
)

defer.returnValue((event_stream_id, max_stream_id))
Expand Down
3 changes: 3 additions & 0 deletions synapse/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,9 @@ def resolve_events(state_sets, state_map_factory):
dict[(str, str), synapse.events.FrozenEvent] is a map from
(type, state_key) to event.
"""
if len(state_sets) == 1:
return state_sets[0]

unconflicted_state, conflicted_state = _seperate(
state_sets,
)
Expand Down
188 changes: 118 additions & 70 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, _RollbackButIsFineException
from ._base import SQLBaseStore

from twisted.internet import defer, reactor

Expand All @@ -27,6 +27,7 @@
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.state import resolve_events

from canonicaljson import encode_canonical_json
from collections import deque, namedtuple, OrderedDict
Expand Down Expand Up @@ -71,22 +72,19 @@ class _EventPeristenceQueue(object):
"""

_EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
"events_and_contexts", "current_state", "backfilled", "deferred",
"events_and_contexts", "backfilled", "deferred",
))

def __init__(self):
self._event_persist_queues = {}
self._currently_persisting_rooms = set()

def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
def add_to_queue(self, room_id, events_and_contexts, backfilled):
"""Add events to the queue, with the given persist_event options.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
end_item = queue[-1]
if end_item.current_state or current_state:
# We perist events with current_state set to True one at a time
pass
if end_item.backfilled == backfilled:
end_item.events_and_contexts.extend(events_and_contexts)
return end_item.deferred.observe()
Expand All @@ -96,7 +94,6 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
queue.append(self._EventPersistQueueItem(
events_and_contexts=events_and_contexts,
backfilled=backfilled,
current_state=current_state,
deferred=deferred,
))

Expand Down Expand Up @@ -216,7 +213,6 @@ def persist_events(self, events_and_contexts, backfilled=False):
d = preserve_fn(self._event_persist_queue.add_to_queue)(
room_id, evs_ctxs,
backfilled=backfilled,
current_state=None,
)
deferreds.append(d)

Expand All @@ -229,11 +225,10 @@ def persist_events(self, events_and_contexts, backfilled=False):

@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, current_state=None, backfilled=False):
def persist_event(self, event, context, backfilled=False):
deferred = self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)],
backfilled=backfilled,
current_state=current_state,
)

self._maybe_start_persisting(event.room_id)
Expand All @@ -246,21 +241,10 @@ def persist_event(self, event, context, current_state=None, backfilled=False):
def _maybe_start_persisting(self, room_id):
@defer.inlineCallbacks
def persisting_queue(item):
if item.current_state:
for event, context in item.events_and_contexts:
# There should only ever be one item in
# events_and_contexts when current_state is
# not None
yield self._persist_event(
event, context,
current_state=item.current_state,
backfilled=item.backfilled,
)
else:
yield self._persist_events(
item.events_and_contexts,
backfilled=item.backfilled,
)
yield self._persist_events(
item.events_and_contexts,
backfilled=item.backfilled,
)

self._event_persist_queue.handle_queue(room_id, persisting_queue)

Expand Down Expand Up @@ -294,36 +278,89 @@ def _persist_events(self, events_and_contexts, backfilled=False,
for chunk in chunks:
# We can't easily parallelize these since different chunks
# might contain the same event. :(

current_state_for_room = {}
if not backfilled:
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
events_by_room = {}
for event, context in chunk:
events_by_room.setdefault(event.room_id, []).append(
(event, context)
)

for room_id, ev_ctx_rm in events_by_room.items():
# Work out new extremities by recursively adding and removing
# the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = set(latest_event_ids)
for event, ctx in ev_ctx_rm:
if event.internal_metadata.is_outlier():
continue

new_latest_event_ids.difference_update(
e_id for e_id, _ in event.prev_events
)
new_latest_event_ids.add(event.event_id)

if new_latest_event_ids == set(latest_event_ids):
# No change in extremities, so no change in state
continue

# Now we need to work out the different state sets for
# each state extremities
state_sets = []
missing_event_ids = []
was_updated = False
for event_id in new_latest_event_ids:
# First search in the list of new events we're adding,
# and then use the current state from that
for ev, ctx in ev_ctx_rm:
if event_id == ev.event_id:
if ctx.current_state_ids is None:
raise Exception("Unknown current state")
state_sets.append(ctx.current_state_ids)
if ctx.delta_ids or hasattr(ev, "state_key"):
was_updated = True
break
else:
# If we couldn't find it, then we'll need to pull
# the state from the database
was_updated = True
missing_event_ids.append(event_id)

if missing_event_ids:
# Now pull out the state for any missing events from DB
event_to_groups = yield self._get_state_group_for_events(
missing_event_ids,
)

groups = set(event_to_groups.values())
group_to_state = yield self._get_state_for_groups(groups)

state_sets.extend(group_to_state.values())

if not new_latest_event_ids or was_updated:
current_state_for_room[room_id] = yield resolve_events(
state_sets,
state_map_factory=lambda ev_ids: self.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
),
)

yield self.runInteraction(
"persist_events",
self._persist_events_txn,
events_and_contexts=chunk,
backfilled=backfilled,
delete_existing=delete_existing,
current_state_for_room=current_state_for_room,
)
persist_event_counter.inc_by(len(chunk))

@_retry_on_integrity_error
@defer.inlineCallbacks
@log_function
def _persist_event(self, event, context, current_state=None, backfilled=False,
delete_existing=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
current_state=current_state,
backfilled=backfilled,
delete_existing=delete_existing,
)
persist_event_counter.inc()
except _RollbackButIsFineException:
pass

@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
Expand Down Expand Up @@ -426,7 +463,7 @@ def _persist_event_txn(self, txn, event, context, current_state, backfilled=Fals

@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
delete_existing=False):
delete_existing=False, current_state_for_room={}):
"""Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table,
Expand All @@ -436,6 +473,40 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
If delete_existing is True then existing events will be purged from the
database before insertion. This is useful when retrying due to IntegrityError.
"""
for room_id, current_state in current_state_for_room.iteritems():
txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (room_id,))

# Add an entry to the current_state_resets table to record the point
# where we clobbered the current state
stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
self._simple_insert_txn(
txn,
table="current_state_resets",
values={"event_stream_ordering": stream_order}
)

self._simple_delete_txn(
txn,
table="current_state_events",
keyvalues={"room_id": room_id},
)

self._simple_insert_many_txn(
txn,
table="current_state_events",
values=[
{
"event_id": ev_id,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
}
for key, ev_id in current_state.iteritems()
],
)

# Ensure that we don't have the same event twice.
# Pick the earliest non-outlier if there is one, else the earliest one.
new_events_and_contexts = OrderedDict()
Expand Down Expand Up @@ -798,29 +869,6 @@ def event_dict(event):
# to update the current state table
return

for event, _ in state_events_and_contexts:
if event.internal_metadata.is_outlier():
# Outlier events shouldn't clobber the current state.
continue

txn.call_after(
self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,)
)

self._simple_upsert_txn(
txn,
"current_state_events",
keyvalues={
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
values={
"event_id": event.event_id,
}
)

return

def _add_to_cache(self, txn, events_and_contexts):
Expand Down
Loading

0 comments on commit 09eb08f

Please sign in to comment.