Skip to content

Commit

Permalink
Handling race conditions when caching values (#322)
Browse files Browse the repository at this point in the history
Fix for situations where multiple jobs end up caching the same value at the same time and end up violating the uniqueness constraint on the value hash.
  • Loading branch information
witek-insitro committed Aug 31, 2023
1 parent f40ad59 commit cd2df16
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 5 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ docs/build/
.redun
redun.db
dist/
build/
build/
scratch/
16 changes: 14 additions & 2 deletions redun/backends/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@ def record_value(self, value: Any, data: Optional[bytes] = None) -> str:
# See `_get_value_data`
data = b""

value_row = self.session.query(Value).filter_by(value_hash=value_hash).first()
value_row = self.session.get(Value, value_hash)
if value_row:
# Value already recorded.
return value_hash
Expand All @@ -1628,7 +1628,19 @@ def record_value(self, value: Any, data: Optional[bytes] = None) -> str:
value=data,
)
)
self.session.commit()
try:
self.session.commit()
except sa.exc.IntegrityError:
# Most likely value recorded in the meantime by another process.
self.session.rollback()
# run a double check
# we can't catch for UniqueViolation or other as it is abstracted away by sqlalchemy
value_row = self.session.get(Value, value_hash)
if value_row:
return value_hash
else:
# something else went wrong
raise

self._record_special_redun_values([value], [value_hash])

Expand Down
32 changes: 30 additions & 2 deletions redun/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest.mock import patch

import pytest
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import cast as sa_cast

Expand Down Expand Up @@ -57,10 +58,37 @@ def test_max_value_size() -> None:

backend = RedunBackendDb(db_uri="sqlite:///:memory:", config=config)
backend.load()
small_object = "J" * 10000
large_object = "J" * 10000

with pytest.raises(RedunDatabaseError):
backend.record_value(small_object)
backend.record_value(large_object)


def test_race_condition_record_value() -> None:
    """
    Test graceful handling of a write race: another process may insert the
    same value between our existence check and our commit, so the resulting
    IntegrityError must not surface when the row does in fact exist.
    """
    backend = RedunBackendDb(db_uri="sqlite:///:memory:")
    backend.load()

    # Keep a handle on the real commit so the patched version still persists data.
    real_commit = backend.session.commit # type: ignore

    def commit_then_raise(*args, **kwargs):
        # Commit for real, then simulate another process having won the race.
        real_commit(*args, **kwargs)
        raise IntegrityError("UniquenessViolated", "UniquenessViolated", "UniquenessViolated")

    backend.session.commit = mock.MagicMock(side_effect=commit_then_raise) # type: ignore
    # record_value should swallow the IntegrityError (the row exists after the
    # commit) and still return the value hash.
    value_hash1 = backend.record_value("myvalue")

    assert backend.get_value(value_hash1) == ("myvalue", True)

    # If the commit fails and the row is genuinely absent, the error is not a
    # benign race and must propagate to the caller.
    backend.session.commit = mock.MagicMock( # type: ignore
        side_effect=IntegrityError("OtherIssue", "OtherIssue", "OtherIssue")
    )

    with pytest.raises(IntegrityError):
        backend.record_value("my-other-value")


def test_call_graph_db() -> None:
Expand Down

0 comments on commit cd2df16

Please sign in to comment.