Skip to content

Commit

Permalink
use grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
enochtangg committed Sep 20, 2024
1 parent b1d92ca commit eca1b54
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 49 deletions.
2 changes: 1 addition & 1 deletion requirements-base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ cryptography>=38.0.3
# Note, grpcio>1.30.0 requires setting GRPC_POLL_STRATEGY=epoll1
# See https://github.com/grpc/grpc/issues/23796 and
# https://github.com/grpc/grpc/blob/v1.35.x/doc/core/grpc-polling-engines.md#polling-engine-implementations-in-grpc
grpcio==1.65.4
grpcio==1.66.1

# not directly used, but provides a speedup for redis
hiredis>=2.3.2
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev-frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ google-resumable-media==2.7.0
googleapis-common-protos==1.63.2
grpc-google-iam-v1==0.13.1
grpc-stubs==1.53.0.5
grpcio==1.65.4
grpcio==1.66.1
grpcio-status==1.60.1
h11==0.13.0
hiredis==2.3.2
Expand Down
2 changes: 1 addition & 1 deletion requirements-frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ google-resumable-media==2.7.0
googleapis-common-protos==1.63.2
grpc-google-iam-v1==0.13.1
grpc-stubs==1.53.0.5
grpcio==1.65.4
grpcio==1.66.1
grpcio-status==1.60.1
h11==0.14.0
hiredis==2.3.2
Expand Down
4 changes: 3 additions & 1 deletion src/sentry/taskworker/pending_task_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ def store(self, batch: Sequence[Task]):

PendingTasks.objects.bulk_create([PendingTasks.from_proto(task) for task in batch])

def get_pending_task(self, partition: int | None = None, topic: str | None = None) -> Task:
def get_pending_task(
self, partition: int | None = None, topic: str | None = None
) -> Task | None:
from django.db import router, transaction

from sentry.taskworker.models import PendingTasks
Expand Down
4 changes: 3 additions & 1 deletion src/sentry/taskworker/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from collections.abc import Sequence
from typing import Any

from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import RetryPolicy


@dataclasses.dataclass
class RetryState:
Expand Down Expand Up @@ -41,7 +43,7 @@ def __init__(
self.__deadletter = deadletter
self.__discard = discard

def should_retry(self, state: RetryState, exc: Exception) -> bool:
def should_retry(self, state: RetryPolicy, exc: Exception) -> bool:
# No more attempts left
if state.attempts >= self.__times:
return False
Expand Down
50 changes: 50 additions & 0 deletions src/sentry/taskworker/service/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import logging

import grpc
from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import (
COMPLETE,
GetTaskRequest,
SetTaskResultRequest,
Status,
Task,
)
from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2_grpc import ConsumerStub

from sentry.taskworker.pending_task_store import PendingTaskStore

logger = logging.getLogger("sentry.taskworker")


class TaskClient:
"""
Emulate an RPC style interface
This interface is the 'worker process' interface for
fetching and updating state on tasks. Worker processes
can rely on this interface to be stable.
"""

def __init__(self):
self.pending_task_store = PendingTaskStore()
self.host = "localhost"
self.server_port = 50051
self.channel = grpc.insecure_channel(f"{self.host}:{self.server_port}")
self.stub = ConsumerStub(self.channel)

def get_task(self, partition: int | None = None, topic: str | None = None) -> Task | None:
logger.info("getting_latest_tasks", extra={"partition": partition, "topic": topic})
request = GetTaskRequest(partition=partition, topic=topic)
response = self.stub.GetTask(request)
if response.HasField("task"):
return response.task
return None

def set_task_status(self, task_id: int, task_status: Status.ValueType):
request = SetTaskResultRequest(store_id=task_id, status=task_status)
self.stub.SetTaskResult(request)

def complete_task(self, task_id: int):
self.set_task_status(task_id=task_id, task_status=COMPLETE)


task_client = TaskClient()
37 changes: 0 additions & 37 deletions src/sentry/taskworker/service/service.py

This file was deleted.

13 changes: 6 additions & 7 deletions src/sentry/taskworker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ def start(self) -> None:
logger.exception("Worker process crashed")

def process_tasks(self, namespace: TaskNamespace) -> None:
from sentry.taskworker.service.service import task_service
from sentry.taskworker.service.client import task_client

# This emulates an RPC service interface.
task_data = task_service.get_task(topic=namespace.topic)
if task_data is None:
task_data = task_client.get_task(topic=namespace.topic)
if not task_data:
logger.info("No tasks")
time.sleep(1)
return
Expand All @@ -71,18 +70,18 @@ def process_tasks(self, namespace: TaskNamespace) -> None:
except Exception as err:
logger.info("taskworker.task_errored", extra={"error": str(err)})
# TODO check retry policy
if task_meta.should_retry(task_data.retry_state(), err):
if task_meta.should_retry(task_data.work.retry_state, err):
logger.info("taskworker.task.retry", extra={"task": task_data.work.taskname})
next_state = RETRY
if next_state == COMPLETE:
logger.info("taskworker.task.complete", extra={"task": task_data.work.taskname})
task_service.complete_task(task_id=task_data.store_id)
task_client.complete_task(task_id=task_data.store_id)
else:
logger.info(
"taskworker.task.change_status",
extra={"task": task_data.work.taskname, "state": next_state},
)
task_service.set_task_status(
task_client.set_task_status(
task_id=task_data.store_id,
task_status=next_state,
)

0 comments on commit eca1b54

Please sign in to comment.