Skip to content

Commit

Permalink
operator: add controller tests for scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Jun 6, 2022
1 parent cddc480 commit a44f36e
Show file tree
Hide file tree
Showing 11 changed files with 638 additions and 105 deletions.
14 changes: 9 additions & 5 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/go-logr/logr"
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
Expand Down Expand Up @@ -46,11 +47,12 @@ var (
// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
client.Client
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Scheme *runtime.Scheme
AdminAPIClientFactory adminutils.AdminAPIClientFactory
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Scheme *runtime.Scheme
AdminAPIClientFactory adminutils.AdminAPIClientFactory
DecommissionWaitInterval time.Duration
}

//+kubebuilder:rbac:groups=redpanda.vectorized.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -156,6 +158,8 @@ func (r *ClusterReconciler) Reconcile(
sa.Key().Name,
r.configuratorSettings,
configMapResource.GetNodeConfigHash,
r.AdminAPIClientFactory,
r.DecommissionWaitInterval,
log)

toApply := []resources.Reconciler{
Expand Down
157 changes: 157 additions & 0 deletions src/go/k8s/controllers/redpanda/cluster_controller_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package redpanda_test

import (
"context"
"fmt"

v1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func resourceGetter(key client.ObjectKey, res client.Object) func() error {
return func() error {
return k8sClient.Get(context.Background(), key, res)
}
}

func resourceDataGetter(
key client.ObjectKey, res client.Object, extractor func() interface{},
) func() interface{} {
return func() interface{} {
err := resourceGetter(key, res)()
if err != nil {
return err
}
return extractor()
}
}

func annotationGetter(
key client.ObjectKey, res client.Object, name string,
) func() string {
return func() string {
if err := resourceGetter(key, res)(); err != nil {
return fmt.Sprintf("client error: %+v", err)
}
if sts, ok := res.(*appsv1.StatefulSet); ok {
return sts.Spec.Template.Annotations[name]
}
return res.GetAnnotations()[name]
}
}

func clusterConfiguredConditionGetter(
key client.ObjectKey,
) func() *v1alpha1.ClusterCondition {
return func() *v1alpha1.ClusterCondition {
var cluster v1alpha1.Cluster
if err := k8sClient.Get(context.Background(), key, &cluster); err != nil {
return nil
}
return cluster.Status.GetCondition(v1alpha1.ClusterConfiguredConditionType)
}
}

func clusterConfiguredConditionStatusGetter(key client.ObjectKey) func() bool {
return func() bool {
cond := clusterConfiguredConditionGetter(key)()
return cond != nil && cond.Status == corev1.ConditionTrue
}
}

func clusterUpdater(
clusterNamespacedName types.NamespacedName, upd func(*v1alpha1.Cluster),
) func() error {
return func() error {
cl := &v1alpha1.Cluster{}
if err := k8sClient.Get(context.Background(), clusterNamespacedName, cl); err != nil {
return err
}
upd(cl)
return k8sClient.Update(context.Background(), cl)
}
}

func statefulSetReplicasReconciler(
key types.NamespacedName, cluster *v1alpha1.Cluster,
) func() error {
return func() error {
var sts appsv1.StatefulSet
err := k8sClient.Get(context.Background(), key, &sts)
if err != nil {
return err
}

// Aligning Pods first
var podList corev1.PodList
err = k8sClient.List(context.Background(), &podList, &client.ListOptions{
Namespace: key.Namespace,
LabelSelector: labels.ForCluster(cluster).AsClientSelector(),
})
if err != nil {
return err
}

pods := make(map[string]bool, len(podList.Items))
for i := range podList.Items {
pods[podList.Items[i].Name] = true
}

for i := int32(0); i < *sts.Spec.Replicas; i++ {
podName := fmt.Sprintf("%s-%d", key.Name, i)
var pod corev1.Pod
if pods[podName] {
for j := range podList.Items {
if podList.Items[j].Name == podName {
pod = *podList.Items[j].DeepCopy()
}
}
}
pod.Name = podName
pod.Namespace = key.Namespace
pod.Labels = labels.ForCluster(cluster)
pod.Annotations = sts.Spec.Template.Annotations
pod.Spec = sts.Spec.Template.Spec

if pods[podName] {
delete(pods, podName)
err = k8sClient.Update(context.Background(), &pod)
if err != nil {
return err
}
} else {
err = k8sClient.Create(context.Background(), &pod)
if err != nil {
return err
}
}
}

for i := range podList.Items {
if pods[podList.Items[i].Name] {
err = k8sClient.Delete(context.Background(), &podList.Items[i])
if err != nil {
return err
}
}
}

// Aligning StatefulSet
sts.Status.Replicas = *sts.Spec.Replicas
sts.Status.ReadyReplicas = sts.Status.Replicas
return k8sClient.Status().Update(context.Background(), &sts)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand Down Expand Up @@ -820,67 +819,3 @@ func getInitialTestCluster(
}
return key, baseKey, cluster
}

func resourceGetter(key client.ObjectKey, res client.Object) func() error {
return func() error {
return k8sClient.Get(context.Background(), key, res)
}
}

func resourceDataGetter(
key client.ObjectKey, res client.Object, extractor func() interface{},
) func() interface{} {
return func() interface{} {
err := resourceGetter(key, res)()
if err != nil {
return err
}
return extractor()
}
}

func annotationGetter(
key client.ObjectKey, res client.Object, name string,
) func() string {
return func() string {
if err := resourceGetter(key, res)(); err != nil {
return fmt.Sprintf("client error: %+v", err)
}
if sts, ok := res.(*appsv1.StatefulSet); ok {
return sts.Spec.Template.Annotations[name]
}
return res.GetAnnotations()[name]
}
}

func clusterConfiguredConditionGetter(
key client.ObjectKey,
) func() *v1alpha1.ClusterCondition {
return func() *v1alpha1.ClusterCondition {
var cluster v1alpha1.Cluster
if err := k8sClient.Get(context.Background(), key, &cluster); err != nil {
return nil
}
return cluster.Status.GetCondition(v1alpha1.ClusterConfiguredConditionType)
}
}

func clusterConfiguredConditionStatusGetter(key client.ObjectKey) func() bool {
return func() bool {
cond := clusterConfiguredConditionGetter(key)()
return cond != nil && cond.Status == corev1.ConditionTrue
}
}

func clusterUpdater(
clusterNamespacedName types.NamespacedName, upd func(*v1alpha1.Cluster),
) func() error {
return func() error {
cl := &v1alpha1.Cluster{}
if err := k8sClient.Get(context.Background(), clusterNamespacedName, cl); err != nil {
return err
}
upd(cl)
return k8sClient.Update(context.Background(), cl)
}
}
Loading

0 comments on commit a44f36e

Please sign in to comment.