Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate metrics to reduce cardinality #5166

Merged
merged 10 commits into from
Jul 4, 2022
Merged
16 changes: 12 additions & 4 deletions src/v/archival/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "archival/probe.h"

#include "config/configuration.h"
#include "prometheus/prometheus_sanitize.h"

#include <seastar/core/metrics.hh>
Expand All @@ -32,6 +33,9 @@ ntp_level_probe::ntp_level_probe(
topic_label(ntp.tp.topic()),
partition_label(ntp.tp.partition()),
};
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};

_metrics.add_group(
prometheus_sanitize::metrics_name("ntp_archiver"),
Expand All @@ -40,22 +44,26 @@ ntp_level_probe::ntp_level_probe(
"uploaded",
[this] { return _uploaded; },
sm::description("Uploaded offsets"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"uploaded_bytes",
[this] { return _uploaded_bytes; },
sm::description("Total number of uploaded bytes"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"missing",
[this] { return _missing; },
sm::description("Missing offsets due to gaps"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"pending",
[this] { return _pending; },
sm::description("Pending offsets"),
labels),
labels)
.aggregate(aggregate_labels),
});
}

Expand Down
36 changes: 25 additions & 11 deletions src/v/cluster/partition_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
auto ns_label = sm::label("namespace");
auto topic_label = sm::label("topic");
auto partition_label = sm::label("partition");
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};

const std::vector<sm::label_instance> labels = {
ns_label(ntp.ns()),
Expand All @@ -47,38 +50,44 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
[this] { return _partition.is_elected_leader() ? 1 : 0; },
sm::description(
"Flag indicating if this partition instance is a leader"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"last_stable_offset",
[this] { return _partition.last_stable_offset(); },
sm::description("Last stable offset"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"committed_offset",
[this] { return _partition.committed_offset(); },
sm::description("Partition commited offset. i.e. safely persisted on "
"majority of replicas"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"end_offset",
[this] { return _partition.dirty_offset(); },
sm::description(
"Last offset stored by current partition on this node"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"high_watermark",
[this] { return _partition.high_watermark(); },
sm::description(
"Partion high watermark i.e. highest consumable offset"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"leader_id",
[this] {
return _partition._raft->get_leader_id().value_or(
model::node_id(-1));
},
sm::description("Id of current partition leader"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"under_replicated_replicas",
[this] {
Expand All @@ -91,27 +100,32 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
});
},
sm::description("Number of under replicated replicas"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"records_produced",
[this] { return _records_produced; },
sm::description("Total number of records produced"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"records_fetched",
[this] { return _records_fetched; },
sm::description("Total number of records fetched"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"bytes_produced_total",
[this] { return _bytes_produced; },
sm::description("Total number of bytes produced"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"bytes_fetched_total",
[this] { return _bytes_fetched; },
sm::description("Total number of bytes fetched"),
labels),
labels)
.aggregate(aggregate_labels),
});
}
partition_probe make_materialized_partition_probe() {
Expand Down
11 changes: 11 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ configuration::configuration()
"Disable registering metrics",
base_property::metadata{},
false)
, aggregate_metrics(
*this,
"aggregate_metrics",
"Enable aggregations of metrics returned by the prometheus '/metrics' "
"endpoint. "
"Metric aggregation is performed by summing the values of samples by "
"labels. "
"Aggregations are performed where it makes sense by the shard and/or "
"partition labels.",
{.needs_restart = needs_restart::yes},
false)
, group_min_session_timeout_ms(
*this,
"group_min_session_timeout_ms",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ struct configuration final : public config_store {
bounded_property<uint32_t> target_quota_byte_rate;
property<std::optional<ss::sstring>> cluster_id;
property<bool> disable_metrics;
property<bool> aggregate_metrics;
property<std::chrono::milliseconds> group_min_session_timeout_ms;
property<std::chrono::milliseconds> group_max_session_timeout_ms;
property<std::chrono::milliseconds> group_initial_rebalance_delay;
Expand Down
9 changes: 7 additions & 2 deletions src/v/kafka/latency_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@ class latency_probe {
}
std::vector<sm::label_instance> labels{
sm::label("latency_metric")("microseconds")};
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};
_metrics.add_group(
prometheus_sanitize::metrics_name("kafka:latency"),
{sm::make_histogram(
"fetch_latency_us",
sm::description("Fetch Latency"),
labels,
[this] { return _fetch_latency.seastar_histogram_logform(); }),
[this] { return _fetch_latency.seastar_histogram_logform(); })
.aggregate(aggregate_labels),
sm::make_histogram(
"produce_latency_us",
sm::description("Produce Latency"),
labels,
[this] { return _produce_latency.seastar_histogram_logform(); })});
[this] { return _produce_latency.seastar_histogram_logform(); })
.aggregate(aggregate_labels)});
}

std::unique_ptr<hdr_hist::measurement> auto_produce_measurement() {
Expand Down
44 changes: 30 additions & 14 deletions src/v/net/probes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "config/configuration.h"
#include "net/client_probe.h"
#include "net/server_probe.h"
#include "prometheus/prometheus_sanitize.h"
Expand All @@ -21,76 +22,91 @@ namespace net {
void server_probe::setup_metrics(
ss::metrics::metric_groups& mgs, const char* proto) {
namespace sm = ss::metrics;
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};
VladLazar marked this conversation as resolved.
Show resolved Hide resolved
mgs.add_group(
prometheus_sanitize::metrics_name(proto),
{
sm::make_gauge(
"active_connections",
[this] { return _connections; },
sm::description(
ssx::sformat("{}: Currently active connections", proto))),
ssx::sformat("{}: Currently active connections", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"connects",
[this] { return _connects; },
sm::description(
ssx::sformat("{}: Number of accepted connections", proto))),
ssx::sformat("{}: Number of accepted connections", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"connection_close_errors",
[this] { return _connection_close_error; },
sm::description(ssx::sformat(
"{}: Number of errors when shutting down the connection", proto))),
"{}: Number of errors when shutting down the connection", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"connections_rejected",
[this] { return _connections_rejected; },
sm::description(ssx::sformat(
"{}: Number of connections rejected for hitting connection limits",
proto))),
proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"requests_completed",
[this] { return _requests_completed; },
sm::description(
ssx::sformat("{}: Number of successful requests", proto))),
ssx::sformat("{}: Number of successful requests", proto)))
.aggregate(aggregate_labels),
sm::make_total_bytes(
"received_bytes",
[this] { return _in_bytes; },
sm::description(ssx::sformat(
"{}: Number of bytes received from the clients in valid requests",
proto))),
proto)))
.aggregate(aggregate_labels),
sm::make_total_bytes(
"sent_bytes",
[this] { return _out_bytes; },
sm::description(
ssx::sformat("{}: Number of bytes sent to clients", proto))),
ssx::sformat("{}: Number of bytes sent to clients", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"method_not_found_errors",
[this] { return _method_not_found_errors; },
sm::description(ssx::sformat(
"{}: Number of requests with not available RPC method", proto))),
"{}: Number of requests with not available RPC method", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"corrupted_headers",
[this] { return _corrupted_headers; },
sm::description(ssx::sformat(
"{}: Number of requests with corrupted headers", proto))),
"{}: Number of requests with corrupted headers", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"service_errors",
[this] { return _service_errors; },
sm::description(ssx::sformat("{}: Number of service errors", proto))),
sm::description(ssx::sformat("{}: Number of service errors", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"requests_blocked_memory",
[this] { return _requests_blocked_memory; },
sm::description(ssx::sformat(
"{}: Number of requests blocked in memory backpressure", proto))),
"{}: Number of requests blocked in memory backpressure", proto)))
.aggregate(aggregate_labels),
sm::make_gauge(
"requests_pending",
[this] { return _requests_received - _requests_completed; },
sm::description(ssx::sformat(
"{}: Number of requests being processed by server", proto))),
"{}: Number of requests being processed by server", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"connections_wait_rate",
[this] { return _connections_wait_rate; },
sm::description(ssx::sformat(
"{}: Number of connections are blocked by connection rate",
proto))),
"{}: Number of connections are blocked by connection rate", proto)))
.aggregate(aggregate_labels),
});
}

Expand Down
23 changes: 17 additions & 6 deletions src/v/pandaproxy/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,26 @@ probe::probe(ss::httpd::path_description& path_desc)
return;
}
namespace sm = ss::metrics;

auto operation_label = sm::label("operation");
std::vector<sm::label_instance> labels{
sm::label("operation")(path_desc.operations.nickname)};
operation_label(path_desc.operations.nickname)};

auto aggregate_labels = std::vector<sm::label>{
sm::shard_label, operation_label};
auto internal_aggregate_labels
= config::shard_local_cfg().aggregate_metrics()
? aggregate_labels
: std::vector<sm::label>{};

_metrics.add_group(
"pandaproxy",
{sm::make_histogram(
"request_latency",
sm::description("Request latency"),
std::move(labels),
[this] { return _request_hist.seastar_histogram_logform(); })});
"request_latency",
sm::description("Request latency"),
labels,
[this] { return _request_hist.seastar_histogram_logform(); })
.aggregate(internal_aggregate_labels)});
}

} // namespace pandaproxy
} // namespace pandaproxy
12 changes: 9 additions & 3 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,26 @@ consensus::consensus(
}

void consensus::setup_metrics() {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_metrics()) {
return;
}

_probe.setup_metrics(_log.config().ntp());
auto labels = probe::create_metric_labels(_log.config().ntp());
namespace sm = ss::metrics;
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};

_metrics.add_group(
prometheus_sanitize::metrics_name("raft"),
{sm::make_gauge(
"leader_for",
[this] { return is_elected_leader(); },
sm::description("Number of groups for which node is a leader"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"configuration_change_in_progress",
[this] {
Expand All @@ -160,7 +165,8 @@ void consensus::setup_metrics() {
},
sm::description("Indicates if current raft group configuration is in "
"joint state i.e. configuration is being changed"),
labels)});
labels)
.aggregate(aggregate_labels)});
}

void consensus::do_step_down(std::string_view ctx) {
Expand Down
Loading