diff --git a/src/v/archival/ntp_archiver_service.cc b/src/v/archival/ntp_archiver_service.cc index 7c8c301914b1..bdb481ae5343 100644 --- a/src/v/archival/ntp_archiver_service.cc +++ b/src/v/archival/ntp_archiver_service.cc @@ -67,6 +67,9 @@ ntp_archiver::ntp_archiver( , _manifest_upload_timeout(conf.manifest_upload_timeout) , _upload_loop_initial_backoff(conf.upload_loop_initial_backoff) , _upload_loop_max_backoff(conf.upload_loop_max_backoff) + , _sync_manifest_timeout( + config::shard_local_cfg() + .cloud_storage_readreplica_manifest_sync_timeout_ms.bind()) , _upload_sg(conf.upload_scheduling_group) , _io_priority(conf.upload_io_priority) { vassert( @@ -81,6 +84,30 @@ ntp_archiver::ntp_archiver( _start_term); } +void ntp_archiver::run_sync_manifest_loop() { + vassert( + !_sync_manifest_loop_started, + "sync manifest loop for ntp {} already started", + _ntp); + _sync_manifest_loop_started = true; + + // NOTE: not using ssx::spawn_with_gate_then here because we want to log + // inside the gate (so that _rtclog is guaranteed to be alive). + ssx::spawn_with_gate(_gate, [this] { + return sync_manifest_loop() + .handle_exception_type([](const ss::abort_requested_exception&) {}) + .handle_exception_type([](const ss::sleep_aborted&) {}) + .handle_exception_type([](const ss::gate_closed_exception&) {}) + .handle_exception([this](std::exception_ptr e) { + vlog(_rtclog.error, "sync manifest loop error: {}", e); + }) + .finally([this] { + vlog(_rtclog.debug, "sync manifest loop stopped"); + _sync_manifest_loop_stopped = true; + }); + }); +} + void ntp_archiver::run_upload_loop() { vassert( !_upload_loop_started, "upload loop for ntp {} already started", _ntp); @@ -150,12 +177,72 @@ ss::future<> ntp_archiver::upload_loop() { } } +ss::future<> ntp_archiver::sync_manifest_loop() { + while (sync_manifest_loop_can_continue()) { + cloud_storage::download_result result = co_await sync_manifest(); + + if (result != cloud_storage::download_result::success) { + // The logic in class `remote` already does retries: if we get here, + // it means the download failed after several retries, indicating + // something non-transient may be wrong. Hence error severity. 
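run_sync_manifest_loop() follows the same background-fiber discipline as the existing upload loop: the fiber is spawned inside the archiver's gate, shutdown-related exceptions (abort requested, aborted sleep, closed gate) end it quietly, and a `*_stopped` flag is left behind for the scheduler's reconciliation pass. As a point of reference, here is a minimal stand-alone sketch of that shape built only from plain Seastar primitives; `ssx::spawn_with_gate` is a Redpanda wrapper, and every name in the sketch is illustrative rather than part of this patch. The loop body the real fiber runs continues just below.

```cpp
#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/sleep.hh>

using namespace std::chrono_literals;

// Simplified stand-in for the archiver's sync-manifest fiber: one gate holds
// the fiber open, one abort_source stops it cooperatively.
class sync_loop_sketch {
public:
    void start() {
        // Spawn the fiber inside the gate so stop() can wait for it to exit.
        (void)seastar::with_gate(_gate, [this] { return loop(); });
    }

    seastar::future<> stop() {
        _as.request_abort();
        co_await _gate.close(); // resolves once loop() has returned
    }

private:
    seastar::future<> loop() {
        try {
            // The real loop also checks leadership and term; omitted here.
            while (!_as.abort_requested()) {
                co_await do_sync();                          // e.g. download manifest
                co_await seastar::sleep_abortable(30s, _as); // poll interval
            }
        } catch (const seastar::sleep_aborted&) {
            // Expected on shutdown; the real code also swallows
            // abort_requested_exception and gate_closed_exception.
        }
        _stopped = true; // reconciliation later removes stopped archivers
    }

    seastar::future<> do_sync() { co_return; }

    seastar::gate _gate;
    seastar::abort_source _as;
    bool _stopped{false};
};
```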
+            vlog(
+              _rtclog.error,
+              "Failed to download manifest {}",
+              _manifest.get_manifest_path());
+        } else {
+            vlog(
+              _rtclog.debug,
+              "Successfully downloaded manifest {}",
+              _manifest.get_manifest_path());
+        }
+        co_await ss::sleep_abortable(_sync_manifest_timeout(), _as);
+    }
+}
+
+ss::future<cloud_storage::download_result> ntp_archiver::sync_manifest() {
+    cloud_storage::download_result r = co_await download_manifest();
+    if (r == cloud_storage::download_result::success) {
+        vlog(_rtclog.debug, "Downloading manifest in read-replica mode");
+        if (_partition->archival_meta_stm()) {
+            vlog(
+              _rtclog.debug,
+              "Updating the archival_meta_stm in read-replica mode");
+            auto deadline = ss::lowres_clock::now() + _manifest_upload_timeout;
+            auto error = co_await _partition->archival_meta_stm()->add_segments(
+              _manifest, deadline, _as);
+            if (
+              error != cluster::errc::success
+              && error != cluster::errc::not_leader) {
+                vlog(
+                  _rtclog.warn,
+                  "archival metadata STM update failed: {}",
+                  error);
+            }
+            auto last_offset
+              = _partition->archival_meta_stm()->manifest().get_last_offset();
+            vlog(_rtclog.debug, "manifest last_offset: {}", last_offset);
+        }
+    } else {
+        vlog(
+          _rtclog.error,
+          "Failed to download partition manifest in read-replica mode");
+    }
+    co_return r;
+}
+
 bool ntp_archiver::upload_loop_can_continue() const {
     return !_as.abort_requested() && !_gate.is_closed()
            && _partition->is_elected_leader()
            && _partition->term() == _start_term;
 }
 
+bool ntp_archiver::sync_manifest_loop_can_continue() const {
+    // TODO: these stop conditions were copied from upload_loop_can_continue();
+    // revisit whether the leadership/term checks are what a read replica needs.
+    return !_as.abort_requested() && !_gate.is_closed()
+           && _partition->is_elected_leader()
+           && _partition->term() == _start_term;
+}
+
 ss::future<> ntp_archiver::stop() {
     _as.request_abort();
     return _gate.close();
diff --git a/src/v/archival/ntp_archiver_service.h b/src/v/archival/ntp_archiver_service.h
index 1cad7145c822..ca45e7c5868a 100644
--- a/src/v/archival/ntp_archiver_service.h
+++ b/src/v/archival/ntp_archiver_service.h
@@ -73,6 +73,8 @@ class ntp_archiver {
     /// storage. Can be started only once.
     void run_upload_loop();
 
+    void run_sync_manifest_loop();
+
     /// Stop archiver.
     ///
     /// \return future that will become ready when all async operation will be
@@ -80,6 +82,9 @@ class ntp_archiver {
     ss::future<> stop();
 
     bool upload_loop_stopped() const { return _upload_loop_stopped; }
+    bool sync_manifest_loop_stopped() const {
+        return _sync_manifest_loop_stopped;
+    }
 
     /// Get NTP
     const model::ntp& get_ntp() const;
@@ -112,6 +117,8 @@
     ss::future upload_next_candidates(
       std::optional last_stable_offset_override = std::nullopt);
 
+    ss::future<cloud_storage::download_result> sync_manifest();
+
     uint64_t estimate_backlog_size();
 
     /// \brief Probe remote storage and truncate the manifest if needed
@@ -166,7 +173,11 @@
     /// Launch the upload loop fiber.
     ss::future<> upload_loop();
 
+    /// Launch the sync manifest loop fiber.
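The poll cadence used by the sleep_abortable call above comes from _sync_manifest_timeout, which is bound to cloud_storage_readreplica_manifest_sync_timeout_ms (added further down with needs_restart::no and a 30s default), so the interval can be retuned at runtime; the ducktape test lowers it to 500ms. A rough stand-in for the binding semantics, assuming the property is a millisecond duration: binding_sketch and the atomic config cell are invented for illustration and are not Redpanda's config::binding.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>

// Simplified stand-in for config::binding<T>: a callable view over a value
// that an operator can change while the broker is running.
template <typename T>
class binding_sketch {
public:
    explicit binding_sketch(std::function<T()> reader)
      : _reader(std::move(reader)) {}
    T operator()() const { return _reader(); } // always yields the current value
private:
    std::function<T()> _reader;
};

std::atomic<int64_t> sync_timeout_ms{30'000}; // pretend cluster-config cell

int main() {
    binding_sketch<std::chrono::milliseconds> sync_manifest_timeout{
      [] { return std::chrono::milliseconds(sync_timeout_ms.load()); }};

    // In the loop: co_await ss::sleep_abortable(sync_manifest_timeout(), as);
    auto interval = sync_manifest_timeout(); // 30s by default
    sync_timeout_ms = 500;                   // e.g. what the ducktape test sets
    interval = sync_manifest_timeout();      // now 500ms, no restart needed
    return interval.count() == 500 ? 0 : 1;
}
```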
+ ss::future<> sync_manifest_loop(); + bool upload_loop_can_continue() const; + bool sync_manifest_loop_can_continue() const; ntp_level_probe _probe; model::ntp _ntp; @@ -190,6 +201,7 @@ class ntp_archiver { ss::semaphore _mutex{1}; ss::lowres_clock::duration _upload_loop_initial_backoff; ss::lowres_clock::duration _upload_loop_max_backoff; + config::binding _sync_manifest_timeout; simple_time_jitter _backoff_jitter{100ms}; size_t _concurrency{4}; ss::lowres_clock::time_point _last_upload_time; @@ -197,6 +209,9 @@ class ntp_archiver { ss::io_priority_class _io_priority; bool _upload_loop_started = false; bool _upload_loop_stopped = false; + + bool _sync_manifest_loop_started = false; + bool _sync_manifest_loop_stopped = false; }; } // namespace archival diff --git a/src/v/archival/service.cc b/src/v/archival/service.cc index b0d474c5a089..6060f5d0532f 100644 --- a/src/v/archival/service.cc +++ b/src/v/archival/service.cc @@ -231,48 +231,56 @@ ss::future<> scheduler_service_impl::add_ntp_archiver( if (_gate.is_closed()) { return ss::now(); } - return archiver->download_manifest().then( - [this, archiver](cloud_storage::download_result result) { - auto ntp = archiver->get_ntp(); - switch (result) { - case cloud_storage::download_result::success: - vlog( - _rtclog.info, - "Found manifest for partition {}", - archiver->get_ntp()); - _probe.start_archiving_ntp(); - - _archivers.emplace(archiver->get_ntp(), archiver); - archiver->run_upload_loop(); - - return ss::now(); - case cloud_storage::download_result::notfound: - vlog( - _rtclog.info, - "Start archiving new partition {}", - archiver->get_ntp()); - // Start topic manifest upload - // asynchronously - if (ntp.tp.partition == 0) { - // Upload manifest once per topic. GCS has strict - // limits for single object updates. - (void)upload_topic_manifest( - model::topic_namespace(ntp.ns, ntp.tp.topic), - archiver->get_revision_id()); - } - _probe.start_archiving_ntp(); - - _archivers.emplace(archiver->get_ntp(), archiver); - archiver->run_upload_loop(); + return archiver->download_manifest().then([this, archiver]( + cloud_storage::download_result + result) { + auto ntp = archiver->get_ntp(); + auto part = _partition_manager.local().get(ntp); + switch (result) { + case cloud_storage::download_result::success: + vlog(_rtclog.info, "Found manifest for partition {}", ntp); + + if (part->get_ntp_config().is_read_replica_mode_enabled()) { + archiver->run_sync_manifest_loop(); + } else { + _probe.start_archiving_ntp(); + archiver->run_upload_loop(); + } + _archivers.emplace(ntp, archiver); + + return ss::now(); + case cloud_storage::download_result::notfound: + if (part->get_ntp_config().is_read_replica_mode_enabled()) { + vlog( + _rtclog.info, + "Couldn't download manifest for partition {} in read replica", + ntp); + archiver->run_sync_manifest_loop(); + } else { + vlog(_rtclog.info, "Start archiving new partition {}", ntp); + // Start topic manifest upload + // asynchronously + if (ntp.tp.partition == 0) { + // Upload manifest once per topic. GCS has strict + // limits for single object updates. 
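The rewritten add_ntp_archiver() now picks one of two fibers per partition: read replicas always get the manifest-sync loop, even when no manifest exists in the bucket yet, while everything else keeps the upload loop, with the one-off topic manifest upload reserved for partition 0 of brand-new topics; the actual upload call follows below. A compact distillation of that decision, using invented helper types that are not part of the patch:

```cpp
// Illustrative only: mirrors the branching in add_ntp_archiver().
enum class manifest_lookup { found, not_found };
enum class archiver_mode { sync_manifest_loop, upload_loop };

struct partition_info {
    bool read_replica = false;
    int partition_id = 0;
};

archiver_mode choose_archiver_mode(
  const partition_info& p,
  manifest_lookup lookup,
  bool& upload_topic_manifest) {
    upload_topic_manifest = false;
    if (p.read_replica) {
        // A read replica never uploads: whether or not the origin cluster has
        // written a manifest yet, it only polls for one.
        return archiver_mode::sync_manifest_loop;
    }
    // Regular archival partition: a brand-new topic (no manifest found) also
    // gets a one-off topic manifest upload, driven by partition 0 only because
    // GCS has strict limits on frequent updates of a single object.
    upload_topic_manifest = lookup == manifest_lookup::not_found
                            && p.partition_id == 0;
    return archiver_mode::upload_loop;
}
```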
+ (void)upload_topic_manifest( + model::topic_namespace(ntp.ns, ntp.tp.topic), + archiver->get_revision_id()); + } + _probe.start_archiving_ntp(); + + archiver->run_upload_loop(); + } + _archivers.emplace(ntp, archiver); - return ss::now(); - case cloud_storage::download_result::failed: - case cloud_storage::download_result::timedout: - vlog(_rtclog.warn, "Manifest download failed"); - return ss::make_exception_future<>(ss::timed_out_error()); - } - return ss::now(); - }); + return ss::now(); + case cloud_storage::download_result::failed: + case cloud_storage::download_result::timedout: + vlog(_rtclog.warn, "Manifest download failed"); + return ss::make_exception_future<>(ss::timed_out_error()); + } + return ss::now(); + }); } ss::future<> @@ -284,9 +292,13 @@ scheduler_service_impl::create_archivers(std::vector to_create) { std::move(to_create), concurrency, [this](const model::ntp& ntp) { auto log = _partition_manager.local().log(ntp); auto part = _partition_manager.local().get(ntp); - if (log.has_value() && part && part->is_elected_leader() - && (part->get_ntp_config().is_archival_enabled() - || config::shard_local_cfg().cloud_storage_enable_remote_write())) { + if (!log.has_value() || !part || !part->is_elected_leader()) { + return ss::now(); + } + if ( + part->get_ntp_config().is_archival_enabled() + || part->get_ntp_config().is_read_replica_mode_enabled() + || config::shard_local_cfg().cloud_storage_enable_remote_write()) { auto archiver = ss::make_lw_shared( log->config(), _partition_manager.local(), @@ -325,7 +337,9 @@ ss::future<> scheduler_service_impl::reconcile_archivers() { // find archivers that have already stopped for (const auto& [ntp, archiver] : _archivers) { auto p = pm.get(ntp); - if (!p || archiver->upload_loop_stopped()) { + if ( + !p + || (archiver->upload_loop_stopped() && archiver->sync_manifest_loop_stopped())) { to_remove.push_back(ntp); } } diff --git a/src/v/cloud_storage/remote_partition.cc b/src/v/cloud_storage/remote_partition.cc index efd96d33986b..ef90f003ad62 100644 --- a/src/v/cloud_storage/remote_partition.cc +++ b/src/v/cloud_storage/remote_partition.cc @@ -459,6 +459,14 @@ model::offset remote_partition::first_uploaded_offset() { } } +model::offset remote_partition::last_uploaded_offset() { + vassert( + _manifest.size() > 0, + "The manifest for {} is not expected to be empty", + _manifest.get_ntp()); + return _manifest.get_last_offset(); +} + const model::ntp& remote_partition::get_ntp() const { return _manifest.get_ntp(); } diff --git a/src/v/cloud_storage/remote_partition.h b/src/v/cloud_storage/remote_partition.h index f622daec685e..929c9010c858 100644 --- a/src/v/cloud_storage/remote_partition.h +++ b/src/v/cloud_storage/remote_partition.h @@ -180,6 +180,9 @@ class remote_partition /// Return first uploaded kafka offset model::offset first_uploaded_offset(); + /// Return last uploaded kafka offset + model::offset last_uploaded_offset(); + /// Get partition NTP const model::ntp& get_ntp() const; diff --git a/src/v/cluster/partition.h b/src/v/cluster/partition.h index 060153c9cc0e..0b3dc9ad47e3 100644 --- a/src/v/cluster/partition.h +++ b/src/v/cluster/partition.h @@ -211,6 +211,11 @@ class partition { return _archival_meta_stm; } + bool is_read_replica_mode_enabled() const { + const auto& cfg = _raft->log_config(); + return cfg.is_read_replica_mode_enabled(); + } + /// Return true if shadow indexing is enabled for the partition bool is_remote_fetch_enabled() const { const auto& cfg = _raft->log_config(); @@ -232,17 +237,28 @@ class partition { 
model::offset start_cloud_offset() const { vassert( cloud_data_available(), - "Method can only be called if cloud data is available"); + "Method can only be called if cloud data is available, ntp: {}", + _raft->ntp()); return _cloud_storage_partition->first_uploaded_offset(); } + /// Last available cloud offset + model::offset last_cloud_offset() const { + vassert( + cloud_data_available(), + "Method can only be called if cloud data is available, ntp: {}", + _raft->ntp()); + return _cloud_storage_partition->last_uploaded_offset(); + } + /// Create a reader that will fetch data from remote storage ss::future make_cloud_reader( storage::log_reader_config config, std::optional deadline = std::nullopt) { vassert( cloud_data_available(), - "Method can only be called if cloud data is available"); + "Method can only be called if cloud data is available, ntp: {}", + _raft->ntp()); return _cloud_storage_partition->make_reader(config, deadline); } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 474bfb6a2d9e..70888c5609c8 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -55,7 +55,8 @@ bool topic_properties::has_overrides() const { return cleanup_policy_bitflags || compaction_strategy || segment_size || retention_bytes.has_value() || retention_bytes.is_disabled() || retention_duration.has_value() || retention_duration.is_disabled() - || recovery.has_value() || shadow_indexing.has_value(); + || recovery.has_value() || shadow_indexing.has_value() + || read_replica.has_value(); } storage::ntp_config::default_overrides @@ -69,6 +70,7 @@ topic_properties::get_ntp_cfg_overrides() const { ret.shadow_indexing_mode = shadow_indexing ? *shadow_indexing : model::shadow_indexing_mode::disabled; + ret.read_replica = read_replica; return ret; } @@ -101,7 +103,8 @@ storage::ntp_config topic_configuration::make_ntp_config( properties.recovery ? *properties.recovery : false), .shadow_indexing_mode = properties.shadow_indexing ? 
*properties.shadow_indexing - : model::shadow_indexing_mode::disabled}); + : model::shadow_indexing_mode::disabled, + .read_replica = properties.read_replica}); } return { model::ntp(tp_ns.ns, tp_ns.tp, p_id), diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index bce4f0f6891b..a6b2c0274dbc 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1029,6 +1029,13 @@ configuration::configuration() "remote storage (sec)", {.visibility = visibility::tunable}, std::nullopt) + , cloud_storage_readreplica_manifest_sync_timeout_ms( + *this, + "cloud_storage_readreplica_manifest_sync_timeout_ms", + "Timeout to check if new data is available for partition in S3 for read " + "replica", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 30s) , cloud_storage_upload_ctrl_update_interval_ms( *this, "cloud_storage_upload_ctrl_update_interval_ms", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 8cb40c1c28d2..10fa20a4e7d0 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -218,6 +218,8 @@ struct configuration final : public config_store { cloud_storage_max_connection_idle_time_ms; property> cloud_storage_segment_max_upload_interval_sec; + property + cloud_storage_readreplica_manifest_sync_timeout_ms; // Archival upload controller property diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index d24fe5fed21b..6c2583f6b29f 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -38,6 +38,15 @@ replicated_partition::replicated_partition( ss::future replicated_partition::make_reader( storage::log_reader_config cfg, std::optional deadline) { + if ( + _partition->is_read_replica_mode_enabled() + && _partition->cloud_data_available()) { + // No need to translate the offsets in this case since all fetch + // requests in read replica are served via remote_partition which + // does its own translation. + co_return co_await _partition->make_cloud_reader(cfg); + } + auto local_kafka_start_offset = _translator->from_log_offset( _partition->start_offset()); if ( diff --git a/src/v/kafka/server/replicated_partition.h b/src/v/kafka/server/replicated_partition.h index 5c4379762f68..187a2f4e89ce 100644 --- a/src/v/kafka/server/replicated_partition.h +++ b/src/v/kafka/server/replicated_partition.h @@ -37,6 +37,13 @@ class replicated_partition final : public kafka::partition_proxy::impl { const model::ntp& ntp() const final { return _partition->ntp(); } model::offset start_offset() const final { + if ( + _partition->is_read_replica_mode_enabled() + && _partition->cloud_data_available()) { + // Always assume remote read in this case. 
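With these overrides, a read replica derives every consumer-visible offset from the uploaded manifest rather than from the local log: start_offset reports the first uploaded cloud offset (the return statement follows just below), while high_watermark and last_stable_offset both report one past the last uploaded offset, falling back to 0 until a manifest has been synced. A toy model of that mapping with assumed names, not the real cluster::partition API:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Assumed semantics: the fetchable range of a read replica is exactly the
// range covered by the manifest synced from object storage.
struct cloud_view {
    std::optional<int64_t> first_uploaded; // empty until a manifest is synced
    std::optional<int64_t> last_uploaded;
};

int64_t start_offset(const cloud_view& v) { return v.first_uploaded.value_or(0); }

int64_t high_watermark(const cloud_view& v) {
    // HWM is exclusive: one past the last uploaded offset, or 0 if nothing has
    // been uploaded yet. last_stable_offset is identical in this mode.
    return v.last_uploaded ? *v.last_uploaded + 1 : 0;
}

int main() {
    cloud_view empty{};
    assert(high_watermark(empty) == 0); // nothing fetchable yet

    cloud_view synced{.first_uploaded = 0, .last_uploaded = 41};
    assert(start_offset(synced) == 0);
    assert(high_watermark(synced) == 42); // offsets 0..41 are fetchable
    return 0;
}
```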
+ return _partition->start_cloud_offset(); + } + auto local_kafka_start_offset = _translator->from_log_offset( _partition->start_offset()); if ( @@ -49,10 +56,25 @@ class replicated_partition final : public kafka::partition_proxy::impl { } model::offset high_watermark() const final { + if (_partition->is_read_replica_mode_enabled()) { + if (_partition->cloud_data_available()) { + return model::next_offset(_partition->last_cloud_offset()); + } else { + return model::offset(0); + } + } return _translator->from_log_offset(_partition->high_watermark()); } model::offset last_stable_offset() const final { + if (_partition->is_read_replica_mode_enabled()) { + if (_partition->cloud_data_available()) { + // There is no difference between HWM and LO in this mode + return model::next_offset(_partition->last_cloud_offset()); + } else { + return model::offset(0); + } + } return _translator->from_log_offset(_partition->last_stable_offset()); } diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt index 875fc931d94b..3630ba7e10ff 100644 --- a/src/v/kafka/server/tests/CMakeLists.txt +++ b/src/v/kafka/server/tests/CMakeLists.txt @@ -14,7 +14,6 @@ rp_test( ) set(srcs - s3_imposter_fixture.cc consumer_groups_test.cc member_test.cc group_test.cc diff --git a/src/v/kafka/server/tests/create_topics_test.cc b/src/v/kafka/server/tests/create_topics_test.cc index afb0532b41ee..8aac8de3fbf7 100644 --- a/src/v/kafka/server/tests/create_topics_test.cc +++ b/src/v/kafka/server/tests/create_topics_test.cc @@ -12,23 +12,16 @@ #include "kafka/server/handlers/topics/types.h" #include "redpanda/tests/fixture.h" #include "resource_mgmt/io_priority.h" -#include "s3_imposter_fixture.h" #include #include #include #include -#include - -inline ss::logger test_log("test"); // NOLINT // rougly equivalent to the test harness: // https://github.com/apache/kafka/blob/8e16158/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala -class create_topic_fixture - : public s3_imposter_fixture - , public enable_cloud_storage_fixture - , public redpanda_thread_fixture { +class create_topic_fixture : public redpanda_thread_fixture { public: kafka::create_topics_request make_req( std::vector topics, bool validate_only = false) { @@ -97,18 +90,11 @@ class create_topic_fixture void test_create_topic( kafka::create_topics_request req, - std::optional partition_count = std::nullopt, - std::optional revision_id = std::nullopt, kafka::api_version version = kafka::api_version(2)) { auto client = make_kafka_client().get0(); client.connect().get(); auto resp = client.dispatch(req, version).get0(); - // todo: here - for (auto req : get_requests()) { - vlog(test_log.info, "{} {}", req._method, req._url); - } - BOOST_REQUIRE_MESSAGE( std::all_of( std::cbegin(resp.data.topics), @@ -119,7 +105,7 @@ class create_topic_fixture fmt::format("expected no errors. 
received response: {}", resp)); for (auto& topic : req.data.topics) { - verify_metadata(client, req, topic, partition_count, revision_id); + verify_metadata(client, req, topic); auto it = std::find_if( resp.data.topics.begin(), @@ -140,11 +126,6 @@ class create_topic_fixture client.stop().then([&client] { client.shutdown(); }).get(); } - void test_create_read_replica_topic( - kafka::create_topics_request req, int partition_count, int revision_id) { - test_create_topic(req, partition_count, revision_id); - } - void verify_response( const kafka::creatable_topic& req, const kafka::creatable_topic_result& topic_res, @@ -223,9 +204,7 @@ class create_topic_fixture void verify_metadata( kafka::client::transport& client, kafka::create_topics_request& create_req, - kafka::creatable_topic& request_topic, - std::optional partition_count = std::nullopt, - std::optional revision_id = std::nullopt) { + kafka::creatable_topic& request_topic) { // query the server for this topic's metadata kafka::metadata_request metadata_req; metadata_req.data.topics @@ -248,9 +227,7 @@ class create_topic_fixture "expected topic not returned from metadata query"); int partitions; - if (partition_count) { - partitions = partition_count.value(); - } else if (!request_topic.assignments.empty()) { + if (!request_topic.assignments.empty()) { partitions = request_topic.assignments.size(); } else { partitions = request_topic.num_partitions; @@ -375,38 +352,6 @@ FIXTURE_TEST(create_non_replicable_topics, create_topic_fixture) { BOOST_CHECK(resp[1].tp_ns.tp() == "topic2"); } -FIXTURE_TEST(read_replica, create_topic_fixture) { - ss::sstring manifest_url = ssx::sformat( - "/f0000000/meta/kafka/test-topic/topic_manifest.json"); - - std::string_view manifest_payload = R"json({ - "version": 1, - "namespace": "kafka", - "topic": "test-topic", - "partition_count": 32, - "replication_factor": 3, - "revision_id": 10, - "compression": null, - "cleanup_policy_bitflags": null, - "compaction_strategy": null, - "timestamp_type": null, - "segment_size": null - })json"; - - set_expectations_and_listen({expectation{ - .url = manifest_url, .body = ss::sstring(manifest_payload)}}); - - auto topic = make_topic( - "test-topic", - std::nullopt, - std::nullopt, - std::map{ - {"redpanda.remote.readreplica", "true"}, - {"redpanda.remote.readreplica.bucket", "panda-bucket"}}); - - test_create_read_replica_topic(make_req({topic}), 32, 10); -} - FIXTURE_TEST(s3bucket_is_missing, create_topic_fixture) { auto topic = make_topic( "topic1", @@ -494,7 +439,5 @@ FIXTURE_TEST(test_v5_validate_configs_resp, create_topic_fixture) { {make_topic("topicC", 3, 1, config_map), make_topic("topicD", 3, 1, config_map)}, false), - std::nullopt, - std::nullopt, kafka::api_version(5)); } diff --git a/src/v/kafka/server/tests/s3_imposter_fixture.cc b/src/v/kafka/server/tests/s3_imposter_fixture.cc deleted file mode 100644 index 5a45621dfa6f..000000000000 --- a/src/v/kafka/server/tests/s3_imposter_fixture.cc +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright 2022 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "s3_imposter_fixture.h" - -#include "bytes/iobuf.h" -#include "bytes/iobuf_parser.h" -#include "config/configuration.h" -#include "seastarx.h" -#include "test_utils/async.h" - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -using namespace std::chrono_literals; - -inline ss::logger fixt_log("fixture"); // NOLINT - -static constexpr int16_t httpd_port_number = 4430; -static constexpr const char* httpd_host_name = "127.0.0.1"; - -s3_imposter_fixture::s3_imposter_fixture() { - _server = ss::make_shared(); - _server->start().get(); - ss::ipv4_addr ip_addr = {httpd_host_name, httpd_port_number}; - _server_addr = ss::socket_address(ip_addr); -} - -s3_imposter_fixture::~s3_imposter_fixture() { _server->stop().get(); } - -const std::vector& -s3_imposter_fixture::get_requests() const { - return _requests; -} - -const std::multimap& -s3_imposter_fixture::get_targets() const { - return _targets; -} - -void s3_imposter_fixture::set_expectations_and_listen( - const std::vector& expectations) { - _server - ->set_routes([this, &expectations](ss::httpd::routes& r) { - set_routes(r, expectations); - }) - .get(); - _server->listen(_server_addr).get(); -} - -void s3_imposter_fixture::set_routes( - ss::httpd::routes& r, - const std::vector& expectations) { - using namespace ss::httpd; - struct content_handler { - content_handler( - const std::vector& exp, s3_imposter_fixture& imp) - : fixture(imp) { - for (const auto& e : exp) { - expectations[e.url] = e; - } - } - ss::sstring handle(const_req request, reply& repl) { - static const ss::sstring error_payload - = R"xml( - - NoSuchKey - Object not found - resource - requestid - )xml"; - fixture._requests.push_back(request); - fixture._targets.insert(std::make_pair(request._url, request)); - vlog( - fixt_log.trace, - "S3 imposter request {} - {} - {}", - request._url, - request.content_length, - request._method); - if (request._method == "GET") { - auto it = expectations.find(request._url); - if (it == expectations.end() || !it->second.body.has_value()) { - vlog(fixt_log.trace, "Reply GET request with error"); - repl.set_status(reply::status_type::not_found); - return error_payload; - } - return *it->second.body; - } else if (request._method == "PUT") { - expectations[request._url] = { - .url = request._url, .body = request.content}; - return ""; - } else if (request._method == "DELETE") { - auto it = expectations.find(request._url); - if (it == expectations.end() || !it->second.body.has_value()) { - vlog(fixt_log.trace, "Reply DELETE request with error"); - repl.set_status(reply::status_type::not_found); - return error_payload; - } - repl.set_status(reply::status_type::no_content); - it->second.body = std::nullopt; - return ""; - } else if (request._method == "HEAD") { - auto it = expectations.find(request._url); - if (it == expectations.end() || !it->second.body.has_value()) { - vlog(fixt_log.trace, "Reply HEAD request with error"); - repl.add_header("x-amz-request-id", "placeholder-id"); - repl.set_status(reply::status_type::not_found); - } else { - repl.add_header("ETag", "placeholder-etag"); - repl.add_header( - "Content-Length", - ssx::sformat("{}", it->second.body->size())); - repl.set_status(reply::status_type::ok); - } - vlog( - fixt_log.trace, - "S3 imposter response: {}", - repl.response_line()); - return ""; - } - BOOST_FAIL("Unexpected request"); - return ""; - 
} - std::map expectations; - s3_imposter_fixture& fixture; - }; - auto hd = ss::make_shared(expectations, *this); - _handler = std::make_unique( - [hd](const_req req, reply& repl) { return hd->handle(req, repl); }, - "txt"); - r.add_default_handler(_handler.get()); -} - -enable_cloud_storage_fixture::enable_cloud_storage_fixture() { - ss::smp::invoke_on_all([]() { - auto& cfg = config::shard_local_cfg(); - cfg.cloud_storage_enabled.set_value(true); - cfg.cloud_storage_disable_tls.set_value(true); - cfg.cloud_storage_api_endpoint.set_value( - std::optional{httpd_host_name}); - cfg.cloud_storage_api_endpoint_port.set_value(httpd_port_number); - cfg.cloud_storage_access_key.set_value( - std::optional{"access-key"}); - cfg.cloud_storage_secret_key.set_value( - std::optional{"secret-key"}); - cfg.cloud_storage_region.set_value( - std::optional{"us-east-1"}); - cfg.cloud_storage_bucket.set_value( - std::optional{"test-bucket"}); - }).get0(); -} - -enable_cloud_storage_fixture::~enable_cloud_storage_fixture() { - config::shard_local_cfg().cloud_storage_enabled.set_value(false); -} diff --git a/src/v/kafka/server/tests/s3_imposter_fixture.h b/src/v/kafka/server/tests/s3_imposter_fixture.h deleted file mode 100644 index 73f90fd61ba7..000000000000 --- a/src/v/kafka/server/tests/s3_imposter_fixture.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2022 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#pragma once - -#include "seastarx.h" -#include "ssx/sformat.h" - -#include -#include -#include -#include - -#include -#include -#include -#include - -// TODO(https://github.com/redpanda-data/redpanda/issues/5240): -// Move s3_imposter_fixture to the common place and use one implementation -// throughout the code base - -/// Emulates S3 REST API for testing purposes. -/// The imposter is a simple KV-store that contains a set of expectations. -/// Expectations are accessible by url via GET, PUT, and DELETE http calls. -/// Expectations are provided before impster starts to listen. They have -/// two field - url and optional body. If body is set to nullopt, attemtp -/// to read it using GET or delete it using DELETE requests will trigger an -/// http response with error code 404 and xml formatted error message. -/// If the body of the expectation is set by the user or PUT request it can -/// be retrieved using the GET request or deleted using the DELETE request. -class s3_imposter_fixture { -public: - s3_imposter_fixture(); - ~s3_imposter_fixture(); - - s3_imposter_fixture(const s3_imposter_fixture&) = delete; - s3_imposter_fixture& operator=(const s3_imposter_fixture&) = delete; - s3_imposter_fixture(s3_imposter_fixture&&) = delete; - s3_imposter_fixture& operator=(s3_imposter_fixture&&) = delete; - - struct expectation { - ss::sstring url; - std::optional body; - }; - - /// Set expectaitions on REST API calls that supposed to be made - /// Only the requests that described in this call will be possible - /// to make. This method can only be called once per test run. - /// - /// \param expectations is a collection of access points that allow GET, - /// PUT, and DELETE requests, each expectation has url and body. The body - /// will be returned by GET call if it's set or trigger error if its null. - /// The expectations are statefull. 
If the body of the expectation was set - /// to null but there was PUT call that sent some data, subsequent GET call - /// will retrieve this data. - void - set_expectations_and_listen(const std::vector& expectations); - - /// Access all http requests ordered by time - const std::vector& get_requests() const; - - /// Access all http requests ordered by target url - const std::multimap& get_targets() const; - - // static s3::configuration get_configuration(); - -private: - void set_routes( - ss::httpd::routes& r, const std::vector& expectations); - - ss::socket_address _server_addr; - ss::shared_ptr _server; - - std::unique_ptr _handler; - /// Contains saved requests - std::vector _requests; - /// Contains all accessed target urls - std::multimap _targets; -}; - -class enable_cloud_storage_fixture { -public: - enable_cloud_storage_fixture(); - ~enable_cloud_storage_fixture(); -}; diff --git a/src/v/storage/ntp_config.h b/src/v/storage/ntp_config.h index 4a0e6113a057..b9e205840e36 100644 --- a/src/v/storage/ntp_config.h +++ b/src/v/storage/ntp_config.h @@ -47,6 +47,8 @@ class ntp_config { model::shadow_indexing_mode shadow_indexing_mode = model::shadow_indexing_mode::disabled; + std::optional read_replica; + friend std::ostream& operator<<(std::ostream&, const default_overrides&); }; @@ -150,6 +152,11 @@ class ntp_config { && model::is_fetch_enabled(_overrides->shadow_indexing_mode); } + bool is_read_replica_mode_enabled() const { + return _overrides != nullptr && _overrides->read_replica + && _overrides->read_replica.value(); + } + private: model::ntp _ntp; /// \brief currently this is the basedir. In the future diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index e9e36ff794fa..e7109b4fe857 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -222,23 +222,24 @@ class SISettings: GLOBAL_S3_REGION_KEY = "s3_region" def __init__( - self, - *, - log_segment_size: int = 16 * 1000000, - cloud_storage_access_key: str = 'panda-user', - cloud_storage_secret_key: str = 'panda-secret', - cloud_storage_region: str = 'panda-region', - cloud_storage_bucket: Optional[str] = None, - cloud_storage_api_endpoint: str = 'minio-s3', - cloud_storage_api_endpoint_port: int = 9000, - cloud_storage_cache_size: int = 160 * 1000000, - cloud_storage_enable_remote_read: bool = True, - cloud_storage_enable_remote_write: bool = True, - cloud_storage_reconciliation_interval_ms: Optional[int] = None, - cloud_storage_max_connections: Optional[int] = None, - cloud_storage_disable_tls: bool = True, - cloud_storage_segment_max_upload_interval_sec: Optional[int] = None - ): + self, + *, + log_segment_size: int = 16 * 1000000, + cloud_storage_access_key: str = 'panda-user', + cloud_storage_secret_key: str = 'panda-secret', + cloud_storage_region: str = 'panda-region', + cloud_storage_bucket: Optional[str] = None, + cloud_storage_api_endpoint: str = 'minio-s3', + cloud_storage_api_endpoint_port: int = 9000, + cloud_storage_cache_size: int = 160 * 1000000, + cloud_storage_enable_remote_read: bool = True, + cloud_storage_enable_remote_write: bool = True, + cloud_storage_reconciliation_interval_ms: Optional[int] = None, + cloud_storage_max_connections: Optional[int] = None, + cloud_storage_disable_tls: bool = True, + cloud_storage_segment_max_upload_interval_sec: Optional[int] = None, + cloud_storage_readreplica_manifest_sync_timeout_ms: Optional[ + int] = None): self.log_segment_size = log_segment_size self.cloud_storage_access_key = cloud_storage_access_key 
self.cloud_storage_secret_key = cloud_storage_secret_key @@ -253,6 +254,7 @@ def __init__( self.cloud_storage_max_connections = cloud_storage_max_connections self.cloud_storage_disable_tls = cloud_storage_disable_tls self.cloud_storage_segment_max_upload_interval_sec = cloud_storage_segment_max_upload_interval_sec + self.cloud_storage_readreplica_manifest_sync_timeout_ms = cloud_storage_readreplica_manifest_sync_timeout_ms self.endpoint_url = f'http://{self.cloud_storage_api_endpoint}:{self.cloud_storage_api_endpoint_port}' def load_context(self, logger, test_context): @@ -309,6 +311,9 @@ def update_rp_conf(self, conf) -> dict[str, Any]: if self.cloud_storage_max_connections: conf[ 'cloud_storage_max_connections'] = self.cloud_storage_max_connections + if self.cloud_storage_readreplica_manifest_sync_timeout_ms: + conf[ + 'cloud_storage_readreplica_manifest_sync_timeout_ms'] = self.cloud_storage_readreplica_manifest_sync_timeout_ms if self.cloud_storage_segment_max_upload_interval_sec: conf[ 'cloud_storage_segment_max_upload_interval_sec'] = self.cloud_storage_segment_max_upload_interval_sec @@ -572,7 +577,7 @@ def get_node_memory_mb(self): memory_kb = int(line.strip().split()[1]) return memory_kb / 1024 - def start(self, nodes=None, clean_nodes=True): + def start(self, nodes=None, clean_nodes=True, start_si=True): """Start the service on all nodes.""" to_start = nodes if nodes is not None else self.nodes assert all((node in self.nodes for node in to_start)) @@ -647,7 +652,7 @@ def start(self, nodes=None, clean_nodes=True): request_timeout_ms=30000, api_version_auto_timeout_ms=3000) - if self._si_settings is not None: + if start_si and self._si_settings is not None: self.start_si() def write_tls_certs(self): @@ -925,6 +930,8 @@ def start_si(self): logger=self.logger, ) + self.logger.debug( + f"Creating S3 bucket: {self._si_settings.cloud_storage_bucket}") self._s3client.create_bucket(self._si_settings.cloud_storage_bucket) def list_buckets(self) -> dict[str, Union[list, dict]]: diff --git a/tests/rptest/tests/end_to_end.py b/tests/rptest/tests/end_to_end.py index ecdda9679014..594be6b244d3 100644 --- a/tests/rptest/tests/end_to_end.py +++ b/tests/rptest/tests/end_to_end.py @@ -86,7 +86,8 @@ def start_redpanda(self, self.redpanda = RedpandaService(self.test_context, num_nodes, extra_rp_conf=self._extra_rp_conf, - extra_node_conf=self._extra_node_conf) + extra_node_conf=self._extra_node_conf, + si_settings=self.si_settings) self.redpanda.start() self._client = DefaultClient(self.redpanda) @@ -153,8 +154,8 @@ def has_finished_consuming(): wait_until(has_finished_consuming, timeout_sec=timeout_sec, - err_msg="Consumer failed to consume up to offsets %s after waiting %ds." %\ - (str(last_acked_offsets), timeout_sec)) + err_msg="Consumer failed to consume up to offsets %s after waiting %ds, last consumed offsets: %s." %\ + (str(last_acked_offsets), timeout_sec, list(self.last_consumed_offsets))) def _collect_all_logs(self): for s in self.test_context.services: diff --git a/tests/rptest/tests/read_replica_e2e_test.py b/tests/rptest/tests/read_replica_e2e_test.py new file mode 100644 index 000000000000..651d8e1c26ab --- /dev/null +++ b/tests/rptest/tests/read_replica_e2e_test.py @@ -0,0 +1,133 @@ +# Copyright 2022 Redpanda Data, Inc. 
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+from rptest.services.cluster import cluster
+
+from rptest.clients.default import DefaultClient
+from rptest.services.redpanda import SISettings
+from rptest.clients.rpk import RpkTool
+from rptest.clients.types import TopicSpec
+from ducktape.mark import matrix
+
+import json
+
+from rptest.services.redpanda import RedpandaService
+from rptest.tests.end_to_end import EndToEndTest
+from rptest.services.verifiable_producer import VerifiableProducer, is_int_with_prefix
+from rptest.services.verifiable_consumer import VerifiableConsumer
+from rptest.util import (
+    wait_until, )
+
+
+class TestReadReplicaService(EndToEndTest):
+    log_segment_size = 1048576  # 1MB
+    topic_name = "panda-topic"
+    s3_bucket_name = "panda-bucket"
+    si_settings = SISettings(
+        cloud_storage_bucket=s3_bucket_name,
+        cloud_storage_reconciliation_interval_ms=500,
+        cloud_storage_max_connections=5,
+        log_segment_size=log_segment_size,
+        cloud_storage_readreplica_manifest_sync_timeout_ms=500,
+        cloud_storage_segment_max_upload_interval_sec=5)
+
+    def __init__(self, test_context):
+        super(TestReadReplicaService, self).__init__(test_context=test_context)
+        self.second_cluster = None
+
+    def create_read_replica_topic(self):
+        self.second_cluster = RedpandaService(self.test_context,
+                                              num_brokers=3,
+                                              si_settings=self.si_settings)
+        self.second_cluster.start(start_si=False)
+
+        rpk_second_cluster = RpkTool(self.second_cluster)
+        conf = {
+            'redpanda.remote.readreplica': 'true',
+            'redpanda.remote.readreplica.bucket': self.s3_bucket_name,
+        }
+        rpk_second_cluster.create_topic(self.topic_name, config=conf)
+
+    def start_consumer(self):
+        self.consumer = VerifiableConsumer(
+            self.test_context,
+            num_nodes=1,
+            redpanda=self.second_cluster,
+            topic=self.topic_name,
+            group_id='consumer_test_group',
+            on_record_consumed=self.on_record_consumed)
+        self.consumer.start()
+
+    def start_producer(self):
+        self.producer = VerifiableProducer(
+            self.test_context,
+            num_nodes=1,
+            redpanda=self.redpanda,
+            topic=self.topic_name,
+            throughput=1000,
+            message_validator=is_int_with_prefix)
+        self.producer.start()
+
+    @cluster(num_nodes=8)
+    @matrix(partition_count=[10], min_records=[10000])
+    def test_simple_end_to_end(self, partition_count, min_records):
+        # Create original topic, produce data to it
+        self.start_redpanda(3, si_settings=self.si_settings)
+        spec = TopicSpec(name=self.topic_name,
+                         partition_count=partition_count,
+                         replication_factor=3)
+
+        DefaultClient(self.redpanda).create_topic(spec)
+
+        self.start_producer()
+        wait_until(lambda: self.producer.num_acked > min_records,
+                   timeout_sec=30,
+                   err_msg="Producer failed to produce messages for %ds." % 30)
+        self.logger.info("Stopping producer after writing up to offsets %s" %
+                         str(self.producer.last_acked_offsets))
+        self.producer.stop()
+
+        # Make original topic upload data to S3
+        rpk = RpkTool(self.redpanda)
+        rpk.alter_topic_config(spec.name, 'redpanda.remote.write', 'true')
+
+        # Make sure all produced data is uploaded to S3
+        def s3_has_all_data():
+            objects = list(
+                self.redpanda._s3client.list_objects(self.s3_bucket_name))
+            total_uploaded = 0
+            for o in objects:
+                if o.Key.endswith(
+                        "/manifest.json") and self.topic_name in o.Key:
+                    data = self.redpanda._s3client.get_object_data(
+                        self.s3_bucket_name, o.Key)
+                    manifest = json.loads(data)
+                    last_upl_offset = manifest['last_offset']
+                    total_uploaded += last_upl_offset
+                    self.logger.info(
+                        f"Found manifest at {o.Key}, last_offset is {last_upl_offset}"
+                    )
+            self.logger.info(
+                f"Total uploaded: {total_uploaded}, num_acked: {self.producer.num_acked}"
+            )
+            return total_uploaded >= self.producer.num_acked
+
+        wait_until(
+            s3_has_all_data,
+            timeout_sec=30,  # should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
+            backoff_sec=5,
+            err_msg=
+            f"Not all data is uploaded to S3 bucket; bucket contents: {list(self.redpanda._s3client.list_objects(self.s3_bucket_name))}"
+        )
+
+        # Create read replica topic, consume from it and validate
+        self.create_read_replica_topic()
+        self.start_consumer()
+        self.run_validation()