From b680204ba4b415d3d21ca23cf094979cb8d418c2 Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Fri, 24 Jun 2022 12:33:49 +0100
Subject: [PATCH] metrics: storage: default aggregations

Aggregate over the shard and partition labels by default. Also update a
compaction test that was relying on the partition label.

Signed-off-by: Ben Pope
---
 src/v/cluster/partition_probe.cc            |   8 +-
 src/v/storage/probe.cc                      | 190 ++++++++++--------
 .../compacted_term_rolled_recovery_test.py  |  21 +-
 3 files changed, 124 insertions(+), 95 deletions(-)

diff --git a/src/v/cluster/partition_probe.cc b/src/v/cluster/partition_probe.cc
index 16458c1ae576d..1b92f0e57f01f 100644
--- a/src/v/cluster/partition_probe.cc
+++ b/src/v/cluster/partition_probe.cc
@@ -63,8 +63,8 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
        [this] { return _partition.committed_offset(); },
        sm::description("Partition commited offset. i.e. safely persisted on "
                        "majority of replicas"),
-        labels,
-        aggregate_labels),
+        labels)
+        .aggregate(aggregate_labels),
      sm::make_gauge(
        "end_offset",
        [this] { return _partition.dirty_offset(); },
@@ -106,8 +106,8 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
        "records_produced",
        [this] { return _records_produced; },
        sm::description("Total number of records produced"),
-        labels,
-        aggregate_labels),
+        labels)
+        .aggregate(aggregate_labels),
      sm::make_counter(
        "records_fetched",
        [this] { return _records_fetched; },
diff --git a/src/v/storage/probe.cc b/src/v/storage/probe.cc
index 49727667da843..507eb97bb1ecf 100644
--- a/src/v/storage/probe.cc
+++ b/src/v/storage/probe.cc
@@ -70,87 +70,104 @@ void probe::setup_metrics(const model::ntp& ntp) {
       topic_label(ntp.tp.topic()),
       partition_label(ntp.tp.partition()),
     };
+    auto aggregate_labels
+      = config::shard_local_cfg().aggregate_metrics()
+          ? std::vector<sm::label>{sm::shard_label, partition_label}
+          : std::vector<sm::label>{};
 
     _metrics.add_group(
       prometheus_sanitize::metrics_name("storage:log"),
-      {
-        sm::make_total_bytes(
-          "written_bytes",
-          [this] { return _bytes_written; },
-          sm::description("Total number of bytes written"),
-          labels),
-        sm::make_counter(
-          "batches_written",
-          [this] { return _batches_written; },
-          sm::description("Total number of batches written"),
-          labels),
-        sm::make_total_bytes(
-          "read_bytes",
-          [this] { return _bytes_read; },
-          sm::description("Total number of bytes read"),
-          labels),
-        sm::make_total_bytes(
-          "cached_read_bytes",
-          [this] { return _cached_bytes_read; },
-          sm::description("Total number of cached bytes read"),
-          labels),
-        sm::make_counter(
-          "batches_read",
-          [this] { return _batches_read; },
-          sm::description("Total number of batches read"),
-          labels),
-        sm::make_counter(
-          "cached_batches_read",
-          [this] { return _cached_batches_read; },
-          sm::description("Total number of cached batches read"),
-          labels),
-        sm::make_counter(
-          "log_segments_created",
-          [this] { return _log_segments_created; },
-          sm::description("Number of created log segments"),
-          labels),
-        sm::make_counter(
-          "log_segments_removed",
-          [this] { return _log_segments_removed; },
-          sm::description("Number of removed log segments"),
-          labels),
-        sm::make_counter(
-          "log_segments_active",
-          [this] { return _log_segments_active; },
-          sm::description("Number of active log segments"),
-          labels),
-        sm::make_counter(
-          "batch_parse_errors",
-          [this] { return _batch_parse_errors; },
-          sm::description("Number of batch parsing (reading) errors"),
-          labels),
-        sm::make_counter(
-          "batch_write_errors",
-          [this] { return _batch_write_errors; },
-          sm::description("Number of batch write errors"),
-          labels),
-        sm::make_counter(
-          "corrupted_compaction_indices",
-          [this] { return _corrupted_compaction_index; },
-          sm::description("Number of times we had to re-construct the "
-                          ".compaction index on a segment"),
-          labels),
-        sm::make_counter(
-          "compacted_segment",
-          [this] { return _segment_compacted; },
-          sm::description("Number of compacted segments"),
-          labels),
-        sm::make_gauge(
-          "partition_size",
-          [this] { return _partition_bytes; },
-          sm::description("Current size of partition in bytes"),
-          labels),
-        sm::make_total_bytes(
-          "compaction_ratio",
-          [this] { return _compaction_ratio; },
-          sm::description("Average segment compaction ratio"),
-          labels),
-      });
+      {sm::make_total_bytes(
+         "written_bytes",
+         [this] { return _bytes_written; },
+         sm::description("Total number of bytes written"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "batches_written",
+         [this] { return _batches_written; },
+         sm::description("Total number of batches written"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_total_bytes(
+         "read_bytes",
+         [this] { return _bytes_read; },
+         sm::description("Total number of bytes read"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_total_bytes(
+         "cached_read_bytes",
+         [this] { return _cached_bytes_read; },
+         sm::description("Total number of cached bytes read"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "batches_read",
+         [this] { return _batches_read; },
+         sm::description("Total number of batches read"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "cached_batches_read",
+         [this] { return _cached_batches_read; },
+         sm::description("Total number of cached batches read"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "log_segments_created",
+         [this] { return _log_segments_created; },
+         sm::description("Number of created log segments"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "log_segments_removed",
+         [this] { return _log_segments_removed; },
+         sm::description("Number of removed log segments"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "log_segments_active",
+         [this] { return _log_segments_active; },
+         sm::description("Number of active log segments"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "batch_parse_errors",
+         [this] { return _batch_parse_errors; },
+         sm::description("Number of batch parsing (reading) errors"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "batch_write_errors",
+         [this] { return _batch_write_errors; },
+         sm::description("Number of batch write errors"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "corrupted_compaction_indices",
+         [this] { return _corrupted_compaction_index; },
+         sm::description("Number of times we had to re-construct the "
+                         ".compaction index on a segment"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_counter(
+         "compacted_segment",
+         [this] { return _segment_compacted; },
+         sm::description("Number of compacted segments"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_gauge(
+         "partition_size",
+         [this] { return _partition_bytes; },
+         sm::description("Current size of partition in bytes"),
+         labels)
+         .aggregate(aggregate_labels),
+       sm::make_total_bytes(
+         "compaction_ratio",
+         [this] { return _compaction_ratio; },
+         sm::description("Average segment compaction ratio"),
+         labels)
+         .aggregate(aggregate_labels)});
 }
 
 void probe::add_initial_segment(const segment& s) {
@@ -168,6 +185,11 @@ void readers_cache_probe::setup_metrics(const model::ntp& ntp) {
     auto ns_label = sm::label("namespace");
     auto topic_label = sm::label("topic");
     auto partition_label = sm::label("partition");
+    auto aggregate_labels
+      = config::shard_local_cfg().aggregate_metrics()
+          ? std::vector<sm::label>{sm::shard_label, partition_label}
+          : std::vector<sm::label>{};
+
     const std::vector labels = {
       ns_label(ntp.ns()),
       topic_label(ntp.tp.topic()),
@@ -181,22 +203,26 @@ void readers_cache_probe::setup_metrics(const model::ntp& ntp) {
          "readers_added",
          [this] { return _readers_added; },
          sm::description("Number of readers added to cache"),
-          labels),
+          labels)
+            .aggregate(aggregate_labels),
        sm::make_counter(
          "readers_evicted",
          [this] { return _readers_evicted; },
          sm::description("Number of readers evicted from cache"),
-          labels),
+          labels)
+            .aggregate(aggregate_labels),
        sm::make_counter(
          "cache_hits",
          [this] { return _cache_hits; },
          sm::description("Reader cache hits"),
-          labels),
+          labels)
+            .aggregate(aggregate_labels),
        sm::make_counter(
          "cache_misses",
          [this] { return _cache_misses; },
          sm::description("Reader cache misses"),
-          labels),
+          labels)
+            .aggregate(aggregate_labels),
       });
 }
 } // namespace storage
diff --git a/tests/rptest/tests/compacted_term_rolled_recovery_test.py b/tests/rptest/tests/compacted_term_rolled_recovery_test.py
index 4b23151f0bef4..f92bfe5a17792 100644
--- a/tests/rptest/tests/compacted_term_rolled_recovery_test.py
+++ b/tests/rptest/tests/compacted_term_rolled_recovery_test.py
@@ -16,7 +16,8 @@
 
 
 class CompactionTermRollRecoveryTest(RedpandaTest):
-    topics = (TopicSpec(cleanup_policy=TopicSpec.CLEANUP_COMPACT), )
+    topics = (TopicSpec(cleanup_policy=TopicSpec.CLEANUP_COMPACT,
+                        partition_count=1), )
 
     def __init__(self, test_context):
         extra_rp_conf = dict(
@@ -41,7 +42,10 @@ def test_compact_term_rolled_recovery(self):
         TODO: this test should be generalized so we can test this basic
         recovery scenario with various topic configurations.
         """
-        # operate on a partition. doesn't matter which one
+        # Operate on the topic's single partition. This is relevant
+        # because we use a metric that is aggregated by partition
+        # to retrieve the number of compacted segments, which means
+        # that it cannot differentiate between multiple partitions.
         partition = self.redpanda.partitions(self.topic)[0]
 
         # stop a replica in order to test its recovery
@@ -50,7 +54,7 @@ def test_compact_term_rolled_recovery(self):
         self.redpanda.stop_node(needs_recovery)
 
         # produce until segments have been compacted
-        self._produce_until_compaction(others, self.topic, partition.index)
+        self._produce_until_compaction(others, self.topic)
 
         # restart all replicas: rolls term, starts recovery
         self.redpanda.restart_nodes(all_replicas)
@@ -58,7 +62,7 @@ def test_compact_term_rolled_recovery(self):
         # ensure that the first stopped node recovered ok
         self._wait_until_recovered(all_replicas, self.topic, partition.index)
 
-    def _produce_until_compaction(self, nodes, topic, partition):
+    def _produce_until_compaction(self, nodes, topic):
         """
         Produce into the topic until some new segments have been compacted.
""" @@ -66,13 +70,13 @@ def _produce_until_compaction(self, nodes, topic, partition): target = list( map(lambda cnt: cnt + num_segs, - self._compacted_segments(nodes, topic, partition))) + self._compacted_segments(nodes, topic))) kafka_tools = KafkaCliTools(self.redpanda) def done(): kafka_tools.produce(self.topic, 1024, 1024) - curr = self._compacted_segments(nodes, topic, partition) + curr = self._compacted_segments(nodes, topic) return all(map(lambda cnt: cnt[0] > cnt[1], zip(curr, target))) wait_until(done, @@ -80,7 +84,7 @@ def done(): backoff_sec=2, err_msg="Compacted segments were not created") - def _compacted_segments(self, nodes, topic, partition): + def _compacted_segments(self, nodes, topic): """ Fetch the number of compacted segments. @@ -95,8 +99,7 @@ def fetch(node): for sample in family.samples: if sample.name == "vectorized_storage_log_compacted_segment_total" and \ sample.labels["namespace"] == "kafka" and \ - sample.labels["topic"] == topic and \ - int(sample.labels["partition"]) == partition: + sample.labels["topic"] == topic: count += int(sample.value) self.logger.debug(count) return count