Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 Add compare util using go-cmp, modify webhooks & KCP controller #10628

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions controlplane/kubeadm/internal/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,15 @@ func (c *ControlPlane) FailureDomainWithMostMachines(ctx context.Context, machin
}

// NextFailureDomainForScaleUp returns the failure domain with the fewest number of up-to-date machines.
func (c *ControlPlane) NextFailureDomainForScaleUp(ctx context.Context) *string {
func (c *ControlPlane) NextFailureDomainForScaleUp(ctx context.Context) (*string, error) {
if len(c.Cluster.Status.FailureDomains.FilterControlPlane()) == 0 {
return nil
return nil, nil
}
return failuredomains.PickFewest(ctx, c.FailureDomains().FilterControlPlane(), c.UpToDateMachines())
upToDateMachines, err := c.UpToDateMachines()
if err != nil {
return nil, errors.Wrapf(err, "failed to determine next failure domain for scale up")
}
return failuredomains.PickFewest(ctx, c.FailureDomains().FilterControlPlane(), upToDateMachines), nil
}

// InitialControlPlaneConfig returns a new KubeadmConfigSpec that is to be used for an initializing control plane.
Expand Down Expand Up @@ -167,34 +171,40 @@ func (c *ControlPlane) GetKubeadmConfig(machineName string) (*bootstrapv1.Kubead
}

// MachinesNeedingRollout returns the machines that need to be rolled out, along with a map of machine name to rollout reason.
func (c *ControlPlane) MachinesNeedingRollout() (collections.Machines, map[string]string) {
func (c *ControlPlane) MachinesNeedingRollout() (collections.Machines, map[string]string, error) {
// Ignore machines to be deleted.
machines := c.Machines.Filter(collections.Not(collections.HasDeletionTimestamp))

// Return machines if they are scheduled for rollout or if they have an outdated configuration.
machinesNeedingRollout := make(collections.Machines, len(machines))
rolloutReasons := map[string]string{}
for _, m := range machines {
reason, needsRollout := NeedsRollout(&c.reconciliationTime, c.KCP.Spec.RolloutAfter, c.KCP.Spec.RolloutBefore, c.InfraResources, c.KubeadmConfigs, c.KCP, m)
reason, needsRollout, err := NeedsRollout(&c.reconciliationTime, c.KCP.Spec.RolloutAfter, c.KCP.Spec.RolloutBefore, c.InfraResources, c.KubeadmConfigs, c.KCP, m)
if err != nil {
return nil, nil, err
}
if needsRollout {
machinesNeedingRollout.Insert(m)
rolloutReasons[m.Name] = reason
}
}
return machinesNeedingRollout, rolloutReasons
return machinesNeedingRollout, rolloutReasons, nil
}

// UpToDateMachines returns the machines that are up to date with the control
// plane's configuration and therefore do not require rollout.
func (c *ControlPlane) UpToDateMachines() collections.Machines {
func (c *ControlPlane) UpToDateMachines() (collections.Machines, error) {
upToDateMachines := make(collections.Machines, len(c.Machines))
for _, m := range c.Machines {
_, needsRollout := NeedsRollout(&c.reconciliationTime, c.KCP.Spec.RolloutAfter, c.KCP.Spec.RolloutBefore, c.InfraResources, c.KubeadmConfigs, c.KCP, m)
_, needsRollout, err := NeedsRollout(&c.reconciliationTime, c.KCP.Spec.RolloutAfter, c.KCP.Spec.RolloutBefore, c.InfraResources, c.KubeadmConfigs, c.KCP, m)
if err != nil {
return nil, err
}
if !needsRollout {
upToDateMachines.Insert(m)
}
}
return upToDateMachines
return upToDateMachines, nil
}

// getInfraResources fetches the external infrastructure resource for each machine in the collection and returns a map of machine.Name -> infraResource.
Expand Down
7 changes: 5 additions & 2 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
if errors.As(err, &connFailure) {
log.Error(err, "Could not connect to workload cluster to fetch status")
} else {
log.Error(err, "Failed to update KubeadmControlPlane Status")
log.Error(err, "Failed to update KubeadmControlPlane status")
reterr = kerrors.NewAggregate([]error{reterr, err})
}
}
Expand Down Expand Up @@ -399,7 +399,10 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPl
}

// Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
machinesNeedingRollout, rolloutReasons := controlPlane.MachinesNeedingRollout()
machinesNeedingRollout, rolloutReasons, err := controlPlane.MachinesNeedingRollout()
if err != nil {
return ctrl.Result{}, err
}
switch {
case len(machinesNeedingRollout) > 0:
var reasons []string
Expand Down
12 changes: 10 additions & 2 deletions controlplane/kubeadm/internal/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
logger := ctrl.LoggerFrom(ctx)

bootstrapSpec := controlPlane.InitialControlPlaneConfig()
fd := controlPlane.NextFailureDomainForScaleUp(ctx)
fd, err := controlPlane.NextFailureDomainForScaleUp(ctx)
if err != nil {
return ctrl.Result{}, err
}

if err := r.cloneConfigsAndGenerateMachine(ctx, controlPlane.Cluster, controlPlane.KCP, bootstrapSpec, fd); err != nil {
logger.Error(err, "Failed to create initial control plane Machine")
r.recorder.Eventf(controlPlane.KCP, corev1.EventTypeWarning, "FailedInitialization", "Failed to create initial control plane Machine for cluster %s control plane: %v", klog.KObj(controlPlane.Cluster), err)
Expand All @@ -60,7 +64,11 @@ func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context,

// Create the bootstrap configuration
bootstrapSpec := controlPlane.JoinControlPlaneConfig()
fd := controlPlane.NextFailureDomainForScaleUp(ctx)
fd, err := controlPlane.NextFailureDomainForScaleUp(ctx)
if err != nil {
return ctrl.Result{}, err
}

if err := r.cloneConfigsAndGenerateMachine(ctx, controlPlane.Cluster, controlPlane.KCP, bootstrapSpec, fd); err != nil {
logger.Error(err, "Failed to create additional control plane Machine")
r.recorder.Eventf(controlPlane.KCP, corev1.EventTypeWarning, "FailedScaleUp", "Failed to create additional control plane Machine for cluster % control plane: %v", klog.KObj(controlPlane.Cluster), err)
Expand Down
6 changes: 5 additions & 1 deletion controlplane/kubeadm/internal/controllers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, contro
// This is necessary for CRDs including scale subresources.
controlPlane.KCP.Status.Selector = selector.String()

controlPlane.KCP.Status.UpdatedReplicas = int32(len(controlPlane.UpToDateMachines()))
upToDateMachines, err := controlPlane.UpToDateMachines()
if err != nil {
return errors.Wrapf(err, "failed to update status")
}
controlPlane.KCP.Status.UpdatedReplicas = int32(len(upToDateMachines))

replicas := int32(len(controlPlane.Machines))
desiredReplicas := *controlPlane.KCP.Spec.Replicas
Expand Down
74 changes: 50 additions & 24 deletions controlplane/kubeadm/internal/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"reflect"
"strings"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/internal/util/compare"
"sigs.k8s.io/cluster-api/util/collections"
)

Expand All @@ -39,7 +41,7 @@ import (
// - mutated in-place (ex: NodeDrainTimeout)
// - are not dictated by KCP (ex: ProviderID)
// - are not relevant for the rollout decision (ex: failureDomain).
func matchesMachineSpec(infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool) {
func matchesMachineSpec(infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool, error) {
mismatchReasons := []string{}

if !collections.MatchesKubernetesVersion(kcp.Spec.Version)(machine) {
Expand All @@ -50,7 +52,11 @@ func matchesMachineSpec(infraConfigs map[string]*unstructured.Unstructured, mach
mismatchReasons = append(mismatchReasons, fmt.Sprintf("Machine version %q is not equal to KCP version %q", machineVersion, kcp.Spec.Version))
}

if reason, matches := matchesKubeadmBootstrapConfig(machineConfigs, kcp, machine); !matches {
reason, matches, err := matchesKubeadmBootstrapConfig(machineConfigs, kcp, machine)
if err != nil {
return "", false, errors.Wrapf(err, "failed to match Machine spec")
}
if !matches {
mismatchReasons = append(mismatchReasons, reason)
}

Expand All @@ -59,14 +65,14 @@ func matchesMachineSpec(infraConfigs map[string]*unstructured.Unstructured, mach
}

if len(mismatchReasons) > 0 {
return strings.Join(mismatchReasons, ","), false
return strings.Join(mismatchReasons, ","), false, nil
}

return "", true
return "", true, nil
}

// NeedsRollout checks if a Machine needs to be rolled out and returns the reason why.
func NeedsRollout(reconciliationTime, rolloutAfter *metav1.Time, rolloutBefore *controlplanev1.RolloutBefore, infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool) {
func NeedsRollout(reconciliationTime, rolloutAfter *metav1.Time, rolloutBefore *controlplanev1.RolloutBefore, infraConfigs map[string]*unstructured.Unstructured, machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool, error) {
rolloutReasons := []string{}

// Machines whose certificates are about to expire.
Expand All @@ -81,15 +87,19 @@ func NeedsRollout(reconciliationTime, rolloutAfter *metav1.Time, rolloutBefore *
}

// Machines that do not match with KCP config.
if mismatchReason, matches := matchesMachineSpec(infraConfigs, machineConfigs, kcp, machine); !matches {
mismatchReason, matches, err := matchesMachineSpec(infraConfigs, machineConfigs, kcp, machine)
if err != nil {
return "", false, errors.Wrapf(err, "failed to determine if Machine %s needs rollout", machine.Name)
}
if !matches {
rolloutReasons = append(rolloutReasons, mismatchReason)
}

if len(rolloutReasons) > 0 {
return fmt.Sprintf("Machine %s needs rollout: %s", machine.Name, strings.Join(rolloutReasons, ",")), true
return fmt.Sprintf("Machine %s needs rollout: %s", machine.Name, strings.Join(rolloutReasons, ",")), true, nil
}

return "", false
return "", false, nil
}

// matchesTemplateClonedFrom checks if a Machine has a corresponding infrastructure machine that
Expand Down Expand Up @@ -130,50 +140,58 @@ func matchesTemplateClonedFrom(infraConfigs map[string]*unstructured.Unstructure
// matchesKubeadmBootstrapConfig checks if machine's KubeadmConfigSpec is equivalent with KCP's KubeadmConfigSpec.
// Note: Differences to the labels and annotations on the KubeadmConfig are not considered for matching
// criteria, because changes to labels and annotations are propagated in-place to KubeadmConfig.
func matchesKubeadmBootstrapConfig(machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool) {
func matchesKubeadmBootstrapConfig(machineConfigs map[string]*bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (string, bool, error) {
if machine == nil {
return "Machine KubeadmConfig cannot be compared: Machine is nil", false
return "Machine KubeadmConfig cannot be compared: Machine is nil", false, nil
}

// Check if the KCP and machine ClusterConfiguration match; if not, return.
if !matchClusterConfiguration(kcp, machine) {
return "Machine ClusterConfiguration is outdated", false
match, diff, err := matchClusterConfiguration(kcp, machine)
if err != nil {
return "", false, errors.Wrapf(err, "failed to match KubeadmConfig")
}
if !match {
return fmt.Sprintf("Machine KubeadmConfig ClusterConfiguration is outdated: diff: %s", diff), false, nil
}

bootstrapRef := machine.Spec.Bootstrap.ConfigRef
if bootstrapRef == nil {
// Missing bootstrap reference should not be considered as unmatching.
// This is a safety precaution to avoid selecting machines that are broken, which in the future should be remediated separately.
return "", true
return "", true, nil
}

machineConfig, found := machineConfigs[machine.Name]
if !found {
// Return true here because failing to get KubeadmConfig should not be considered as unmatching.
// This is a safety precaution to avoid rolling out machines if the client or the api-server is misbehaving.
return "", true
return "", true, nil
}

// Check if KCP and machine InitConfiguration or JoinConfiguration matches
// NOTE: only one between init configuration and join configuration is set on a machine, depending
// on the fact that the machine was the initial control plane node or a joining control plane node.
if !matchInitOrJoinConfiguration(machineConfig, kcp) {
return "Machine InitConfiguration or JoinConfiguration are outdated", false
match, diff, err = matchInitOrJoinConfiguration(machineConfig, kcp)
if err != nil {
return "", false, errors.Wrapf(err, "failed to match KubeadmConfig")
}
if !match {
return fmt.Sprintf("Machine KubeadmConfig InitConfiguration or JoinConfiguration are outdated: diff: %s", diff), false, nil
}

return "", true
return "", true, nil
}

// matchClusterConfiguration verifies if KCP and machine ClusterConfiguration matches.
// NOTE: Machines that have KubeadmClusterConfigurationAnnotation will have to match with KCP ClusterConfiguration.
// If the annotation is not present (machine is either old or adopted), we won't roll out on any possible changes
// made in KCP's ClusterConfiguration given that we don't have enough information to make a decision.
// Users should use KCP.Spec.RolloutAfter field to force a rollout in this case.
func matchClusterConfiguration(kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) bool {
func matchClusterConfiguration(kcp *controlplanev1.KubeadmControlPlane, machine *clusterv1.Machine) (bool, string, error) {
machineClusterConfigStr, ok := machine.GetAnnotations()[controlplanev1.KubeadmClusterConfigurationAnnotation]
if !ok {
// We don't have enough information to make a decision; don't trigger a rollout.
return true
return true, "", nil
}

machineClusterConfig := &bootstrapv1.ClusterConfiguration{}
Expand All @@ -182,7 +200,7 @@ func matchClusterConfiguration(kcp *controlplanev1.KubeadmControlPlane, machine
// otherwise we won't be able to handle a nil ClusterConfiguration (that is serialized into "null").
// See https://github.com/kubernetes-sigs/cluster-api/issues/3353.
if err := json.Unmarshal([]byte(machineClusterConfigStr), &machineClusterConfig); err != nil {
return false
return false, "", nil //nolint:nilerr // Intentionally not returning the error here
}

// If any of the compared values are nil, treat them the same as an empty ClusterConfiguration.
Expand All @@ -199,16 +217,20 @@ func matchClusterConfiguration(kcp *controlplanev1.KubeadmControlPlane, machine
machineClusterConfig.DNS = kcpLocalClusterConfiguration.DNS

// Compare and return.
return reflect.DeepEqual(machineClusterConfig, kcpLocalClusterConfiguration)
match, diff, err := compare.Diff(machineClusterConfig, kcpLocalClusterConfiguration)
if err != nil {
return false, "", errors.Wrapf(err, "failed to match ClusterConfiguration")
}
return match, diff, nil
}

// matchInitOrJoinConfiguration verifies if KCP and machine InitConfiguration or JoinConfiguration matches.
// NOTE: By extension this method takes care of detecting changes in other fields of the KubeadmConfig configuration (e.g. Files, Mounts etc.)
func matchInitOrJoinConfiguration(machineConfig *bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane) bool {
func matchInitOrJoinConfiguration(machineConfig *bootstrapv1.KubeadmConfig, kcp *controlplanev1.KubeadmControlPlane) (bool, string, error) {
if machineConfig == nil {
// Return true here because failing to get KubeadmConfig should not be considered as unmatching.
// This is a safety precaution to avoid rolling out machines if the client or the api-server is misbehaving.
return true
return true, "", nil
}

// takes the KubeadmConfigSpec from KCP and applies the transformations required
Expand All @@ -225,7 +247,11 @@ func matchInitOrJoinConfiguration(machineConfig *bootstrapv1.KubeadmConfig, kcp
// cleans up all the fields that are not relevant for the comparison.
cleanupConfigFields(kcpConfig, machineConfig)

return reflect.DeepEqual(&machineConfig.Spec, kcpConfig)
match, diff, err := compare.Diff(&machineConfig.Spec, kcpConfig)
if err != nil {
return false, "", errors.Wrapf(err, "failed to match InitConfiguration or JoinConfiguration")
}
return match, diff, nil
}

// getAdjustedKcpConfig takes the KubeadmConfigSpec from KCP and applies the transformations required
Expand Down
Loading
Loading