Skip to content

Commit

Permalink
Merge pull request #6241 from tomachalek/sqldb_rewrite
Browse files Browse the repository at this point in the history
Fix and rewrite sql integration and adhoc databases
  • Loading branch information
tomachalek committed Jun 18, 2024
2 parents 7517f37 + 4336cf8 commit bcb9e3b
Show file tree
Hide file tree
Showing 30 changed files with 379 additions and 247 deletions.
2 changes: 1 addition & 1 deletion conf/config.rng
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@
<element name="integration_db">
<a:documentation>A plugin providing connection to an existing database KonText needs to
be integrated with. Typically - corparch, query_persistence, auth plug-ins may support
integration db which allows configuring and utilizing a single connection (pool) instead
integration db which allows configuring and utilizing a single connection instead
of using multiple similar connections and configurations.</a:documentation>
<ref name="plugin_common" />
<ref name="customPluginConfiguration" />
Expand Down
5 changes: 2 additions & 3 deletions lib/action/model/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,11 @@ async def pre_dispatch(self, req_args):
# only general setting can be applied now because
# we do not know final corpus name yet
self._init_default_settings(options)

self._setup_user_paths()
self.cf = corplib.CorpusFactory(self.subcpath)
try:
options.update(await self._load_general_settings())
self.args.map_args_to_attrs(options)
self._setup_user_paths()
self.cf = corplib.CorpusFactory(self.subcpath)
except ValueError as ex:
raise UserReadableException(ex)
return req_args
Expand Down
8 changes: 6 additions & 2 deletions lib/action/plugin/initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ def init_plugin(name, module=None, optional=False):
except ImportError as e:
logging.getLogger(__name__).warning(
f'Plugin [{name}] configured but the following error occurred: {e}')
except (PluginException, Exception) as e:
except PluginException as e:
logging.getLogger(__name__).critical(
'Failed to initiate plug-in %s: %s', name, e, exc_info=e)
raise e
raise e from e
except Exception as e:
logging.getLogger(__name__).critical(
'Failed to initiate plug-in %s: %s', name, e, exc_info=e)
raise PluginException(f'Failed to initiate plug-in {name} with error {e.__class__.__name__}: {e}') from e
else:
plugins.add_missing_plugin(name)

Expand Down
3 changes: 3 additions & 0 deletions lib/plugin_types/corparch/backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ async def load_interval_attrs(self, cursor: CursorType, corpus_id: str) -> List[
async def load_simple_query_default_attrs(self, cursor: CursorType, corpus_id: str) -> List[str]:
raise NotImplementedError()

async def close(self):
pass


class DatabaseWriteBackend(Generic[CursorType], abc.ABC):

Expand Down
122 changes: 16 additions & 106 deletions lib/plugin_types/integration_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,102 +17,14 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import abc
from contextlib import AbstractAsyncContextManager, AbstractContextManager
from typing import Generic, Optional, TypeVar

N = TypeVar('N')
R = TypeVar('R')
SN = TypeVar('SN')
SR = TypeVar('SR')

T = TypeVar('T')


class AsyncDbContextManager(AbstractAsyncContextManager, Generic[T]):
async def __aenter__(self) -> T:
pass


class DbContextManager(AbstractContextManager, Generic[T]):
def __enter__(self) -> T:
pass


class DatabaseAdapter(abc.ABC, Generic[N, R, SN, SR]):

@property
@abc.abstractmethod
def is_autocommit(self):
"""
Return True if autocommit is enabled for the connection; else False
"""
pass

@property
@abc.abstractmethod
def info(self) -> str:
"""
Provide a brief info about the database. This is mainly for administrators
as it is typically written to the application log during KonText start.
"""
pass


@abc.abstractmethod
async def connection(self) -> AsyncDbContextManager[N]:
"""
Return an async connection to the integration database from pool.
Please note that it is important for this function not to return
a new connection each time it is called. Otherwise, functions
like commit_tx, rollback_tx won't likely work as expected.
"""
pass

@abc.abstractmethod
async def cursor(self, dictionary=True) -> AsyncDbContextManager[R]:
"""
Create a new async database cursor
"""
pass

@abc.abstractmethod
async def begin_tx(self, cursor):
pass

@abc.abstractmethod
async def commit_tx(self):
"""
Commit a transaction running within
the current database connection.
"""
pass

@abc.abstractmethod
async def rollback_tx(self):
"""
Rollback a transaction running within
the current database connection.
"""
pass

@abc.abstractmethod
def begin_tx_sync(self, cursor):
pass

@abc.abstractmethod
def connection_sync(self) -> DbContextManager[SN]:
pass

@abc.abstractmethod
def cursor_sync(self, dictionary=True) -> DbContextManager[SR]:
pass
from plugins.common.sqldb import DatabaseAdapter, N, R, SN, SR
from typing import Optional


class IntegrationDatabase(DatabaseAdapter[N, R, SN, SR]):
"""
Integration DB plugin allows sharing a single database connection (or conn. pool) between
multiple plugins which can be convenient especially in case KonText is integrated into an existing
information system with existing user accounts, corpora information, settings storage, etc.
Integration DB plugin allows sharing a single database connection within
a scope of an HTTP action handler (i.e. one connection per HTTP request).
Although not explicitly stated, the interface is tailored for use with SQL databases.
where terms like 'cursor', 'commit', 'rollback' are common. But in general it should be possible
Expand All @@ -121,10 +33,6 @@ class IntegrationDatabase(DatabaseAdapter[N, R, SN, SR]):
Please also note that while the primary function of the plug-in is to allow integration of KonText
with existing SQL databases, it can also be used to create standalone KonText installations based
on MySQL/MariaDB.
It is expected that the plugin will is able to handle a per-request connection lifecycle.
KonText helps with this by calling on_request and on_response handlers to create and close
the connection.
"""

@property
Expand All @@ -142,34 +50,36 @@ def is_active(self):
@abc.abstractmethod
def wait_for_environment(self) -> Optional[Exception]:
"""
This function is called each time KonText service is started
This function is called each time KonText service is started,
and it should block the execution until either an internally
defined/configured timeout has elapsed or some external condition
allowing plug-in initialization has been fulfilled.
This can be e.g. used when KonText run within a Docker container and
This can be e.g. used when KonText run within a Docker container, and
it has to wait for a database container to initialize. In such case
it should e.g. try to select from a table to make sure there is a
working connection along with database, tables and data.
returns:
if None then the environment is ready else provide an error for further processing
if None then the environment is ready, else provide an error for further processing
"""
pass

async def on_request(self):
"""
The function is called by a Sanic 'request' middleware.
This is e.g. the right time to open a db connection
(but be aware of the scope of the connection -
see e.g. contextvars.ContextVar).
The function is called by the Sanic 'request' middleware.
This is the right place to open a db connection.
But please be aware of the scope of the connection. The plug-in
instance's scope is always "per-web worker", which means it is
shared among multiple action handlers running simultaneously.
So it is essential that the connection is scoped/isolated properly.
See e.g. Python's contextvars.ContextVar for a convenient solution.
"""
pass

async def on_response(self):
"""
The function is called by a Sanic 'response' middleware.
This is typically the right time to close the database
connection.
The function is called by the Sanic 'response' middleware.
This is the right place to close the connection.
"""
pass
3 changes: 0 additions & 3 deletions lib/plugins/common/mysql.rng
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
<element name="mysql_passwd">
<text />
</element>
<element name="mysql_pool_size">
<data type="positiveInteger" />
</element>
<element name="mysql_retry_delay">
<data type="positiveInteger" />
</element>
Expand Down
57 changes: 19 additions & 38 deletions lib/plugins/common/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,77 +26,57 @@
from mysql.connector.aio.abstracts import MySQLConnectionAbstract, MySQLCursorAbstract


@dataclass
class ConnectionArgs:
db: str
user: str
password: str
autocommit: bool = False
host: str = field(default='localhost')
port: int = field(default=3306)


@dataclass
class PoolArgs:
minsize: int = field(default=1)
maxsize: int = field(default=10)
pool_recycle: float = field(default=-1.0)


@dataclass
class MySQLConf:
database: str
user: str
password: str
pool_size: int
conn_retry_delay: int
conn_retry_attempts: int
retry_delay: int
retry_attempts: int

host: str = field(default='localhost')
port: int = field(default=3306)
autocommit: bool = field(default=True)

@staticmethod
def from_conf(conf: Dict[str, Any]) -> 'MySQLConf':
pref = 'mysql_' if any(x.startswith('mysql_') for x in conf.keys()) else ''
return MySQLConf(
host=conf['mysql_host'],
database=conf['mysql_db'],
user=conf['mysql_user'],
password=conf['mysql_passwd'],
pool_size=conf['mysql_pool_size'],
conn_retry_delay=conf['mysql_retry_delay'],
conn_retry_attempts=conf['mysql_retry_attempts'],
host=conf[f'{pref}host'],
database=conf[f'{pref}db'],
user=conf[f'{pref}user'],
password=conf[f'{pref}passwd'],
retry_delay=conf[f'{pref}retry_delay'],
retry_attempts=conf[f'{pref}retry_attempts'],
)

@property
def conn_dict(self) -> Dict[str, Any]:
return asdict(self)

@property
def db(self):
return self.database


class MySQLOps:
"""
A simple wrapper for mysql.connector.aio and mysql.connector (for sync variants).
In KonText, the class is mostly used by the MySQLIntegrationDb plugin.
In KonText, the class is mostly used by the MySQLIntegrationDb plugin and mysql.AdhocDB.
As a standalone type, it is mostly useful for running various scripts and automatic
tasks on the worker server where we don't have to worry about effective
connection usage (for example, if a task runs once every 5 minutes, this means
no significant connection overhead).
no significant connection overhead to always connect and disconnect).
"""

_conn_args: ConnectionArgs

_pool_args: PoolArgs
_conn_args: MySQLConf

_retry_delay: int

_retry_attempts: int

def __init__(self, host, database, user, password, pool_size, autocommit, retry_delay, retry_attempts):
self._conn_args = ConnectionArgs(
host=host, db=database, user=user, password=password, autocommit=autocommit)
self._pool_args = PoolArgs(maxsize=pool_size)
self._retry_delay = retry_delay # TODO has no effect now
self._retry_attempts = retry_attempts # TODO has no effect now
def __init__(self, conn_args):
self._conn_args = conn_args

@asynccontextmanager
async def connection(self) -> Generator[MySQLConnectionAbstract, None, None]:
Expand All @@ -111,6 +91,7 @@ async def connection(self) -> Generator[MySQLConnectionAbstract, None, None]:
user=self._conn_args.user,
password=self._conn_args.password,
host=self._conn_args.host,
port=self._conn_args.port,
database=self._conn_args.db,
ssl_disabled=True,
autocommit=True) as conn:
Expand Down
Loading

0 comments on commit bcb9e3b

Please sign in to comment.