wip: GRPC server #77762

Draft
wants to merge 4 commits into
base: hackweek-kafkatasks
@john-z-yang john-z-yang commented Sep 19, 2024

Turns out our grpc version is too old, so this bumps grpcio to 1.65.4.
Run devenv sync to install the new dependency, then run pip install -e ../sentry-protos/py to install the generated code from our sentry-protos branch (worker-pull).

You can start the server with:
sentry run kafka-task-grpc-server
It doesn't do anything yet, but it's useful for checking that the imports succeed.

John, Sept 19:
I spent the entire day today getting rid of the PendingTask class that the PendingTaskStore has.

The result of this is that PendingTaskStore and the consumer will use a protobuf data structure directly instead of the python dataclass that wraps a protobuf data structure.

The main reason is that in order to use grpc we cannot pass PendingTask between the services (it's a Python class that wraps a protobuf type, plus some additional data). Instead of creating yet another protobuf definition, I just added the fields to the existing protobuf message.

The new protobuf messages are structured as follows (the names are probably going to change):

Work: An abstract unit of work, which only contains information like the retry policy, the task ID, and the function parameters. These are the messages that are on Kafka.

Task: The actual physical task for the worker, which contains a Work, but also things like status, topic, partition, recieved_at. These are the messages the consumer stores into the PendingTaskStore.

This leaves us with a single Python data structure:
PendingTask: The Django model, used only in PendingTaskStore. It just has all the fields a Task has, but flattened.
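To make the nesting and the flattening concrete, here is a minimal sketch using plain dataclasses as stand-ins. The real types are protobuf messages and a Django model; every field name and type below is an assumption for illustration, not the actual schema.

```python
from dataclasses import dataclass, asdict


@dataclass
class Work:
    # Stand-in for the protobuf message that lives on Kafka (fields are assumed).
    task_id: str
    retry_policy: str
    parameters: str


@dataclass
class Task:
    # Stand-in for the message the consumer stores: a Work plus delivery metadata.
    work: Work
    status: str
    topic: str
    partition: int
    received_at: float


def flatten(task: Task) -> dict:
    """Merge the nested Work fields into one flat, row-like dict."""
    row = asdict(task)      # asdict converts nested dataclasses to nested dicts
    work = row.pop("work")  # lift the Work fields up to the top level
    return {**work, **row}
```

Calling flatten on a Task gives one dict whose keys are the union of the Work and Task fields, which is roughly the shape a flattened model row would have.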

Now that PendingTaskStore just takes a protobuf message (Task), implementing the grpc server is relatively simple.

def GetTask(self, request: GetTaskRequest, context) -> GetTaskResponse:
    # task is already a GRPC message
    task = self.pending_task_store.get_pending_task(
        request.partition if request.partition else None,
        request.topic if request.topic else None,
    )
    return GetTaskResponse(task=task)
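One thing worth keeping in mind about the `if ... else None` pattern above: if partition and topic are plain proto3 scalar fields (an assumption, the proto definition is not shown here), an unset field reads back as 0 or "", so partition 0 cannot be told apart from "no partition". A minimal sketch with a hypothetical stand-in request class:

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class FakeGetTaskRequest:
    # Hypothetical stand-in for GetTaskRequest; the defaults mimic
    # what unset proto3 scalar fields read back as.
    partition: int = 0
    topic: str = ""


def normalize(request: FakeGetTaskRequest) -> Tuple[Optional[int], Optional[str]]:
    """Apply the same falsy-to-None mapping as the GetTask handler."""
    partition = request.partition if request.partition else None
    topic = request.topic if request.topic else None
    return partition, topic
```

With this mapping, a request for partition 0 is treated the same as a request with no partition filter. If that distinction matters, one option would be explicit field presence in the proto, but that is a design choice for the proto definition rather than something this PR states.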

TODO:

  • Implement the functions defined in ConsumerServicer (we need to make sure to use the PendingTaskStore abstraction here)
  • Have the worker fetch the tasks via grpc by calling the gRPC stub in TaskService
  • Test different failure modes

@github-actions github-actions bot added the Scope: Backend Automatically applied to PRs that change backend components label Sep 19, 2024
Comment on lines 77 to 82
     def retry_task(self, taskdata: PendingTasks) -> None:
-        task_message = taskdata.to_message()
+        message = taskdata.to_proto()
         self.producer.produce(
             ArroyoTopic(name=self.topic),
-            KafkaPayload(key=None, value=task_message.SerializeToString(), headers=[]),
+            KafkaPayload(key=None, value=message.work.SerializeToString(), headers=[]),
         )
@john-z-yang john-z-yang Sep 20, 2024

We might want to move this to pending_task_store.py: it takes PendingTasks, the Django model, which should stay inside pending_task_store.py, and its only caller is pending_task_store.py.

Alternatively, we can make this function accept a protobuf Task instead of PendingTasks.
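A rough sketch of that second option, with hypothetical stand-ins for the protobuf Task, its Work, and the Kafka producer (none of these classes are the real ones, and the produce signature here is simplified):

```python
class FakeWork:
    """Stand-in for the protobuf Work message."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def SerializeToString(self) -> bytes:
        return self._payload


class FakeTask:
    """Stand-in for the protobuf Task message, which contains a Work."""

    def __init__(self, work: FakeWork) -> None:
        self.work = work


class FakeProducer:
    """Stand-in producer that records what would be sent to Kafka."""

    def __init__(self) -> None:
        self.sent = []

    def produce(self, topic: str, value: bytes) -> None:
        self.sent.append((topic, value))


class Retrier:
    def __init__(self, producer: FakeProducer, topic: str) -> None:
        self.producer = producer
        self.topic = topic

    def retry_task(self, task: FakeTask) -> None:
        # Takes the protobuf Task directly, so no Django model leaks in here.
        # Only the Work goes back onto Kafka, matching the diff above.
        self.producer.produce(self.topic, task.work.SerializeToString())
```

The point of this shape is that the caller in pending_task_store.py would convert its model row to a Task first, and this module would never need to import the Django model.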

Comment on lines 30 to 34
     def set_task_status(self, *, task_id: int, task_status: Status.ValueType):
         self.pending_task_store.set_task_status(task_id=task_id, task_status=task_status)

-    def complete_task(self, *, task_id: int) -> RpcTask | None:
-        return self.set_task_status(task_id=task_id, task_status=PendingTasks.States.COMPLETE)
+    def complete_task(self, *, task_id: int):
+        self.set_task_status(task_id=task_id, task_status=COMPLETE)
@john-z-yang john-z-yang Sep 20, 2024

Removed the return value here because it's not clear to me what we're trying to accomplish with it (nothing uses it).
