Skip to content

Commit

Permalink
Merge pull request #5242 from LenaAn/rr_fetch
Browse files Browse the repository at this point in the history
read replica: always serve cloud data for read replicas
  • Loading branch information
Lena Anyusheva committed Jul 13, 2022
2 parents bcc1d10 + f6e39b5 commit e5cca4c
Show file tree
Hide file tree
Showing 19 changed files with 409 additions and 397 deletions.
87 changes: 87 additions & 0 deletions src/v/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ ntp_archiver::ntp_archiver(
, _manifest_upload_timeout(conf.manifest_upload_timeout)
, _upload_loop_initial_backoff(conf.upload_loop_initial_backoff)
, _upload_loop_max_backoff(conf.upload_loop_max_backoff)
, _sync_manifest_timeout(
config::shard_local_cfg()
.cloud_storage_readreplica_manifest_sync_timeout_ms.bind())
, _upload_sg(conf.upload_scheduling_group)
, _io_priority(conf.upload_io_priority) {
vassert(
Expand All @@ -81,6 +84,30 @@ ntp_archiver::ntp_archiver(
_start_term);
}

void ntp_archiver::run_sync_manifest_loop() {
vassert(
!_sync_manifest_loop_started,
"sync manifest loop for ntp {} already started",
_ntp);
_sync_manifest_loop_started = true;

// NOTE: not using ssx::spawn_with_gate_then here because we want to log
// inside the gate (so that _rtclog is guaranteed to be alive).
ssx::spawn_with_gate(_gate, [this] {
return sync_manifest_loop()
.handle_exception_type([](const ss::abort_requested_exception&) {})
.handle_exception_type([](const ss::sleep_aborted&) {})
.handle_exception_type([](const ss::gate_closed_exception&) {})
.handle_exception([this](std::exception_ptr e) {
vlog(_rtclog.error, "sync manifest loop error: {}", e);
})
.finally([this] {
vlog(_rtclog.debug, "sync manifest loop stopped");
_sync_manifest_loop_stopped = true;
});
});
}

void ntp_archiver::run_upload_loop() {
vassert(
!_upload_loop_started, "upload loop for ntp {} already started", _ntp);
Expand Down Expand Up @@ -150,12 +177,72 @@ ss::future<> ntp_archiver::upload_loop() {
}
}

/// Body of the manifest sync fiber.
///
/// While sync_manifest_loop_can_continue() holds, calls sync_manifest(), logs
/// the outcome and then sleeps (abortable via `_as`) for the duration
/// currently returned by the `_sync_manifest_timeout` binding.
ss::future<> ntp_archiver::sync_manifest_loop() {
    while (sync_manifest_loop_can_continue()) {
        const auto outcome = co_await sync_manifest();

        if (outcome == cloud_storage::download_result::success) {
            vlog(
              _rtclog.debug,
              "Successfuly downloaded manifest {}",
              _manifest.get_manifest_path());
        } else {
            // Class `remote` retries internally, so reaching this branch
            // means the download still failed after several attempts and the
            // problem may be non-transient. That is why this is logged at
            // error severity.
            vlog(
              _rtclog.error,
              "Failed to download manifest {}",
              _manifest.get_manifest_path());
        }

        co_await ss::sleep_abortable(_sync_manifest_timeout(), _as);
    }
}

/// Download the partition manifest and, when the partition has an
/// archival_meta_stm, add the manifest's segments to that STM.
///
/// \return the result of the manifest download. A failed STM update is only
///         logged (as a warning, and not at all for `not_leader`); it does not
///         change the returned value.
ss::future<cloud_storage::download_result> ntp_archiver::sync_manifest() {
    // Announce the download before it starts, so the message is accurate and
    // also shows up when the download subsequently fails.
    vlog(_rtclog.debug, "Downloading manifest in read-replica mode");
    const cloud_storage::download_result r = co_await download_manifest();
    if (r != cloud_storage::download_result::success) {
        vlog(
          _rtclog.error,
          "Failed to download partition manifest in read-replica mode");
        co_return r;
    }

    // Look the STM up once instead of on every use.
    const auto& stm = _partition->archival_meta_stm();
    if (stm) {
        vlog(
          _rtclog.debug,
          "Updating the archival_meta_stm in read-replica mode");
        auto deadline = ss::lowres_clock::now() + _manifest_upload_timeout;
        auto error = co_await stm->add_segments(_manifest, deadline, _as);
        // `not_leader` is treated as an expected outcome and is not reported.
        if (
          error != cluster::errc::success
          && error != cluster::errc::not_leader) {
            vlog(
              _rtclog.warn, "archival metadata STM update failed: {}", error);
        }
        auto last_offset = stm->manifest().get_last_offset();
        vlog(_rtclog.debug, "manifest last_offset: {}", last_offset);
    }
    co_return r;
}

/// Whether the upload loop may run another iteration.
///
/// \return false if an abort was requested on `_as` or `_gate` is closed;
///         otherwise true only while the partition is the elected leader and
///         its term still equals `_start_term`.
bool ntp_archiver::upload_loop_can_continue() const {
    if (_as.abort_requested()) {
        return false;
    }
    if (_gate.is_closed()) {
        return false;
    }
    return _partition->is_elected_leader()
           && _partition->term() == _start_term;
}

/// Whether the manifest sync loop may run another iteration.
///
/// \return false if an abort was requested on `_as` or `_gate` is closed;
///         otherwise true only while the partition is the elected leader and
///         its term still equals `_start_term`.
bool ntp_archiver::sync_manifest_loop_can_continue() const {
    // TODO: these are the same conditions as upload_loop_can_continue(); it is
    // still an open question whether they are the right ones for this loop.
    const bool shutting_down = _as.abort_requested() || _gate.is_closed();
    if (shutting_down) {
        return false;
    }
    const bool leader_in_start_term = _partition->is_elected_leader()
                                      && _partition->term() == _start_term;
    return leader_in_start_term;
}

ss::future<> ntp_archiver::stop() {
_as.request_abort();
return _gate.close();
Expand Down
15 changes: 15 additions & 0 deletions src/v/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,18 @@ class ntp_archiver {
/// storage. Can be started only once.
void run_upload_loop();

/// Start the background fiber that runs the sync manifest loop.
/// Can be started only once (a second call trips an assertion).
void run_sync_manifest_loop();

/// Stop archiver.
///
/// \return a future that becomes ready once all async operations have
/// completed
ss::future<> stop();

/// Returns the current value of the `_upload_loop_stopped` flag.
bool upload_loop_stopped() const { return _upload_loop_stopped; }
/// Returns the current value of the `_sync_manifest_loop_stopped` flag,
/// which is set to true when the fiber started by run_sync_manifest_loop()
/// finishes.
bool sync_manifest_loop_stopped() const {
return _sync_manifest_loop_stopped;
}

/// Get NTP
const model::ntp& get_ntp() const;
Expand Down Expand Up @@ -112,6 +117,8 @@ class ntp_archiver {
ss::future<batch_result> upload_next_candidates(
std::optional<model::offset> last_stable_offset_override = std::nullopt);

/// Download the partition manifest and, if the partition has an
/// archival_meta_stm, add the manifest's segments to it.
///
/// \return the result of the manifest download
ss::future<cloud_storage::download_result> sync_manifest();

uint64_t estimate_backlog_size();

/// \brief Probe remote storage and truncate the manifest if needed
Expand Down Expand Up @@ -166,7 +173,11 @@ class ntp_archiver {
/// Launch the upload loop fiber.
ss::future<> upload_loop();

/// Launch the sync manifest loop fiber.
ss::future<> sync_manifest_loop();

bool upload_loop_can_continue() const;
bool sync_manifest_loop_can_continue() const;

ntp_level_probe _probe;
model::ntp _ntp;
Expand All @@ -190,13 +201,17 @@ class ntp_archiver {
ss::semaphore _mutex{1};
ss::lowres_clock::duration _upload_loop_initial_backoff;
ss::lowres_clock::duration _upload_loop_max_backoff;
config::binding<std::chrono::milliseconds> _sync_manifest_timeout;
simple_time_jitter<ss::lowres_clock> _backoff_jitter{100ms};
size_t _concurrency{4};
ss::lowres_clock::time_point _last_upload_time;
ss::scheduling_group _upload_sg;
ss::io_priority_class _io_priority;
bool _upload_loop_started = false;
bool _upload_loop_stopped = false;

bool _sync_manifest_loop_started = false;
bool _sync_manifest_loop_stopped = false;
};

} // namespace archival
104 changes: 59 additions & 45 deletions src/v/archival/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,48 +231,56 @@ ss::future<> scheduler_service_impl::add_ntp_archiver(
if (_gate.is_closed()) {
return ss::now();
}
return archiver->download_manifest().then(
[this, archiver](cloud_storage::download_result result) {
auto ntp = archiver->get_ntp();
switch (result) {
case cloud_storage::download_result::success:
vlog(
_rtclog.info,
"Found manifest for partition {}",
archiver->get_ntp());
_probe.start_archiving_ntp();

_archivers.emplace(archiver->get_ntp(), archiver);
archiver->run_upload_loop();

return ss::now();
case cloud_storage::download_result::notfound:
vlog(
_rtclog.info,
"Start archiving new partition {}",
archiver->get_ntp());
// Start topic manifest upload
// asynchronously
if (ntp.tp.partition == 0) {
// Upload manifest once per topic. GCS has strict
// limits for single object updates.
(void)upload_topic_manifest(
model::topic_namespace(ntp.ns, ntp.tp.topic),
archiver->get_revision_id());
}
_probe.start_archiving_ntp();

_archivers.emplace(archiver->get_ntp(), archiver);
archiver->run_upload_loop();
return archiver->download_manifest().then([this, archiver](
cloud_storage::download_result
result) {
auto ntp = archiver->get_ntp();
auto part = _partition_manager.local().get(ntp);
switch (result) {
case cloud_storage::download_result::success:
vlog(_rtclog.info, "Found manifest for partition {}", ntp);

if (part->get_ntp_config().is_read_replica_mode_enabled()) {
archiver->run_sync_manifest_loop();
} else {
_probe.start_archiving_ntp();
archiver->run_upload_loop();
}
_archivers.emplace(ntp, archiver);

return ss::now();
case cloud_storage::download_result::notfound:
if (part->get_ntp_config().is_read_replica_mode_enabled()) {
vlog(
_rtclog.info,
"Couldn't download manifest for partition {} in read replica",
ntp);
archiver->run_sync_manifest_loop();
} else {
vlog(_rtclog.info, "Start archiving new partition {}", ntp);
// Start topic manifest upload
// asynchronously
if (ntp.tp.partition == 0) {
// Upload manifest once per topic. GCS has strict
// limits for single object updates.
(void)upload_topic_manifest(
model::topic_namespace(ntp.ns, ntp.tp.topic),
archiver->get_revision_id());
}
_probe.start_archiving_ntp();

archiver->run_upload_loop();
}
_archivers.emplace(ntp, archiver);

return ss::now();
case cloud_storage::download_result::failed:
case cloud_storage::download_result::timedout:
vlog(_rtclog.warn, "Manifest download failed");
return ss::make_exception_future<>(ss::timed_out_error());
}
return ss::now();
});
return ss::now();
case cloud_storage::download_result::failed:
case cloud_storage::download_result::timedout:
vlog(_rtclog.warn, "Manifest download failed");
return ss::make_exception_future<>(ss::timed_out_error());
}
return ss::now();
});
}

ss::future<>
Expand All @@ -284,9 +292,13 @@ scheduler_service_impl::create_archivers(std::vector<model::ntp> to_create) {
std::move(to_create), concurrency, [this](const model::ntp& ntp) {
auto log = _partition_manager.local().log(ntp);
auto part = _partition_manager.local().get(ntp);
if (log.has_value() && part && part->is_elected_leader()
&& (part->get_ntp_config().is_archival_enabled()
|| config::shard_local_cfg().cloud_storage_enable_remote_write())) {
if (!log.has_value() || !part || !part->is_elected_leader()) {
return ss::now();
}
if (
part->get_ntp_config().is_archival_enabled()
|| part->get_ntp_config().is_read_replica_mode_enabled()
|| config::shard_local_cfg().cloud_storage_enable_remote_write()) {
auto archiver = ss::make_lw_shared<ntp_archiver>(
log->config(),
_partition_manager.local(),
Expand Down Expand Up @@ -325,7 +337,9 @@ ss::future<> scheduler_service_impl::reconcile_archivers() {
// find archivers that have already stopped
for (const auto& [ntp, archiver] : _archivers) {
auto p = pm.get(ntp);
if (!p || archiver->upload_loop_stopped()) {
if (
!p
|| (archiver->upload_loop_stopped() && archiver->sync_manifest_loop_stopped())) {
to_remove.push_back(ntp);
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/v/cloud_storage/remote_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,14 @@ model::offset remote_partition::first_uploaded_offset() {
}
}

/// Return the last offset recorded in the partition manifest.
///
/// Precondition: the manifest must contain at least one entry
/// (`_manifest.size() > 0`); this is enforced with an assertion.
model::offset remote_partition::last_uploaded_offset() {
vassert(
_manifest.size() > 0,
"The manifest for {} is not expected to be empty",
_manifest.get_ntp());
return _manifest.get_last_offset();
}

/// Return the NTP stored in the partition manifest.
const model::ntp& remote_partition::get_ntp() const {
return _manifest.get_ntp();
}
Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_storage/remote_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ class remote_partition
/// Return first uploaded kafka offset
model::offset first_uploaded_offset();

/// Return last uploaded kafka offset
model::offset last_uploaded_offset();

/// Get partition NTP
const model::ntp& get_ntp() const;

Expand Down
20 changes: 18 additions & 2 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ class partition {
return _archival_meta_stm;
}

/// Return true if read replica mode is enabled in this partition's log
/// configuration.
bool is_read_replica_mode_enabled() const {
    return _raft->log_config().is_read_replica_mode_enabled();
}

/// Return true if shadow indexing is enabled for the partition
bool is_remote_fetch_enabled() const {
const auto& cfg = _raft->log_config();
Expand All @@ -232,17 +237,28 @@ class partition {
model::offset start_cloud_offset() const {
vassert(
cloud_data_available(),
"Method can only be called if cloud data is available");
"Method can only be called if cloud data is available, ntp: {}",
_raft->ntp());
return _cloud_storage_partition->first_uploaded_offset();
}

/// Last available cloud offset.
///
/// Precondition: cloud_data_available() must be true; this is enforced with
/// an assertion. The value is the cloud storage partition's
/// last_uploaded_offset().
model::offset last_cloud_offset() const {
vassert(
cloud_data_available(),
"Method can only be called if cloud data is available, ntp: {}",
_raft->ntp());
return _cloud_storage_partition->last_uploaded_offset();
}

/// Create a reader that will fetch data from remote storage
ss::future<storage::translating_reader> make_cloud_reader(
storage::log_reader_config config,
std::optional<model::timeout_clock::time_point> deadline = std::nullopt) {
vassert(
cloud_data_available(),
"Method can only be called if cloud data is available");
"Method can only be called if cloud data is available, ntp: {}",
_raft->ntp());
return _cloud_storage_partition->make_reader(config, deadline);
}

Expand Down
7 changes: 5 additions & 2 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ bool topic_properties::has_overrides() const {
return cleanup_policy_bitflags || compaction_strategy || segment_size
|| retention_bytes.has_value() || retention_bytes.is_disabled()
|| retention_duration.has_value() || retention_duration.is_disabled()
|| recovery.has_value() || shadow_indexing.has_value();
|| recovery.has_value() || shadow_indexing.has_value()
|| read_replica.has_value();
}

storage::ntp_config::default_overrides
Expand All @@ -69,6 +70,7 @@ topic_properties::get_ntp_cfg_overrides() const {
ret.shadow_indexing_mode = shadow_indexing
? *shadow_indexing
: model::shadow_indexing_mode::disabled;
ret.read_replica = read_replica;
return ret;
}

Expand Down Expand Up @@ -101,7 +103,8 @@ storage::ntp_config topic_configuration::make_ntp_config(
properties.recovery ? *properties.recovery : false),
.shadow_indexing_mode = properties.shadow_indexing
? *properties.shadow_indexing
: model::shadow_indexing_mode::disabled});
: model::shadow_indexing_mode::disabled,
.read_replica = properties.read_replica});
}
return {
model::ntp(tp_ns.ns, tp_ns.tp, p_id),
Expand Down
Loading

0 comments on commit e5cca4c

Please sign in to comment.