diff --git a/requirements-base.txt b/requirements-base.txt index c0755110c2b74..54b9971ec56b7 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -97,7 +97,7 @@ cryptography>=38.0.3 # Note, grpcio>1.30.0 requires setting GRPC_POLL_STRATEGY=epoll1 # See https://github.com/grpc/grpc/issues/23796 and # https://github.com/grpc/grpc/blob/v1.35.x/doc/core/grpc-polling-engines.md#polling-engine-implementations-in-grpc -grpcio==1.65.4 +grpcio==1.66.1 # not directly used, but provides a speedup for redis hiredis>=2.3.2 diff --git a/requirements-dev-frozen.txt b/requirements-dev-frozen.txt index 70f446695454f..d57ad999096cd 100644 --- a/requirements-dev-frozen.txt +++ b/requirements-dev-frozen.txt @@ -69,7 +69,7 @@ google-resumable-media==2.7.0 googleapis-common-protos==1.63.2 grpc-google-iam-v1==0.13.1 grpc-stubs==1.53.0.5 -grpcio==1.65.4 +grpcio==1.66.1 grpcio-status==1.60.1 h11==0.13.0 hiredis==2.3.2 diff --git a/requirements-frozen.txt b/requirements-frozen.txt index f95d47175a589..7d03a99fae591 100644 --- a/requirements-frozen.txt +++ b/requirements-frozen.txt @@ -57,7 +57,7 @@ google-resumable-media==2.7.0 googleapis-common-protos==1.63.2 grpc-google-iam-v1==0.13.1 grpc-stubs==1.53.0.5 -grpcio==1.65.4 +grpcio==1.66.1 grpcio-status==1.60.1 h11==0.14.0 hiredis==2.3.2 diff --git a/src/sentry/taskworker/pending_task_store.py b/src/sentry/taskworker/pending_task_store.py index 9c588f299b8c5..6de4e8950769e 100644 --- a/src/sentry/taskworker/pending_task_store.py +++ b/src/sentry/taskworker/pending_task_store.py @@ -16,7 +16,9 @@ def store(self, batch: Sequence[Task]): PendingTasks.objects.bulk_create([PendingTasks.from_proto(task) for task in batch]) - def get_pending_task(self, partition: int | None = None, topic: str | None = None) -> Task: + def get_pending_task( + self, partition: int | None = None, topic: str | None = None + ) -> Task | None: from django.db import router, transaction from sentry.taskworker.models import PendingTasks diff --git a/src/sentry/taskworker/retry.py b/src/sentry/taskworker/retry.py index b8043572d52fe..f944364cba9ce 100644 --- a/src/sentry/taskworker/retry.py +++ b/src/sentry/taskworker/retry.py @@ -4,6 +4,8 @@ from collections.abc import Sequence from typing import Any +from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import RetryPolicy + @dataclasses.dataclass class RetryState: @@ -41,7 +43,7 @@ def __init__( self.__deadletter = deadletter self.__discard = discard - def should_retry(self, state: RetryState, exc: Exception) -> bool: + def should_retry(self, state: RetryPolicy, exc: Exception) -> bool: # No more attempts left if state.attempts >= self.__times: return False diff --git a/src/sentry/taskworker/service/client.py b/src/sentry/taskworker/service/client.py new file mode 100644 index 0000000000000..01dab1a12224f --- /dev/null +++ b/src/sentry/taskworker/service/client.py @@ -0,0 +1,50 @@ +import logging + +import grpc +from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import ( + COMPLETE, + GetTaskRequest, + SetTaskResultRequest, + Status, + Task, +) +from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2_grpc import ConsumerStub + +from sentry.taskworker.pending_task_store import PendingTaskStore + +logger = logging.getLogger("sentry.taskworker") + + +class TaskClient: + """ + Emulate an RPC style interface + + This interface is the 'worker process' interface for + fetching and updating state on tasks. Worker processes + can rely on this interface to be stable. + """ + + def __init__(self): + self.pending_task_store = PendingTaskStore() + self.host = "localhost" + self.server_port = 50051 + self.channel = grpc.insecure_channel(f"{self.host}:{self.server_port}") + self.stub = ConsumerStub(self.channel) + + def get_task(self, partition: int | None = None, topic: str | None = None) -> Task | None: + logger.info("getting_latest_tasks", extra={"partition": partition, "topic": topic}) + request = GetTaskRequest(partition=partition, topic=topic) + response = self.stub.GetTask(request) + if response.HasField("task"): + return response.task + return None + + def set_task_status(self, task_id: int, task_status: Status.ValueType): + request = SetTaskResultRequest(store_id=task_id, status=task_status) + self.stub.SetTaskResult(request) + + def complete_task(self, task_id: int): + self.set_task_status(task_id=task_id, task_status=COMPLETE) + + +task_client = TaskClient() diff --git a/src/sentry/taskworker/service/service.py b/src/sentry/taskworker/service/service.py deleted file mode 100644 index e012bdc3f96f3..0000000000000 --- a/src/sentry/taskworker/service/service.py +++ /dev/null @@ -1,37 +0,0 @@ -import logging - -from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import ( - COMPLETE, - Status, - Task, -) - -from sentry.taskworker.pending_task_store import PendingTaskStore - -logger = logging.getLogger("sentry.taskworker") - - -class TaskService: - """ - Emulate an RPC style interface - - This interface is the 'worker process' interface for - fetching and updating state on tasks. Worker processes - can rely on this interface to be stable. - """ - - def __init__(self): - self.pending_task_store = PendingTaskStore() - - def get_task(self, *, partition: int | None = None, topic: str | None = None) -> Task | None: - logger.info("getting_latest_tasks", extra={"partition": partition, "topic": topic}) - return self.pending_task_store.get_pending_task(partition, topic) - - def set_task_status(self, *, task_id: int, task_status: Status.ValueType): - self.pending_task_store.set_task_status(task_id=task_id, task_status=task_status) - - def complete_task(self, *, task_id: int): - self.set_task_status(task_id=task_id, task_status=COMPLETE) - - -task_service = TaskService() diff --git a/src/sentry/taskworker/worker.py b/src/sentry/taskworker/worker.py index 942311e3b4fde..3006b3376b04b 100644 --- a/src/sentry/taskworker/worker.py +++ b/src/sentry/taskworker/worker.py @@ -47,11 +47,10 @@ def start(self) -> None: logger.exception("Worker process crashed") def process_tasks(self, namespace: TaskNamespace) -> None: - from sentry.taskworker.service.service import task_service + from sentry.taskworker.service.client import task_client - # This emulates an RPC service interface. - task_data = task_service.get_task(topic=namespace.topic) - if task_data is None: + task_data = task_client.get_task(topic=namespace.topic) + if not task_data: logger.info("No tasks") time.sleep(1) return @@ -71,18 +70,18 @@ def process_tasks(self, namespace: TaskNamespace) -> None: except Exception as err: logger.info("taskworker.task_errored", extra={"error": str(err)}) # TODO check retry policy - if task_meta.should_retry(task_data.retry_state(), err): + if task_meta.should_retry(task_data.work.retry_state, err): logger.info("taskworker.task.retry", extra={"task": task_data.work.taskname}) next_state = RETRY if next_state == COMPLETE: logger.info("taskworker.task.complete", extra={"task": task_data.work.taskname}) - task_service.complete_task(task_id=task_data.store_id) + task_client.complete_task(task_id=task_data.store_id) else: logger.info( "taskworker.task.change_status", extra={"task": task_data.work.taskname, "state": next_state}, ) - task_service.set_task_status( + task_client.set_task_status( task_id=task_data.store_id, task_status=next_state, )