Merge pull request #5835 from jcsp/issue-5609-config-status-consistency
admin: read-after-write consistency for config status on leader node
jcsp committed Aug 16, 2022
2 parents 293f19e + 13d211a commit b40ed60
Showing 6 changed files with 79 additions and 12 deletions.
22 changes: 22 additions & 0 deletions src/v/cluster/config_manager.cc
@@ -862,4 +862,26 @@ config_manager::apply_update(model::record_batch b) {
});
}

config_manager::status_map config_manager::get_projected_status() const {
    status_map r = status;

    // If our local status is ahead of the persistent status map,
    // update the projected result: the persistent status map is
    // guaranteed to catch up to this eventually via reconcile_status.
    //
    // This behaviour is useful to get read-after-write consistency
    // when writing config updates to the controller leader + then
    // reading back the status from the same leader node.
    //
    // A more comprehensive approach to waiting for status updates
    // inline with config updates is discussed in
    // https://github.com/redpanda-data/redpanda/issues/5833
    auto it = r.find(_self);
    if (it == r.end() || it->second.version < my_latest_status.version) {
        r[_self] = my_latest_status;
    }

    return r;
}

} // namespace cluster
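
The same projection, modelled in Python for readers who want the data flow without the C++ details. This is a minimal sketch: the dict-of-ints representation, the projected_status name, and the literal versions are illustrative only and are not part of the diff.

# Minimal Python model of get_projected_status(): the status map is a plain
# dict of node_id -> version (the real code stores cluster::config_status).
def projected_status(status_map, self_id, my_latest_version):
    result = dict(status_map)  # copy; never mutate the persistent map
    current = result.get(self_id)
    if current is None or current < my_latest_version:
        # This node has locally applied a newer version than the persistent
        # map has recorded, so project it forward.
        result[self_id] = my_latest_version
    return result

# The persistent map still shows node 1 at version 4, but the node has
# locally applied version 5: the projection reports 5 for node 1.
assert projected_status({1: 4, 2: 4}, self_id=1, my_latest_version=5) == {1: 5, 2: 4}
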
2 changes: 2 additions & 0 deletions src/v/cluster/config_manager.h
@@ -69,6 +69,8 @@ class config_manager final {

    status_map const& get_status() const { return status; }

    status_map get_projected_status() const;

    config_version get_version() const noexcept { return _seen_version; }

    bool needs_update(const config_status& new_status) {
5 changes: 1 addition & 4 deletions src/v/redpanda/admin_server.cc
@@ -1012,10 +1012,7 @@ void admin_server::register_cluster_config_routes() {
      auto statuses = co_await cfg.invoke_on(
        cluster::controller_stm_shard,
        [](cluster::config_manager& manager) {
-           // Intentional copy, do not want to pass reference to mutable
-           // status map back to originating core.
-           return cluster::config_manager::status_map(
-             manager.get_status());
+           return manager.get_projected_status();
        });

      for (const auto& s : statuses) {
10 changes: 6 additions & 4 deletions tests/rptest/services/admin.py
@@ -353,7 +353,8 @@ def patch_cluster_config(self,
                             upsert=None,
                             remove=None,
                             force=False,
-                            dry_run=False):
+                            dry_run=False,
+                            node=None):
        if upsert is None:
            upsert = {}
        if remove is None:
@@ -375,10 +376,11 @@
                  json={
                      'upsert': upsert,
                      'remove': remove
-                 }).json()
+                 },
+                 node=node).json()

-   def get_cluster_config_status(self):
-       return self._request("GET", "cluster_config/status").json()
+   def get_cluster_config_status(self, node=None):
+       return self._request("GET", "cluster_config/status", node=node).json()

    def get_node_config(self):
        return self._request("GET", "node_config").json()
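
Together, the two new node= parameters let a test pin a config write and the follow-up status read to the same broker, which is what the new consistency test below relies on. A condensed sketch of that usage: the helper name write_then_read_status and the literal 1024 are illustrative; the Admin calls and idx()/controller() helpers mirror the diff and assume the usual ducktape test fixtures.

from rptest.services.admin import Admin


def write_then_read_status(redpanda):
    admin = Admin(redpanda)
    controller = redpanda.controller()

    # Write the config change directly to the controller leader...
    resp = admin.patch_cluster_config(upsert={'kafka_connections_max': 1024},
                                      node=controller)
    new_version = resp['config_version']

    # ...and immediately read status back from the same node: with the
    # projected status, the leader's own entry already shows new_version.
    status = admin.get_cluster_config_status(node=controller)
    local = next(s for s in status
                 if s['node_id'] == redpanda.idx(controller))
    assert local['config_version'] == new_version
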
4 changes: 2 additions & 2 deletions tests/rptest/services/redpanda.py
@@ -1115,8 +1115,8 @@ def set_cluster_config(self, values: dict, expect_restart: bool = False):
        # early in the cluster's lifetime
        wait_until(
            lambda: all([
-               n['config_version'] >= new_version
-               for n in self._admin.get_cluster_config_status()
+               n['config_version'] >= new_version for n in self._admin.
+               get_cluster_config_status(node=self.controller())
            ]),
            timeout_sec=10,
            backoff_sec=0.5,
48 changes: 46 additions & 2 deletions tests/rptest/tests/cluster_config_test.py
@@ -187,8 +187,8 @@ def test_bootstrap(self):
    def _wait_for_version_sync(self, version):
        wait_until(
            lambda: set([
-               n['config_version']
-               for n in self.admin.get_cluster_config_status()
+               n['config_version'] for n in self.admin.
+               get_cluster_config_status(node=self.redpanda.controller())
            ]) == {version},
            timeout_sec=10,
            backoff_sec=0.5,
@@ -1123,3 +1123,47 @@ def has_uuid_cluster_id():
            expect_restart=False)

        assert rpk.cluster_metadata_id() == f"redpanda.{manual_id}"

    @cluster(num_nodes=3)
    def test_status_read_after_write_consistency(self):
        """
        In general, status is updated asynchronously: API clients may send
        a PUT to any node and poll any node to observe the asynchronous
        status updates.

        However, there is a stricter path available: if a client sends a
        PUT to the controller leader and then reads status back from that
        same leader, it will always see the leader's own entry updated to
        the new version in the subsequent GET cluster_config/status.

        Doing fast read-backs is not a strict proof of consistency, but it
        does detect violations on realistic timescales: this test failed
        in practice before /status was changed to return projected values.
        """

        admin = Admin(self.redpanda)

        # Don't want controller leadership changing while we run
        r = admin.patch_cluster_config(
            upsert={'enable_leader_balancer': False})
        config_version = r['config_version']
        self._wait_for_version_sync(config_version)

        controller_node = self.redpanda.controller()
        for i in range(0, 50):
            # Some config update, different each iteration
            r = admin.patch_cluster_config(
                upsert={'kafka_connections_max': 1000 + i},
                node=controller_node)
            new_config_version = r['config_version']
            assert new_config_version != config_version
            config_version = new_config_version

            # Immediately read back status from the controller: it should
            # reflect the new version
            status = self.admin.get_cluster_config_status(
                node=controller_node)
            local_status = next(
                s for s in status
                if s['node_id'] == self.redpanda.idx(controller_node))
            assert local_status['config_version'] == config_version
