Skip to content

Commit

Permalink
Merge pull request #6744 from dlex/333_cluster-bootstrap
Browse files Browse the repository at this point in the history
Seeds Driven Cluster Bootstrap
  • Loading branch information
dlex committed Oct 20, 2022
2 parents bf10533 + 50840ed commit 355adde
Show file tree
Hide file tree
Showing 46 changed files with 1,215 additions and 176 deletions.
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ v_cc_library(
node_status_backend.cc
node_status_rpc_handler.cc
bootstrap_service.cc
bootstrap_backend.cc
DEPS
Seastar::seastar
bootstrap_rpc
Expand Down
117 changes: 117 additions & 0 deletions src/v/cluster/bootstrap_backend.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "cluster/bootstrap_backend.h"

#include "cluster/cluster_uuid.h"
#include "cluster/commands.h"
#include "cluster/logger.h"
#include "cluster/types.h"
#include "security/credential_store.h"

namespace cluster {

bootstrap_backend::bootstrap_backend(
ss::sharded<security::credential_store>& credentials,
ss::sharded<storage::api>& storage)
: _credentials(credentials)
, _storage(storage) {}

namespace {

std::error_code
do_apply(user_and_credential cred, security::credential_store& store) {
if (store.contains(cred.username)) {
return errc::user_exists;
}
store.put(cred.username, std::move(cred.credential));
return errc::success;
}

template<typename Cmd>
ss::future<std::error_code> dispatch_updates_to_cores(
Cmd cmd, ss::sharded<security::credential_store>& sharded_service) {
auto res = co_await sharded_service.map_reduce0(
[cmd](security::credential_store& service) {
return do_apply(std::move(cmd), service);
},
std::optional<std::error_code>{},
[](std::optional<std::error_code> result, std::error_code error_code) {
if (!result.has_value()) {
result = error_code;
} else {
vassert(
result.value() == error_code,
"State inconsistency across shards detected, "
"expected result: {}, have: {}",
result->value(),
error_code);
}
return result.value();
});
co_return res.value();
}

} // namespace

ss::future<std::error_code>
bootstrap_backend::apply_update(model::record_batch b) {
vlog(clusterlog.info, "Applying update to bootstrap_manager");

// handle node managements command
static constexpr auto accepted_commands
= make_commands_list<bootstrap_cluster_cmd>();
auto cmd = co_await cluster::deserialize(std::move(b), accepted_commands);

co_return co_await ss::visit(
cmd, [this](bootstrap_cluster_cmd cmd) -> ss::future<std::error_code> {
if (_storage.local().get_cluster_uuid().has_value()) {
vlog(
clusterlog.debug,
"Skipping bootstrap_cluster_cmd {}, current cluster_uuid: {}",
cmd.value.uuid,
*_storage.local().get_cluster_uuid());
co_return errc::cluster_already_exists;
}

if (cmd.value.bootstrap_user_cred) {
const std::error_code res
= co_await dispatch_updates_to_cores<user_and_credential>(
*cmd.value.bootstrap_user_cred, _credentials);
if (!res) {
vlog(
clusterlog.warn,
"Failed to dispatch bootstrap user: {}",
res);
co_return res;
}
vlog(
clusterlog.info,
"Bootstrap user {} created",
cmd.value.bootstrap_user_cred->username);
}

co_await _storage.invoke_on_all(
[&new_cluster_uuid = cmd.value.uuid](storage::api& storage) {
storage.set_cluster_uuid(new_cluster_uuid);
return ss::make_ready_future();
});
co_await _storage.local().kvs().put(
cluster_uuid_key_space,
cluster_uuid_key,
serde::to_iobuf(cmd.value.uuid));

vlog(clusterlog.info, "Cluster created {}", cmd.value.uuid);
co_return errc::success;
});
}

} // namespace cluster
54 changes: 54 additions & 0 deletions src/v/cluster/bootstrap_backend.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "cluster/fwd.h"
#include "model/fundamental.h"
#include "model/record.h"
#include "model/record_batch_types.h"
#include "storage/api.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/sharded.hh>

#include <optional>

namespace security {
class credential_store;
}

namespace cluster {

/**
* This class applies the cluster intitalization message to
* - itself, storing the cluster UUID value, also duplicating it to the kvstore
* - credintial_store, to initialize the bootstrap user
* - TODO: apply the initial licence
*/
class bootstrap_backend final {
public:
bootstrap_backend(
ss::sharded<security::credential_store>&, ss::sharded<storage::api>&);

ss::future<std::error_code> apply_update(model::record_batch);

bool is_batch_applicable(const model::record_batch& b) {
return b.header().type
== model::record_batch_type::cluster_bootstrap_cmd;
}

private:
ss::sharded<security::credential_store>& _credentials;
ss::sharded<storage::api>& _storage;
};

} // namespace cluster
20 changes: 19 additions & 1 deletion src/v/cluster/bootstrap_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,32 @@
#include "cluster/bootstrap_service.h"

#include "cluster/bootstrap_types.h"
#include "cluster/cluster_utils.h"
#include "cluster/logger.h"
#include "config/node_config.h"
#include "features/feature_table.h"
#include "storage/api.h"

namespace cluster {

ss::future<cluster_bootstrap_info_reply>
bootstrap_service::cluster_bootstrap_info(
cluster_bootstrap_info_request&&, rpc::streaming_context&) {
cluster_bootstrap_info_reply r{};
// TODO: add fields!
r.broker = make_self_broker(config::node());
r.version = features::feature_table::get_latest_logical_version();
const std::vector<config::seed_server>& seed_servers
= config::node().seed_servers();
r.seed_servers.reserve(seed_servers.size());
std::transform(
seed_servers.cbegin(),
seed_servers.cend(),
std::back_inserter(r.seed_servers),
[](const config::seed_server& seed_server) { return seed_server.addr; });
r.empty_seed_starts_cluster = config::node().empty_seed_starts_cluster();
r.cluster_uuid = _storage.local().get_cluster_uuid();

vlog(clusterlog.debug, "Replying cluster_bootstrap_info: {}", r);
co_return r;
}

Expand Down
16 changes: 12 additions & 4 deletions src/v/cluster/bootstrap_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,28 @@

#include "cluster/bootstrap_types.h"
#include "cluster/cluster_bootstrap_service.h"
#include "storage/fwd.h"

#include <seastar/core/sharded.hh>

namespace cluster {

// RPC service to be used when initialially bootstrapping a cluster.
// TODO: talk about how it's a slim service with few dependencies.
// RPC service used to determine cluster info when bootstrapping a cluster
// or to discover the existence of an existing cluster.
class bootstrap_service : public cluster_bootstrap_service {
public:
bootstrap_service(ss::scheduling_group sg, ss::smp_service_group ssg)
: cluster_bootstrap_service(sg, ssg) {}
bootstrap_service(
ss::scheduling_group sg,
ss::smp_service_group ssg,
ss::sharded<storage::api>& storage)
: cluster_bootstrap_service(sg, ssg)
, _storage(storage) {}

ss::future<cluster_bootstrap_info_reply> cluster_bootstrap_info(
cluster_bootstrap_info_request&&, rpc::streaming_context&) override;

private:
ss::sharded<storage::api>& _storage;
};

} // namespace cluster
28 changes: 24 additions & 4 deletions src/v/cluster/bootstrap_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0
#pragma once

#include "cluster/types.h"
#include "model/fundamental.h"
#include "serde/serde.h"

Expand All @@ -34,13 +35,32 @@ struct cluster_bootstrap_info_reply
: serde::envelope<cluster_bootstrap_info_reply, serde::version<0>> {
using rpc_adl_exempt = std::true_type;

// TODO: add fields!
model::broker broker;
cluster_version version;
std::vector<net::unresolved_address> seed_servers;
bool empty_seed_starts_cluster;
std::optional<model::cluster_uuid> cluster_uuid;

auto serde_fields() { return std::tie(); }
auto serde_fields() {
return std::tie(
broker,
version,
seed_servers,
empty_seed_starts_cluster,
cluster_uuid);
}

friend std::ostream&
operator<<(std::ostream& o, const cluster_bootstrap_info_reply&) {
fmt::print(o, "{{}}");
operator<<(std::ostream& o, const cluster_bootstrap_info_reply& v) {
fmt::print(
o,
"{{broker: {}, version: {}, seed_servers: {}, ESCB: {}, "
"cluster_UUID: {}}}",
v.broker,
v.version,
v.seed_servers,
v.empty_seed_starts_cluster,
v.cluster_uuid);
return o;
}
};
Expand Down
Loading

0 comments on commit 355adde

Please sign in to comment.