Skip to content

Commit

Permalink
reproduce empty batch corruption
Browse files Browse the repository at this point in the history
Here’s a sequence of events that reproduces the issue:
Batches are written as [start, end], where end == start - 1 means the batch is empty:
[0, 0] [1, 0] [1, 0] [1, 1] [2, 2]

Goal: truncate at offset 2 (exclusive), leaving only offsets up to and including 1

1. find starting point to truncate by looking roughly for offset 1 (see
   `segment_index::find_nearest(model::offset)`)
2. get offset 0 as the seek starting point, we’ll seek forward from 0 until we
   see offset 2, and use the preceding batch
3. read from cache for [0, 0] [1, 0] (log_reader.read_some() last record is the
   first [1, 0] batch) (see `log_segment_batch_reader::read_some()`)
4. read from disk from where cache left off (see
   `log_segment_batch_reader::read_some()`), starting at the record following
   [1, 0]: 0 + 1 = 1
5. skip anything with last_offset < 1, so jump to [1, 1] (see
   `skipping_consumer::accept_batch_start()`)
6. we missed accumulating file size for the second [1, 0] batch (see
   `offset_to_filepos_consumer::operator()`)
7. because the returned file size is too low, we truncate at too low of a
   filepos (see `disk_log_impl::do_truncate()`)

General problems:
- when switching over from cache to disk reader (step 4), we didn’t account for
  empty batches in the batch cache, causing us to skip portions of the file when
  figuring out where to truncate
- even more generally, the interplay between offset_to_filepos_consumer and the
  skipping_consumer doesn’t expect empty batches at all, resulting in incorrect
  skipping when trying to figure out where to truncate
  • Loading branch information
andrwng committed Nov 3, 2023
1 parent 144fa4e commit 036f04d
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/v/storage/batch_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ batch_cache::batch_cache(const reclaim_options& opts)

batch_cache::entry
batch_cache::put(batch_cache_index& index, const model::record_batch& input) {
vlog(stlog.info, "AWONG PUT [{}, {}]", input.base_offset(), input.last_offset());
// notify no matter what the exit path
auto notify_guard = ss::defer([this] { _background_reclaimer.notify(); });

Expand Down Expand Up @@ -392,6 +393,7 @@ void batch_cache_index::truncate(model::offset offset) {
++it;
}
std::for_each(it, _index.end(), [this](index_type::value_type& e) {
vlog(stlog.info, "AWONG EVICT [{},{}]", e.second.batch().base_offset(), e.second.batch().last_offset());
_cache->evict(std::move(e.second.range()));
});
_index.erase(it, _index.end());
Expand Down
6 changes: 5 additions & 1 deletion src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1875,12 +1875,14 @@ ss::future<> disk_log_impl::do_truncate(
// if no offset is found in an index we will start from the segment base
// offset
model::offset start = last->offsets().base_offset;
auto target = std::max(start, model::prev_offset(cfg.base_offset));

auto pidx = last->index().find_nearest(
std::max(start, model::prev_offset(cfg.base_offset)));
target);
size_t initial_size = 0;
model::timestamp initial_timestamp = last->index().max_timestamp();
if (pidx) {
vlog(stlog.info, "AWONG FOUND {} LOOKING FOR {}", start, target);
start = pidx->offset;
initial_size = pidx->filepos;
initial_timestamp = pidx->timestamp;
Expand All @@ -1891,6 +1893,7 @@ ss::future<> disk_log_impl::do_truncate(
// an unchecked reader is created which does not enforce the logical
// starting offset. this is needed because we really do want to read
// all the data in the segment to find the correct physical offset.
vlog(stlog.info, "AWONG READER START AT {}", start);
auto reader = co_await make_unchecked_reader(
log_reader_config(start, model::offset::max(), cfg.prio));
auto phs = co_await std::move(reader).consume(
Expand Down Expand Up @@ -1930,6 +1933,7 @@ ss::future<> disk_log_impl::do_truncate(
*this));
}
auto [prev_last_offset, file_position, new_max_timestamp] = phs.value();
vlog(stlog.info, "AWONG truncating at offset {}], filepos {}", prev_last_offset, file_position);

if (file_position == 0) {
_segs.pop_back();
Expand Down
6 changes: 3 additions & 3 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ class disk_log_impl final : public log {
const compaction_config& cfg,
std::optional<model::offset> new_start_offset = std::nullopt);


ss::future<model::record_batch_reader>
make_unchecked_reader(log_reader_config);
private:
friend class disk_log_appender; // for multi-term appends
friend class disk_log_builder; // for tests
friend std::ostream& operator<<(std::ostream& o, const disk_log_impl& d);

ss::future<model::record_batch_reader>
make_unchecked_reader(log_reader_config);

ss::future<model::record_batch_reader>
make_cached_reader(log_reader_config);

Expand Down
4 changes: 4 additions & 0 deletions src/v/storage/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ batch_consumer::consume_result skipping_consumer::accept_batch_start(
* Check if we have to skip the batch
*/
if (header.last_offset() < _reader._config.start_offset) {
vlog(stlog.info, "AWONG SKIP {} V {}", header.last_offset(), _reader._config.start_offset);
return batch_consumer::consume_result::skip_batch;
}
if (
_reader._config.type_filter
&& _reader._config.type_filter != header.type) {
_reader._config.start_offset = header.last_offset() + model::offset(1);
vlog(stlog.info, "AWONG SKIP");
return batch_consumer::consume_result::skip_batch;
}
if (_reader._config.first_timestamp > header.max_timestamp) {
// kafka requires that we return messages >= the timestamp, it is
// permitted to include a few earlier
_reader._config.start_offset = header.last_offset() + model::offset(1);
vlog(stlog.info, "AWONG SKIP");
return batch_consumer::consume_result::skip_batch;
}
// we want to consume the batch
Expand Down Expand Up @@ -246,6 +249,7 @@ log_reader::log_reader(
_iterator.reader = std::make_unique<log_segment_batch_reader>(
**_iterator.next_seg, _config, _probe);
}
vlog(stlog.info, "AWONG CONSTRUCTED W {}", _config.start_offset);
}

ss::future<> log_reader::find_next_valid_iterator() {
Expand Down
2 changes: 2 additions & 0 deletions src/v/storage/offset_to_filepos.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ ss::future<ss::stop_iteration>
offset_to_filepos_consumer::operator()(::model::record_batch batch) {
_prev = _accumulator;
_accumulator += batch.size_bytes();
vlog(stlog.info, "AWONG batch {} filepos [{}, {}]", batch.base_offset(), _prev, _accumulator);

if (_target_last_offset <= batch.base_offset()) {
vlog(stlog.info, "AWONG NEXT IS {} >= {}, RETURNING {}, FILEPOS {}", batch.base_offset(), _target_last_offset, _prev_batch_last_offset, _prev);
_filepos = {_prev_batch_last_offset, _prev, _prev_batch_max_timestamp};
co_return ss::stop_iteration::yes;
}
Expand Down
3 changes: 3 additions & 0 deletions src/v/storage/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,16 @@ ss::future<result<stop_parser>> continuous_batch_parser::consume_header() {
auto ret = _consumer->accept_batch_start(*_header);
switch (ret) {
case batch_consumer::consume_result::stop_parser:
vlog(stlog.info, "AWONG STOP {}", _header->base_offset);
co_return stop_parser::yes;
case batch_consumer::consume_result::accept_batch:
vlog(stlog.info, "AWONG ACCEPT BATCH {}", _header->base_offset);
_consumer->consume_batch_start(
*_header, _physical_base_offset, _header->size_bytes);
_physical_base_offset += _header->size_bytes;
co_return stop_parser::no;
case batch_consumer::consume_result::skip_batch:
vlog(stlog.info, "AWONG SKIP {}", _header->base_offset);
_consumer->skip_batch_start(
*_header, _physical_base_offset, _header->size_bytes);
_physical_base_offset += _header->size_bytes;
Expand Down
1 change: 1 addition & 0 deletions src/v/storage/segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ ss::future<append_result> segment::do_append(const model::record_batch& b) {
b.header())));
}
const auto start_physical_offset = _appender->file_byte_offset();
vlog(stlog.info, "AWONG APPEND {} AT FILEPOS {}", b.base_offset(), start_physical_offset);
_generation_id++;
// proxy serialization to segment_appender
auto write_fut = _appender->append(b).then(
Expand Down
30 changes: 30 additions & 0 deletions src/v/storage/tests/log_truncate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,36 @@ FIXTURE_TEST(test_truncate_whole, storage_test_fixture) {
BOOST_REQUIRE_EQUAL(lstats.start_offset, model::offset{});
}

// Reproduction scenario for truncation in the presence of zero-record
// batches: writes a one-record batch at offset 0, two zero-record batches
// based at offset 1, then one-record batches at offsets 1 and 2. The log is
// truncated at offset 2, a one-record batch is appended at offset 2 again,
// and everything is read back, logged, and counted.
FIXTURE_TEST(test_truncate_empty_batch, storage_test_fixture) {
    storage::log_manager mgr = make_log_manager();
    info("config: {}", mgr.config());
    // Stop the manager on every exit path from the test body.
    auto stop_manager = ss::defer([&mgr]() mutable { mgr.stop().get0(); });

    auto ntp = model::ntp("default", "test", 0);
    auto log
      = mgr.manage(storage::ntp_config(ntp, mgr.config().base_dir)).get0();

    // Appends one random batch with the given base offset and record count.
    auto append_at = [&log](model::offset base, int num_records) {
        append_batch(
          log, model::test::make_random_batch(base, num_records, true));
    };

    append_at(model::offset{0}, 1);
    append_at(model::offset{1}, 0);
    append_at(model::offset{1}, 0);
    append_at(model::offset{1}, 1);
    append_at(model::offset{2}, 1);

    // Drop everything from offset 2 onwards.
    storage::truncate_config truncate_at(
      model::offset{2}, ss::default_priority_class());
    log->truncate(truncate_at).get();

    // Write at the truncation point again before reading the log back.
    append_at(model::offset{2}, 1);

    auto read_batches = read_and_validate_all_batches(log);
    for (const auto& batch : read_batches) {
        info(
          "AWONG [{}, {}], {}",
          batch.base_offset(),
          batch.last_offset(),
          batch);
    }
    BOOST_REQUIRE_EQUAL(read_batches.size(), 4);
}

FIXTURE_TEST(test_truncate_in_the_middle_of_segment, storage_test_fixture) {
storage::log_manager mgr = make_log_manager();
info("config: {}", mgr.config());
Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/tests/storage_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "random/generators.h"
#include "reflection/adl.h"
#include "seastarx.h"
#include "storage/disk_log_impl.h"
#include "storage/kvstore.h"
#include "storage/log_manager.h"
#include "storage/types.h"
Expand Down Expand Up @@ -289,7 +290,7 @@ class storage_test_fixture {
lstats.start_offset,
lstats.committed_offset,
ss::default_priority_class());
auto reader = log->make_reader(std::move(cfg)).get0();
auto reader = dynamic_cast<storage::disk_log_impl*>(log.get())->make_unchecked_reader(std::move(cfg)).get0();
return reader.consume(batch_validating_consumer{}, model::no_timeout)
.get0();
}
Expand Down

0 comments on commit 036f04d

Please sign in to comment.