diff --git a/querybook/server/lib/query_executor/exc.py b/querybook/server/lib/query_executor/exc.py
new file mode 100644
index 000000000..c6db42bca
--- /dev/null
+++ b/querybook/server/lib/query_executor/exc.py
@@ -0,0 +1,14 @@
+class QueryExecutorException(Exception):
+    pass
+
+
+# AlreadyExecutedException
+# This error can happen since we turned on acks_late = True,
+# so in the event of an unexpected worker crash, the task
+# will get reassigned
+class AlreadyExecutedException(QueryExecutorException):
+    pass
+
+
+class InvalidQueryExecution(QueryExecutorException):
+    pass
diff --git a/querybook/server/lib/query_executor/executor_factory.py b/querybook/server/lib/query_executor/executor_factory.py
new file mode 100644
index 000000000..e138d5887
--- /dev/null
+++ b/querybook/server/lib/query_executor/executor_factory.py
@@ -0,0 +1,96 @@
+from app.db import with_session
+from const.query_execution import QueryExecutionStatus
+from lib.logger import get_logger
+from lib.query_analysis import get_statement_ranges
+from lib.query_analysis.lineage import process_query
+from logic import (
+    admin as admin_logic,
+    query_execution as qe_logic,
+    user as user_logic,
+)
+from .exc import AlreadyExecutedException, InvalidQueryExecution
+from .all_executors import get_executor_class
+
+LOG = get_logger(__file__)
+
+
+@with_session
+def create_executor_from_execution(query_execution_id, celery_task, session=None):
+    executor_params, engine = _get_executor_params_and_engine(
+        query_execution_id, celery_task=celery_task, session=session
+    )
+    executor = get_executor_class(engine.language, engine.executor)(**executor_params)
+    return executor
+
+
+@with_session
+def _get_executor_params_and_engine(query_execution_id, celery_task, session=None):
+    query, statement_ranges, uid, engine_id = _get_query_execution_info(
+        query_execution_id, session=session
+    )
+    user = user_logic.get_user_by_id(uid, session=session)
+    engine = admin_logic.get_query_engine_by_id(engine_id, session=session)
+
+    return (
+        {
+            "query_execution_id": query_execution_id,
+            "celery_task": celery_task,
+            "query": query,
+            "statement_ranges": statement_ranges,
+            "client_setting": {
+                **engine.get_engine_params(),
+                "proxy_user": user.username,
+            },
+        },
+        engine,
+    )
+
+
+@with_session
+def _get_query_execution_info(query_execution_id, session=None):
+    query_execution = qe_logic.get_query_execution_by_id(
+        query_execution_id, session=session
+    )
+    if not query_execution:
+        raise InvalidQueryExecution(f"Query {query_execution_id} does not exist")
+    if query_execution.status != QueryExecutionStatus.INITIALIZED:
+        # Double check that the query has not already been executed, since
+        # it could be re-inserted after a celery worker failure
+        raise AlreadyExecutedException(
+            f"Query {query_execution_id} is already executed. This is likely caused by a worker crash."
+        )
+
+    query = query_execution.query
+    statement_ranges = get_statement_ranges(query)
+    uid = query_execution.uid
+    engine_id = query_execution.engine_id
+
+    _assert_safe_query(query, engine_id, session=session)
+    return query, statement_ranges, uid, engine_id
+
+
+@with_session
+def _assert_safe_query(query, engine_id, session=None):
+    try:
+        from lib.metastore.utils import MetastoreTableACLChecker
+
+        LOG.debug("assert_safe_query")
+        table_per_statement, _ = process_query(query)
+        all_tables = [table for tables in table_per_statement for table in tables]
+
+        query_engine = admin_logic.get_query_engine_by_id(engine_id, session=session)
+        metastore = admin_logic.get_query_metastore_by_id(
+            query_engine.metastore_id, session=session
+        )
+        acl_checker = MetastoreTableACLChecker(metastore.acl_control)
+
+        for table in all_tables:
+            schema_name, table_name = table.split(".")
+            if not acl_checker.is_table_valid(schema_name, table_name):
+                raise InvalidQueryExecution(
+                    f"Table {table} is not allowed by metastore"
+                )
+    except InvalidQueryExecution as e:
+        raise e
+    except Exception as e:
+        LOG.info(e)
diff --git a/querybook/server/lib/query_executor/notification.py b/querybook/server/lib/query_executor/notification.py
new file mode 100644
index 000000000..c2914fb47
--- /dev/null
+++ b/querybook/server/lib/query_executor/notification.py
@@ -0,0 +1,61 @@
+from app.db import with_session
+from env import QuerybookSettings
+from lib.notify.utils import notify_user
+from logic import (
+    query_execution as qe_logic,
+    user as user_logic,
+    query_execution_permission as qe_perm_logic,
+)
+
+
+@with_session
+def notifiy_on_execution_completion(query_execution_id, session=None):
+    query_execution = qe_logic.get_query_execution_by_id(
+        query_execution_id, session=session
+    )
+
+    notifications = query_execution.notifications
+    if len(notifications):
+        data_cell = next(iter(query_execution.cells), None)
+        # TODO: this should be determined by the notification.user?
+        # Come up with a more efficient way to determine env per user
+        env_name = getattr(
+            qe_perm_logic.get_default_user_environment_by_execution_id(
+                execution_id=query_execution_id,
+                uid=query_execution.uid,
+                session=session,
+            ),
+            "name",
+            None,
+        )
+
+        # If the query execution is not associated with any environment
+        # then no notification can be done
+        if not env_name:
+            return
+
+        for notification in notifications:
+            uid = notification.user
+            user = user_logic.get_user_by_id(uid, session=session)
+            doc_id = None
+            cell_id = None
+            query_title = "Untitled"
+
+            if data_cell is not None:
+                cell_id = data_cell.id
+                doc_id = data_cell.doc.id
+                query_title = data_cell.meta.get("title", query_title)
+
+            notify_user(
+                user=user,
+                template_name="query_completion_notification",
+                template_params=dict(
+                    query_execution=query_execution,
+                    doc_id=doc_id,
+                    cell_id=cell_id,
+                    query_title=query_title,
+                    public_url=QuerybookSettings.PUBLIC_URL,
+                    env_name=env_name,
+                ),
+                session=session,
+            )
diff --git a/querybook/server/tasks/run_query.py b/querybook/server/tasks/run_query.py
index e141ac924..8f31f0e11 100644
--- a/querybook/server/tasks/run_query.py
+++ b/querybook/server/tasks/run_query.py
@@ -1,24 +1,18 @@
 import traceback
 import datetime
-
-from app.db import with_session, DBSession
-from app.flask_app import celery
 from celery.contrib.abortable import AbortableTask
 from celery.exceptions import SoftTimeLimitExceeded
 from celery.utils.log import get_task_logger
+
+from app.db import with_session, DBSession
+from app.flask_app import celery
 from const.query_execution import QueryExecutionStatus
-from env import QuerybookSettings
-from lib.query_analysis import get_statement_ranges
-from lib.query_analysis.lineage import process_query
-from lib.query_executor.all_executors import get_executor_class
-from logic import (
-    admin as admin_logic,
-    query_execution as qe_logic,
-    user as user_logic,
-    query_execution_permission as qe_perm_logic,
-)
+from lib.query_executor.notification import notifiy_on_execution_completion
+from lib.query_executor.executor_factory import create_executor_from_execution
+from lib.query_executor.exc import QueryExecutorException
+from logic import query_execution as qe_logic
 from tasks.log_query_per_table import log_query_per_table_task
-from lib.notify.utils import notify_user
+
 
 LOG = get_task_logger(__name__)
 
@@ -34,206 +28,95 @@
 def run_query_task(self, query_execution_id):
     executor = None
     error_message = None
+    query_execution_status = QueryExecutionStatus.INITIALIZED
 
     try:
-        # Performance sanity check to see query has been executed
-        # Raise AlreadyExecutedException if it has been ran before
-        query, statement_ranges, uid, engine_id = get_query_execution_params(
-            query_execution_id
-        )
-
-        user = user_logic.get_user_by_id(uid)
-        engine = admin_logic.get_query_engine_by_id(engine_id)
-
-        executor_params = {
-            "query_execution_id": query_execution_id,
-            "celery_task": self,
-            "query": query,
-            "statement_ranges": statement_ranges,
-            "client_setting": {
-                **engine.get_engine_params(),
-                "proxy_user": user.username,
-            },
-        }
-
-        executor = get_executor_class(engine.language, engine.executor)(
-            **executor_params
-        )
-
-        while True:
-            if self.is_aborted():
-                executor.cancel()
-                break
-
-            executor.poll()
-            if executor.status != QueryExecutionStatus.RUNNING:
-                break
-            executor.sleep()
-    except (AlreadyExecutedException, SoftTimeLimitExceeded, Exception) as e:
-        # AlreadyExecutedException
-        # This error will happen since we turned acks_late = True
-        # So in the event of worker unexpected crash, the task
-        # will get reassigned
-
+        executor = create_executor_from_execution(query_execution_id, celery_task=self)
+        run_executor_until_finish(self, executor)
+    except SoftTimeLimitExceeded:
         # SoftTimeLimitExceeded
         # This exception happens when query has been running for more than
-        # 2 days.
-
+        # the limited time (default 2 days)
+        error_message = "The execution has exceeded the maximum allowed time."
+    except QueryExecutorException as e:
+        error_message = str(e)
+    except Exception as e:
         error_message = "{}\n{}".format(e, traceback.format_exc())
     finally:
         # When the finally block is reached, it is expected
         # that the executor should be in one of the end state
         with DBSession() as session:
-            final_query_status = QueryExecutionStatus.INITIALIZED
-            if executor:
-                final_query_status = executor.status
-            else:
-                # If the error happens before the executor is initialized
-                # we check the existing query execution status in db
-                # for reference
-                query_execution = qe_logic.get_query_execution_by_id(
-                    query_execution_id, session=session
-                )
-                if query_execution is not None:
-                    final_query_status = query_execution.status
-
-            if final_query_status in (
-                QueryExecutionStatus.INITIALIZED,
-                QueryExecutionStatus.DELIVERED,
-                QueryExecutionStatus.RUNNING,
-            ):
-                if error_message is None:
-                    error_message = "Unknown error, executor not completed when exit"
-                LOG.error(error_message)
-
-                qe_logic.create_query_execution_error(
-                    query_execution_id,
-                    error_type=None,
-                    error_message_extracted=None,
-                    error_message=error_message,
-                    session=session,
-                )
-                qe_logic.update_query_execution(
-                    query_execution_id,
-                    status=QueryExecutionStatus.ERROR,
-                    completed_at=datetime.datetime.utcnow(),
-                    session=session,
-                )
-
-            send_out_notification(query_execution_id, session=session)
-
-            # not using final_query_status because this might be a query
-            # that was sent again
-            if executor and executor.status == QueryExecutionStatus.DONE:
+            query_execution_status = get_query_execution_final_status(
+                query_execution_id, executor, error_message, session=session
+            )
+            notifiy_on_execution_completion(query_execution_id, session=session)
+
+            # Executor exists means the query was actually executed
+            # This prevents the case where query_execution gets executed twice
+            if executor and query_execution_status == QueryExecutionStatus.DONE:
                 log_query_per_table_task.delay(query_execution_id)
 
-    return executor.status.value if executor is not None else None
+    return query_execution_status.value if executor is not None else None
+
+
+def run_executor_until_finish(celery_task, executor):
+    while True:
+        if celery_task.is_aborted():
+            executor.cancel()
+            break
+        executor.poll()
+        if executor.status != QueryExecutionStatus.RUNNING:
+            break
+        executor.sleep()
 
 
-def get_query_execution_params(query_execution_id):
-    with DBSession() as session:
+@with_session
+def get_query_execution_final_status(
+    query_execution_id, executor, error_message, session=None
+):
+    final_query_status = QueryExecutionStatus.INITIALIZED
+    if executor:
+        final_query_status = executor.status
+    else:
+        # If the error happens before the executor is initialized
+        # we check the existing query execution status in db
+        # for reference
         query_execution = qe_logic.get_query_execution_by_id(
             query_execution_id, session=session
         )
-        if not query_execution:
-            raise InvalidQueryExecution(f"Query {query_execution_id} does not exist")
-        if query_execution.status != QueryExecutionStatus.INITIALIZED:
-            raise AlreadyExecutedException(
-                f"Query {query_execution_id} is already executed. This is likely caused by a worker crash."
-            )
+        if query_execution is not None:
+            final_query_status = query_execution.status
 
-        query = query_execution.query
-        statement_ranges = get_statement_ranges(query)
-        uid = query_execution.uid
-        engine_id = query_execution.engine_id
+    log_if_incomplete_query_status(
+        final_query_status, query_execution_id, error_message, session=session
+    )
 
-        assert_safe_query(query, engine_id, session=session)
-        return query, statement_ranges, uid, engine_id
+    return final_query_status
 
 
 @with_session
-def assert_safe_query(query, engine_id, session=None):
-    try:
-        from lib.metastore.utils import MetastoreTableACLChecker
-
-        LOG.debug("assert_safe_query")
-        table_per_statement, _ = process_query(query)
-        all_tables = [table for tables in table_per_statement for table in tables]
-
-        query_engine = admin_logic.get_query_engine_by_id(engine_id, session=session)
-        metastore = admin_logic.get_query_metastore_by_id(
-            query_engine.metastore_id, session=session
+def log_if_incomplete_query_status(
+    final_query_status, query_execution_id, error_message, session=None
+):
+    if final_query_status in (
+        QueryExecutionStatus.INITIALIZED,
+        QueryExecutionStatus.DELIVERED,
+        QueryExecutionStatus.RUNNING,
+    ):
+        if error_message is None:
+            error_message = "Unknown error, executor not completed when exit"
+        LOG.error(error_message)
+
+        qe_logic.create_query_execution_error(
+            query_execution_id,
+            error_type=None,
+            error_message_extracted=None,
+            error_message=error_message,
+            session=session,
        )
-        acl_checker = MetastoreTableACLChecker(metastore.acl_control)
-
-        for table in all_tables:
-            schema_name, table_name = table.split(".")
-            if not acl_checker.is_table_valid(schema_name, table_name):
-                raise InvalidQueryExecution(
-                    f"Table {table} is not allowed by metastore"
-                )
-    except InvalidQueryExecution as e:
-        raise e
-    except Exception as e:
-        LOG.info(e)
-
-
-@with_session
-def send_out_notification(query_execution_id, session=None):
-    query_execution = qe_logic.get_query_execution_by_id(
-        query_execution_id, session=session
-    )
-
-    notifications = query_execution.notifications
-    if len(notifications):
-        data_cell = next(iter(query_execution.cells), None)
-        # TODO: this should be determined by the notification.user?
-        # Come up with a more efficient way to determine env per user
-        env_name = getattr(
-            qe_perm_logic.get_default_user_environment_by_execution_id(
-                execution_id=query_execution_id,
-                uid=query_execution.uid,
-                session=session,
-            ),
-            "name",
-            None,
+        qe_logic.update_query_execution(
+            query_execution_id,
+            status=QueryExecutionStatus.ERROR,
+            completed_at=datetime.datetime.utcnow(),
+            session=session,
         )
-
-        # If the query execution is not associated with any environment
-        # then no notification can be done
-        if not env_name:
-            return
-
-        for notification in notifications:
-            uid = notification.user
-            user = user_logic.get_user_by_id(uid, session=session)
-            doc_id = None
-            cell_id = None
-            query_title = "Untitled"
-
-            if data_cell is not None:
-                cell_id = data_cell.id
-                doc_id = data_cell.doc.id
-                query_title = data_cell.meta.get("title", query_title)
-
-            notify_user(
-                user=user,
-                template_name="query_completion_notification",
-                template_params=dict(
-                    query_execution=query_execution,
-                    doc_id=doc_id,
-                    cell_id=cell_id,
-                    query_title=query_title,
-                    public_url=QuerybookSettings.PUBLIC_URL,
-                    env_name=env_name,
-                ),
-                session=session,
-            )
-
-
-class AlreadyExecutedException(Exception):
-    pass
-
-
-class InvalidQueryExecution(Exception):
-    pass
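
The patch splits run_query_task into reusable helpers: create_executor_from_execution builds a configured executor for a stored query execution, run_executor_until_finish polls it until it leaves the RUNNING state, and notifiy_on_execution_completion sends completion notifications. A minimal sketch of how another Celery task could reuse these helpers; the task name requeue_query_task and its decorator arguments are hypothetical and only the imported helpers come from this patch, with error handling and the finally-block bookkeeping from run_query_task omitted:

    # Hypothetical consumer of the helpers introduced in this patch
    from celery.contrib.abortable import AbortableTask

    from app.flask_app import celery
    from lib.query_executor.executor_factory import create_executor_from_execution
    from lib.query_executor.notification import notifiy_on_execution_completion
    from tasks.run_query import run_executor_until_finish


    @celery.task(bind=True, base=AbortableTask)
    def requeue_query_task(self, query_execution_id):
        # Build the executor from the stored execution
        # (raises AlreadyExecutedException if it has already run)
        executor = create_executor_from_execution(query_execution_id, celery_task=self)
        # Drive the executor until it reaches a terminal status
        run_executor_until_finish(self, executor)
        # Notify subscribed users about the completed execution
        notifiy_on_execution_completion(query_execution_id)
        return executor.status.value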