Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: Avoid oversize allocs for topic creation and configuration #16982

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/v/cloud_storage/tests/topic_recovery_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,13 @@ FIXTURE_TEST(recovery_with_missing_topic_manifest, fixture) {
}

FIXTURE_TEST(recovery_with_existing_topic, fixture) {
cluster::topic_configuration cfg{
model::ns{"kafka"}, model::topic{"test"}, 1, 1};
std::vector<cluster::custom_assignable_topic_configuration> topic_cfg = {
cluster::custom_assignable_topic_configuration{std::move(cfg)}};
cluster::custom_assignable_topic_configuration_vector topic_cfg{
{cluster::custom_assignable_topic_configuration{
{model::ns{"kafka"}, model::topic{"test"}, 1, 1}}}};
auto topic_create_result = app.controller->get_topics_frontend()
.local()
.create_topics(topic_cfg, model::no_timeout)
.create_topics(
std::move(topic_cfg), model::no_timeout)
.get();
wait_for_topics(std::move(topic_create_result)).get();
set_expectations_and_listen(
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/cloud_metadata/cluster_recovery_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ ss::future<cluster::errc> cluster_recovery_backend::do_action(
case recovery_stage::recovered_remote_topic_data: {
retry_chain_node topics_retry(&parent_retry);
// TODO: batch this up.
std::vector<topic_configuration> topics;
topic_configuration_vector topics;
for (size_t i = 0; i < actions.remote_topics.size(); i++) {
auto& topic_cfg = actions.remote_topics[i];
if (topic_cfg.is_internal()) {
Expand Down Expand Up @@ -259,7 +259,7 @@ ss::future<cluster::errc> cluster_recovery_backend::do_action(
case recovery_stage::recovered_topic_data: {
retry_chain_node topics_retry(&parent_retry);
// TODO: batch this up.
std::vector<topic_configuration> topics;
topic_configuration_vector topics;
for (size_t i = 0; i < actions.local_topics.size(); i++) {
topics.emplace_back(std::move(actions.local_topics[i]));
vlog(clusterlog.debug, "Creating topic {}", topics.back().tp_ns);
Expand Down
11 changes: 6 additions & 5 deletions src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <seastar/core/future.hh>

#include <chrono>
#include <iterator>

namespace cluster {

Expand Down Expand Up @@ -110,13 +111,13 @@ bool are_replica_sets_equal(
return l_sorted == r_sorted;
}

std::vector<custom_assignable_topic_configuration>
without_custom_assignments(std::vector<topic_configuration> topics) {
std::vector<custom_assignable_topic_configuration> assignable_topics;
custom_assignable_topic_configuration_vector
without_custom_assignments(topic_configuration_vector topics) {
custom_assignable_topic_configuration_vector assignable_topics;
assignable_topics.reserve(topics.size());
std::transform(
topics.begin(),
topics.end(),
std::make_move_iterator(topics.begin()),
std::make_move_iterator(topics.end()),
std::back_inserter(assignable_topics),
[](topic_configuration cfg) {
return custom_assignable_topic_configuration(std::move(cfg));
Expand Down
68 changes: 36 additions & 32 deletions src/v/cluster/cluster_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,48 @@

#include <seastar/core/sharded.hh>

#include <concepts>
#include <ranges>
#include <system_error>
#include <utility>

namespace detail {

template<typename T, typename Fn>
template<std::ranges::range Rng, typename Fn>
requires std::same_as<
std::invoke_result_t<Fn, std::ranges::range_value_t<Rng>>,
cluster::topic_result>
std::vector<cluster::topic_result>
make_error_topic_results(const std::vector<T>& topics, Fn fn) {
make_error_topic_results(const Rng& topics, Fn fn) {
std::vector<cluster::topic_result> results;
results.reserve(topics.size());
std::transform(
topics.cbegin(),
topics.cend(),
std::back_inserter(results),
[&fn](const T& t) { return fn(t); });
topics.cbegin(), topics.cend(), std::back_inserter(results), fn);
return results;
}

template<typename T>
concept has_tp_ns = requires {
{
std::declval<T>().tp_ns
} -> std::convertible_to<const model::topic_namespace&>;
};

template<typename T>
const model::topic_namespace& extract_tp_ns(const T& t) {
if constexpr (std::same_as<T, model::topic_namespace>) {
return t;
} else if constexpr (has_tp_ns<T>) {
return t.tp_ns;
} else if constexpr (std::same_as<
T,
cluster::custom_assignable_topic_configuration>) {
return t.cfg.tp_ns;
} else {
static_assert(always_false_v<T>, "couldn't extract tp_ns");
}
}

} // namespace detail

namespace config {
Expand All @@ -56,31 +80,11 @@ class metadata_cache;
class partition;

/// Creates the same topic_result for all requests
template<typename T>
requires requires(const T& req) {
{ req.tp_ns } -> std::convertible_to<const model::topic_namespace&>;
}
std::vector<topic_result>
make_error_topic_results(const std::vector<T>& requests, errc error_code) {
return detail::make_error_topic_results(requests, [error_code](const T& r) {
return topic_result(r.tp_ns, error_code);
});
}

inline std::vector<topic_result> make_error_topic_results(
const std::vector<model::topic_namespace>& topics, errc error_code) {
return detail::make_error_topic_results(
topics, [error_code](const model::topic_namespace& t) {
return topic_result(t, error_code);
});
}

inline std::vector<topic_result> make_error_topic_results(
const std::vector<custom_assignable_topic_configuration>& requests,
errc error_code) {
std::vector<topic_result> make_error_topic_results(
const std::ranges::range auto& topics, errc error_code) {
return detail::make_error_topic_results(
requests, [error_code](const custom_assignable_topic_configuration& r) {
return topic_result(r.cfg.tp_ns, error_code);
topics, [error_code](const auto& t) {
return topic_result(detail::extract_tp_ns(t), error_code);
});
}

Expand Down Expand Up @@ -189,8 +193,8 @@ ss::future<std::error_code> replicate_and_wait(
});
}

std::vector<custom_assignable_topic_configuration>
without_custom_assignments(std::vector<topic_configuration>);
custom_assignable_topic_configuration_vector
without_custom_assignments(topic_configuration_vector);

template<class T>
inline std::vector<T>
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ service::purged_topic(purged_topic_request r, rpc::streaming_context&) {
[](topic_result res) { return purged_topic_reply(std::move(res)); });
}

std::pair<std::vector<model::topic_metadata>, std::vector<topic_configuration>>
std::pair<std::vector<model::topic_metadata>, topic_configuration_vector>
service::fetch_metadata_and_cfg(const std::vector<topic_result>& res) {
std::vector<model::topic_metadata> md;
std::vector<topic_configuration> cfg;
topic_configuration_vector cfg;
md.reserve(res.size());
for (const auto& r : res) {
if (r.ec == errc::success) {
Expand Down Expand Up @@ -237,7 +237,7 @@ service::do_update_topic_properties(update_topic_properties_request req) {
// local topic frontend instance will eventually dispatch request to _raft0
// core
auto res = co_await _topics_frontend.local().update_topic_properties(
req.updates,
std::move(req).updates,
config::shard_local_cfg().replicate_append_timeout_ms()
+ model::timeout_clock::now());

Expand Down
5 changes: 2 additions & 3 deletions src/v/cluster/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,8 @@ class service : public controller_service {

private:
static constexpr auto default_move_interruption_timeout = 10s;
std::
pair<std::vector<model::topic_metadata>, std::vector<topic_configuration>>
fetch_metadata_and_cfg(const std::vector<topic_result>&);
std::pair<std::vector<model::topic_metadata>, topic_configuration_vector>
fetch_metadata_and_cfg(const std::vector<topic_result>&);

ss::future<finish_partition_update_reply>
do_finish_partition_update(finish_partition_update_request);
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/autocreate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#include <chrono>
#include <vector>

std::vector<cluster::topic_configuration> test_topics_configuration(
cluster::topic_configuration_vector test_topics_configuration(
cluster::replication_factor rf = cluster::replication_factor{1}) {
return std::vector<cluster::topic_configuration>{
return cluster::topic_configuration_vector{
cluster::topic_configuration(test_ns, model::topic("tp-1"), 10, rf),
cluster::topic_configuration(test_ns, model::topic("tp-2"), 10, rf),
cluster::topic_configuration(test_ns, model::topic("tp-3"), 10, rf),
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tests/cluster_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class cluster_test_fixture {
return it.second->app.controller->is_raft0_leader();
});
auto& app_0 = leader_it->second->app;
std::vector<cluster::topic_configuration> cfgs = {
cluster::topic_configuration_vector cfgs = {
cluster::topic_configuration{
tp_ns.ns, tp_ns.tp, partitions, replication_factor}};
auto results = app_0.controller->get_topics_frontend()
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/controller_api_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ FIXTURE_TEST(test_querying_ntp_status, cluster_test_fixture) {
auto leader = get_node_application(*leader_id);

// create topic
std::vector<cluster::topic_configuration> topics;
cluster::topic_configuration_vector topics;
topics.emplace_back(test_ntp.ns, test_ntp.tp.topic, 3, 1);

leader->controller->get_topics_frontend()
.local()
.create_topics(
cluster::without_custom_assignments(topics),
cluster::without_custom_assignments(std::move(topics)),
1s + model::timeout_clock::now())
.get();

Expand Down
11 changes: 5 additions & 6 deletions src/v/cluster/tests/health_monitor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,11 @@ FIXTURE_TEST(test_ntp_filter, cluster_test_fixture) {
}).get();

// create topics
std::vector<cluster::topic_configuration> topics;
topics.push_back(topic_cfg(model::kafka_namespace, "tp-1", 3, 3));
topics.push_back(topic_cfg(model::kafka_namespace, "tp-2", 3, 2));
topics.push_back(topic_cfg(model::kafka_namespace, "tp-3", 3, 1));
topics.push_back(
topic_cfg(model::kafka_internal_namespace, "internal-1", 3, 2));
cluster::topic_configuration_vector topics{
topic_cfg(model::kafka_namespace, "tp-1", 3, 3),
topic_cfg(model::kafka_namespace, "tp-2", 3, 2),
topic_cfg(model::kafka_namespace, "tp-3", 3, 1),
topic_cfg(model::kafka_internal_namespace, "internal-1", 3, 2)};

n1->controller->get_topics_frontend()
.local()
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/metadata_dissemination_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ FIXTURE_TEST(
BOOST_REQUIRE_EQUAL(cache_2.node_count(), 3);

// Create topic with replication factor 1
std::vector<cluster::topic_configuration> topics;
cluster::topic_configuration_vector topics;
topics.emplace_back(model::ns("default"), model::topic("test_1"), 3, 1);
cntrl_0->controller->get_topics_frontend()
.local()
Expand Down Expand Up @@ -122,7 +122,7 @@ FIXTURE_TEST(test_metadata_dissemination_joining_node, cluster_test_fixture) {
BOOST_REQUIRE_EQUAL(cache_1.node_count(), 2);

// Create topic with replication factor 1
std::vector<cluster::topic_configuration> topics;
cluster::topic_configuration_vector topics;
topics.emplace_back(model::ns("default"), model::topic("test_1"), 3, 1);
cntrl_0->controller->get_topics_frontend()
.local()
Expand Down
19 changes: 9 additions & 10 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,7 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
std::vector<cluster::topic_properties_update> updates;
cluster::topic_properties_update_vector updates;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
cluster::property_update<std::optional<v8_engine::data_policy>>
data_policy;
Expand All @@ -1350,16 +1350,15 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
cluster::incremental_topic_custom_updates custom_properties{
.data_policy = data_policy,
};
updates.push_back(cluster::topic_properties_update{
updates.emplace_back(
model::random_topic_namespace(),
random_incremental_topic_updates(),
custom_properties,
});
custom_properties);
}
cluster::update_topic_properties_request data{
.updates = updates,
.updates = std::move(updates),
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
cluster::topic_result data{
Expand Down Expand Up @@ -1698,18 +1697,18 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
std::vector<cluster::topic_configuration> topics;
cluster::topic_configuration_vector topics;
for (auto i = 0, mi = random_generators::get_int(20); i < mi; ++i) {
topics.push_back(random_topic_configuration());
}
cluster::create_topics_request data{
.topics = topics,
.topics = std::move(topics),
.timeout = random_timeout_clock_duration(),
};
// adl encoding for topic_configuration doesn't encode/decode to exact
// equality, but also already existed prior to serde support being added
// so only testing the serde case.
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
auto data = random_partition_metadata();
Expand Down Expand Up @@ -1743,7 +1742,7 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
// adl serialization doesn't preserve equality for topic_configuration.
// serde serialization does and was added after support for adl so adl
// semantics are preserved.
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
raft::transfer_leadership_request data{
Expand Down
8 changes: 5 additions & 3 deletions src/v/cluster/topic_recovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "cloud_storage/topic_manifest.h"
#include "cluster/topic_recovery_status_frontend.h"
#include "cluster/topics_frontend.h"
#include "cluster/types.h"

#include <seastar/http/request.hh>
#include <seastar/util/defer.hh>
Expand Down Expand Up @@ -410,7 +411,7 @@ static cluster::topic_configuration make_topic_config(

ss::future<std::vector<cluster::topic_result>>
topic_recovery_service::create_topics(const recovery_request& request) {
std::vector<cluster::topic_configuration> topic_configs;
cluster::topic_configuration_vector topic_configs;
topic_configs.reserve(_downloaded_manifests->size());

std::transform(
Expand All @@ -420,7 +421,8 @@ topic_recovery_service::create_topics(const recovery_request& request) {
[&request](const auto& m) { return make_topic_config(m, request); });

co_return co_await _topics_frontend.local().autocreate_topics(
topic_configs, config::shard_local_cfg().create_topic_timeout_ms());
std::move(topic_configs),
config::shard_local_cfg().create_topic_timeout_ms());
}

ss::future<std::vector<cloud_storage::topic_manifest>>
Expand Down Expand Up @@ -532,7 +534,7 @@ ss::future<> topic_recovery_service::reset_topic_configurations() {
co_return;
}

std::vector<cluster::topic_properties_update> updates;
cluster::topic_properties_update_vector updates;
updates.reserve(_downloaded_manifests->size());
std::transform(
std::make_move_iterator(_downloaded_manifests->begin()),
Expand Down
Loading
Loading