Merge pull request #4926 from vbotbuildovich/backport-fixes-to-v22.1.x-941

[v22.1.x] cluster,kafka: auto-populate cluster_id and prefix it with "redpanda."
jcsp committed Jun 1, 2022
2 parents e9cdba0 + caa288c commit 397f01b
Showing 7 changed files with 154 additions and 19 deletions.
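The net effect of this backport: on a fresh cluster the metrics reporter synthesizes a uuid, stores it in the cluster_id configuration property, and the Kafka metadata handler exposes it with a "redpanda." prefix. A minimal out-of-band check of that behaviour, sketched in Python, parses the same `rpk cluster metadata` output that the new RpkTool helper below relies on (it assumes rpk is on PATH and a broker is reachable at localhost:9092 — neither is part of this change):

import subprocess

def kafka_cluster_id(brokers: str = "localhost:9092"):
    """Return the cluster ID reported over Kafka metadata, or None if unset."""
    out = subprocess.run(
        ["rpk", "--brokers", brokers, "cluster", "metadata"],
        check=True, capture_output=True, text=True).stdout
    lines = out.strip().split("\n")
    # rpk only prints a CLUSTER section once a cluster ID is available.
    return lines[2] if lines and lines[0] == "CLUSTER" else None

cid = kafka_cluster_id()
# After this change the ID is either absent (very early in startup) or
# carries the "redpanda." prefix.
assert cid is None or cid.startswith("redpanda.")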
4 changes: 1 addition & 3 deletions src/v/cluster/controller.cc
@@ -312,12 +312,10 @@ ss::future<> controller::start() {
std::ref(_members_table),
std::ref(_tp_state),
std::ref(_hm_frontend),
std::ref(_config_frontend),
std::ref(_as));
})
.then([this] {
if (!config::shard_local_cfg().enable_metrics_reporter()) {
return ss::now();
}
return _metrics_reporter.invoke_on(0, &metrics_reporter::start);
});
}
67 changes: 59 additions & 8 deletions src/v/cluster/metrics_reporter.cc
@@ -12,6 +12,7 @@
#include "cluster/metrics_reporter.h"

#include "bytes/iobuf.h"
#include "cluster/config_frontend.h"
#include "cluster/fwd.h"
#include "cluster/health_monitor_frontend.h"
#include "cluster/health_monitor_types.h"
@@ -100,11 +101,13 @@ metrics_reporter::metrics_reporter(
ss::sharded<members_table>& members_table,
ss::sharded<topic_table>& topic_table,
ss::sharded<health_monitor_frontend>& health_monitor,
ss::sharded<config_frontend>& config_frontend,
ss::sharded<ss::abort_source>& as)
: _raft0(std::move(raft0))
, _members_table(members_table)
, _topics(topic_table)
, _health_monitor(health_monitor)
, _config_frontend(config_frontend)
, _as(as)
, _logger(logger, "metrics-reporter") {}

@@ -113,7 +116,13 @@ ss::future<> metrics_reporter::start() {
_address = details::parse_url(
config::shard_local_cfg().metrics_reporter_url());
_tick_timer.set_callback([this] { report_metrics(); });
_tick_timer.arm(config::shard_local_cfg().metrics_reporter_tick_interval());

const auto initial_delay = 10s;

// A shorter initial wait than the tick interval, so that we
// give the cluster state a chance to stabilize, but also send
// a report reasonably promptly.
_tick_timer.arm(initial_delay);
co_return;
}

@@ -272,6 +281,36 @@ ss::future<> metrics_reporter::try_initialize_cluster_info() {
_cluster_uuid = fmt::format("{}", uuid_gen());
}

/**
* Having synthesized a unique cluster ID in try_initialize_cluster_info,
* set it as the global cluster_id in the cluster configuration, if there
* is not already a cluster_id set there.
*
* If this fails to write the configuration, it will simply log a warning,
* in the expectation that this function is called again on the next
* metrics reporter tick.
*/
ss::future<> metrics_reporter::propagate_cluster_id() {
if (config::shard_local_cfg().cluster_id().has_value()) {
// Don't override any existing cluster_id
co_return;
}

if (_cluster_uuid == "") {
co_return;
}

auto result = co_await _config_frontend.local().do_patch(
config_update_request{.upsert = {{"cluster_id", _cluster_uuid}}},
model::timeout_clock::now() + 5s);
if (result.errc) {
vlog(
clusterlog.warn, "Failed to initialize cluster_id: {}", result.errc);
} else {
vlog(clusterlog.info, "Initialized cluster_id to {}", _cluster_uuid);
}
}
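Note that propagate_cluster_id writes the bare uuid into the cluster configuration; the "redpanda." prefix is only added later by the Kafka metadata handler (see metadata.cc below). One way to inspect the stored value — a sketch only, assuming a recent rpk that ships the `cluster config` subcommands and a reachable admin API with default settings:

import subprocess

# Reads the cluster_id property as stored by propagate_cluster_id().
# Expect a bare uuid here, without the "redpanda." prefix that the Kafka
# metadata response adds.
value = subprocess.run(
    ["rpk", "cluster", "config", "get", "cluster_id"],
    check=True, capture_output=True, text=True).stdout.strip()
print(value)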

iobuf serialize_metrics_snapshot(
const metrics_reporter::metrics_snapshot& snapshot) {
json::StringBuffer sb;
@@ -314,20 +353,32 @@ ss::future<http::client> metrics_reporter::make_http_client() {
}

ss::future<> metrics_reporter::do_report_metrics() {
// skip reporting if current node is not raft0 leader, or we need to wait
// for next report
if (
!_raft0->is_leader()
|| _last_success
> ss::lowres_clock::now()
- config::shard_local_cfg().metrics_reporter_report_interval()) {
// skip reporting if current node is not raft0 leader
if (!_raft0->is_leader()) {
co_return;
}

// try initializing cluster info; if it is already present this operation
// does nothing
co_await try_initialize_cluster_info();

// Update cluster_id in configuration, if not already set.
co_await propagate_cluster_id();

// report interval has not elapsed
if (
_last_success
> ss::lowres_clock::now()
- config::shard_local_cfg().metrics_reporter_report_interval()) {
co_return;
}

// If reporting is disabled, drop out here: we've initialized cluster_id
// if needed, but will not send any reports home.
if (!config::shard_local_cfg().enable_metrics_reporter()) {
co_return;
}

// if not initialized, wait until next tick
if (_cluster_uuid == "") {
co_return;
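To make the reordering above explicit: the leader check now comes first, cluster info and cluster_id propagation run on every leader tick, and only then are the report-interval and enable_metrics_reporter gates applied. A rough, self-contained Python paraphrase of those checks (the names here are illustrative, not taken from the codebase):

from dataclasses import dataclass

@dataclass
class TickState:
    is_leader: bool
    last_success: float       # timestamp of the last successful report
    report_interval: float
    reporter_enabled: bool

def tick(state: TickState, now: float) -> str:
    if not state.is_leader:
        return "skip: not raft0 leader"
    # try_initialize_cluster_info() and propagate_cluster_id() run here,
    # on every leader tick, before any of the gates below.
    if state.last_success > now - state.report_interval:
        return "cluster_id handled; report interval not yet elapsed"
    if not state.reporter_enabled:
        return "cluster_id handled; metrics reporting disabled"
    return "send report"

print(tick(TickState(True, last_success=0.0, report_interval=50.0,
                     reporter_enabled=False), now=100.0))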
3 changes: 3 additions & 0 deletions src/v/cluster/metrics_reporter.h
@@ -77,6 +77,7 @@ class metrics_reporter {
ss::sharded<members_table>&,
ss::sharded<topic_table>&,
ss::sharded<health_monitor_frontend>&,
ss::sharded<config_frontend>&,
ss::sharded<ss::abort_source>&);

ss::future<> start();
@@ -90,12 +91,14 @@

ss::future<http::client> make_http_client();
ss::future<> try_initialize_cluster_info();
ss::future<> propagate_cluster_id();

ss::sstring _cluster_uuid;
consensus_ptr _raft0;
ss::sharded<members_table>& _members_table;
ss::sharded<topic_table>& _topics;
ss::sharded<health_monitor_frontend>& _health_monitor;
ss::sharded<config_frontend>& _config_frontend;
ss::sharded<ss::abort_source>& _as;
model::timestamp _creation_timestamp;
prefix_logger _logger;
10 changes: 9 additions & 1 deletion src/v/kafka/server/handlers/metadata.cc
@@ -400,7 +400,15 @@ ss::future<response_ptr> metadata_handler::handle(
}
}

reply.data.cluster_id = config::shard_local_cfg().cluster_id;
const auto cluster_id = config::shard_local_cfg().cluster_id();
if (cluster_id.has_value()) {
reply.data.cluster_id = ssx::sformat("redpanda.{}", cluster_id.value());
} else {
// Include a "redpanda." cluster ID even if we haven't initialized
// cluster_id yet, so that callers can identify which Kafka
// implementation they're talking to.
reply.data.cluster_id = "redpanda.initializing";
}

auto leader_id = ctx.metadata_cache().get_controller_leader_id();
reply.data.controller_id = leader_id.value_or(model::node_id(-1));
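With the handler change above, a Kafka metadata response always carries a cluster ID with the "redpanda." prefix, using a placeholder until cluster_id is populated. A compact, purely illustrative Python restatement of that branch:

from typing import Optional

def metadata_cluster_id(configured: Optional[str]) -> str:
    # Mirrors the new branch in metadata.cc: prefix a configured id,
    # otherwise report a recognizable placeholder.
    if configured is not None:
        return f"redpanda.{configured}"
    return "redpanda.initializing"

assert metadata_cluster_id(None) == "redpanda.initializing"
assert metadata_cluster_id("87e8c0c3-7c2a-4f7b-987f-11fc1d2443a4").startswith("redpanda.")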
25 changes: 25 additions & 0 deletions tests/rptest/clients/rpk.py
@@ -623,3 +623,28 @@ def acl_create_allow_cluster(self, username, op):
] + self._kafka_conn_settings()
output = self._execute(cmd)
return output

def cluster_metadata_id(self):
"""
Calls 'cluster metadata' and returns the cluster ID, if set,
else None. Don't return the rest of the output (brokers list)
because there are already other ways to get at that.
"""
cmd = [
self._rpk_binary(), '--brokers',
self._redpanda.brokers(), 'cluster', 'metadata'
]
output = self._execute(cmd)
lines = output.strip().split("\n")

# Output is like:
#
# CLUSTER
# =======
# foobar

# Output only has a "CLUSTER" section if cluster id is set
if lines[0] != "CLUSTER":
return None
else:
return lines[2]
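A short usage sketch for the new helper inside a ducktape test (assumes a running RedpandaService fixture available as self.redpanda, as in the test added further below):

from ducktape.utils.util import wait_until
from rptest.clients.rpk import RpkTool

# Poll until the metrics reporter's first tick has populated cluster_id
# and the ID shows up in Kafka metadata.
rpk = RpkTool(self.redpanda)
wait_until(lambda: rpk.cluster_metadata_id() is not None,
           timeout_sec=20, backoff_sec=1)
assert rpk.cluster_metadata_id().startswith("redpanda.")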
14 changes: 8 additions & 6 deletions tests/rptest/services/redpanda.py
@@ -420,7 +420,6 @@ class RedpandaService(Service):
COVERAGE_PROFRAW_CAPTURE = os.path.join(PERSISTENT_ROOT,
"redpanda.profraw")

CLUSTER_NAME = "my_cluster"
READY_TIMEOUT_SEC = 10

DEDICATED_NODE_KEY = "dedicated_nodes"
@@ -449,8 +448,7 @@ class RedpandaService(Service):
'default_topic_partitions': 4,
'enable_metrics_reporter': False,
'superusers': [SUPERUSER_CREDENTIALS[0]],
'enable_auto_rebalance_on_node_add': True,
'cluster_id': CLUSTER_NAME
'enable_auto_rebalance_on_node_add': True
}

logs = {
@@ -966,11 +964,15 @@ def set_cluster_config(self, values: dict, expect_restart: bool = False):
"""
patch_result = self._admin.patch_cluster_config(upsert=values)
new_version = patch_result['config_version']

# The version check is >= to permit other config writes to happen in
# the background, including the write to cluster_id that happens
# early in the cluster's lifetime
wait_until(
lambda: set([
n['config_version']
lambda: all([
n['config_version'] >= new_version
for n in self._admin.get_cluster_config_status()
]) == {new_version},
]),
timeout_sec=10,
backoff_sec=0.5,
err_msg=f"Config status versions did not converge on {new_version}"
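The switch from a set-equality check to all(... >= new_version) matters because the metrics reporter's own write of cluster_id can bump the config version past the one returned by this patch, so strict equality might never be observed. A tiny illustration of the difference (hypothetical version numbers):

# Versions reported by three nodes after they also absorbed the
# background cluster_id write.
reported = [3, 3, 3]
new_version = 2   # version returned by our own patch_cluster_config call

old_check = set(reported) == {new_version}            # never becomes True
new_check = all(v >= new_version for v in reported)   # True: converged
assert not old_check and new_check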
50 changes: 49 additions & 1 deletion tests/rptest/tests/cluster_config_test.py
@@ -100,6 +100,16 @@ def __init__(self, *args, **kwargs):
self.admin = Admin(self.redpanda)
self.rpk = RpkTool(self.redpanda)

def setUp(self):
super().setUp()

# wait for the two config versions:
# 1. The initial bootstrap where we "import" any cluster properties
# that were in bootstrap.yaml
# 2. The metrics reporter's first tick, where it initializes
# the cluster_id property.
self._wait_for_version_sync(2)

@cluster(num_nodes=3)
@parametrize(legacy=False)
@parametrize(legacy=True)
@@ -501,7 +511,10 @@ def test_valid_settings(self):
else:
valid_value = 1000.0
elif p['type'] == 'string':
valid_value = "rhubarb"
if name.endswith("_url"):
valid_value = "http://example.com"
else:
valid_value = "rhubarb"
elif p['type'] == 'boolean':
valid_value = not initial_config[name]
elif p['type'] == "array" and p['items']['type'] == 'string':
@@ -1026,3 +1039,38 @@ def test_cloud_validation(self):
# disabled
for key in forbidden_to_clear:
self.admin.patch_cluster_config(upsert={}, remove=[key])

@cluster(num_nodes=3)
def test_cluster_id(self):
"""
That the cluster_id exposed in Kafka metadata is automatically
populated with a uuid, that it starts with "redpanda.", and that
it can be overridden by setting the property to something else.
"""

rpk = RpkTool(self.redpanda)

# An example; we will compare lengths with this
uuid_example = "redpanda.87e8c0c3-7c2a-4f7b-987f-11fc1d2443a4"

def has_uuid_cluster_id():
cluster_id = rpk.cluster_metadata_id()
self.logger.info(f"cluster_id={cluster_id}")
return cluster_id is not None and len(cluster_id) == len(
uuid_example)

# This is a wait_until because the initialization of cluster_id
# is async and can happen after the cluster starts answering Kafka requests.
wait_until(has_uuid_cluster_id, timeout_sec=20, backoff_sec=1)

# Verify that the cluster_id does not change on a restart
initial_cluster_id = rpk.cluster_metadata_id()
self.redpanda.restart_nodes(self.redpanda.nodes)
assert rpk.cluster_metadata_id() == initial_cluster_id

# Verify that a manually set cluster_id is respected
manual_id = "rhubarb"
self.redpanda.set_cluster_config(values={"cluster_id": manual_id},
expect_restart=False)

assert rpk.cluster_metadata_id() == f"redpanda.{manual_id}"
