From e693bead59ec286596f23f29b50a9f49cb6ceb9a Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Mon, 4 Jul 2022 14:36:29 -0700 Subject: [PATCH] k/group: avoid ABA problem Update consumer groups to use conditional replication to prevent a situation where, after a check, the leadership jumps away, the check is invalidated, and the leadership jumps back just in time for the post-check replication: 1) the node checks a condition; 2) the leadership goes to a new node; 3) the new node replicates something which invalidates the condition; 4) the leadership jumps back; 5) the original node successfully replicates, assuming that the condition is still true. Switched to a conditional replicate to fix the problem. When a group manager detects a leadership change it replays the group's records to reconstruct the group's state. We cache the current term in the state and use it as a condition on replicate. In this case we know that if the leadership bounces, the replication won't pass. --- src/v/kafka/server/group.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 32815af490d9..b7d7c2262a81 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -2104,6 +2104,7 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) { auto reader = model::make_memory_record_batch_reader(std::move(batch)); auto replicate_stages = _partition->raft()->replicate_in_stages( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); @@ -2493,6 +2494,7 @@ ss::future group::remove() { try { auto result = co_await _partition->raft()->replicate( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { @@ -2573,6 +2575,7 @@ group::remove_topic_partitions(const std::vector& tps) { try { auto result = co_await _partition->raft()->replicate( + _term, std::move(reader), raft::replicate_options(raft::consistency_level::quorum_ack)); if (result) { @@ -2600,6 +2603,7 @@
group::remove_topic_partitions(const std::vector& tps) { ss::future> group::store_group(model::record_batch batch) { return _partition->raft()->replicate( + _term, model::make_memory_record_batch_reader(std::move(batch)), raft::replicate_options(raft::consistency_level::quorum_ack)); }