Skip to content

Commit

Permalink
s/tests: added readers_cache range locking test
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Jul 28, 2022
1 parent c230802 commit 1c31f91
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/storage/readers_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class readers_cache {
~readers_cache();

private:
friend struct readers_cache_test_fixture;
struct entry;
void touch(entry* e) {
e->last_used = ss::lowres_clock::now();
Expand Down
1 change: 1 addition & 0 deletions src/v/storage/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ rp_test(
timequery_test.cc
kvstore_test.cc
backlog_controller_test.cc
readers_cache_test.cc
LIBRARIES v::seastar_testing_main v::storage_test_utils v::model_test_utils
LABELS storage
ARGS "-- -c 1"
Expand Down
113 changes: 113 additions & 0 deletions src/v/storage/tests/readers_cache_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "model/fundamental.h"
#include "seastarx.h"
#include "storage/readers_cache.h"
#include "test_utils/fixture.h"

#include <fmt/ostream.h>

namespace storage {
struct readers_cache_test_fixture {
readers_cache_test_fixture()
: cache(
model::ntp("test", "test", 0), std::chrono::milliseconds(360000)) {}

bool intersects_locked_range(model::offset begin, model::offset end) {
return cache.intersects_with_locked_range(begin, end);
}

void test_intersects_locked(
model::offset::type begin, model::offset::type end, bool in_range) {
BOOST_REQUIRE_EQUAL(
intersects_locked_range(model::offset(begin), model::offset(end)),
in_range);
}

readers_cache cache;
};

} // namespace storage
using namespace storage;

FIXTURE_TEST(test_range_is_correctly_locked, readers_cache_test_fixture) {
{
/**
* Evict truncate locks range starting from give offset up to max
*/
auto holder = cache.evict_truncate(model::offset(10)).get();

// [0,1] is not in [10,inf]
test_intersects_locked(0, 1, false);

// [0,10] is in range [10,inf]
test_intersects_locked(0, 10, true);

// [9,12] is in range [10, inf]
test_intersects_locked(9, 12, true);

// [10,12] is in range [10, inf]
test_intersects_locked(10, 12, true);

// [20,25] is in range [10, inf]
test_intersects_locked(20, 25, true);
}

{
/**
* Evict prefix truncate locks range starting from 0 up to given offset
*/
auto holder = cache.evict_prefix_truncate(model::offset(10)).get();

// [0,1] is in [0,10]
test_intersects_locked(0, 1, true);

// [0,10] is in range [0,10]
test_intersects_locked(0, 10, true);

// [9,12] is in range [0, 10]
test_intersects_locked(9, 12, true);

// [10,12] is in range [0, 10]
test_intersects_locked(10, 12, true);

// [20,25] is not in range [0, 10]
test_intersects_locked(20, 25, false);
}

{
/**
* Evict range locks given range
*/
auto holder
= cache.evict_range(model::offset(5), model::offset(10)).get();

// [0,1] is not in [5,10]
test_intersects_locked(0, 1, false);

// [0,20] is not in range [5,10] but [5,10] is in [0,20]
test_intersects_locked(0, 20, true);

// [9,12] is in range [5,10]
test_intersects_locked(9, 12, true);

// [10,12] is in range [5,10]
test_intersects_locked(10, 12, true);

// [20,25] is not in range [5,10]
test_intersects_locked(20, 25, false);

// [4,5] is in range [5,10]
test_intersects_locked(4, 5, true);

// [6,7] is in range [5,10]
test_intersects_locked(6, 7, true);
}
}
114 changes: 114 additions & 0 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "model/timeout_clock.h"
#include "model/timestamp.h"
#include "random/generators.h"
#include "reflection/adl.h"
#include "storage/batch_cache.h"
#include "storage/log_manager.h"
#include "storage/record_batch_builder.h"
Expand Down Expand Up @@ -2191,3 +2192,116 @@ FIXTURE_TEST(test_compacting_batches_of_different_types, storage_test_fixture) {

BOOST_REQUIRE(before_compaction == after_compaction);
}

FIXTURE_TEST(read_write_truncate, storage_test_fixture) {
/**
* Test validating concurrent reads, writes and truncations
*/
auto cfg = default_log_config(test_dir);
cfg.stype = storage::log_config::storage_type::disk;
cfg.cache = storage::with_cache::no;
cfg.max_segment_size = config::mock_binding<size_t>(100_MiB);
storage::ntp_config::default_overrides overrides;
ss::abort_source as;
storage::log_manager mgr = make_log_manager(cfg);
info("config: {}", mgr.config());
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });

auto ntp = model::ntp("default", "test", 0);
auto log
= mgr.manage(storage::ntp_config(ntp, mgr.config().base_dir)).get0();

int cnt = 0;
int max = 500;
mutex log_mutex;
auto produce = ss::do_until(
[&] { return cnt > max; },
[&log, &cnt, &log_mutex] {
ss::circular_buffer<model::record_batch> batches;
for (int i = 0; i < 20; ++i) {
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));

builder.add_raw_kv(
reflection::to_iobuf("key"), reflection::to_iobuf("value"));
batches.push_back(std::move(builder).build());
}
auto reader = model::make_memory_record_batch_reader(
std::move(batches));

storage::log_append_config cfg{
.should_fsync = storage::log_append_config::fsync::no,
.io_priority = ss::default_priority_class(),
.timeout = model::no_timeout,
};
info("append");
return log_mutex
.with([reader = std::move(reader), cfg, &log]() mutable {
info("append_lock");
return std::move(reader).for_each_ref(
log.make_appender(cfg), cfg.timeout);
})
.then([](storage::append_result res) {
info("append_result: {}", res.last_offset);
})
.then([&log] { return log.flush(); })
.finally([&cnt] { cnt++; });
});

auto read = ss::do_until(
[&] { return cnt > max; },
[&log, &cnt] {
auto offset = log.offsets();
storage::log_reader_config cfg(
std::max(model::offset(0), offset.dirty_offset - model::offset(10)),
cnt % 2 == 0 ? offset.dirty_offset - model::offset(2)
: offset.dirty_offset,
ss::default_priority_class());
auto start = ss::steady_clock_type::now();
return log.make_reader(cfg)
.then([start](model::record_batch_reader rdr) {
// assert that creating a reader took less than 5 seconds
BOOST_REQUIRE_LT(
(ss::steady_clock_type::now() - start) / 1ms, 5000);
return model::consume_reader_to_memory(
std::move(rdr), model::no_timeout);
})
.then([](ss::circular_buffer<model::record_batch> batches) {
if (batches.empty()) {
info("read empty range");
return;
}
info(
"read range: {}, {}",
batches.front().base_offset(),
batches.back().last_offset());
});
});

auto truncate = ss::do_until(
[&] { return cnt > max; },
[&log, &log_mutex] {
auto offset = log.offsets();
if (offset.dirty_offset <= model::offset(0)) {
return ss::now();
}
return log_mutex.with([&log] {
auto offset = log.offsets();
info("truncate offsets: {}", offset);
auto start = ss::steady_clock_type::now();
return log
.truncate(storage::truncate_config(
offset.dirty_offset, ss::default_priority_class()))
.finally([start, o = offset.dirty_offset] {
// assert that truncation took less than 5 seconds
BOOST_REQUIRE_LT(
(ss::steady_clock_type::now() - start) / 1ms, 5000);
info("truncate_done");
});
});
});

produce.get();
read.get();
truncate.get();
}

0 comments on commit 1c31f91

Please sign in to comment.