Skip to content

Commit

Permalink
Add compare util using go-cmp, modify KCP controller & webhooks
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Büringer buringerst@vmware.com
  • Loading branch information
sbueringer committed May 16, 2024
1 parent 7b50321 commit 677625c
Show file tree
Hide file tree
Showing 14 changed files with 471 additions and 98 deletions.
28 changes: 19 additions & 9 deletions controlplane/kubeadm/internal/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,15 @@ func (c *ControlPlane) FailureDomainWithMostMachines(ctx context.Context, machin
}

// NextFailureDomainForScaleUp returns the failure domain with the fewest number of up-to-date machines.
func (c *ControlPlane) NextFailureDomainForScaleUp(ctx context.Context) *string {
func (c *ControlPlane) NextFailureDomainForScaleUp(ctx context.Context) (*string, error) {
if len(c.Cluster.Status.FailureDomains.FilterControlPlane()) == 0 {
return nil
return nil, nil
}
return failuredomains.PickFewest(ctx, c.FailureDomains().FilterControlPlane(), c.UpToDateMachines())
upToDateMachines, err := c.UpToDateMachines()
if err != nil {
return nil, errors.Wrapf(err, "failed to determine next failure domain for scale up")
}
return failuredomains.PickFewest(ctx, c.FailureDomains().FilterControlPlane(), upToDateMachines), nil
}

// InitialControlPlaneConfig returns a new KubeadmConfigSpec that is to be used for an initializing control plane.
Expand Down Expand Up @@ -167,34 +171,40 @@ func (c *ControlPlane) GetKubeadmConfig(machineName string) (*bootstrapv1.Kubead
}

// MachinesNeedingRollout return a list of machines that need to be rolled out.
func (c *ControlPlane) MachinesNeedingRollout() (collections.Machines, map[string]string) {
func (c *ControlPlane) MachinesNeedingRollout() (collections.Machines, map[string]string, error) {
// Ignore machines to be deleted.
machines := c.Machines.Filter(collections.Not(collections.HasDeletionTimestamp))

// Return machines if they are scheduled for rollout or if with an outdated configuration.
machinesNeedingRollout := make(collections.Machines, len(machines))
rolloutReasons := map[string]string{}
for _, m := range machines {
reason, needsRollout := NeedsRollout(&c.reconciliationTime, c.KCP.Spec.RolloutAfter, c.KCP.Spec.RolloutBefore, c.InfraResources, c.KubeadmConfigs, c.KCP, m)
reason, needsRollout, err := NeedsRollout(&c.reconciliationTime, c.KCP.Spec.RolloutAfter, c.KCP.Spec.RolloutBefore, c.InfraResources, c.KubeadmConfigs, c.KCP, m)
if err != nil {
return nil, nil, err
}
if needsRollout {
machinesNeedingRollout.Insert(m)
rolloutReasons[m.Name] = reason
}
}
return machinesNeedingRollout, rolloutReasons
return machinesNeedingRollout, rolloutReasons, nil
}

// UpToDateMachines returns the machines that are up to date with the control
// plane's configuration and therefore do not require rollout.
func (c *ControlPlane) UpToDateMachines() collections.Machines {
func (c *ControlPlane) UpToDateMachines() (collections.Machines, error) {
upToDateMachines := make(collections.Machines, len(c.Machines))
for _, m := range c.Machines {
_, needsRollout := NeedsRollout(&c.reconciliationTime, c.KCP.Spec.RolloutAfter, c.KCP.Spec.RolloutBefore, c.InfraResources, c.KubeadmConfigs, c.KCP, m)
_, needsRollout, err := NeedsRollout(&c.reconciliationTime, c.KCP.Spec.RolloutAfter, c.KCP.Spec.RolloutBefore, c.InfraResources, c.KubeadmConfigs, c.KCP, m)
if err != nil {
return nil, err
}
if !needsRollout {
upToDateMachines.Insert(m)
}
}
return upToDateMachines
return upToDateMachines, nil
}

// getInfraResources fetches the external infrastructure resource for each machine in the collection and returns a map of machine.Name -> infraResource.
Expand Down
7 changes: 5 additions & 2 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
if errors.As(err, &connFailure) {
log.Info("Could not connect to workload cluster to fetch status", "err", err.Error())
} else {
log.Error(err, "Failed to update KubeadmControlPlane Status")
log.Error(err, "Failed to update KubeadmControlPlane status")
reterr = kerrors.NewAggregate([]error{reterr, err})
}
}
Expand Down Expand Up @@ -399,7 +399,10 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPl
}

// Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
machinesNeedingRollout, rolloutReasons := controlPlane.MachinesNeedingRollout()
machinesNeedingRollout, rolloutReasons, err := controlPlane.MachinesNeedingRollout()
if err != nil {
return ctrl.Result{}, err
}
switch {
case len(machinesNeedingRollout) > 0:
var reasons []string
Expand Down
12 changes: 10 additions & 2 deletions controlplane/kubeadm/internal/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
logger := ctrl.LoggerFrom(ctx)

bootstrapSpec := controlPlane.InitialControlPlaneConfig()
fd := controlPlane.NextFailureDomainForScaleUp(ctx)
fd, err := controlPlane.NextFailureDomainForScaleUp(ctx)
if err != nil {
return ctrl.Result{}, err
}

if err := r.cloneConfigsAndGenerateMachine(ctx, controlPlane.Cluster, controlPlane.KCP, bootstrapSpec, fd); err != nil {
logger.Error(err, "Failed to create initial control plane Machine")
r.recorder.Eventf(controlPlane.KCP, corev1.EventTypeWarning, "FailedInitialization", "Failed to create initial control plane Machine for cluster %s control plane: %v", klog.KObj(controlPlane.Cluster), err)
Expand All @@ -60,7 +64,11 @@ func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context,

// Create the bootstrap configuration
bootstrapSpec := controlPlane.JoinControlPlaneConfig()
fd := controlPlane.NextFailureDomainForScaleUp(ctx)
fd, err := controlPlane.NextFailureDomainForScaleUp(ctx)
if err != nil {
return ctrl.Result{}, err
}

if err := r.cloneConfigsAndGenerateMachine(ctx, controlPlane.Cluster, controlPlane.KCP, bootstrapSpec, fd); err != nil {
logger.Error(err, "Failed to create additional control plane Machine")
r.recorder.Eventf(controlPlane.KCP, corev1.EventTypeWarning, "FailedScaleUp", "Failed to create additional control plane Machine for cluster % control plane: %v", klog.KObj(controlPlane.Cluster), err)
Expand Down
6 changes: 5 additions & 1 deletion controlplane/kubeadm/internal/controllers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, contro
// This is necessary for CRDs including scale subresources.
controlPlane.KCP.Status.Selector = selector.String()

controlPlane.KCP.Status.UpdatedReplicas = int32(len(controlPlane.UpToDateMachines()))
upToDateMachines, err := controlPlane.UpToDateMachines()
if err != nil {
return errors.Wrapf(err, "failed to update status")
}
controlPlane.KCP.Status.UpdatedReplicas = int32(len(upToDateMachines))

replicas := int32(len(controlPlane.Machines))
desiredReplicas := *controlPlane.KCP.Spec.Replicas
Expand Down
74 changes: 50 additions & 24 deletions controlplane/kubeadm/internal/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"reflect"
"strings"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/internal/util/compare"
"sigs.k8s.io/cluster-api/util/collections"
)

Expand All @@ -39,7 +41,7 @@ import (
// - mutated in-place (ex: NodeDrainTimeout)
// - are not dictated by KCP (ex: ProviderID)
// - are not relevant for the rollout decision (ex: failureDomain).
func matchesMachineSpec(infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool) {
func matchesMachineSpec(infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool, error) {
mismatchReasons := []string{}

if !collections.MatchesKubernetesVersion(kcp.Spec.Version)(machine) {
Expand All @@ -50,7 +52,11 @@ func matchesMachineSpec(infraConfigs map[string]*unstructured.Unstructured, mach
mismatchReasons = append(mismatchReasons, fmt.Sprintf("Machine version %q is not equal to KCP version %q", machineVersion, kcp.Spec.Version))
}

if reason, matches := matchesKubeadmBootstrapConfig(machineConfigs, kcp, machine); !matches {
reason, matches, err := matchesKubeadmBootstrapConfig(machineConfigs, kcp, machine)
if err != nil {
return "", false, errors.Wrapf(err, "failed to match Machine spec")
}
if !matches {
mismatchReasons = append(mismatchReasons, reason)
}

Expand All @@ -59,14 +65,14 @@ func matchesMachineSpec(infraConfigs map[string]*unstructured.Unstructured, mach
}

if len(mismatchReasons) > 0 {
return strings.Join(mismatchReasons, ","), false
return strings.Join(mismatchReasons, ","), false, nil
}

return "", true
return "", true, nil
}

// NeedsRollout checks if a Machine needs to be rolled out and returns the reason why.
func NeedsRollout(reconciliationTime, rolloutAfter *metav1.Time, rolloutBefore *controlplanev1.RolloutBefore, infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool) {
func NeedsRollout(reconciliationTime, rolloutAfter *metav1.Time, rolloutBefore *controlplanev1.RolloutBefore, infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool, error) {
rolloutReasons := []string{}

// Machines whose certificates are about to expire.
Expand All @@ -81,15 +87,19 @@ func NeedsRollout(reconciliationTime, rolloutAfter *metav1.Time, rolloutBefore *
}

// Machines that do not match with KCP config.
if mismatchReason, matches := matchesMachineSpec(infraConfigs, machineConfigs, kcp, machine); !matches {
mismatchReason, matches, err := matchesMachineSpec(infraConfigs, machineConfigs, kcp, machine)
if err != nil {
return "", false, errors.Wrapf(err, "failed to determine if Machine %s needs rollout", machine.Name)
}
if !matches {
rolloutReasons = append(rolloutReasons, mismatchReason)
}

if len(rolloutReasons) > 0 {
return fmt.Sprintf("Machine %s needs rollout: %s", machine.Name, strings.Join(rolloutReasons, ",")), true
return fmt.Sprintf("Machine %s needs rollout: %s", machine.Name, strings.Join(rolloutReasons, ",")), true, nil
}

return "", false
return "", false, nil
}

// matchesTemplateClonedFrom checks if a Machine has a corresponding infrastructure machine that
Expand Down Expand Up @@ -130,50 +140,58 @@ func matchesTemplateClonedFrom(infraConfigs map[string]*unstructured.Unstructure
// matchesKubeadmBootstrapConfig checks if machine's KubeadmConfigSpec is equivalent with KCP's KubeadmConfigSpec.
// Note: Differences to the labels and annotations on the KubeadmConfig are not considered for matching
// criteria, because changes to labels and annotations are propagated in-place to KubeadmConfig.
func matchesKubeadmBootstrapConfig(machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool) {
func matchesKubeadmBootstrapConfig(machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool, error) {
if machine == nil {
return "Machine KubeadmConfig cannot be compared: Machine is nil", false
return "Machine KubeadmConfig cannot be compared: Machine is nil", false, nil
}

// Check if KCP and machine ClusterConfiguration matches, if not return
if !matchClusterConfiguration(kcp, machine) {
return "Machine ClusterConfiguration is outdated", false
match, diff, err := matchClusterConfiguration(kcp, machine)
if err != nil {
return "", false, errors.Wrapf(err, "failed to match KubeadmConfig")
}
if !match {
return fmt.Sprintf("Machine KubeadmConfig ClusterConfiguration is outdated: diff: %s", diff), false, nil
}

bootstrapRef := machine.Spec.Bootstrap.ConfigRef
if bootstrapRef == nil {
// Missing bootstrap reference should not be considered as unmatching.
// This is a safety precaution to avoid selecting machines that are broken, which in the future should be remediated separately.
return "", true
return "", true, nil
}

machineConfig, found := machineConfigs[machine.Name]
if !found {
// Return true here because failing to get KubeadmConfig should not be considered as unmatching.
// This is a safety precaution to avoid rolling out machines if the client or the api-server is misbehaving.
return "", true
return "", true, nil
}

// Check if KCP and machine InitConfiguration or JoinConfiguration matches
// NOTE: only one between init configuration and join configuration is set on a machine, depending
// on the fact that the machine was the initial control plane node or a joining control plane node.
if !matchInitOrJoinConfiguration(machineConfig, kcp) {
return "Machine InitConfiguration or JoinConfiguration are outdated", false
match, diff, err = matchInitOrJoinConfiguration(machineConfig, kcp)
if err != nil {
return "", false, errors.Wrapf(err, "failed to match KubeadmConfig")
}
if !match {
return fmt.Sprintf("Machine KubeadmConfig InitConfiguration or JoinConfiguration are outdated: diff: %s", diff), false, nil
}

return "", true
return "", true, nil
}

// matchClusterConfiguration verifies if KCP and machine ClusterConfiguration matches.
// NOTE: Machines that have KubeadmClusterConfigurationAnnotation will have to match with KCP ClusterConfiguration.
// If the annotation is not present (machine is either old or adopted), we won't roll out on any possible changes
// made in KCP's ClusterConfiguration given that we don't have enough information to make a decision.
// Users should use KCP.Spec.RolloutAfter field to force a rollout in this case.
func matchClusterConfiguration(kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) bool {
func matchClusterConfiguration(kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (bool, string, error) {
machineClusterConfigStr, ok := machine.GetAnnotations()[controlplanev1.KubeadmClusterConfigurationAnnotation]
if !ok {
// We don't have enough information to make a decision; don't' trigger a roll out.
return true
return true, "", nil
}

machineClusterConfig := &bootstrapv1.ClusterConfiguration{}
Expand All @@ -182,7 +200,7 @@ func matchClusterConfiguration(kcp *controlplanev1.KubeadmControlPlane, machine
// otherwise we won't be able to handle a nil ClusterConfiguration (that is serialized into "null").
// See https://github.com/kubernetes-sigs/cluster-api/issues/3353.
if err := json.Unmarshal([]byte(machineClusterConfigStr), &machineClusterConfig); err != nil {
return false
return false, "", nil //nolint:nilerr // Intentionally not returning the error here
}

// If any of the compared values are nil, treat them the same as an empty ClusterConfiguration.
Expand All @@ -199,16 +217,20 @@ func matchClusterConfiguration(kcp *controlplanev1.KubeadmControlPlane, machine
machineClusterConfig.DNS = kcpLocalClusterConfiguration.DNS

// Compare and return.
return reflect.DeepEqual(machineClusterConfig, kcpLocalClusterConfiguration)
match, diff, err := compare.Diff(machineClusterConfig, kcpLocalClusterConfiguration)
if err != nil {
return false, "", errors.Wrapf(err, "failed to match ClusterConfiguration")
}
return match, diff, nil
}

// matchInitOrJoinConfiguration verifies if KCP and machine InitConfiguration or JoinConfiguration matches.
// NOTE: By extension this method takes care of detecting changes in other fields of the KubeadmConfig configuration (e.g. Files, Mounts etc.)
func matchInitOrJoinConfiguration(machineConfig *bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane) bool {
func matchInitOrJoinConfiguration(machineConfig *bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane) (bool, string, error) {
if machineConfig == nil {
// Return true here because failing to get KubeadmConfig should not be considered as unmatching.
// This is a safety precaution to avoid rolling out machines if the client or the api-server is misbehaving.
return true
return true, "", nil
}

// takes the KubeadmConfigSpec from KCP and applies the transformations required
Expand All @@ -225,7 +247,11 @@ func matchInitOrJoinConfiguration(machineConfig *bootstrapv1.KubeadmConfig, kcp
// cleanups all the fields that are not relevant for the comparison.
cleanupConfigFields(kcpConfig, machineConfig)

return reflect.DeepEqual(&machineConfig.Spec, kcpConfig)
match, diff, err := compare.Diff(&machineConfig.Spec, kcpConfig)
if err != nil {
return false, "", errors.Wrapf(err, "failed to match InitConfiguration or JoinConfiguration")
}
return match, diff, nil
}

// getAdjustedKcpConfig takes the KubeadmConfigSpec from KCP and applies the transformations required
Expand Down
Loading

0 comments on commit 677625c

Please sign in to comment.