test: reproducer for segment appender assertion
Reported failure: redpanda-data#5433

Signed-off-by: Noah Watkins <noahwatkins@gmail.com>
dotnwat committed Oct 19, 2022
1 parent 23f8495 commit b243746
Showing 1 changed file with 198 additions and 0 deletions: src/v/storage/tests/log_segment_appender_test.cc
@@ -11,6 +11,7 @@
#include "random/generators.h"
#include "seastarx.h"
#include "storage/segment_appender.h"
#include "test_utils/tmp_dir.h"

#include <seastar/core/reactor.hh>
#include <seastar/core/thread.hh>
@@ -284,3 +285,200 @@ SEASTAR_THREAD_TEST_CASE(test_fallocate_size) {
run_test_fallocate_size(fallocate_size);
}
}

/*
* Assumption about the segment appender:
*
* usage of the appender api is exclusive: you cannot call append or truncate
* or any other interface until the future from any previous method
* completes.
*
* The exception to this rule is `flush`, which may be invoked concurrently
* with append calls, even if an append has not completed (a short usage
* sketch follows this comment):
*
* commit 5668d7da3309d7bd2d03339f4067b64320cb3802
* Author: Noah Watkins <noah@vectorized.io>
* Date: Thu Jun 3 15:18:33 2021 -0700
*
* storage: support concurrent append and flushing
*
* This patch adds support to the segment appender for allowing append()
* to be called before the future from a previous flush() is completed. This new
* functionality allows raft to pipeline appends while waiting on flushes
* to complete asynchronously.
*
* There appears to be a rare race condition in which an assertion fails.
* Reproducing it requires that truncate or fallocation run in parallel with
* a slow dma_write.
*
* The following test will reproduce the bug on DEBUG builds. Some strategic
* sleeps may be required to make it reproduce on a RELEASE build.
*/
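/*
 * to make the contract concrete, a minimal usage sketch (illustrative only,
 * not part of the reproducer; `appender` stands for any
 * storage::segment_appender instance):
 *
 *   appender.append(a).get();        // append must be awaited before the
 *                                    // next append/truncate call
 *   auto flushed = appender.flush(); // flush may remain pending...
 *   appender.append(b).get();        // ...while appends continue
 *   std::move(flushed).get();
 */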
SEASTAR_THREAD_TEST_CASE(exclusive_concurrent_flushing_drains_all_io) {
// create an empty file
temporary_dir dir("exclusive_concurrent_flushing_drains_all_io");
auto fpath = dir.get_path() / "segment.dat";
auto f = open_file(fpath.string());

// appender with 32_KiB fallocation size
const int falloc_size = 32_KiB;
storage::storage_resources resources(
config::mock_binding<size_t>(falloc_size));
auto appender = make_segment_appender(f, resources);

const iobuf one_byte = make_random_data(1);
const iobuf many_bytes = make_random_data(falloc_size * 2);
std::vector<ss::future<>> bg_append;
std::vector<ss::future<>> bg_flush;

/*
* we want to be able to control when fallocation occurs, and it always
* occurs on the first append. so let's get that out of the way.
*
* after this call the head write-ahead buffer contains 1 byte.
*/
appender.append(one_byte).get();
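    /*
     * state at this point (a sketch inferred from the steps above):
     *
     *   file                : fallocated once (triggered by the first append)
     *   head buffer         : dirty, 1 byte
     *   _concurrent_flushes : all units available
     *   _prev_head_write    : all units available
     */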

/*
* FLUSH-A
*
* next we want to trigger a write by calling flush. since the head buffer
* contains 1 byte of dirty data a background write will be started via
* dispatch_background_head_write. here it is, reduced to the important parts:
*
* ss::future<> segment_appender::flush() {
*
* void segment_appender::dispatch_background_head_write() {
*
* // NOT FULL
* const auto full = _head->is_full();
*
* // PREV UNITS
* auto prev = _prev_head_write;
* auto units = ss::get_units(*prev, 1);
*
* // BACKGROUND WRITE
* (void)ss::with_semaphore(
* _concurrent_flushes,
* 1,
* [units = std::move(units), full]() mutable {
* return units
* .then([this, h, w, start_offset, expected, src, full](
* ssx::semaphore_units u) mutable {
* return _out
* .dma_write(start_offset, src, expected,
* _opts.priority)
*
*
* // PREV HEAD _NOT_ RESET
* if (full) {
* _prev_head_write = ss::make_lw_shared<ssx::semaphore>(1,
* head_sem_name);
* }
*
* so in words: at this point there is a background write holding 1 unit of
* _concurrent_flushes and 1 unit of _prev_head_write. finally, since the
* head buffer was not full, _prev_head_write is not reset to a fresh
* semaphore before exiting.
*
* Note that the flush is started asynchronously in the background, which is
* allowed according to the assumptions stated above.
*/
bg_flush.push_back(appender.flush());
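    /*
     * state at this point (a sketch inferred from the steps above):
     *
     *   head buffer         : clean, not full
     *   _concurrent_flushes : 1 unit held by FLUSH-A's background write
     *   _prev_head_write    : 1 unit held by FLUSH-A's background write, and
     *                         not reset because the buffer wasn't full
     */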

/*
* we want to be able to trigger another background write. if we called
* flush again now it would be a noop since the previous dirty data is now
* in flight to the disk. so let's add another byte to make the head buffer
* dirty again.
*/
appender.append(one_byte).get();

/*
* here is the first part of where the race happens. we are now going to
* start a new append that is large enough to trigger the fallocation path,
* and we are going to background the append.
*
* this usage is still correct according to the assumptions stated
* above: there is 1 inflight flush and all previous appends have completed.
*
* when append is called the need for fallocation is checked. if a
* fallocation is needed it is done prior to any remaining data in the
* append call being written to the current head buffer.
*
* ss::future<> segment_appender::append(const iobuf& io) {
*
* ss::future<> segment_appender::do_next_adaptive_fallocation() {
*
* return ss::with_semaphore(
* _concurrent_flushes,
* ss::semaphore::max_counter(),
* [this, step]() mutable {
* vassert(
* _prev_head_write->available_units() == 1,
* "Unexpected pending head write {}",
* *this);
*
* at this point the append we just started should be blocked on
* _concurrent_flushes because it is trying to grab exclusive control
* (max_counter) AND the background write started from FLUSH-A above is
* holding 1 unit of this semaphore.
*/
bg_append.push_back(appender.append(many_bytes));
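    /*
     * state at this point (a sketch inferred from the steps above):
     *
     *   head buffer         : dirty, 1 byte (from the second small append)
     *   _concurrent_flushes : 1 unit held by FLUSH-A's background write;
     *                         fallocate queued waiting for max_counter(),
     *                         i.e. every unit
     *   _prev_head_write    : 1 unit held by FLUSH-A's background write
     */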

/*
* currently there is 1 flush and 1 append in flight.
*/
BOOST_REQUIRE(bg_append.size() == 1);
BOOST_REQUIRE(bg_flush.size() == 1);

/*
* now for the final piece of the race. we are going to call flush once
* more. recall that we have 1 byte of dirty data in the head buffer and
* many bytes that are in an inflight append, but waiting on fallocate.
*
* recall from above what happens when flush starts the background write:
*
* ss::future<> segment_appender::flush() {
* void segment_appender::dispatch_background_head_write() {
* ...
*
*
* the background write grabs 1 unit of _concurrent_flushes and 1 unit of
* _prev_head_write. since the head buffer is NOT full, it is not reset to a
* new semaphore before returning.
*/
bg_flush.push_back(appender.flush());
BOOST_REQUIRE(bg_append.size() == 1);
BOOST_REQUIRE(bg_flush.size() == 2);
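    /*
     * state at this point (a sketch inferred from the steps above):
     *
     *   _concurrent_flushes : 1 unit held by FLUSH-A's background write;
     *                         fallocate queued for max_counter(); the new
     *                         background write queued behind it for 1 unit
     *   _prev_head_write    : 1 unit held by FLUSH-A's background write; the
     *                         new background write queued behind it via
     *                         ss::get_units(*prev, 1)
     */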

/*
* now let the race complete.
*
* when the background write from the original FLUSH-A completes and
* releases its units of _concurrent_flushes and _prev_head_write, two things
* will happen.
*
* first, the next waiter on _concurrent_flushes will be woken up. this is
* the fallocation call from the last append that is in the background.
*
* second, the _prev_head_write unit will be released and control of the
* mutex will be given to the next waiter, which is the background write
* scheduled by the immediately preceding flush.
*
* when fallocate runs holding exclusive access to _concurrent_flushes, it
* will find that the following assertion doesn't hold, because there is a
* new owner of the current _prev_head_write semaphore:
*
* vassert(
* _prev_head_write->available_units() == 1,
* "Unexpected pending head write {}",
* *this);
*/
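    /*
     * the same sequence compressed into a timeline (a sketch of the expected
     * interleaving, not an exhaustive schedule):
     *
     *   1. FLUSH-A's background write completes and releases its units
     *   2. fallocate acquires max_counter() units of _concurrent_flushes
     *   3. the second background write's ss::get_units(*prev, 1) resolves,
     *      so it now owns _prev_head_write
     *   4. fallocate evaluates _prev_head_write->available_units() == 1,
     *      which is now 0, and the vassert fires
     */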
ss::when_all_succeed(
ss::when_all_succeed(bg_append.begin(), bg_append.end()),
ss::when_all_succeed(bg_flush.begin(), bg_flush.end()))
.get();

appender.close().get();
}
