diff --git a/package.json b/package.json index 841de2ea6..3d9e4aeb7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "querybook", - "version": "3.28.2", + "version": "3.28.3", "description": "A Big Data Webapp", "private": true, "scripts": { diff --git a/querybook/scripts/run_test b/querybook/scripts/run_test index baefbdc46..b3e90af97 100755 --- a/querybook/scripts/run_test +++ b/querybook/scripts/run_test @@ -17,7 +17,7 @@ usage() { run_python_unit_test() { echo 'Start running python unit tests >>>>>>>>>>>>>>>>>>>>>>>>>>>>' - py.test querybook/tests || exit 1 + python -m gevent.monkey --module pytest querybook/tests || exit 1 } run_webpack_test() { diff --git a/querybook/server/datasources/query_execution.py b/querybook/server/datasources/query_execution.py index 83ddeb180..8c3515cdb 100644 --- a/querybook/server/datasources/query_execution.py +++ b/querybook/server/datasources/query_execution.py @@ -180,7 +180,6 @@ def cancel_query_and_notify(): "RECEIVED", # Rare case where task is received but not yet start "RETRY", # Very unlikely case, because query normally do not retry ): - task.revoke(terminate=True) # last attempt to cancel it cancel_query_and_notify() elif task.state == "ABORTED": diff --git a/querybook/server/lib/utils/utils.py b/querybook/server/lib/utils/utils.py index e38a4549b..ec966607d 100644 --- a/querybook/server/lib/utils/utils.py +++ b/querybook/server/lib/utils/utils.py @@ -1,8 +1,12 @@ +from contextlib import contextmanager import inspect import signal import subprocess from datetime import datetime, date from functools import wraps +from typing import Optional, Union + +import gevent from lib.logger import get_logger @@ -91,17 +95,58 @@ class TimeoutError(Exception): pass -class Timeout: - def __init__(self, sec, custom_error_message=None): +def is_gevent_monkey_patched(): + try: + from gevent import monkey + except ImportError: + return False + else: + # Choose a random library to test if it's patched + return 
monkey.is_module_patched("socket") + + +@contextmanager +def Timeout(sec: Union[int, float] = 1, custom_error_message: Optional[str] = None): + if is_gevent_monkey_patched(): + with GeventTimeout(sec, custom_error_message=custom_error_message): + yield + else: + with SignalTimeout(sec, custom_error_message=custom_error_message): + yield + + +@contextmanager +def GeventTimeout( + sec: Union[int, float] = 1, custom_error_message: Optional[str] = None +): + """ + This timeout function can be used in gevent celery worker or the web server (which is powered by gevent) + """ + + error_message = custom_error_message or f"Timeout Exception: {sec} seconds" + timeout = gevent.Timeout(sec, TimeoutError(error_message)) + timeout.start() + + try: + yield + finally: + timeout.close() + + +# Deprecated: use GeventTimeout if possible, the Timeout would break in gevent worker +class SignalTimeout: + def __init__( + self, sec: Union[int, float], custom_error_message: Optional[str] = None + ): self.error_message = custom_error_message or f"Timeout Exception: {sec} seconds" self.sec = sec def __enter__(self): signal.signal(signal.SIGALRM, self.raise_timeout) - signal.alarm(self.sec) + signal.setitimer(signal.ITIMER_REAL, self.sec) def __exit__(self, *args): - signal.alarm(0) # disable alarm + signal.setitimer(signal.ITIMER_REAL, 0) # disable alarm def raise_timeout(self, *args): raise TimeoutError(self.error_message) diff --git a/querybook/server/runweb.py b/querybook/server/runweb.py index 041d4edea..0518a0d79 100644 --- a/querybook/server/runweb.py +++ b/querybook/server/runweb.py @@ -1,7 +1,3 @@ -"""This file is for dev server only. 
- DO NOT USE FOR PROD -""" - from gevent import monkey monkey.patch_all() diff --git a/querybook/server/tasks/run_query.py b/querybook/server/tasks/run_query.py index d93fdc493..20f379169 100644 --- a/querybook/server/tasks/run_query.py +++ b/querybook/server/tasks/run_query.py @@ -17,7 +17,6 @@ from logic.elasticsearch import update_query_execution_by_id from tasks.log_query_per_table import log_query_per_table_task - LOG = get_task_logger(__name__) diff --git a/querybook/tests/test_lib/test_utils/test_utils.py b/querybook/tests/test_lib/test_utils/test_utils.py new file mode 100644 index 000000000..970b49721 --- /dev/null +++ b/querybook/tests/test_lib/test_utils/test_utils.py @@ -0,0 +1,33 @@ +import time + +import pytest + +from lib.utils.utils import GeventTimeout, SignalTimeout, TimeoutError + + +def test_gevent_timeout(): + with pytest.raises(TimeoutError): + with GeventTimeout(0.1): + time.sleep(0.2) + + +def test_gevent_no_timeout(): + try: + with GeventTimeout(0.2): + time.sleep(0.1) + except TimeoutError: + pytest.fail("TimeoutError raised when it shouldn't") + + +def test_signal_timeout(): + with pytest.raises(TimeoutError): + with SignalTimeout(0.1): + time.sleep(0.2) + + +def test_signal_no_timeout(): + try: + with SignalTimeout(0.2): + time.sleep(0.1) + except TimeoutError: + pytest.fail("TimeoutError raised when it shouldn't") diff --git a/requirements/base.txt b/requirements/base.txt index 80a81898c..b5975d040 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -22,6 +22,8 @@ Jinja2==3.1.2 # From Flask # Celery celery==5.2.7 +kombu==5.3.1 # not a direct dependency (from celery), pinned due to bug: https://github.com/celery/kombu/issues/1785 + # Ops pyyaml==5.4.1