From abc00f00ed652b9a74f540f234dd519c9a6b4779 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 5 May 2022 12:04:50 +0200 Subject: [PATCH 1/2] c/topics_table: optimized per partition metadata lookup Optimized per partition metadata lookup by storing partition metadata in `absl::btree_set` the set provides fast lookup while keeping partitions ordered. Limited number of times partition metadata are copied by returning metadata object stored directly in a map. Signed-off-by: Michal Maslanka --- src/v/cluster/controller_api.cc | 16 +-- src/v/cluster/health_manager.cc | 2 +- src/v/cluster/members_backend.cc | 4 +- src/v/cluster/metadata_cache.cc | 36 +++-- src/v/cluster/metadata_cache.h | 13 +- src/v/cluster/metrics_reporter.cc | 2 +- src/v/cluster/scheduling/leader_balancer.cc | 9 +- src/v/cluster/service.cc | 2 +- src/v/cluster/tests/autocreate_test.cc | 18 ++- src/v/cluster/tests/create_partitions_test.cc | 10 +- .../tests/metadata_dissemination_test.cc | 15 +- src/v/cluster/tests/partition_moving_test.cc | 11 +- .../cluster/tests/rebalancing_tests_fixture.h | 9 +- .../tests/replicas_rebalancing_tests.cc | 6 +- src/v/cluster/tests/topic_table_test.cc | 31 ++-- .../tests/topic_updates_dispatcher_test.cc | 27 ++-- src/v/cluster/topic_table.cc | 135 +++++------------- src/v/cluster/topic_table.h | 32 ++--- src/v/cluster/topic_updates_dispatcher.cc | 53 +++---- src/v/cluster/topic_updates_dispatcher.h | 2 +- src/v/cluster/tx_gateway_frontend.cc | 3 +- src/v/cluster/types.cc | 58 ++++++++ src/v/cluster/types.h | 52 +++++++ src/v/coproc/reconciliation_backend.cc | 2 +- src/v/kafka/server/coordinator_ntp_mapper.h | 6 +- .../kafka/server/group_metadata_migration.cc | 4 +- .../server/handlers/create_partitions.cc | 6 +- src/v/kafka/server/handlers/metadata.cc | 67 ++++----- src/v/kafka/server/handlers/offset_commit.cc | 13 +- .../server/handlers/topics/topic_utils.cc | 2 +- .../server/handlers/txn_offset_commit.cc | 13 +- src/v/redpanda/tests/fixture.h | 7 +- 32 files changed, 351 insertions(+), 315 deletions(-) diff --git a/src/v/cluster/controller_api.cc b/src/v/cluster/controller_api.cc index 0d7d839802d5..83a0b5e0bfc3 100644 --- a/src/v/cluster/controller_api.cc +++ b/src/v/cluster/controller_api.cc @@ -68,14 +68,14 @@ controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { co_return ret_t(errc::topic_not_exists); } std::vector ntps; - ntps.reserve(metadata->partitions.size()); + ntps.reserve(metadata->get_assignments().size()); std::transform( - metadata->partitions.cbegin(), - metadata->partitions.cend(), + metadata->get_assignments().cbegin(), + metadata->get_assignments().cend(), std::back_inserter(ntps), - [tp_ns](const model::partition_metadata& p_md) { - return model::ntp(tp_ns.ns, tp_ns.tp, p_md.id); + [tp_ns](const partition_assignment& p_as) { + return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id); }); co_return co_await get_reconciliation_state(std::move(ntps)); @@ -219,9 +219,9 @@ ss::future controller_api::wait_for_topic( absl::node_hash_map> requests; // collect ntps per node - for (const auto& p_md : metadata->partitions) { - for (const auto& bs : p_md.replicas) { - requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_md.id); + for (const auto& p_as : metadata->get_assignments()) { + for (const auto& bs : p_as.replicas) { + requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id); } } bool ready = false; diff --git a/src/v/cluster/health_manager.cc b/src/v/cluster/health_manager.cc index 885674443319..118abf415917 100644 --- a/src/v/cluster/health_manager.cc +++ b/src/v/cluster/health_manager.cc @@ -160,7 +160,7 @@ health_manager::ensure_topic_replication(model::topic_namespace_view topic) { co_return true; } - for (const auto& partition : metadata->partitions) { + for (const auto& partition : metadata->get_assignments()) { model::ntp ntp(topic.ns, topic.tp, partition.id); if (!co_await ensure_partition_replication(std::move(ntp))) { co_return false; diff --git a/src/v/cluster/members_backend.cc b/src/v/cluster/members_backend.cc index d3e3467e13a6..903a34687840 100644 --- a/src/v/cluster/members_backend.cc +++ b/src/v/cluster/members_backend.cc @@ -167,7 +167,7 @@ void members_backend::calculate_reallocations(update_meta& meta) { if (!cfg.is_topic_replicable()) { continue; } - for (auto& pas : cfg.get_configuration().assignments) { + for (const auto& pas : cfg.get_assignments()) { if (is_in_replica_set(pas.replicas, meta.update.id)) { partition_reallocation reallocation( model::ntp(tp_ns.ns, tp_ns.tp, pas.id), @@ -284,7 +284,7 @@ void members_backend::calculate_reallocations_after_node_added( meta.update.offset); continue; } - for (auto& p : metadata.get_configuration().assignments) { + for (const auto& p : metadata.get_assignments()) { std::erase_if(to_move_from_node, [](const replicas_to_move& v) { return v.left_to_move == 0; }); diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 547b1fc5de9c..18b9ca39e2e1 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -9,6 +9,7 @@ #include "cluster/metadata_cache.h" +#include "cluster/fwd.h" #include "cluster/health_monitor_frontend.h" #include "cluster/members_table.h" #include "cluster/partition_leaders_table.h" @@ -62,17 +63,29 @@ metadata_cache::get_source_topic(model::topic_namespace_view tp) const { return mt->second.get_source_topic(); } -std::optional metadata_cache::get_topic_metadata( +std::optional +metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const { + return _topics_state.local().get_topic_metadata(tp); +} + +std::optional metadata_cache::get_model_topic_metadata( model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const { auto md = _topics_state.local().get_topic_metadata(tp); if (!md) { - return md; + return std::nullopt; } + + model::topic_metadata metadata(md->get_configuration().tp_ns); + metadata.partitions.reserve(md->get_assignments().size()); + for (const auto& p_as : md->get_assignments()) { + metadata.partitions.push_back(p_as.create_partition_metadata()); + } + if (leaders) { - fill_partition_leaders(_leaders.local(), *md); + fill_partition_leaders(_leaders.local(), metadata); } - return md; + return metadata; } std::optional metadata_cache::get_topic_cfg(model::topic_namespace_view tp) const { @@ -84,15 +97,8 @@ metadata_cache::get_topic_timestamp_type(model::topic_namespace_view tp) const { return _topics_state.local().get_topic_timestamp_type(tp); } -std::vector metadata_cache::all_topics_metadata( - metadata_cache::with_leaders leaders) const { - auto all_md = _topics_state.local().all_topics_metadata(); - if (leaders) { - for (auto& md : all_md) { - fill_partition_leaders(_leaders.local(), md); - } - } - return all_md; +const topic_table::underlying_t& metadata_cache::all_topics_metadata() const { + return _topics_state.local().all_topics_metadata(); } std::optional metadata_cache::get_broker(model::node_id nid) const { @@ -148,6 +154,10 @@ bool metadata_cache::contains( return _topics_state.local().contains(tp, pid); } +bool metadata_cache::contains(model::topic_namespace_view tp) const { + return _topics_state.local().contains(tp); +} + ss::future metadata_cache::get_leader( const model::ntp& ntp, ss::lowres_clock::time_point tout, diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index a3f4100feb3d..3875ff9a5e0f 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -14,6 +14,7 @@ #include "cluster/fwd.h" #include "cluster/health_monitor_types.h" #include "cluster/partition_leaders_table.h" +#include "cluster/topic_table.h" #include "cluster/types.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -72,8 +73,13 @@ class metadata_cache { ///\brief Returns metadata of single topic. /// /// If topic does not exists it returns an empty optional - std::optional get_topic_metadata( + std::optional get_model_topic_metadata( model::topic_namespace_view, with_leaders = with_leaders::yes) const; + ///\brief Returns metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + std::optional + get_topic_metadata(model::topic_namespace_view) const; ///\brief Returns configuration of single topic. /// @@ -87,9 +93,7 @@ class metadata_cache { std::optional get_topic_timestamp_type(model::topic_namespace_view) const; - /// Returns metadata of all topics. - std::vector - all_topics_metadata(with_leaders = with_leaders::yes) const; + const topic_table::underlying_t& all_topics_metadata() const; /// Returns all brokers, returns copy as the content of broker can change std::vector all_brokers() const; @@ -105,6 +109,7 @@ class metadata_cache { std::optional get_broker(model::node_id) const; bool contains(model::topic_namespace_view, model::partition_id) const; + bool contains(model::topic_namespace_view) const; bool contains(const model::ntp& ntp) const { return contains(model::topic_namespace_view(ntp), ntp.tp.partition); diff --git a/src/v/cluster/metrics_reporter.cc b/src/v/cluster/metrics_reporter.cc index d2bc043cd41c..57d6b1cf2f1d 100644 --- a/src/v/cluster/metrics_reporter.cc +++ b/src/v/cluster/metrics_reporter.cc @@ -219,7 +219,7 @@ metrics_reporter::build_metrics_snapshot() { } snapshot.topic_count++; - snapshot.partition_count += md.get_configuration().cfg.partition_count; + snapshot.partition_count += md.get_configuration().partition_count; } snapshot.nodes.reserve(metrics_map.size()); diff --git a/src/v/cluster/scheduling/leader_balancer.cc b/src/v/cluster/scheduling/leader_balancer.cc index 33a2e7938adf..bd44de4d5824 100644 --- a/src/v/cluster/scheduling/leader_balancer.cc +++ b/src/v/cluster/scheduling/leader_balancer.cc @@ -314,8 +314,7 @@ ss::future leader_balancer::balance() { if (!topic.second.is_topic_replicable()) { continue; } - for (const auto& partition : - topic.second.get_configuration().assignments) { + for (const auto& partition : topic.second.get_assignments()) { if (partition.group == transfer->group) { ntp = model::ntp(topic.first.ns, topic.first.tp, partition.id); break; @@ -438,8 +437,7 @@ leader_balancer::find_leader_shard(const model::ntp& ntp) { // If the inital query was for a non_replicable ntp, the get_leader() call // would have returned its source topic - for (const auto& partition : - config_it->second.get_configuration().assignments) { + for (const auto& partition : config_it->second.get_assignments()) { if (partition.id != ntp.tp.partition) { continue; } @@ -474,8 +472,7 @@ leader_balancer::index_type leader_balancer::build_index() { if (!topic.second.is_topic_replicable()) { continue; } - for (const auto& partition : - topic.second.get_configuration().assignments) { + for (const auto& partition : topic.second.get_assignments()) { if (partition.replicas.empty()) { vlog( clusterlog.warn, diff --git a/src/v/cluster/service.cc b/src/v/cluster/service.cc index 48fc0a99c628..c18db7431c5a 100644 --- a/src/v/cluster/service.cc +++ b/src/v/cluster/service.cc @@ -148,7 +148,7 @@ service::fetch_metadata_and_cfg(const std::vector& res) { md.reserve(res.size()); for (const auto& r : res) { if (r.ec == errc::success) { - auto topic_md = _md_cache.local().get_topic_metadata(r.tp_ns); + auto topic_md = _md_cache.local().get_model_topic_metadata(r.tp_ns); auto topic_cfg = _md_cache.local().get_topic_cfg(r.tp_ns); if (topic_md && topic_cfg) { md.push_back(std::move(topic_md.value())); diff --git a/src/v/cluster/tests/autocreate_test.cc b/src/v/cluster/tests/autocreate_test.cc index acd51af96367..c200a9047ca2 100644 --- a/src/v/cluster/tests/autocreate_test.cc +++ b/src/v/cluster/tests/autocreate_test.cc @@ -39,7 +39,8 @@ void validate_topic_metadata(cluster::metadata_cache& cache) { for (auto& t_cfg : expected_topics) { auto tp_md = cache.get_topic_metadata(t_cfg.tp_ns); BOOST_REQUIRE_EQUAL(tp_md.has_value(), true); - BOOST_REQUIRE_EQUAL(tp_md->partitions.size(), t_cfg.partition_count); + BOOST_REQUIRE_EQUAL( + tp_md->get_assignments().size(), t_cfg.partition_count); auto cfg = cache.get_topic_cfg(t_cfg.tp_ns); BOOST_REQUIRE_EQUAL(cfg->tp_ns, t_cfg.tp_ns); BOOST_REQUIRE_EQUAL(cfg->partition_count, t_cfg.partition_count); @@ -54,11 +55,14 @@ void validate_topic_metadata(cluster::metadata_cache& cache) { void wait_for_leaders( cluster::partition_leaders_table& leader_table, - const model::topic_metadata md) { + const cluster::topic_metadata md) { std::vector> f; - f.reserve(md.partitions.size()); - for (const auto& p : md.partitions) { - model::ntp ntp(md.tp_ns.ns, md.tp_ns.tp, p.id); + f.reserve(md.get_assignments().size()); + for (const auto& p : md.get_assignments()) { + model::ntp ntp( + md.get_configuration().tp_ns.ns, + md.get_configuration().tp_ns.tp, + p.id); f.push_back(leader_table .wait_for_leader( @@ -109,7 +113,7 @@ FIXTURE_TEST(create_single_topic_test_at_current_broker, cluster_test_fixture) { BOOST_REQUIRE_EQUAL(md.has_value(), true); wait_for_leaders(app->controller->get_partition_leaders().local(), *md); - BOOST_REQUIRE_EQUAL(md.value().tp_ns, r.tp_ns); + BOOST_REQUIRE_EQUAL(md->get_configuration().tp_ns, r.tp_ns); } } @@ -151,7 +155,7 @@ FIXTURE_TEST(test_autocreate_on_non_leader, cluster_test_fixture) { BOOST_REQUIRE_EQUAL(md.has_value(), true); wait_for_leaders( app_0->controller->get_partition_leaders().local(), *md); - BOOST_REQUIRE_EQUAL(md.value().tp_ns, r.tp_ns); + BOOST_REQUIRE_EQUAL(md->get_configuration().tp_ns, r.tp_ns); } // Make sure caches are the same validate_topic_metadata(get_local_cache(n_1)); diff --git a/src/v/cluster/tests/create_partitions_test.cc b/src/v/cluster/tests/create_partitions_test.cc index 676caa1cbdb2..b31e1b84380e 100644 --- a/src/v/cluster/tests/create_partitions_test.cc +++ b/src/v/cluster/tests/create_partitions_test.cc @@ -45,15 +45,17 @@ FIXTURE_TEST(test_creating_partitions, rebalancing_tests_fixture) { auto tp_2_md = topics.local().get_topic_metadata(make_tp_ns("test-2")); // total 6 partitions - BOOST_REQUIRE_EQUAL(tp_1_md->partitions.size(), 6); + BOOST_REQUIRE_EQUAL(tp_1_md->get_assignments().size(), 6); for (auto i = 0; i < 6; ++i) { - BOOST_REQUIRE_EQUAL(tp_1_md->partitions[i].id, model::partition_id(i)); + BOOST_REQUIRE( + tp_1_md->get_assignments().contains(model::partition_id(i))); } // total 9 partitions - BOOST_REQUIRE_EQUAL(tp_2_md->partitions.size(), 9); + BOOST_REQUIRE_EQUAL(tp_2_md->get_assignments().size(), 9); for (auto i = 0; i < 9; ++i) { - BOOST_REQUIRE_EQUAL(tp_2_md->partitions[i].id, model::partition_id(i)); + BOOST_REQUIRE( + tp_2_md->get_assignments().contains(model::partition_id(i))); } } diff --git a/src/v/cluster/tests/metadata_dissemination_test.cc b/src/v/cluster/tests/metadata_dissemination_test.cc index 6f64647bdd50..c3e4e6b911e8 100644 --- a/src/v/cluster/tests/metadata_dissemination_test.cc +++ b/src/v/cluster/tests/metadata_dissemination_test.cc @@ -35,19 +35,22 @@ wait_for_leaders_updates(int id, cluster::metadata_cache& cache) { std::chrono::seconds(10), [&cache, &leaders, id] { leaders.clear(); - auto tp_md = cache.get_topic_metadata(model::topic_namespace( - model::ns("default"), model::topic("test_1"))); + const model::topic_namespace tn( + model::ns("default"), model::topic("test_1")); + auto tp_md = cache.get_topic_metadata(tn); + if (!tp_md) { return false; } - if (tp_md->partitions.size() != 3) { + if (tp_md->get_assignments().size() != 3) { return false; } - for (auto& p_md : tp_md->partitions) { - if (!p_md.leader_node) { + for (auto& p_md : tp_md->get_assignments()) { + auto leader_id = cache.get_leader_id(tn, p_md.id); + if (!leader_id) { return false; } - leaders.push_back(*p_md.leader_node); + leaders.push_back(*leader_id); } return true; }) diff --git a/src/v/cluster/tests/partition_moving_test.cc b/src/v/cluster/tests/partition_moving_test.cc index ef447f1a187d..dad239e50203 100644 --- a/src/v/cluster/tests/partition_moving_test.cc +++ b/src/v/cluster/tests/partition_moving_test.cc @@ -201,12 +201,11 @@ class partition_assignment_test_fixture : public cluster_test_fixture { std::vector get_replicas(int source_node, const model::ntp& ntp) { - auto md = controller(source_node) - ->get_topics_state() - .local() - .get_topic_metadata(model::topic_namespace_view(ntp)); - - return md->partitions.begin()->replicas; + return controller(source_node) + ->get_topics_state() + .local() + .get_partition_assignment(ntp) + ->replicas; } void wait_for_replica_set_partitions( diff --git a/src/v/cluster/tests/rebalancing_tests_fixture.h b/src/v/cluster/tests/rebalancing_tests_fixture.h index 378b9805e4c1..17697095141e 100644 --- a/src/v/cluster/tests/rebalancing_tests_fixture.h +++ b/src/v/cluster/tests/rebalancing_tests_fixture.h @@ -93,7 +93,7 @@ class rebalancing_tests_fixture : public cluster_test_fixture { .local() .get_topic_metadata(model::topic_namespace_view(ntp)); - return md->partitions.begin()->replicas; + return md->get_assignments().begin()->replicas; } void create_topic(cluster::topic_configuration cfg) { @@ -204,10 +204,9 @@ class rebalancing_tests_fixture : public cluster_test_fixture { void populate_all_topics_with_data() { auto md = get_local_cache(model::node_id(0)).all_topics_metadata(); - for (auto& topic_metadata : md) { - for (auto& p : topic_metadata.partitions) { - model::ntp ntp( - topic_metadata.tp_ns.ns, topic_metadata.tp_ns.tp, p.id); + for (auto& [tp_ns, topic_metadata] : md) { + for (auto& p : topic_metadata.get_assignments()) { + model::ntp ntp(tp_ns.ns, tp_ns.tp, p.id); replicate_data(ntp, 10); } } diff --git a/src/v/cluster/tests/replicas_rebalancing_tests.cc b/src/v/cluster/tests/replicas_rebalancing_tests.cc index 3802a92b2352..054bf829d969 100644 --- a/src/v/cluster/tests/replicas_rebalancing_tests.cc +++ b/src/v/cluster/tests/replicas_rebalancing_tests.cc @@ -16,11 +16,11 @@ absl::node_hash_map calculate_replicas_per_node(const cluster::metadata_cache& cache) { absl::node_hash_map ret; - for (auto& tp_md : cache.all_topics_metadata()) { - if (tp_md.tp_ns.ns == model::redpanda_ns) { + for (auto& [tp_ns, tp_md] : cache.all_topics_metadata()) { + if (tp_ns.ns == model::redpanda_ns) { continue; } - for (auto& p_md : tp_md.partitions) { + for (auto& p_md : tp_md.get_assignments()) { for (auto replica : p_md.replicas) { ret[replica.node_id]++; } diff --git a/src/v/cluster/tests/topic_table_test.cc b/src/v/cluster/tests/topic_table_test.cc index 716b2112b83f..afc475beb6f8 100644 --- a/src/v/cluster/tests/topic_table_test.cc +++ b/src/v/cluster/tests/topic_table_test.cc @@ -21,19 +21,17 @@ FIXTURE_TEST(test_happy_path_create, topic_table_fixture) { auto md = table.local().all_topics_metadata(); BOOST_REQUIRE_EQUAL(md.size(), 3); - std::sort( - md.begin(), - md.end(), - [](const model::topic_metadata& a, const model::topic_metadata& b) { - return a.tp_ns.tp < b.tp_ns.tp; - }); - BOOST_REQUIRE_EQUAL(md[0].tp_ns, make_tp_ns("test_tp_1")); - BOOST_REQUIRE_EQUAL(md[1].tp_ns, make_tp_ns("test_tp_2")); - BOOST_REQUIRE_EQUAL(md[2].tp_ns, make_tp_ns("test_tp_3")); - - BOOST_REQUIRE_EQUAL(md[0].partitions.size(), 1); - BOOST_REQUIRE_EQUAL(md[1].partitions.size(), 12); - BOOST_REQUIRE_EQUAL(md[2].partitions.size(), 8); + + BOOST_REQUIRE(md.contains(make_tp_ns("test_tp_1"))); + BOOST_REQUIRE(md.contains(make_tp_ns("test_tp_2"))); + BOOST_REQUIRE(md.contains(make_tp_ns("test_tp_3"))); + + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_1"))->second.get_assignments().size(), 1); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_2"))->second.get_assignments().size(), 12); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_3"))->second.get_assignments().size(), 8); // check delta auto d = table.local().wait_for_changes(as).get0(); @@ -60,9 +58,10 @@ FIXTURE_TEST(test_happy_path_delete, topic_table_fixture) { auto md = table.local().all_topics_metadata(); BOOST_REQUIRE_EQUAL(md.size(), 1); - BOOST_REQUIRE_EQUAL(md[0].tp_ns, make_tp_ns("test_tp_1")); + BOOST_REQUIRE(md.contains(make_tp_ns("test_tp_1"))); - BOOST_REQUIRE_EQUAL(md[0].partitions.size(), 1); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_1"))->second.get_assignments().size(), 1); // check delta auto d = table.local().wait_for_changes(as).get0(); @@ -156,7 +155,7 @@ FIXTURE_TEST(test_adding_partition, topic_table_fixture) { auto md = table.local().get_topic_metadata(make_tp_ns("test_tp_2")); - BOOST_REQUIRE_EQUAL(md->partitions.size(), 15); + BOOST_REQUIRE_EQUAL(md->get_assignments().size(), 15); // check delta auto d = table.local().wait_for_changes(as).get0(); // require 3 partition additions diff --git a/src/v/cluster/tests/topic_updates_dispatcher_test.cc b/src/v/cluster/tests/topic_updates_dispatcher_test.cc index 0deed42d4b3f..f5879276b67d 100644 --- a/src/v/cluster/tests/topic_updates_dispatcher_test.cc +++ b/src/v/cluster/tests/topic_updates_dispatcher_test.cc @@ -84,20 +84,17 @@ FIXTURE_TEST( auto md = table.local().all_topics_metadata(); BOOST_REQUIRE_EQUAL(md.size(), 3); - std::sort( - md.begin(), - md.end(), - [](const model::topic_metadata& a, const model::topic_metadata& b) { - return a.tp_ns.tp < b.tp_ns.tp; - }); - BOOST_REQUIRE_EQUAL(md[0].tp_ns, make_tp_ns("test_tp_1")); - BOOST_REQUIRE_EQUAL(md[1].tp_ns, make_tp_ns("test_tp_2")); - BOOST_REQUIRE_EQUAL(md[2].tp_ns, make_tp_ns("test_tp_3")); - BOOST_REQUIRE_EQUAL(md[0].partitions.size(), 1); - BOOST_REQUIRE_EQUAL(md[1].partitions.size(), 12); - BOOST_REQUIRE_EQUAL(md[2].partitions.size(), 8); + BOOST_REQUIRE_EQUAL(md.contains(make_tp_ns("test_tp_1")), true); + BOOST_REQUIRE_EQUAL(md.contains(make_tp_ns("test_tp_2")), true); + BOOST_REQUIRE_EQUAL(md.contains(make_tp_ns("test_tp_3")), true); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_1"))->second.get_assignments().size(), 1); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_2"))->second.get_assignments().size(), 12); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_3"))->second.get_assignments().size(), 8); // Initial capacity // (cpus * max_allocations_per_core) - core0_extra_weight; // node 1, 8 cores @@ -133,8 +130,10 @@ FIXTURE_TEST( auto md = table.local().all_topics_metadata(); BOOST_REQUIRE_EQUAL(md.size(), 1); - BOOST_REQUIRE_EQUAL(md[0].tp_ns, make_tp_ns("test_tp_1")); - BOOST_REQUIRE_EQUAL(md[0].partitions.size(), 1); + + BOOST_REQUIRE_EQUAL(md.contains(make_tp_ns("test_tp_1")), true); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_1"))->second.get_assignments().size(), 1); BOOST_REQUIRE_EQUAL( current_cluster_capacity(allocator.local().state().allocation_nodes()), diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index a42c19fc3593..eafc4ff5a382 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -22,9 +22,9 @@ namespace cluster { template -std::vector> +std::vector> topic_table::transform_topics(Func&& f) const { - std::vector> ret; + std::vector> ret; ret.reserve(_topics.size()); std::transform( std::cbegin(_topics), @@ -32,46 +32,11 @@ topic_table::transform_topics(Func&& f) const { std::back_inserter(ret), [f = std::forward(f)]( const std::pair& p) { - return f(p.second.configuration); + return f(p.second); }); return ret; } -topic_table::topic_metadata::topic_metadata( - topic_configuration_assignment c, model::revision_id rid) noexcept - : configuration(std::move(c)) - , _source_topic(std::nullopt) - , _revision(rid) {} - -topic_table::topic_metadata::topic_metadata( - topic_configuration_assignment c, - model::revision_id rid, - model::topic st) noexcept - : configuration(std::move(c)) - , _source_topic(st) - , _revision(rid) {} - -bool topic_table::topic_metadata::is_topic_replicable() const { - return _source_topic.has_value() == false; -} - -model::revision_id topic_table::topic_metadata::get_revision() const { - vassert( - is_topic_replicable(), "Query for revision_id on a non-replicable topic"); - return _revision; -} - -const model::topic& topic_table::topic_metadata::get_source_topic() const { - vassert( - !is_topic_replicable(), "Query for source_topic on a replicable topic"); - return _source_topic.value(); -} - -const topic_configuration_assignment& -topic_table::topic_metadata::get_configuration() const { - return configuration; -} - ss::future topic_table::apply(create_topic_cmd cmd, model::offset offset) { if (_topics.contains(cmd.key)) { @@ -128,11 +93,11 @@ topic_table::apply(delete_topic_cmd cmd, model::offset offset) { } } - for (auto& p : tp->second.configuration.assignments) { - auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, p.id); + for (auto& p_as : tp->second.get_assignments()) { + auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, p_as.id); _update_in_progress.erase(ntp); _pending_deltas.emplace_back( - std::move(ntp), std::move(p), offset, delete_type); + std::move(ntp), p_as, offset, delete_type); } _topics.erase(tp); notify_waiters(); @@ -148,14 +113,14 @@ topic_table::apply(create_partition_cmd cmd, model::offset offset) { } // add partitions - auto prev_partition_count = tp->second.configuration.cfg.partition_count; + auto prev_partition_count = tp->second.get_configuration().partition_count; // update partitions count - tp->second.configuration.cfg.partition_count + tp->second.get_configuration().partition_count = cmd.value.cfg.new_total_partition_count; // add assignments of newly created partitions for (auto& p_as : cmd.value.assignments) { p_as.id += model::partition_id(prev_partition_count); - tp->second.configuration.assignments.push_back(p_as); + tp->second.get_assignments().emplace(p_as); // propagate deltas auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, p_as.id); _pending_deltas.emplace_back( @@ -176,19 +141,11 @@ topic_table::apply(move_partition_replicas_cmd cmd, model::offset o) { return ss::make_ready_future( errc::topic_operation_error); } - auto find_partition = []( - std::vector& assignments, - model::partition_id p_id) { - return std::find_if( - assignments.begin(), - assignments.end(), - [p_id](const partition_assignment& p_as) { return p_id == p_as.id; }); - }; - auto current_assignment_it = find_partition( - tp->second.configuration.assignments, cmd.key.tp.partition); + auto current_assignment_it = tp->second.get_assignments().find( + cmd.key.tp.partition); - if (current_assignment_it == tp->second.configuration.assignments.end()) { + if (current_assignment_it == tp->second.get_assignments().end()) { return ss::make_ready_future( errc::partition_not_exists); } @@ -228,11 +185,10 @@ topic_table::apply(move_partition_replicas_cmd cmd, model::offset o) { sfound != _topics.end(), "Non replicable topic must exist: {}", cs); - auto assignment_it = find_partition( - sfound->second.configuration.assignments, + auto assignment_it = sfound->second.get_assignments().find( current_assignment_it->id); vassert( - assignment_it != sfound->second.configuration.assignments.end(), + assignment_it != sfound->second.get_assignments().end(), "Non replicable partition doesn't exist: {}-{}", cs, current_assignment_it->id); @@ -268,14 +224,10 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) { } // calculate deleta for backend - auto current_assignment_it = std::find_if( - tp->second.configuration.assignments.begin(), - tp->second.configuration.assignments.end(), - [p_id = cmd.key.tp.partition](partition_assignment& p_as) { - return p_id == p_as.id; - }); + auto current_assignment_it = tp->second.get_assignments().find( + cmd.key.tp.partition); - if (current_assignment_it == tp->second.configuration.assignments.end()) { + if (current_assignment_it == tp->second.get_assignments().end()) { return ss::make_ready_future( errc::partition_not_exists); } @@ -394,7 +346,7 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { if (tp == _topics.end() || !tp->second.is_topic_replicable()) { co_return make_error_code(errc::topic_not_exists); } - auto& properties = tp->second.configuration.cfg.properties; + auto& properties = tp->second.get_configuration().properties; auto& overrides = cmd.value; /** * Update topic properties @@ -414,8 +366,8 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { // generate deltas for controller backend std::vector deltas; - deltas.reserve(tp->second.configuration.assignments.size()); - for (const auto& p_as : tp->second.configuration.assignments) { + deltas.reserve(tp->second.get_assignments().size()); + for (const auto& p_as : tp->second.get_assignments()) { deltas.emplace_back( model::ntp(cmd.key.ns, cmd.key.tp, p_as.id), p_as, @@ -445,7 +397,7 @@ topic_table::apply(create_non_replicable_topic_cmd cmd, model::offset o) { vassert( tp->second.is_topic_replicable(), "Source topic must be replicable"); - for (const auto& pas : tp->second.configuration.assignments) { + for (const auto& pas : tp->second.get_assignments()) { _pending_deltas.emplace_back( model::ntp(new_non_rep_topic.ns, new_non_rep_topic.tp, pas.id), pas, @@ -453,8 +405,9 @@ topic_table::apply(create_non_replicable_topic_cmd cmd, model::offset o) { delta::op_type::add_non_replicable); } - auto ca = tp->second.configuration; - ca.cfg.tp_ns = new_non_rep_topic; + auto cfg = tp->second.get_configuration(); + auto p_as = tp->second.get_assignments(); + cfg.tp_ns = new_non_rep_topic; auto [itr, success] = _topics_hierarchy.try_emplace( source, @@ -469,7 +422,8 @@ topic_table::apply(create_non_replicable_topic_cmd cmd, model::offset o) { } _topics.insert( {new_non_rep_topic, - topic_metadata(std::move(ca), model::revision_id(o()), source.tp)}); + topic_metadata( + std::move(cfg), std::move(p_as), model::revision_id(o()), source.tp)}); notify_waiters(); co_return make_error_code(errc::success); } @@ -538,13 +492,13 @@ topic_table::wait_for_changes(ss::abort_source& as) { std::vector topic_table::all_topics() const { return transform_topics( - [](const topic_configuration_assignment& td) { return td.cfg.tp_ns; }); + [](const topic_metadata& tp) { return tp.get_configuration().tp_ns; }); } -std::optional +std::optional topic_table::get_topic_metadata(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second.configuration.get_metadata(); + return it->second; } return {}; } @@ -552,41 +506,35 @@ topic_table::get_topic_metadata(model::topic_namespace_view tp) const { std::optional topic_table::get_topic_cfg(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second.configuration.cfg; + return it->second.get_configuration(); } return {}; } -std::optional> +std::optional topic_table::get_topic_assignments(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second.configuration.assignments; + return it->second.get_assignments(); } - return {}; + return std::nullopt; } std::optional topic_table::get_topic_timestamp_type(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second.configuration.cfg.properties.timestamp_type; + return it->second.get_configuration().properties.timestamp_type; } return {}; } -std::vector topic_table::all_topics_metadata() const { - return transform_topics([](const topic_configuration_assignment& td) { - return td.get_metadata(); - }); +const topic_table::underlying_t& topic_table::all_topics_metadata() const { + return _topics; } bool topic_table::contains( model::topic_namespace_view topic, model::partition_id pid) const { if (auto it = _topics.find(topic); it != _topics.end()) { - const auto& partitions = it->second.configuration.assignments; - return std::any_of( - partitions.cbegin(), - partitions.cend(), - [&pid](const partition_assignment& pas) { return pas.id == pid; }); + return it->second.get_assignments().contains(pid); } return false; } @@ -598,14 +546,9 @@ topic_table::get_partition_assignment(const model::ntp& ntp) const { return {}; } - auto p_it = std::find_if( - it->second.configuration.assignments.cbegin(), - it->second.configuration.assignments.cend(), - [&ntp](const partition_assignment& pas) { - return pas.id == ntp.tp.partition; - }); + auto p_it = it->second.get_assignments().find(ntp.tp.partition); - if (p_it == it->second.configuration.assignments.cend()) { + if (p_it == it->second.get_assignments().cend()) { return {}; } diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 4bc8143c06c2..f1fe4edbd203 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -38,26 +38,6 @@ class topic_table { public: using delta = topic_table_delta; - class topic_metadata { - public: - topic_metadata( - topic_configuration_assignment, model::revision_id) noexcept; - topic_metadata( - topic_configuration_assignment, - model::revision_id, - model::topic) noexcept; - - bool is_topic_replicable() const; - model::revision_id get_revision() const; - const model::topic& get_source_topic() const; - const topic_configuration_assignment& get_configuration() const; - - private: - friend class topic_table; - topic_configuration_assignment configuration; - std::optional _source_topic; - model::revision_id _revision; - }; using underlying_t = absl::flat_hash_map< model::topic_namespace, topic_metadata, @@ -135,7 +115,7 @@ class topic_table { ///\brief Returns metadata of single topic. /// /// If topic does not exists it returns an empty optional - std::optional + std::optional get_topic_metadata(model::topic_namespace_view) const; ///\brief Returns configuration of single topic. @@ -147,7 +127,7 @@ class topic_table { ///\brief Returns partition assignments of single topic. /// /// If topic does not exists it returns an empty optional - std::optional> + std::optional get_topic_assignments(model::topic_namespace_view) const; ///\brief Returns topics timestamp type @@ -157,10 +137,14 @@ class topic_table { get_topic_timestamp_type(model::topic_namespace_view) const; /// Returns metadata of all topics. - std::vector all_topics_metadata() const; + const underlying_t& all_topics_metadata() const; /// Checks if it has given partition bool contains(model::topic_namespace_view, model::partition_id) const; + /// Checks if it has given topic + bool contains(model::topic_namespace_view tp) const { + return _topics.contains(tp); + } std::optional get_partition_assignment(const model::ntp&) const; @@ -196,7 +180,7 @@ class topic_table { void notify_waiters(); template - std::vector> + std::vector> transform_topics(Func&&) const; underlying_t _topics; diff --git a/src/v/cluster/topic_updates_dispatcher.cc b/src/v/cluster/topic_updates_dispatcher.cc index 278b427e3a1c..a889b3419a38 100644 --- a/src/v/cluster/topic_updates_dispatcher.cc +++ b/src/v/cluster/topic_updates_dispatcher.cc @@ -40,7 +40,7 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { auto tp_md = _topic_table.local().get_topic_metadata( del_cmd.value); return dispatch_updates_to_cores(del_cmd, base_offset) - .then([this, tp_md](std::error_code ec) { + .then([this, tp_md = std::move(tp_md)](std::error_code ec) { if (ec == errc::success) { vassert( tp_md.has_value(), @@ -81,27 +81,23 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { auto tp_md = _topic_table.local().get_topic_metadata( model::topic_namespace_view(cmd.key)); return dispatch_updates_to_cores(cmd, base_offset) - .then([this, tp_md, cmd](std::error_code ec) { - if (!ec) { - vassert( - tp_md.has_value(), - "Topic had to exist before successful partition " - "reallocation"); - auto it = std::find_if( - std::cbegin(tp_md->partitions), - std::cend(tp_md->partitions), - [p_id = cmd.key.tp.partition]( - const model::partition_metadata& pmd) { - return pmd.id == p_id; - }); - vassert( - it != tp_md->partitions.cend(), - "Reassigned partition must exist"); - - reallocate_partition(it->replicas, cmd.value); - } - return ec; - }); + .then( + [this, tp_md = std::move(tp_md), cmd](std::error_code ec) { + if (!ec) { + vassert( + tp_md.has_value(), + "Topic had to exist before successful partition " + "reallocation"); + auto it = tp_md->get_assignments().find( + cmd.key.tp.partition); + vassert( + it != tp_md->get_assignments().cend(), + "Reassigned partition must exist"); + + reallocate_partition(it->replicas, cmd.value); + } + return ec; + }); }, [this, base_offset](finish_moving_partition_replicas_cmd cmd) { return dispatch_updates_to_cores(std::move(cmd), base_offset); @@ -127,7 +123,13 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { if (ec == errc::success) { vassert( assignments.has_value(), "null topic_metadata"); - update_allocations(*assignments); + std::vector p_as; + p_as.reserve(assignments->size()); + std::move( + assignments->begin(), + assignments->end(), + std::back_inserter(p_as)); + update_allocations(std::move(p_as)); } return ec; }); @@ -197,10 +199,9 @@ topic_updates_dispatcher::dispatch_updates_to_cores(Cmd cmd, model::offset o) { }); } -void topic_updates_dispatcher::deallocate_topic( - const model::topic_metadata& tp_md) { +void topic_updates_dispatcher::deallocate_topic(const topic_metadata& tp_md) { // we have to deallocate topics - for (auto& p : tp_md.partitions) { + for (auto& p : tp_md.get_assignments()) { _partition_allocator.local().deallocate(p.replicas); } } diff --git a/src/v/cluster/topic_updates_dispatcher.h b/src/v/cluster/topic_updates_dispatcher.h index ad31d279c1ac..37ae754a9427 100644 --- a/src/v/cluster/topic_updates_dispatcher.h +++ b/src/v/cluster/topic_updates_dispatcher.h @@ -77,7 +77,7 @@ class topic_updates_dispatcher { ss::future<> update_leaders_with_estimates(std::vector leaders); void update_allocations(std::vector); - void deallocate_topic(const model::topic_metadata&); + void deallocate_topic(const topic_metadata&); void reallocate_partition( const std::vector&, const std::vector&); diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 39f3648565de..262a70352a46 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -162,8 +162,7 @@ ss::future> tx_gateway_frontend::get_tx_broker() { std::nullopt); } - auto md = _metadata_cache.local().get_topic_metadata( - model::tx_manager_nt); + auto md = _metadata_cache.local().contains(model::tx_manager_nt); if (!md) { return ss::make_ready_future>( std::nullopt); diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 8325e6388e86..7fa288877756 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -285,6 +285,64 @@ bool config_status::operator==(const config_status& rhs) const { rhs.node, rhs.version, rhs.restart, rhs.unknown, rhs.invalid); } +namespace { +cluster::assignments_set to_assignments_map( + std::vector assignment_vector) { + cluster::assignments_set ret; + for (auto& p_as : assignment_vector) { + ret.emplace(std::move(p_as)); + } + return ret; +} +} // namespace + +topic_metadata::topic_metadata( + topic_configuration_assignment c, model::revision_id rid) noexcept + : _configuration(std::move(c.cfg)) + , _assignments(to_assignments_map(std::move(c.assignments))) + , _source_topic(std::nullopt) + , _revision(rid) {} + +topic_metadata::topic_metadata( + topic_configuration cfg, + assignments_set assignments, + model::revision_id rid, + model::topic st) noexcept + : _configuration(std::move(cfg)) + , _assignments(std::move(assignments)) + , _source_topic(st) + , _revision(rid) {} + +bool topic_metadata::is_topic_replicable() const { + return _source_topic.has_value() == false; +} + +model::revision_id topic_metadata::get_revision() const { + vassert( + is_topic_replicable(), "Query for revision_id on a non-replicable topic"); + return _revision; +} + +const model::topic& topic_metadata::get_source_topic() const { + vassert( + !is_topic_replicable(), "Query for source_topic on a replicable topic"); + return _source_topic.value(); +} + +const topic_configuration& topic_metadata::get_configuration() const { + return _configuration; +} + +const assignments_set& topic_metadata::get_assignments() const { + return _assignments; +} + +topic_configuration& topic_metadata::get_configuration() { + return _configuration; +} + +assignments_set& topic_metadata::get_assignments() { return _assignments; } + std::ostream& operator<<(std::ostream& o, const non_replicable_topic& d) { fmt::print( o, "{{Source topic: {}, non replicable topic: {}}}", d.source, d.name); diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 4df0e8e2b643..b05cdda8c033 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -27,6 +27,7 @@ #include "utils/to_string.h" #include "v8_engine/data_policy.h" +#include #include namespace cluster { @@ -926,6 +927,57 @@ struct leader_term { friend std::ostream& operator<<(std::ostream&, const leader_term&); }; +struct partition_assignment_cmp { + using is_transparent = void; + constexpr bool operator()( + const partition_assignment& lhs, const partition_assignment& rhs) const { + return lhs.id < rhs.id; + } + + constexpr bool operator()( + const model::partition_id& id, const partition_assignment& rhs) const { + return id < rhs.id; + } + constexpr bool operator()( + const partition_assignment& lhs, const model::partition_id& id) const { + return lhs.id < id; + } + constexpr bool operator()( + const model::partition_id& lhs, const model::partition_id& rhs) const { + return lhs < rhs; + } +}; + +using assignments_set + = absl::btree_set; + +class topic_metadata { +public: + topic_metadata(topic_configuration_assignment, model::revision_id) noexcept; + + topic_metadata( + topic_configuration, + assignments_set, + model::revision_id, + model::topic) noexcept; + + bool is_topic_replicable() const; + model::revision_id get_revision() const; + const model::topic& get_source_topic() const; + + const topic_configuration& get_configuration() const; + topic_configuration& get_configuration(); + + const assignments_set& get_assignments() const; + assignments_set& get_assignments(); + +private: + topic_configuration _configuration; + assignments_set _assignments; + std::optional _source_topic; + model::revision_id _revision; +}; + } // namespace cluster namespace std { template<> diff --git a/src/v/coproc/reconciliation_backend.cc b/src/v/coproc/reconciliation_backend.cc index 537b84acdbe7..2f91b2560e3a 100644 --- a/src/v/coproc/reconciliation_backend.cc +++ b/src/v/coproc/reconciliation_backend.cc @@ -365,7 +365,7 @@ reconciliation_backend::create_non_replicable_partition( } // Non-replicated topics are not integrated with shadow indexing yet, // so the initial_revision_id is set to invalid value. - auto ntp_cfg = tt_md->second.get_configuration().cfg.make_ntp_config( + auto ntp_cfg = tt_md->second.get_configuration().make_ntp_config( _data_directory, ntp.tp.partition, rev, diff --git a/src/v/kafka/server/coordinator_ntp_mapper.h b/src/v/kafka/server/coordinator_ntp_mapper.h index 9953c563b8e2..0b975809caa3 100644 --- a/src/v/kafka/server/coordinator_ntp_mapper.h +++ b/src/v/kafka/server/coordinator_ntp_mapper.h @@ -50,14 +50,14 @@ class coordinator_ntp_mapper { , _tp_ns(std::move(group_topic)) {} std::optional ntp_for(const kafka::group_id& group) const { - auto md = _md.local().get_topic_metadata(_tp_ns); - if (!md) { + auto cfg = _md.local().get_topic_cfg(_tp_ns); + if (!cfg) { return std::nullopt; } incremental_xxhash64 inc; inc.update(group); auto p = static_cast( - jump_consistent_hash(inc.digest(), md->partitions.size())); + jump_consistent_hash(inc.digest(), cfg->partition_count)); return model::ntp(_tp_ns.ns, _tp_ns.tp, model::partition_id{p}); } diff --git a/src/v/kafka/server/group_metadata_migration.cc b/src/v/kafka/server/group_metadata_migration.cc index a14161b68f81..a44ef5456667 100644 --- a/src/v/kafka/server/group_metadata_migration.cc +++ b/src/v/kafka/server/group_metadata_migration.cc @@ -97,7 +97,7 @@ transform_batch(model::record_batch batch) { ss::future create_consumer_offsets_topic( cluster::controller& controller, - std::vector assignments, + const cluster::assignments_set& assignments, model::timeout_clock::time_point timeout) { vassert( !assignments.empty(), @@ -107,7 +107,7 @@ ss::future create_consumer_offsets_topic( model::kafka_consumer_offsets_nt.ns, model::kafka_consumer_offsets_nt.tp, static_cast(assignments.size()), - static_cast(assignments.front().replicas.size())}); + static_cast(assignments.begin()->replicas.size())}); topic.cfg.properties.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; for (auto& p_as : assignments) { diff --git a/src/v/kafka/server/handlers/create_partitions.cc b/src/v/kafka/server/handlers/create_partitions.cc index 8f63abe89b70..0eefa9875594 100644 --- a/src/v/kafka/server/handlers/create_partitions.cc +++ b/src/v/kafka/server/handlers/create_partitions.cc @@ -155,10 +155,8 @@ ss::future create_partitions_handler::handle( error_code::invalid_topic_exception, "Topic does not exist", [&ctx](const create_partitions_topic& tp) { - return ctx.metadata_cache() - .get_topic_metadata( - model::topic_namespace(model::kafka_namespace, tp.name)) - .has_value(); + return ctx.metadata_cache().contains( + model::topic_namespace(model::kafka_namespace, tp.name)); }); // validate custom assignment diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 1ed854627135..1f243688b6dd 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -97,18 +97,19 @@ bool is_internal(const model::topic_namespace& tp_ns) { } // namespace metadata_response::topic make_topic_response_from_topic_metadata( - const cluster::metadata_cache& md_cache, model::topic_metadata&& tp_md) { + const cluster::metadata_cache& md_cache, cluster::topic_metadata&& tp_md) { metadata_response::topic tp; tp.error_code = error_code::none; - auto tp_ns = tp_md.tp_ns; - tp.name = std::move(tp_md.tp_ns.tp); + auto tp_ns = tp_md.get_configuration().tp_ns; + tp.name = std::move(tp_md.get_configuration().tp_ns.tp); tp.is_internal = is_internal(tp_ns); std::transform( - tp_md.partitions.begin(), - tp_md.partitions.end(), + tp_md.get_assignments().begin(), + tp_md.get_assignments().end(), std::back_inserter(tp.partitions), - [tp_ns = std::move(tp_ns), &md_cache](model::partition_metadata& p_md) { + [tp_ns = std::move(tp_ns), + &md_cache](cluster::partition_assignment& p_md) { std::vector replicas{}; replicas.reserve(p_md.replicas.size()); std::transform( @@ -189,14 +190,14 @@ make_error_topic_response(model::topic tp, error_code ec) { } static metadata_response::topic make_topic_response( - request_context& ctx, metadata_request& rq, model::topic_metadata md) { + request_context& ctx, metadata_request& rq, cluster::topic_metadata md) { int32_t auth_operations = 0; /** * if requested include topic authorized operations */ if (rq.data.include_topic_authorized_operations) { auth_operations = details::to_bit_field( - details::authorized_operations(ctx, md.tp_ns.tp)); + details::authorized_operations(ctx, md.get_configuration().tp_ns.tp)); } auto res = make_topic_response_from_topic_metadata( @@ -211,33 +212,26 @@ get_topic_metadata(request_context& ctx, metadata_request& request) { // request can be served from whatever happens to be in the cache if (request.list_all_topics) { - auto topics = ctx.metadata_cache().all_topics_metadata( - cluster::metadata_cache::with_leaders::no); - // only serve topics from the kafka namespace - std::erase_if(topics, [](model::topic_metadata& t_md) { - return t_md.tp_ns.ns != model::kafka_namespace; - }); - - auto unauthorized_it = std::partition( - topics.begin(), - topics.end(), - [&ctx](const model::topic_metadata& t_md) { - /* - * quiet authz failures. this isn't checking for a specifically - * requested topic, but rather checking visibility of all topics. - */ - return ctx.authorized( - security::acl_operation::describe, - t_md.tp_ns.tp, - authz_quiet{true}); - }); - std::transform( - topics.begin(), - unauthorized_it, - std::back_inserter(res), - [&ctx, &request](model::topic_metadata& t_md) { - return make_topic_response(ctx, request, std::move(t_md)); - }); + auto& topics_md = ctx.metadata_cache().all_topics_metadata(); + + for (const auto& [tp_ns, md] : topics_md) { + // only serve topics from the kafka namespace + if (tp_ns.ns != model::kafka_namespace) { + continue; + } + /* + * quiet authz failures. this isn't checking for a specifically + * requested topic, but rather checking visibility of all topics. + */ + if (!ctx.authorized( + security::acl_operation::describe, + tp_ns.tp, + authz_quiet{true})) { + continue; + } + res.push_back(make_topic_response(ctx, request, md)); + } + return ss::make_ready_future>( std::move(res)); } @@ -255,8 +249,7 @@ get_topic_metadata(request_context& ctx, metadata_request& request) { continue; } if (auto md = ctx.metadata_cache().get_topic_metadata( - model::topic_namespace_view(model::kafka_namespace, topic.name), - cluster::metadata_cache::with_leaders::no); + model::topic_namespace_view(model::kafka_namespace, topic.name)); md) { auto src_topic_response = make_topic_response( ctx, request, std::move(*md)); diff --git a/src/v/kafka/server/handlers/offset_commit.cc b/src/v/kafka/server/handlers/offset_commit.cc index 731c4e61a0e7..81c31d44b2c5 100644 --- a/src/v/kafka/server/handlers/offset_commit.cc +++ b/src/v/kafka/server/handlers/offset_commit.cc @@ -125,21 +125,16 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { const auto topic_name = model::topic(it->name); model::topic_namespace_view tn(model::kafka_namespace, topic_name); - if (const auto& md = octx.rctx.metadata_cache().get_topic_metadata(tn); - md) { + if (octx.rctx.metadata_cache().contains(tn)) { /* * check if each partition exists */ auto split = std::partition( it->partitions.begin(), it->partitions.end(), - [&md](const offset_commit_request_partition& p) { - return std::any_of( - md->partitions.cbegin(), - md->partitions.cend(), - [&p](const model::partition_metadata& pmd) { - return pmd.id == p.partition_index; - }); + [&octx, &tn](const offset_commit_request_partition& p) { + return octx.rctx.metadata_cache().contains( + tn, p.partition_index); }); /* * build responses for nonexistent topic partitions diff --git a/src/v/kafka/server/handlers/topics/topic_utils.cc b/src/v/kafka/server/handlers/topics/topic_utils.cc index 7a5362dd4362..708408bdea54 100644 --- a/src/v/kafka/server/handlers/topics/topic_utils.cc +++ b/src/v/kafka/server/handlers/topics/topic_utils.cc @@ -48,7 +48,7 @@ ss::future> wait_for_leaders( continue; } // for each partition ask for leader - for (auto& pmd : md->partitions) { + for (auto& pmd : md->get_assignments()) { futures.push_back(md_cache.get_leader( model::ntp(r.tp_ns.ns, r.tp_ns.tp, pmd.id), timeout)); } diff --git a/src/v/kafka/server/handlers/txn_offset_commit.cc b/src/v/kafka/server/handlers/txn_offset_commit.cc index d2f7fe586485..04c2649291cc 100644 --- a/src/v/kafka/server/handlers/txn_offset_commit.cc +++ b/src/v/kafka/server/handlers/txn_offset_commit.cc @@ -109,21 +109,16 @@ ss::future txn_offset_commit_handler::handle( const auto topic_name = model::topic(it->name); model::topic_namespace_view tn(model::kafka_namespace, topic_name); - if (const auto& md = octx.rctx.metadata_cache().get_topic_metadata(tn); - md) { + if (octx.rctx.metadata_cache().contains(tn)) { /* * check if each partition exists */ auto split = std::partition( it->partitions.begin(), it->partitions.end(), - [&md](const txn_offset_commit_request_partition& p) { - return std::any_of( - md->partitions.cbegin(), - md->partitions.cend(), - [&p](const model::partition_metadata& pmd) { - return pmd.id == p.partition_index; - }); + [&octx, &tn](const txn_offset_commit_request_partition& p) { + return octx.rctx.metadata_cache().contains( + tn, p.partition_index); }); /* * build responses for nonexistent topic partitions diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 533b7cf6a79e..d3a11869828d 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -284,9 +284,10 @@ class redpanda_thread_fixture { r.tp_ns); return md && std::all_of( - md->partitions.begin(), - md->partitions.end(), - [this, &r](const model::partition_metadata& p) { + md->get_assignments().begin(), + md->get_assignments().end(), + [this, + &r](const cluster::partition_assignment& p) { return app.shard_table.local().shard_for( model::ntp(r.tp_ns.ns, r.tp_ns.tp, p.id)); }); From d32572cb0912e3d658001e07230c84b8df12da7e Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Thu, 5 May 2022 13:29:41 +0200 Subject: [PATCH 2/2] c/topic_table: introduced method returning reference to topic metadata Introduced method returning reference to topic metadata. The reference returning method allow us to avoid copying metadata object when it has to be accessed by synchronous parts of code. Important: Remember not to use the reference when its lifetime has to span across multiple scheduling point. Reference returning method is provided not to copy metadata object when used by synchronous parts of code Signed-off-by: Michal Maslanka --- src/v/cluster/controller_api.cc | 12 ++++++------ src/v/cluster/metadata_cache.cc | 5 +++++ src/v/cluster/metadata_cache.h | 11 +++++++++++ src/v/cluster/topic_table.cc | 7 +++++++ src/v/cluster/topic_table.h | 11 +++++++++++ src/v/kafka/server/handlers/topics/topic_utils.cc | 4 ++-- 6 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/v/cluster/controller_api.cc b/src/v/cluster/controller_api.cc index 83a0b5e0bfc3..70952ba13159 100644 --- a/src/v/cluster/controller_api.cc +++ b/src/v/cluster/controller_api.cc @@ -63,16 +63,16 @@ bool has_node_local_replicas( ss::future>> controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { using ret_t = result>; - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { co_return ret_t(errc::topic_not_exists); } std::vector ntps; - ntps.reserve(metadata->get_assignments().size()); + ntps.reserve(metadata->get().get_assignments().size()); std::transform( - metadata->get_assignments().cbegin(), - metadata->get_assignments().cend(), + metadata->get().get_assignments().cbegin(), + metadata->get().get_assignments().cend(), std::back_inserter(ntps), [tp_ns](const partition_assignment& p_as) { return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id); @@ -211,7 +211,7 @@ controller_api::get_reconciliation_state( // high level APIs ss::future controller_api::wait_for_topic( model::topic_namespace_view tp_ns, model::timeout_clock::time_point timeout) { - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { vlog(clusterlog.trace, "topic {} does not exists", tp_ns); co_return make_error_code(errc::topic_not_exists); @@ -219,7 +219,7 @@ ss::future controller_api::wait_for_topic( absl::node_hash_map> requests; // collect ntps per node - for (const auto& p_as : metadata->get_assignments()) { + for (const auto& p_as : metadata->get().get_assignments()) { for (const auto& bs : p_as.replicas) { requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id); } diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 18b9ca39e2e1..7a39ca4038ee 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -68,6 +68,11 @@ metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const { return _topics_state.local().get_topic_metadata(tp); } +std::optional> +metadata_cache::get_topic_metadata_ref(model::topic_namespace_view tn) const { + return _topics_state.local().get_topic_metadata_ref(tn); +} + std::optional metadata_cache::get_model_topic_metadata( model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const { auto md = _topics_state.local().get_topic_metadata(tp); diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index 3875ff9a5e0f..d25a7d2b31c3 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -81,6 +81,17 @@ class metadata_cache { std::optional get_topic_metadata(model::topic_namespace_view) const; + ///\brief Returns reference to metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + /// + /// IMPORTANT: remember not to use the reference when its lifetime has to + /// span across multiple scheduling point. Reference returning method is + /// provided not to copy metadata object when used by synchronous parts of + /// code + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index eafc4ff5a382..83ad4bde6714 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -502,6 +502,13 @@ topic_table::get_topic_metadata(model::topic_namespace_view tp) const { } return {}; } +std::optional> +topic_table::get_topic_metadata_ref(model::topic_namespace_view tp) const { + if (auto it = _topics.find(tp); it != _topics.end()) { + return it->second; + } + return {}; +} std::optional topic_table::get_topic_cfg(model::topic_namespace_view tp) const { diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index f1fe4edbd203..4cc63f87c2e3 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -118,6 +118,17 @@ class topic_table { std::optional get_topic_metadata(model::topic_namespace_view) const; + ///\brief Returns reference to metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + /// + /// IMPORTANT: remember not to use the reference when its lifetime has to + /// span across multiple scheduling point. Reference returning method is + /// provided not to copy metadata object when used by synchronous parts of + /// code + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional diff --git a/src/v/kafka/server/handlers/topics/topic_utils.cc b/src/v/kafka/server/handlers/topics/topic_utils.cc index 708408bdea54..2e651668596a 100644 --- a/src/v/kafka/server/handlers/topics/topic_utils.cc +++ b/src/v/kafka/server/handlers/topics/topic_utils.cc @@ -42,13 +42,13 @@ ss::future> wait_for_leaders( continue; } // collect partitions - auto md = md_cache.get_topic_metadata(r.tp_ns); + auto md = md_cache.get_topic_metadata_ref(r.tp_ns); if (!md) { // topic already deleted continue; } // for each partition ask for leader - for (auto& pmd : md->get_assignments()) { + for (auto& pmd : md->get().get_assignments()) { futures.push_back(md_cache.get_leader( model::ntp(r.tp_ns.ns, r.tp_ns.tp, pmd.id), timeout)); }