-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[v2] Kafka scaler isactive latest #996
Conversation
…olicy Signed-off-by: grassiale <alessandro.grassi01@gmail.com>
… missing, wip. Signed-off-by: grassiale <alessandro.grassi01@gmail.com>
552cafb
to
6eac8bb
Compare
@ppatierno PTAL ^ |
FYI @tbickford this is the Kafka issue we were talking about |
@grassiale any update on this please? |
Hi. |
OK, thanks |
67598d7
to
50d8b1f
Compare
Signed-off-by: grassiale <alessandro.grassi01@gmail.com>
50d8b1f
to
31fdbe2
Compare
Signed-off-by: grassiale <alessandro.grassi01@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, but would like to have an ack from some Kafka guru 😄
pkg/scalers/kafka_scaler.go
Outdated
@@ -317,7 +317,7 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset | |||
|
|||
if consumerOffset == sarama.OffsetNewest || consumerOffset == sarama.OffsetOldest { | |||
if s.metadata.offsetResetPolicy == latest { | |||
lag = 0 | |||
lag = sarama.OffsetNewest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know the sarama library but there is something that sounds strange to me first of all in the first if
statement:
if consumerOffset == sarama.OffsetNewest || consumerOffset == sarama.OffsetOldest
When a consumer is new in a consumer group there is no offset committed of course. Why the sarama fetch offset should return OffsetNewest
or OffsetOldest
? It could return the one or the other if it knew the reset policy asked by the consumer (so returning OffsetNewest
if latest
and OffsetOldest
if earliest
) but it doesn't know, right?
It's not actually clear to me the sarama behaviour here tbh.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is actually not Sarama deciding to return -1, but the kafka coordinator here: https://github.com/apache/kafka/blob/8bde3d476f23a9d9dd06b3360281a3a3fca2804e/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L481
From what I see, if no consumer group exists, or no offset is committed, the coordinator will return the value of OffsetFetchResponse.INVALID_OFFSET that is -1:
https://github.com/apache/kafka/blob/6da70f9b95d2f7cc1f32de4e405661a8015bfa7e/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java#L53
Maybe we could introduce the concept of invalid offset instead of using offsetNewest and Oldest that are related to the consumer and not the admin client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely agree ... this -1 value has a different meaning them not related to what Sarama returns, so it's better having it separated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think we should return an error in getLagForPartition if the offset is invalid? This would help identify the case of missing offsets, hence having a more significant condition also for the isActive method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
absolutely yes, I don't like the if
condition based on sarama newest and oldest + lag > 0 or lag != 0. An error should simplify and make it clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I didn't know whether to log an error or an info message with a low verbosity level, since this is not a fatal error as, for example, the other two in the getLagForPartition method.
…fka coordinator is invalid (-1) Signed-off-by: grassiale <alessandro.grassi01@gmail.com>
lag, err := s.getLagForPartition(partition, offsets) | ||
if err != nil { | ||
return true, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only concern is, is any error really ok to saying isActive = true? Maybe we should check the exact error, in this case the invalid offset one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correct
Signed-off-by: grassiale <alessandro.grassi01@gmail.com>
@grassiale @ppatierno thanks a lot! |
Hello, while implementing the e2e tests requested here: #985
I've come across a new issue that is not making the scaler behave correctly when the offset reset policy is set to latest (default).
When a new consumer is created, meaning it has no offsets committed, the sarama offset fetch will return a value of -1 for all the partitions of the topic it consumes from. If we set the lag to 0, as in the previous solution, the isActive method will always return false resulting in the scaling to never occur even if new messages are written in the topic.
In order to fix this, I propose the following:
With that in mind I also changed E2E tests so that:
Checklist
Fixes #