Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Db txn set isolation level #11799

Merged
merged 9 commits into from
Jan 25, 2022
1 change: 1 addition & 0 deletions changelog.d/11799.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Preparation for reducing Postgres serialization errors: allow setting transaction isolation level. Contributed by Nick @ Beeper.
10 changes: 10 additions & 0 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ async def runInteraction(
func: Callable[..., R],
*args: Any,
db_autocommit: bool = False,
isolation_level: Optional[int] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this parameter isn't being passed into runWithConnection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh huh indeed, looks like I've missed that :/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened #11847 to fix this.

**kwargs: Any,
) -> R:
"""Starts a transaction on the database and runs a given function
Expand All @@ -724,6 +725,7 @@ async def runInteraction(
called multiple times if the transaction is retried, so must
correctly handle that case.

isolation_level: Set the server isolation level for this transaction.
args: positional args to pass to `func`
kwargs: named args to pass to `func`

Expand Down Expand Up @@ -763,6 +765,7 @@ async def runWithConnection(
func: Callable[..., R],
*args: Any,
db_autocommit: bool = False,
isolation_level: Optional[int] = None,
**kwargs: Any,
) -> R:
"""Wraps the .runWithConnection() method on the underlying db_pool.
Expand All @@ -775,6 +778,7 @@ async def runWithConnection(
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transaction
that are only a single query. Currently only affects postgres.
isolation_level: Set the server isolation level for this transaction.
kwargs: named args to pass to `func`

Returns:
Expand Down Expand Up @@ -834,6 +838,10 @@ def inner_func(conn, *args, **kwargs):
try:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, True)
if isolation_level is not None:
self.engine.attempt_to_set_isolation_level(
conn, isolation_level
)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
Expand All @@ -842,6 +850,8 @@ def inner_func(conn, *args, **kwargs):
finally:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, False)
if isolation_level:
self.engine.attempt_to_set_isolation_level(conn, None)

return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
Expand Down
19 changes: 18 additions & 1 deletion synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from typing import Generic, TypeVar
from enum import IntEnum
from typing import Generic, Optional, TypeVar

from synapse.storage.types import Connection


class IsolationLevel(IntEnum):
READ_COMMITTED: int = 1
REPEATABLE_READ: int = 2
SERIALIZABLE: int = 3


class IncorrectDatabaseSetup(RuntimeError):
pass

Expand Down Expand Up @@ -109,3 +116,13 @@ def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
commit/rollback the connections.
"""
...

@abc.abstractmethod
def attempt_to_set_isolation_level(
self, conn: Connection, isolation_level: Optional[int]
):
"""Attempt to set the connections isolation level.

Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default.
"""
...
29 changes: 25 additions & 4 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@
# limitations under the License.

import logging
from typing import Mapping, Optional

from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
from synapse.storage.engines._base import (
BaseDatabaseEngine,
IncorrectDatabaseSetup,
IsolationLevel,
)
from synapse.storage.types import Connection

logger = logging.getLogger(__name__)
Expand All @@ -34,6 +39,15 @@ def _disable_bytes_adapter(_):
self.synchronous_commit = database_config.get("synchronous_commit", True)
self._version = None # unknown as yet

self.isolation_level_map: Mapping[int, int] = {
IsolationLevel.READ_COMMITTED: self.module.extensions.ISOLATION_LEVEL_READ_COMMITTED,
IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE,
}
self.default_isolation_level = (
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)

@property
def single_threaded(self) -> bool:
return False
Expand Down Expand Up @@ -104,9 +118,7 @@ def convert_param_style(self, sql):
return sql.replace("?", "%s")

def on_new_connection(self, db_conn):
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
db_conn.set_isolation_level(self.default_isolation_level)

# Set the bytea output to escape, vs the default of hex
cursor = db_conn.cursor()
Expand Down Expand Up @@ -175,3 +187,12 @@ def in_transaction(self, conn: Connection) -> bool:

def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
return conn.set_session(autocommit=autocommit) # type: ignore

def attempt_to_set_isolation_level(
self, conn: Connection, isolation_level: Optional[int]
):
if isolation_level is None:
isolation_level = self.default_isolation_level
else:
isolation_level = self.isolation_level_map[isolation_level]
return conn.set_isolation_level(isolation_level) # type: ignore
7 changes: 7 additions & 0 deletions synapse/storage/engines/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import struct
import threading
import typing
from typing import Optional

from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Connection
Expand Down Expand Up @@ -122,6 +123,12 @@ def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
# set the connection to autocommit mode.
pass

def attempt_to_set_isolation_level(
self, conn: Connection, isolation_level: Optional[int]
):
# All transactions are SERIALIZABLE by default in sqllite
pass


# Following functions taken from: https://github.com/coleifer/peewee

Expand Down