Skip to content

Commit

Permalink
Merge pull request #5934 from abhijat/remove-archiver-condition
Browse files Browse the repository at this point in the history
cloud_storage: adjust condition to remove archiver
  • Loading branch information
abhijat committed Aug 10, 2022
2 parents f2d73b1 + a7ff954 commit 56e47a5
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 15 deletions.
24 changes: 18 additions & 6 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,15 @@ ntp_archiver::ntp_archiver(

void ntp_archiver::run_sync_manifest_loop() {
vassert(
!_sync_manifest_loop_started,
_upload_loop_state == loop_state::initial,
"attempt to start manifest sync loop for {} when upload loop has been "
"active",
_ntp);
vassert(
_sync_manifest_loop_state != loop_state::started,
"sync manifest loop for ntp {} already started",
_ntp);
_sync_manifest_loop_started = true;
_sync_manifest_loop_state = loop_state::started;

// NOTE: not using ssx::spawn_with_gate_then here because we want to log
// inside the gate (so that _rtclog is guaranteed to be alive).
Expand All @@ -104,15 +109,22 @@ void ntp_archiver::run_sync_manifest_loop() {
})
.finally([this] {
vlog(_rtclog.debug, "sync manifest loop stopped");
_sync_manifest_loop_stopped = true;
_sync_manifest_loop_state = loop_state::stopped;
});
});
}

void ntp_archiver::run_upload_loop() {
vassert(
!_upload_loop_started, "upload loop for ntp {} already started", _ntp);
_upload_loop_started = true;
_sync_manifest_loop_state == loop_state::initial,
"attempt to start upload loop for {} when manifest sync loop has been "
"active",
_ntp);
vassert(
_upload_loop_state != loop_state::started,
"upload loop for ntp {} already started",
_ntp);
_upload_loop_state = loop_state::started;

// NOTE: not using ssx::spawn_with_gate_then here because we want to log
// inside the gate (so that _rtclog is guaranteed to be alive).
Expand All @@ -127,7 +139,7 @@ void ntp_archiver::run_upload_loop() {
})
.finally([this] {
vlog(_rtclog.debug, "upload loop stopped");
_upload_loop_stopped = true;
_upload_loop_state = loop_state::stopped;
});
});
}
Expand Down
27 changes: 21 additions & 6 deletions src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ class ntp_archiver {
using back_insert_iterator
= std::back_insert_iterator<std::vector<segment_name>>;

enum class loop_state {
initial,
started,
stopped,
};

/// Create new instance
///
/// \param ntp is an ntp that archiver is responsible for
Expand All @@ -80,9 +86,20 @@ class ntp_archiver {
/// completed
ss::future<> stop();

bool upload_loop_stopped() const { return _upload_loop_stopped; }
bool upload_loop_stopped() const {
return _upload_loop_state == loop_state::stopped;
}

bool sync_manifest_loop_stopped() const {
return _sync_manifest_loop_stopped;
return _sync_manifest_loop_state == loop_state::stopped;
}

/// Query if either of the manifest sync loop or upload loop has stopped
/// These are mutually exclusive loops, and if any one has transitioned to a
/// stopped state then the archiver is stopped.
bool is_loop_stopped() const {
return _sync_manifest_loop_state == loop_state::stopped
|| _upload_loop_state == loop_state::stopped;
}

/// Get NTP
Expand Down Expand Up @@ -216,11 +233,9 @@ class ntp_archiver {
ss::lowres_clock::time_point _last_upload_time;
ss::scheduling_group _upload_sg;
ss::io_priority_class _io_priority;
bool _upload_loop_started = false;
bool _upload_loop_stopped = false;

bool _sync_manifest_loop_started = false;
bool _sync_manifest_loop_stopped = false;
loop_state _upload_loop_state{loop_state::initial};
loop_state _sync_manifest_loop_state{loop_state::initial};
};

} // namespace archival
4 changes: 1 addition & 3 deletions src/v/archival/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,7 @@ ss::future<> scheduler_service_impl::reconcile_archivers() {
// find archivers that have already stopped
for (const auto& [ntp, archiver] : _archivers) {
auto p = pm.get(ntp);
if (
!p
|| (archiver->upload_loop_stopped() && archiver->sync_manifest_loop_stopped())) {
if (!p || archiver->is_loop_stopped()) {
to_remove.push_back(ntp);
}
}
Expand Down

0 comments on commit 56e47a5

Please sign in to comment.