metrics: storage: default aggregations
Aggregate over the shard and partition labels by default. Also update a
compaction test that was relying on the partition label.

Signed-off-by: Ben Pope <ben@redpanda.com>
Vlad Lazar committed Jun 30, 2022
1 parent 39db1cf commit b680204
Showing 3 changed files with 124 additions and 95 deletions.
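
Concretely, the change moves each metric's aggregation labels from a trailing argument of the make_counter/make_gauge helpers onto a builder-style .aggregate() call on the returned metric definition. A minimal before/after sketch of the call-site pattern, lifted from the hunks below (the surrounding add_group registration is elided):

// Before: aggregation labels passed as a trailing argument.
sm::make_counter(
  "records_produced",
  [this] { return _records_produced; },
  sm::description("Total number of records produced"),
  labels,
  aggregate_labels),

// After: aggregation declared on the metric definition itself.
sm::make_counter(
  "records_produced",
  [this] { return _records_produced; },
  sm::description("Total number of records produced"),
  labels)
  .aggregate(aggregate_labels),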
8 changes: 4 additions & 4 deletions src/v/cluster/partition_probe.cc
@@ -63,8 +63,8 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
[this] { return _partition.committed_offset(); },
sm::description("Partition commited offset. i.e. safely persisted on "
"majority of replicas"),
labels,
aggregate_labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"end_offset",
[this] { return _partition.dirty_offset(); },
@@ -106,8 +106,8 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
"records_produced",
[this] { return _records_produced; },
sm::description("Total number of records produced"),
labels,
aggregate_labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"records_fetched",
[this] { return _records_fetched; },
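
The partition_probe.cc hunks use aggregate_labels without showing its definition; judging from storage/probe.cc below, it is derived from the cluster configuration in the same way. A sketch of the presumed definition (mirroring the storage probe; its exact form in partition_probe.cc is not part of this diff):

// Aggregate over shard and partition only when the cluster-level
// aggregate_metrics flag is enabled; otherwise keep full label sets.
auto aggregate_labels
  = config::shard_local_cfg().aggregate_metrics()
      ? std::vector<sm::label>{sm::shard_label, partition_label}
      : std::vector<sm::label>{};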
190 changes: 108 additions & 82 deletions src/v/storage/probe.cc
@@ -70,87 +70,104 @@ void probe::setup_metrics(const model::ntp& ntp) {
topic_label(ntp.tp.topic()),
partition_label(ntp.tp.partition()),
};
auto aggregate_labels
= config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label, partition_label}
: std::vector<sm::label>{};

_metrics.add_group(
prometheus_sanitize::metrics_name("storage:log"),
{
sm::make_total_bytes(
"written_bytes",
[this] { return _bytes_written; },
sm::description("Total number of bytes written"),
labels),
sm::make_counter(
"batches_written",
[this] { return _batches_written; },
sm::description("Total number of batches written"),
labels),
sm::make_total_bytes(
"read_bytes",
[this] { return _bytes_read; },
sm::description("Total number of bytes read"),
labels),
sm::make_total_bytes(
"cached_read_bytes",
[this] { return _cached_bytes_read; },
sm::description("Total number of cached bytes read"),
labels),
sm::make_counter(
"batches_read",
[this] { return _batches_read; },
sm::description("Total number of batches read"),
labels),
sm::make_counter(
"cached_batches_read",
[this] { return _cached_batches_read; },
sm::description("Total number of cached batches read"),
labels),
sm::make_counter(
"log_segments_created",
[this] { return _log_segments_created; },
sm::description("Number of created log segments"),
labels),
sm::make_counter(
"log_segments_removed",
[this] { return _log_segments_removed; },
sm::description("Number of removed log segments"),
labels),
sm::make_counter(
"log_segments_active",
[this] { return _log_segments_active; },
sm::description("Number of active log segments"),
labels),
sm::make_counter(
"batch_parse_errors",
[this] { return _batch_parse_errors; },
sm::description("Number of batch parsing (reading) errors"),
labels),
sm::make_counter(
"batch_write_errors",
[this] { return _batch_write_errors; },
sm::description("Number of batch write errors"),
labels),
sm::make_counter(
"corrupted_compaction_indices",
[this] { return _corrupted_compaction_index; },
sm::description("Number of times we had to re-construct the "
".compaction index on a segment"),
labels),
sm::make_counter(
"compacted_segment",
[this] { return _segment_compacted; },
sm::description("Number of compacted segments"),
labels),
sm::make_gauge(
"partition_size",
[this] { return _partition_bytes; },
sm::description("Current size of partition in bytes"),
labels),
sm::make_total_bytes(
"compaction_ratio",
[this] { return _compaction_ratio; },
sm::description("Average segment compaction ratio"),
labels),
});
{sm::make_total_bytes(
"written_bytes",
[this] { return _bytes_written; },
sm::description("Total number of bytes written"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"batches_written",
[this] { return _batches_written; },
sm::description("Total number of batches written"),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"read_bytes",
[this] { return _bytes_read; },
sm::description("Total number of bytes read"),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"cached_read_bytes",
[this] { return _cached_bytes_read; },
sm::description("Total number of cached bytes read"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"batches_read",
[this] { return _batches_read; },
sm::description("Total number of batches read"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"cached_batches_read",
[this] { return _cached_batches_read; },
sm::description("Total number of cached batches read"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"log_segments_created",
[this] { return _log_segments_created; },
sm::description("Number of created log segments"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"log_segments_removed",
[this] { return _log_segments_removed; },
sm::description("Number of removed log segments"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"log_segments_active",
[this] { return _log_segments_active; },
sm::description("Number of active log segments"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"batch_parse_errors",
[this] { return _batch_parse_errors; },
sm::description("Number of batch parsing (reading) errors"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"batch_write_errors",
[this] { return _batch_write_errors; },
sm::description("Number of batch write errors"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"corrupted_compaction_indices",
[this] { return _corrupted_compaction_index; },
sm::description("Number of times we had to re-construct the "
".compaction index on a segment"),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"compacted_segment",
[this] { return _segment_compacted; },
sm::description("Number of compacted segments"),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"partition_size",
[this] { return _partition_bytes; },
sm::description("Current size of partition in bytes"),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"compaction_ratio",
[this] { return _compaction_ratio; },
sm::description("Average segment compaction ratio"),
labels)
.aggregate(aggregate_labels)});
}

void probe::add_initial_segment(const segment& s) {
@@ -168,6 +185,11 @@ void readers_cache_probe::setup_metrics(const model::ntp& ntp) {
auto ns_label = sm::label("namespace");
auto topic_label = sm::label("topic");
auto partition_label = sm::label("partition");
auto aggregate_labels
= config::shard_local_cfg().aggregate_metrics()
? std::vector<sm::label>{sm::shard_label, partition_label}
: std::vector<sm::label>{};

const std::vector<sm::label_instance> labels = {
ns_label(ntp.ns()),
topic_label(ntp.tp.topic()),
Expand All @@ -181,22 +203,26 @@ void readers_cache_probe::setup_metrics(const model::ntp& ntp) {
"readers_added",
[this] { return _readers_added; },
sm::description("Number of readers added to cache"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"readers_evicted",
[this] { return _readers_evicted; },
sm::description("Number of readers evicted from cache"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"cache_hits",
[this] { return _cache_hits; },
sm::description("Reader cache hits"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"cache_misses",
[this] { return _cache_misses; },
sm::description("Reader cache misses"),
labels),
labels)
.aggregate(aggregate_labels),
});
}
} // namespace storage
21 changes: 12 additions & 9 deletions tests/rptest/tests/compacted_term_rolled_recovery_test.py
@@ -16,7 +16,8 @@


class CompactionTermRollRecoveryTest(RedpandaTest):
topics = (TopicSpec(cleanup_policy=TopicSpec.CLEANUP_COMPACT), )
topics = (TopicSpec(cleanup_policy=TopicSpec.CLEANUP_COMPACT,
partition_count=1), )

def __init__(self, test_context):
extra_rp_conf = dict(
Expand All @@ -41,7 +42,10 @@ def test_compact_term_rolled_recovery(self):
TODO: this test should be generalized so we can test this basic recovery
scenario with various topic configurations.
"""
# operate on a partition. doesn't matter which one
# Operate on the topic's single partition. This is relevant
# because we use a metric that is aggregated by partition
# to retrieve the number of compacted segments, which means
# that it cannot differentiate between multiple partitions.
partition = self.redpanda.partitions(self.topic)[0]

# stop a replica in order to test its recovery
@@ -50,37 +54,37 @@
self.redpanda.stop_node(needs_recovery)

# produce until segments have been compacted
self._produce_until_compaction(others, self.topic, partition.index)
self._produce_until_compaction(others, self.topic)

# restart all replicas: rolls term, starts recovery
self.redpanda.restart_nodes(all_replicas)

# ensure that the first stopped node recovered ok
self._wait_until_recovered(all_replicas, self.topic, partition.index)

def _produce_until_compaction(self, nodes, topic, partition):
def _produce_until_compaction(self, nodes, topic):
"""
Produce into the topic until some new segments have been compacted.
"""
num_segs = 3

target = list(
map(lambda cnt: cnt + num_segs,
self._compacted_segments(nodes, topic, partition)))
self._compacted_segments(nodes, topic)))

kafka_tools = KafkaCliTools(self.redpanda)

def done():
kafka_tools.produce(self.topic, 1024, 1024)
curr = self._compacted_segments(nodes, topic, partition)
curr = self._compacted_segments(nodes, topic)
return all(map(lambda cnt: cnt[0] > cnt[1], zip(curr, target)))

wait_until(done,
timeout_sec=60,
backoff_sec=2,
err_msg="Compacted segments were not created")

def _compacted_segments(self, nodes, topic, partition):
def _compacted_segments(self, nodes, topic):
"""
Fetch the number of compacted segments.
@@ -95,8 +99,7 @@ def fetch(node):
for sample in family.samples:
if sample.name == "vectorized_storage_log_compacted_segment_total" and \
sample.labels["namespace"] == "kafka" and \
sample.labels["topic"] == topic and \
int(sample.labels["partition"]) == partition:
sample.labels["topic"] == topic:
count += int(sample.value)
self.logger.debug(count)
return count
