
Commit

Merge pull request #5544 from Lazin/feature/si-recovery-tool
cloud_storage: Fix race condition in topic recovery
Lazin committed Jul 29, 2022
2 parents e08b1f9 + cec9ba2 commit cdae7df
Showing 14 changed files with 292 additions and 141 deletions.
9 changes: 8 additions & 1 deletion src/v/archival/service.cc
@@ -245,6 +245,13 @@ ss::future<> scheduler_service_impl::add_ntp_archiver(
if (part->get_ntp_config().is_read_replica_mode_enabled()) {
archiver->run_sync_manifest_loop();
} else {
if (ntp.tp.partition == 0) {
// Upload manifest once per topic. GCS has strict
// limits for single object updates.
ssx::background = upload_topic_manifest(
model::topic_namespace(ntp.ns, ntp.tp.topic),
archiver->get_revision_id());
}
_probe.start_archiving_ntp();
archiver->run_upload_loop();
}
@@ -264,7 +271,7 @@ ss::future<> scheduler_service_impl::add_ntp_archiver(
if (ntp.tp.partition == 0) {
// Upload manifest once per topic. GCS has strict
// limits for single object updates.
(void)upload_topic_manifest(
ssx::background = upload_topic_manifest(
model::topic_namespace(ntp.ns, ntp.tp.topic),
archiver->get_revision_id());
}
111 changes: 46 additions & 65 deletions src/v/cloud_storage/partition_recovery_manager.cc
@@ -85,8 +85,8 @@ partition_recovery_manager::~partition_recovery_manager() {

ss::future<> partition_recovery_manager::stop() { co_await _gate.close(); }

ss::future<log_recovery_result>
partition_recovery_manager::download_log(const storage::ntp_config& ntp_cfg) {
ss::future<log_recovery_result> partition_recovery_manager::download_log(
const storage::ntp_config& ntp_cfg, cluster::remote_topic_properties rtp) {
if (!ntp_cfg.has_overrides()) {
vlog(
cst_log.debug, "No overrides for {} found, skipping", ntp_cfg.ntp());
Expand All @@ -101,19 +101,21 @@ partition_recovery_manager::download_log(const storage::ntp_config& ntp_cfg) {
co_return log_recovery_result{};
}
partition_downloader downloader(
ntp_cfg, &_remote.local(), _bucket, _gate, _root);
ntp_cfg, &_remote.local(), rtp, _bucket, _gate, _root);
co_return co_await downloader.download_log();
}

partition_downloader::partition_downloader(
const storage::ntp_config& ntpc,
remote* remote,
cluster::remote_topic_properties rtp,
s3::bucket_name bucket,
ss::gate& gate_root,
retry_chain_node& parent)
: _ntpc(ntpc)
, _bucket(std::move(bucket))
, _remote(remote)
, _rtp(rtp)
, _gate(gate_root)
, _rtcnode(download_timeout, initial_backoff, &parent)
, _ctxlog(
@@ -244,30 +246,21 @@ partition_downloader::download_log(const remote_manifest_path& manifest_key) {
retention);
auto mat = co_await find_recovery_material(manifest_key);
auto offset_map = co_await build_offset_map(mat);
partition_manifest target(_ntpc.ntp(), _ntpc.get_initial_revision());
for (const auto& kv : offset_map) {
target.add(kv.second.manifest_key, kv.second.meta);
}
if (cst_log.is_enabled(ss::log_level::debug)) {
std::stringstream ostr;
target.serialize(ostr);
vlog(_ctxlog.debug, "Generated partition manifest: {}", ostr.str());
}
// Here the partition manifest 'target' may contain segments
// that have different revision ids inside the path.
if (target.size() == 0) {
mat.partition_manifest.serialize(ostr);
vlog(
_ctxlog.error,
"No segments found. Empty partition manifest generated.");
throw missing_partition_exception(_ntpc);
_ctxlog.debug,
"Partition manifest used for recovery: {}",
ostr.str());
}
download_part part;
if (std::holds_alternative<std::monostate>(retention)) {
static constexpr std::chrono::seconds one_day = 86400s;
static constexpr auto one_week = one_day * 7;
vlog(_ctxlog.info, "Default retention parameters are used.");
part = co_await download_log_with_capped_time(
offset_map, target, prefix, one_week);
offset_map, mat.partition_manifest, prefix, one_week);
} else if (std::holds_alternative<size_bound_deletion_parameters>(
retention)) {
auto r = std::get<size_bound_deletion_parameters>(retention);
@@ -276,7 +269,7 @@ partition_downloader::download_log(const remote_manifest_path& manifest_key) {
"Size bound retention is used. Size limit: {} bytes.",
r.retention_bytes);
part = co_await download_log_with_capped_size(
offset_map, target, prefix, r.retention_bytes);
offset_map, mat.partition_manifest, prefix, r.retention_bytes);
} else if (std::holds_alternative<time_bound_deletion_parameters>(
retention)) {
auto r = std::get<time_bound_deletion_parameters>(retention);
@@ -285,55 +278,28 @@ partition_downloader::download_log(const remote_manifest_path& manifest_key) {
"Time bound retention is used. Time limit: {}ms.",
r.retention_duration.count());
part = co_await download_log_with_capped_time(
offset_map, target, prefix, r.retention_duration);
offset_map, mat.partition_manifest, prefix, r.retention_duration);
}
// Move parts to final destinations
co_await move_parts(part);

auto upl_result = co_await _remote->upload_manifest(
_bucket, target, _rtcnode);
// If the manifest upload fails we can't continue
// since it will damage the data in S3. The archival subsystem
// will pick up the new partition after the leader is elected. Then
// it won't find the manifest in place and will create a new one.
// If the manifest name in S3 matches the old manifest name it will
// be overwritten and some data may be lost as a result.
vassert(
upl_result == upload_result::success,
"Can't upload new manifest {} after recovery",
target.get_manifest_path());

// TODO (evgeny): clean up old manifests on success. Take into account
// that old and new manifest names could match.

// Upload topic manifest for re-created topic (here we don't prevent
// other partitions of the same topic to read old topic manifest if the
// revision is different).
if (mat.topic_manifest.get_revision() != _ntpc.get_initial_revision()) {
mat.topic_manifest.set_revision(_ntpc.get_initial_revision());
upl_result = co_await _remote->upload_manifest(
_bucket, mat.topic_manifest, _rtcnode);
if (upl_result != upload_result::success) {
// That's probably fine since the archival subsystem will
// re-upload topic manifest eventually.
vlog(
_ctxlog.warn,
"Failed to upload new topic manifest {} after recovery",
target.get_manifest_path());
}
}
log_recovery_result result{
.completed = true,
.min_kafka_offset = part.range.min_offset,
.max_kafka_offset = part.range.max_offset,
.manifest = target,
.manifest = mat.partition_manifest,
};
co_return result;
}

void partition_downloader::update_downloaded_offsets(
std::vector<partition_downloader::offset_range> dloffsets,
partition_downloader::download_part& dlpart) {
auto to_erase = std::remove_if(
dloffsets.begin(), dloffsets.end(), [](offset_range r) {
return r.max_offset == model::offset::min();
});
dloffsets.erase(to_erase, dloffsets.end());
std::sort(dloffsets.begin(), dloffsets.end());
for (auto it = dloffsets.rbegin(); it != dloffsets.rend(); it++) {
auto offsets = *it;
@@ -434,6 +400,11 @@ partition_downloader::download_log_with_capped_size(
}
});
update_downloaded_offsets(std::move(dloffsets), dlpart);
if (dlpart.num_files == 0) {
// The segments didn't have data batches
dlpart.range.min_offset = manifest.get_last_offset();
dlpart.range.max_offset = manifest.get_last_offset();
}
co_return dlpart;
}

@@ -520,6 +491,11 @@ partition_downloader::download_log_with_capped_time(
}
});
update_downloaded_offsets(std::move(dloffsets), dlpart);
if (dlpart.num_files == 0) {
// The segments didn't have data batches
dlpart.range.min_offset = manifest.get_last_offset();
dlpart.range.max_offset = manifest.get_last_offset();
}
co_return dlpart;
}

@@ -539,13 +515,8 @@ ss::future<partition_downloader::recovery_material>
partition_downloader::find_recovery_material(const remote_manifest_path& key) {
vlog(_ctxlog.info, "Downloading topic manifest {}", key);
recovery_material recovery_mat;
auto result = co_await _remote->download_manifest(
_bucket, key, recovery_mat.topic_manifest, _rtcnode);
if (result != download_result::success) {
throw missing_partition_exception(key);
}
auto orig_rev = recovery_mat.topic_manifest.get_revision();
partition_manifest tmp(_ntpc.ntp(), orig_rev);

partition_manifest tmp(_ntpc.ntp(), _rtp.remote_revision);
auto res = co_await _remote->download_manifest(
_bucket, tmp.get_manifest_path(), tmp, _rtcnode);
if (res != download_result::success) {
@@ -625,12 +596,22 @@ partition_downloader::download_segment_file(
min_offset = stream_stats.min_offset;
max_offset = stream_stats.max_offset;

vlog(
_ctxlog.debug,
"Log segment downloaded. {} bytes expected, {} bytes after "
"pre-processing.",
len,
stream_stats.size_bytes);
if (stream_stats.size_bytes == 0) {
vlog(
_ctxlog.debug,
"Log segment downloaded empty. Original size: {}.",
len);
// The segment is empty after filtering
co_await ss::remove_file(localpath.native());
} else {
vlog(
_ctxlog.debug,
"Log segment downloaded. {} bytes expected, {} bytes after "
"pre-processing.",
len,
stream_stats.size_bytes);
}

co_return stream_stats.size_bytes;
};
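
The retention handling in download_log above dispatches on a std::variant whose alternatives describe the retention policy, with std::monostate meaning no explicit override. Below is a self-contained sketch of that dispatch pattern; the two parameter structs only mirror the names used above and are not the real Redpanda types:

    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <variant>

    // Illustrative stand-ins for the retention alternatives named above.
    struct size_bound_deletion_parameters { uint64_t retention_bytes; };
    struct time_bound_deletion_parameters { std::chrono::milliseconds retention_duration; };
    using retention = std::variant<std::monostate,
                                   size_bound_deletion_parameters,
                                   time_bound_deletion_parameters>;

    void plan_download(const retention& r) {
        using namespace std::chrono_literals;
        if (std::holds_alternative<std::monostate>(r)) {
            // No explicit policy: fall back to a capped time window
            // (one week in the code above).
            std::cout << "default retention, cap at " << (24h * 7).count() << "h\n";
        } else if (std::holds_alternative<size_bound_deletion_parameters>(r)) {
            auto s = std::get<size_bound_deletion_parameters>(r);
            std::cout << "size bound, cap at " << s.retention_bytes << " bytes\n";
        } else {
            auto t = std::get<time_bound_deletion_parameters>(r);
            std::cout << "time bound, cap at " << t.retention_duration.count() << "ms\n";
        }
    }

    int main() {
        plan_download(retention{});                              // default case
        plan_download(size_bound_deletion_parameters{1 << 30});  // 1 GiB cap
        plan_download(time_bound_deletion_parameters{std::chrono::hours{24}});
    }
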

8 changes: 5 additions & 3 deletions src/v/cloud_storage/partition_recovery_manager.h
@@ -14,6 +14,7 @@
#include "cloud_storage/remote.h"
#include "cloud_storage/topic_manifest.h"
#include "cloud_storage/types.h"
#include "cluster/types.h"
#include "model/record.h"
#include "s3/client.h"
#include "storage/ntp_config.h"
@@ -71,8 +72,8 @@ class partition_recovery_manager {
/// \return download result struct that contains 'completed=true'
/// if actual download happened. The 'last_offset' field will
/// be set to max offset of the downloaded log.
ss::future<log_recovery_result>
download_log(const storage::ntp_config& ntp_cfg);
ss::future<log_recovery_result> download_log(
const storage::ntp_config& ntp_cfg, cluster::remote_topic_properties rtp);

private:
s3::bucket_name _bucket;
Expand All @@ -90,6 +91,7 @@ class partition_downloader {
partition_downloader(
const storage::ntp_config& ntpc,
remote* remote,
cluster::remote_topic_properties rtp,
s3::bucket_name bucket,
ss::gate& gate_root,
retry_chain_node& parent);
@@ -121,7 +123,6 @@ class partition_downloader {
download_manifest(const remote_manifest_path& path);

struct recovery_material {
topic_manifest topic_manifest;
partition_manifest partition_manifest;
};

@@ -191,6 +192,7 @@ class partition_downloader {
const storage::ntp_config& _ntpc;
s3::bucket_name _bucket;
remote* _remote;
cluster::remote_topic_properties _rtp;
ss::gate& _gate;
retry_chain_node _rtcnode;
retry_chain_logger _ctxlog;
2 changes: 1 addition & 1 deletion src/v/cluster/CMakeLists.txt
@@ -100,7 +100,7 @@ v_cc_library(
feature_barrier.cc
feature_table.cc
drain_manager.cc
read_replica_manager.cc
remote_topic_configuration_source.cc
partition_balancer_planner.cc
partition_balancer_backend.cc
partition_balancer_rpc_handler.cc
13 changes: 12 additions & 1 deletion src/v/cluster/controller_backend.cc
@@ -1437,14 +1437,25 @@ ss::future<std::error_code> controller_backend::create_partition(
}
// no partition exists, create one
if (likely(!partition)) {
// If topic recovery is enabled, the information required by it
// needs to be passed to the 'manage' call since it's not
// available at the partition level.
auto meta = _topics.local().get_topic_metadata(
model::topic_namespace_view(ntp));
std::optional<cluster::remote_topic_properties> remote_properties;
if (meta->get_configuration().is_recovery_enabled()) {
remote_properties
= meta->get_configuration().properties.remote_topic_properties;
}
// we use offset as a rev as it is always increasing and it
// increases while ntp is being created again
f = _partition_manager.local()
.manage(
cfg->make_ntp_config(
_data_directory, ntp.tp.partition, rev, initial_rev.value()),
group_id,
std::move(members))
std::move(members),
remote_properties)
.discard_result();
} else {
// old partition still exists, wait for it to be removed
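
The controller now reads the topic configuration and threads the recovery-related properties down to partition_manager::manage as a std::optional, because the downloader no longer fetches the topic manifest to learn them on its own. A compressed sketch of that pattern with made-up stand-in types (only the remote_revision field is taken from the diff; everything else is illustrative):

    #include <cstdint>
    #include <iostream>
    #include <optional>

    // Made-up stand-ins for the topic-level configuration and the recovery
    // properties that controller_backend extracts from it.
    struct remote_topic_properties { int64_t remote_revision; };
    struct topic_configuration {
        bool recovery_enabled;
        std::optional<remote_topic_properties> remote_props;
    };

    // Partition-level entry point: recovery runs only when the optional is
    // populated, mirroring the rtp.has_value() check in maybe_download_log
    // further down in partition_manager.cc.
    void manage_partition(std::optional<remote_topic_properties> rtp) {
        if (rtp.has_value()) {
            std::cout << "recover log at remote revision " << rtp->remote_revision << "\n";
        } else {
            std::cout << "no recovery requested, start with an empty log\n";
        }
    }

    int main() {
        topic_configuration cfg{true, remote_topic_properties{42}};
        // Populate the optional only when recovery is enabled for the topic.
        std::optional<remote_topic_properties> rtp;
        if (cfg.recovery_enabled) {
            rtp = cfg.remote_props;
        }
        manage_partition(rtp);
    }

The std::nullopt default added to manage in partition_manager.h keeps call sites that do not participate in recovery unchanged.
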
14 changes: 9 additions & 5 deletions src/v/cluster/partition_manager.cc
@@ -14,6 +14,7 @@
#include "cluster/archival_metadata_stm.h"
#include "cluster/fwd.h"
#include "cluster/logger.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "model/metadata.h"
#include "raft/consensus.h"
@@ -73,9 +74,10 @@ partition_manager::get_topic_partition_table(
ss::future<consensus_ptr> partition_manager::manage(
storage::ntp_config ntp_cfg,
raft::group_id group,
std::vector<model::broker> initial_nodes) {
std::vector<model::broker> initial_nodes,
std::optional<cluster::remote_topic_properties> rtp) {
gate_guard guard(_gate);
auto dl_result = co_await maybe_download_log(ntp_cfg);
auto dl_result = co_await maybe_download_log(ntp_cfg, rtp);
auto [logs_recovered, min_kafka_offset, max_kafka_offset, manifest]
= dl_result;
if (logs_recovered) {
@@ -151,10 +153,12 @@ ss::future<consensus_ptr> partition_manager::manage(
}

ss::future<cloud_storage::log_recovery_result>
partition_manager::maybe_download_log(storage::ntp_config& ntp_cfg) {
if (_partition_recovery_mgr.local_is_initialized()) {
partition_manager::maybe_download_log(
storage::ntp_config& ntp_cfg,
std::optional<cluster::remote_topic_properties> rtp) {
if (rtp.has_value() && _partition_recovery_mgr.local_is_initialized()) {
auto res = co_await _partition_recovery_mgr.local().download_log(
ntp_cfg);
ntp_cfg, *rtp);
co_return res;
}
vlog(
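
manage still brackets its work with a gate guard, and partition_recovery_manager::stop() earlier in this commit simply closes its gate; that is the Seastar idiom for keeping shutdown from racing with in-flight async work. A minimal sketch with a plain seastar::gate follows; Redpanda's gate_guard wrapper is not shown, and the short sleep merely stands in for the actual segment downloads:

    #include <seastar/core/future.hh>
    #include <seastar/core/gate.hh>
    #include <seastar/core/sleep.hh>

    using namespace std::chrono_literals;

    struct downloader {
        seastar::gate _gate;

        // Every async entry point runs inside the gate, so stop() can wait
        // for it to finish before tearing the object down.
        seastar::future<> download_log() {
            return seastar::with_gate(_gate, [] {
                return seastar::sleep(10ms); // stand-in for segment downloads
            });
        }

        // Closing the gate waits for all entered sections and makes further
        // with_gate() calls fail with gate_closed_exception.
        seastar::future<> stop() { return _gate.close(); }
    };

In the real code the same gate is shared between partition_recovery_manager and the partition_downloader it creates (the gate_root constructor parameter), so a single stop() drains all outstanding downloads.
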
12 changes: 8 additions & 4 deletions src/v/cluster/partition_manager.h
@@ -74,8 +74,11 @@ class partition_manager {

ss::future<> start() { return ss::now(); }
ss::future<> stop_partitions();
ss::future<consensus_ptr>
manage(storage::ntp_config, raft::group_id, std::vector<model::broker>);
ss::future<consensus_ptr> manage(
storage::ntp_config,
raft::group_id,
std::vector<model::broker>,
std::optional<cluster::remote_topic_properties> = std::nullopt);

ss::future<> shutdown(const model::ntp& ntp);
ss::future<> remove(const model::ntp& ntp);
@@ -172,8 +175,9 @@ class partition_manager {
/// In this case this method always returns false.
/// \param ntp_cfg is an ntp_config instance to recover
/// \return true if the recovery was invoked, false otherwise
ss::future<cloud_storage::log_recovery_result>
maybe_download_log(storage::ntp_config& ntp_cfg);
ss::future<cloud_storage::log_recovery_result> maybe_download_log(
storage::ntp_config& ntp_cfg,
std::optional<cluster::remote_topic_properties> rtp);

ss::future<> do_shutdown(ss::lw_shared_ptr<partition>);
