Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport] [v23.1.x] More detailed partition reconfiguration tracking #10201 #10630

Merged
merged 7 commits into from
May 10, 2023
23 changes: 23 additions & 0 deletions src/v/cluster/cluster_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,29 @@ inline bool has_non_replicable_op_type(const topic_table_delta& d) {
}
__builtin_unreachable();
}

/**
 * Returns the union of two replica sets: the broker shards of lhs followed by
 * the broker shards of rhs, with every duplicate dropped. Elements keep the
 * order in which they are first encountered.
 */
inline std::vector<model::broker_shard> union_replica_sets(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actually needed by the previous commit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought no one would notice it in a backport :-P

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha 😁

const std::vector<model::broker_shard>& lhs,
const std::vector<model::broker_shard>& rhs) {
std::vector<model::broker_shard> ret;
// Every candidate is checked with a linear scan of the result built so far,
// which is quadratic overall; this is intended for small replica sets only.
std::copy_if(
lhs.begin(),
lhs.end(),
std::back_inserter(ret),
[&ret](const model::broker_shard& bs) {
return std::find(ret.begin(), ret.end(), bs) == ret.end();
});
// Same de-duplicating append for the second set.
std::copy_if(
rhs.begin(),
rhs.end(),
std::back_inserter(ret),
[&ret](const model::broker_shard& bs) {
return std::find(ret.begin(), ret.end(), bs) == ret.end();
});
return ret;
}

/**
* Subtracts second replica set from the first one. Result contains only brokers
* shards that are present in first replica set but not in the second one.
Expand Down
59 changes: 59 additions & 0 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
#include "model/metadata.h"
#include "model/timeout_clock.h"
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/maybe_yield.hh>

#include <absl/container/node_hash_map.h>

Expand Down Expand Up @@ -176,6 +178,9 @@ controller_api::get_reconciliation_state(model::ntp ntp) {
.source_shard = shard,
.p_as = std::move(m.delta.new_assignment),
.type = m.delta.type,
.current_retry = m.retries,
.last_operation_result = m.last_error,
.revision_of_operation = model::revision_id(m.delta.offset),
};
});
}
Expand Down Expand Up @@ -452,4 +457,58 @@ controller_api::shard_for(const model::ntp& ntp) const {
return _shard_table.local().shard_for(ntp);
}

/**
 * Gathers the controller backend reconciliation state of the requested ntps
 * from every node that holds a previous or a target replica of a partition
 * that is currently being updated.
 *
 * Ntps without an update in progress are skipped. A node whose query returned
 * an error is recorded in node_errors; the remaining nodes are still reported.
 */
ss::future<global_reconciliation_state>
controller_api::get_global_reconciliation_state(
  std::vector<model::ntp> ntps, model::timeout_clock::time_point timeout) {
    using ret_t = result<std::vector<ntp_reconciliation_state>>;
    const auto requested_count = ntps.size();

    // First step: group the ntps by the node that has to be queried for them.
    absl::node_hash_map<model::node_id, std::vector<model::ntp>> ntps_by_node;
    for (const auto& ntp : ntps) {
        auto& in_progress = _topics.local().updates_in_progress();
        auto update_it = in_progress.find(ntp);
        if (update_it == in_progress.end()) {
            // no longer updating
            continue;
        }
        // Nodes from both the previous and the target replica set are asked.
        const auto replicas = union_replica_sets(
          update_it->second.get_previous_replicas(),
          update_it->second.get_target_replicas());
        for (const auto& replica : replicas) {
            ntps_by_node[replica.node_id].push_back(ntp);
        }
        co_await ss::coroutine::maybe_yield();
    }

    // Second step: query all nodes concurrently. The local node is served
    // directly, every other node is queried with the given timeout. Each
    // result is paired with the id of the node it came from.
    auto node_results = co_await ssx::parallel_transform(
      std::move(ntps_by_node), [this, timeout](auto pair) {
          auto tag_with_node = [id = pair.first](ret_t ret) {
              return std::make_pair(id, std::move(ret));
          };
          if (pair.first == _self) {
              return get_reconciliation_state(std::move(pair.second))
                .then(std::move(tag_with_node));
          }
          return get_reconciliation_state(
                   pair.first, std::move(pair.second), timeout)
            .then(std::move(tag_with_node));
      });

    // Final step: regroup the per-node answers by ntp.
    global_reconciliation_state state;
    state.ntp_backend_operations.reserve(requested_count);
    for (auto& [source_node, node_result] : node_results) {
        if (node_result.has_error()) {
            state.node_errors.emplace_back(source_node, node_result.error());
            continue;
        }
        for (auto& ntp_state : node_result.value()) {
            auto& per_node_ops
              = state.ntp_backend_operations[ntp_state.ntp()];
            per_node_ops.emplace_back(
              source_node, std::move(ntp_state.pending_operations()));
        }
        co_await ss::coroutine::maybe_yield();
    }
    co_return state;
}

} // namespace cluster
8 changes: 8 additions & 0 deletions src/v/cluster/controller_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ class controller_api {
ss::future<result<std::vector<partition_reconfiguration_state>>>
get_partitions_reconfiguration_state(
std::vector<model::ntp>, model::timeout_clock::time_point);
/**
* Returns state of controller backend from each node in the cluster for
* requested list of ntps. A global reconciliation status contains a state
* of reconciliation loop execution from every replica that is part of an
* ntp replica set.
*/
ss::future<global_reconciliation_state> get_global_reconciliation_state(
std::vector<model::ntp>, model::timeout_clock::time_point);

ss::future<result<node_decommission_progress>>
get_node_decommission_progress(
Expand Down
14 changes: 13 additions & 1 deletion src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ ss::future<> controller_backend::reconcile_ntp(deltas_t& deltas) {
it->delta.previous_replica_set);

it->retries++;
if (ec.category() == error_category()) {
it->last_error = static_cast<errc>(ec.value());
} else {
it->last_error = errc::partition_operation_failed;
}
stop = true;
continue;
}
Expand Down Expand Up @@ -784,6 +789,7 @@ ss::future<> controller_backend::reconcile_ntp(deltas_t& deltas) {
*it,
std::current_exception());
it->retries++;
it->last_error = errc::partition_operation_failed;
stop = true;
continue;
}
Expand Down Expand Up @@ -1809,7 +1815,13 @@ controller_backend::list_ntp_deltas(const model::ntp& ntp) const {

// Streams a textual form of delta_metadata: the delta, the retry counter and
// the last error followed by its error_category() message.
// NOTE(review): this hunk is a rendered diff (its header reports 1 deletion),
// so the first, two-field fmt::print below appears to be the removed
// pre-change line rather than a second print -- confirm against the real file.
std::ostream&
operator<<(std::ostream& o, const controller_backend::delta_metadata& m) {
fmt::print(o, "{{delta: {}, retries: {}}}", m.delta, m.retries);
fmt::print(
o,
"{{delta: {}, retries: {}, last_error: {}({})}}",
m.delta,
m.retries,
m.last_error,
error_category().message(static_cast<int>(m.last_error)));
return o;
}

Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "cluster/errc.h"
#include "cluster/fwd.h"
#include "cluster/topic_table.h"
#include "cluster/types.h"
Expand Down Expand Up @@ -225,6 +226,7 @@ class controller_backend

topic_table::delta delta;
uint64_t retries = 0;
cluster::errc last_error = errc::waiting_for_recovery;
friend std::ostream& operator<<(std::ostream&, const delta_metadata&);
};

Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/errc.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ enum class errc : int16_t {
cluster_already_exists,
no_partition_assignments,
failed_to_create_partition,
partition_operation_failed,
};
struct errc_category final : public std::error_category {
const char* name() const noexcept final { return "cluster::errc"; }
Expand Down Expand Up @@ -191,6 +192,9 @@ struct errc_category final : public std::error_category {
return "No replica assignments for the requested partition";
case errc::failed_to_create_partition:
return "Failed to create partition replica instance";
case errc::partition_operation_failed:
return "Generic failure occurred during partition operation "
"execution";
}
return "cluster::errc::unknown";
}
Expand Down
47 changes: 45 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2080,16 +2080,29 @@ struct delete_acls_reply

// Serializable description of one operation reported by the controller
// backend for a partition, including how its execution is progressing.
// NOTE(review): this hunk is a rendered diff (its header reports 2 deletions);
// the version<0> envelope line and the three-field serde_fields() appear to be
// the removed pre-change lines -- confirm against the real file.
struct backend_operation
: serde::
envelope<backend_operation, serde::version<0>, serde::compat_version<0>> {
envelope<backend_operation, serde::version<1>, serde::compat_version<0>> {
// shard that reported the operation
ss::shard_id source_shard;
// partition assignment the operation applies
partition_assignment p_as;
// kind of topic table delta being executed
topic_table_delta::op_type type;

// retry counter of the operation (populated from delta_metadata::retries)
uint64_t current_retry;
// error recorded for the operation (populated from
// delta_metadata::last_error)
cluster::errc last_operation_result;
// revision built from the offset of the delta being executed
model::revision_id revision_of_operation;

friend std::ostream& operator<<(std::ostream&, const backend_operation&);

friend bool operator==(const backend_operation&, const backend_operation&)
= default;

auto serde_fields() { return std::tie(source_shard, p_as, type); }
// Fields in serialization order; the three progress fields are listed last.
auto serde_fields() {
return std::tie(
source_shard,
p_as,
type,
current_retry,
last_operation_result,
revision_of_operation);
}
};

struct create_data_policy_cmd_data
Expand Down Expand Up @@ -2392,6 +2405,11 @@ class ntp_reconciliation_state
const std::vector<backend_operation>& pending_operations() const {
return _backend_operations;
}

std::vector<backend_operation>& pending_operations() {
return _backend_operations;
}

reconciliation_status status() const { return _status; }

std::error_code error() const { return make_error_code(_error); }
Expand All @@ -2415,6 +2433,31 @@ class ntp_reconciliation_state
errc _error;
};

struct node_backend_operations {
node_backend_operations(
model::node_id id, std::vector<backend_operation> ops)
: node_id(id)
, backend_operations(std::move(ops)) {}

model::node_id node_id;
std::vector<backend_operation> backend_operations;
};

struct node_error {
node_error(model::node_id nid, std::error_code err_code)
: node_id(nid)
, ec(err_code) {}

model::node_id node_id;
std::error_code ec;
};

// Reconciliation state collected from multiple nodes.
struct global_reconciliation_state {
// for every ntp, the backend operations reported by each queried node
absl::node_hash_map<model::ntp, std::vector<node_backend_operations>>
ntp_backend_operations;
// nodes whose state query returned an error, with the error code
std::vector<node_error> node_errors;
};

struct reconciliation_state_request
: serde::envelope<
reconciliation_state_request,
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ constexpr error_code map_topic_error_code(cluster::errc code) {
case cluster::errc::cluster_already_exists:
case cluster::errc::no_partition_assignments:
case cluster::errc::failed_to_create_partition:
case cluster::errc::partition_operation_failed:
break;
}
return error_code::unknown_server_error;
Expand Down
71 changes: 70 additions & 1 deletion src/v/redpanda/admin/api-doc/partition.json
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,75 @@
"status": {
"type": "string",
"description": "Reconfiguration status"
},
"current_replicas": {
"type": "array",
"items": {
"type": "assignment"
},
"description": "Current replica set"
},
"bytes_left_to_move": {
"type": "long",
"description": "bytes left to move to new replicas"
},
"bytes_moved": {
"type": "long",
"description": "bytes already moved to new replicas"
},
"partition_size": {
"type": "long",
"description": "current size of partition"
},
"reconciliation_statuses": {
"type": "array",
"items": {
"type": "partition_reconciliation_status"
},
"description": "list of reconciliation statuses per node"
}
}
},
"partition_reconciliation_status": {
"id": "partition_reconciliation_status",
"description": "Partition reconciliation status on a node",
"properties": {
"node_id": {
"type": "int",
"description": "Id of a node reporting the status"
},
"operations": {
"type": "array",
"items": {
"type": "partition_reconciliation_operation"
},
"description": "list of operations being executed on the node"
}
}
},
"partition_reconciliation_operation": {
"id": "partition_reconciliation_operation",
"description": "Partition reconciliation being executed by a node on a specific shard",
"properties": {
"type": {
"type": "string",
"description": "Type of an operation that is currently being executed"
},
"core": {
"type": "int",
"description": "Core that the status comes from"
},
"retry_number": {
"type": "int",
"description": "Retry number of the currently executed operation"
},
"revision": {
"type": "int",
"description": "Currently executed operation revision"
},
"status": {
"type": "string",
"description": "Result of last operation retry"
}
}
},
Expand All @@ -542,4 +611,4 @@
}
}
}
}
}
Loading