diff --git a/src/v/archival/probe.cc b/src/v/archival/probe.cc index f5008f8e9069..15d2167d5991 100644 --- a/src/v/archival/probe.cc +++ b/src/v/archival/probe.cc @@ -10,6 +10,7 @@ #include "archival/probe.h" +#include "config/configuration.h" #include "prometheus/prometheus_sanitize.h" #include @@ -32,6 +33,9 @@ ntp_level_probe::ntp_level_probe( topic_label(ntp.tp.topic()), partition_label(ntp.tp.partition()), }; + auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() + ? std::vector{sm::shard_label} + : std::vector{}; _metrics.add_group( prometheus_sanitize::metrics_name("ntp_archiver"), @@ -40,22 +44,26 @@ ntp_level_probe::ntp_level_probe( "uploaded", [this] { return _uploaded; }, sm::description("Uploaded offsets"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_total_bytes( "uploaded_bytes", [this] { return _uploaded_bytes; }, sm::description("Total number of uploaded bytes"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "missing", [this] { return _missing; }, sm::description("Missing offsets due to gaps"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "pending", [this] { return _pending; }, sm::description("Pending offsets"), - labels), + labels) + .aggregate(aggregate_labels), }); } diff --git a/src/v/cluster/partition_probe.cc b/src/v/cluster/partition_probe.cc index 0f2461df452e..1b92f0e57f01 100644 --- a/src/v/cluster/partition_probe.cc +++ b/src/v/cluster/partition_probe.cc @@ -32,6 +32,9 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) { auto ns_label = sm::label("namespace"); auto topic_label = sm::label("topic"); auto partition_label = sm::label("partition"); + auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() + ? 
std::vector{sm::shard_label} + : std::vector{}; const std::vector labels = { ns_label(ntp.ns()), @@ -47,30 +50,35 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) { [this] { return _partition.is_elected_leader() ? 1 : 0; }, sm::description( "Flag indicating if this partition instance is a leader"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "last_stable_offset", [this] { return _partition.last_stable_offset(); }, sm::description("Last stable offset"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "committed_offset", [this] { return _partition.committed_offset(); }, sm::description("Partition commited offset. i.e. safely persisted on " "majority of replicas"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "end_offset", [this] { return _partition.dirty_offset(); }, sm::description( "Last offset stored by current partition on this node"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "high_watermark", [this] { return _partition.high_watermark(); }, sm::description( "Partion high watermark i.e. 
highest consumable offset"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "leader_id", [this] { @@ -78,7 +86,8 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) { model::node_id(-1)); }, sm::description("Id of current partition leader"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "under_replicated_replicas", [this] { @@ -91,27 +100,32 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) { }); }, sm::description("Number of under replicated replicas"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "records_produced", [this] { return _records_produced; }, sm::description("Total number of records produced"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "records_fetched", [this] { return _records_fetched; }, sm::description("Total number of records fetched"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_total_bytes( "bytes_produced_total", [this] { return _bytes_produced; }, sm::description("Total number of bytes produced"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_total_bytes( "bytes_fetched_total", [this] { return _bytes_fetched; }, sm::description("Total number of bytes fetched"), - labels), + labels) + .aggregate(aggregate_labels), }); } partition_probe make_materialized_partition_probe() { diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 9b32af4b1afb..b6d44e2031ee 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -237,6 +237,17 @@ configuration::configuration() "Disable registering metrics", base_property::metadata{}, false) + , aggregate_metrics( + *this, + "aggregate_metrics", + "Enable aggregations of metrics returned by the prometheus '/metrics' " + "endpoint. " + "Metric aggregation is performed by summing the values of samples by " + "labels. 
" + "Aggregations are performed where it makes sense by the shard and/or " + "partition labels.", + {.needs_restart = needs_restart::yes}, + false) , group_min_session_timeout_ms( *this, "group_min_session_timeout_ms", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index a38b7598de35..15be6702d444 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -80,6 +80,7 @@ struct configuration final : public config_store { bounded_property target_quota_byte_rate; property> cluster_id; property disable_metrics; + property aggregate_metrics; property group_min_session_timeout_ms; property group_max_session_timeout_ms; property group_initial_rebalance_delay; diff --git a/src/v/kafka/latency_probe.h b/src/v/kafka/latency_probe.h index ef161c134ce7..fc7902abd355 100644 --- a/src/v/kafka/latency_probe.h +++ b/src/v/kafka/latency_probe.h @@ -28,18 +28,23 @@ class latency_probe { } std::vector labels{ sm::label("latency_metric")("microseconds")}; + auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() + ? 
std::vector{sm::shard_label} + : std::vector{}; _metrics.add_group( prometheus_sanitize::metrics_name("kafka:latency"), {sm::make_histogram( "fetch_latency_us", sm::description("Fetch Latency"), labels, - [this] { return _fetch_latency.seastar_histogram_logform(); }), + [this] { return _fetch_latency.seastar_histogram_logform(); }) + .aggregate(aggregate_labels), sm::make_histogram( "produce_latency_us", sm::description("Produce Latency"), labels, - [this] { return _produce_latency.seastar_histogram_logform(); })}); + [this] { return _produce_latency.seastar_histogram_logform(); }) + .aggregate(aggregate_labels)}); } std::unique_ptr auto_produce_measurement() { diff --git a/src/v/net/probes.cc b/src/v/net/probes.cc index f8feb3c76705..69a226f9a2ea 100644 --- a/src/v/net/probes.cc +++ b/src/v/net/probes.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "config/configuration.h" #include "net/client_probe.h" #include "net/server_probe.h" #include "prometheus/prometheus_sanitize.h" @@ -21,6 +22,9 @@ namespace net { void server_probe::setup_metrics( ss::metrics::metric_groups& mgs, const char* proto) { namespace sm = ss::metrics; + auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() + ? 
std::vector{sm::shard_label} + : std::vector{}; mgs.add_group( prometheus_sanitize::metrics_name(proto), { @@ -28,69 +32,81 @@ void server_probe::setup_metrics( "active_connections", [this] { return _connections; }, sm::description( - ssx::sformat("{}: Currently active connections", proto))), + ssx::sformat("{}: Currently active connections", proto))) + .aggregate(aggregate_labels), sm::make_counter( "connects", [this] { return _connects; }, sm::description( - ssx::sformat("{}: Number of accepted connections", proto))), + ssx::sformat("{}: Number of accepted connections", proto))) + .aggregate(aggregate_labels), sm::make_counter( "connection_close_errors", [this] { return _connection_close_error; }, sm::description(ssx::sformat( - "{}: Number of errors when shutting down the connection", proto))), + "{}: Number of errors when shutting down the connection", proto))) + .aggregate(aggregate_labels), sm::make_counter( "connections_rejected", [this] { return _connections_rejected; }, sm::description(ssx::sformat( "{}: Number of connections rejected for hitting connection limits", - proto))), + proto))) + .aggregate(aggregate_labels), sm::make_counter( "requests_completed", [this] { return _requests_completed; }, sm::description( - ssx::sformat("{}: Number of successful requests", proto))), + ssx::sformat("{}: Number of successful requests", proto))) + .aggregate(aggregate_labels), sm::make_total_bytes( "received_bytes", [this] { return _in_bytes; }, sm::description(ssx::sformat( "{}: Number of bytes received from the clients in valid requests", - proto))), + proto))) + .aggregate(aggregate_labels), sm::make_total_bytes( "sent_bytes", [this] { return _out_bytes; }, sm::description( - ssx::sformat("{}: Number of bytes sent to clients", proto))), + ssx::sformat("{}: Number of bytes sent to clients", proto))) + .aggregate(aggregate_labels), sm::make_counter( "method_not_found_errors", [this] { return _method_not_found_errors; }, sm::description(ssx::sformat( - "{}: Number 
of requests with not available RPC method", proto))), + "{}: Number of requests with not available RPC method", proto))) + .aggregate(aggregate_labels), sm::make_counter( "corrupted_headers", [this] { return _corrupted_headers; }, sm::description(ssx::sformat( - "{}: Number of requests with corrupted headers", proto))), + "{}: Number of requests with corrupted headers", proto))) + .aggregate(aggregate_labels), sm::make_counter( "service_errors", [this] { return _service_errors; }, - sm::description(ssx::sformat("{}: Number of service errors", proto))), + sm::description(ssx::sformat("{}: Number of service errors", proto))) + .aggregate(aggregate_labels), sm::make_counter( "requests_blocked_memory", [this] { return _requests_blocked_memory; }, sm::description(ssx::sformat( - "{}: Number of requests blocked in memory backpressure", proto))), + "{}: Number of requests blocked in memory backpressure", proto))) + .aggregate(aggregate_labels), sm::make_gauge( "requests_pending", [this] { return _requests_received - _requests_completed; }, sm::description(ssx::sformat( - "{}: Number of requests being processed by server", proto))), + "{}: Number of requests being processed by server", proto))) + .aggregate(aggregate_labels), sm::make_counter( "connections_wait_rate", [this] { return _connections_wait_rate; }, sm::description(ssx::sformat( - "{}: Number of connections are blocked by connection rate", - proto))), + "{}: Number of connections are blocked by connection rate", proto))) + .aggregate(aggregate_labels), }); } diff --git a/src/v/pandaproxy/probe.cc b/src/v/pandaproxy/probe.cc index 981f335df724..8d1227885a17 100644 --- a/src/v/pandaproxy/probe.cc +++ b/src/v/pandaproxy/probe.cc @@ -23,15 +23,26 @@ probe::probe(ss::httpd::path_description& path_desc) return; } namespace sm = ss::metrics; + + auto operation_label = sm::label("operation"); std::vector labels{ - sm::label("operation")(path_desc.operations.nickname)}; + operation_label(path_desc.operations.nickname)}; 
+ + auto aggregate_labels = std::vector{ + sm::shard_label, operation_label}; + auto internal_aggregate_labels + = config::shard_local_cfg().aggregate_metrics() + ? aggregate_labels + : std::vector{}; + _metrics.add_group( "pandaproxy", {sm::make_histogram( - "request_latency", - sm::description("Request latency"), - std::move(labels), - [this] { return _request_hist.seastar_histogram_logform(); })}); + "request_latency", + sm::description("Request latency"), + labels, + [this] { return _request_hist.seastar_histogram_logform(); }) + .aggregate(internal_aggregate_labels)}); } -} // namespace pandaproxy \ No newline at end of file +} // namespace pandaproxy diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 07a2da8d56bc..ef5a09a19ead 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -136,13 +136,17 @@ consensus::consensus( } void consensus::setup_metrics() { + namespace sm = ss::metrics; + if (config::shard_local_cfg().disable_metrics()) { return; } _probe.setup_metrics(_log.config().ntp()); auto labels = probe::create_metric_labels(_log.config().ntp()); - namespace sm = ss::metrics; + auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() + ? std::vector{sm::shard_label} + : std::vector{}; _metrics.add_group( prometheus_sanitize::metrics_name("raft"), @@ -150,7 +154,8 @@ void consensus::setup_metrics() { "leader_for", [this] { return is_elected_leader(); }, sm::description("Number of groups for which node is a leader"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_gauge( "configuration_change_in_progress", [this] { @@ -160,7 +165,8 @@ void consensus::setup_metrics() { }, sm::description("Indicates if current raft group configuration is in " "joint state i.e. 
configuration is being changed"), - labels)}); + labels) + .aggregate(aggregate_labels)}); } void consensus::do_step_down(std::string_view ctx) { diff --git a/src/v/raft/probe.cc b/src/v/raft/probe.cc index 1180ec055729..37ab047d47db 100644 --- a/src/v/raft/probe.cc +++ b/src/v/raft/probe.cc @@ -33,6 +33,11 @@ probe::create_metric_labels(const model::ntp& ntp) { void probe::setup_metrics(const model::ntp& ntp) { namespace sm = ss::metrics; auto labels = create_metric_labels(ntp); + auto aggregate_labels + = config::shard_local_cfg().aggregate_metrics() + ? std::vector{sm::shard_label, sm::label("partition")} + : std::vector{}; + ; _metrics.add_group( prometheus_sanitize::metrics_name("raft"), @@ -40,70 +45,83 @@ void probe::setup_metrics(const model::ntp& ntp) { "received_vote_requests", [this] { return _vote_requests; }, sm::description("Number of vote requests received"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "received_append_requests", [this] { return _append_requests; }, sm::description("Number of append requests received"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "sent_vote_requests", [this] { return _vote_requests_sent; }, sm::description("Number of vote requests sent"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "replicate_ack_all_requests", [this] { return _replicate_requests_ack_all; }, sm::description( "Number of replicate requests with quorum ack consistency"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "replicate_ack_leader_requests", [this] { return _replicate_requests_ack_leader; }, sm::description( "Number of replicate requests with leader ack consistency"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "replicate_ack_none_requests", [this] { return _replicate_requests_ack_none; }, sm::description( "Number of replicate requests with no ack consistency"), - labels), + labels) + .aggregate(aggregate_labels), 
sm::make_counter( "done_replicate_requests", [this] { return _replicate_requests_done; }, sm::description("Number of finished replicate requests"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "log_flushes", [this] { return _log_flushes; }, sm::description("Number of log flushes"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "log_truncations", [this] { return _log_truncations; }, sm::description("Number of log truncations"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "leadership_changes", [this] { return _leadership_changes; }, sm::description("Number of leadership changes"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "replicate_request_errors", [this] { return _replicate_request_error; }, sm::description("Number of failed replicate requests"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "heartbeat_requests_errors", [this] { return _heartbeat_request_error; }, sm::description("Number of failed heartbeat requests"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "recovery_requests_errors", [this] { return _recovery_request_error; }, sm::description("Number of failed recovery requests"), - labels)}); + labels) + .aggregate(aggregate_labels)}); } } // namespace raft diff --git a/src/v/storage/probe.cc b/src/v/storage/probe.cc index 49727667da84..035c9915b9eb 100644 --- a/src/v/storage/probe.cc +++ b/src/v/storage/probe.cc @@ -70,87 +70,104 @@ void probe::setup_metrics(const model::ntp& ntp) { topic_label(ntp.tp.topic()), partition_label(ntp.tp.partition()), }; + auto aggregate_labels + = config::shard_local_cfg().aggregate_metrics() + ? 
std::vector{sm::shard_label, partition_label} + : std::vector{}; _metrics.add_group( prometheus_sanitize::metrics_name("storage:log"), - { - sm::make_total_bytes( - "written_bytes", - [this] { return _bytes_written; }, - sm::description("Total number of bytes written"), - labels), - sm::make_counter( - "batches_written", - [this] { return _batches_written; }, - sm::description("Total number of batches written"), - labels), - sm::make_total_bytes( - "read_bytes", - [this] { return _bytes_read; }, - sm::description("Total number of bytes read"), - labels), - sm::make_total_bytes( - "cached_read_bytes", - [this] { return _cached_bytes_read; }, - sm::description("Total number of cached bytes read"), - labels), - sm::make_counter( - "batches_read", - [this] { return _batches_read; }, - sm::description("Total number of batches read"), - labels), - sm::make_counter( - "cached_batches_read", - [this] { return _cached_batches_read; }, - sm::description("Total number of cached batches read"), - labels), - sm::make_counter( - "log_segments_created", - [this] { return _log_segments_created; }, - sm::description("Number of created log segments"), - labels), - sm::make_counter( - "log_segments_removed", - [this] { return _log_segments_removed; }, - sm::description("Number of removed log segments"), - labels), - sm::make_counter( - "log_segments_active", - [this] { return _log_segments_active; }, - sm::description("Number of active log segments"), - labels), - sm::make_counter( - "batch_parse_errors", - [this] { return _batch_parse_errors; }, - sm::description("Number of batch parsing (reading) errors"), - labels), - sm::make_counter( - "batch_write_errors", - [this] { return _batch_write_errors; }, - sm::description("Number of batch write errors"), - labels), - sm::make_counter( - "corrupted_compaction_indices", - [this] { return _corrupted_compaction_index; }, - sm::description("Number of times we had to re-construct the " - ".compaction index on a segment"), - labels), - 
sm::make_counter( - "compacted_segment", - [this] { return _segment_compacted; }, - sm::description("Number of compacted segments"), - labels), - sm::make_gauge( - "partition_size", - [this] { return _partition_bytes; }, - sm::description("Current size of partition in bytes"), - labels), - sm::make_total_bytes( - "compaction_ratio", - [this] { return _compaction_ratio; }, - sm::description("Average segment compaction ratio"), - labels), - }); + {sm::make_total_bytes( + "written_bytes", + [this] { return _bytes_written; }, + sm::description("Total number of bytes written"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "batches_written", + [this] { return _batches_written; }, + sm::description("Total number of batches written"), + labels) + .aggregate(aggregate_labels), + sm::make_total_bytes( + "read_bytes", + [this] { return _bytes_read; }, + sm::description("Total number of bytes read"), + labels) + .aggregate(aggregate_labels), + sm::make_total_bytes( + "cached_read_bytes", + [this] { return _cached_bytes_read; }, + sm::description("Total number of cached bytes read"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "batches_read", + [this] { return _batches_read; }, + sm::description("Total number of batches read"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "cached_batches_read", + [this] { return _cached_batches_read; }, + sm::description("Total number of cached batches read"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "log_segments_created", + [this] { return _log_segments_created; }, + sm::description("Number of created log segments"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "log_segments_removed", + [this] { return _log_segments_removed; }, + sm::description("Number of removed log segments"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "log_segments_active", + [this] { return _log_segments_active; }, + sm::description("Number of active 
log segments"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "batch_parse_errors", + [this] { return _batch_parse_errors; }, + sm::description("Number of batch parsing (reading) errors"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "batch_write_errors", + [this] { return _batch_write_errors; }, + sm::description("Number of batch write errors"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "corrupted_compaction_indices", + [this] { return _corrupted_compaction_index; }, + sm::description("Number of times we had to re-construct the " + ".compaction index on a segment"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "compacted_segment", + [this] { return _segment_compacted; }, + sm::description("Number of compacted segments"), + labels) + .aggregate(aggregate_labels), + sm::make_gauge( + "partition_size", + [this] { return _partition_bytes; }, + sm::description("Current size of partition in bytes"), + labels) + .aggregate(aggregate_labels), + sm::make_total_bytes( + "compaction_ratio", + [this] { return _compaction_ratio; }, + sm::description("Average segment compaction ratio"), + labels) + .aggregate({sm::shard_label})}); } void probe::add_initial_segment(const segment& s) { @@ -168,6 +185,11 @@ void readers_cache_probe::setup_metrics(const model::ntp& ntp) { auto ns_label = sm::label("namespace"); auto topic_label = sm::label("topic"); auto partition_label = sm::label("partition"); + auto aggregate_labels + = config::shard_local_cfg().aggregate_metrics() + ? 
std::vector{sm::shard_label, partition_label} + : std::vector{}; + const std::vector labels = { ns_label(ntp.ns()), topic_label(ntp.tp.topic()), @@ -181,22 +203,26 @@ void readers_cache_probe::setup_metrics(const model::ntp& ntp) { "readers_added", [this] { return _readers_added; }, sm::description("Number of readers added to cache"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "readers_evicted", [this] { return _readers_evicted; }, sm::description("Number of readers evicted from cache"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "cache_hits", [this] { return _cache_hits; }, sm::description("Reader cache hits"), - labels), + labels) + .aggregate(aggregate_labels), sm::make_counter( "cache_misses", [this] { return _cache_misses; }, sm::description("Reader cache misses"), - labels), + labels) + .aggregate(aggregate_labels), }); } } // namespace storage diff --git a/tests/rptest/tests/compacted_term_rolled_recovery_test.py b/tests/rptest/tests/compacted_term_rolled_recovery_test.py index 4b23151f0bef..f92bfe5a1779 100644 --- a/tests/rptest/tests/compacted_term_rolled_recovery_test.py +++ b/tests/rptest/tests/compacted_term_rolled_recovery_test.py @@ -16,7 +16,8 @@ class CompactionTermRollRecoveryTest(RedpandaTest): - topics = (TopicSpec(cleanup_policy=TopicSpec.CLEANUP_COMPACT), ) + topics = (TopicSpec(cleanup_policy=TopicSpec.CLEANUP_COMPACT, + partition_count=1), ) def __init__(self, test_context): extra_rp_conf = dict( @@ -41,7 +42,10 @@ def test_compact_term_rolled_recovery(self): TODO: this test should be generalized so we can test this basic recovery scenario with various topic configurations. """ - # operate on a partition. doesn't matter which one + # Operate on the topic's single partition. This is relevant + # because we use a metric that is aggregated by partition + # to retrieve the number of compacted segments, which means + # that it cannot differentiate between multiple partitions. 
partition = self.redpanda.partitions(self.topic)[0] # stop a replica in order to test its recovery @@ -50,7 +54,7 @@ def test_compact_term_rolled_recovery(self): self.redpanda.stop_node(needs_recovery) # produce until segments have been compacted - self._produce_until_compaction(others, self.topic, partition.index) + self._produce_until_compaction(others, self.topic) # restart all replicas: rolls term, starts recovery self.redpanda.restart_nodes(all_replicas) @@ -58,7 +62,7 @@ def test_compact_term_rolled_recovery(self): # ensure that the first stopped node recovered ok self._wait_until_recovered(all_replicas, self.topic, partition.index) - def _produce_until_compaction(self, nodes, topic, partition): + def _produce_until_compaction(self, nodes, topic): """ Produce into the topic until some new segments have been compacted. """ @@ -66,13 +70,13 @@ def _produce_until_compaction(self, nodes, topic, partition): target = list( map(lambda cnt: cnt + num_segs, - self._compacted_segments(nodes, topic, partition))) + self._compacted_segments(nodes, topic))) kafka_tools = KafkaCliTools(self.redpanda) def done(): kafka_tools.produce(self.topic, 1024, 1024) - curr = self._compacted_segments(nodes, topic, partition) + curr = self._compacted_segments(nodes, topic) return all(map(lambda cnt: cnt[0] > cnt[1], zip(curr, target))) wait_until(done, @@ -80,7 +84,7 @@ def done(): backoff_sec=2, err_msg="Compacted segments were not created") - def _compacted_segments(self, nodes, topic, partition): + def _compacted_segments(self, nodes, topic): """ Fetch the number of compacted segments. 
@@ -95,8 +99,7 @@ def fetch(node): for sample in family.samples: if sample.name == "vectorized_storage_log_compacted_segment_total" and \ sample.labels["namespace"] == "kafka" and \ - sample.labels["topic"] == topic and \ - int(sample.labels["partition"]) == partition: + sample.labels["topic"] == topic: count += int(sample.value) self.logger.debug(count) return count diff --git a/tools/rpcgen.py b/tools/rpcgen.py index c03fe0077789..f932169e2f5c 100755 --- a/tools/rpcgen.py +++ b/tools/rpcgen.py @@ -24,6 +24,7 @@ // This file is autogenerated. Manual changes will be lost. #pragma once +#include "config/configuration.h" #include "reflection/adl.h" #include "rpc/types.h" #include "rpc/parse_utils.h" @@ -86,13 +87,18 @@ class failure_probes; std::vector labels{ service_label("{{service_name}}"), method_label("{{method.name}}")}; + auto aggregate_labels + = config::shard_local_cfg().aggregate_metrics() + ? std::vector{sm::shard_label, method_label} + : std::vector{}; _metrics.add_group( prometheus_sanitize::metrics_name("internal_rpc"), {sm::make_histogram( "latency", [this] { return _methods[{{loop.index-1}}].probes.latency_hist().seastar_histogram_logform(); }, sm::description("Internal RPC service latency"), - labels)}); + labels) + .aggregate(aggregate_labels)}); } {%- endfor %} }