r/tests: add node isolation scenarios to reconfiguration utests
ztlpn committed Apr 10, 2024
1 parent b3a12e6 commit 86c526f
Showing 1 changed file with 106 additions and 20 deletions.
src/v/raft/tests/raft_reconfiguration_test.cc (106 additions, 20 deletions)
@@ -25,6 +25,7 @@
#include "serde/serde.h"
#include "storage/record_batch_builder.h"
#include "test_utils/async.h"
#include "test_utils/randoms.h"
#include "test_utils/test.h"

#include <seastar/core/coroutine.hh>
@@ -35,6 +36,7 @@

#include <absl/container/flat_hash_set.h>
#include <fmt/core.h>
+#include <fmt/ranges.h>
#include <gtest/gtest.h>

#include <algorithm>
Expand All @@ -49,24 +51,39 @@ static ss::logger test_log("reconfiguration-test");
*/

using use_snapshot = ss::bool_class<struct use_snapshot_tag>;
-using use_initial_learner_offset = ss::bool_class<struct use_snapshot_tag>;
-
-struct test_params {
-    use_snapshot snapshot;
-    int initial_size;
-    int nodes_to_add;
-    int nodes_to_remove;
-    use_initial_learner_offset learner_start_offset;
-    consistency_level consistency_level = raft::consistency_level::quorum_ack;
-};
+using use_initial_learner_offset
+  = ss::bool_class<struct use_initial_learner_offset_tag>;

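+// which part of the group is cut off from the rest while the
+// reconfiguration is in flight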
+enum class isolated_t {
+    none,
+    old_leader,
+    old_followers,
+    random,
+};

+std::ostream& operator<<(std::ostream& o, isolated_t pt) {
+    switch (pt) {
+    case isolated_t::none:
+        return o << "isolated::none";
+    case isolated_t::old_leader:
+        return o << "isolated::old_leader";
+    case isolated_t::old_followers:
+        return o << "isolated::old_followers";
+    case isolated_t::random:
+        return o << "isolated::random";
+    }
+    __builtin_unreachable();
+}

struct reconfiguration_test
: testing::WithParamInterface<std::tuple<
use_snapshot,
int,
int,
int,
use_initial_learner_offset,
-consistency_level>>
+consistency_level,
+isolated_t>>
, raft_fixture {
ss::future<> wait_for_reconfiguration_to_finish(
const absl::flat_hash_set<model::node_id>& target_ids,
@@ -161,38 +178,43 @@ static void assert_offset_translator_state_is_consistent(

TEST_P_CORO(reconfiguration_test, configuration_replace_test) {
const auto param = GetParam();
-use_snapshot snapshot = std::get<0>(param);
+auto snapshot = std::get<use_snapshot>(param);
int initial_size = std::get<1>(param);
int nodes_to_add = std::get<2>(param);
int nodes_to_remove = std::get<3>(param);
-use_initial_learner_offset use_learner_start_offset = std::get<4>(param);
-consistency_level consistency_level = std::get<5>(param);
+auto use_learner_start_offset = std::get<use_initial_learner_offset>(param);
+auto consistency_lvl = std::get<consistency_level>(param);
+auto isolated = std::get<isolated_t>(param);
// skip test cases that make no sense
if (
nodes_to_add + initial_size - nodes_to_remove <= 0
|| initial_size < nodes_to_remove) {
co_return;
}
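+// a single-node group has no followers that could be isolated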
+if (initial_size == 1 && isolated == isolated_t::old_followers) {
+    co_return;
+}
fmt::print(
"test parameters: {{snapshot: {}, initial_size: {}, "
"nodes_to_add: {}, nodes_to_remove: {}, use_learner_start_offset: {}, "
"consistency_lvl: {}}}\n",
"consistency_lvl: {}, isolated: {}}}\n",
snapshot,
initial_size,
nodes_to_add,
nodes_to_remove,
use_learner_start_offset,
-consistency_level);
+consistency_lvl,
+isolated);
// create group with initial configuration
co_await create_simple_group(initial_size);

// replicate batches
auto result = co_await retry_with_leader(
model::timeout_clock::now() + 30s,
-[this, consistency_level](raft_node_instance& leader_node) {
+[this, consistency_lvl](raft_node_instance& leader_node) {
return leader_node.raft()
->replicate(
-make_random_batches(), replicate_options(consistency_level))
+make_random_batches(), replicate_options(consistency_lvl))
.then([this](::result<replicate_result> r) {
if (!r) {
return ss::make_ready_future<::result<model::offset>>(
@@ -208,7 +230,7 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) {
auto& leader_node = node(leader);
model::offset start_offset = leader_node.raft()->start_offset();
if (snapshot) {
-if (consistency_level == consistency_level::leader_ack) {
+if (consistency_lvl == consistency_level::leader_ack) {
for (auto& [_, n] : nodes()) {
co_await n->raft()->refresh_commit_index();
}
@@ -272,6 +294,12 @@
ASSERT_EQ_CORO(
current_nodes.size(), initial_size + nodes_to_add - nodes_to_remove);

+vlog(
+  test_log.info,
+  "dispatching reconfiguration: {} -> {}",
+  old_node_ids,
+  current_node_ids);

// update group configuration
auto success = co_await retry_with_leader(
model::timeout_clock::now() + 30s,
@@ -287,10 +315,53 @@
});
});
ASSERT_TRUE_CORO(success);

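+// pick the set of nodes that will end up on the far side of the partition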
+auto isolated_nodes
+  = ss::make_lw_shared<absl::flat_hash_set<model::node_id>>();
+switch (isolated) {
+case isolated_t::none:
+    break;
+case isolated_t::old_leader:
+    isolated_nodes->insert(leader);
+    break;
+case isolated_t::old_followers:
+    *isolated_nodes = old_node_ids;
+    isolated_nodes->erase(leader);
+    break;
+case isolated_t::random:
+    for (auto n : all_ids()) {
+        if (tests::random_bool()) {
+            isolated_nodes->insert(n);
+        }
+    }
+    break;
+}

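+// delay any message that crosses the partition boundary; traffic within
+// each side keeps flowing normally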
+if (!isolated_nodes->empty()) {
+    vlog(test_log.info, "isolating nodes: {}", *isolated_nodes);

+    for (const auto& [source_id, node] : nodes()) {
+        node->on_dispatch([=](model::node_id dest_id, raft::msg_type) {
+            if (
+              isolated_nodes->contains(source_id)
+              != isolated_nodes->contains(dest_id)) {
+                return ss::sleep(5s);
+            }
+            return ss::now();
+        });
+    }

+    // heal the partition 5s later; the sleep future is deliberately not
+    // awaited, so the steps below proceed while the partition is active
+    (void)ss::sleep(5s).then([isolated_nodes] {
+        vlog(test_log.info, "healing the network partition");
+        isolated_nodes->clear();
+    });
+}

co_await with_leader(
-30s, [this, consistency_level](raft_node_instance& leader_node) {
+30s, [this, consistency_lvl](raft_node_instance& leader_node) {
// wait for committed offset to propagate
-if (consistency_level == raft::consistency_level::quorum_ack) {
+if (consistency_lvl == raft::consistency_level::quorum_ack) {
return wait_for_committed_offset(
leader_node.raft()->committed_offset(), 30s);
} else {
@@ -340,4 +411,19 @@ INSTANTIATE_TEST_SUITE_P(
testing::Values(0, 1, 3), // to remove
testing::Values(use_initial_learner_offset::yes),
testing::Values(
-consistency_level::quorum_ack, consistency_level::leader_ack)));
+consistency_level::quorum_ack, consistency_level::leader_ack),
+testing::Values(isolated_t::none)));

+INSTANTIATE_TEST_SUITE_P(
+  reconfiguration_with_isolated_nodes,
+  reconfiguration_test,
+  testing::Combine(
+    testing::Values(use_snapshot::no),
+    testing::Values(3),    // initial size
+    testing::Values(2),    // to add
+    testing::Values(0, 2), // to remove
+    testing::Values(use_initial_learner_offset::yes),
+    testing::Values(
+      consistency_level::quorum_ack, consistency_level::leader_ack),
+    testing::Values(
+      isolated_t::old_followers, isolated_t::old_leader, isolated_t::random)));
