Skip to content

Commit

Permalink
Merge pull request #5497 from abhijat/iam-roles-ssl-client
Browse files Browse the repository at this point in the history
cloud_roles: optionally enable tls in http client
  • Loading branch information
abhijat committed Jul 21, 2022
2 parents 1eb6490 + 5c0c742 commit f5e34d4
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 15 deletions.
6 changes: 4 additions & 2 deletions src/v/cloud_roles/aws_refresh_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ ss::future<api_response> aws_refresh_impl::fetch_credentials() {
creds_req.method(boost::beast::http::verb::get);
creds_req.target(
fmt::format("/latest/meta-data/iam/security-credentials/{}", *_role));
co_return co_await make_request(make_api_client(), std::move(creds_req));
co_return co_await make_request(
co_await make_api_client(), std::move(creds_req));
}

api_response_parse_result aws_refresh_impl::parse_response(iobuf resp) {
Expand Down Expand Up @@ -105,7 +106,8 @@ ss::future<api_response> aws_refresh_impl::fetch_role_name() {
http::client::request_header role_req;
role_req.method(boost::beast::http::verb::get);
role_req.target("/latest/meta-data/iam/security-credentials/");
co_return co_await make_request(make_api_client(), std::move(role_req));
co_return co_await make_request(
co_await make_api_client(), std::move(role_req));
}

std::ostream& aws_refresh_impl::print(std::ostream& os) const {
Expand Down
18 changes: 17 additions & 1 deletion src/v/cloud_roles/aws_sts_refresh_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "bytes/iobuf_istreambuf.h"
#include "utils/file_io.h"

#include <boost/algorithm/string/trim.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/xml_parser.hpp>

Expand Down Expand Up @@ -115,9 +116,15 @@ ss::future<api_response> aws_sts_refresh_impl::fetch_credentials() {
assume_req.target("/");

assume_req.set(boost::beast::http::field::content_type, content_type);
assume_req.set(
boost::beast::http::field::host,
fmt::format("{}:{}", api_host(), api_port()));
assume_req.set(
boost::beast::http::field::user_agent, "redpanda.vectorized.io");

using namespace fmt::literals;

boost::trim(token);
ss::sstring body = fmt::format(
request_payload,
"duration_sec"_a = sts_params::token_expiry_seconds.count(),
Expand All @@ -126,8 +133,17 @@ ss::future<api_response> aws_sts_refresh_impl::fetch_credentials() {
"role_arn"_a = _role,
"token"_a = token);

// STS requires a TLS enabled client by default, but in test mode where we
// use the http imposter, we need to use a simple client.
auto tls_enabled = refresh_credentials::client_tls_enabled::yes;
if (api_port() != default_port) {
tls_enabled = refresh_credentials::client_tls_enabled::no;
}

co_return co_await post_request(
make_api_client(), std::move(assume_req), std::move(body));
co_await make_api_client(tls_enabled),
std::move(assume_req),
std::move(body));
}

api_response_parse_result aws_sts_refresh_impl::parse_response(iobuf resp) {
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_roles/gcp_refresh_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ ss::future<api_response> gcp_refresh_impl::fetch_credentials() {
oauth_req.set(
metadata_flavor::header_name.data(), metadata_flavor::value.data());

co_return co_await make_request(make_api_client(), std::move(oauth_req));
co_return co_await make_request(
co_await make_api_client(), std::move(oauth_req));
}

api_response_parse_result gcp_refresh_impl::parse_response(iobuf response) {
Expand Down
58 changes: 55 additions & 3 deletions src/v/cloud_roles/refresh_credentials.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
#include "cloud_roles/aws_sts_refresh_impl.h"
#include "cloud_roles/gcp_refresh_impl.h"
#include "cloud_roles/logger.h"
#include "config/configuration.h"
#include "config/node_config.h"
#include "model/metadata.h"
#include "net/tls.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/gate.hh>
Expand Down Expand Up @@ -55,7 +57,15 @@ void refresh_credentials::start() {
ss::future<> refresh_credentials::do_start() {
return ss::do_until(
[this] { return _gate.is_closed() || _as.abort_requested(); },
[this] { return fetch_and_update_credentials(); });
[this] {
return fetch_and_update_credentials().handle_exception_type(
[](const ss::sleep_aborted& ex) {
vlog(
clrl_log.info,
"stopping refresh_credentials loop: {}",
ex.what());
});
});
}

static std::optional<ss::sstring>
Expand Down Expand Up @@ -225,8 +235,24 @@ ss::future<> refresh_credentials::impl::sleep_until_expiry() const {
}
}

http::client refresh_credentials::impl::make_api_client() const {
return http::client{
ss::future<http::client>
refresh_credentials::impl::make_api_client(client_tls_enabled enable_tls) {
if (enable_tls == client_tls_enabled::yes) {
if (_tls_certs == nullptr) {
co_await init_tls_certs();
}

co_return http::client{
net::base_transport::configuration{
.server_addr = net::unresolved_address{api_host(), api_port()},
.credentials = _tls_certs,
// TODO (abhijat) toggle metrics
.disable_metrics = net::metrics_disabled::yes,
.tls_sni_hostname = api_host()},
_as};
}

co_return http::client{
net::base_transport::configuration{
.server_addr = net::unresolved_address{_api_host, _api_port},
.credentials = {},
Expand All @@ -235,6 +261,32 @@ http::client refresh_credentials::impl::make_api_client() const {
_as};
}

ss::future<> refresh_credentials::impl::init_tls_certs() {
ss::tls::credentials_builder b;
b.set_client_auth(ss::tls::client_auth::NONE);

if (auto trust_file_path
= config::shard_local_cfg().cloud_storage_trust_file.value();
trust_file_path.has_value()) {
vlog(
clrl_log.info,
"Using non-default trust file {}",
trust_file_path.value());
co_await b.set_x509_trust_file(
trust_file_path.value(), ss::tls::x509_crt_format::PEM);
} else if (auto ca_file = co_await net::find_ca_file();
ca_file.has_value()) {
vlog(clrl_log.info, "Using discovered trust file {}", ca_file.value());
co_await b.set_x509_trust_file(
ca_file.value(), ss::tls::x509_crt_format::PEM);
} else {
vlog(clrl_log.info, "Using GnuTLS default");
co_await b.set_system_trust();
}

_tls_certs = co_await b.build_reloadable_certificate_credentials();
}

refresh_credentials make_refresh_credentials(
model::cloud_credentials_source cloud_credentials_source,
ss::gate& gate,
Expand Down
13 changes: 12 additions & 1 deletion src/v/cloud_roles/refresh_credentials.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ struct retry_params {

class refresh_credentials {
public:
enum class client_tls_enabled : bool {
yes = true,
no = false,
};

class impl {
public:
impl(
Expand Down Expand Up @@ -79,7 +84,8 @@ class refresh_credentials {

protected:
/// Returns an http client with the API host and port applied
http::client make_api_client() const;
ss::future<http::client>
make_api_client(client_tls_enabled enable_tls = client_tls_enabled::no);

/// Helper to parse the iobuf returned from API into a credentials
/// object, customized to API response structure
Expand Down Expand Up @@ -117,6 +123,10 @@ class refresh_credentials {
aws_region_name region() const { return _region; }

private:
/// Initializes certificate_credentials on first client creation.
/// Subsequent clients which are created will reuse the certs.
ss::future<> init_tls_certs();

/// The hostname to query for credentials. Can be overridden using env
/// variable `RP_SI_CREDS_API_HOST`
ss::sstring _api_host;
Expand All @@ -133,6 +143,7 @@ class refresh_credentials {
std::optional<std::chrono::milliseconds> _sleep_duration{std::nullopt};
ss::abort_source& _as;
retry_params _retry_params;
ss::shared_ptr<ss::tls::certificate_credentials> _tls_certs = nullptr;
};

refresh_credentials(
Expand Down
18 changes: 11 additions & 7 deletions src/v/cloud_roles/request_response_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ ss::future<api_response> make_request(
.error_kind = api_request_error_kind::failed_abort};
}

co_return co_await drain_response_stream(std::move(response_stream));
auto data = co_await drain_response_stream(std::move(response_stream));
co_await client.stop();
client.shutdown();
co_return data;
} catch (const std::system_error& ec) {
if (auto code = ec.code(); std::find(
retryable_system_error_codes.begin(),
Expand Down Expand Up @@ -100,11 +103,8 @@ ss::future<api_response> post_request(
boost::beast::http::field::content_length,
boost::beast::to_static_string(content.size_bytes()));

auto [req_str, resp] = co_await client.make_request(
std::move(req), timeout);
co_await req_str->send_some(std::move(content));
co_await req_str->send_eof();

auto stream = make_iobuf_input_stream(std::move(content));
auto resp = co_await client.request(std::move(req), stream, timeout);
auto status = co_await get_status(resp);
if (
std::find(
Expand All @@ -121,7 +121,11 @@ ss::future<api_response> post_request(
.error_kind = api_request_error_kind::failed_abort};
}

co_return co_await drain_response_stream(std::move(resp));
auto data = co_await drain_response_stream(std::move(resp));
co_await stream.close();
co_await client.stop();
client.shutdown();
co_return data;
} catch (const std::system_error& ec) {
if (auto code = ec.code(); std::find(
retryable_system_error_codes.begin(),
Expand Down

0 comments on commit f5e34d4

Please sign in to comment.