Skip to content

Commit

Permalink
Merge pull request #5166 from VladLazar/metrics-aggregation
Browse files: browse the repository at this point in the history
Aggregate metrics to reduce cardinality by several orders of magnitude
  • Loading branch information
Vlad Lazar committed Jul 4, 2022
2 parents c3dc1bb + 1a27b58 commit c5c9050
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 145 deletions.
16 changes: 12 additions & 4 deletions src/v/archival/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "archival/probe.h"

#include "config/configuration.h"
#include "prometheus/prometheus_sanitize.h"

#include <seastar/core/metrics.hh>
Expand All @@ -32,6 +33,9 @@ ntp_level_probe::ntp_level_probe(
topic_label(ntp.tp.topic()),
partition_label(ntp.tp.partition()),
};
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};

_metrics.add_group(
prometheus_sanitize::metrics_name("ntp_archiver"),
Expand All @@ -40,22 +44,26 @@ ntp_level_probe::ntp_level_probe(
"uploaded",
[this] { return _uploaded; },
sm::description("Uploaded offsets"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"uploaded_bytes",
[this] { return _uploaded_bytes; },
sm::description("Total number of uploaded bytes"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"missing",
[this] { return _missing; },
sm::description("Missing offsets due to gaps"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"pending",
[this] { return _pending; },
sm::description("Pending offsets"),
labels),
labels)
.aggregate(aggregate_labels),
});
}

Expand Down
36 changes: 25 additions & 11 deletions src/v/cluster/partition_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
auto ns_label = sm::label("namespace");
auto topic_label = sm::label("topic");
auto partition_label = sm::label("partition");
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};

const std::vector<sm::label_instance> labels = {
ns_label(ntp.ns()),
Expand All @@ -47,38 +50,44 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
[this] { return _partition.is_elected_leader() ? 1 : 0; },
sm::description(
"Flag indicating if this partition instance is a leader"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"last_stable_offset",
[this] { return _partition.last_stable_offset(); },
sm::description("Last stable offset"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"committed_offset",
[this] { return _partition.committed_offset(); },
sm::description("Partition commited offset. i.e. safely persisted on "
"majority of replicas"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"end_offset",
[this] { return _partition.dirty_offset(); },
sm::description(
"Last offset stored by current partition on this node"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"high_watermark",
[this] { return _partition.high_watermark(); },
sm::description(
"Partion high watermark i.e. highest consumable offset"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"leader_id",
[this] {
return _partition._raft->get_leader_id().value_or(
model::node_id(-1));
},
sm::description("Id of current partition leader"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"under_replicated_replicas",
[this] {
Expand All @@ -91,27 +100,32 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
});
},
sm::description("Number of under replicated replicas"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"records_produced",
[this] { return _records_produced; },
sm::description("Total number of records produced"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"records_fetched",
[this] { return _records_fetched; },
sm::description("Total number of records fetched"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"bytes_produced_total",
[this] { return _bytes_produced; },
sm::description("Total number of bytes produced"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"bytes_fetched_total",
[this] { return _bytes_fetched; },
sm::description("Total number of bytes fetched"),
labels),
labels)
.aggregate(aggregate_labels),
});
}
partition_probe make_materialized_partition_probe() {
Expand Down
11 changes: 11 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,17 @@ configuration::configuration()
"Disable registering metrics",
base_property::metadata{},
false)
, aggregate_metrics(
*this,
"aggregate_metrics",
"Enable aggregations of metrics returned by the prometheus '/metrics' "
"endpoint. "
"Metric aggregation is performed by summing the values of samples by "
"labels. "
"Aggregations are performed where it makes sense by the shard and/or "
"partition labels.",
{.needs_restart = needs_restart::yes},
false)
, group_min_session_timeout_ms(
*this,
"group_min_session_timeout_ms",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ struct configuration final : public config_store {
bounded_property<uint32_t> target_quota_byte_rate;
property<std::optional<ss::sstring>> cluster_id;
property<bool> disable_metrics;
property<bool> aggregate_metrics;
property<std::chrono::milliseconds> group_min_session_timeout_ms;
property<std::chrono::milliseconds> group_max_session_timeout_ms;
property<std::chrono::milliseconds> group_initial_rebalance_delay;
Expand Down
9 changes: 7 additions & 2 deletions src/v/kafka/latency_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@ class latency_probe {
}
std::vector<sm::label_instance> labels{
sm::label("latency_metric")("microseconds")};
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};
_metrics.add_group(
prometheus_sanitize::metrics_name("kafka:latency"),
{sm::make_histogram(
"fetch_latency_us",
sm::description("Fetch Latency"),
labels,
[this] { return _fetch_latency.seastar_histogram_logform(); }),
[this] { return _fetch_latency.seastar_histogram_logform(); })
.aggregate(aggregate_labels),
sm::make_histogram(
"produce_latency_us",
sm::description("Produce Latency"),
labels,
[this] { return _produce_latency.seastar_histogram_logform(); })});
[this] { return _produce_latency.seastar_histogram_logform(); })
.aggregate(aggregate_labels)});
}

std::unique_ptr<hdr_hist::measurement> auto_produce_measurement() {
Expand Down
44 changes: 30 additions & 14 deletions src/v/net/probes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "config/configuration.h"
#include "net/client_probe.h"
#include "net/server_probe.h"
#include "prometheus/prometheus_sanitize.h"
Expand All @@ -21,76 +22,91 @@ namespace net {
void server_probe::setup_metrics(
ss::metrics::metric_groups& mgs, const char* proto) {
namespace sm = ss::metrics;
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};
mgs.add_group(
prometheus_sanitize::metrics_name(proto),
{
sm::make_gauge(
"active_connections",
[this] { return _connections; },
sm::description(
ssx::sformat("{}: Currently active connections", proto))),
ssx::sformat("{}: Currently active connections", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"connects",
[this] { return _connects; },
sm::description(
ssx::sformat("{}: Number of accepted connections", proto))),
ssx::sformat("{}: Number of accepted connections", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"connection_close_errors",
[this] { return _connection_close_error; },
sm::description(ssx::sformat(
"{}: Number of errors when shutting down the connection", proto))),
"{}: Number of errors when shutting down the connection", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"connections_rejected",
[this] { return _connections_rejected; },
sm::description(ssx::sformat(
"{}: Number of connections rejected for hitting connection limits",
proto))),
proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"requests_completed",
[this] { return _requests_completed; },
sm::description(
ssx::sformat("{}: Number of successful requests", proto))),
ssx::sformat("{}: Number of successful requests", proto)))
.aggregate(aggregate_labels),
sm::make_total_bytes(
"received_bytes",
[this] { return _in_bytes; },
sm::description(ssx::sformat(
"{}: Number of bytes received from the clients in valid requests",
proto))),
proto)))
.aggregate(aggregate_labels),
sm::make_total_bytes(
"sent_bytes",
[this] { return _out_bytes; },
sm::description(
ssx::sformat("{}: Number of bytes sent to clients", proto))),
ssx::sformat("{}: Number of bytes sent to clients", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"method_not_found_errors",
[this] { return _method_not_found_errors; },
sm::description(ssx::sformat(
"{}: Number of requests with not available RPC method", proto))),
"{}: Number of requests with not available RPC method", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"corrupted_headers",
[this] { return _corrupted_headers; },
sm::description(ssx::sformat(
"{}: Number of requests with corrupted headers", proto))),
"{}: Number of requests with corrupted headers", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"service_errors",
[this] { return _service_errors; },
sm::description(ssx::sformat("{}: Number of service errors", proto))),
sm::description(ssx::sformat("{}: Number of service errors", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"requests_blocked_memory",
[this] { return _requests_blocked_memory; },
sm::description(ssx::sformat(
"{}: Number of requests blocked in memory backpressure", proto))),
"{}: Number of requests blocked in memory backpressure", proto)))
.aggregate(aggregate_labels),
sm::make_gauge(
"requests_pending",
[this] { return _requests_received - _requests_completed; },
sm::description(ssx::sformat(
"{}: Number of requests being processed by server", proto))),
"{}: Number of requests being processed by server", proto)))
.aggregate(aggregate_labels),
sm::make_counter(
"connections_wait_rate",
[this] { return _connections_wait_rate; },
sm::description(ssx::sformat(
"{}: Number of connections are blocked by connection rate",
proto))),
"{}: Number of connections are blocked by connection rate", proto)))
.aggregate(aggregate_labels),
});
}

Expand Down
23 changes: 17 additions & 6 deletions src/v/pandaproxy/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,26 @@ probe::probe(ss::httpd::path_description& path_desc)
return;
}
namespace sm = ss::metrics;

auto operation_label = sm::label("operation");
std::vector<sm::label_instance> labels{
sm::label("operation")(path_desc.operations.nickname)};
operation_label(path_desc.operations.nickname)};

auto aggregate_labels = std::vector<sm::label>{
sm::shard_label, operation_label};
auto internal_aggregate_labels
= config::shard_local_cfg().aggregate_metrics()
? aggregate_labels
: std::vector<sm::label>{};

_metrics.add_group(
"pandaproxy",
{sm::make_histogram(
"request_latency",
sm::description("Request latency"),
std::move(labels),
[this] { return _request_hist.seastar_histogram_logform(); })});
"request_latency",
sm::description("Request latency"),
labels,
[this] { return _request_hist.seastar_histogram_logform(); })
.aggregate(internal_aggregate_labels)});
}

} // namespace pandaproxy
} // namespace pandaproxy
12 changes: 9 additions & 3 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,26 @@ consensus::consensus(
}

void consensus::setup_metrics() {
namespace sm = ss::metrics;

if (config::shard_local_cfg().disable_metrics()) {
return;
}

_probe.setup_metrics(_log.config().ntp());
auto labels = probe::create_metric_labels(_log.config().ntp());
namespace sm = ss::metrics;
auto aggregate_labels = config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label}
: std::vector<sm::label>{};

_metrics.add_group(
prometheus_sanitize::metrics_name("raft"),
{sm::make_gauge(
"leader_for",
[this] { return is_elected_leader(); },
sm::description("Number of groups for which node is a leader"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"configuration_change_in_progress",
[this] {
Expand All @@ -162,7 +167,8 @@ void consensus::setup_metrics() {
},
sm::description("Indicates if current raft group configuration is in "
"joint state i.e. configuration is being changed"),
labels)});
labels)
.aggregate(aggregate_labels)});
}

void consensus::do_step_down(std::string_view ctx) {
Expand Down
Loading

0 comments on commit c5c9050

Please sign in to comment.