diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc
index 6ed6dfeb3ea4..7e6340bce467 100644
--- a/src/v/cluster/cluster_utils.cc
+++ b/src/v/cluster/cluster_utils.cc
@@ -126,7 +126,8 @@ ss::future<> maybe_create_tcp_client(
   rpc::connection_cache& cache,
   model::node_id node,
   net::unresolved_address rpc_address,
-  config::tls_config tls_config) {
+  config::tls_config tls_config,
+  rpc::backoff_policy backoff) {
     auto f = ss::now();
     if (cache.contains(node)) {
         // client is already there, check if configuration changed
@@ -141,11 +142,15 @@ ss::future<> maybe_create_tcp_client(
     return f.then([&cache,
                    node,
                    rpc_address = std::move(rpc_address),
-                   tls_config = std::move(tls_config)]() mutable {
+                   tls_config = std::move(tls_config),
+                   backoff = std::move(backoff)]() mutable {
        return maybe_build_reloadable_certificate_credentials(
                 std::move(tls_config))
          .then(
-           [&cache, node, rpc_address = std::move(rpc_address)](
+           [&cache,
+            node,
+            rpc_address = std::move(rpc_address),
+            backoff = std::move(backoff)](
              ss::shared_ptr<ss::tls::certificate_credentials>&& cert) mutable {
                return cache.emplace(
                  node,
@@ -155,8 +160,7 @@ ss::future<> maybe_create_tcp_client(
                     .disable_metrics = net::metrics_disabled(
                       config::shard_local_cfg().disable_metrics),
                     .version = cache.get_default_transport_version()},
-                  rpc::make_exponential_backoff_policy(
-                    std::chrono::seconds(1), std::chrono::seconds(15)));
+                  std::move(backoff));
            });
     });
 }
@@ -166,13 +170,20 @@ ss::future<> add_one_tcp_client(
   ss::sharded<rpc::connection_cache>& clients,
   model::node_id node,
   net::unresolved_address addr,
-  config::tls_config tls_config) {
+  config::tls_config tls_config,
+  rpc::backoff_policy backoff) {
     return clients.invoke_on(
       owner,
-      [node, rpc_address = std::move(addr), tls_config = std::move(tls_config)](
-        rpc::connection_cache& cache) mutable {
+      [node,
+       rpc_address = std::move(addr),
+       tls_config = std::move(tls_config),
+       backoff = std::move(backoff)](rpc::connection_cache& cache) mutable {
          return maybe_create_tcp_client(
-           cache, node, std::move(rpc_address), std::move(tls_config));
+           cache,
+           node,
+           std::move(rpc_address),
+           std::move(tls_config),
+           std::move(backoff));
       });
 }
 
diff --git a/src/v/cluster/cluster_utils.h b/src/v/cluster/cluster_utils.h
index a769585927f6..4932a4b4b39f 100644
--- a/src/v/cluster/cluster_utils.h
+++ b/src/v/cluster/cluster_utils.h
@@ -91,12 +91,18 @@ inline std::vector<topic_result> create_topic_results(
     });
 }
 
+inline rpc::backoff_policy default_backoff_policy() {
+    return rpc::make_exponential_backoff_policy(
+      std::chrono::seconds(1), std::chrono::seconds(15));
+}
+
 ss::future<> add_one_tcp_client(
   ss::shard_id owner,
   ss::sharded<rpc::connection_cache>& clients,
   model::node_id node,
   net::unresolved_address addr,
-  config::tls_config tls_config);
+  config::tls_config tls_config,
+  rpc::backoff_policy backoff_policy = default_backoff_policy());
 
 ss::future<> update_broker_client(
   model::node_id,
diff --git a/src/v/cluster/node_status_backend.cc b/src/v/cluster/node_status_backend.cc
index ad0b6d09b64e..b8b84d216a26 100644
--- a/src/v/cluster/node_status_backend.cc
+++ b/src/v/cluster/node_status_backend.cc
@@ -34,12 +34,14 @@ node_status_backend::node_status_backend(
   ss::sharded<features::feature_table>& feature_table,
   ss::sharded<node_status_table>& node_status_table,
   config::binding<std::chrono::milliseconds> period,
+  config::binding<std::chrono::milliseconds> max_reconnect_backoff,
   ss::sharded<ss::abort_source>& as)
   : _self(self)
   , _members_table(members_table)
   , _feature_table(feature_table)
   , _node_status_table(node_status_table)
   , _period(std::move(period))
+  , _max_reconnect_backoff(std::move(max_reconnect_backoff))
   , _rpc_tls_config(config::node().rpc_server_tls())
   , _as(as) {
     if (!config::shard_local_cfg().disable_public_metrics()) {
@@ -61,6 +63,20 @@ node_status_backend::node_status_backend(
         }
     });
 
+    _max_reconnect_backoff.watch([this]() {
+        ssx::spawn_with_gate(_gate, [this] {
+            auto new_backoff = _max_reconnect_backoff();
+            vlog(
+              clusterlog.info,
+              "Updating max reconnect backoff to: {}ms",
+              new_backoff.count());
+            // Invalidate all transports so they are created afresh with new
+            // backoff.
+            return _node_connection_cache.invoke_on_all(
+              [](rpc::connection_cache& local) { return local.remove_all(); });
+        });
+    });
+
     _timer.set_callback([this] { tick(); });
 }
 
@@ -212,12 +228,17 @@ ss::future<result<node_status_reply>> node_status_backend::send_node_status_request(
 
 ss::future<ss::shard_id> node_status_backend::maybe_create_client(
   model::node_id target, net::unresolved_address address) {
-    auto source_shard = target % ss::smp::count;
+    auto source_shard = connection_source_shard(target);
     auto target_shard = rpc::connection_cache::shard_for(
       _self, source_shard, target);
 
     co_await add_one_tcp_client(
-      target_shard, _node_connection_cache, target, address, _rpc_tls_config);
+      target_shard,
+      _node_connection_cache,
+      target,
+      address,
+      _rpc_tls_config,
+      create_backoff_policy());
 
     co_return source_shard;
 }
@@ -253,11 +274,21 @@ node_status_backend::process_reply(result<node_status_reply> reply) {
 }
 
 ss::future<node_status_reply>
-node_status_backend::process_request(node_status_request) {
+node_status_backend::process_request(node_status_request request) {
     _stats.rpcs_received += 1;
 
-    node_status_reply reply = {.replier_metadata = {.node_id = _self}};
-    return ss::make_ready_future<node_status_reply>(std::move(reply));
+    auto sender = request.sender_metadata.node_id;
+    auto sender_md = _node_status_table.local().get_node_status(sender);
+    if (
+      sender_md
+      // Check if the peer has at least 2 missed heartbeats. This avoids
+      // a cross-shard invoke in the happy path when no reset is needed.
+      && ss::lowres_clock::now() - sender_md->last_seen > 2 * _period()) {
+        co_await _node_connection_cache.local().reset_client_backoff(
+          _self, connection_source_shard(sender), sender);
+    }
+
+    co_return node_status_reply{.replier_metadata = {.node_id = _self}};
 }
 
 void node_status_backend::setup_metrics(
diff --git a/src/v/cluster/node_status_backend.h b/src/v/cluster/node_status_backend.h
index 5ab31cc1a619..19cee3cac5cb 100644
--- a/src/v/cluster/node_status_backend.h
+++ b/src/v/cluster/node_status_backend.h
@@ -54,7 +54,8 @@ class node_status_backend {
       ss::sharded<members_table>&,
       ss::sharded<features::feature_table>&,
       ss::sharded<node_status_table>&,
-      config::binding<std::chrono::milliseconds>,
+      config::binding<std::chrono::milliseconds> /* period*/,
+      config::binding<std::chrono::milliseconds> /* max_backoff*/,
       ss::sharded<ss::abort_source>&);
 
     ss::future<> start();
@@ -88,12 +89,23 @@ class node_status_backend {
     };
 
 private:
+    ss::shard_id connection_source_shard(model::node_id target) const {
+        return target % ss::smp::count;
+    }
+
+    rpc::backoff_policy create_backoff_policy() const {
+        static constexpr auto default_backoff_base = 1000ms;
+        return rpc::make_exponential_backoff_policy(
+          std::min(default_backoff_base, _max_reconnect_backoff()),
+          _max_reconnect_backoff());
+    }
     model::node_id _self;
     ss::sharded<members_table>& _members_table;
     ss::sharded<features::feature_table>& _feature_table;
     ss::sharded<node_status_table>& _node_status_table;
     config::binding<std::chrono::milliseconds> _period;
+    config::binding<std::chrono::milliseconds> _max_reconnect_backoff;
     config::tls_config _rpc_tls_config;
 
     ss::sharded<rpc::connection_cache> _node_connection_cache;
diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 937eefebaa40..dddea31ca309 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -1925,6 +1925,13 @@ configuration::configuration()
       "establish liveness status outside of the Raft protocol.",
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
       100ms)
+  , node_status_reconnect_max_backoff_ms(
+      *this,
+      "node_status_reconnect_max_backoff_ms",
+      "Maximum backoff (in ms) to reconnect to an unresponsive peer during "
+      "node status liveness checks.",
+      {.needs_restart = needs_restart::no, .visibility = visibility::user},
+      15s)
   , enable_controller_log_rate_limiting(
       *this,
       "enable_controller_log_rate_limiting",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index 47b227749c69..994dbe96eb59 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -388,6 +388,7 @@ struct configuration final : public config_store {
     property<bool> enable_rack_awareness;
 
     property<std::chrono::milliseconds> node_status_interval;
+    property<std::chrono::milliseconds> node_status_reconnect_max_backoff_ms;
     // controller log limitng
     property<bool> enable_controller_log_rate_limiting;
     property<size_t> rps_limit_topic_operations;
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index f7c5dda83d1b..66317b13e4b1 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -1241,6 +1241,10 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
           std::ref(node_status_table),
           ss::sharded_parameter(
             [] { return config::shard_local_cfg().node_status_interval.bind(); }),
+          ss::sharded_parameter([] {
+              return config::shard_local_cfg()
+                .node_status_reconnect_max_backoff_ms.bind();
+          }),
           std::ref(_as))
         .get();
 
diff --git a/src/v/rpc/connection_cache.cc b/src/v/rpc/connection_cache.cc
index 264631ef2bee..d040e91b0a59 100644
--- a/src/v/rpc/connection_cache.cc
+++ b/src/v/rpc/connection_cache.cc
@@ -63,6 +63,16 @@ ss::future<> connection_cache::remove(model::node_id n) {
       });
 }
 
+ss::future<> connection_cache::remove_all() {
+    auto units = co_await _mutex.get_units();
+    auto cache = std::exchange(_cache, {});
+    co_await parallel_for_each(cache, [](auto& it) {
+        auto& [_, cli] = it;
+        return cli->stop();
+    });
+    cache.clear();
+}
+
 /// \brief closes all client connections
 ss::future<> connection_cache::do_shutdown() {
     auto units = co_await _mutex.get_units();
diff --git a/src/v/rpc/connection_cache.h b/src/v/rpc/connection_cache.h
index ef968029cd4e..5e3e033de1a3 100644
--- a/src/v/rpc/connection_cache.h
+++ b/src/v/rpc/connection_cache.h
@@ -57,6 +57,9 @@ class connection_cache final
     /// \brief removes the node *and* closes the connection
     ss::future<> remove(model::node_id n);
 
+    /// \brief similar to remove but removes all nodes.
+    ss::future<> remove_all();
+
     /// \brief closes all connections
     ss::future<> do_shutdown();
     void shutdown();
diff --git a/tests/rptest/tests/node_status_test.py b/tests/rptest/tests/node_status_test.py
index a1bb3ef0591f..a3fd0bf77da0 100644
--- a/tests/rptest/tests/node_status_test.py
+++ b/tests/rptest/tests/node_status_test.py
@@ -77,9 +77,9 @@ def is_node_available(self, node: ClusterNode):
         vertex = self.node_to_vertex[node]
         return self.edges[vertex][vertex] == ConnectionStatus.ALIVE
 
-    def check_cluster_status(self):
+    def do_check_cluster_status(self):
         admin = Admin(self.redpanda)
-
+        results = []
         for node, peer, expected_status in self._all_edges():
             if not self.is_node_available(node):
                 # The starting node is unavailable so the request
@@ -88,21 +88,36 @@ def check_cluster_status(self):
 
             self.redpanda.logger.debug(
                 f"Checking status of peer {self.redpanda.idx(peer)} "
-                f"from node {self.redpanda.idx(node)}")
+                f"from node {self.redpanda.idx(node)}, expected status: {expected_status}"
+            )
 
             peer_status = self._get_peer_status(admin, node, peer)
 
             if expected_status == ConnectionStatus.UNKNOWN:
-                assert peer_status is None, f"Expected no reponse from node {peer.name}"
-            if expected_status == ConnectionStatus.ALIVE:
+                results.append(peer_status is None)
+
+            elif expected_status == ConnectionStatus.ALIVE:
                 ms_since_last_status = peer_status["since_last_status"]
-                assert is_live(
-                    ms_since_last_status
-                ), f"Expected node {peer.name} to be alive, but since_last_status > max_delta: ms_since_last_status={ms_since_last_status}, tolerance={MAX_DELTA}"
+                is_peer_live = is_live(ms_since_last_status)
+                self.redpanda.logger.debug(
+                    f"Node {peer.name} expected status: alive, last status: {ms_since_last_status}, is live: {is_peer_live}"
+                )
+                results.append(is_peer_live)
+
             elif expected_status == ConnectionStatus.DOWN:
                 ms_since_last_status = peer_status["since_last_status"]
-                assert not is_live(
-                    ms_since_last_status
-                ), f"Expected node {peer.name} to be down, but ms_since_last_status <= max_delta: ms_since_last_status={ms_since_last_status}, tolerance={MAX_DELTA}"
+                is_not_live = not is_live(ms_since_last_status)
+                self.redpanda.logger.debug(
+                    f"Node {peer.name} expected status: down, last status: {ms_since_last_status}, is not live: {is_not_live}"
+                )
+                results.append(is_not_live)
+
+        return all(results)
+
+    def check_cluster_status(self):
+        self.redpanda.wait_until(
+            self.do_check_cluster_status,
+            timeout_sec=30,
+            backoff_sec=2,
+            err_msg=
+            "Node status across cluster nodes did not reach the desired state")
 
     def _all_edges(self):
         for start_vertex, end_vertex in np.ndindex(self.shape):
@@ -129,10 +144,16 @@ def __init__(self, ctx):
             test_context=ctx,
             extra_rp_conf={"node_status_interval": NODE_STATUS_INTERVAL})
 
+    def _update_max_backoff(self):
+        self.redpanda.set_cluster_config(
+            {"node_status_reconnect_max_backoff_ms": 5000})
+
     @cluster(num_nodes=3)
     def test_all_nodes_up(self):
         status_graph = StatusGraph(self.redpanda)
         status_graph.check_cluster_status()
+        self._update_max_backoff()
+        status_graph.check_cluster_status()
 
     @cluster(num_nodes=3)
     def test_node_down(self):
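
A minimal sketch of the behaviour the new setting bounds: this is not Redpanda's rpc::backoff_policy implementation, only an assumed model of an exponential reconnect backoff that doubles from a 1s base and is clamped to a configurable cap (standing in for node_status_reconnect_max_backoff_ms, default 15s). Names and values here are illustrative assumptions, not code from the patch.

// Illustrative sketch only: capped exponential reconnect backoff (assumed semantics).
#include <algorithm>
#include <chrono>
#include <cstdio>

int main() {
    using namespace std::chrono;
    const auto base = milliseconds(1000);         // fixed 1s base, as in the patch's default
    const auto max_backoff = milliseconds(15000); // configurable cap (default 15s)

    // Start at the smaller of base and cap, mirroring std::min(base, max) in the patch.
    auto next = std::min(base, max_backoff);
    for (int attempt = 1; attempt <= 8; ++attempt) {
        std::printf(
          "attempt %d: wait %lld ms\n",
          attempt,
          static_cast<long long>(next.count()));
        // Double after every failed reconnect, never exceeding the cap.
        next = std::min(next * 2, max_backoff);
    }
    return 0;
}

With a 5s cap (as the test sets via set_cluster_config), the wait sequence levels off at 5000 ms instead of 15000 ms, which is why a previously unresponsive peer is re-probed sooner after it comes back.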