Skip to content

Commit

Permalink
k/tests: wait for coordinator of correct group id
Browse files Browse the repository at this point in the history
After changing default number of partitions for `__consumer_offsets`
topic we need to wait for particular group coordinator before requesting
consumer to join the group.

Fixes: redpanda-data#5495, redpanda-data#5466

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Jul 18, 2022
1 parent e481a87 commit c0edc77
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions src/v/kafka/server/tests/consumer_groups_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ join_group_request make_join_group_request(
return req;
}
struct consumer_offsets_fixture : public redpanda_thread_fixture {
void wait_for_consumer_offsets_topic() {
void
wait_for_consumer_offsets_topic(const kafka::group_instance_id& group) {
app.controller->get_feature_table()
.local()
.await_feature(
Expand All @@ -56,7 +57,7 @@ struct consumer_offsets_fixture : public redpanda_thread_fixture {
auto client = make_kafka_client().get0();

client.connect().get();
kafka::find_coordinator_request req("key");
kafka::find_coordinator_request req(group);
req.data.key_type = kafka::coordinator_type::group;
client.dispatch(std::move(req), kafka::api_version(1)).get();

Expand All @@ -66,9 +67,9 @@ struct consumer_offsets_fixture : public redpanda_thread_fixture {
model::kafka_consumer_offsets_nt, model::timeout_clock::now() + 30s)
.get();

tests::cooperative_spin_wait_with_timeout(30s, [&client] {
tests::cooperative_spin_wait_with_timeout(30s, [&group, &client] {
kafka::describe_groups_request req;
req.data.groups.emplace_back("key");
req.data.groups.emplace_back(group);
return client.dispatch(std::move(req), kafka::api_version(1))
.then([](kafka::describe_groups_response response) {
return response.data.groups.front().error_code
Expand All @@ -82,7 +83,8 @@ struct consumer_offsets_fixture : public redpanda_thread_fixture {
};

FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) {
wait_for_consumer_offsets_topic();
kafka::group_instance_id gr("instance-1");
wait_for_consumer_offsets_topic(gr);
auto client = make_kafka_client().get0();
auto deferred = ss::defer([&client] {
client.stop().then([&client] { client.shutdown(); }).get();
Expand All @@ -92,7 +94,7 @@ FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) {
auto req = make_join_group_request(
unknown_member_id, "group-test", {"p1", "p2"}, "random");
// set group instance id
req.data.group_instance_id = kafka::group_instance_id("instance-1");
req.data.group_instance_id = gr;
auto resp = client.dispatch(std::move(req), kafka::api_version(5)).get0();

info("response: {}", resp.data);
Expand Down

0 comments on commit c0edc77

Please sign in to comment.