Merge pull request #4449 from dotnwat/v22.1.x
[v22.1.1] Collection of backports
dotnwat committed Apr 27, 2022
2 parents c928976 + 453912f commit d7f12ad
Showing 23 changed files with 332 additions and 94 deletions.
2 changes: 2 additions & 0 deletions src/v/cluster/controller.cc
@@ -294,6 +294,8 @@ ss::future<> controller::start() {
std::ref(_raft_manager),
std::ref(_as),
std::ref(_storage_node),
std::ref(_drain_manager),
std::ref(_feature_table),
config::shard_local_cfg()
.storage_space_alert_free_threshold_bytes.bind(),
config::shard_local_cfg()
15 changes: 15 additions & 0 deletions src/v/cluster/drain_manager.cc
@@ -270,4 +270,19 @@ ss::future<> drain_manager::do_restore() {
co_return;
}

std::ostream&
operator<<(std::ostream& os, const drain_manager::drain_status& ds) {
fmt::print(
os,
"{{finished: {}, errors: {}, partitions: {}, eligible: {}, transferring: "
"{}, failed: {}}}",
ds.finished,
ds.errors,
ds.partitions,
ds.eligible,
ds.transferring,
ds.failed);
return os;
}

} // namespace cluster
2 changes: 2 additions & 0 deletions src/v/cluster/drain_manager.h
@@ -90,4 +90,6 @@ class drain_manager : public ss::peering_sharded_service<drain_manager> {
ss::abort_source _abort;
};

std::ostream& operator<<(std::ostream&, const drain_manager::drain_status&);

} // namespace cluster
43 changes: 41 additions & 2 deletions src/v/cluster/health_monitor_backend.cc
@@ -47,6 +47,8 @@ health_monitor_backend::health_monitor_backend(
ss::sharded<raft::group_manager>& raft_manager,
ss::sharded<ss::abort_source>& as,
ss::sharded<storage::node_api>& storage_api,
ss::sharded<drain_manager>& drain_manager,
ss::sharded<feature_table>& feature_table,
config::binding<size_t> storage_min_bytes_threshold,
config::binding<unsigned> storage_min_percent_threshold)
: _raft0(std::move(raft0))
@@ -55,6 +57,8 @@ health_monitor_backend::health_monitor_backend(
, _partition_manager(partition_manager)
, _raft_manager(raft_manager)
, _as(as)
, _drain_manager(drain_manager)
, _feature_table(feature_table)
, _local_monitor(
std::move(storage_min_bytes_threshold),
std::move(storage_min_percent_threshold),
@@ -209,6 +213,10 @@ std::optional<node_health_report> health_monitor_backend::build_node_report(
report.topics = filter_topic_status(it->second.topics, f.ntp_filters);
}

report.drain_status = it->second.drain_status;
report.include_drain_status = _feature_table.local().is_active(
cluster::feature::maintenance_mode);

return report;
}

@@ -386,6 +394,17 @@ health_monitor_backend::get_cluster_health(
"requesing cluster state report with filter: {}, force refresh: {}",
filter,
refresh);
auto ec = co_await maybe_refresh_cluster_health(refresh, deadline);
if (ec) {
co_return ec;
}

co_return build_cluster_report(filter);
}

ss::future<std::error_code>
health_monitor_backend::maybe_refresh_cluster_health(
force_refresh refresh, model::timeout_clock::time_point deadline) {
auto const need_refresh = refresh
|| _last_refresh + max_metadata_age()
< ss::lowres_clock::now();
@@ -411,8 +430,7 @@ health_monitor_backend::get_cluster_health(
co_return errc::timeout;
}
}

co_return build_cluster_report(filter);
co_return errc::success;
}

cluster_health_report
@@ -576,6 +594,10 @@ health_monitor_backend::collect_current_node_health(node_report_filter filter) {
ret.local_state.logical_version
= feature_table::get_latest_logical_version();

ret.drain_status = co_await _drain_manager.local().status();
ret.include_drain_status = _feature_table.local().is_active(
cluster::feature::maintenance_mode);

if (filter.include_partitions) {
ret.topics = co_await collect_topic_status(
std::move(filter.ntp_filters));
@@ -674,4 +696,21 @@ std::chrono::milliseconds health_monitor_backend::max_metadata_age() {
return config::shard_local_cfg().health_monitor_max_metadata_age();
}

ss::future<result<std::optional<cluster::drain_manager::drain_status>>>
health_monitor_backend::get_node_drain_status(
model::node_id node_id, model::timeout_clock::time_point deadline) {
auto ec = co_await maybe_refresh_cluster_health(
force_refresh::no, deadline);
if (ec) {
co_return ec;
}

auto it = _reports.find(node_id);
if (it == _reports.end()) {
co_return errc::node_does_not_exists;
}

co_return it->second.drain_status;
}

} // namespace cluster
10 changes: 9 additions & 1 deletion src/v/cluster/health_monitor_backend.h
@@ -54,6 +54,8 @@ class health_monitor_backend {
ss::sharded<raft::group_manager>&,
ss::sharded<ss::abort_source>&,
ss::sharded<storage::node_api>&,
ss::sharded<drain_manager>&,
ss::sharded<feature_table>&,
config::binding<size_t> storage_min_bytes_threshold,
config::binding<unsigned> storage_min_percent_threshold);

@@ -71,6 +73,9 @@
cluster::notification_id_type register_node_callback(health_node_cb_t cb);
void unregister_node_callback(cluster::notification_id_type id);

ss::future<result<std::optional<cluster::drain_manager::drain_status>>>
get_node_drain_status(model::node_id, model::timeout_clock::time_point);

private:
/**
* Struct used to track pending refresh request, it gives ability
@@ -109,7 +114,8 @@ class health_monitor_backend {
ss::future<> collect_cluster_health();
ss::future<result<node_health_report>>
collect_remote_node_health(model::node_id);

ss::future<std::error_code> maybe_refresh_cluster_health(
force_refresh, model::timeout_clock::time_point);
ss::future<std::error_code> refresh_cluster_health_cache(force_refresh);
ss::future<std::error_code>
dispatch_refresh_cluster_health_request(model::node_id);
@@ -140,6 +146,8 @@ class health_monitor_backend {
ss::sharded<partition_manager>& _partition_manager;
ss::sharded<raft::group_manager>& _raft_manager;
ss::sharded<ss::abort_source>& _as;
ss::sharded<drain_manager>& _drain_manager;
ss::sharded<feature_table>& _feature_table;

ss::lowres_clock::time_point _last_refresh;
ss::lw_shared_ptr<abortable_refresh_request> _refresh_request;
9 changes: 9 additions & 0 deletions src/v/cluster/health_monitor_frontend.cc
@@ -68,4 +68,13 @@ health_monitor_frontend::get_nodes_status(
});
});
}

ss::future<result<std::optional<cluster::drain_manager::drain_status>>>
health_monitor_frontend::get_node_drain_status(
model::node_id node_id, model::timeout_clock::time_point deadline) {
return dispatch_to_backend([node_id, deadline](health_monitor_backend& be) {
return be.get_node_drain_status(node_id, deadline);
});
}

} // namespace cluster
7 changes: 7 additions & 0 deletions src/v/cluster/health_monitor_frontend.h
@@ -51,6 +51,13 @@ class health_monitor_frontend {
ss::future<result<std::vector<node_state>>>
get_nodes_status(model::timeout_clock::time_point);

/**
* Return drain status for a given node.
*/
ss::future<result<std::optional<cluster::drain_manager::drain_status>>>
get_node_drain_status(
model::node_id node_id, model::timeout_clock::time_point deadline);

private:
template<typename Func>
auto dispatch_to_backend(Func&& f) {
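The frontend method above simply dispatches to the backend shard that owns the health caches. A hypothetical caller sketch, not part of this commit: the helper name, the 5 second deadline, and the log lines are illustrative, assuming the get_node_drain_status signature declared above plus Redpanda's usual result<T> and vlog conventions.

// Hypothetical usage sketch: query one node's drain status with a deadline
// and log the outcome. Assumes cluster/health_monitor_frontend.h,
// cluster/logger.h, and Seastar coroutine support are available.
ss::future<> log_drain_status(
  cluster::health_monitor_frontend& hm, model::node_id id) {
    using namespace std::chrono_literals;
    auto deadline = model::timeout_clock::now() + 5s;
    auto res = co_await hm.get_node_drain_status(id, deadline);
    if (!res) {
        // refresh failed or the node is unknown (e.g. errc::node_does_not_exists)
        vlog(clusterlog.warn, "drain status of {} unavailable: {}", id, res.error());
        co_return;
    }
    if (res.value()) {
        // drain_status is printable via the operator<< added in this commit
        vlog(clusterlog.info, "node {} drain status: {}", id, *res.value());
    } else {
        vlog(clusterlog.info, "node {} is not draining", id);
    }
}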
43 changes: 32 additions & 11 deletions src/v/cluster/health_monitor_types.cc
@@ -11,6 +11,7 @@
#include "cluster/health_monitor_types.h"

#include "cluster/errc.h"
#include "cluster/feature_table.h"
#include "cluster/node/types.h"
#include "model/adl_serde.h"
#include "utils/to_string.h"
@@ -61,13 +62,14 @@ std::ostream& operator<<(std::ostream& o, const node_health_report& r) {
fmt::print(
o,
"{{id: {}, disks: {}, topics: {}, redpanda_version: {}, uptime: "
"{}, logical_version: {}}}",
"{}, logical_version: {}, drain_status: {}}}",
r.id,
r.local_state.disks,
r.topics,
r.local_state.redpanda_version,
r.local_state.uptime,
r.local_state.logical_version);
r.local_state.logical_version,
r.drain_status);
return o;
}

@@ -228,15 +230,28 @@ cluster::topic_status adl<cluster::topic_status>::from(iobuf_parser& p) {

void adl<cluster::node_health_report>::to(
iobuf& out, cluster::node_health_report&& r) {
reflection::serialize(
out,
r.current_version,
r.id,
std::move(r.local_state.redpanda_version),
r.local_state.uptime,
std::move(r.local_state.disks),
std::move(r.topics),
std::move(r.local_state.logical_version));
if (r.include_drain_status) {
reflection::serialize(
out,
cluster::node_health_report::current_version,
r.id,
std::move(r.local_state.redpanda_version),
r.local_state.uptime,
std::move(r.local_state.disks),
std::move(r.topics),
std::move(r.local_state.logical_version),
std::move(r.drain_status));
} else {
reflection::serialize(
out,
static_cast<int8_t>(1), // version right before maintenance mode added
r.id,
std::move(r.local_state.redpanda_version),
r.local_state.uptime,
std::move(r.local_state.disks),
std::move(r.topics),
std::move(r.local_state.logical_version));
}
}

cluster::node_health_report
@@ -253,12 +268,18 @@ adl<cluster::node_health_report>::from(iobuf_parser& p) {
if (version >= 1) {
logical_version = adl<cluster::cluster_version>{}.from(p);
}
std::optional<cluster::drain_manager::drain_status> drain_status;
if (version >= 2) {
drain_status
= adl<std::optional<cluster::drain_manager::drain_status>>{}.from(p);
}

return cluster::node_health_report{
.id = id,
.local_state
= {.redpanda_version = std::move(redpanda_version), .logical_version = std::move(logical_version), .uptime = uptime, .disks = std::move(disks)},
.topics = std::move(topics),
.drain_status = drain_status,
};
}

32 changes: 31 additions & 1 deletion src/v/cluster/health_monitor_types.h
@@ -10,6 +10,7 @@
*/
#pragma once
#include "bytes/iobuf_parser.h"
#include "cluster/drain_manager.h"
#include "cluster/errc.h"
#include "cluster/node/types.h"
#include "cluster/types.h"
@@ -80,12 +81,41 @@ struct topic_status {
* instance of time
*/
struct node_health_report {
static constexpr int8_t current_version = 1;
static constexpr int8_t current_version = 2;

model::node_id id;
node::local_state local_state;
std::vector<topic_status> topics;

/*
* nodes running old versions of redpanda will assert that they can decode
* a message they receive by requiring the encoded version to be <= the
* latest version that node understands.
*
* when drain_status is added the version is bumped, which means that older
* nodes will crash if they try to decode such a message. this is common in
* many places in the code base, but node_health_report makes this problem
* particularly acute because nodes are polled automatically at a regular,
* short interval.
*
* one solution is to make the feature table available in free functions so
* that we can use it to query cluster support for maintenance mode in
* adl<T>. unfortunately that won't work well in our multi-node unit tests
* because thread_local references to the feature service will be clobbered.
*
* another option would be to add a constructor to node_health_report so
* that when a report was created we could record a `serialized_as` version
* and query the feature table at the call site. this doesn't work well
* because reflection/adl needs types to be default-constructible.
*
* the final solution, which isn't a panacea, is to do the equivalent of the
* ctor trick described above but with a flag. it's not a universal solution
* because devs need to be aware of it and handle it manually. fortunately
* there are only one or two places where we create this object.
*/
bool include_drain_status{false}; // not serialized
std::optional<drain_manager::drain_status> drain_status;

friend std::ostream& operator<<(std::ostream&, const node_health_report&);
};

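The comment above motivates gating the wire format on a runtime flag rather than on the struct's version constant alone. A self-contained sketch of that flag-gated versioning pattern follows; the types and field names are illustrative only and are not the Redpanda reflection/adl API.

// Sketch: encode the new field only when the flag says every peer can decode
// version 2; otherwise fall back to the previous wire version.
#include <cstdint>
#include <optional>
#include <vector>

struct buffer {
    std::vector<uint8_t> bytes;
    void write_i8(int8_t v) { bytes.push_back(static_cast<uint8_t>(v)); }
    void write_i32(int32_t v) {
        for (int i = 0; i < 4; ++i) {
            bytes.push_back(static_cast<uint8_t>((v >> (8 * i)) & 0xff));
        }
    }
};

struct report {
    static constexpr int8_t current_version = 2; // new field added in v2
    int32_t node_id{0};
    std::optional<int32_t> drain_progress; // only understood by v2 decoders
    bool peers_understand_v2{false};       // runtime flag, not serialized
};

buffer encode(const report& r) {
    buffer out;
    if (r.peers_understand_v2) {
        out.write_i8(report::current_version);
        out.write_i32(r.node_id);
        // optional encoded as a presence byte followed by the value
        out.write_i8(r.drain_progress.has_value() ? 1 : 0);
        out.write_i32(r.drain_progress.value_or(0));
    } else {
        out.write_i8(1); // last version every node can decode
        out.write_i32(r.node_id);
    }
    return out;
}

On the decode side the version byte is read first and the new field is consumed only when the version is >= 2, mirroring adl<cluster::node_health_report>::from in the health_monitor_types.cc hunk above.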
9 changes: 4 additions & 5 deletions src/v/cluster/id_allocator_frontend.cc
@@ -72,13 +72,12 @@ id_allocator_frontend::allocate_id(model::timeout_clock::duration timeout) {

auto _self = _controller->self();

auto r = allocate_id_reply{0, errc::no_leader_controller};
auto r = allocate_id_reply{0, errc::not_leader};

auto retries = _metadata_dissemination_retries;
auto delay_ms = _metadata_dissemination_retry_delay_ms;
auto aborted = false;
std::optional<std::string> error;
while (!aborted && 0 < retries--) {
while (!_as.abort_requested() && 0 < retries--) {
auto leader_opt = _leaders.local().get_leader(model::id_allocator_ntp);
if (unlikely(!leader_opt)) {
error = vformat(
@@ -88,7 +87,7 @@ id_allocator_frontend::allocate_id(model::timeout_clock::duration timeout) {
"waiting for {} to fill leaders cache, retries left: {}",
model::id_allocator_ntp,
retries);
aborted = !co_await sleep_abortable(delay_ms, _as);
co_await sleep_abortable(delay_ms, _as);
continue;
}
auto leader = leader_opt.value();
@@ -117,7 +116,7 @@ id_allocator_frontend::allocate_id(model::timeout_clock::duration timeout) {
error = vformat("id allocation failed with {}", r.ec);
vlog(
clusterlog.trace, "id allocation failed, retries left: {}", retries);
aborted = !co_await sleep_abortable(delay_ms, _as);
co_await sleep_abortable(delay_ms, _as);
}

if (error) {
10 changes: 5 additions & 5 deletions src/v/cluster/metadata_dissemination_service.cc
@@ -287,15 +287,15 @@ metadata_dissemination_service::dispatch_get_metadata_update(
void metadata_dissemination_service::collect_pending_updates() {
auto brokers = _members_table.local().all_broker_ids();
for (auto& ntp_leader : _requests) {
auto tp_md = _topics.local().get_topic_metadata(
model::topic_namespace_view(ntp_leader.ntp));
auto assignment = _topics.local().get_partition_assignment(
ntp_leader.ntp);

if (!tp_md) {
// Topic metadata is not there anymore, partition was removed
if (!assignment) {
// Partition was removed, skip dissemination
continue;
}
auto non_overlapping = calculate_non_overlapping_nodes(
get_partition_members(ntp_leader.ntp.tp.partition, *tp_md), brokers);
*assignment, brokers);

/**
* remove current node from non overlapping list, current node may be