Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate metrics to reduce cardinality #5166

Merged
merged 10 commits into from
Jul 4, 2022
8 changes: 4 additions & 4 deletions src/v/cluster/partition_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
[this] { return _partition.committed_offset(); },
sm::description("Partition commited offset. i.e. safely persisted on "
"majority of replicas"),
labels,
aggregate_labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"end_offset",
[this] { return _partition.dirty_offset(); },
Expand Down Expand Up @@ -106,8 +106,8 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
"records_produced",
[this] { return _records_produced; },
sm::description("Total number of records produced"),
labels,
aggregate_labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"records_fetched",
[this] { return _records_fetched; },
Expand Down
190 changes: 108 additions & 82 deletions src/v/storage/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,87 +70,104 @@ void probe::setup_metrics(const model::ntp& ntp) {
topic_label(ntp.tp.topic()),
partition_label(ntp.tp.partition()),
};
auto aggregate_labels
= config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label, partition_label}
: std::vector<sm::label>{};

_metrics.add_group(
prometheus_sanitize::metrics_name("storage:log"),
{
sm::make_total_bytes(
"written_bytes",
[this] { return _bytes_written; },
sm::description("Total number of bytes written"),
labels),
sm::make_counter(
"batches_written",
[this] { return _batches_written; },
sm::description("Total number of batches written"),
labels),
sm::make_total_bytes(
"read_bytes",
[this] { return _bytes_read; },
sm::description("Total number of bytes read"),
labels),
sm::make_total_bytes(
"cached_read_bytes",
[this] { return _cached_bytes_read; },
sm::description("Total number of cached bytes read"),
labels),
sm::make_counter(
"batches_read",
[this] { return _batches_read; },
sm::description("Total number of batches read"),
labels),
sm::make_counter(
"cached_batches_read",
[this] { return _cached_batches_read; },
sm::description("Total number of cached batches read"),
labels),
sm::make_counter(
"log_segments_created",
[this] { return _log_segments_created; },
sm::description("Number of created log segments"),
labels),
sm::make_counter(
"log_segments_removed",
[this] { return _log_segments_removed; },
sm::description("Number of removed log segments"),
labels),
sm::make_counter(
"log_segments_active",
[this] { return _log_segments_active; },
sm::description("Number of active log segments"),
labels),
sm::make_counter(
"batch_parse_errors",
[this] { return _batch_parse_errors; },
sm::description("Number of batch parsing (reading) errors"),
labels),
sm::make_counter(
"batch_write_errors",
[this] { return _batch_write_errors; },
sm::description("Number of batch write errors"),
labels),
sm::make_counter(
"corrupted_compaction_indices",
[this] { return _corrupted_compaction_index; },
sm::description("Number of times we had to re-construct the "
".compaction index on a segment"),
labels),
sm::make_counter(
"compacted_segment",
[this] { return _segment_compacted; },
sm::description("Number of compacted segments"),
labels),
sm::make_gauge(
"partition_size",
[this] { return _partition_bytes; },
sm::description("Current size of partition in bytes"),
labels),
sm::make_total_bytes(
"compaction_ratio",
[this] { return _compaction_ratio; },
sm::description("Average segment compaction ratio"),
labels),
});
{sm::make_total_bytes(
"written_bytes",
Comment on lines +80 to +81
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note for next time: in cases like this where one line of change is causing clang-format to change a lot it makes reviewing difficult. to deal with this you can split the commit which makes the change into two commits where the second one only applies the formatting update. that way the diff is much easier to review.

[this] { return _bytes_written; },
sm::description("Total number of bytes written"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"batches_written",
[this] { return _batches_written; },
sm::description("Total number of batches written"),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"read_bytes",
[this] { return _bytes_read; },
sm::description("Total number of bytes read"),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"cached_read_bytes",
[this] { return _cached_bytes_read; },
sm::description("Total number of cached bytes read"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"batches_read",
[this] { return _batches_read; },
sm::description("Total number of batches read"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"cached_batches_read",
[this] { return _cached_batches_read; },
sm::description("Total number of cached batches read"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"log_segments_created",
[this] { return _log_segments_created; },
sm::description("Number of created log segments"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"log_segments_removed",
[this] { return _log_segments_removed; },
sm::description("Number of removed log segments"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"log_segments_active",
[this] { return _log_segments_active; },
sm::description("Number of active log segments"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"batch_parse_errors",
[this] { return _batch_parse_errors; },
sm::description("Number of batch parsing (reading) errors"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"batch_write_errors",
[this] { return _batch_write_errors; },
sm::description("Number of batch write errors"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"corrupted_compaction_indices",
[this] { return _corrupted_compaction_index; },
sm::description("Number of times we had to re-construct the "
".compaction index on a segment"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"compacted_segment",
[this] { return _segment_compacted; },
sm::description("Number of compacted segments"),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"partition_size",
[this] { return _partition_bytes; },
sm::description("Current size of partition in bytes"),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"compaction_ratio",
[this] { return _compaction_ratio; },
sm::description("Average segment compaction ratio"),
labels)
.aggregate({sm::shard_label})});
}

void probe::add_initial_segment(const segment& s) {
Expand All @@ -168,6 +185,11 @@ void readers_cache_probe::setup_metrics(const model::ntp& ntp) {
auto ns_label = sm::label("namespace");
auto topic_label = sm::label("topic");
auto partition_label = sm::label("partition");
auto aggregate_labels
= config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label, partition_label}
: std::vector<sm::label>{};

const std::vector<sm::label_instance> labels = {
ns_label(ntp.ns()),
topic_label(ntp.tp.topic()),
Expand All @@ -181,22 +203,26 @@ void readers_cache_probe::setup_metrics(const model::ntp& ntp) {
"readers_added",
[this] { return _readers_added; },
sm::description("Number of readers added to cache"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"readers_evicted",
[this] { return _readers_evicted; },
sm::description("Number of readers evicted from cache"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"cache_hits",
[this] { return _cache_hits; },
sm::description("Reader cache hits"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"cache_misses",
[this] { return _cache_misses; },
sm::description("Reader cache misses"),
labels),
labels)
.aggregate(aggregate_labels),
});
}
} // namespace storage
21 changes: 12 additions & 9 deletions tests/rptest/tests/compacted_term_rolled_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@


class CompactionTermRollRecoveryTest(RedpandaTest):
topics = (TopicSpec(cleanup_policy=TopicSpec.CLEANUP_COMPACT), )
topics = (TopicSpec(cleanup_policy=TopicSpec.CLEANUP_COMPACT,
partition_count=1), )

def __init__(self, test_context):
extra_rp_conf = dict(
Expand All @@ -41,7 +42,10 @@ def test_compact_term_rolled_recovery(self):
TODO: this test should be generalized so we can test this basic recovery
scenario with various topic configurations.
"""
# operate on a partition. doesn't matter which one
# Operate on the topics single partition. This is relevant
# because we use a metric that is aggregated by partition
# to retrieve the number of compacted segments, which means
# that it cannot differentiate between multiple partitions.
partition = self.redpanda.partitions(self.topic)[0]

# stop a replica in order to test its recovery
Expand All @@ -50,37 +54,37 @@ def test_compact_term_rolled_recovery(self):
self.redpanda.stop_node(needs_recovery)

# produce until segments have been compacted
self._produce_until_compaction(others, self.topic, partition.index)
self._produce_until_compaction(others, self.topic)

# restart all replicas: rolls term, starts recovery
self.redpanda.restart_nodes(all_replicas)

# ensure that the first stopped node recovered ok
self._wait_until_recovered(all_replicas, self.topic, partition.index)

def _produce_until_compaction(self, nodes, topic, partition):
def _produce_until_compaction(self, nodes, topic):
"""
Produce into the topic until some new segments have been compacted.
"""
num_segs = 3

target = list(
map(lambda cnt: cnt + num_segs,
self._compacted_segments(nodes, topic, partition)))
self._compacted_segments(nodes, topic)))

kafka_tools = KafkaCliTools(self.redpanda)

def done():
kafka_tools.produce(self.topic, 1024, 1024)
curr = self._compacted_segments(nodes, topic, partition)
curr = self._compacted_segments(nodes, topic)
return all(map(lambda cnt: cnt[0] > cnt[1], zip(curr, target)))

wait_until(done,
timeout_sec=60,
backoff_sec=2,
err_msg="Compacted segments were not created")

def _compacted_segments(self, nodes, topic, partition):
def _compacted_segments(self, nodes, topic):
"""
Fetch the number of compacted segments.

Expand All @@ -95,8 +99,7 @@ def fetch(node):
for sample in family.samples:
if sample.name == "vectorized_storage_log_compacted_segment_total" and \
sample.labels["namespace"] == "kafka" and \
sample.labels["topic"] == topic and \
int(sample.labels["partition"]) == partition:
sample.labels["topic"] == topic:
count += int(sample.value)
self.logger.debug(count)
return count
Expand Down