Fix readers cache truncation deadlock #5681

Merged
31 changes: 22 additions & 9 deletions src/v/storage/readers_cache.cc
@@ -54,16 +54,16 @@ readers_cache::put(std::unique_ptr<log_reader> reader) {
return model::record_batch_reader(std::move(reader));
}
// check if requested reader belongs to one of the locked range
auto lock_it = std::find_if(
_locked_offset_ranges.begin(),
_locked_offset_ranges.end(),
[&reader](const offset_range& range) {
return reader->lease_range_base_offset() > range.second
|| reader->lease_range_end_offset() < range.first;
});
Comment on lines -57 to -63

Contributor: Nice catch! The fix seems reasonable, though I'm curious how else this bug may manifest itself. Is the worst that can happen a minute-long deadlock? Or is there any way this can result in a segfault or segments being deleted incorrectly?

Member Author: A one-minute deadlock is how it manifests. It cannot lead to any more serious issue, because read/write/truncate concurrency control is separate from the readers cache.

// range locked, do not insert into the cache
if (lock_it != _locked_offset_ranges.end()) {
if (intersects_with_locked_range(
reader->lease_range_base_offset(),
reader->lease_range_end_offset())) {
vlog(
stlog.trace,
"{} - range is locked, not adding reader with lease [{},{}]",
_ntp,
reader->lease_range_base_offset(),
reader->lease_range_end_offset());
return model::record_batch_reader(std::move(reader));
}

@@ -80,6 +80,19 @@ readers_cache::put(std::unique_ptr<log_reader> reader) {
return ptr->make_cached_reader(this);
}

bool readers_cache::intersects_with_locked_range(
model::offset reader_base_offset, model::offset reader_end_offset) const {
auto lock_it = std::find_if(
_locked_offset_ranges.begin(),
_locked_offset_ranges.end(),
[reader_base_offset, reader_end_offset](const offset_range& range) {
return reader_base_offset <= range.second
@ajfabbri (Contributor), Jul 27, 2022: Good find. This condition looks correct to me; the previous one didn't make any sense to me. Good call putting it in a helper function.

&& reader_end_offset >= range.first;
@ajfabbri (Contributor), Jul 27, 2022: Story time: when I finished grad school and interviewed at my first big job, this was the last coding problem in a long day of interviews. This part in particular is error-prone. Also, the guy who gave me this problem is now making autonomous robots that kill things (weeds) with lasers.

});

return lock_it != _locked_offset_ranges.end();
}

std::optional<model::record_batch_reader>
readers_cache::get_reader(const log_reader_config& cfg) {
if (_gate.is_closed()) {
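To make the inverted check concrete, here is a minimal standalone sketch, not part of the PR; plain int64_t offsets and a std::pair stand in for model::offset and the cache's offset_range. The old code's find_if matched whenever a reader was disjoint from some locked range, so a reader that actually overlapped the only locked range slipped into the cache and later stalled truncation until the eviction timeout.

// Sketch only: not Redpanda code. Plain types stand in for model::offset
// and storage::readers_cache::offset_range.
#include <cassert>
#include <cstdint>
#include <utility>

using offset = int64_t;
using offset_range = std::pair<offset, offset>; // [first, second], inclusive

// Old predicate: true when the reader lease does NOT overlap the range.
bool disjoint(offset base, offset end, const offset_range& r) {
    return base > r.second || end < r.first;
}

// New predicate: true when the reader lease overlaps the range, mirroring
// the corrected intersects_with_locked_range() condition.
bool intersects(offset base, offset end, const offset_range& r) {
    return base <= r.second && end >= r.first;
}

int main() {
    const offset_range locked{5, 10};

    // Lease [9,12] overlaps [5,10]: the old disjoint() check is false, so the
    // old find_if found no match and the reader was cached inside the locked
    // range, which is what let truncation wait on it.
    assert(!disjoint(9, 12, locked));
    assert(intersects(9, 12, locked));

    // Lease [20,25] does not overlap [5,10]: the old check matched and this
    // harmless reader was refused, the exact opposite of the intent.
    assert(disjoint(20, 25, locked));
    assert(!intersects(20, 25, locked));
    return 0;
}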
23 changes: 19 additions & 4 deletions src/v/storage/readers_cache.h
@@ -43,17 +43,29 @@ class readers_cache {
, _cache(c) {}

range_lock_holder(const range_lock_holder&) = delete;
range_lock_holder(range_lock_holder&&) = default;
range_lock_holder(range_lock_holder&& other) noexcept
: _range(std::move(other._range))
, _cache(other._cache) {
other._range.reset();
}

range_lock_holder& operator=(const range_lock_holder&) = delete;
range_lock_holder& operator=(range_lock_holder&&) = default;
range_lock_holder& operator=(range_lock_holder&& other) noexcept {
_range = std::move(other._range);
_cache = other._cache;
other._range.reset();

return *this;
}

~range_lock_holder() {
std::erase(_cache->_locked_offset_ranges, _range);
if (_range) {
std::erase(_cache->_locked_offset_ranges, _range.value());
}
Comment on lines -52 to +64

Member: Gosh, nice catch.

}

private:
offset_range _range;
std::optional<offset_range> _range;
readers_cache* _cache;
};
explicit readers_cache(model::ntp, std::chrono::milliseconds);
@@ -82,6 +94,7 @@ class readers_cache {
~readers_cache();

private:
friend struct readers_cache_test_fixture;
struct entry;
void touch(entry* e) {
e->last_used = ss::lowres_clock::now();
@@ -162,6 +175,8 @@ class readers_cache {
co_await dispose_entries(std::move(to_evict));
}

bool intersects_with_locked_range(model::offset, model::offset) const;

model::ntp _ntp;
std::chrono::milliseconds _eviction_timeout;
ss::gate _gate;
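As a side note on the range_lock_holder change above: with the defaulted move operations and a plain offset_range member, a moved-from holder still carried the same range and cache pointer, so its destructor erased the range a second time and could drop the lock while the moved-to holder was still alive. Below is a minimal standalone sketch, with a global vector of pairs standing in for the cache's _locked_offset_ranges, showing how the std::optional member plus explicit moves prevent that.

// Sketch only: not Redpanda code. A global vector stands in for the cache's
// _locked_offset_ranges bookkeeping.
#include <cassert>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

using offset_range = std::pair<int64_t, int64_t>;
std::vector<offset_range> locked_ranges;

struct holder {
    explicit holder(offset_range r) : range(r) { locked_ranges.push_back(r); }
    holder(const holder&) = delete;
    holder(holder&& other) noexcept : range(std::move(other.range)) {
        other.range.reset(); // moved-from holder no longer owns the lock
    }
    ~holder() {
        // Only the holder that still owns the range releases it. With a
        // defaulted move and a plain offset_range member, both the moved-from
        // and moved-to holders would erase the same range here.
        if (range) {
            std::erase(locked_ranges, range.value());
        }
    }
    std::optional<offset_range> range;
};

int main() {
    {
        holder a{{5, 10}};
        holder b{std::move(a)}; // `a` is now empty; `b` owns the lock
        assert(locked_ranges.size() == 1);
    } // `b` releases the range once on destruction; moved-from `a` does nothing
    assert(locked_ranges.empty());
    return 0;
}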
1 change: 1 addition & 0 deletions src/v/storage/tests/CMakeLists.txt
@@ -30,6 +30,7 @@ rp_test(
timequery_test.cc
kvstore_test.cc
backlog_controller_test.cc
readers_cache_test.cc
LIBRARIES v::seastar_testing_main v::storage_test_utils v::model_test_utils
LABELS storage
ARGS "-- -c 1"
113 changes: 113 additions & 0 deletions src/v/storage/tests/readers_cache_test.cc
@@ -0,0 +1,113 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "model/fundamental.h"
#include "seastarx.h"
#include "storage/readers_cache.h"
#include "test_utils/fixture.h"

#include <fmt/ostream.h>

namespace storage {
struct readers_cache_test_fixture {
readers_cache_test_fixture()
: cache(
model::ntp("test", "test", 0), std::chrono::milliseconds(360000)) {}

bool intersects_locked_range(model::offset begin, model::offset end) {
return cache.intersects_with_locked_range(begin, end);
}

void test_intersects_locked(
model::offset::type begin, model::offset::type end, bool in_range) {
BOOST_REQUIRE_EQUAL(
intersects_locked_range(model::offset(begin), model::offset(end)),
in_range);
}

readers_cache cache;
};

} // namespace storage
using namespace storage;

FIXTURE_TEST(test_range_is_correctly_locked, readers_cache_test_fixture) {
@ajfabbri (Contributor), Jul 27, 2022: Thanks for adding a unit test.

{
/**
* Evict truncate locks the range starting from the given offset up to max
*/
auto holder = cache.evict_truncate(model::offset(10)).get();

// [0,1] is not in [10,inf]
test_intersects_locked(0, 1, false);

// [0,10] is in range [10,inf]
test_intersects_locked(0, 10, true);

// [9,12] is in range [10, inf]
test_intersects_locked(9, 12, true);

// [10,12] is in range [10, inf]
test_intersects_locked(10, 12, true);

// [20,25] is in range [10, inf]
test_intersects_locked(20, 25, true);
}

{
/**
* Evict prefix truncate locks range starting from 0 up to given offset
*/
auto holder = cache.evict_prefix_truncate(model::offset(10)).get();

// [0,1] is in [0,10]
test_intersects_locked(0, 1, true);

// [0,10] is in range [0,10]
test_intersects_locked(0, 10, true);

// [9,12] is in range [0, 10]
test_intersects_locked(9, 12, true);

// [10,12] is in range [0, 10]
test_intersects_locked(10, 12, true);

// [20,25] is not in range [0, 10]
test_intersects_locked(20, 25, false);
}

{
/**
* Evict range locks given range
*/
auto holder
= cache.evict_range(model::offset(5), model::offset(10)).get();

// [0,1] is not in [5,10]
test_intersects_locked(0, 1, false);

// [0,20] is not in range [5,10] but [5,10] is in [0,20]
test_intersects_locked(0, 20, true);

// [9,12] is in range [5,10]
test_intersects_locked(9, 12, true);

// [10,12] is in range [5,10]
test_intersects_locked(10, 12, true);

// [20,25] is not in range [5,10]
test_intersects_locked(20, 25, false);

// [4,5] is in range [5,10]
test_intersects_locked(4, 5, true);

// [6,7] is in range [5,10]
test_intersects_locked(6, 7, true);
}
}
114 changes: 114 additions & 0 deletions src/v/storage/tests/storage_e2e_test.cc
@@ -16,6 +16,7 @@
#include "model/timeout_clock.h"
#include "model/timestamp.h"
#include "random/generators.h"
#include "reflection/adl.h"
#include "storage/batch_cache.h"
#include "storage/log_manager.h"
#include "storage/record_batch_builder.h"
@@ -2191,3 +2192,116 @@ FIXTURE_TEST(test_compacting_batches_of_different_types, storage_test_fixture) {

BOOST_REQUIRE(before_compaction == after_compaction);
}

FIXTURE_TEST(read_write_truncate, storage_test_fixture) {
/**
* Test validating concurrent reads, writes and truncations
*/
auto cfg = default_log_config(test_dir);
cfg.stype = storage::log_config::storage_type::disk;
cfg.cache = storage::with_cache::no;
cfg.max_segment_size = config::mock_binding<size_t>(100_MiB);
storage::ntp_config::default_overrides overrides;
ss::abort_source as;
storage::log_manager mgr = make_log_manager(cfg);
info("config: {}", mgr.config());
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });

auto ntp = model::ntp("default", "test", 0);
auto log
= mgr.manage(storage::ntp_config(ntp, mgr.config().base_dir)).get0();

int cnt = 0;
int max = 500;
mutex log_mutex;
auto produce = ss::do_until(
[&] { return cnt > max; },
[&log, &cnt, &log_mutex] {
ss::circular_buffer<model::record_batch> batches;
for (int i = 0; i < 20; ++i) {
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));

builder.add_raw_kv(
reflection::to_iobuf("key"), reflection::to_iobuf("value"));
batches.push_back(std::move(builder).build());
}
auto reader = model::make_memory_record_batch_reader(
std::move(batches));

storage::log_append_config cfg{
.should_fsync = storage::log_append_config::fsync::no,
.io_priority = ss::default_priority_class(),
.timeout = model::no_timeout,
};
info("append");
return log_mutex
.with([reader = std::move(reader), cfg, &log]() mutable {
info("append_lock");
return std::move(reader).for_each_ref(
log.make_appender(cfg), cfg.timeout);
})
.then([](storage::append_result res) {
info("append_result: {}", res.last_offset);
})
.then([&log] { return log.flush(); })
.finally([&cnt] { cnt++; });
});

auto read = ss::do_until(
[&] { return cnt > max; },
[&log, &cnt] {
auto offset = log.offsets();
storage::log_reader_config cfg(
std::max(model::offset(0), offset.dirty_offset - model::offset(10)),
cnt % 2 == 0 ? offset.dirty_offset - model::offset(2)
: offset.dirty_offset,
ss::default_priority_class());
auto start = ss::steady_clock_type::now();
return log.make_reader(cfg)
.then([start](model::record_batch_reader rdr) {
// assert that creating a reader took less than 5 seconds
BOOST_REQUIRE_LT(
(ss::steady_clock_type::now() - start) / 1ms, 5000);
return model::consume_reader_to_memory(
std::move(rdr), model::no_timeout);
})
.then([](ss::circular_buffer<model::record_batch> batches) {
if (batches.empty()) {
info("read empty range");
return;
}
info(
"read range: {}, {}",
batches.front().base_offset(),
batches.back().last_offset());
});
});

auto truncate = ss::do_until(
[&] { return cnt > max; },
[&log, &log_mutex] {
auto offset = log.offsets();
if (offset.dirty_offset <= model::offset(0)) {
return ss::now();
}
return log_mutex.with([&log] {
auto offset = log.offsets();
info("truncate offsets: {}", offset);
auto start = ss::steady_clock_type::now();
return log
.truncate(storage::truncate_config(
offset.dirty_offset, ss::default_priority_class()))
.finally([start, o = offset.dirty_offset] {
// assert that truncation took less than 5 seconds
BOOST_REQUIRE_LT(
(ss::steady_clock_type::now() - start) / 1ms, 5000);
info("truncate_done");
});
});
});

produce.get();
Contributor: I have some questions about this integration test, though nothing that should block this from being merged, IMO. We can discuss separately.

read.get();
truncate.get();
}