Skip to content

Commit

Permalink
Merge pull request #7935 from VladLazar/abs-part-1
Browse files Browse the repository at this point in the history
cloud_storage: Azure Blob Storage support prerequisites
  • Loading branch information
jcsp committed Jan 5, 2023
2 parents c30c079 + 40d1c8a commit 25f76b2
Show file tree
Hide file tree
Showing 26 changed files with 421 additions and 354 deletions.
20 changes: 9 additions & 11 deletions src/v/archival/tests/service_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,15 @@ std::tuple<
cloud_storage::configuration>
get_configurations() {
net::unresolved_address server_addr(httpd_host_name, httpd_port_number);
cloud_storage_clients::configuration s3conf{
.uri = cloud_storage_clients::access_point_uri(httpd_host_name),
.access_key = cloud_roles::public_key_str("acess-key"),
.secret_key = cloud_roles::private_key_str("secret-key"),
.region = cloud_roles::aws_region_name("us-east-1"),
._probe = ss::make_shared(cloud_storage_clients::client_probe(
net::metrics_disabled::yes,
net::public_metrics_disabled::yes,
"",
""))};
cloud_storage_clients::s3_configuration s3conf;
s3conf.uri = cloud_storage_clients::access_point_uri(httpd_host_name);
s3conf.access_key = cloud_roles::public_key_str("acess-key");
s3conf.secret_key = cloud_roles::private_key_str("secret-key");
s3conf.region = cloud_roles::aws_region_name("us-east-1");
s3conf._probe = ss::make_shared(cloud_storage_clients::client_probe(
net::metrics_disabled::yes, net::public_metrics_disabled::yes, "", ""));
s3conf.server_addr = server_addr;

archival::configuration aconf;
aconf.bucket_name = cloud_storage_clients::bucket_name("test-bucket");
aconf.ntp_metrics_disabled = archival::per_ntp_metrics_disabled::yes;
Expand All @@ -158,7 +156,7 @@ get_configurations() {
cloud_storage::configuration cconf;
cconf.client_config = s3conf;
cconf.bucket_name = cloud_storage_clients::bucket_name("test-bucket");
cconf.connection_limit = archival::s3_connection_limit(2);
cconf.connection_limit = archival::connection_limit(2);
cconf.metrics_disabled = cloud_storage::remote_metrics_disabled::yes;
cconf.cloud_credentials_source
= model::cloud_credentials_source::config_file;
Expand Down
2 changes: 1 addition & 1 deletion src/v/archival/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@

namespace archival {

using cloud_storage::connection_limit;
using cloud_storage::local_segment_path;
using cloud_storage::remote_manifest_path;
using cloud_storage::remote_segment_path;
using cloud_storage::s3_connection_limit;
using cloud_storage::segment_name;

using service_metrics_disabled
Expand Down
6 changes: 3 additions & 3 deletions src/v/cloud_storage/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace cloud_storage {
using namespace std::chrono_literals;

remote::remote(
s3_connection_limit limit,
connection_limit limit,
const cloud_storage_clients::client_configuration& conf,
model::cloud_credentials_source cloud_credentials_source)
: _pool(limit(), conf)
Expand Down Expand Up @@ -677,7 +677,7 @@ void auth_refresh_bg_op::do_start_auth_refresh_op(
[](const auto& cfg) {
using cfg_type = std::decay_t<decltype(cfg)>;
if constexpr (std::is_same_v<
cloud_storage_clients::configuration,
cloud_storage_clients::s3_configuration,
cfg_type>) {
return cloud_roles::aws_region_name{cfg.region};
} else {
Expand Down Expand Up @@ -721,7 +721,7 @@ cloud_roles::credentials auth_refresh_bg_op::build_static_credentials() const {
[](const auto& cfg) {
using cfg_type = std::decay_t<decltype(cfg)>;
if constexpr (std::is_same_v<
cloud_storage_clients::configuration,
cloud_storage_clients::s3_configuration,
cfg_type>) {
return cloud_roles::aws_credentials{
cfg.access_key.value(),
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_storage/remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class remote : public ss::peering_sharded_service<remote> {
/// \param limit is a number of simultaneous connections
/// \param conf is an S3 configuration
remote(
s3_connection_limit limit,
connection_limit limit,
const cloud_storage_clients::client_configuration& conf,
model::cloud_credentials_source cloud_credentials_source);

Expand Down
16 changes: 8 additions & 8 deletions src/v/cloud_storage/tests/remote_partition_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ static model::record_batch_header read_single_batch_from_remote_partition(
cloud_storage_fixture& fixture, model::offset target) {
auto conf = fixture.get_configuration();
static auto bucket = cloud_storage_clients::bucket_name("bucket");
remote api(s3_connection_limit(10), conf, config_file);
remote api(connection_limit(10), conf, config_file);
api.start().get();
auto action = ss::defer([&api] { api.stop().get(); });
auto m = ss::make_lw_shared<cloud_storage::partition_manifest>(
Expand Down Expand Up @@ -642,7 +642,7 @@ static std::vector<model::record_batch_header> scan_remote_partition(
config::shard_local_cfg().cloud_storage_max_readers_per_shard(
maybe_max_readers);
}
remote api(s3_connection_limit(10), conf, config_file);
remote api(connection_limit(10), conf, config_file);
api.start().get();
auto action = ss::defer([&api] { api.stop().get(); });
auto m = ss::make_lw_shared<cloud_storage::partition_manifest>(
Expand Down Expand Up @@ -1198,7 +1198,7 @@ scan_remote_partition_incrementally(
config::shard_local_cfg().cloud_storage_max_readers_per_shard(
maybe_max_readers);
}
remote api(s3_connection_limit(10), conf, config_file);
remote api(connection_limit(10), conf, config_file);
api.start().get();
auto action = ss::defer([&api] { api.stop().get(); });
auto m = ss::make_lw_shared<cloud_storage::partition_manifest>(
Expand Down Expand Up @@ -1303,7 +1303,7 @@ FIXTURE_TEST(test_remote_partition_read_cached_index, cloud_storage_fixture) {

auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote api(s3_connection_limit(10), conf, config_file);
remote api(connection_limit(10), conf, config_file);
api.start().get();
auto action = ss::defer([&api] { api.stop().get(); });
auto m = ss::make_lw_shared<cloud_storage::partition_manifest>(
Expand Down Expand Up @@ -1401,7 +1401,7 @@ FIXTURE_TEST(test_remote_partition_concurrent_truncate, cloud_storage_fixture) {
// create a reader that consumes segments one by one
auto conf = get_configuration();
static auto bucket = cloud_storage_clients::bucket_name("bucket");
remote api(s3_connection_limit(10), conf, config_file);
remote api(connection_limit(10), conf, config_file);
api.start().get();
auto action = ss::defer([&api] { api.stop().get(); });

Expand Down Expand Up @@ -1506,7 +1506,7 @@ FIXTURE_TEST(
// create a reader that consumes segments one by one
auto conf = get_configuration();
static auto bucket = cloud_storage_clients::bucket_name("bucket");
remote api(s3_connection_limit(10), conf, config_file);
remote api(connection_limit(10), conf, config_file);
api.start().get();
auto action = ss::defer([&api] { api.stop().get(); });

Expand Down Expand Up @@ -1595,7 +1595,7 @@ FIXTURE_TEST(
// create a reader that consumes segments one by one
auto conf = get_configuration();
static auto bucket = cloud_storage_clients::bucket_name("bucket");
remote api(s3_connection_limit(10), conf, config_file);
remote api(connection_limit(10), conf, config_file);
api.start().get();
auto action = ss::defer([&api] { api.stop().get(); });

Expand Down Expand Up @@ -1695,7 +1695,7 @@ scan_remote_partition_incrementally_with_reuploads(
config::shard_local_cfg().cloud_storage_max_readers_per_shard(
maybe_max_readers);
}
remote api(s3_connection_limit(10), conf, config_file);
remote api(connection_limit(10), conf, config_file);
api.start().get();
auto action = ss::defer([&api] { api.stop().get(); });
auto m = ss::make_lw_shared<cloud_storage::partition_manifest>(
Expand Down
10 changes: 5 additions & 5 deletions src/v/cloud_storage/tests/remote_segment_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ FIXTURE_TEST(
set_expectations_and_listen({});
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
partition_manifest m(manifest_ntp, manifest_revision);
auto key = model::offset(1);
model::initial_revision_id segment_ntp_revision{777};
Expand Down Expand Up @@ -124,7 +124,7 @@ FIXTURE_TEST(
FIXTURE_TEST(test_remote_segment_timeout, cloud_storage_fixture) { // NOLINT
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
partition_manifest m(manifest_ntp, manifest_revision);
auto name = segment_name("7-8-v1.log");
auto key = parse_segment_name(name).value();
Expand Down Expand Up @@ -155,7 +155,7 @@ FIXTURE_TEST(
set_expectations_and_listen({});
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
partition_manifest m(manifest_ntp, manifest_revision);
auto key = model::offset(1);
iobuf segment_bytes = generate_segment(model::offset(1), 100);
Expand Down Expand Up @@ -241,7 +241,7 @@ void test_remote_segment_batch_reader(
fixture.set_expectations_and_listen({});
auto conf = fixture.get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto action = ss::defer([&remote] { remote.stop().get(); });

partition_manifest m(manifest_ntp, manifest_revision);
Expand Down Expand Up @@ -353,7 +353,7 @@ FIXTURE_TEST(
set_expectations_and_listen({});
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto action = ss::defer([&remote] { remote.stop().get(); });

partition_manifest m(manifest_ntp, manifest_revision);
Expand Down
22 changes: 11 additions & 11 deletions src/v/cloud_storage/tests/remote_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ FIXTURE_TEST(test_download_manifest, s3_imposter_fixture) { // NOLINT
set_expectations_and_listen({expectation{
.url = "/" + manifest_url, .body = ss::sstring(manifest_payload)}});
auto conf = get_configuration();
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
partition_manifest actual(manifest_ntp, manifest_revision);
auto action = ss::defer([&remote] { remote.stop().get(); });
retry_chain_node fib(100ms, 20ms);
Expand All @@ -99,7 +99,7 @@ FIXTURE_TEST(test_download_manifest, s3_imposter_fixture) { // NOLINT

FIXTURE_TEST(test_download_manifest_timeout, s3_imposter_fixture) { // NOLINT
auto conf = get_configuration();
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
partition_manifest actual(manifest_ntp, manifest_revision);
auto action = ss::defer([&remote] { remote.stop().get(); });
retry_chain_node fib(100ms, 20ms);
Expand All @@ -116,7 +116,7 @@ FIXTURE_TEST(test_download_manifest_timeout, s3_imposter_fixture) { // NOLINT
FIXTURE_TEST(test_upload_segment, s3_imposter_fixture) { // NOLINT
set_expectations_and_listen({});
auto conf = get_configuration();
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto name = segment_name("1-2-v1.log");
auto path = generate_remote_segment_path(
manifest_ntp, manifest_revision, name, model::term_id{123});
Expand Down Expand Up @@ -149,7 +149,7 @@ FIXTURE_TEST(
test_upload_segment_lost_leadership, s3_imposter_fixture) { // NOLINT
set_expectations_and_listen({});
auto conf = get_configuration();
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto name = segment_name("1-2-v1.log");
auto path = generate_remote_segment_path(
manifest_ntp, manifest_revision, name, model::term_id{123});
Expand Down Expand Up @@ -180,7 +180,7 @@ FIXTURE_TEST(

FIXTURE_TEST(test_upload_segment_timeout, s3_imposter_fixture) { // NOLINT
auto conf = get_configuration();
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto name = segment_name("1-2-v1.log");
auto path = generate_remote_segment_path(
manifest_ntp, manifest_revision, name, model::term_id{123});
Expand Down Expand Up @@ -210,7 +210,7 @@ FIXTURE_TEST(test_download_segment, s3_imposter_fixture) { // NOLINT
set_expectations_and_listen({});
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto name = segment_name("1-2-v1.log");
auto path = generate_remote_segment_path(
manifest_ntp, manifest_revision, name, model::term_id{123});
Expand Down Expand Up @@ -252,7 +252,7 @@ FIXTURE_TEST(test_download_segment, s3_imposter_fixture) { // NOLINT
FIXTURE_TEST(test_download_segment_timeout, s3_imposter_fixture) { // NOLINT
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto name = segment_name("1-2-v1.log");
auto path = generate_remote_segment_path(
manifest_ntp, manifest_revision, name, model::term_id{123});
Expand All @@ -271,7 +271,7 @@ FIXTURE_TEST(test_segment_exists, s3_imposter_fixture) { // NOLINT
set_expectations_and_listen({});
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto name = segment_name("1-2-v1.log");
auto path = generate_remote_segment_path(
manifest_ntp, manifest_revision, name, model::term_id{123});
Expand Down Expand Up @@ -303,7 +303,7 @@ FIXTURE_TEST(test_segment_exists, s3_imposter_fixture) { // NOLINT
FIXTURE_TEST(test_segment_exists_timeout, s3_imposter_fixture) { // NOLINT
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto name = segment_name("1-2-v1.log");
auto path = generate_remote_segment_path(
manifest_ntp, manifest_revision, name, model::term_id{123});
Expand All @@ -317,7 +317,7 @@ FIXTURE_TEST(test_segment_delete, s3_imposter_fixture) { // NOLINT
set_expectations_and_listen({});
auto conf = get_configuration();
auto bucket = cloud_storage_clients::bucket_name("bucket");
remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto name = segment_name("0-1-v1.log");
auto path = generate_remote_segment_path(
manifest_ntp, manifest_revision, name, model::term_id{1});
Expand Down Expand Up @@ -399,7 +399,7 @@ FIXTURE_TEST(test_concat_segment_upload, s3_imposter_fixture) {
retry_chain_node fib(100ms, 20ms);
auto upload_size = b.get_disk_log_impl().size_bytes() - 40;

remote remote(s3_connection_limit(10), conf, config_file);
remote remote(connection_limit(10), conf, config_file);
auto action = ss::defer([&remote] { remote.stop().get(); });

set_expectations_and_listen({});
Expand Down
14 changes: 7 additions & 7 deletions src/v/cloud_storage/tests/s3_imposter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ inline ss::logger fixt_log("fixture"); // NOLINT
static constexpr uint16_t httpd_port_number = 4430;
static constexpr const char* httpd_host_name = "127.0.0.1";

cloud_storage_clients::configuration s3_imposter_fixture::get_configuration() {
cloud_storage_clients::s3_configuration
s3_imposter_fixture::get_configuration() {
net::unresolved_address server_addr(httpd_host_name, httpd_port_number);
cloud_storage_clients::configuration conf{
.uri = cloud_storage_clients::access_point_uri(httpd_host_name),
.access_key = cloud_roles::public_key_str("acess-key"),
.secret_key = cloud_roles::private_key_str("secret-key"),
.region = cloud_roles::aws_region_name("us-east-1"),
};
cloud_storage_clients::s3_configuration conf;
conf.uri = cloud_storage_clients::access_point_uri(httpd_host_name);
conf.access_key = cloud_roles::public_key_str("acess-key");
conf.secret_key = cloud_roles::private_key_str("secret-key");
conf.region = cloud_roles::aws_region_name("us-east-1");
conf.server_addr = server_addr;
conf._probe = ss::make_shared<cloud_storage_clients::client_probe>(
net::metrics_disabled::yes,
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_storage/tests/s3_imposter.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class s3_imposter_fixture {
/// Access all http requests ordered by target url
const std::multimap<ss::sstring, ss::httpd::request>& get_targets() const;

static cloud_storage_clients::configuration get_configuration();
static cloud_storage_clients::s3_configuration get_configuration();

private:
void set_routes(
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_storage/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ ss::future<configuration> configuration::get_config() {
overrides.port = config::shard_local_cfg().cloud_storage_api_endpoint_port;

auto s3_conf
= co_await cloud_storage_clients::configuration::make_configuration(
= co_await cloud_storage_clients::s3_configuration::make_configuration(
access_key,
secret_key,
region,
Expand All @@ -152,7 +152,7 @@ ss::future<configuration> configuration::get_config() {

configuration cfg{
.client_config = std::move(s3_conf),
.connection_limit = s3_connection_limit(
.connection_limit = cloud_storage::connection_limit(
config::shard_local_cfg().cloud_storage_max_connections.value()),
.metrics_disabled = remote_metrics_disabled(
static_cast<bool>(disable_metrics)),
Expand Down
5 changes: 2 additions & 3 deletions src/v/cloud_storage/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ using remote_manifest_path
using local_segment_path
= named_type<std::filesystem::path, struct archival_local_segment_path_t>;
/// Number of simultaneous connections to S3
using s3_connection_limit
= named_type<size_t, struct archival_s3_connection_limit_t>;
using connection_limit = named_type<size_t, struct archival_connection_limit_t>;

/// Version of the segment name format
enum class segment_name_format : int16_t { v1 = 1, v2 = 2 };
Expand Down Expand Up @@ -86,7 +85,7 @@ struct configuration {
/// S3 configuration
cloud_storage_clients::client_configuration client_config;
/// Number of simultaneous S3 uploads
s3_connection_limit connection_limit;
connection_limit connection_limit;
/// Disable metrics in the remote
remote_metrics_disabled metrics_disabled;
/// The bucket to use
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_storage_clients/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ v_cc_library(
SRCS
client_pool.cc
client_probe.cc
configuration.cc
s3_client.cc
s3_error.cc
util.cc
DEPS
Seastar::seastar
v::bytes
Expand Down
3 changes: 1 addition & 2 deletions src/v/cloud_storage_clients/client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ client_pool::http_client_ptr client_pool::make_client() const {
return std::visit(
[this](const auto& cfg) {
using cfg_type = std::decay_t<decltype(cfg)>;
static_assert(std::is_same_v<configuration, cfg_type>);
if constexpr (std::is_same_v<configuration, cfg_type>) {
if constexpr (std::is_same_v<s3_configuration, cfg_type>) {
return ss::make_shared<s3_client>(cfg, _as, _apply_credentials);
} else {
static_assert(always_false_v<cfg_type>, "Unknown client type");
Expand Down
Loading

0 comments on commit 25f76b2

Please sign in to comment.