storage/test: integration test for max compaction offset
Aaron Fabbri committed Jul 7, 2022
1 parent 38e811f commit 1c6661f
Showing 1 changed file with 68 additions and 0 deletions.
src/v/storage/tests/storage_e2e_test.cc
@@ -2110,3 +2110,71 @@ FIXTURE_TEST(test_querying_term_last_offset, storage_test_fixture) {

BOOST_REQUIRE(!log.get_term_last_offset(model::term_id(0)).has_value());
}

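// Verifies that compaction honors the max compaction offset passed through
// compaction_config: only records at or below that offset may be compacted
// away.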
FIXTURE_TEST(test_max_compact_offset, storage_test_fixture) {
// Test setup.
auto cfg = default_log_config(test_dir);
cfg.max_segment_size = config::mock_binding<size_t>(1_KiB);
cfg.stype = storage::log_config::storage_type::disk;
ss::abort_source as;
storage::log_manager mgr = make_log_manager(cfg);
info("Configuration: {}", mgr.config());
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get0(); });
auto ntp = model::ntp("default", "test", 0);
storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir);
auto log = mgr.manage(std::move(ntp_cfg)).get0();
auto disk_log = get_disk_log(log);

    // (1) append some random data with a limited number of distinct keys, so
    // compaction can make progress.
auto headers = append_random_batches<key_limited_random_batch_generator>(
log, 20);
log.flush().get0();
auto all_batches = read_and_validate_all_batches(log);
size_t first_size = std::accumulate(
all_batches.begin(),
all_batches.end(),
size_t(0),
      [](size_t acc, model::record_batch& b) { return acc + b.size_bytes(); });
    info("Phase (1) wrote {} bytes", first_size);

    // (2) remember the log offset, roll the log, and produce more messages
auto first_stats = log.offsets();
info("Offsets to be compacted {}", first_stats);
disk_log->force_roll(ss::default_priority_class()).get();
headers = append_random_batches(log, 20);
auto new_batches = read_and_validate_all_batches(log);
size_t total_size = std::accumulate(
new_batches.begin(),
new_batches.end(),
size_t(0),
      [](size_t acc, model::record_batch& b) { return acc + b.size_bytes(); });
    info("Total bytes written so far: {}", total_size);

    // (3) remember the log offset, force a log roll, and trigger compaction,
    // but only on the messages from phase (1)
auto second_stats = log.offsets();
disk_log->force_roll(ss::default_priority_class()).get();
storage::compaction_config ccfg(
model::timestamp::max(), // no time-based deletion
      std::nullopt, // no size-based deletion
      first_stats.committed_offset, // max compaction offset: phase (1) only
ss::default_priority_class(),
as);
log.compact(ccfg).get0();

    // (4) check correctness:
    //  - offsets removed by compaction < offsets written in (1)
    //  - offsets removed by compaction > 0

auto final_stats = log.offsets();
info("Final offsets {}", final_stats);
auto max_compacted = first_stats.committed_offset + model::offset(1);
auto num_compacted = second_stats.committed_offset
- final_stats.committed_offset;
info("max_compacted {}, num_compacted {}", max_compacted, num_compacted);

// we compacted some data
BOOST_REQUIRE(final_stats.committed_offset < second_stats.committed_offset);

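    // strictly fewer offsets were removed than were eligible in phase (1),
    // and at least one offset was removed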
BOOST_REQUIRE(num_compacted < max_compacted);
BOOST_REQUIRE(num_compacted > model::offset(0));
}
