Skip to content

Commit

Permalink
k/group: avoid ABA problem
Browse files Browse the repository at this point in the history
Updating consumer groups to use conditional replication to prevent a
situation when after a check a leadership jumps away, invalidates the
check, jumps back just in time for the post check replication.

check condition
  leadership goes to a new node
  the node replicates something which invalidates the conditions
  the leadership jumps back
the node successfully replicates assuming that the condition is true

Switched to a conditional replicate to fix the problem. When a group
manager detects a leadership change it replays the group's records to
reconstruct the groups state. We cache the current term in the state
and use it as a condition on replicate. In this case we know that if
the leadership bounce the replication won't pass.
  • Loading branch information
rystsov committed Jul 9, 2022
1 parent 67a3112 commit e693bea
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2104,6 +2104,7 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) {
auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto replicate_stages = _partition->raft()->replicate_in_stages(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

Expand Down Expand Up @@ -2493,6 +2494,7 @@ ss::future<error_code> group::remove() {

try {
auto result = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));
if (result) {
Expand Down Expand Up @@ -2573,6 +2575,7 @@ group::remove_topic_partitions(const std::vector<model::topic_partition>& tps) {

try {
auto result = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));
if (result) {
Expand Down Expand Up @@ -2600,6 +2603,7 @@ group::remove_topic_partitions(const std::vector<model::topic_partition>& tps) {
ss::future<result<raft::replicate_result>>
group::store_group(model::record_batch batch) {
return _partition->raft()->replicate(
_term,
model::make_memory_record_batch_reader(std::move(batch)),
raft::replicate_options(raft::consistency_level::quorum_ack));
}
Expand Down

0 comments on commit e693bea

Please sign in to comment.