Added handling members with group.instance.id set i.e. static members
#4684
Conversation
debcf68 to 2110d3e
Very cool, just some nits, and a couple of questions.
2110d3e to b9c4e05
Will review on Monday! Sorry about the delay.
b9c4e05 to 2f77a7a
good stuff. mostly nits / questions
2f77a7a to 692f0b2
Signed-off-by: Michal Maslanka <[email protected]>
692f0b2 to 47f1d0a
looks really good.
what should the testing approach be? i see ducktape tests for important scenarios, but the scale of the tests seems to be 2 group members. it seems like we need to expand testing for static members, at a minimum scaling the tests to be more substantial.
separate from this PR, do you think we need more testing for consumer groups in general in ducktape? how can we bring some of the recent consumer group scaling work into ducktape to run in cdt?
src/v/kafka/server/group.cc
Outdated
// <kafka>Only return MEMBER_ID_REQUIRED error if joinGroupRequest version
// is >= 4 and groupInstanceId is configured to unknown.</kafka>
if (r.version >= api_version(4) && !r.data.group_instance_id) {
if (member->is_joining()) {
why is it ok to remove this condition that is dependent on request version, and which is handled by upstream kafka? is the same condition handled in some other way?
i think that diff is misleading here. This is an entirely new code path that was introduced to handle members with group.instance.id assigned
yeh, but i guess what i'm saying is that `if (r.version >= api_version(4) && !r.data.group_instance_id) {` is a specific condition in the protocol controlling behavior that clients expect. and it's not clear if that behavior is preserved and where its new location is in the implementation.
ahh i get it now. it's implied in add_new_dynamic_member and then the version check is relocated there!
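For anyone following this thread later, here is a minimal sketch of the dispatch being discussed. It assumes C++17, and everything except the name `add_new_dynamic_member` and the quoted Kafka comment is hypothetical (`join_request`, `join_result`, `add_new_static_member`, `handle_new_member`); it is not the code in group.cc. The point it illustrates: once requests carrying `group.instance.id` are routed to a separate path, the dynamic path only needs the version check, because the missing instance id is already implied.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical, simplified stand-ins; not the real types in group.cc.
enum class join_result {
    member_id_required, // client must rejoin using the member id it was given
    added_dynamic,      // dynamic member admitted right away
    added_static,       // member with group.instance.id admitted
};

struct join_request {
    int version;
    std::optional<std::string> group_instance_id;
};

// Dynamic path: only requests without group.instance.id reach this point,
// so the version check alone decides whether MEMBER_ID_REQUIRED is returned.
inline join_result add_new_dynamic_member(const join_request& r) {
    assert(!r.group_instance_id.has_value());
    // <kafka>Only return MEMBER_ID_REQUIRED error if joinGroupRequest version
    // is >= 4 and groupInstanceId is configured to unknown.</kafka>
    if (r.version >= 4) {
        return join_result::member_id_required;
    }
    return join_result::added_dynamic;
}

// Static path (hypothetical name): never answers MEMBER_ID_REQUIRED.
inline join_result add_new_static_member(const join_request& r) {
    assert(r.group_instance_id.has_value());
    return join_result::added_static;
}

// A member joining without a member id: dispatch on group.instance.id first.
inline join_result handle_new_member(const join_request& r) {
    if (r.group_instance_id) {
        return add_new_static_member(r);
    }
    return add_new_dynamic_member(r);
}
```

The asserts document the precondition each path relies on, which is the same idea as the suggestion further down to assert `!r.data.group_instance_id`.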
// <kafka>Only return MEMBER_ID_REQUIRED error if joinGroupRequest version
// is >= 4 and groupInstanceId is configured to unknown.</kafka>
if (r.version >= api_version(4) && !r.data.group_instance_id) {
if (r.version >= api_version(4)) {
Is it worth asserting !r.data.group_instance_id?
it seems like it. and also bringing back the kafka comment to indicate that we are preserving kafka semantics.
I will create a follow up PR. I have modified
Signed-off-by: Michal Maslanka <[email protected]>
Checkpoint assignments may come from either the sync request or the member metadata stored in group metadata. Added a generic checkpoint method that allows building a checkpoint from different assignment providers. Signed-off-by: Michal Maslanka <[email protected]>
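A minimal sketch of the "generic checkpoint method with assignment providers" idea, assuming a provider is simply a callable from `member_id` to an assignment. All names here (`build_checkpoint`, `checkpoint_sketch`, `from_sync_request`, `from_stored_metadata`) are hypothetical and only illustrate the shape of the change, not the actual signatures in the commit.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

using assignment = std::string; // opaque bytes in this sketch
using assignments_map = std::map<std::string, assignment>; // member_id -> assignment

struct stored_member {
    assignment current_assignment;
};
using stored_members = std::map<std::string, stored_member>;

struct checkpoint_sketch {
    struct entry {
        std::string member_id;
        assignment assigned;
    };
    std::vector<entry> members;
};

// Generic builder: Provider is any callable taking a member_id and
// returning that member's assignment.
template<typename Provider>
checkpoint_sketch build_checkpoint(
  const std::vector<std::string>& member_ids, Provider&& provider) {
    checkpoint_sketch cp;
    cp.members.reserve(member_ids.size());
    for (const auto& id : member_ids) {
        cp.members.push_back({id, provider(id)});
    }
    assert(cp.members.size() == member_ids.size());
    return cp;
}

// Provider 1: assignments carried by a sync request (empty if absent).
inline auto from_sync_request(const assignments_map& req) {
    return [&req](const std::string& id) -> assignment {
        auto it = req.find(id);
        return it == req.end() ? assignment{} : it->second;
    };
}

// Provider 2: assignments already stored in the members' metadata.
inline auto from_stored_metadata(const stored_members& members) {
    return [&members](const std::string& id) -> assignment {
        auto it = members.find(id);
        return it == members.end() ? assignment{} : it->second.current_assignment;
    };
}
```

Both providers capture their source by reference, so in this sketch the source map has to outlive the call to `build_checkpoint`.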
Previously the `stop_signal` was destroyed in the `fixture::start()` method, which caused stop to be signaled immediately after the fixture application was started. Signed-off-by: Michal Maslanka <[email protected]>
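The failure mode can be reproduced in a few lines. This is a sketch under the assumption that the signal object triggers stop when it is destroyed, which is what the commit message implies; `stop_signal_sketch`, `buggy_fixture` and `fixed_fixture` are hypothetical stand-ins, and keeping the object as a member is only one possible fix, not necessarily the one in this commit.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in: flips the flag when destroyed, mirroring a stop
// signal whose destruction is observed as "stop requested".
class stop_signal_sketch {
public:
    explicit stop_signal_sketch(bool& stopped)
      : _stopped(stopped) {}
    ~stop_signal_sketch() { _stopped = true; }
    stop_signal_sketch(const stop_signal_sketch&) = delete;
    stop_signal_sketch& operator=(const stop_signal_sketch&) = delete;

private:
    bool& _stopped;
};

// Buggy shape: the signal is a local in start(), so it is destroyed (and
// stop is signaled) as soon as start() returns.
struct buggy_fixture {
    bool stopped = false;
    void start() {
        stop_signal_sketch sig(stopped);
        // ... application would be started here ...
    }
};

// One possible fix: the fixture owns the signal for its whole lifetime.
struct fixed_fixture {
    bool stopped = false; // declared before sig so it outlives it
    std::unique_ptr<stop_signal_sketch> sig;
    void start() {
        assert(!sig); // start() is expected to run once
        sig = std::make_unique<stop_signal_sketch>(stopped);
    }
    void stop() { sig.reset(); }
};
```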
Added handling of members with `group.instance.id` set, i.e. static members. When a static member joins the group, its `member_id` is identified using a mapping stored in the `_static_members` map. This way the broker can identify the `member_id` that is assigned to the requested instance id. In the dynamic membership protocol, `member_id` is assigned by the broker after each restart of a consumer (there is no state persistence on the client side), hence every consumer restart causes a rebalance since a member with a new id joins the group. With static members, when `group.instance.id` is present in the `JoinRequest` and the mapping is already present, the coordinator will use that mapping to find member metadata. When using the static membership protocol, consumers do not send `LeaveGroup` requests when stopping. More information: [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances) Signed-off-by: Michal Maslanka <[email protected]>
Added a `formatter_properties` parameter to control how messages are formatted when using the Kafka CLI tools consumer. Signed-off-by: Michal Maslanka <[email protected]>
When a consumer is being stopped it may send a `LeaveGroup` request to the broker. Using clean shutdown (SIGTERM) to give the consumer process an opportunity to finish gracefully. Signed-off-by: Michal Maslanka <[email protected]>
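Why SIGTERM matters here, seen from the receiving process: a handler can set a flag so the main loop exits and runs its shutdown step, which is not possible with SIGKILL. The sketch below is a generic C++ illustration using only `std::signal` and `std::raise`; `run_until_terminated` and the `leave_group` callback are hypothetical and say nothing about how the Kafka CLI consumer is implemented.

```cpp
#include <cassert>
#include <csignal>

// Set from the signal handler; sig_atomic_t is safe to write there.
volatile std::sig_atomic_t g_terminate_requested = 0;

extern "C" void on_sigterm(int) { g_terminate_requested = 1; }

inline void install_sigterm_handler() {
    auto prev = std::signal(SIGTERM, on_sigterm);
    assert(prev != SIG_ERR);
    (void)prev;
}

// Polls until SIGTERM is observed, then runs the graceful shutdown step
// (for a consumer this is where a LeaveGroup request could be sent).
// Returns the number of completed poll iterations.
template<typename Poll, typename Leave>
int run_until_terminated(Poll&& poll, Leave&& leave_group) {
    int iterations = 0;
    while (!g_terminate_requested) {
        poll();
        ++iterations;
    }
    leave_group();
    return iterations;
}
```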
Added tests validating basic consumer group operations when using dynamic and static membership protocols. Signed-off-by: Michal Maslanka <[email protected]>
47f1d0a to a91767e
Cover letter
When a static member joins the group, its `member_id` is identified using a mapping stored in the `_static_members` map. This way the broker can identify the `member_id` that is assigned to the requested instance id. In the dynamic membership protocol, `member_id` is assigned by the broker after each restart of a consumer (there is no state persistence on the client side), hence every consumer restart causes a rebalance since a member with a new id joins the group. With static members, when `group.instance.id` is present in the `JoinRequest` and the mapping is already present, the coordinator will use that mapping to find member metadata.

When using the static membership protocol, consumers do not send `LeaveGroup` requests when stopping.

More information: [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances)
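A minimal sketch of the lookup described in the cover letter: instance id to `member_id` through a `_static_members` map, then `member_id` to member metadata. `group_sketch`, `member_metadata`, `add_static_member` and `find_static_member` are hypothetical names for illustration; what the coordinator does with the metadata once it is found is outside the scope of this sketch.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>

struct member_metadata {
    std::string member_id;
    std::string client_host; // illustrative field only
};

class group_sketch {
public:
    // Records a static member and the instance id -> member_id mapping.
    void add_static_member(const std::string& instance_id, member_metadata m) {
        assert(!instance_id.empty());
        std::string id = m.member_id;
        _static_members[instance_id] = id;
        _members[id] = std::move(m);
    }

    // Resolves group.instance.id to the stored member metadata, or nullptr
    // when no mapping exists yet (e.g. the first join of this instance).
    const member_metadata*
    find_static_member(const std::string& instance_id) const {
        auto sit = _static_members.find(instance_id);
        if (sit == _static_members.end()) {
            return nullptr;
        }
        auto mit = _members.find(sit->second);
        return mit == _members.end() ? nullptr : &mit->second;
    }

private:
    std::unordered_map<std::string, std::string> _static_members;
    std::unordered_map<std::string, member_metadata> _members;
};
```

Because the mapping is keyed by the instance id, which the client keeps across restarts, a restarted consumer can be matched to its existing metadata instead of appearing as a brand new member.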
Fixes: #1335
Depends on: #4673
Release notes
Improvements