[Processor] Stop processing events on draining and send signal to continue processing #3120

Merged (13 commits) on Jan 17, 2024

Contributor

@rokatyy rokatyy commented Jan 14, 2024

Jira - https://jira.iguazeng.com/browse/NUC-119

This pull request fixes an issue where the processor sent events to the runtime after draining and before reconnection. The wrapper now skips all events received after draining and waits for the SIGCONT signal before resuming event processing.
These changes also let us remove the isDrained checks on the processor side.
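
A minimal sketch of the skip-until-SIGCONT behaviour described above. It is not the wrapper's actual code: `EventGate`, its method names, and the counters are hypothetical, and the signal that triggers draining is left as a parameter because the description does not name it.

```python
import signal


class EventGate:
    """Hypothetical helper: decides whether an incoming event is handled or skipped."""

    def __init__(self):
        self.discard_events = False
        self.handled = []
        self.discarded = 0

    def on_drain_signal(self, signum=None, frame=None):
        # After draining, skip every event until a continue signal arrives.
        self.discard_events = True

    def on_continue_signal(self, signum=None, frame=None):
        # SIGCONT from the processor: resume normal event handling.
        self.discard_events = False

    def process(self, event):
        # Returns True if the event was handled, False if it was skipped.
        if self.discard_events:
            self.discarded += 1
            return False
        self.handled.append(event)
        return True

    def register(self, drain_signal):
        # Unix only, and must be called from the main thread.
        # drain_signal is an assumption supplied by the caller.
        signal.signal(drain_signal, self.on_drain_signal)
        signal.signal(signal.SIGCONT, self.on_continue_signal)
```

With the flag kept inside the wrapper, the sender does not need to track drain state itself, which is the motivation given for dropping the isDrained checks.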

Contributor

@TomerShor TomerShor left a comment

This PR coincides with #3119, which handles the case where a drain signal is received while we wait for an event.
Since it changes parts of the drain registration and the same serve_requests function, we should make sure everything is aligned before merging.

pkg/processor/runtime/runtime.go (outdated)
pkg/processor/runtime/rpc/abstract.go (outdated)
pkg/processor/trigger/kafka/trigger.go (outdated)
pkg/processor/trigger/trigger.go (outdated)
pkg/processor/worker/worker.go
pkg/processor/runtime/python/py/_nuclio_wrapper.py (outdated)
pkg/processor/runtime/python/py/_nuclio_wrapper.py (outdated)
pkg/processor/runtime/python/py/_nuclio_wrapper.py (outdated)
Contributor

@quaark quaark left a comment

Looks great!
A couple of questions from my end

pkg/processor/runtime/python/py/_nuclio_wrapper.py (outdated)
pkg/processor/trigger/kafka/trigger.go (outdated)
@rokatyy rokatyy requested a review from quaark January 15, 2024 15:06
pkg/processor/runtime/python/py/_nuclio_wrapper.py (outdated)
pkg/processor/runtime/rpc/abstract.go (outdated)
pkg/processor/trigger/trigger.go (outdated)
Contributor

@TomerShor TomerShor left a comment

Last couple of comments and we're good to go 🎈

pkg/processor/runtime/python/py/_nuclio_wrapper.py (outdated)
pkg/processor/runtime/python/py/_nuclio_wrapper.py (outdated)
Contributor

@TomerShor TomerShor left a comment

Good work 🤝

@rokatyy rokatyy marked this pull request as ready for review January 16, 2024 17:09
@TomerShor TomerShor merged commit 1cec567 into nuclio:development Jan 17, 2024
11 checks passed
except BaseException as exc:
await self._on_handle_event_error(exc)
# do not handle an event if a worker is drained
if not self._discard_events:
Contributor

I think it is worth debug-logging the discarded events.

    self._is_termination_needed = True
    # if serving loop is waiting for an event, unblock this operation to allow the termination callback to be called
    if self._event_message_length_task:
        self._event_message_length_task.cancel()

def _on_continue_signal(self, signal_name):
    self._logger.debug_with('Received continue signal', signal=signal_name)
Contributor

self._logger.debug_with('Received signal', signal=signal_name)
for consistency with _on_drain_signal and _on_termination_signal
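
The hunk in this thread cancels the pending wait (`_event_message_length_task.cancel()`) so that a blocked serving loop can react to a termination request. The following is a rough, self-contained sketch of that pattern with asyncio. It is not the wrapper's code: `serve`, `request_termination`, `demo`, and the `state` dictionary are hypothetical names, and an `asyncio.Queue` stands in for the socket read.

```python
import asyncio


async def serve(queue, state):
    """Handle events until termination is requested."""
    while not state["terminate"]:
        # Keep a handle on the pending wait so a signal handler can cancel it.
        state["wait_task"] = asyncio.create_task(queue.get())
        try:
            event = await state["wait_task"]
        except asyncio.CancelledError:
            # The wait was interrupted; re-check the loop condition.
            continue
        finally:
            state["wait_task"] = None
        state["handled"].append(event)
    # Stands in for calling the termination callback.
    state["terminated"] = True


def request_termination(state):
    """What a termination signal handler would do in this sketch."""
    state["terminate"] = True
    if state["wait_task"]:
        state["wait_task"].cancel()


async def demo():
    state = {"terminate": False, "wait_task": None, "handled": [], "terminated": False}
    queue = asyncio.Queue()
    await queue.put("event-1")
    server = asyncio.create_task(serve(queue, state))
    # Give the loop time to handle event-1 and block on the next get().
    await asyncio.sleep(0.05)
    request_termination(state)
    await server
    return state
```

Without the `cancel()` call, the loop in this sketch would stay blocked on `queue.get()` and never notice that `terminate` was set.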

w.isDrained.Store(isDrained)
func (w *Worker) Continue() error {
	if err := w.runtime.Continue(); err != nil {
		return err
Contributor

errors.Wrap(err, ...)

Contributor Author

@liranbg we do the same plain return err in Drain and Terminate; the error is wrapped at the next step. Worker and runtime have a 1:1 relationship, so maybe we can leave it as is?

if err = k.SignalWorkersToContinue(); err != nil {
	k.Logger.WarnWith("Failed to signal worker to continue event processing",
		"err", err)
	continue
Contributor

If signaling still fails after 2-3 attempts, I would suggest panicking and letting the entire function restart. There may be a zombie process, and we don't want to get stuck with the Kafka stream not being consumed.
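
The retry-then-fail idea in this comment can be sketched as follows. The code under review is Go (`k.SignalWorkersToContinue()`), and the thread does not show whether the suggestion was adopted, so this Python sketch only illustrates the control flow. `signal_with_retries`, `ContinueSignalError`, and the `on_failure` hook are hypothetical names, and raising an exception stands in for a Go panic.

```python
class ContinueSignalError(RuntimeError):
    """Raised when workers could not be signalled after all attempts."""


def signal_with_retries(signal_workers, max_attempts=3, on_failure=None):
    """Call signal_workers() until it succeeds or attempts run out.

    Returns the attempt number that succeeded. Raises ContinueSignalError
    after max_attempts failures so the caller can let the process restart.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            signal_workers()
            return attempt
        except Exception as exc:
            last_error = exc
            if on_failure:
                # For example, log a warning with the attempt number.
                on_failure(attempt, exc)
    raise ContinueSignalError(
        f"gave up after {max_attempts} attempts"
    ) from last_error
```

Failing loudly after a bounded number of attempts trades a restart for the risk the comment describes: a consumer that silently stops reading the stream.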
