From 32a8a2d3179f953b14c0c64ee3a08177366e8bf2 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Sun, 11 Jun 2023 16:51:34 -0700
Subject: [PATCH 1/4] node_status_backend: factor connection_source_shard into
 a method

---
 src/v/cluster/node_status_backend.cc | 2 +-
 src/v/cluster/node_status_backend.h  | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/v/cluster/node_status_backend.cc b/src/v/cluster/node_status_backend.cc
index ad0b6d09b64e..96e64d8c12b9 100644
--- a/src/v/cluster/node_status_backend.cc
+++ b/src/v/cluster/node_status_backend.cc
@@ -212,7 +212,7 @@ ss::future<result<node_status_reply>> node_status_backend::send_node_status_request(
 
 ss::future<ss::shard_id> node_status_backend::maybe_create_client(
   model::node_id target, net::unresolved_address address) {
-    auto source_shard = target % ss::smp::count;
+    auto source_shard = connection_source_shard(target);
     auto target_shard = rpc::connection_cache::shard_for(
       _self, source_shard, target);
 
diff --git a/src/v/cluster/node_status_backend.h b/src/v/cluster/node_status_backend.h
index 5ab31cc1a619..76e0732c80e5 100644
--- a/src/v/cluster/node_status_backend.h
+++ b/src/v/cluster/node_status_backend.h
@@ -88,6 +88,9 @@ class node_status_backend {
     };
 
 private:
+    ss::shard_id connection_source_shard(model::node_id target) const {
+        return target % ss::smp::count;
+    }
     model::node_id _self;
     ss::sharded<members_table>& _members_table;
     ss::sharded<features::feature_table>& _feature_table;

From a22b635c0e89e67a73f7c16026100b6d29717eb3 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Sun, 11 Jun 2023 17:13:32 -0700
Subject: [PATCH 2/4] node_status_backend: reset client backoff on peer checkin

When a peer restarts and a backoff is applied locally, it needs to be
reset once the peer is available again. Otherwise the transport does not
reconnect until the entire backoff elapses, leaving the peer marked
unavailable to downstream consumers like the partition balancer.

---
 src/v/cluster/node_status_backend.cc | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/v/cluster/node_status_backend.cc b/src/v/cluster/node_status_backend.cc
index 96e64d8c12b9..7904bc69c633 100644
--- a/src/v/cluster/node_status_backend.cc
+++ b/src/v/cluster/node_status_backend.cc
@@ -253,11 +253,21 @@ node_status_backend::process_reply(result<node_status_reply> reply) {
 }
 
 ss::future<node_status_reply>
-node_status_backend::process_request(node_status_request) {
+node_status_backend::process_request(node_status_request request) {
     _stats.rpcs_received += 1;
 
-    node_status_reply reply = {.replier_metadata = {.node_id = _self}};
-    return ss::make_ready_future<node_status_reply>(std::move(reply));
+    auto sender = request.sender_metadata.node_id;
+    auto sender_md = _node_status_table.local().get_node_status(sender);
+    if (
+      sender_md
+      // Check if the peer has at least 2 missed heartbeats. This avoids
+      // a cross-shard invoke in the happy path when no reset is needed.
+      && ss::lowres_clock::now() - sender_md->last_seen > 2 * _period()) {
+        co_await _node_connection_cache.local().reset_client_backoff(
+          _self, connection_source_shard(sender), sender);
+    }
+
+    co_return node_status_reply{.replier_metadata = {.node_id = _self}};
 }
 
 void node_status_backend::setup_metrics(

From c111134ad55580a0d5f3c054238d37a4c6dd4028 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Mon, 12 Jun 2023 15:26:31 -0700
Subject: [PATCH 3/4] node_status_backend: Make reconnect backoff configurable

Adds a node_status_reconnect_max_backoff_ms cluster configuration option,
defaulting to 15s.
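
For intuition: the reconnect delay under the exponential policy doubles
from its base up to this configured cap, so the cap bounds how long a
restarted peer can stay quarantined between dial attempts. A minimal
sketch of the capped doubling schedule (illustrative only; not the actual
rpc::make_exponential_backoff_policy implementation):

    def backoff_schedule(base_ms=1000, max_ms=15000):
        # Successive reconnect delays: base, 2*base, ... capped at max_ms.
        delay = base_ms
        while True:
            yield min(delay, max_ms)
            delay *= 2

With the defaults this yields 1000, 2000, 4000, 8000, 15000, 15000, ...,
so a peer that restarts right after a failed dial can wait out a full
max_ms before the next attempt; that is the window patch 2 resets and
this option bounds.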
---
 src/v/cluster/cluster_utils.cc         | 29 ++++++++++++++++++--------
 src/v/cluster/cluster_utils.h          |  8 ++++++-
 src/v/cluster/node_status_backend.cc   | 23 +++++++++++++++++++-
 src/v/cluster/node_status_backend.h    | 11 +++++++++-
 src/v/config/configuration.cc          |  7 +++++++
 src/v/config/configuration.h           |  1 +
 src/v/redpanda/application.cc          |  4 ++++
 src/v/rpc/connection_cache.cc          | 10 +++++++++
 src/v/rpc/connection_cache.h           |  3 +++
 tests/rptest/tests/node_status_test.py |  6 ++++++
 10 files changed, 90 insertions(+), 12 deletions(-)

diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc
index 6ed6dfeb3ea4..7e6340bce467 100644
--- a/src/v/cluster/cluster_utils.cc
+++ b/src/v/cluster/cluster_utils.cc
@@ -126,7 +126,8 @@ ss::future<> maybe_create_tcp_client(
   rpc::connection_cache& cache,
   model::node_id node,
   net::unresolved_address rpc_address,
-  config::tls_config tls_config) {
+  config::tls_config tls_config,
+  rpc::backoff_policy backoff) {
     auto f = ss::now();
     if (cache.contains(node)) {
         // client is already there, check if configuration changed
@@ -141,11 +142,15 @@ ss::future<> maybe_create_tcp_client(
     return f.then([&cache,
                    node,
                    rpc_address = std::move(rpc_address),
-                   tls_config = std::move(tls_config)]() mutable {
+                   tls_config = std::move(tls_config),
+                   backoff = std::move(backoff)]() mutable {
         return maybe_build_reloadable_certificate_credentials(
                  std::move(tls_config))
           .then(
-            [&cache, node, rpc_address = std::move(rpc_address)](
+            [&cache,
+             node,
+             rpc_address = std::move(rpc_address),
+             backoff = std::move(backoff)](
               ss::shared_ptr<ss::tls::certificate_credentials>&& cert) mutable {
                 return cache.emplace(
                   node,
@@ -155,8 +160,7 @@ ss::future<> maybe_create_tcp_client(
                       .disable_metrics = net::metrics_disabled(
                         config::shard_local_cfg().disable_metrics),
                       .version = cache.get_default_transport_version()},
-                  rpc::make_exponential_backoff_policy(
-                    std::chrono::seconds(1), std::chrono::seconds(15)));
+                  std::move(backoff));
            });
     });
 }
@@ -166,13 +170,20 @@ ss::future<> add_one_tcp_client(
   ss::shard_id owner,
   ss::sharded<rpc::connection_cache>& clients,
   model::node_id node,
   net::unresolved_address addr,
-  config::tls_config tls_config) {
+  config::tls_config tls_config,
+  rpc::backoff_policy backoff) {
     return clients.invoke_on(
       owner,
-      [node, rpc_address = std::move(addr), tls_config = std::move(tls_config)](
-        rpc::connection_cache& cache) mutable {
+      [node,
+       rpc_address = std::move(addr),
+       tls_config = std::move(tls_config),
+       backoff = std::move(backoff)](rpc::connection_cache& cache) mutable {
          return maybe_create_tcp_client(
-           cache, node, std::move(rpc_address), std::move(tls_config));
+           cache,
+           node,
+           std::move(rpc_address),
+           std::move(tls_config),
+           std::move(backoff));
      });
 }
diff --git a/src/v/cluster/cluster_utils.h b/src/v/cluster/cluster_utils.h
index a769585927f6..4932a4b4b39f 100644
--- a/src/v/cluster/cluster_utils.h
+++ b/src/v/cluster/cluster_utils.h
@@ -91,12 +91,18 @@ inline std::vector<topic_result> create_topic_results(
       });
 }
 
+inline rpc::backoff_policy default_backoff_policy() {
+    return rpc::make_exponential_backoff_policy(
+      std::chrono::seconds(1), std::chrono::seconds(15));
+}
+
 ss::future<> add_one_tcp_client(
   ss::shard_id owner,
   ss::sharded<rpc::connection_cache>& clients,
   model::node_id node,
   net::unresolved_address addr,
-  config::tls_config tls_config);
+  config::tls_config tls_config,
+  rpc::backoff_policy backoff_policy = default_backoff_policy());
 
 ss::future<> update_broker_client(
   model::node_id,
diff --git a/src/v/cluster/node_status_backend.cc b/src/v/cluster/node_status_backend.cc
index 7904bc69c633..b8b84d216a26 100644
--- a/src/v/cluster/node_status_backend.cc
+++ b/src/v/cluster/node_status_backend.cc
@@ -34,12 +34,14 @@ node_status_backend::node_status_backend(
   ss::sharded<features::feature_table>& feature_table,
   ss::sharded<node_status_table>& node_status_table,
   config::binding<std::chrono::milliseconds> period,
+  config::binding<std::chrono::milliseconds> max_reconnect_backoff,
   ss::sharded<ss::abort_source>& as)
   : _self(self)
   , _members_table(members_table)
   , _feature_table(feature_table)
   , _node_status_table(node_status_table)
   , _period(std::move(period))
+  , _max_reconnect_backoff(std::move(max_reconnect_backoff))
   , _rpc_tls_config(config::node().rpc_server_tls())
   , _as(as) {
     if (!config::shard_local_cfg().disable_public_metrics()) {
@@ -61,6 +63,20 @@ node_status_backend::node_status_backend(
         }
     });
 
+    _max_reconnect_backoff.watch([this]() {
+        ssx::spawn_with_gate(_gate, [this] {
+            auto new_backoff = _max_reconnect_backoff();
+            vlog(
+              clusterlog.info,
+              "Updating max reconnect backoff to: {}ms",
+              new_backoff.count());
+            // Invalidate all transports so they are created afresh with the
+            // new backoff.
+            return _node_connection_cache.invoke_on_all(
+              [](rpc::connection_cache& local) { return local.remove_all(); });
+        });
+    });
+
     _timer.set_callback([this] { tick(); });
 }
 
@@ -217,7 +233,12 @@ ss::future<ss::shard_id> node_status_backend::maybe_create_client(
       _self, source_shard, target);
 
     co_await add_one_tcp_client(
-      target_shard, _node_connection_cache, target, address, _rpc_tls_config);
+      target_shard,
+      _node_connection_cache,
+      target,
+      address,
+      _rpc_tls_config,
+      create_backoff_policy());
 
     co_return source_shard;
 }
diff --git a/src/v/cluster/node_status_backend.h b/src/v/cluster/node_status_backend.h
index 76e0732c80e5..19cee3cac5cb 100644
--- a/src/v/cluster/node_status_backend.h
+++ b/src/v/cluster/node_status_backend.h
@@ -54,7 +54,8 @@ class node_status_backend {
       ss::sharded<members_table>&,
       ss::sharded<features::feature_table>&,
      ss::sharded<node_status_table>&,
-      config::binding<std::chrono::milliseconds>,
+      config::binding<std::chrono::milliseconds> /* period */,
+      config::binding<std::chrono::milliseconds> /* max_backoff */,
       ss::sharded<ss::abort_source>&);
 
     ss::future<> start();
@@ -91,12 +92,20 @@ class node_status_backend {
     ss::shard_id connection_source_shard(model::node_id target) const {
         return target % ss::smp::count;
     }
+
+    rpc::backoff_policy create_backoff_policy() const {
+        static constexpr auto default_backoff_base = 1000ms;
+        return rpc::make_exponential_backoff_policy(
+          std::min(default_backoff_base, _max_reconnect_backoff()),
+          _max_reconnect_backoff());
+    }
     model::node_id _self;
     ss::sharded<members_table>& _members_table;
     ss::sharded<features::feature_table>& _feature_table;
     ss::sharded<node_status_table>& _node_status_table;
 
     config::binding<std::chrono::milliseconds> _period;
+    config::binding<std::chrono::milliseconds> _max_reconnect_backoff;
     config::tls_config _rpc_tls_config;
 
     ss::sharded<rpc::connection_cache> _node_connection_cache;
diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 628b73f14b21..8170103608d5 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -1924,6 +1924,13 @@ configuration::configuration()
       "establish liveness status outside of the Raft protocol.",
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
       100ms)
+  , node_status_reconnect_max_backoff_ms(
+      *this,
+      "node_status_reconnect_max_backoff_ms",
+      "Maximum backoff (in ms) to reconnect to an unresponsive peer during "
+      "node status liveness checks.",
+      {.needs_restart = needs_restart::no, .visibility = visibility::user},
+      15s)
   , enable_controller_log_rate_limiting(
       *this,
       "enable_controller_log_rate_limiting",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index aed257f39718..2e49b0606d4f 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -387,6 +387,7 @@ struct configuration final : public config_store {
     property<bool> enable_rack_awareness;
 
     property<std::chrono::milliseconds> node_status_interval;
+    property<std::chrono::milliseconds> node_status_reconnect_max_backoff_ms;
     // controller log limiting
     property<bool> enable_controller_log_rate_limiting;
     property<size_t> rps_limit_topic_operations;
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index 88d214941ba4..d430a3f30404 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -1240,6 +1240,10 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
           std::ref(node_status_table),
           ss::sharded_parameter(
             [] { return config::shard_local_cfg().node_status_interval.bind(); }),
+          ss::sharded_parameter([] {
+              return config::shard_local_cfg()
+                .node_status_reconnect_max_backoff_ms.bind();
+          }),
           std::ref(_as))
       .get();
 
diff --git a/src/v/rpc/connection_cache.cc b/src/v/rpc/connection_cache.cc
index 264631ef2bee..d040e91b0a59 100644
--- a/src/v/rpc/connection_cache.cc
+++ b/src/v/rpc/connection_cache.cc
@@ -63,6 +63,16 @@ ss::future<> connection_cache::remove(model::node_id n) {
       });
 }
 
+ss::future<> connection_cache::remove_all() {
+    auto units = co_await _mutex.get_units();
+    auto cache = std::exchange(_cache, {});
+    co_await parallel_for_each(cache, [](auto& it) {
+        auto& [_, cli] = it;
+        return cli->stop();
+    });
+    cache.clear();
+}
+
 /// \brief closes all client connections
 ss::future<> connection_cache::do_shutdown() {
     auto units = co_await _mutex.get_units();
diff --git a/src/v/rpc/connection_cache.h b/src/v/rpc/connection_cache.h
index ef968029cd4e..5e3e033de1a3 100644
--- a/src/v/rpc/connection_cache.h
+++ b/src/v/rpc/connection_cache.h
@@ -57,6 +57,9 @@ class connection_cache final
     /// \brief removes the node *and* closes the connection
     ss::future<> remove(model::node_id n);
 
+    /// \brief similar to remove(), but removes all nodes.
+    ss::future<> remove_all();
+
     /// \brief closes all connections
     ss::future<> do_shutdown();
     void shutdown();
diff --git a/tests/rptest/tests/node_status_test.py b/tests/rptest/tests/node_status_test.py
index a1bb3ef0591f..a37175b7af69 100644
--- a/tests/rptest/tests/node_status_test.py
+++ b/tests/rptest/tests/node_status_test.py
@@ -129,10 +129,16 @@ def __init__(self, ctx):
             test_context=ctx,
             extra_rp_conf={"node_status_interval": NODE_STATUS_INTERVAL})
 
+    def _update_max_backoff(self):
+        self.redpanda.set_cluster_config(
+            {"node_status_reconnect_max_backoff_ms": 5000})
+
     @cluster(num_nodes=3)
     def test_all_nodes_up(self):
         status_graph = StatusGraph(self.redpanda)
         status_graph.check_cluster_status()
+        self._update_max_backoff()
+        status_graph.check_cluster_status()
 
     @cluster(num_nodes=3)
     def test_node_down(self):

From 4ca6fbfa846206e26c0e47aef3fe5efc3580dfc1 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Tue, 13 Jun 2023 11:25:18 -0700
Subject: [PATCH 4/4] ducktape/node_status: deflake test

Wrap the status check in a waiter. With debug builds the backend can take
some additional time to reach the desired state, especially after all the
transports are invalidated by a backoff configuration change.
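
The rewrite below folds the per-edge asserts into a boolean predicate,
do_check_cluster_status, and polls it until it holds. The retry loop is
equivalent in spirit to this standalone sketch (illustrative only; the
test itself goes through the redpanda service's wait_until wrapper with
the same timeout and backoff):

    import time

    def wait_until(predicate, timeout_sec=30, backoff_sec=2, err_msg=""):
        # Poll the predicate until it returns True or the deadline passes.
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            if predicate():
                return
            time.sleep(backoff_sec)
        raise TimeoutError(err_msg)

Collecting results and retrying, instead of asserting on the first
mismatch, gives slow debug builds time to converge without masking a
persistent failure, which still surfaces via the timeout.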
---
 tests/rptest/tests/node_status_test.py | 37 ++++++++++++++++++--------
 1 file changed, 26 insertions(+), 11 deletions(-)

diff --git a/tests/rptest/tests/node_status_test.py b/tests/rptest/tests/node_status_test.py
index a37175b7af69..a3fd0bf77da0 100644
--- a/tests/rptest/tests/node_status_test.py
+++ b/tests/rptest/tests/node_status_test.py
@@ -77,9 +77,9 @@ def is_node_available(self, node: ClusterNode):
         vertex = self.node_to_vertex[node]
         return self.edges[vertex][vertex] == ConnectionStatus.ALIVE
 
-    def check_cluster_status(self):
+    def do_check_cluster_status(self):
         admin = Admin(self.redpanda)
-
+        results = []
         for node, peer, expected_status in self._all_edges():
             if not self.is_node_available(node):
                 # The starting node is unavailable so the request
@@ -88,21 +88,36 @@ def do_check_cluster_status(self):
             self.redpanda.logger.debug(
                 f"Checking status of peer {self.redpanda.idx(peer)} "
-                f"from node {self.redpanda.idx(node)}")
+                f"from node {self.redpanda.idx(node)}, expected status: {expected_status}"
+            )
 
             peer_status = self._get_peer_status(admin, node, peer)
 
             if expected_status == ConnectionStatus.UNKNOWN:
-                assert peer_status is None, f"Expected no reponse from node {peer.name}"
-            if expected_status == ConnectionStatus.ALIVE:
+                results.append(peer_status is None)
+            elif expected_status == ConnectionStatus.ALIVE:
                 ms_since_last_status = peer_status["since_last_status"]
-                assert is_live(
-                    ms_since_last_status
-                ), f"Expected node {peer.name} to be alive, but since_last_status > max_delta: ms_since_last_status={ms_since_last_status}, tolerance={MAX_DELTA}"
+                is_peer_live = is_live(ms_since_last_status)
+                self.redpanda.logger.debug(
+                    f"Node {peer.name} expected status: alive, last status: {ms_since_last_status}, is live: {is_peer_live}"
+                )
+                results.append(is_peer_live)
             elif expected_status == ConnectionStatus.DOWN:
                 ms_since_last_status = peer_status["since_last_status"]
-                assert not is_live(
-                    ms_since_last_status
-                ), f"Expected node {peer.name} to be down, but ms_since_last_status <= max_delta: ms_since_last_status={ms_since_last_status}, tolerance={MAX_DELTA}"
+                is_not_live = not is_live(ms_since_last_status)
+                self.redpanda.logger.debug(
+                    f"Node {peer.name} expected status: down, last status: {ms_since_last_status}, is not live: {is_not_live}"
+                )
+                results.append(is_not_live)
+
+        return all(results)
+
+    def check_cluster_status(self):
+        self.redpanda.wait_until(
+            self.do_check_cluster_status,
+            timeout_sec=30,
+            backoff_sec=2,
+            err_msg=
+            "Node status across cluster nodes did not reach the desired state")
 
     def _all_edges(self):
         for start_vertex, end_vertex in np.ndindex(self.shape):
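
A note on the resulting API: do_check_cluster_status() is now a
side-effect-free probe that returns a bool, while check_cluster_status()
keeps the original fail-on-error contract for callers. Hypothetical
standalone usage, assuming a constructed test fixture:

    status_graph = StatusGraph(self.redpanda)
    # One-shot probe: returns True/False instead of raising.
    ok = status_graph.do_check_cluster_status()
    # Polling variant used by the tests; fails only after the 30s timeout.
    status_graph.check_cluster_status()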