Skip to content

Commit

Permalink
operator: restart cluster only when nodes share the same config version
Browse files Browse the repository at this point in the history
It could happen that the controller restarted the cluster before changes to configuration have been propagated to all nodes. This change prevents it to happen.
  • Loading branch information
nicolaferraro committed Jun 14, 2022
1 parent 0c9e05d commit 3d6a5fa
Showing 1 changed file with 34 additions and 16 deletions.
50 changes: 34 additions & 16 deletions src/go/k8s/controllers/redpanda/cluster_controller_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (r *ClusterReconciler) applyPatchIfNeeded(
}

log.Info("Applying patch to the cluster configuration", "patch", patch.String())
_, err = adminAPI.PatchClusterConfig(ctx, patch.Upsert, patch.Remove)
wr, err := adminAPI.PatchClusterConfig(ctx, patch.Upsert, patch.Remove)
if err != nil {
var conditionData *redpandav1alpha1.ClusterCondition
conditionData, err = tryMapErrorToCondition(err)
Expand All @@ -220,6 +220,7 @@ func (r *ClusterReconciler) applyPatchIfNeeded(
// Patch issue is due to user error, so it's unrecoverable
return false, nil
}
log.Info("Patch written to the cluster", "config_version", wr.ConfigVersion)
return true, nil
}

Expand Down Expand Up @@ -318,18 +319,24 @@ func (r *ClusterReconciler) synchronizeStatusWithCluster(
}
conditionData := mapStatusToCondition(status)
conditionChanged := redpandaCluster.Status.SetCondition(conditionData.Type, conditionData.Status, conditionData.Reason, conditionData.Message)
stsNeedsRestart := needsRestart(status)
if conditionChanged || (stsNeedsRestart && !redpandaCluster.Status.IsRestarting()) {
// Trigger restart here if needed
if stsNeedsRestart {
clusterNeedsRestart := needsRestart(status, log)
clusterSafeToRestart := isSafeToRestart(status, log)
restartingCluster := clusterNeedsRestart && clusterSafeToRestart

log.Info("Synchronizing configuration state for cluster",
"status", conditionData.Status,
"reason", conditionData.Reason,
"message", conditionData.Message,
"needs_restart", clusterNeedsRestart,
"restarting", restartingCluster,
)
if conditionChanged || (restartingCluster && !redpandaCluster.Status.IsRestarting()) {
log.Info("Updating configuration state for cluster")
// Trigger restart here if needed and safe to do it
if restartingCluster {
redpandaCluster.Status.SetRestarting(true)
}
log.Info("Updating configuration state for cluster",
"status", conditionData.Status,
"reason", conditionData.Reason,
"message", conditionData.Message,
"restarting", redpandaCluster.Status.IsRestarting(),
)

if err := r.Status().Update(ctx, redpandaCluster); err != nil {
return nil, errorWithContext(err, "could not update condition on cluster")
}
Expand All @@ -342,7 +349,7 @@ func mapStatusToCondition(
clusterStatus admin.ConfigStatusResponse,
) redpandav1alpha1.ClusterCondition {
var condition *redpandav1alpha1.ClusterCondition
var configVersion int64
var configVersion int64 = -1
for _, nodeStatus := range clusterStatus {
if len(nodeStatus.Invalid) > 0 {
condition = &redpandav1alpha1.ClusterCondition{
Expand All @@ -365,7 +372,7 @@ func mapStatusToCondition(
Reason: redpandav1alpha1.ClusterConfiguredReasonUpdating,
Message: fmt.Sprintf("Node %d needs restart", nodeStatus.NodeID),
}
} else if configVersion != 0 && nodeStatus.ConfigVersion != configVersion {
} else if configVersion >= 0 && nodeStatus.ConfigVersion != configVersion {
condition = &redpandav1alpha1.ClusterCondition{
Type: redpandav1alpha1.ClusterConfiguredConditionType,
Status: corev1.ConditionFalse,
Expand All @@ -387,13 +394,24 @@ func mapStatusToCondition(
return *condition
}

func needsRestart(clusterStatus admin.ConfigStatusResponse) bool {
func needsRestart(clusterStatus admin.ConfigStatusResponse, log logr.Logger) bool {
nodeNeedsRestart := false
for i := range clusterStatus {
log.Info(fmt.Sprintf("Node %d restart status is %v", clusterStatus[i].NodeID, clusterStatus[i].Restart))
if clusterStatus[i].Restart {
return true
nodeNeedsRestart = true
}
}
return false
return nodeNeedsRestart
}

func isSafeToRestart(clusterStatus admin.ConfigStatusResponse, log logr.Logger) bool {
configVersions := make(map[int64]bool)
for i := range clusterStatus {
log.Info(fmt.Sprintf("Node %d is using config version %d", clusterStatus[i].NodeID, clusterStatus[i].ConfigVersion))
configVersions[clusterStatus[i].ConfigVersion] = true
}
return len(configVersions) == 1
}

// tryMapErrorToCondition tries to map validation errors received from the cluster to a condition
Expand Down

0 comments on commit 3d6a5fa

Please sign in to comment.