Skip to content

Commit

Permalink
Merge pull request #2378 from detiber/kcpFilterCleanup
Browse files Browse the repository at this point in the history
🏃 Cleanup KubeadmControlPlane machine filters
  • Loading branch information
k8s-ci-robot committed Feb 20, 2020
2 parents 873422d + ac8b360 commit 13d26a5
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controllers
import (
"context"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -67,7 +66,7 @@ const (
)

type managementCluster interface {
GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...func(machine *clusterv1.Machine) bool) ([]*clusterv1.Machine, error)
GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...internal.MachineFilter) (internal.FilterableMachineCollection, error)
TargetClusterControlPlaneIsHealthy(ctx context.Context, clusterKey types.NamespacedName, controlPlaneName string) error
TargetClusterEtcdIsHealthy(ctx context.Context, clusterKey types.NamespacedName, controlPlaneName string) error
}
Expand Down Expand Up @@ -235,8 +234,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

currentConfigurationHash := hash.Compute(&kcp.Spec)
requireUpgrade := internal.FilterMachines(
ownedMachines,
requireUpgrade := ownedMachines.AnyFilter(
internal.Not(internal.MatchesConfigurationHash(currentConfigurationHash)),
internal.OlderThan(kcp.Spec.UpgradeAfter),
)
Expand All @@ -254,7 +252,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

// If we've made it this far, we don't need to worry about Machines that are older than kcp.Spec.UpgradeAfter
currentMachines := internal.FilterMachines(ownedMachines, internal.MatchesConfigurationHash(currentConfigurationHash))
currentMachines := ownedMachines.Filter(internal.MatchesConfigurationHash(currentConfigurationHash))
numMachines := len(currentMachines)
desiredReplicas := int(*kcp.Spec.Replicas)

Expand Down Expand Up @@ -312,7 +310,7 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
return errors.Wrap(err, "failed to get list of owned machines")
}

currentMachines := internal.FilterMachines(ownedMachines, internal.MatchesConfigurationHash(hash.Compute(&kcp.Spec)))
currentMachines := ownedMachines.Filter(internal.MatchesConfigurationHash(hash.Compute(&kcp.Spec)))
kcp.Status.UpdatedReplicas = int32(len(currentMachines))

replicas := int32(len(ownedMachines))
Expand Down Expand Up @@ -347,7 +345,7 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
return nil
}

func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, requireUpgrade []*clusterv1.Machine) error {
func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, requireUpgrade internal.FilterableMachineCollection) error {

// TODO: verify health for each existing replica
// TODO: mark an old Machine via the label kubeadm.controlplane.cluster.x-k8s.io/selected-for-upgrade
Expand Down Expand Up @@ -412,13 +410,13 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
}

// Wait for any delete in progress to complete before deleting another Machine
if len(internal.FilterMachines(ownedMachines, internal.HasDeletionTimestamp)) > 0 {
if len(ownedMachines.Filter(internal.HasDeletionTimestamp)) > 0 {
return ctrl.Result{RequeueAfter: DeleteRequeueAfter}, nil
}

machineToDelete, err := oldestMachine(ownedMachines)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to pick control plane Machine to delete")
machineToDelete := ownedMachines.Oldest()
if machineToDelete == nil {
return ctrl.Result{}, errors.New("failed to pick control plane Machine to delete")
}

if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -719,11 +717,3 @@ func getMachineNode(ctx context.Context, crClient client.Client, machine *cluste

return node, nil
}

func oldestMachine(machines []*clusterv1.Machine) (*clusterv1.Machine, error) {
if len(machines) == 0 {
return &clusterv1.Machine{}, errors.New("no machines given")
}
sort.Sort(util.MachinesByCreationTimestamp(machines))
return machines[0], nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -1319,10 +1319,10 @@ func TestKubeadmControlPlaneReconciler_reconcileDelete(t *testing.T) {
type fakeManagementCluster struct {
ControlPlaneHealthy bool
EtcdHealthy bool
Machines []*clusterv1.Machine
Machines internal.FilterableMachineCollection
}

func (f *fakeManagementCluster) GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...func(machine *clusterv1.Machine) bool) ([]*clusterv1.Machine, error) {
func (f *fakeManagementCluster) GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...internal.MachineFilter) (internal.FilterableMachineCollection, error) {
return f.Machines, nil
}

Expand Down Expand Up @@ -1351,15 +1351,15 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
g.Expect(fakeClient.Create(context.Background(), genericMachineTemplate)).To(Succeed())

fmc := &fakeManagementCluster{
Machines: []*clusterv1.Machine{},
Machines: internal.NewFilterableMachineCollection(),
ControlPlaneHealthy: true,
EtcdHealthy: true,
}

for i := 0; i < 2; i++ {
m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster, kcp, true)
g.Expect(fakeClient.Create(context.Background(), m)).To(Succeed())
fmc.Machines = append(fmc.Machines, m)
fmc.Machines.Insert(m.DeepCopy())
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -1410,15 +1410,15 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane(t *testing.T) {
g.Expect(fakeClient.Create(context.Background(), genericMachineTemplate)).To(Succeed())

fmc := &fakeManagementCluster{
Machines: []*clusterv1.Machine{},
Machines: internal.NewFilterableMachineCollection(),
ControlPlaneHealthy: true,
EtcdHealthy: true,
}

for i := 0; i < 2; i++ {
m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster, kcp, true)
g.Expect(fakeClient.Create(context.Background(), m)).To(Succeed())
fmc.Machines = append(fmc.Machines, m)
fmc.Machines.Insert(m.DeepCopy())
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -1446,15 +1446,15 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane(t *testing.T) {
g.Expect(fakeClient.Create(context.Background(), genericMachineTemplate)).To(Succeed())

fmc := &fakeManagementCluster{
Machines: []*clusterv1.Machine{},
Machines: internal.NewFilterableMachineCollection(),
ControlPlaneHealthy: true,
EtcdHealthy: true,
}

for i := 0; i < 2; i++ {
m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster, kcp, true)
g.Expect(fakeClient.Create(context.Background(), m)).To(Succeed())
fmc.Machines = append(fmc.Machines, m)
fmc.Machines.Insert(m.DeepCopy())
}

r := &KubeadmControlPlaneReconciler{
Expand Down
40 changes: 7 additions & 33 deletions controlplane/kubeadm/internal/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,9 @@ type ManagementCluster struct {
Client ctrlclient.Client
}

// FilterMachines returns a filtered list of machines
func FilterMachines(machines []*clusterv1.Machine, filters ...func(machine *clusterv1.Machine) bool) []*clusterv1.Machine {
if len(filters) == 0 {
return machines
}

filteredMachines := make([]*clusterv1.Machine, 0, len(machines))
for _, machine := range machines {
add := true
for _, filter := range filters {
if !filter(machine) {
add = false
break
}
}
if add {
filteredMachines = append(filteredMachines, machine)
}
}
return filteredMachines
}

// GetMachinesForCluster returns a list of machines that can be filtered or not.
// If no filter is supplied then all machines associated with the target cluster are returned.
func (m *ManagementCluster) GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...func(machine *clusterv1.Machine) bool) ([]*clusterv1.Machine, error) {
func (m *ManagementCluster) GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...MachineFilter) (FilterableMachineCollection, error) {
selector := map[string]string{
clusterv1.ClusterLabelName: cluster.Name,
}
Expand All @@ -83,12 +61,8 @@ func (m *ManagementCluster) GetMachinesForCluster(ctx context.Context, cluster t
return nil, errors.Wrap(err, "failed to list machines")
}

machines := make([]*clusterv1.Machine, 0, len(ml.Items))
for i := range ml.Items {
machines = append(machines, &ml.Items[i])
}

return FilterMachines(machines, filters...), nil
machines := NewFilterableMachineCollectionFromMachineList(ml)
return machines.Filter(filters...), nil
}

// getCluster builds a cluster object.
Expand All @@ -114,7 +88,7 @@ func (m *ManagementCluster) getCluster(ctx context.Context, clusterKey types.Nam
client: c,
restConfig: restConfig,
etcdCACert: etcdCACert,
etcdCAkey: etcdCAKey,
etcdCAKey: etcdCAKey,
}, nil
}

Expand Down Expand Up @@ -204,12 +178,12 @@ type cluster struct {
client ctrlclient.Client
// restConfig is required for the proxy.
restConfig *rest.Config
etcdCACert, etcdCAkey []byte
etcdCACert, etcdCAKey []byte
}

// generateEtcdTLSClientBundle builds an etcd client TLS bundle from the Etcd CA for this cluster.
func (c *cluster) generateEtcdTLSClientBundle() (*tls.Config, error) {
clientCert, err := generateClientCert(c.etcdCACert, c.etcdCAkey)
clientCert, err := generateClientCert(c.etcdCACert, c.etcdCAKey)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -361,7 +335,7 @@ func (c *cluster) getEtcdClientForNode(nodeName string, tlsConfig *tls.Config) (
// This does not support external etcd.
p := proxy.Proxy{
Kind: "pods",
Namespace: "kube-system", // TODO, can etcd ever run in a different namespace?
Namespace: metav1.NamespaceSystem, // TODO, can etcd ever run in a different namespace?
ResourceName: staticPodName("etcd", nodeName),
KubeConfig: c.restConfig,
TLSConfig: tlsConfig,
Expand Down
3 changes: 2 additions & 1 deletion controlplane/kubeadm/internal/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/controller-runtime/pkg/client"

clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
)

func podReady(isReady corev1.ConditionStatus) corev1.PodCondition {
Expand Down
14 changes: 7 additions & 7 deletions controlplane/kubeadm/internal/etcd/util/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

type empty struct{}

// util.UInt64Set is a set of int64s, implemented via map[uint64]struct{} for minimal memory consumption.
// util.UInt64Set is a set of uint64s, implemented via map[uint64]struct{} for minimal memory consumption.
type UInt64Set map[uint64]empty

// NewUInt64Set creates a UInt64Set from a list of values.
Expand Down Expand Up @@ -156,15 +156,15 @@ func (s UInt64Set) Equal(s2 UInt64Set) bool {
return len(s1) == len(s2) && s1.IsSuperset(s2)
}

type sortableSliceOfInt64 []uint64
type sortableSliceOfUInt64 []uint64

func (s sortableSliceOfInt64) Len() int { return len(s) }
func (s sortableSliceOfInt64) Less(i, j int) bool { return lessInt64(s[i], s[j]) }
func (s sortableSliceOfInt64) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s sortableSliceOfUInt64) Len() int { return len(s) }
func (s sortableSliceOfUInt64) Less(i, j int) bool { return lessUInt64(s[i], s[j]) }
func (s sortableSliceOfUInt64) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// List returns the contents as a sorted uint64 slice.
func (s UInt64Set) List() []uint64 {
res := make(sortableSliceOfInt64, 0, len(s))
res := make(sortableSliceOfUInt64, 0, len(s))
for key := range s {
res = append(res, key)
}
Expand Down Expand Up @@ -196,6 +196,6 @@ func (s UInt64Set) Len() int {
return len(s)
}

func lessInt64(lhs, rhs uint64) bool {
func lessUInt64(lhs, rhs uint64) bool {
return lhs < rhs
}
6 changes: 3 additions & 3 deletions controlplane/kubeadm/internal/failure_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (f failureDomainAggregations) Swap(i, j int) {
}

// PickMost returns the failure domain with the most number of machines.
func PickMost(failureDomains clusterv1.FailureDomains, machines []*clusterv1.Machine) string {
func PickMost(failureDomains clusterv1.FailureDomains, machines FilterableMachineCollection) string {
aggregations := pick(failureDomains, machines)
if len(aggregations) == 0 {
return ""
Expand All @@ -60,7 +60,7 @@ func PickMost(failureDomains clusterv1.FailureDomains, machines []*clusterv1.Mac
}

// PickFewest returns the failure domain with the fewest number of machines.
func PickFewest(failureDomains clusterv1.FailureDomains, machines []*clusterv1.Machine) string {
func PickFewest(failureDomains clusterv1.FailureDomains, machines FilterableMachineCollection) string {
aggregations := pick(failureDomains, machines)
if len(aggregations) == 0 {
return ""
Expand All @@ -69,7 +69,7 @@ func PickFewest(failureDomains clusterv1.FailureDomains, machines []*clusterv1.M
return aggregations[0].id
}

func pick(failureDomains clusterv1.FailureDomains, machines []*clusterv1.Machine) failureDomainAggregations {
func pick(failureDomains clusterv1.FailureDomains, machines FilterableMachineCollection) failureDomainAggregations {
if len(failureDomains) == 0 {
return failureDomainAggregations{}
}
Expand Down
19 changes: 6 additions & 13 deletions controlplane/kubeadm/internal/failure_domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestNewFailureDomainPicker(t *testing.T) {
testcases := []struct {
name string
fds clusterv1.FailureDomains
machines []*clusterv1.Machine
machines FilterableMachineCollection
expected []string
}{
{
Expand All @@ -52,37 +52,30 @@ func TestNewFailureDomainPicker(t *testing.T) {
expected: []string{a},
},
{
name: "one machine in a failure domain",
fds: fds,
machines: []*clusterv1.Machine{
machinea.DeepCopy(),
},
name: "one machine in a failure domain",
fds: fds,
machines: NewFilterableMachineCollection(machinea.DeepCopy()),
expected: []string{b},
},
{
name: "no failure domain specified on machine",
fds: clusterv1.FailureDomains{
a: clusterv1.FailureDomainSpec{},
},
machines: []*clusterv1.Machine{
machinenil.DeepCopy(),
},
machines: NewFilterableMachineCollection(machinenil.DeepCopy()),
expected: []string{a, b},
},
{
name: "mismatched failure domain on machine",
fds: clusterv1.FailureDomains{
a: clusterv1.FailureDomainSpec{},
},
machines: []*clusterv1.Machine{
machineb.DeepCopy(),
},
machines: NewFilterableMachineCollection(machineb.DeepCopy()),
expected: []string{a},
},
{
name: "failure domains and no machines should return a valid failure domain",
fds: fds,
machines: []*clusterv1.Machine{},
expected: []string{a, b},
},
}
Expand Down
6 changes: 0 additions & 6 deletions controlplane/kubeadm/internal/machine_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,6 @@ func OlderThan(t *metav1.Time) MachineFilter {
}
}

// SelectedForUpgrade is a MachineFilter to find all machines that have the
// controlplanev1.SelectedForUpgradeAnnotation set.
func SelectedForUpgrade(machine *clusterv1.Machine) bool {
return HasAnnotationKey(controlplanev1.SelectedForUpgradeAnnotation)(machine)
}

// HasAnnotationKey returns a MachineFilter function to find all machines that have the
// specified Annotation key present
func HasAnnotationKey(key string) MachineFilter {
Expand Down
Loading

0 comments on commit 13d26a5

Please sign in to comment.