Merge pull request redpanda-data#11342 from bharathv/rack_aw
node_status_backend: reset backoff on peer checkin
vshtokman committed Jun 14, 2023
2 parents 7673007 + 4ca6fbf commit 0ad8dd3
Showing 10 changed files with 133 additions and 27 deletions.
29 changes: 20 additions & 9 deletions src/v/cluster/cluster_utils.cc
@@ -126,7 +126,8 @@ ss::future<> maybe_create_tcp_client(
rpc::connection_cache& cache,
model::node_id node,
net::unresolved_address rpc_address,
- config::tls_config tls_config) {
+ config::tls_config tls_config,
+ rpc::backoff_policy backoff) {
auto f = ss::now();
if (cache.contains(node)) {
// client is already there, check if configuration changed
@@ -141,11 +142,15 @@ ss::future<> maybe_create_tcp_client(
return f.then([&cache,
node,
rpc_address = std::move(rpc_address),
- tls_config = std::move(tls_config)]() mutable {
+ tls_config = std::move(tls_config),
+ backoff = std::move(backoff)]() mutable {
return maybe_build_reloadable_certificate_credentials(
std::move(tls_config))
.then(
- [&cache, node, rpc_address = std::move(rpc_address)](
+ [&cache,
+ node,
+ rpc_address = std::move(rpc_address),
+ backoff = std::move(backoff)](
ss::shared_ptr<ss::tls::certificate_credentials>&& cert) mutable {
return cache.emplace(
node,
@@ -155,8 +160,7 @@
.disable_metrics = net::metrics_disabled(
config::shard_local_cfg().disable_metrics),
.version = cache.get_default_transport_version()},
- rpc::make_exponential_backoff_policy<rpc::clock_type>(
-   std::chrono::seconds(1), std::chrono::seconds(15)));
+ std::move(backoff));
});
});
}
@@ -166,13 +170,20 @@ ss::future<> add_one_tcp_client(
ss::sharded<rpc::connection_cache>& clients,
model::node_id node,
net::unresolved_address addr,
- config::tls_config tls_config) {
+ config::tls_config tls_config,
+ rpc::backoff_policy backoff) {
return clients.invoke_on(
owner,
- [node, rpc_address = std::move(addr), tls_config = std::move(tls_config)](
-   rpc::connection_cache& cache) mutable {
+ [node,
+  rpc_address = std::move(addr),
+  tls_config = std::move(tls_config),
+  backoff = std::move(backoff)](rpc::connection_cache& cache) mutable {
return maybe_create_tcp_client(
- cache, node, std::move(rpc_address), std::move(tls_config));
+ cache,
+ node,
+ std::move(rpc_address),
+ std::move(tls_config),
+ std::move(backoff));
});
}

8 changes: 7 additions & 1 deletion src/v/cluster/cluster_utils.h
@@ -91,12 +91,18 @@ inline std::vector<topic_result> create_topic_results(
});
}

+ inline rpc::backoff_policy default_backoff_policy() {
+   return rpc::make_exponential_backoff_policy<rpc::clock_type>(
+     std::chrono::seconds(1), std::chrono::seconds(15));
+ }

ss::future<> add_one_tcp_client(
ss::shard_id owner,
ss::sharded<rpc::connection_cache>& clients,
model::node_id node,
net::unresolved_address addr,
- config::tls_config tls_config);
+ config::tls_config tls_config,
+ rpc::backoff_policy backoff_policy = default_backoff_policy());

ss::future<> update_broker_client(
model::node_id,
41 changes: 36 additions & 5 deletions src/v/cluster/node_status_backend.cc
@@ -34,12 +34,14 @@ node_status_backend::node_status_backend(
ss::sharded<features::feature_table>& feature_table,
ss::sharded<node_status_table>& node_status_table,
config::binding<std::chrono::milliseconds> period,
+ config::binding<std::chrono::milliseconds> max_reconnect_backoff,
ss::sharded<ss::abort_source>& as)
: _self(self)
, _members_table(members_table)
, _feature_table(feature_table)
, _node_status_table(node_status_table)
, _period(std::move(period))
+ , _max_reconnect_backoff(std::move(max_reconnect_backoff))
, _rpc_tls_config(config::node().rpc_server_tls())
, _as(as) {
if (!config::shard_local_cfg().disable_public_metrics()) {
@@ -61,6 +63,20 @@
}
});

+ _max_reconnect_backoff.watch([this]() {
+   ssx::spawn_with_gate(_gate, [this] {
+     auto new_backoff = _max_reconnect_backoff();
+     vlog(
+       clusterlog.info,
+       "Updating max reconnect backoff to: {}ms",
+       new_backoff.count());
+     // Invalidate all transports so they are created afresh with new
+     // backoff.
+     return _node_connection_cache.invoke_on_all(
+       [](rpc::connection_cache& local) { return local.remove_all(); });
+   });
+ });

_timer.set_callback([this] { tick(); });
}

@@ -212,12 +228,17 @@ ss::future<result<node_status>> node_status_backend::send_node_status_request(

ss::future<ss::shard_id> node_status_backend::maybe_create_client(
model::node_id target, net::unresolved_address address) {
- auto source_shard = target % ss::smp::count;
+ auto source_shard = connection_source_shard(target);
auto target_shard = rpc::connection_cache::shard_for(
_self, source_shard, target);

co_await add_one_tcp_client(
- target_shard, _node_connection_cache, target, address, _rpc_tls_config);
+ target_shard,
+ _node_connection_cache,
+ target,
+ address,
+ _rpc_tls_config,
+ create_backoff_policy());

co_return source_shard;
}
@@ -253,11 +274,21 @@ node_status_backend::process_reply(result<node_status_reply> reply) {
}

ss::future<node_status_reply>
- node_status_backend::process_request(node_status_request) {
+ node_status_backend::process_request(node_status_request request) {
_stats.rpcs_received += 1;

- node_status_reply reply = {.replier_metadata = {.node_id = _self}};
- return ss::make_ready_future<node_status_reply>(std::move(reply));
+ auto sender = request.sender_metadata.node_id;
+ auto sender_md = _node_status_table.local().get_node_status(sender);
+ if (
+   sender_md
+   // Check if the peer has at least 2 missed heartbeats. This avoids
+   // a cross-shard invoke in the happy path when no reset is needed.
+   && ss::lowres_clock::now() - sender_md->last_seen > 2 * _period()) {
+     co_await _node_connection_cache.local().reset_client_backoff(
+       _self, connection_source_shard(sender), sender);
+ }
+
+ co_return node_status_reply{.replier_metadata = {.node_id = _self}};
}

void node_status_backend::setup_metrics(
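
In short: when a peer that had gone quiet checks back in, the receiving node resets that peer's reconnect backoff so the connection is retried promptly instead of waiting out a fully grown exponential delay. A minimal sketch of the trigger condition (Python; hypothetical names, the real check lives in process_request above):

def should_reset_backoff(now_ms: int, last_seen_ms: int, period_ms: int) -> bool:
    # A peer missing for at least two heartbeat periods was probably
    # being backed off; resetting lets the next reconnect fire promptly.
    # Testing the threshold first avoids a cross-shard invoke in the
    # happy path, as the comment in the diff notes.
    return now_ms - last_seen_ms > 2 * period_ms
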
14 changes: 13 additions & 1 deletion src/v/cluster/node_status_backend.h
@@ -54,7 +54,8 @@ class node_status_backend {
ss::sharded<members_table>&,
ss::sharded<features::feature_table>&,
ss::sharded<node_status_table>&,
- config::binding<std::chrono::milliseconds>,
+ config::binding<std::chrono::milliseconds> /* period */,
+ config::binding<std::chrono::milliseconds> /* max_backoff */,
ss::sharded<ss::abort_source>&);

ss::future<> start();
@@ -88,12 +89,23 @@
};

private:
+ ss::shard_id connection_source_shard(model::node_id target) const {
+   return target % ss::smp::count;
+ }
+
+ rpc::backoff_policy create_backoff_policy() const {
+   static constexpr auto default_backoff_base = 1000ms;
+   return rpc::make_exponential_backoff_policy<rpc::clock_type>(
+     std::min(default_backoff_base, _max_reconnect_backoff()),
+     _max_reconnect_backoff());
+ }
model::node_id _self;
ss::sharded<members_table>& _members_table;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<node_status_table>& _node_status_table;

config::binding<std::chrono::milliseconds> _period;
+ config::binding<std::chrono::milliseconds> _max_reconnect_backoff;
config::tls_config _rpc_tls_config;
ss::sharded<rpc::connection_cache> _node_connection_cache;

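
For intuition: create_backoff_policy clamps the base delay to the configured cap, so retry delays grow exponentially from min(1s, cap) up to cap. A rough sketch of the resulting delay sequence, assuming the policy doubles the delay on each failed attempt (Python, illustrative only):

def backoff_delays_ms(max_backoff_ms: int, attempts: int) -> list[int]:
    # Base is 1s unless the configured cap is smaller (the std::min above).
    base_ms = min(1000, max_backoff_ms)
    # Double per failed attempt, capped at the configured maximum.
    return [min(base_ms * 2**i, max_backoff_ms) for i in range(attempts)]

print(backoff_delays_ms(15000, 6))  # [1000, 2000, 4000, 8000, 15000, 15000]
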
7 changes: 7 additions & 0 deletions src/v/config/configuration.cc
@@ -1925,6 +1925,13 @@ configuration::configuration()
"establish liveness status outside of the Raft protocol.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
100ms)
+ , node_status_reconnect_max_backoff_ms(
+   *this,
+   "node_status_reconnect_max_backoff_ms",
+   "Maximum backoff (in ms) to reconnect to an unresponsive peer during "
+   "node status liveness checks.",
+   {.needs_restart = needs_restart::no, .visibility = visibility::user},
+   15s)
, enable_controller_log_rate_limiting(
*this,
"enable_controller_log_rate_limiting",
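
Because the property is declared with needs_restart::no, it can be changed on a live cluster; the watch added in node_status_backend.cc then drops the cached transports so new clients pick up the new cap. A minimal sketch using the ducktape-style set_cluster_config helper that the test below also uses (hypothetical wrapper; redpanda is a running RedpandaService):

def lower_node_status_backoff(redpanda, max_backoff_ms: int = 5000) -> None:
    # Applies at runtime, no broker restart needed (needs_restart::no).
    redpanda.set_cluster_config(
        {"node_status_reconnect_max_backoff_ms": max_backoff_ms})
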
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -388,6 +388,7 @@ struct configuration final : public config_store {
property<bool> enable_rack_awareness;

property<std::chrono::milliseconds> node_status_interval;
+ property<std::chrono::milliseconds> node_status_reconnect_max_backoff_ms;
// controller log limiting
property<bool> enable_controller_log_rate_limiting;
property<size_t> rps_limit_topic_operations;
4 changes: 4 additions & 0 deletions src/v/redpanda/application.cc
@@ -1241,6 +1241,10 @@ void application::wire_up_redpanda_services(model::node_id node_id) {
std::ref(node_status_table),
ss::sharded_parameter(
[] { return config::shard_local_cfg().node_status_interval.bind(); }),
+ ss::sharded_parameter([] {
+   return config::shard_local_cfg()
+     .node_status_reconnect_max_backoff_ms.bind();
+ }),
std::ref(_as))
.get();

10 changes: 10 additions & 0 deletions src/v/rpc/connection_cache.cc
@@ -63,6 +63,16 @@ ss::future<> connection_cache::remove(model::node_id n) {
});
}

+ ss::future<> connection_cache::remove_all() {
+   auto units = co_await _mutex.get_units();
+   auto cache = std::exchange(_cache, {});
+   co_await parallel_for_each(cache, [](auto& it) {
+     auto& [_, cli] = it;
+     return cli->stop();
+   });
+   cache.clear();
+ }

/// \brief closes all client connections
ss::future<> connection_cache::do_shutdown() {
auto units = co_await _mutex.get_units();
3 changes: 3 additions & 0 deletions src/v/rpc/connection_cache.h
@@ -57,6 +57,9 @@ class connection_cache final
/// \brief removes the node *and* closes the connection
ss::future<> remove(model::node_id n);

+ /// \brief similar to remove but removes all nodes.
+ ss::future<> remove_all();

/// \brief closes all connections
ss::future<> do_shutdown();
void shutdown();
43 changes: 32 additions & 11 deletions tests/rptest/tests/node_status_test.py
@@ -77,9 +77,9 @@ def is_node_available(self, node: ClusterNode):
vertex = self.node_to_vertex[node]
return self.edges[vertex][vertex] == ConnectionStatus.ALIVE

- def check_cluster_status(self):
+ def do_check_cluster_status(self):
admin = Admin(self.redpanda)
-
+ results = []
for node, peer, expected_status in self._all_edges():
if not self.is_node_available(node):
# The starting node is unavailable so the request
@@ -88,21 +88,36 @@ def check_cluster_status(self):

self.redpanda.logger.debug(
f"Checking status of peer {self.redpanda.idx(peer)} "
f"from node {self.redpanda.idx(node)}")
f"from node {self.redpanda.idx(node)}, expected status: {expected_status}"
)
peer_status = self._get_peer_status(admin, node, peer)

if expected_status == ConnectionStatus.UNKNOWN:
- assert peer_status is None, f"Expected no response from node {peer.name}"
- if expected_status == ConnectionStatus.ALIVE:
+ results.append(peer_status is None)
+
+ elif expected_status == ConnectionStatus.ALIVE:
ms_since_last_status = peer_status["since_last_status"]
- assert is_live(
-   ms_since_last_status
- ), f"Expected node {peer.name} to be alive, but since_last_status > max_delta: ms_since_last_status={ms_since_last_status}, tolerance={MAX_DELTA}"
+ is_peer_live = is_live(ms_since_last_status)
+ self.redpanda.logger.debug(
+   f"Node {peer.name} expected status: alive, last status: {ms_since_last_status}, is live: {is_peer_live}"
+ )
+ results.append(is_peer_live)

elif expected_status == ConnectionStatus.DOWN:
ms_since_last_status = peer_status["since_last_status"]
- assert not is_live(
-   ms_since_last_status
- ), f"Expected node {peer.name} to be down, but ms_since_last_status <= max_delta: ms_since_last_status={ms_since_last_status}, tolerance={MAX_DELTA}"
+ is_not_live = not is_live(ms_since_last_status)
+ self.redpanda.logger.debug(
+   f"Node {peer.name} expected status: down, last status: {ms_since_last_status}, is not live: {is_not_live}"
+ )
+ results.append(is_not_live)
+
+ return all(results)

+ def check_cluster_status(self):
+   self.redpanda.wait_until(
+     self.do_check_cluster_status,
+     timeout_sec=30,
+     backoff_sec=2,
+     err_msg=
+     "Node status across cluster nodes did not reach the desired state")

def _all_edges(self):
for start_vertex, end_vertex in np.ndindex(self.shape):
@@ -129,10 +144,16 @@ def __init__(self, ctx):
test_context=ctx,
extra_rp_conf={"node_status_interval": NODE_STATUS_INTERVAL})

+ def _update_max_backoff(self):
+   self.redpanda.set_cluster_config(
+     {"node_status_reconnect_max_backoff_ms": 5000})

@cluster(num_nodes=3)
def test_all_nodes_up(self):
status_graph = StatusGraph(self.redpanda)
status_graph.check_cluster_status()
+ self._update_max_backoff()
+ status_graph.check_cluster_status()

@cluster(num_nodes=3)
def test_node_down(self):
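
Note the shape of this refactor: hard asserts become booleans collected into results and folded with all(), so check_cluster_status can poll until the cluster converges rather than failing on the first transiently stale edge. A condensed sketch of the polling pattern (Python, illustrative; the real test delegates to redpanda.wait_until):

import time

def wait_for(check, timeout_sec: int = 30, backoff_sec: int = 2) -> None:
    # Retry a boolean check until it passes or the deadline expires.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if check():
            return
        time.sleep(backoff_sec)
    raise TimeoutError("cluster did not reach the desired state")
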
