storage: fix race between segment.ms and appends
H/T to VladLazar for a similar change that inspired this one:
VladLazar@682aea5

The problem statement:
```
We have seen a couple of races between the application of `segment.ms`
and the normal append path. They had the following pattern in common:
1. application of `segment.ms` begins
2. a call to `segment::append` is interleaved
3. the append finishes first and advances the dirty offsets, which
the rolling logic in `segment.ms` does not expect
-- or --
4. `segment.ms` releases the current appender while the append is
ongoing, which the append logic does not expect
```

The proposed fix was to introduce a new appender lock to the segment, and
ensure that it is held both while appending and while rolling for segment.ms.
This addressed problem #3 above, but wasn't sufficient to address problem #4.

The issue with introducing another lock to the segment is that the
unexpected behavior when appending happens in the context of an already
referenced segment. That is, the appending fiber may take a reference to the
appender, only for it to be destroyed by the housekeeping fiber before
segment::append() is called, resulting in a segfault.
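
The hazard is easiest to see as a sketch. The snippet below is purely
illustrative: std::mutex and a unique_ptr stand in for the Seastar
primitives and the segment appender, and all names are hypothetical; it
only shows why a lock taken inside the segment comes too late.

```cpp
#include <memory>
#include <mutex>

// Stand-in for a segment that owns its appender plus the proposed
// per-segment appender lock.
struct segment_like {
    std::mutex append_lock;
    std::unique_ptr<int> appender = std::make_unique<int>(0);
};

// Housekeeping fiber: segment.ms decides to roll and releases the appender.
void housekeeping_roll(segment_like& seg) {
    std::lock_guard<std::mutex> g(seg.append_lock);
    seg.appender.reset();
}

// Appending fiber: the appender reference is taken *before* the per-segment
// lock is acquired, so the lock cannot protect it.
void append(segment_like& seg) {
    int* appender = seg.appender.get(); // reference taken here
    // ... scheduling point: housekeeping_roll() may run now and destroy
    // the appender ...
    std::lock_guard<std::mutex> g(seg.append_lock);
    *appender += 1; // dangling pointer if the roll won the race
}
```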

This patch extends usage of the existing disk_log_impl::_segments_rolling_lock
to cover the entire duration of append (i.e. not just the underlying
segment::append() call), ensuring that segment.ms rolls and appends are
mutually exclusive.
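
The shape of the fix, as a minimal sketch with std::mutex standing in for
the actual ssx::semaphore and Seastar coroutines (the real changes are in
the diff below): the whole append path and the segment.ms housekeeping
serialize on the same log-level lock, with a try-lock fast path on the
append side.

```cpp
#include <mutex>

// Simplified model of disk_log_impl: one log-level lock serializes
// segment.ms rolls against the entire append, not just segment::append().
struct log_like {
    std::mutex segments_rolling_lock;

    // Append side: try the lock first and only block when a roll or
    // truncation is in flight (mirrors the fast path in
    // disk_log_appender::operator()).
    void append(/* record batch */) {
        std::unique_lock<std::mutex> units(
          segments_rolling_lock, std::try_to_lock);
        if (!units.owns_lock()) {
            units.lock(); // rare: wait for the in-flight roll to finish
        }
        // ... assign offsets, roll if needed, write to the segment ...
        // The appender cannot be released while the lock is held.
    }

    // Housekeeping side: segment.ms rolling takes the same lock, so it can
    // only release the appender when no append is in progress.
    void apply_segment_ms() {
        std::lock_guard<std::mutex> g(segments_rolling_lock);
        // ... roll the active segment and release its appender ...
    }
};
```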

(cherry picked from commit 78a9749)
andrwng committed Aug 10, 2023
1 parent 136d14a commit 771b31e
Showing 3 changed files with 58 additions and 23 deletions.
43 changes: 31 additions & 12 deletions src/v/storage/disk_log_appender.cc
@@ -50,17 +50,22 @@ ss::future<> disk_log_appender::initialize() {
}

bool disk_log_appender::segment_is_appendable(model::term_id batch_term) const {
/**
* _log._segs.empty() is a tricky condition. It is here to suppor concurrent
* truncation (from 0) of an active log segment while we hold the lock of a
* valid segment.
*
* Checking for term is because we support multiple term appends which
* always roll
*
* _bytes_left_in_segment is for initial condition
*
*/
if (!_seg || !_seg->has_appender()) {
// The latest segment with which this log_appender has called
// initialize() has been rolled and no longer has a segment appender
// (e.g. because segment.ms rolled onto a new segment). There is likely
// already a new segment and segment appender and we should reset to
// use them.
return false;
}
// _log._segs.empty() is a tricky condition. It is here to support
// concurrent truncation (from 0) of an active log segment while we hold the
// lock of a valid segment.
//
// Checking for term is because we support multiple term appends which
// always roll
//
// _bytes_left_in_segment is for initial condition
return _bytes_left_in_segment > 0 && _log.term() == batch_term
&& !_log._segs.empty() /*see above before removing this condition*/;
}
@@ -73,6 +78,19 @@ void disk_log_appender::release_lock() {

ss::future<ss::stop_iteration>
disk_log_appender::operator()(model::record_batch& batch) {
// We use a fast path here since this lock should very rarely be contested.
// An open segment may only have one in-flight append at any given time and
// the only other places this lock is held are during truncation
// (infrequent) or when enforcing segment.ms (which should rarely happen in
// high throughput scenarios).
auto segment_roll_lock_holder = _log.try_segment_roll_lock();
if (!segment_roll_lock_holder.has_value()) {
vlog(
stlog.warn,
"Segment roll lock contested for {}",
_log.config().ntp());
segment_roll_lock_holder = co_await _log.segment_roll_lock();
}
batch.header().base_offset = _idx;
batch.header().header_crc = model::internal_header_only_crc(batch.header());
if (_last_term != batch.term()) {
@@ -88,7 +106,8 @@ disk_log_appender::operator()(model::record_batch& batch) {
// we might actually have space in the current log, but the
// terms do not match for the current append, so we must roll
release_lock();
co_await _log.maybe_roll(_last_term, _idx, _config.io_priority);
co_await _log.maybe_roll_unlocked(
_last_term, _idx, _config.io_priority);
co_await initialize();
}
co_return co_await append_batch_to_segment(batch);
20 changes: 10 additions & 10 deletions src/v/storage/disk_log_impl.cc
@@ -1115,15 +1115,11 @@ ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) {
});
}

ss::future<> disk_log_impl::maybe_roll(
ss::future<> disk_log_impl::maybe_roll_unlocked(
model::term_id t, model::offset next_offset, ss::io_priority_class iopc) {
// This lock will only rarely be contended. If it is held, then
// we must wait for do_housekeeping to complete before proceeding, because
// the log might be in a state mid-roll where it has no appender.
// We need to take this irrespective of whether we're actually rolling
// or not, in order to ensure that writers wait for a background roll
// to complete if one is ongoing.
auto roll_lock_holder = co_await _segments_rolling_lock.get_units();
vassert(
!_segments_rolling_lock.ready(),
"Must have taken _segments_rolling_lock");

vassert(t >= term(), "Term:{} must be greater than base:{}", t, term());
if (_segs.empty()) {
@@ -1146,8 +1142,12 @@

ss::future<> disk_log_impl::apply_segment_ms() {
auto gate = _compaction_housekeeping_gate.hold();
// do_housekeeping races with maybe_roll to use new_segment.
// take a lock to prevent problems
// Holding the lock blocks writes to the last open segment.
// This is required in order to avoid the logic in this function
// racing with an inflight append. Contention on this lock should
// be very light, since we wouldn't need to enforce segment.ms
// if this partition was high throughput (segment would have rolled
// naturally).
auto lock = co_await _segments_rolling_lock.get_units();

if (_segs.empty()) {
18 changes: 17 additions & 1 deletion src/v/storage/disk_log_impl.h
@@ -93,7 +93,8 @@ class disk_log_impl final : public log::impl {
get_term_last_offset(model::term_id term) const final;
std::ostream& print(std::ostream&) const final;

ss::future<> maybe_roll(
// Must be called while _segments_rolling_lock is held.
ss::future<> maybe_roll_unlocked(
model::term_id, model::offset next_offset, ss::io_priority_class);

// roll immediately with the current term. users should prefer the
@@ -111,6 +112,14 @@

int64_t compaction_backlog() const final;

std::optional<ssx::semaphore_units> try_segment_roll_lock() {
return _segments_rolling_lock.try_get_units();
}

ss::future<ssx::semaphore_units> segment_roll_lock() {
return _segments_rolling_lock.get_units();
}

private:
friend class disk_log_appender; // for multi-term appends
friend class disk_log_builder; // for tests
@@ -225,6 +234,13 @@ class disk_log_impl final : public log::impl {