Skip to content

Commit

Permalink
raft: distinguish abort & timeout in offset_monitor
Browse files Browse the repository at this point in the history
Aborts should be propagated as the standard
ss::abort_requested_exception type which is understood
by handlers to be ignored silently, as it occurs during
normal shutdown.

Timeouts remain specific exception type in offset_monitor,
and in locations that used to catch + swallow both aborts
and timeouts, timeouts are logged at WARN severity, as they
are not necessarily indicative of a fault, but may indicate
a system not operating at its best.

Fixes: redpanda-data#5154
  • Loading branch information
jcsp committed Sep 15, 2022
1 parent 6883b18 commit 927ea66
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 63 deletions.
22 changes: 19 additions & 3 deletions src/v/cluster/persisted_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,14 @@ ss::future<bool> persisted_stm::do_sync(
co_return false;
} catch (const ss::condition_variable_timed_out&) {
co_return false;
} catch (const raft::offset_monitor::wait_aborted&) {
} catch (const raft::offset_monitor::wait_timed_out&) {
vlog(
clusterlog.warn,
"sync timeout: waiting for offset={}; committed "
"offset={}; ntp={}",
offset,
committed,
ntp);
co_return false;
} catch (...) {
vlog(
Expand Down Expand Up @@ -304,10 +311,19 @@ ss::future<bool> persisted_stm::wait_no_throw(
auto deadline = model::timeout_clock::now() + timeout;
return wait(offset, deadline)
.then([] { return true; })
.handle_exception_type([](const raft::offset_monitor::wait_aborted&) {
vlog(clusterlog.trace, "aborted while waiting (shutting down)");
.handle_exception_type([](const ss::abort_requested_exception&) {
// Shutting down
return false;
})
.handle_exception_type(
[offset, ntp = _c->ntp()](const raft::offset_monitor::wait_timed_out&) {
vlog(
clusterlog.warn,
"timed out while waiting for offset: {}, ntp: {}",
offset,
ntp);
return false;
})
.handle_exception([offset, ntp = _c->ntp()](std::exception_ptr e) {
vlog(
clusterlog.error,
Expand Down
30 changes: 24 additions & 6 deletions src/v/raft/offset_monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace raft {

void offset_monitor::stop() {
for (auto& waiter : _waiters) {
waiter.second->done.set_exception(wait_aborted());
waiter.second->done.set_exception(ss::abort_requested_exception());
}
_waiters.clear();
}
Expand Down Expand Up @@ -62,22 +62,40 @@ offset_monitor::waiter::waiter(
: mon(mon) {
if (as) {
auto opt_sub = as->get().subscribe(
[this]() noexcept { handle_abort(); });
[this]() noexcept { handle_abort(false); });
if (opt_sub) {
sub = std::move(*opt_sub);
} else {
done.set_exception(wait_aborted());
done.set_exception(ss::abort_requested_exception());
return;
}
}
if (timeout != model::no_timeout) {
timer.set_callback([this] { handle_abort(); });
timer.set_callback([this] { handle_abort(true); });
timer.arm(timeout);
}
}

void offset_monitor::waiter::handle_abort() {
done.set_exception(wait_aborted());
/**
* Set an exception on our `done` promise and clean up waiters.
*
* This is behaviourally the same for aborts and timeouts, but
* we raise a different exception for those respective cases
* to help callers interpret it properly (distinguish between
* worrying timeouts, and non-worrying aborts during shutdown).
*
* @param is_timeout control the exception type that will be set
* on the future.
*/
void offset_monitor::waiter::handle_abort(bool is_timeout) {
if (is_timeout) {
done.set_exception(wait_timed_out());
} else {
// Use the generic seastar abort_requested_exception, because
// in many locations we handle this gracefully and do not log
// it as an error during shutdown.
done.set_exception(ss::abort_requested_exception());
}
auto it = std::find_if(
mon->_waiters.begin(),
mon->_waiters.end(),
Expand Down
6 changes: 3 additions & 3 deletions src/v/raft/offset_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ class offset_monitor {
* Exception used to indicate an aborted wait, either from a requested abort
* via an abort source or because a timeout occurred.
*/
class wait_aborted final : public std::exception {
class wait_timed_out final : public std::exception {
public:
virtual const char* what() const noexcept final {
return "offset monitor wait aborted";
return "offset monitor wait timed out";
}
};

Expand Down Expand Up @@ -78,7 +78,7 @@ class offset_monitor {
model::timeout_clock::time_point,
std::optional<std::reference_wrapper<ss::abort_source>>);

void handle_abort();
void handle_abort(bool is_timeout);
};

friend waiter;
Expand Down
8 changes: 6 additions & 2 deletions src/v/raft/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,15 @@ ss::future<> state_machine::apply() {
}
});
})
.handle_exception_type([](const raft::offset_monitor::wait_aborted&) {})
.handle_exception_type([](const raft::offset_monitor::wait_timed_out&) {})
.handle_exception_type([](const ss::abort_requested_exception&) {})
.handle_exception_type([](const ss::gate_closed_exception&) {})
.handle_exception([this](const std::exception_ptr& e) {
vlog(
_log.info, "State machine for ntp={} handles {}", _raft->ntp(), e);
_log.info,
"State machine for ntp={} caught exception {}",
_raft->ntp(),
e);
});
}

Expand Down
56 changes: 7 additions & 49 deletions src/v/raft/tests/offset_monitor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,7 @@ SEASTAR_THREAD_TEST_CASE(stop) {
BOOST_REQUIRE(mon.empty());

for (auto& f : fs) {
BOOST_CHECK_EXCEPTION(
f.get(),
raft::offset_monitor::wait_aborted,
[](raft::offset_monitor::wait_aborted e) {
return std::string(e.what()).find("offset monitor wait aborted")
!= std::string::npos;
});
BOOST_CHECK_THROW(f.get(), ss::abort_requested_exception);
}
}

Expand Down Expand Up @@ -154,13 +148,7 @@ SEASTAR_THREAD_TEST_CASE(wait_timeout) {
BOOST_REQUIRE(!f0.available());
BOOST_REQUIRE(!mon.empty());

BOOST_CHECK_EXCEPTION(
f0.get(),
raft::offset_monitor::wait_aborted,
[](raft::offset_monitor::wait_aborted e) {
return std::string(e.what()).find("offset monitor wait aborted")
!= std::string::npos;
});
BOOST_CHECK_THROW(f0.get(), raft::offset_monitor::wait_timed_out);

BOOST_REQUIRE(mon.empty());
}
Expand All @@ -176,13 +164,7 @@ SEASTAR_THREAD_TEST_CASE(wait_abort_source) {
as.request_abort();
BOOST_REQUIRE(f0.failed());

BOOST_CHECK_EXCEPTION(
f0.get(),
raft::offset_monitor::wait_aborted,
[](raft::offset_monitor::wait_aborted e) {
return std::string(e.what()).find("offset monitor wait aborted")
!= std::string::npos;
});
BOOST_CHECK_THROW(f0.get(), ss::abort_requested_exception);

BOOST_REQUIRE(mon.empty());
}
Expand All @@ -196,13 +178,7 @@ SEASTAR_THREAD_TEST_CASE(wait_abort_source_already_aborted) {
BOOST_REQUIRE(mon.empty());

BOOST_REQUIRE(f0.failed());
BOOST_CHECK_EXCEPTION(
f0.get(),
raft::offset_monitor::wait_aborted,
[](raft::offset_monitor::wait_aborted e) {
return std::string(e.what()).find("offset monitor wait aborted")
!= std::string::npos;
});
BOOST_CHECK_THROW(f0.get(), ss::abort_requested_exception);
}

SEASTAR_THREAD_TEST_CASE(wait_abort_source_with_timeout_first) {
Expand All @@ -215,13 +191,7 @@ SEASTAR_THREAD_TEST_CASE(wait_abort_source_with_timeout_first) {

// there is an abort source, but only wait on timeout

BOOST_CHECK_EXCEPTION(
f0.get(),
raft::offset_monitor::wait_aborted,
[](raft::offset_monitor::wait_aborted e) {
return std::string(e.what()).find("offset monitor wait aborted")
!= std::string::npos;
});
BOOST_CHECK_THROW(f0.get(), raft::offset_monitor::wait_timed_out);

BOOST_REQUIRE(mon.empty());
}
Expand All @@ -238,13 +208,7 @@ SEASTAR_THREAD_TEST_CASE(wait_abort_source_with_timeout_abort_first) {
BOOST_REQUIRE(f0.failed());
BOOST_REQUIRE(mon.empty());

BOOST_CHECK_EXCEPTION(
f0.get(),
raft::offset_monitor::wait_aborted,
[](raft::offset_monitor::wait_aborted e) {
return std::string(e.what()).find("offset monitor wait aborted")
!= std::string::npos;
});
BOOST_CHECK_THROW(f0.get(), ss::abort_requested_exception);

BOOST_REQUIRE(mon.empty());
}
Expand All @@ -262,13 +226,7 @@ SEASTAR_THREAD_TEST_CASE(wait_abort_source_with_timeout_abort_before_timeout) {

as.request_abort();

BOOST_CHECK_EXCEPTION(
f0.get(),
raft::offset_monitor::wait_aborted,
[](raft::offset_monitor::wait_aborted e) {
return std::string(e.what()).find("offset monitor wait aborted")
!= std::string::npos;
});
BOOST_CHECK_THROW(f0.get(), ss::abort_requested_exception);

BOOST_REQUIRE(mon.empty());
}

0 comments on commit 927ea66

Please sign in to comment.