diff --git a/src/v/cluster/controller_api.cc b/src/v/cluster/controller_api.cc index 0d7d839802d50..70952ba131591 100644 --- a/src/v/cluster/controller_api.cc +++ b/src/v/cluster/controller_api.cc @@ -63,19 +63,19 @@ bool has_node_local_replicas( ss::future>> controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) { using ret_t = result>; - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { co_return ret_t(errc::topic_not_exists); } std::vector ntps; - ntps.reserve(metadata->partitions.size()); + ntps.reserve(metadata->get().get_assignments().size()); std::transform( - metadata->partitions.cbegin(), - metadata->partitions.cend(), + metadata->get().get_assignments().cbegin(), + metadata->get().get_assignments().cend(), std::back_inserter(ntps), - [tp_ns](const model::partition_metadata& p_md) { - return model::ntp(tp_ns.ns, tp_ns.tp, p_md.id); + [tp_ns](const partition_assignment& p_as) { + return model::ntp(tp_ns.ns, tp_ns.tp, p_as.id); }); co_return co_await get_reconciliation_state(std::move(ntps)); @@ -211,7 +211,7 @@ controller_api::get_reconciliation_state( // high level APIs ss::future controller_api::wait_for_topic( model::topic_namespace_view tp_ns, model::timeout_clock::time_point timeout) { - auto metadata = _topics.local().get_topic_metadata(tp_ns); + auto metadata = _topics.local().get_topic_metadata_ref(tp_ns); if (!metadata) { vlog(clusterlog.trace, "topic {} does not exists", tp_ns); co_return make_error_code(errc::topic_not_exists); @@ -219,9 +219,9 @@ ss::future controller_api::wait_for_topic( absl::node_hash_map> requests; // collect ntps per node - for (const auto& p_md : metadata->partitions) { - for (const auto& bs : p_md.replicas) { - requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_md.id); + for (const auto& p_as : metadata->get().get_assignments()) { + for (const auto& bs : p_as.replicas) { + requests[bs.node_id].emplace_back(tp_ns.ns, tp_ns.tp, p_as.id); } } bool ready = false; diff --git a/src/v/cluster/health_manager.cc b/src/v/cluster/health_manager.cc index 8856744433192..118abf4159178 100644 --- a/src/v/cluster/health_manager.cc +++ b/src/v/cluster/health_manager.cc @@ -160,7 +160,7 @@ health_manager::ensure_topic_replication(model::topic_namespace_view topic) { co_return true; } - for (const auto& partition : metadata->partitions) { + for (const auto& partition : metadata->get_assignments()) { model::ntp ntp(topic.ns, topic.tp, partition.id); if (!co_await ensure_partition_replication(std::move(ntp))) { co_return false; diff --git a/src/v/cluster/members_backend.cc b/src/v/cluster/members_backend.cc index d3e3467e13a65..903a34687840c 100644 --- a/src/v/cluster/members_backend.cc +++ b/src/v/cluster/members_backend.cc @@ -167,7 +167,7 @@ void members_backend::calculate_reallocations(update_meta& meta) { if (!cfg.is_topic_replicable()) { continue; } - for (auto& pas : cfg.get_configuration().assignments) { + for (const auto& pas : cfg.get_assignments()) { if (is_in_replica_set(pas.replicas, meta.update.id)) { partition_reallocation reallocation( model::ntp(tp_ns.ns, tp_ns.tp, pas.id), @@ -284,7 +284,7 @@ void members_backend::calculate_reallocations_after_node_added( meta.update.offset); continue; } - for (auto& p : metadata.get_configuration().assignments) { + for (const auto& p : metadata.get_assignments()) { std::erase_if(to_move_from_node, [](const replicas_to_move& v) { return v.left_to_move == 0; }); diff --git a/src/v/cluster/metadata_cache.cc b/src/v/cluster/metadata_cache.cc index 547b1fc5de9c5..7a39ca4038ee7 100644 --- a/src/v/cluster/metadata_cache.cc +++ b/src/v/cluster/metadata_cache.cc @@ -9,6 +9,7 @@ #include "cluster/metadata_cache.h" +#include "cluster/fwd.h" #include "cluster/health_monitor_frontend.h" #include "cluster/members_table.h" #include "cluster/partition_leaders_table.h" @@ -62,17 +63,34 @@ metadata_cache::get_source_topic(model::topic_namespace_view tp) const { return mt->second.get_source_topic(); } -std::optional metadata_cache::get_topic_metadata( +std::optional +metadata_cache::get_topic_metadata(model::topic_namespace_view tp) const { + return _topics_state.local().get_topic_metadata(tp); +} + +std::optional> +metadata_cache::get_topic_metadata_ref(model::topic_namespace_view tn) const { + return _topics_state.local().get_topic_metadata_ref(tn); +} + +std::optional metadata_cache::get_model_topic_metadata( model::topic_namespace_view tp, metadata_cache::with_leaders leaders) const { auto md = _topics_state.local().get_topic_metadata(tp); if (!md) { - return md; + return std::nullopt; + } + + model::topic_metadata metadata(md->get_configuration().tp_ns); + metadata.partitions.reserve(md->get_assignments().size()); + for (const auto& p_as : md->get_assignments()) { + metadata.partitions.push_back(p_as.create_partition_metadata()); } + if (leaders) { - fill_partition_leaders(_leaders.local(), *md); + fill_partition_leaders(_leaders.local(), metadata); } - return md; + return metadata; } std::optional metadata_cache::get_topic_cfg(model::topic_namespace_view tp) const { @@ -84,15 +102,8 @@ metadata_cache::get_topic_timestamp_type(model::topic_namespace_view tp) const { return _topics_state.local().get_topic_timestamp_type(tp); } -std::vector metadata_cache::all_topics_metadata( - metadata_cache::with_leaders leaders) const { - auto all_md = _topics_state.local().all_topics_metadata(); - if (leaders) { - for (auto& md : all_md) { - fill_partition_leaders(_leaders.local(), md); - } - } - return all_md; +const topic_table::underlying_t& metadata_cache::all_topics_metadata() const { + return _topics_state.local().all_topics_metadata(); } std::optional metadata_cache::get_broker(model::node_id nid) const { @@ -148,6 +159,10 @@ bool metadata_cache::contains( return _topics_state.local().contains(tp, pid); } +bool metadata_cache::contains(model::topic_namespace_view tp) const { + return _topics_state.local().contains(tp); +} + ss::future metadata_cache::get_leader( const model::ntp& ntp, ss::lowres_clock::time_point tout, diff --git a/src/v/cluster/metadata_cache.h b/src/v/cluster/metadata_cache.h index a3f4100feb3de..6f1dd6430aa14 100644 --- a/src/v/cluster/metadata_cache.h +++ b/src/v/cluster/metadata_cache.h @@ -14,6 +14,7 @@ #include "cluster/fwd.h" #include "cluster/health_monitor_types.h" #include "cluster/partition_leaders_table.h" +#include "cluster/topic_table.h" #include "cluster/types.h" #include "model/fundamental.h" #include "model/metadata.h" @@ -26,6 +27,8 @@ #include +#include + namespace cluster { /// Metadata cache provides all informationrequired to fill Kafka metadata @@ -72,8 +75,16 @@ class metadata_cache { ///\brief Returns metadata of single topic. /// /// If topic does not exists it returns an empty optional - std::optional get_topic_metadata( + std::optional get_model_topic_metadata( model::topic_namespace_view, with_leaders = with_leaders::yes) const; + ///\brief Returns metadata of single topic. + /// + /// If topic does not exists it returns an empty optional + std::optional + get_topic_metadata(model::topic_namespace_view) const; + + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; ///\brief Returns configuration of single topic. /// @@ -87,9 +98,7 @@ class metadata_cache { std::optional get_topic_timestamp_type(model::topic_namespace_view) const; - /// Returns metadata of all topics. - std::vector - all_topics_metadata(with_leaders = with_leaders::yes) const; + const topic_table::underlying_t& all_topics_metadata() const; /// Returns all brokers, returns copy as the content of broker can change std::vector all_brokers() const; @@ -105,6 +114,7 @@ class metadata_cache { std::optional get_broker(model::node_id) const; bool contains(model::topic_namespace_view, model::partition_id) const; + bool contains(model::topic_namespace_view) const; bool contains(const model::ntp& ntp) const { return contains(model::topic_namespace_view(ntp), ntp.tp.partition); diff --git a/src/v/cluster/metrics_reporter.cc b/src/v/cluster/metrics_reporter.cc index d2bc043cd41cb..57d6b1cf2f1d2 100644 --- a/src/v/cluster/metrics_reporter.cc +++ b/src/v/cluster/metrics_reporter.cc @@ -219,7 +219,7 @@ metrics_reporter::build_metrics_snapshot() { } snapshot.topic_count++; - snapshot.partition_count += md.get_configuration().cfg.partition_count; + snapshot.partition_count += md.get_configuration().partition_count; } snapshot.nodes.reserve(metrics_map.size()); diff --git a/src/v/cluster/scheduling/leader_balancer.cc b/src/v/cluster/scheduling/leader_balancer.cc index 33a2e7938adfe..bd44de4d58240 100644 --- a/src/v/cluster/scheduling/leader_balancer.cc +++ b/src/v/cluster/scheduling/leader_balancer.cc @@ -314,8 +314,7 @@ ss::future leader_balancer::balance() { if (!topic.second.is_topic_replicable()) { continue; } - for (const auto& partition : - topic.second.get_configuration().assignments) { + for (const auto& partition : topic.second.get_assignments()) { if (partition.group == transfer->group) { ntp = model::ntp(topic.first.ns, topic.first.tp, partition.id); break; @@ -438,8 +437,7 @@ leader_balancer::find_leader_shard(const model::ntp& ntp) { // If the inital query was for a non_replicable ntp, the get_leader() call // would have returned its source topic - for (const auto& partition : - config_it->second.get_configuration().assignments) { + for (const auto& partition : config_it->second.get_assignments()) { if (partition.id != ntp.tp.partition) { continue; } @@ -474,8 +472,7 @@ leader_balancer::index_type leader_balancer::build_index() { if (!topic.second.is_topic_replicable()) { continue; } - for (const auto& partition : - topic.second.get_configuration().assignments) { + for (const auto& partition : topic.second.get_assignments()) { if (partition.replicas.empty()) { vlog( clusterlog.warn, diff --git a/src/v/cluster/service.cc b/src/v/cluster/service.cc index 48fc0a99c628d..c18db7431c5a0 100644 --- a/src/v/cluster/service.cc +++ b/src/v/cluster/service.cc @@ -148,7 +148,7 @@ service::fetch_metadata_and_cfg(const std::vector& res) { md.reserve(res.size()); for (const auto& r : res) { if (r.ec == errc::success) { - auto topic_md = _md_cache.local().get_topic_metadata(r.tp_ns); + auto topic_md = _md_cache.local().get_model_topic_metadata(r.tp_ns); auto topic_cfg = _md_cache.local().get_topic_cfg(r.tp_ns); if (topic_md && topic_cfg) { md.push_back(std::move(topic_md.value())); diff --git a/src/v/cluster/tests/autocreate_test.cc b/src/v/cluster/tests/autocreate_test.cc index acd51af963673..c200a9047ca21 100644 --- a/src/v/cluster/tests/autocreate_test.cc +++ b/src/v/cluster/tests/autocreate_test.cc @@ -39,7 +39,8 @@ void validate_topic_metadata(cluster::metadata_cache& cache) { for (auto& t_cfg : expected_topics) { auto tp_md = cache.get_topic_metadata(t_cfg.tp_ns); BOOST_REQUIRE_EQUAL(tp_md.has_value(), true); - BOOST_REQUIRE_EQUAL(tp_md->partitions.size(), t_cfg.partition_count); + BOOST_REQUIRE_EQUAL( + tp_md->get_assignments().size(), t_cfg.partition_count); auto cfg = cache.get_topic_cfg(t_cfg.tp_ns); BOOST_REQUIRE_EQUAL(cfg->tp_ns, t_cfg.tp_ns); BOOST_REQUIRE_EQUAL(cfg->partition_count, t_cfg.partition_count); @@ -54,11 +55,14 @@ void validate_topic_metadata(cluster::metadata_cache& cache) { void wait_for_leaders( cluster::partition_leaders_table& leader_table, - const model::topic_metadata md) { + const cluster::topic_metadata md) { std::vector> f; - f.reserve(md.partitions.size()); - for (const auto& p : md.partitions) { - model::ntp ntp(md.tp_ns.ns, md.tp_ns.tp, p.id); + f.reserve(md.get_assignments().size()); + for (const auto& p : md.get_assignments()) { + model::ntp ntp( + md.get_configuration().tp_ns.ns, + md.get_configuration().tp_ns.tp, + p.id); f.push_back(leader_table .wait_for_leader( @@ -109,7 +113,7 @@ FIXTURE_TEST(create_single_topic_test_at_current_broker, cluster_test_fixture) { BOOST_REQUIRE_EQUAL(md.has_value(), true); wait_for_leaders(app->controller->get_partition_leaders().local(), *md); - BOOST_REQUIRE_EQUAL(md.value().tp_ns, r.tp_ns); + BOOST_REQUIRE_EQUAL(md->get_configuration().tp_ns, r.tp_ns); } } @@ -151,7 +155,7 @@ FIXTURE_TEST(test_autocreate_on_non_leader, cluster_test_fixture) { BOOST_REQUIRE_EQUAL(md.has_value(), true); wait_for_leaders( app_0->controller->get_partition_leaders().local(), *md); - BOOST_REQUIRE_EQUAL(md.value().tp_ns, r.tp_ns); + BOOST_REQUIRE_EQUAL(md->get_configuration().tp_ns, r.tp_ns); } // Make sure caches are the same validate_topic_metadata(get_local_cache(n_1)); diff --git a/src/v/cluster/tests/create_partitions_test.cc b/src/v/cluster/tests/create_partitions_test.cc index 676caa1cbdb28..b31e1b84380ec 100644 --- a/src/v/cluster/tests/create_partitions_test.cc +++ b/src/v/cluster/tests/create_partitions_test.cc @@ -45,15 +45,17 @@ FIXTURE_TEST(test_creating_partitions, rebalancing_tests_fixture) { auto tp_2_md = topics.local().get_topic_metadata(make_tp_ns("test-2")); // total 6 partitions - BOOST_REQUIRE_EQUAL(tp_1_md->partitions.size(), 6); + BOOST_REQUIRE_EQUAL(tp_1_md->get_assignments().size(), 6); for (auto i = 0; i < 6; ++i) { - BOOST_REQUIRE_EQUAL(tp_1_md->partitions[i].id, model::partition_id(i)); + BOOST_REQUIRE( + tp_1_md->get_assignments().contains(model::partition_id(i))); } // total 9 partitions - BOOST_REQUIRE_EQUAL(tp_2_md->partitions.size(), 9); + BOOST_REQUIRE_EQUAL(tp_2_md->get_assignments().size(), 9); for (auto i = 0; i < 9; ++i) { - BOOST_REQUIRE_EQUAL(tp_2_md->partitions[i].id, model::partition_id(i)); + BOOST_REQUIRE( + tp_2_md->get_assignments().contains(model::partition_id(i))); } } diff --git a/src/v/cluster/tests/metadata_dissemination_test.cc b/src/v/cluster/tests/metadata_dissemination_test.cc index 6f64647bdd509..c3e4e6b911e8c 100644 --- a/src/v/cluster/tests/metadata_dissemination_test.cc +++ b/src/v/cluster/tests/metadata_dissemination_test.cc @@ -35,19 +35,22 @@ wait_for_leaders_updates(int id, cluster::metadata_cache& cache) { std::chrono::seconds(10), [&cache, &leaders, id] { leaders.clear(); - auto tp_md = cache.get_topic_metadata(model::topic_namespace( - model::ns("default"), model::topic("test_1"))); + const model::topic_namespace tn( + model::ns("default"), model::topic("test_1")); + auto tp_md = cache.get_topic_metadata(tn); + if (!tp_md) { return false; } - if (tp_md->partitions.size() != 3) { + if (tp_md->get_assignments().size() != 3) { return false; } - for (auto& p_md : tp_md->partitions) { - if (!p_md.leader_node) { + for (auto& p_md : tp_md->get_assignments()) { + auto leader_id = cache.get_leader_id(tn, p_md.id); + if (!leader_id) { return false; } - leaders.push_back(*p_md.leader_node); + leaders.push_back(*leader_id); } return true; }) diff --git a/src/v/cluster/tests/partition_moving_test.cc b/src/v/cluster/tests/partition_moving_test.cc index ef447f1a187d5..dad239e502032 100644 --- a/src/v/cluster/tests/partition_moving_test.cc +++ b/src/v/cluster/tests/partition_moving_test.cc @@ -201,12 +201,11 @@ class partition_assignment_test_fixture : public cluster_test_fixture { std::vector get_replicas(int source_node, const model::ntp& ntp) { - auto md = controller(source_node) - ->get_topics_state() - .local() - .get_topic_metadata(model::topic_namespace_view(ntp)); - - return md->partitions.begin()->replicas; + return controller(source_node) + ->get_topics_state() + .local() + .get_partition_assignment(ntp) + ->replicas; } void wait_for_replica_set_partitions( diff --git a/src/v/cluster/tests/rebalancing_tests_fixture.h b/src/v/cluster/tests/rebalancing_tests_fixture.h index 378b9805e4c15..17697095141e8 100644 --- a/src/v/cluster/tests/rebalancing_tests_fixture.h +++ b/src/v/cluster/tests/rebalancing_tests_fixture.h @@ -93,7 +93,7 @@ class rebalancing_tests_fixture : public cluster_test_fixture { .local() .get_topic_metadata(model::topic_namespace_view(ntp)); - return md->partitions.begin()->replicas; + return md->get_assignments().begin()->replicas; } void create_topic(cluster::topic_configuration cfg) { @@ -204,10 +204,9 @@ class rebalancing_tests_fixture : public cluster_test_fixture { void populate_all_topics_with_data() { auto md = get_local_cache(model::node_id(0)).all_topics_metadata(); - for (auto& topic_metadata : md) { - for (auto& p : topic_metadata.partitions) { - model::ntp ntp( - topic_metadata.tp_ns.ns, topic_metadata.tp_ns.tp, p.id); + for (auto& [tp_ns, topic_metadata] : md) { + for (auto& p : topic_metadata.get_assignments()) { + model::ntp ntp(tp_ns.ns, tp_ns.tp, p.id); replicate_data(ntp, 10); } } diff --git a/src/v/cluster/tests/replicas_rebalancing_tests.cc b/src/v/cluster/tests/replicas_rebalancing_tests.cc index 3802a92b2352f..054bf829d969a 100644 --- a/src/v/cluster/tests/replicas_rebalancing_tests.cc +++ b/src/v/cluster/tests/replicas_rebalancing_tests.cc @@ -16,11 +16,11 @@ absl::node_hash_map calculate_replicas_per_node(const cluster::metadata_cache& cache) { absl::node_hash_map ret; - for (auto& tp_md : cache.all_topics_metadata()) { - if (tp_md.tp_ns.ns == model::redpanda_ns) { + for (auto& [tp_ns, tp_md] : cache.all_topics_metadata()) { + if (tp_ns.ns == model::redpanda_ns) { continue; } - for (auto& p_md : tp_md.partitions) { + for (auto& p_md : tp_md.get_assignments()) { for (auto replica : p_md.replicas) { ret[replica.node_id]++; } diff --git a/src/v/cluster/tests/topic_table_test.cc b/src/v/cluster/tests/topic_table_test.cc index 716b2112b83ff..afc475beb6f84 100644 --- a/src/v/cluster/tests/topic_table_test.cc +++ b/src/v/cluster/tests/topic_table_test.cc @@ -21,19 +21,17 @@ FIXTURE_TEST(test_happy_path_create, topic_table_fixture) { auto md = table.local().all_topics_metadata(); BOOST_REQUIRE_EQUAL(md.size(), 3); - std::sort( - md.begin(), - md.end(), - [](const model::topic_metadata& a, const model::topic_metadata& b) { - return a.tp_ns.tp < b.tp_ns.tp; - }); - BOOST_REQUIRE_EQUAL(md[0].tp_ns, make_tp_ns("test_tp_1")); - BOOST_REQUIRE_EQUAL(md[1].tp_ns, make_tp_ns("test_tp_2")); - BOOST_REQUIRE_EQUAL(md[2].tp_ns, make_tp_ns("test_tp_3")); - - BOOST_REQUIRE_EQUAL(md[0].partitions.size(), 1); - BOOST_REQUIRE_EQUAL(md[1].partitions.size(), 12); - BOOST_REQUIRE_EQUAL(md[2].partitions.size(), 8); + + BOOST_REQUIRE(md.contains(make_tp_ns("test_tp_1"))); + BOOST_REQUIRE(md.contains(make_tp_ns("test_tp_2"))); + BOOST_REQUIRE(md.contains(make_tp_ns("test_tp_3"))); + + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_1"))->second.get_assignments().size(), 1); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_2"))->second.get_assignments().size(), 12); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_3"))->second.get_assignments().size(), 8); // check delta auto d = table.local().wait_for_changes(as).get0(); @@ -60,9 +58,10 @@ FIXTURE_TEST(test_happy_path_delete, topic_table_fixture) { auto md = table.local().all_topics_metadata(); BOOST_REQUIRE_EQUAL(md.size(), 1); - BOOST_REQUIRE_EQUAL(md[0].tp_ns, make_tp_ns("test_tp_1")); + BOOST_REQUIRE(md.contains(make_tp_ns("test_tp_1"))); - BOOST_REQUIRE_EQUAL(md[0].partitions.size(), 1); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_1"))->second.get_assignments().size(), 1); // check delta auto d = table.local().wait_for_changes(as).get0(); @@ -156,7 +155,7 @@ FIXTURE_TEST(test_adding_partition, topic_table_fixture) { auto md = table.local().get_topic_metadata(make_tp_ns("test_tp_2")); - BOOST_REQUIRE_EQUAL(md->partitions.size(), 15); + BOOST_REQUIRE_EQUAL(md->get_assignments().size(), 15); // check delta auto d = table.local().wait_for_changes(as).get0(); // require 3 partition additions diff --git a/src/v/cluster/tests/topic_updates_dispatcher_test.cc b/src/v/cluster/tests/topic_updates_dispatcher_test.cc index 0deed42d4b3fb..f5879276b67da 100644 --- a/src/v/cluster/tests/topic_updates_dispatcher_test.cc +++ b/src/v/cluster/tests/topic_updates_dispatcher_test.cc @@ -84,20 +84,17 @@ FIXTURE_TEST( auto md = table.local().all_topics_metadata(); BOOST_REQUIRE_EQUAL(md.size(), 3); - std::sort( - md.begin(), - md.end(), - [](const model::topic_metadata& a, const model::topic_metadata& b) { - return a.tp_ns.tp < b.tp_ns.tp; - }); - BOOST_REQUIRE_EQUAL(md[0].tp_ns, make_tp_ns("test_tp_1")); - BOOST_REQUIRE_EQUAL(md[1].tp_ns, make_tp_ns("test_tp_2")); - BOOST_REQUIRE_EQUAL(md[2].tp_ns, make_tp_ns("test_tp_3")); - BOOST_REQUIRE_EQUAL(md[0].partitions.size(), 1); - BOOST_REQUIRE_EQUAL(md[1].partitions.size(), 12); - BOOST_REQUIRE_EQUAL(md[2].partitions.size(), 8); + BOOST_REQUIRE_EQUAL(md.contains(make_tp_ns("test_tp_1")), true); + BOOST_REQUIRE_EQUAL(md.contains(make_tp_ns("test_tp_2")), true); + BOOST_REQUIRE_EQUAL(md.contains(make_tp_ns("test_tp_3")), true); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_1"))->second.get_assignments().size(), 1); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_2"))->second.get_assignments().size(), 12); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_3"))->second.get_assignments().size(), 8); // Initial capacity // (cpus * max_allocations_per_core) - core0_extra_weight; // node 1, 8 cores @@ -133,8 +130,10 @@ FIXTURE_TEST( auto md = table.local().all_topics_metadata(); BOOST_REQUIRE_EQUAL(md.size(), 1); - BOOST_REQUIRE_EQUAL(md[0].tp_ns, make_tp_ns("test_tp_1")); - BOOST_REQUIRE_EQUAL(md[0].partitions.size(), 1); + + BOOST_REQUIRE_EQUAL(md.contains(make_tp_ns("test_tp_1")), true); + BOOST_REQUIRE_EQUAL( + md.find(make_tp_ns("test_tp_1"))->second.get_assignments().size(), 1); BOOST_REQUIRE_EQUAL( current_cluster_capacity(allocator.local().state().allocation_nodes()), diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index a42c19fc3593e..3757db08de20c 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -22,9 +22,9 @@ namespace cluster { template -std::vector> +std::vector> topic_table::transform_topics(Func&& f) const { - std::vector> ret; + std::vector> ret; ret.reserve(_topics.size()); std::transform( std::cbegin(_topics), @@ -32,46 +32,11 @@ topic_table::transform_topics(Func&& f) const { std::back_inserter(ret), [f = std::forward(f)]( const std::pair& p) { - return f(p.second.configuration); + return f(p.second); }); return ret; } -topic_table::topic_metadata::topic_metadata( - topic_configuration_assignment c, model::revision_id rid) noexcept - : configuration(std::move(c)) - , _source_topic(std::nullopt) - , _revision(rid) {} - -topic_table::topic_metadata::topic_metadata( - topic_configuration_assignment c, - model::revision_id rid, - model::topic st) noexcept - : configuration(std::move(c)) - , _source_topic(st) - , _revision(rid) {} - -bool topic_table::topic_metadata::is_topic_replicable() const { - return _source_topic.has_value() == false; -} - -model::revision_id topic_table::topic_metadata::get_revision() const { - vassert( - is_topic_replicable(), "Query for revision_id on a non-replicable topic"); - return _revision; -} - -const model::topic& topic_table::topic_metadata::get_source_topic() const { - vassert( - !is_topic_replicable(), "Query for source_topic on a replicable topic"); - return _source_topic.value(); -} - -const topic_configuration_assignment& -topic_table::topic_metadata::get_configuration() const { - return configuration; -} - ss::future topic_table::apply(create_topic_cmd cmd, model::offset offset) { if (_topics.contains(cmd.key)) { @@ -128,11 +93,11 @@ topic_table::apply(delete_topic_cmd cmd, model::offset offset) { } } - for (auto& p : tp->second.configuration.assignments) { - auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, p.id); + for (auto& p_as : tp->second.get_assignments()) { + auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, p_as.id); _update_in_progress.erase(ntp); _pending_deltas.emplace_back( - std::move(ntp), std::move(p), offset, delete_type); + std::move(ntp), p_as, offset, delete_type); } _topics.erase(tp); notify_waiters(); @@ -148,14 +113,14 @@ topic_table::apply(create_partition_cmd cmd, model::offset offset) { } // add partitions - auto prev_partition_count = tp->second.configuration.cfg.partition_count; + auto prev_partition_count = tp->second.get_configuration().partition_count; // update partitions count - tp->second.configuration.cfg.partition_count + tp->second.get_configuration().partition_count = cmd.value.cfg.new_total_partition_count; // add assignments of newly created partitions for (auto& p_as : cmd.value.assignments) { p_as.id += model::partition_id(prev_partition_count); - tp->second.configuration.assignments.push_back(p_as); + tp->second.get_assignments().emplace(p_as); // propagate deltas auto ntp = model::ntp(cmd.key.ns, cmd.key.tp, p_as.id); _pending_deltas.emplace_back( @@ -176,19 +141,11 @@ topic_table::apply(move_partition_replicas_cmd cmd, model::offset o) { return ss::make_ready_future( errc::topic_operation_error); } - auto find_partition = []( - std::vector& assignments, - model::partition_id p_id) { - return std::find_if( - assignments.begin(), - assignments.end(), - [p_id](const partition_assignment& p_as) { return p_id == p_as.id; }); - }; - auto current_assignment_it = find_partition( - tp->second.configuration.assignments, cmd.key.tp.partition); + auto current_assignment_it = tp->second.get_assignments().find( + cmd.key.tp.partition); - if (current_assignment_it == tp->second.configuration.assignments.end()) { + if (current_assignment_it == tp->second.get_assignments().end()) { return ss::make_ready_future( errc::partition_not_exists); } @@ -228,11 +185,10 @@ topic_table::apply(move_partition_replicas_cmd cmd, model::offset o) { sfound != _topics.end(), "Non replicable topic must exist: {}", cs); - auto assignment_it = find_partition( - sfound->second.configuration.assignments, + auto assignment_it = sfound->second.get_assignments().find( current_assignment_it->id); vassert( - assignment_it != sfound->second.configuration.assignments.end(), + assignment_it != sfound->second.get_assignments().end(), "Non replicable partition doesn't exist: {}-{}", cs, current_assignment_it->id); @@ -268,14 +224,10 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) { } // calculate deleta for backend - auto current_assignment_it = std::find_if( - tp->second.configuration.assignments.begin(), - tp->second.configuration.assignments.end(), - [p_id = cmd.key.tp.partition](partition_assignment& p_as) { - return p_id == p_as.id; - }); + auto current_assignment_it = tp->second.get_assignments().find( + cmd.key.tp.partition); - if (current_assignment_it == tp->second.configuration.assignments.end()) { + if (current_assignment_it == tp->second.get_assignments().end()) { return ss::make_ready_future( errc::partition_not_exists); } @@ -394,7 +346,7 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { if (tp == _topics.end() || !tp->second.is_topic_replicable()) { co_return make_error_code(errc::topic_not_exists); } - auto& properties = tp->second.configuration.cfg.properties; + auto& properties = tp->second.get_configuration().properties; auto& overrides = cmd.value; /** * Update topic properties @@ -414,8 +366,8 @@ topic_table::apply(update_topic_properties_cmd cmd, model::offset o) { // generate deltas for controller backend std::vector deltas; - deltas.reserve(tp->second.configuration.assignments.size()); - for (const auto& p_as : tp->second.configuration.assignments) { + deltas.reserve(tp->second.get_assignments().size()); + for (const auto& p_as : tp->second.get_assignments()) { deltas.emplace_back( model::ntp(cmd.key.ns, cmd.key.tp, p_as.id), p_as, @@ -445,7 +397,7 @@ topic_table::apply(create_non_replicable_topic_cmd cmd, model::offset o) { vassert( tp->second.is_topic_replicable(), "Source topic must be replicable"); - for (const auto& pas : tp->second.configuration.assignments) { + for (const auto& pas : tp->second.get_assignments()) { _pending_deltas.emplace_back( model::ntp(new_non_rep_topic.ns, new_non_rep_topic.tp, pas.id), pas, @@ -453,8 +405,9 @@ topic_table::apply(create_non_replicable_topic_cmd cmd, model::offset o) { delta::op_type::add_non_replicable); } - auto ca = tp->second.configuration; - ca.cfg.tp_ns = new_non_rep_topic; + auto cfg = tp->second.get_configuration(); + auto p_as = tp->second.get_assignments(); + cfg.tp_ns = new_non_rep_topic; auto [itr, success] = _topics_hierarchy.try_emplace( source, @@ -469,7 +422,8 @@ topic_table::apply(create_non_replicable_topic_cmd cmd, model::offset o) { } _topics.insert( {new_non_rep_topic, - topic_metadata(std::move(ca), model::revision_id(o()), source.tp)}); + topic_metadata( + std::move(cfg), std::move(p_as), model::revision_id(o()), source.tp)}); notify_waiters(); co_return make_error_code(errc::success); } @@ -538,13 +492,21 @@ topic_table::wait_for_changes(ss::abort_source& as) { std::vector topic_table::all_topics() const { return transform_topics( - [](const topic_configuration_assignment& td) { return td.cfg.tp_ns; }); + [](const topic_metadata& tp) { return tp.get_configuration().tp_ns; }); } -std::optional +// TODO: copy +std::optional topic_table::get_topic_metadata(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second.configuration.get_metadata(); + return it->second; + } + return {}; +} +std::optional> +topic_table::get_topic_metadata_ref(model::topic_namespace_view tp) const { + if (auto it = _topics.find(tp); it != _topics.end()) { + return it->second; } return {}; } @@ -552,41 +514,35 @@ topic_table::get_topic_metadata(model::topic_namespace_view tp) const { std::optional topic_table::get_topic_cfg(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second.configuration.cfg; + return it->second.get_configuration(); } return {}; } -std::optional> +std::optional topic_table::get_topic_assignments(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second.configuration.assignments; + return it->second.get_assignments(); } - return {}; + return std::nullopt; } std::optional topic_table::get_topic_timestamp_type(model::topic_namespace_view tp) const { if (auto it = _topics.find(tp); it != _topics.end()) { - return it->second.configuration.cfg.properties.timestamp_type; + return it->second.get_configuration().properties.timestamp_type; } return {}; } -std::vector topic_table::all_topics_metadata() const { - return transform_topics([](const topic_configuration_assignment& td) { - return td.get_metadata(); - }); +const topic_table::underlying_t& topic_table::all_topics_metadata() const { + return _topics; } bool topic_table::contains( model::topic_namespace_view topic, model::partition_id pid) const { if (auto it = _topics.find(topic); it != _topics.end()) { - const auto& partitions = it->second.configuration.assignments; - return std::any_of( - partitions.cbegin(), - partitions.cend(), - [&pid](const partition_assignment& pas) { return pas.id == pid; }); + return it->second.get_assignments().contains(pid); } return false; } @@ -598,14 +554,9 @@ topic_table::get_partition_assignment(const model::ntp& ntp) const { return {}; } - auto p_it = std::find_if( - it->second.configuration.assignments.cbegin(), - it->second.configuration.assignments.cend(), - [&ntp](const partition_assignment& pas) { - return pas.id == ntp.tp.partition; - }); + auto p_it = it->second.get_assignments().find(ntp.tp.partition); - if (p_it == it->second.configuration.assignments.cend()) { + if (p_it == it->second.get_assignments().cend()) { return {}; } diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 4bc8143c06c2d..f0ea6dcb3c955 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -22,6 +22,8 @@ #include #include +#include + namespace cluster { /// Topic table represent all topics configuration and partition assignments. @@ -38,26 +40,6 @@ class topic_table { public: using delta = topic_table_delta; - class topic_metadata { - public: - topic_metadata( - topic_configuration_assignment, model::revision_id) noexcept; - topic_metadata( - topic_configuration_assignment, - model::revision_id, - model::topic) noexcept; - - bool is_topic_replicable() const; - model::revision_id get_revision() const; - const model::topic& get_source_topic() const; - const topic_configuration_assignment& get_configuration() const; - - private: - friend class topic_table; - topic_configuration_assignment configuration; - std::optional _source_topic; - model::revision_id _revision; - }; using underlying_t = absl::flat_hash_map< model::topic_namespace, topic_metadata, @@ -135,9 +117,12 @@ class topic_table { ///\brief Returns metadata of single topic. /// /// If topic does not exists it returns an empty optional - std::optional + std::optional get_topic_metadata(model::topic_namespace_view) const; + std::optional> + get_topic_metadata_ref(model::topic_namespace_view) const; + ///\brief Returns configuration of single topic. /// /// If topic does not exists it returns an empty optional @@ -147,7 +132,7 @@ class topic_table { ///\brief Returns partition assignments of single topic. /// /// If topic does not exists it returns an empty optional - std::optional> + std::optional get_topic_assignments(model::topic_namespace_view) const; ///\brief Returns topics timestamp type @@ -157,10 +142,14 @@ class topic_table { get_topic_timestamp_type(model::topic_namespace_view) const; /// Returns metadata of all topics. - std::vector all_topics_metadata() const; + const underlying_t& all_topics_metadata() const; /// Checks if it has given partition bool contains(model::topic_namespace_view, model::partition_id) const; + /// Checks if it has given topic + bool contains(model::topic_namespace_view tp) const { + return _topics.contains(tp); + } std::optional get_partition_assignment(const model::ntp&) const; @@ -196,7 +185,7 @@ class topic_table { void notify_waiters(); template - std::vector> + std::vector> transform_topics(Func&&) const; underlying_t _topics; diff --git a/src/v/cluster/topic_updates_dispatcher.cc b/src/v/cluster/topic_updates_dispatcher.cc index 278b427e3a1ce..8c3eacf561183 100644 --- a/src/v/cluster/topic_updates_dispatcher.cc +++ b/src/v/cluster/topic_updates_dispatcher.cc @@ -10,6 +10,7 @@ #include "cluster/topic_updates_dispatcher.h" #include "cluster/commands.h" +#include "cluster/fwd.h" #include "cluster/partition_leaders_table.h" #include "model/metadata.h" #include "raft/types.h" @@ -40,7 +41,7 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { auto tp_md = _topic_table.local().get_topic_metadata( del_cmd.value); return dispatch_updates_to_cores(del_cmd, base_offset) - .then([this, tp_md](std::error_code ec) { + .then([this, tp_md = std::move(tp_md)](std::error_code ec) { if (ec == errc::success) { vassert( tp_md.has_value(), @@ -81,27 +82,23 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { auto tp_md = _topic_table.local().get_topic_metadata( model::topic_namespace_view(cmd.key)); return dispatch_updates_to_cores(cmd, base_offset) - .then([this, tp_md, cmd](std::error_code ec) { - if (!ec) { - vassert( - tp_md.has_value(), - "Topic had to exist before successful partition " - "reallocation"); - auto it = std::find_if( - std::cbegin(tp_md->partitions), - std::cend(tp_md->partitions), - [p_id = cmd.key.tp.partition]( - const model::partition_metadata& pmd) { - return pmd.id == p_id; - }); - vassert( - it != tp_md->partitions.cend(), - "Reassigned partition must exist"); - - reallocate_partition(it->replicas, cmd.value); - } - return ec; - }); + .then( + [this, tp_md = std::move(tp_md), cmd](std::error_code ec) { + if (!ec) { + vassert( + tp_md.has_value(), + "Topic had to exist before successful partition " + "reallocation"); + auto it = tp_md->get_assignments().find( + cmd.key.tp.partition); + vassert( + it != tp_md->get_assignments().cend(), + "Reassigned partition must exist"); + + reallocate_partition(it->replicas, cmd.value); + } + return ec; + }); }, [this, base_offset](finish_moving_partition_replicas_cmd cmd) { return dispatch_updates_to_cores(std::move(cmd), base_offset); @@ -127,7 +124,13 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { if (ec == errc::success) { vassert( assignments.has_value(), "null topic_metadata"); - update_allocations(*assignments); + std::vector p_as; + p_as.reserve(assignments->size()); + std::move( + assignments->begin(), + assignments->end(), + std::back_inserter(p_as)); + update_allocations(std::move(p_as)); } return ec; }); @@ -197,10 +200,9 @@ topic_updates_dispatcher::dispatch_updates_to_cores(Cmd cmd, model::offset o) { }); } -void topic_updates_dispatcher::deallocate_topic( - const model::topic_metadata& tp_md) { +void topic_updates_dispatcher::deallocate_topic(const topic_metadata& tp_md) { // we have to deallocate topics - for (auto& p : tp_md.partitions) { + for (auto& p : tp_md.get_assignments()) { _partition_allocator.local().deallocate(p.replicas); } } diff --git a/src/v/cluster/topic_updates_dispatcher.h b/src/v/cluster/topic_updates_dispatcher.h index ad31d279c1ace..4232c9be692c8 100644 --- a/src/v/cluster/topic_updates_dispatcher.h +++ b/src/v/cluster/topic_updates_dispatcher.h @@ -11,12 +11,15 @@ #pragma once #include "cluster/commands.h" +#include "cluster/fwd.h" #include "cluster/scheduling/partition_allocator.h" #include "cluster/topic_table.h" #include "model/record.h" #include +#include + namespace cluster { // The topic updates dispatcher is resposible for receiving update_apply upcalls @@ -77,7 +80,7 @@ class topic_updates_dispatcher { ss::future<> update_leaders_with_estimates(std::vector leaders); void update_allocations(std::vector); - void deallocate_topic(const model::topic_metadata&); + void deallocate_topic(const topic_metadata&); void reallocate_partition( const std::vector&, const std::vector&); diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 39f3648565de2..262a70352a461 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -162,8 +162,7 @@ ss::future> tx_gateway_frontend::get_tx_broker() { std::nullopt); } - auto md = _metadata_cache.local().get_topic_metadata( - model::tx_manager_nt); + auto md = _metadata_cache.local().contains(model::tx_manager_nt); if (!md) { return ss::make_ready_future>( std::nullopt); diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 8325e6388e865..7fa2888777566 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -285,6 +285,64 @@ bool config_status::operator==(const config_status& rhs) const { rhs.node, rhs.version, rhs.restart, rhs.unknown, rhs.invalid); } +namespace { +cluster::assignments_set to_assignments_map( + std::vector assignment_vector) { + cluster::assignments_set ret; + for (auto& p_as : assignment_vector) { + ret.emplace(std::move(p_as)); + } + return ret; +} +} // namespace + +topic_metadata::topic_metadata( + topic_configuration_assignment c, model::revision_id rid) noexcept + : _configuration(std::move(c.cfg)) + , _assignments(to_assignments_map(std::move(c.assignments))) + , _source_topic(std::nullopt) + , _revision(rid) {} + +topic_metadata::topic_metadata( + topic_configuration cfg, + assignments_set assignments, + model::revision_id rid, + model::topic st) noexcept + : _configuration(std::move(cfg)) + , _assignments(std::move(assignments)) + , _source_topic(st) + , _revision(rid) {} + +bool topic_metadata::is_topic_replicable() const { + return _source_topic.has_value() == false; +} + +model::revision_id topic_metadata::get_revision() const { + vassert( + is_topic_replicable(), "Query for revision_id on a non-replicable topic"); + return _revision; +} + +const model::topic& topic_metadata::get_source_topic() const { + vassert( + !is_topic_replicable(), "Query for source_topic on a replicable topic"); + return _source_topic.value(); +} + +const topic_configuration& topic_metadata::get_configuration() const { + return _configuration; +} + +const assignments_set& topic_metadata::get_assignments() const { + return _assignments; +} + +topic_configuration& topic_metadata::get_configuration() { + return _configuration; +} + +assignments_set& topic_metadata::get_assignments() { return _assignments; } + std::ostream& operator<<(std::ostream& o, const non_replicable_topic& d) { fmt::print( o, "{{Source topic: {}, non replicable topic: {}}}", d.source, d.name); diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 4df0e8e2b6438..b05cdda8c0330 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -27,6 +27,7 @@ #include "utils/to_string.h" #include "v8_engine/data_policy.h" +#include #include namespace cluster { @@ -926,6 +927,57 @@ struct leader_term { friend std::ostream& operator<<(std::ostream&, const leader_term&); }; +struct partition_assignment_cmp { + using is_transparent = void; + constexpr bool operator()( + const partition_assignment& lhs, const partition_assignment& rhs) const { + return lhs.id < rhs.id; + } + + constexpr bool operator()( + const model::partition_id& id, const partition_assignment& rhs) const { + return id < rhs.id; + } + constexpr bool operator()( + const partition_assignment& lhs, const model::partition_id& id) const { + return lhs.id < id; + } + constexpr bool operator()( + const model::partition_id& lhs, const model::partition_id& rhs) const { + return lhs < rhs; + } +}; + +using assignments_set + = absl::btree_set; + +class topic_metadata { +public: + topic_metadata(topic_configuration_assignment, model::revision_id) noexcept; + + topic_metadata( + topic_configuration, + assignments_set, + model::revision_id, + model::topic) noexcept; + + bool is_topic_replicable() const; + model::revision_id get_revision() const; + const model::topic& get_source_topic() const; + + const topic_configuration& get_configuration() const; + topic_configuration& get_configuration(); + + const assignments_set& get_assignments() const; + assignments_set& get_assignments(); + +private: + topic_configuration _configuration; + assignments_set _assignments; + std::optional _source_topic; + model::revision_id _revision; +}; + } // namespace cluster namespace std { template<> diff --git a/src/v/coproc/reconciliation_backend.cc b/src/v/coproc/reconciliation_backend.cc index 537b84acdbe7d..2f91b2560e3a4 100644 --- a/src/v/coproc/reconciliation_backend.cc +++ b/src/v/coproc/reconciliation_backend.cc @@ -365,7 +365,7 @@ reconciliation_backend::create_non_replicable_partition( } // Non-replicated topics are not integrated with shadow indexing yet, // so the initial_revision_id is set to invalid value. - auto ntp_cfg = tt_md->second.get_configuration().cfg.make_ntp_config( + auto ntp_cfg = tt_md->second.get_configuration().make_ntp_config( _data_directory, ntp.tp.partition, rev, diff --git a/src/v/kafka/server/coordinator_ntp_mapper.h b/src/v/kafka/server/coordinator_ntp_mapper.h index 9953c563b8e24..0b975809caa36 100644 --- a/src/v/kafka/server/coordinator_ntp_mapper.h +++ b/src/v/kafka/server/coordinator_ntp_mapper.h @@ -50,14 +50,14 @@ class coordinator_ntp_mapper { , _tp_ns(std::move(group_topic)) {} std::optional ntp_for(const kafka::group_id& group) const { - auto md = _md.local().get_topic_metadata(_tp_ns); - if (!md) { + auto cfg = _md.local().get_topic_cfg(_tp_ns); + if (!cfg) { return std::nullopt; } incremental_xxhash64 inc; inc.update(group); auto p = static_cast( - jump_consistent_hash(inc.digest(), md->partitions.size())); + jump_consistent_hash(inc.digest(), cfg->partition_count)); return model::ntp(_tp_ns.ns, _tp_ns.tp, model::partition_id{p}); } diff --git a/src/v/kafka/server/group_metadata_migration.cc b/src/v/kafka/server/group_metadata_migration.cc index 89159df76f71b..6679798494331 100644 --- a/src/v/kafka/server/group_metadata_migration.cc +++ b/src/v/kafka/server/group_metadata_migration.cc @@ -95,14 +95,14 @@ transform_batch(model::record_batch batch) { ss::future create_consumer_offsets_topic( cluster::controller& controller, - std::vector assignments, + const cluster::assignments_set& assignments, model::timeout_clock::time_point timeout) { cluster::custom_assignable_topic_configuration topic( cluster::topic_configuration{ model::kafka_consumer_offsets_nt.ns, model::kafka_consumer_offsets_nt.tp, static_cast(assignments.size()), - static_cast(assignments.front().replicas.size())}); + static_cast(assignments.begin()->replicas.size())}); topic.cfg.properties.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; for (auto& p_as : assignments) { diff --git a/src/v/kafka/server/handlers/create_partitions.cc b/src/v/kafka/server/handlers/create_partitions.cc index 8f63abe89b70d..0eefa98755948 100644 --- a/src/v/kafka/server/handlers/create_partitions.cc +++ b/src/v/kafka/server/handlers/create_partitions.cc @@ -155,10 +155,8 @@ ss::future create_partitions_handler::handle( error_code::invalid_topic_exception, "Topic does not exist", [&ctx](const create_partitions_topic& tp) { - return ctx.metadata_cache() - .get_topic_metadata( - model::topic_namespace(model::kafka_namespace, tp.name)) - .has_value(); + return ctx.metadata_cache().contains( + model::topic_namespace(model::kafka_namespace, tp.name)); }); // validate custom assignment diff --git a/src/v/kafka/server/handlers/metadata.cc b/src/v/kafka/server/handlers/metadata.cc index 1ed8546271355..2601c27bb5ed8 100644 --- a/src/v/kafka/server/handlers/metadata.cc +++ b/src/v/kafka/server/handlers/metadata.cc @@ -32,6 +32,8 @@ #include #include +#include + namespace kafka { static constexpr model::node_id no_leader(-1); @@ -97,18 +99,19 @@ bool is_internal(const model::topic_namespace& tp_ns) { } // namespace metadata_response::topic make_topic_response_from_topic_metadata( - const cluster::metadata_cache& md_cache, model::topic_metadata&& tp_md) { + const cluster::metadata_cache& md_cache, cluster::topic_metadata&& tp_md) { metadata_response::topic tp; tp.error_code = error_code::none; - auto tp_ns = tp_md.tp_ns; - tp.name = std::move(tp_md.tp_ns.tp); + auto tp_ns = tp_md.get_configuration().tp_ns; + tp.name = std::move(tp_md.get_configuration().tp_ns.tp); tp.is_internal = is_internal(tp_ns); std::transform( - tp_md.partitions.begin(), - tp_md.partitions.end(), + tp_md.get_assignments().begin(), + tp_md.get_assignments().end(), std::back_inserter(tp.partitions), - [tp_ns = std::move(tp_ns), &md_cache](model::partition_metadata& p_md) { + [tp_ns = std::move(tp_ns), + &md_cache](cluster::partition_assignment& p_md) { std::vector replicas{}; replicas.reserve(p_md.replicas.size()); std::transform( @@ -189,14 +192,14 @@ make_error_topic_response(model::topic tp, error_code ec) { } static metadata_response::topic make_topic_response( - request_context& ctx, metadata_request& rq, model::topic_metadata md) { + request_context& ctx, metadata_request& rq, cluster::topic_metadata md) { int32_t auth_operations = 0; /** * if requested include topic authorized operations */ if (rq.data.include_topic_authorized_operations) { auth_operations = details::to_bit_field( - details::authorized_operations(ctx, md.tp_ns.tp)); + details::authorized_operations(ctx, md.get_configuration().tp_ns.tp)); } auto res = make_topic_response_from_topic_metadata( @@ -211,33 +214,26 @@ get_topic_metadata(request_context& ctx, metadata_request& request) { // request can be served from whatever happens to be in the cache if (request.list_all_topics) { - auto topics = ctx.metadata_cache().all_topics_metadata( - cluster::metadata_cache::with_leaders::no); - // only serve topics from the kafka namespace - std::erase_if(topics, [](model::topic_metadata& t_md) { - return t_md.tp_ns.ns != model::kafka_namespace; - }); - - auto unauthorized_it = std::partition( - topics.begin(), - topics.end(), - [&ctx](const model::topic_metadata& t_md) { - /* - * quiet authz failures. this isn't checking for a specifically - * requested topic, but rather checking visibility of all topics. - */ - return ctx.authorized( - security::acl_operation::describe, - t_md.tp_ns.tp, - authz_quiet{true}); - }); - std::transform( - topics.begin(), - unauthorized_it, - std::back_inserter(res), - [&ctx, &request](model::topic_metadata& t_md) { - return make_topic_response(ctx, request, std::move(t_md)); - }); + auto& topics_md = ctx.metadata_cache().all_topics_metadata(); + + for (const auto& [tp_ns, md] : topics_md) { + // only serve topics from the kafka namespace + if (tp_ns.ns != model::kafka_namespace) { + continue; + } + /* + * quiet authz failures. this isn't checking for a specifically + * requested topic, but rather checking visibility of all topics. + */ + if (!ctx.authorized( + security::acl_operation::describe, + tp_ns.tp, + authz_quiet{true})) { + continue; + } + res.push_back(make_topic_response(ctx, request, md)); + } + return ss::make_ready_future>( std::move(res)); } @@ -255,8 +251,7 @@ get_topic_metadata(request_context& ctx, metadata_request& request) { continue; } if (auto md = ctx.metadata_cache().get_topic_metadata( - model::topic_namespace_view(model::kafka_namespace, topic.name), - cluster::metadata_cache::with_leaders::no); + model::topic_namespace_view(model::kafka_namespace, topic.name)); md) { auto src_topic_response = make_topic_response( ctx, request, std::move(*md)); diff --git a/src/v/kafka/server/handlers/offset_commit.cc b/src/v/kafka/server/handlers/offset_commit.cc index 731c4e61a0e73..81c31d44b2c58 100644 --- a/src/v/kafka/server/handlers/offset_commit.cc +++ b/src/v/kafka/server/handlers/offset_commit.cc @@ -125,21 +125,16 @@ offset_commit_handler::handle(request_context ctx, ss::smp_service_group ssg) { const auto topic_name = model::topic(it->name); model::topic_namespace_view tn(model::kafka_namespace, topic_name); - if (const auto& md = octx.rctx.metadata_cache().get_topic_metadata(tn); - md) { + if (octx.rctx.metadata_cache().contains(tn)) { /* * check if each partition exists */ auto split = std::partition( it->partitions.begin(), it->partitions.end(), - [&md](const offset_commit_request_partition& p) { - return std::any_of( - md->partitions.cbegin(), - md->partitions.cend(), - [&p](const model::partition_metadata& pmd) { - return pmd.id == p.partition_index; - }); + [&octx, &tn](const offset_commit_request_partition& p) { + return octx.rctx.metadata_cache().contains( + tn, p.partition_index); }); /* * build responses for nonexistent topic partitions diff --git a/src/v/kafka/server/handlers/topics/topic_utils.cc b/src/v/kafka/server/handlers/topics/topic_utils.cc index 7a5362dd43625..2e651668596a6 100644 --- a/src/v/kafka/server/handlers/topics/topic_utils.cc +++ b/src/v/kafka/server/handlers/topics/topic_utils.cc @@ -42,13 +42,13 @@ ss::future> wait_for_leaders( continue; } // collect partitions - auto md = md_cache.get_topic_metadata(r.tp_ns); + auto md = md_cache.get_topic_metadata_ref(r.tp_ns); if (!md) { // topic already deleted continue; } // for each partition ask for leader - for (auto& pmd : md->partitions) { + for (auto& pmd : md->get().get_assignments()) { futures.push_back(md_cache.get_leader( model::ntp(r.tp_ns.ns, r.tp_ns.tp, pmd.id), timeout)); } diff --git a/src/v/kafka/server/handlers/txn_offset_commit.cc b/src/v/kafka/server/handlers/txn_offset_commit.cc index d2f7fe5864858..04c2649291cce 100644 --- a/src/v/kafka/server/handlers/txn_offset_commit.cc +++ b/src/v/kafka/server/handlers/txn_offset_commit.cc @@ -109,21 +109,16 @@ ss::future txn_offset_commit_handler::handle( const auto topic_name = model::topic(it->name); model::topic_namespace_view tn(model::kafka_namespace, topic_name); - if (const auto& md = octx.rctx.metadata_cache().get_topic_metadata(tn); - md) { + if (octx.rctx.metadata_cache().contains(tn)) { /* * check if each partition exists */ auto split = std::partition( it->partitions.begin(), it->partitions.end(), - [&md](const txn_offset_commit_request_partition& p) { - return std::any_of( - md->partitions.cbegin(), - md->partitions.cend(), - [&p](const model::partition_metadata& pmd) { - return pmd.id == p.partition_index; - }); + [&octx, &tn](const txn_offset_commit_request_partition& p) { + return octx.rctx.metadata_cache().contains( + tn, p.partition_index); }); /* * build responses for nonexistent topic partitions diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index a690dc9f7c493..1b33f5eb4399f 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -284,9 +284,10 @@ class redpanda_thread_fixture { r.tp_ns); return md && std::all_of( - md->partitions.begin(), - md->partitions.end(), - [this, &r](const model::partition_metadata& p) { + md->get_assignments().begin(), + md->get_assignments().end(), + [this, + &r](const cluster::partition_assignment& p) { return app.shard_table.local().shard_for( model::ntp(r.tp_ns.ns, r.tp_ns.tp, p.id)); });