Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Only run one background update at a time #7190

Merged
merged 8 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 48 additions & 26 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ def __init__(self, hs, database):
self._clock = hs.get_clock()
self.db = database

# if a background update is currently running, its name.
self._current_background_update = None # type: Optional[str]

self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
self._all_done = False

Expand Down Expand Up @@ -131,7 +133,7 @@ async def has_completed_background_updates(self) -> bool:
return True

# obviously, if we have things in our queue, we're not done.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
if self._background_update_queue:
if self._current_background_update:
return False

# otherwise, check if there are updates to be run. This is important,
Expand All @@ -152,11 +154,10 @@ async def has_completed_background_updates(self) -> bool:
async def has_completed_background_update(self, update_name) -> bool:
"""Check if the given background update has finished running.
"""

if self._all_done:
return True

if update_name in self._background_update_queue:
if update_name == self._current_background_update:
return False

update_exists = await self.db.simple_select_one_onecol(
Expand All @@ -180,31 +181,49 @@ async def do_next_background_update(self, desired_duration_ms: float) -> bool:
Returns:
True if there is no more work to do, otherwise False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be less confusing to do this the other way around (i.e. returning False if there's no more work to do).
The question the return value seemed to be answering before that change seems to be "Do we still have work to do?", and not "Do we not have work to do?", which could lead to confusing double negations, so I suggest we keep the semantics here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if I changed the description to True if we have finished running all the background updates, otherwise False?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's better, though I still find it a bit confusing, but ymmv.

"""
if not self._background_update_queue:
updates = await self.db.simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name", "depends_on"),

def get_background_updates_txn(txn):
txn.execute(
"""
SELECT update_name, depends_on FROM background_updates
ORDER BY ordering, update_name
"""
)
in_flight = {update["update_name"] for update in updates}
for update in updates:
if update["depends_on"] not in in_flight:
self._background_update_queue.append(update["update_name"])
return self.db.cursor_to_dict(txn)

if not self._background_update_queue:
# no work left to do
return True
if not self._current_background_update:
all_pending_updates = await self.db.runInteraction(
"background_updates", get_background_updates_txn,
)
if not all_pending_updates:
# no work left to do
return True

# find the first update which isn't dependent on another one in the queue.
pending = {update["update_name"] for update in all_pending_updates}
for upd in all_pending_updates:
depends_on = upd["depends_on"]
if not depends_on or depends_on not in pending:
break
logger.info(
"Not starting on bg update %s until %s is done",
upd["update_name"],
depends_on,
)
else:
# if we get to the end of that for loop, there is a problem
raise Exception(
"Unable to find a background update which doesn't depend on "
"another: dependency cycle?"
)

# pop from the front, and add back to the back
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
self._current_background_update = upd["update_name"]

res = await self._do_background_update(update_name, desired_duration_ms)
await self._do_background_update(desired_duration_ms)
return False

async def _do_background_update(
self, update_name: str, desired_duration_ms: float
) -> int:
async def _do_background_update(self, desired_duration_ms: float) -> int:
update_name = self._current_background_update
logger.info("Starting update batch on background update '%s'", update_name)

update_handler = self._background_update_handlers[update_name]
Expand Down Expand Up @@ -405,9 +424,12 @@ def _end_background_update(self, update_name):
Returns:
A deferred that completes once the task is removed.
"""
self._background_update_queue = [
name for name in self._background_update_queue if name != update_name
]
if update_name != self._current_background_update:
raise Exception(
"Cannot end background update %s which isn't currently running"
% update_name
)
self._current_background_update = None
return self.db.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 57
SCHEMA_VERSION = 58

dir_path = os.path.abspath(os.path.dirname(__file__))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* add an "ordering" column to background_updates, which can be used to sort them
to achieve some level of consistency. */

ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;