diff --git a/src/v/ssx/CMakeLists.txt b/src/v/ssx/CMakeLists.txt index 2591d270c034..1229cbb8ddb2 100644 --- a/src/v/ssx/CMakeLists.txt +++ b/src/v/ssx/CMakeLists.txt @@ -3,6 +3,8 @@ v_cc_library( ssx HDRS "future-util.h" + SRCS + metrics.cc DEPS Seastar::seastar ) diff --git a/src/v/ssx/metrics.cc b/src/v/ssx/metrics.cc new file mode 100644 index 000000000000..b874aec1cdc5 --- /dev/null +++ b/src/v/ssx/metrics.cc @@ -0,0 +1,141 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "ssx/metrics.h" + +#include +#include + +#include + +#include +#include +#include +#include + +namespace ssx::metrics { + +namespace ssm = ss::metrics; +namespace ssmi = ssm::impl; +using agg_op = ssx::metrics::filter::config::agg_op; +using registered_instances = std::vector; +// Store a mapping of allowed labels to aggregated registered_metrics +using metric_multi_instance = std::map; + +ssmi::metric_function get_op(agg_op op, registered_instances refs) { + switch (op) { + case agg_op::sum: + return [refs{std::move(refs)}]() { + auto type = refs.front()->get_function()().type(); + return absl::c_accumulate( + refs, ssmi::metric_value(0, type), [](auto lhs, auto rhs) { + return lhs + rhs->get_function()(); + }); + }; + case agg_op::count: + return [s = static_cast(refs.size())]() { + return ssmi::metric_value(s, ssmi::data_type::ABSOLUTE); + }; + case agg_op::min: + return [refs{std::move(refs)}]() { + auto it = absl::c_min_element(refs, [](auto lhs, auto rhs) { + return lhs->get_function()().d() < rhs->get_function()().d(); + }); + return (*it)->get_function()(); + }; + case agg_op::max: + return [refs{std::move(refs)}]() { + auto it = absl::c_max_element(refs, [](auto lhs, auto rhs) { + return lhs->get_function()().d() < rhs->get_function()().d(); + }); + return (*it)->get_function()(); + }; + case agg_op::avg: + return [refs{std::move(refs)}]() { + auto type = refs.front()->get_function()().type(); + auto sum = absl::c_accumulate( + refs, ssmi::metric_value{0, type}, [](auto lhs, auto rhs) { + return lhs + rhs->get_function()(); + }); + return ssmi::metric_value{ + sum.d() / static_cast(refs.size()), type}; + }; + }; +} + +ss::metrics::impl::value_map get_filtered(const filter::config& cfg) { + // The structure looks like this: + // ssmi::value_map exemplar{ + // {"test_fetch_metrics_counter", + // ssmi::metric_family{ + // ssmi::metric_instances{ + // {ssmi::labels_type{ + // {"aggregate", "0"}, {"leave", "0"}, {"shard", "0"}}, + // ssmi::register_ref{}}, + // {ssmi::labels_type{ + // {"aggregate", "1"}, {"leave", "0"}, {"shard", "0"}}, + // ssmi::register_ref{}}}, + // ssmi::metric_family_info{ + // ssmi::data_type::COUNTER, + // ssm::metric_type_def{""}, + // ssm::description{"Test a counter"}, + // ss::sstring{"test_fetch_metrics_counter"}}}}}; + + // Collect metric families + ssmi::value_map result; + for (const auto& [name, family] : ssmi::get_value_map()) { + auto m_it = absl::c_find_if( + cfg.allow, [&n = name](const auto& m) { return m.name == n; }); + + // Ignore metrics that are not allowed. + if (m_it == cfg.allow.end()) { + continue; + } + + const auto& metric_cfg{*m_it}; + + // Collect aggregated instances + metric_multi_instance new_instances; + for (const auto& [base_labels, base_metric] : family) { + ssmi::labels_type new_labels; + for (const auto& [l_name, l_value] : base_labels) { + auto l_it = absl::c_find_if( + metric_cfg.allow, [lhs = l_name](const auto& rhs) { + return lhs == rhs.name(); + }); + if (l_it != metric_cfg.allow.end()) { + new_labels.emplace(l_name, l_value); + } + } + new_instances[new_labels].emplace_back(base_metric); + } + + // Convert aggregated instances to registred metrics + ssmi::metric_instances family_instances; + for (auto& ni : new_instances) { + auto id = ni.second.front()->get_id(); + auto op = metric_cfg.op.value_or(agg_op::sum); + + family_instances.emplace( + ni.first, + ss::make_shared( + std::move(id), get_op(op, std::move(ni.second)))); + } + auto family_info = family.info(); + result.emplace( + name, + ssmi::metric_family{ + std::move(family_instances), std::move(family_info)}); + } + return result; +} + +} // namespace ssx::metrics diff --git a/src/v/ssx/metrics.h b/src/v/ssx/metrics.h new file mode 100644 index 000000000000..0168cadeb58c --- /dev/null +++ b/src/v/ssx/metrics.h @@ -0,0 +1,47 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "seastarx.h" + +#include + +#include +#include + +namespace ssx::metrics { + +namespace filter { + +struct config { + enum class agg_op { + sum, + count, + min, + max, + avg, + }; + + struct metric { + ss::sstring name; + std::vector allow; + std::optional op; + }; + + std::vector allow; +}; + +} // namespace filter + +ss::metrics::impl::value_map get_filtered(const filter::config& cfg); + +} // namespace ssx::metrics diff --git a/src/v/ssx/tests/CMakeLists.txt b/src/v/ssx/tests/CMakeLists.txt index a4e168f5c920..97cd846930b7 100644 --- a/src/v/ssx/tests/CMakeLists.txt +++ b/src/v/ssx/tests/CMakeLists.txt @@ -5,6 +5,7 @@ rp_test( async_transforms.cc sformat.cc future_util.cc + metrics.cc DEFINITIONS BOOST_TEST_DYN_LINK LIBRARIES v::seastar_testing_main v::ssx LABELS ssx diff --git a/src/v/ssx/tests/metrics.cc b/src/v/ssx/tests/metrics.cc new file mode 100644 index 000000000000..64aed9626b7a --- /dev/null +++ b/src/v/ssx/tests/metrics.cc @@ -0,0 +1,136 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "ssx/metrics.h" + +#include "prometheus/prometheus_sanitize.h" +#include "seastarx.h" + +#include +#include + +#include + +namespace sm = ss::metrics; +namespace ssmi = sm::impl; +namespace ssxm = ssx::metrics; +namespace ssxmf = ssxm::filter; +using agg_op = ssx::metrics::filter::config::agg_op; + +SEASTAR_THREAD_TEST_CASE(test_fetch_metrics) { + uint64_t counter_0{17}; + uint64_t counter_1{19}; + uint64_t counter_2{23}; + sm::metric_groups metrics; + + sm::label label_ag{"aggregate"}; + sm::label label_leave{"leave"}; + + metrics.add_group( + prometheus_sanitize::metrics_name("test_fetch_metrics"), + { + sm::make_counter( + "counter", + [&] { return counter_0; }, + sm::description("Test a counter"), + { + label_ag(0), + label_leave(0), + }), + sm::make_counter( + "counter", + [&] { return counter_1; }, + sm::description("Test a counter"), + { + label_ag(1), + label_leave(0), + }), + sm::make_counter( + "counter", + [&] { return counter_2; }, + sm::description("Test a counter"), + { + label_ag(1), + label_leave(1), + }), + }); + + auto collect_values = [](const ssmi::value_map& metrics) { + std::map values; + for (const auto& [n, m] : metrics) { + for (const auto& [n, o] : m) { + values[n] = o->get_function()().i(); + } + } + return values; + }; + + // Check registred metrics + auto values = collect_values(ss::metrics::impl::get_value_map()); + auto expected_0 = ssmi::labels_type{ + {label_ag.name(), "0"}, + {label_leave.name(), "0"}, + {sm::shard_label.name(), "0"}}; + + auto expected_1 = ssmi::labels_type{ + {label_ag.name(), "1"}, + {label_leave.name(), "0"}, + {sm::shard_label.name(), "0"}}; + + auto expected_2 = ssmi::labels_type{ + {label_ag.name(), "1"}, + {label_leave.name(), "1"}, + {sm::shard_label.name(), "0"}}; + + BOOST_CHECK_EQUAL(values[expected_0], 17); + BOOST_CHECK_EQUAL(values[expected_1], 19); + BOOST_CHECK_EQUAL(values[expected_2], 23); + + // Check summed metrics + ssxmf::config config{{{"test_fetch_metrics_counter", {label_leave}}}}; + values = collect_values(ssxm::get_filtered(config)); + + BOOST_CHECK_EQUAL(values.size(), 2); + auto leave_0 = ssmi::labels_type{{label_leave.name(), "0"}}; + auto leave_1 = ssmi::labels_type{{label_leave.name(), "1"}}; + BOOST_CHECK_EQUAL(values[leave_0], 36); + BOOST_CHECK_EQUAL(values[leave_1], 23); + + // Check count metrics + config = {{{"test_fetch_metrics_counter", {label_leave}, agg_op::count}}}; + values = collect_values(ssxm::get_filtered(config)); + + BOOST_CHECK_EQUAL(values.size(), 2); + BOOST_CHECK_EQUAL(values[leave_0], 2); + BOOST_CHECK_EQUAL(values[leave_1], 1); + + // Check min metrics + config = {{{"test_fetch_metrics_counter", {label_leave}, agg_op::min}}}; + values = collect_values(ssxm::get_filtered(config)); + + BOOST_CHECK_EQUAL(values.size(), 2); + BOOST_CHECK_EQUAL(values[leave_0], 17); + BOOST_CHECK_EQUAL(values[leave_1], 23); + + // Check max metrics + config = {{{"test_fetch_metrics_counter", {label_leave}, agg_op::max}}}; + values = collect_values(ssxm::get_filtered(config)); + + BOOST_CHECK_EQUAL(values.size(), 2); + BOOST_CHECK_EQUAL(values[leave_0], 19); + BOOST_CHECK_EQUAL(values[leave_1], 23); + + // Check avg metrics + config = {{{"test_fetch_metrics_counter", {label_leave}, agg_op::avg}}}; + values = collect_values(ssxm::get_filtered(config)); + + BOOST_CHECK_EQUAL(values.size(), 2); + BOOST_CHECK_EQUAL(values[leave_0], 18); + BOOST_CHECK_EQUAL(values[leave_1], 23); +}