Skip to content

Commit

Permalink
Merge pull request #7415 from vbotbuildovich/backport-5558-v22.3.x-111
Browse files Browse the repository at this point in the history
[v22.3.x] Fix cross-shard allocator manipulation
  • Loading branch information
dotnwat committed Nov 23, 2022
2 parents 215f5bf + 1cd29ea commit 5be3e8e
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/v/bytes/oncore.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class oncore final {
void verify_shard_source_location(const char* file, int linenum) const;

private:
const shard_id_type _owner_shard;
shard_id_type _owner_shard;
};

// Next function should be replace with source_location in c++20 very soon
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/scheduling/allocation_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ std::ostream& operator<<(std::ostream& o, allocation_node::state s) {
std::ostream& operator<<(std::ostream& o, const allocation_node& n) {
fmt::print(
o,
"{{node: {}, max_partitions_per_core: {}, state: {}, partition_capacity: "
"{}, weights: [",
"{{node: {}, max_partitions_per_core: {}, state: {}, allocated: {}, "
"partition_capacity: {}, weights: [",
n._id,
n._partitions_per_shard(),
n._state,
n._allocated_partitions,
n.partition_capacity());

for (auto w : n._weights) {
Expand Down
23 changes: 23 additions & 0 deletions src/v/cluster/scheduling/allocation_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "cluster/scheduling/allocation_state.h"

#include "bytes/oncore.h"
#include "cluster/logger.h"
#include "ssx/sformat.h"

Expand All @@ -17,6 +18,7 @@ namespace cluster {
void allocation_state::rollback(
const std::vector<partition_assignment>& v,
const partition_allocation_domain domain) {
verify_shard();
for (auto& as : v) {
rollback(as.replicas, domain);
// rollback for each assignment as the groups are distinct
Expand All @@ -27,12 +29,14 @@ void allocation_state::rollback(
void allocation_state::rollback(
const std::vector<model::broker_shard>& v,
const partition_allocation_domain domain) {
verify_shard();
for (auto& bs : v) {
deallocate(bs, domain);
}
}

int16_t allocation_state::available_nodes() const {
verify_shard();
return std::count_if(
_nodes.begin(), _nodes.end(), [](const underlying_t::value_type& p) {
return p.second->is_active();
Expand All @@ -54,6 +58,7 @@ void allocation_state::apply_update(
std::vector<model::broker_shard> replicas,
raft::group_id group_id,
const partition_allocation_domain domain) {
verify_shard();
if (replicas.empty()) {
return;
}
Expand Down Expand Up @@ -92,6 +97,7 @@ void allocation_state::register_node(allocation_state::node_ptr n) {

void allocation_state::update_allocation_nodes(
const std::vector<model::broker>& brokers) {
verify_shard();
// deletions
for (auto& [id, node] : _nodes) {
auto it = std::find_if(
Expand Down Expand Up @@ -127,6 +133,7 @@ void allocation_state::update_allocation_nodes(
}

void allocation_state::decommission_node(model::node_id id) {
verify_shard();
auto it = _nodes.find(id);
if (it == _nodes.end()) {
throw std::invalid_argument(
Expand All @@ -137,6 +144,7 @@ void allocation_state::decommission_node(model::node_id id) {
}

void allocation_state::recommission_node(model::node_id id) {
verify_shard();
auto it = _nodes.find(id);
if (it == _nodes.end()) {
throw std::invalid_argument(
Expand All @@ -158,13 +166,15 @@ bool allocation_state::is_empty(model::node_id id) const {
void allocation_state::deallocate(
const model::broker_shard& replica,
const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(replica.node_id); it != _nodes.end()) {
it->second->deallocate_on(replica.shard, domain);
}
}

result<uint32_t> allocation_state::allocate(
model::node_id id, const partition_allocation_domain domain) {
verify_shard();
if (auto it = _nodes.find(id); it != _nodes.end()) {
if (it->second->is_full()) {
return errc::invalid_node_operation;
Expand All @@ -183,4 +193,17 @@ allocation_state::get_rack_id(model::node_id id) const {
vassert(false, "unexpected node id {}", id);
}

void allocation_state::verify_shard() const {
/* This is a consistency check on the use of the allocation state:
* it checks that the caller is on the same shard the state was originally
* created on. It is easy to inadvertently violate this condition since
* the cluster::allocation_units class embeds a pointer to this state and
* when moved across shards (e.g., because allocation always happens on
* shard 0 but some other shard initiates the request) it may result in
* calls on the current shard being routed back to the original shard
* through this pointer, a thread-safety violation.
*/
oncore_debug_verify(_verify_shard);
}

} // namespace cluster
8 changes: 8 additions & 0 deletions src/v/cluster/scheduling/allocation_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "bytes/oncore.h"
#include "cluster/scheduling/allocation_node.h"
#include "model/metadata.h"

Expand Down Expand Up @@ -67,10 +68,17 @@ class allocation_state {
std::optional<model::rack_id> get_rack_id(model::node_id) const;

private:
/**
* This function verifies that the current shard matches the shard the
* state was originally created on, aborting if not.
*/
void verify_shard() const;

config::binding<uint32_t> _partitions_per_shard;
config::binding<uint32_t> _partitions_reserve_shard0;

raft::group_id _highest_group{0};
underlying_t _nodes;
expression_in_debug_mode(oncore _verify_shard;)
};
} // namespace cluster
8 changes: 5 additions & 3 deletions src/v/cluster/scheduling/partition_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "units.h"
#include "utils/human.h"

#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>

#include <absl/container/node_hash_set.h>
Expand All @@ -28,6 +29,7 @@
#include <algorithm>
#include <exception>
#include <iterator>
#include <memory>
#include <vector>

namespace cluster {
Expand Down Expand Up @@ -267,7 +269,7 @@ std::error_code partition_allocator::check_cluster_limits(
return errc::success;
}

result<allocation_units>
result<allocation_units::pointer>
partition_allocator::allocate(allocation_request request) {
vlog(
clusterlog.trace,
Expand All @@ -293,8 +295,8 @@ partition_allocator::allocate(allocation_request request) {
_state->next_group_id(), partition_id, std::move(replicas.value()));
}

return allocation_units(
std::move(assignments).finish(), _state.get(), request.domain);
return ss::make_foreign(std::make_unique<allocation_units>(
std::move(assignments).finish(), _state.get(), request.domain));
}

result<std::vector<model::broker_shard>>
Expand Down
6 changes: 5 additions & 1 deletion src/v/cluster/scheduling/partition_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ class partition_allocator {
return _state->contains_node(n);
}

result<allocation_units> allocate(allocation_request);
/**
* Return an allocation_units object wrapping the result of the allocating
* the given allocation request, or an error if it was not possible.
*/
result<allocation_units::pointer> allocate(allocation_request);

/// Reallocates partition replicas, moving them away from decommissioned
/// nodes. Replicas on nodes that were left untouched are not changed.
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/scheduling/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ allocation_units::allocation_units(
}

allocation_units::~allocation_units() {
oncore_debug_verify(_oncore);
for (auto& pas : _assignments) {
for (auto& replica : pas.replicas) {
if (!_previous.contains(replica)) {
Expand Down
17 changes: 17 additions & 0 deletions src/v/cluster/scheduling/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "bytes/oncore.h"
#include "cluster/types.h"
#include "model/fundamental.h"
#include "vassert.h"
Expand Down Expand Up @@ -126,8 +127,22 @@ struct allocation_constraints {
/**
* RAII based helper holding allocated partititions, allocation is reverted
* after this object goes out of scope.
*
* WARNING: this object contains an embedded reference to the partition
* allocator service (specifically, the allocation_state associated with that
* allocator) and so it must be destroyed on the same shard the original units
* were allocated on. I.e., if original units A are moved into B, B must be
* destroyed the shard A was allocated on (or the shard of the object that was
* moved into A and so on).
*/
struct allocation_units {
/**
* A foreign unique pointer to some units. Given the warning above about
* cross-core destruction, it is best to use this pointer class when dealing
* with units allocated on another core.
*/
using pointer = ss::foreign_ptr<std::unique_ptr<allocation_units>>;

allocation_units(
std::vector<partition_assignment>,
allocation_state*,
Expand Down Expand Up @@ -156,6 +171,8 @@ struct allocation_units {
// keep the pointer to make this type movable
allocation_state* _state;
partition_allocation_domain _domain;
// oncore checker to ensure destruction happens on the same core
[[no_unique_address]] oncore _oncore;
};

/**
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/tests/allocation_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ PERF_TEST_F(partition_allocator_fixture, deallocation_3) {
std::vector<model::broker_shard> replicas;
{
auto vals = std::move(allocator.allocate(std::move(req)).value());
replicas = vals.get_assignments().front().replicas;
replicas = vals->get_assignments().front().replicas;
allocator.update_allocation_state(
vals.get_assignments().front().replicas,
vals.get_assignments().front().group,
vals->get_assignments().front().replicas,
vals->get_assignments().front().group,
cluster::partition_allocation_domains::common);
}
perf_tests::do_not_optimize(replicas);
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tests/partition_allocator_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct partition_allocator_fixture {
auto units = allocator.allocate(
make_allocation_request(max_capacity(), 1));

for (auto& pas : units.value().get_assignments()) {
for (auto& pas : units.value()->get_assignments()) {
allocator.state().apply_update(
pas.replicas,
pas.group,
Expand Down
Loading

0 comments on commit 5be3e8e

Please sign in to comment.