diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go index fa55ec50f913..5afd7688ef74 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go @@ -293,9 +293,15 @@ type ClusterStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file - // Replicas show how many nodes are working in the cluster + // Replicas show how many nodes have been created for the cluster // +optional Replicas int32 `json:"replicas"` + // ReadyReplicas is the number of Pods belonging to the cluster that have a Ready Condition. + // +optional + ReadyReplicas int32 `json:"readyReplicas,omitempty"` + // CurrentReplicas is the number of Pods that the controller currently wants to run for the cluster. + // +optional + CurrentReplicas int32 `json:"currentReplicas,omitempty"` // Nodes of the provisioned redpanda nodes // +optional Nodes NodesList `json:"nodes,omitempty"` @@ -306,6 +312,9 @@ type ClusterStatus struct { // Indicates that a cluster is restarting due to an upgrade or a different reason // +optional Restarting bool `json:"restarting"` + // Indicates that a node is currently being decommissioned from the cluster and provides its ordinal number + // +optional + DecommissioningNode *int32 `json:"decommissioningNode,omitempty"` // Current version of the cluster. // +optional Version string `json:"version"` @@ -847,6 +856,30 @@ func (s *ClusterStatus) SetRestarting(restarting bool) { s.DeprecatedUpgrading = restarting } +// GetCurrentReplicas returns the current number of replicas that the controller wants to run. +// It returns 1 when not initialized (as fresh clusters start from 1 replica) +func (r *Cluster) GetCurrentReplicas() int32 { + if r.Status.CurrentReplicas <= 0 { + // Not initialized, let's give the computed value + return r.ComputeInitialCurrentReplicasField() + } + return r.Status.CurrentReplicas +} + +// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas. 
+// +// It needs to consider the following cases: +// - Fresh cluster: we start from 1 replica, then upscale if needed (initialization to bypass https://github.com/redpanda-data/redpanda/issues/333) +// - Existing clusters: we keep spec.replicas as the starting point +func (r *Cluster) ComputeInitialCurrentReplicasField() int32 { + if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 { + // A cluster seems to be already running; we start from the existing number of replicas + return *r.Spec.Replicas + } + // Clusters start from a single replica, then upscale + return 1 +} + // TLSConfig is a generic TLS configuration type TLSConfig struct { Enabled bool `json:"enabled,omitempty"` diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go index 5203cae07751..e4a8e2fe3c1a 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/pointer" ) // nolint:funlen // this is ok for a test @@ -216,3 +217,19 @@ func TestConditions(t *testing.T) { assert.Equal(t, condTime, cond2.LastTransitionTime) }) } + +func TestInitialReplicas(t *testing.T) { + cluster := v1alpha1.Cluster{} + cluster.Spec.Replicas = pointer.Int32(3) + assert.Equal(t, int32(1), cluster.GetCurrentReplicas()) + cluster.Status.Replicas = 2 + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.Replicas = 0 + cluster.Status.ReadyReplicas = 2 + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.ReadyReplicas = 0 + cluster.Status.Nodes.Internal = []string{"1", "2"} + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.Nodes.Internal = nil + assert.Equal(t, int32(1), cluster.GetCurrentReplicas()) +} diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go index c271d566af4a..664054b6f81e 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go @@ -43,6 +43,11 @@ const ( defaultSchemaRegistryPort = 8081 ) +// AllowDownscalingInWebhook controls the downscaling alpha feature in the Cluster custom resource. +// Downscaling is not stable since nodeIDs are currently not reusable, so adding a node +// that has previously been decommissioned back to a cluster can cause issues. +var AllowDownscalingInWebhook = false + type resourceField struct { resources *corev1.ResourceRequirements path *field.Path @@ -136,6 +141,8 @@ func (r *Cluster) ValidateCreate() error { var allErrs field.ErrorList + allErrs = append(allErrs, r.validateScaling()...) + allErrs = append(allErrs, r.validateKafkaListeners()...) allErrs = append(allErrs, r.validateAdminListeners()...) @@ -173,12 +180,10 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error { oldCluster := old.(*Cluster) var allErrs field.ErrorList - if r.Spec.Replicas != nil && oldCluster.Spec.Replicas != nil && *r.Spec.Replicas < *oldCluster.Spec.Replicas { - allErrs = append(allErrs, - field.Invalid(field.NewPath("spec").Child("replicas"), - r.Spec.Replicas, - "scaling down is not supported")) - } + allErrs = append(allErrs, r.validateScaling()...) + + allErrs = append(allErrs, r.validateDownscaling(oldCluster)...) + allErrs = append(allErrs, r.validateKafkaListeners()...)
allErrs = append(allErrs, r.validateAdminListeners()...) @@ -212,6 +217,34 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error { r.Name, allErrs) } +func (r *Cluster) validateScaling() field.ErrorList { + var allErrs field.ErrorList + if r.Spec.Replicas == nil { + allErrs = append(allErrs, + field.Invalid(field.NewPath("spec").Child("replicas"), + r.Spec.Replicas, + "replicas must be specified explicitly")) + } else if *r.Spec.Replicas <= 0 { + allErrs = append(allErrs, + field.Invalid(field.NewPath("spec").Child("replicas"), + r.Spec.Replicas, + "downscaling is not allowed to less than 1 instance")) + } + + return allErrs +} + +func (r *Cluster) validateDownscaling(old *Cluster) field.ErrorList { + var allErrs field.ErrorList + if !AllowDownscalingInWebhook && old.Spec.Replicas != nil && r.Spec.Replicas != nil && *r.Spec.Replicas < *old.Spec.Replicas { + allErrs = append(allErrs, + field.Invalid(field.NewPath("spec").Child("replicas"), + r.Spec.Replicas, + "downscaling is an alpha feature: set --allow-downscaling in the controller parameters to enable it")) + } + return allErrs +} + func (r *Cluster) validateAdminListeners() field.ErrorList { var allErrs field.ErrorList externalAdmin := r.AdminAPIExternal() diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go index 7533669c7475..4eab69e3e275 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go @@ -17,6 +17,7 @@ import ( cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1" "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -200,8 +201,8 @@ func TestDefault(t *testing.T) { } func TestValidateUpdate(t *testing.T) { - var replicas1 int32 = 1 - var replicas2 int32 = 2 + var replicas0 int32 + var replicas3 int32 = 3 redpandaCluster := &v1alpha1.Cluster{ ObjectMeta: metav1.ObjectMeta{ @@ -209,7 +210,7 @@ func TestValidateUpdate(t *testing.T) { Namespace: "", }, Spec: v1alpha1.ClusterSpec{ - Replicas: pointer.Int32Ptr(replicas2), + Replicas: pointer.Int32Ptr(replicas3), Configuration: v1alpha1.RedpandaConfig{}, Resources: v1alpha1.RedpandaResourceRequirements{ ResourceRequirements: corev1.ResourceRequirements{ @@ -224,7 +225,7 @@ func TestValidateUpdate(t *testing.T) { } updatedCluster := redpandaCluster.DeepCopy() - updatedCluster.Spec.Replicas = &replicas1 + updatedCluster.Spec.Replicas = &replicas0 updatedCluster.Spec.Configuration = v1alpha1.RedpandaConfig{ KafkaAPI: []v1alpha1.KafkaAPI{ { @@ -272,6 +273,15 @@ func TestValidateUpdate(t *testing.T) { } } +func TestNilReplicasIsNotAllowed(t *testing.T) { + rpCluster := validRedpandaCluster() + err := rpCluster.ValidateCreate() + require.Nil(t, err, "Initial cluster is not valid") + rpCluster.Spec.Replicas = nil + err = rpCluster.ValidateCreate() + assert.Error(t, err) +} + //nolint:funlen // this is ok for a test func TestValidateUpdate_NoError(t *testing.T) { var replicas2 int32 = 2 @@ -1097,6 +1107,7 @@ func validRedpandaCluster() *v1alpha1.Cluster { Namespace: "", }, Spec: v1alpha1.ClusterSpec{ + Replicas: pointer.Int32Ptr(1), Configuration: v1alpha1.RedpandaConfig{ KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124}}, AdminAPI: []v1alpha1.AdminAPI{{Port: 126}}, diff --git 
a/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go b/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go index 0f2d6a7927db..5d9dfd0a6513 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go @@ -221,6 +221,11 @@ func (in *ClusterSpec) DeepCopy() *ClusterSpec { func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) { *out = *in in.Nodes.DeepCopyInto(&out.Nodes) + if in.DecommissioningNode != nil { + in, out := &in.DecommissioningNode, &out.DecommissioningNode + *out = new(int32) + **out = **in + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]ClusterCondition, len(*in)) diff --git a/src/go/k8s/cmd/configurator/main.go b/src/go/k8s/cmd/configurator/main.go index b019540579ec..8175c337f0c9 100644 --- a/src/go/k8s/cmd/configurator/main.go +++ b/src/go/k8s/cmd/configurator/main.go @@ -131,9 +131,11 @@ func main() { cfg.Redpanda.ID = int(hostIndex) - // First Redpanda node need to have cleared seed servers in order - // to form raft group 0 - if hostIndex == 0 { + // In case of a single seed server, the list should contain the current node itself. + // Normally the cluster is able to recognize it's talking to itself, except when the cluster is + // configured to use mutual TLS on the Kafka API (see Helm test). + // So, we clear the list of seeds to help Redpanda. + if len(cfg.Redpanda.SeedServers) == 1 { cfg.Redpanda.SeedServers = []config.SeedServer{} } diff --git a/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml b/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml index ef3383c82941..dbfa83971032 100644 --- a/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml +++ b/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml @@ -806,6 +806,16 @@ spec: - type type: object type: array + currentReplicas: + description: CurrentReplicas is the number of Pods that the controller + currently wants to run for the cluster. + format: int32 + type: integer + decommissioningNode: + description: Indicates that a node is currently being decommissioned + from the cluster and provides its ordinal number + format: int32 + type: integer nodes: description: Nodes of the provisioned redpanda nodes properties: @@ -908,8 +918,14 @@ spec: type: string type: object type: object + readyReplicas: + description: ReadyReplicas is the number of Pods belonging to the + cluster that have a Ready Condition. 
+ format: int32 + type: integer replicas: - description: Replicas show how many nodes are working in the cluster + description: Replicas show how many nodes have been created for the + cluster format: int32 type: integer restarting: diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 155e7f4cdba9..e6ed873fb204 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -16,6 +16,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" @@ -24,6 +25,7 @@ import ( "github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/certmanager" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" appsv1 "k8s.io/api/apps/v1" @@ -45,11 +47,12 @@ var ( // ClusterReconciler reconciles a Cluster object type ClusterReconciler struct { client.Client - Log logr.Logger - configuratorSettings resources.ConfiguratorSettings - clusterDomain string - Scheme *runtime.Scheme - AdminAPIClientFactory adminutils.AdminAPIClientFactory + Log logr.Logger + configuratorSettings resources.ConfiguratorSettings + clusterDomain string + Scheme *runtime.Scheme + AdminAPIClientFactory adminutils.AdminAPIClientFactory + DecommissionWaitInterval time.Duration } //+kubebuilder:rbac:groups=redpanda.vectorized.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete @@ -155,6 +158,8 @@ func (r *ClusterReconciler) Reconcile( sa.Key().Name, r.configuratorSettings, configMapResource.GetNodeConfigHash, + r.AdminAPIClientFactory, + r.DecommissionWaitInterval, log) toApply := []resources.Reconciler{ @@ -179,7 +184,7 @@ func (r *ClusterReconciler) Reconcile( var e *resources.RequeueAfterError if errors.As(err, &e) { - log.Error(e, e.Msg) + log.Info(e.Error()) return ctrl.Result{RequeueAfter: e.RequeueAfter}, nil } @@ -324,7 +329,8 @@ func (r *ClusterReconciler) reportStatus( } cluster.Status.Nodes = *nodeList - cluster.Status.Replicas = sts.LastObservedState.Status.ReadyReplicas + cluster.Status.ReadyReplicas = sts.LastObservedState.Status.ReadyReplicas + cluster.Status.Replicas = sts.LastObservedState.Status.Replicas cluster.Status.Version = sts.Version() err = r.Status().Update(ctx, &cluster) @@ -353,7 +359,8 @@ func statusShouldBeUpdated( !reflect.DeepEqual(nodeList.ExternalPandaproxy, status.Nodes.ExternalPandaproxy) || !reflect.DeepEqual(nodeList.SchemaRegistry, status.Nodes.SchemaRegistry) || !reflect.DeepEqual(nodeList.ExternalBootstrap, status.Nodes.ExternalBootstrap)) || - status.Replicas != sts.LastObservedState.Status.ReadyReplicas || + status.Replicas != sts.LastObservedState.Status.Replicas || + status.ReadyReplicas != sts.LastObservedState.Status.ReadyReplicas || status.Version != sts.Version() } @@ -495,7 +502,7 @@ func (r *ClusterReconciler) setInitialSuperUserPassword( ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster, fqdn string, - adminTLSConfigProvider resources.AdminTLSConfigProvider, + adminTLSConfigProvider resourcetypes.AdminTLSConfigProvider, objs []types.NamespacedName, ) error { adminAPI, err := r.AdminAPIClientFactory(ctx, r, redpandaCluster, fqdn, adminTLSConfigProvider) 
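Note: the cluster_controller.go hunk above now logs resources.RequeueAfterError at Info level and converts it into a delayed requeue rather than a reconcile failure, which is how waits such as DecommissionWaitInterval are expected to surface during scaling. The sketch below only approximates the shape that error type needs for the errors.As / e.Msg / e.RequeueAfter usage shown above; the real definition lives elsewhere in pkg/resources and is not part of this diff.

package resources

import (
	"fmt"
	"time"
)

// RequeueAfterError (sketch): carries a wait duration back to Reconcile, which
// logs it and returns ctrl.Result{RequeueAfter: e.RequeueAfter} instead of
// treating the condition as an error.
type RequeueAfterError struct {
	RequeueAfter time.Duration
	Msg          string
}

func (e *RequeueAfterError) Error() string {
	return fmt.Sprintf("requeue after %v: %s", e.RequeueAfter, e.Msg)
}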
diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_common_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_common_test.go new file mode 100644 index 000000000000..8e9aa6ef5e6f --- /dev/null +++ b/src/go/k8s/controllers/redpanda/cluster_controller_common_test.go @@ -0,0 +1,157 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package redpanda_test + +import ( + "context" + "fmt" + + v1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func resourceGetter(key client.ObjectKey, res client.Object) func() error { + return func() error { + return k8sClient.Get(context.Background(), key, res) + } +} + +func resourceDataGetter( + key client.ObjectKey, res client.Object, extractor func() interface{}, +) func() interface{} { + return func() interface{} { + err := resourceGetter(key, res)() + if err != nil { + return err + } + return extractor() + } +} + +func annotationGetter( + key client.ObjectKey, res client.Object, name string, +) func() string { + return func() string { + if err := resourceGetter(key, res)(); err != nil { + return fmt.Sprintf("client error: %+v", err) + } + if sts, ok := res.(*appsv1.StatefulSet); ok { + return sts.Spec.Template.Annotations[name] + } + return res.GetAnnotations()[name] + } +} + +func clusterConfiguredConditionGetter( + key client.ObjectKey, +) func() *v1alpha1.ClusterCondition { + return func() *v1alpha1.ClusterCondition { + var cluster v1alpha1.Cluster + if err := k8sClient.Get(context.Background(), key, &cluster); err != nil { + return nil + } + return cluster.Status.GetCondition(v1alpha1.ClusterConfiguredConditionType) + } +} + +func clusterConfiguredConditionStatusGetter(key client.ObjectKey) func() bool { + return func() bool { + cond := clusterConfiguredConditionGetter(key)() + return cond != nil && cond.Status == corev1.ConditionTrue + } +} + +func clusterUpdater( + clusterNamespacedName types.NamespacedName, upd func(*v1alpha1.Cluster), +) func() error { + return func() error { + cl := &v1alpha1.Cluster{} + if err := k8sClient.Get(context.Background(), clusterNamespacedName, cl); err != nil { + return err + } + upd(cl) + return k8sClient.Update(context.Background(), cl) + } +} + +func statefulSetReplicasReconciler( + key types.NamespacedName, cluster *v1alpha1.Cluster, +) func() error { + return func() error { + var sts appsv1.StatefulSet + err := k8sClient.Get(context.Background(), key, &sts) + if err != nil { + return err + } + + // Aligning Pods first + var podList corev1.PodList + err = k8sClient.List(context.Background(), &podList, &client.ListOptions{ + Namespace: key.Namespace, + LabelSelector: labels.ForCluster(cluster).AsClientSelector(), + }) + if err != nil { + return err + } + + pods := make(map[string]bool, len(podList.Items)) + for i := range podList.Items { + pods[podList.Items[i].Name] = true + } + + for i := int32(0); i < *sts.Spec.Replicas; i++ { + podName := fmt.Sprintf("%s-%d", key.Name, i) + var pod corev1.Pod + if pods[podName] { + for j := range podList.Items { + if podList.Items[j].Name == 
podName { + pod = *podList.Items[j].DeepCopy() + } + } + } + pod.Name = podName + pod.Namespace = key.Namespace + pod.Labels = labels.ForCluster(cluster) + pod.Annotations = sts.Spec.Template.Annotations + pod.Spec = sts.Spec.Template.Spec + + if pods[podName] { + delete(pods, podName) + err = k8sClient.Update(context.Background(), &pod) + if err != nil { + return err + } + } else { + err = k8sClient.Create(context.Background(), &pod) + if err != nil { + return err + } + } + } + + for i := range podList.Items { + if pods[podList.Items[i].Name] { + err = k8sClient.Delete(context.Background(), &podList.Items[i]) + if err != nil { + return err + } + } + } + + // Aligning StatefulSet + sts.Status.Replicas = *sts.Spec.Replicas + sts.Status.ReadyReplicas = sts.Status.Replicas + return k8sClient.Status().Update(context.Background(), &sts) + } +} diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_configuration_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_configuration_test.go index dfd1fb11f45b..59cb70e9f705 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_configuration_test.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller_configuration_test.go @@ -27,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" - "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -820,67 +819,3 @@ func getInitialTestCluster( } return key, baseKey, cluster } - -func resourceGetter(key client.ObjectKey, res client.Object) func() error { - return func() error { - return k8sClient.Get(context.Background(), key, res) - } -} - -func resourceDataGetter( - key client.ObjectKey, res client.Object, extractor func() interface{}, -) func() interface{} { - return func() interface{} { - err := resourceGetter(key, res)() - if err != nil { - return err - } - return extractor() - } -} - -func annotationGetter( - key client.ObjectKey, res client.Object, name string, -) func() string { - return func() string { - if err := resourceGetter(key, res)(); err != nil { - return fmt.Sprintf("client error: %+v", err) - } - if sts, ok := res.(*appsv1.StatefulSet); ok { - return sts.Spec.Template.Annotations[name] - } - return res.GetAnnotations()[name] - } -} - -func clusterConfiguredConditionGetter( - key client.ObjectKey, -) func() *v1alpha1.ClusterCondition { - return func() *v1alpha1.ClusterCondition { - var cluster v1alpha1.Cluster - if err := k8sClient.Get(context.Background(), key, &cluster); err != nil { - return nil - } - return cluster.Status.GetCondition(v1alpha1.ClusterConfiguredConditionType) - } -} - -func clusterConfiguredConditionStatusGetter(key client.ObjectKey) func() bool { - return func() bool { - cond := clusterConfiguredConditionGetter(key)() - return cond != nil && cond.Status == corev1.ConditionTrue - } -} - -func clusterUpdater( - clusterNamespacedName types.NamespacedName, upd func(*v1alpha1.Cluster), -) func() error { - return func() error { - cl := &v1alpha1.Cluster{} - if err := k8sClient.Get(context.Background(), clusterNamespacedName, cl); err != nil { - return err - } - upd(cl) - return k8sClient.Update(context.Background(), cl) - } -} diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go new file mode 100644 index 000000000000..57691f5fa913 --- /dev/null +++ b/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go @@ -0,0 +1,322 @@ +// Copyright 2022 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package redpanda_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" +) + +var _ = Describe("Redpanda cluster scale resource", func() { + const ( + timeout = time.Second * 30 + interval = time.Millisecond * 100 + + timeoutShort = time.Millisecond * 100 + intervalShort = time.Millisecond * 20 + ) + + Context("When starting up a fresh Redpanda cluster", func() { + It("Should wait for the first replica to come up before launching the others", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("startup", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Keeping the StatefulSet at single replica until initialized") + var sts appsv1.StatefulSet + Eventually(resourceGetter(key, &sts), timeout, interval).Should(Succeed()) + Consistently(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeoutShort, intervalShort).Should(Equal(int32(1))) + + By("Scaling to 3 replicas only when broker appears") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + }) + + Context("When scaling up a cluster", func() { + It("Should just update the StatefulSet", func() { + By("Allowing creation of a new cluster with 2 replicas") + key, redpandaCluster := getClusterWithReplicas("scaling", 2) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 2 replicas when at least one broker appears") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + + By("Scaling up when replicas increase") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(5) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(5))) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + }) + + Context("When scaling down a cluster", func() { + It("Should always decommission the last nodes of a cluster one at time", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("decommission", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + 
By("Scaling to 3 replicas when the brokers start to appear") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 2, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Decommissioning the last node when scaling down by 2") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(1) + }), timeout, interval).Should(Succeed()) + + By("Start decommissioning node with ordinal 2") + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + if redpandaCluster.Status.DecommissioningNode == nil { + return nil + } + return *redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(Equal(int32(2))) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + Consistently(testAdminAPI.BrokerStatusGetter(1), timeoutShort, intervalShort).Should(Equal(admin.MembershipStatusActive)) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Scaling down only when decommissioning is done") + Expect(testAdminAPI.RemoveBroker(2)).To(BeTrue()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + Eventually(statefulSetReplicasReconciler(key, redpandaCluster), timeout, interval).Should(Succeed()) + + By("Start decommissioning the other node") + Eventually(testAdminAPI.BrokerStatusGetter(1), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + + By("Removing the other node as well when done") + Expect(testAdminAPI.RemoveBroker(1)).To(BeTrue()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(1))) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + + It("Can recommission a node while decommission is in progress", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("recommission", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 3 replicas when the brokers start to appear") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 2, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Start decommissioning node 2") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + if redpandaCluster.Status.DecommissioningNode == nil { + return nil + } + return *redpandaCluster.Status.DecommissioningNode + }), timeout, 
interval).Should(Equal(int32(2))) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + + By("Recommissioning the node by restoring replicas") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(3) + }), timeout, interval).Should(Succeed()) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusActive)) + + By("Start decommissioning node 2 again") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + if redpandaCluster.Status.DecommissioningNode == nil { + return nil + } + return *redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(Equal(int32(2))) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + + By("Recommissioning the node also when scaling to more replicas") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(4) + }), timeout, interval).Should(Succeed()) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusActive)) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(4))) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + + It("Can decommission nodes that never started", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("direct-decommission", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 3 replicas, but have last one not registered") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Doing direct scale down of node 2") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + Eventually(statefulSetReplicasReconciler(key, redpandaCluster), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + return redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(BeNil()) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + + It("Can decommission nodes that were lazy to start", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("direct-decommission-lazy", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 3 replicas, but have last one not registered") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + 
testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Doing direct scale down of node 2") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + + By("Finding out that the node was able to connect to the cluster before being terminated") + testAdminAPI.AddBroker(admin.Broker{NodeID: 2, MembershipStatus: admin.MembershipStatusActive}) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + if redpandaCluster.Status.DecommissioningNode == nil { + return nil + } + return *redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(Equal(int32(2))) + + By("Scaling the cluster back and properly decommission the node") + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + + By("Complete downscale after decommission") + Expect(testAdminAPI.RemoveBroker(2)).To(BeTrue()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + Eventually(statefulSetReplicasReconciler(key, redpandaCluster), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + return redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(BeNil()) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + }) +}) + +func getClusterWithReplicas( + name string, replicas int32, +) (key types.NamespacedName, cluster *v1alpha1.Cluster) { + key = types.NamespacedName{ + Name: name, + Namespace: "default", + } + + cluster = &v1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + }, + Spec: v1alpha1.ClusterSpec{ + Image: "vectorized/redpanda", + Version: "dev", + Replicas: pointer.Int32Ptr(replicas), + Configuration: v1alpha1.RedpandaConfig{ + KafkaAPI: []v1alpha1.KafkaAPI{ + { + Port: 9092, + }, + }, + AdminAPI: []v1alpha1.AdminAPI{{Port: 9644}}, + RPCServer: v1alpha1.SocketAddress{ + Port: 33145, + }, + }, + Resources: v1alpha1.RedpandaResourceRequirements{ + ResourceRequirements: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + Redpanda: nil, + }, + }, + } + return key, cluster +} diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_test.go index e4b9a1e6d6f7..718c2b17787c 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_test.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller_test.go @@ -699,10 +699,11 @@ var _ = Describe("RedPandaCluster controller", func() { // exists. 
This verifies that these situations are handled // gracefully and without error r := &redpanda.ClusterReconciler{ - Client: fake.NewClientBuilder().Build(), - Log: ctrl.Log, - Scheme: scheme.Scheme, - AdminAPIClientFactory: testAdminAPIFactory, + Client: fake.NewClientBuilder().Build(), + Log: ctrl.Log, + Scheme: scheme.Scheme, + AdminAPIClientFactory: testAdminAPIFactory, + DecommissionWaitInterval: 100 * time.Millisecond, } _, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{ Namespace: "default", @@ -720,10 +721,11 @@ var _ = Describe("RedPandaCluster controller", func() { Expect(err).NotTo(HaveOccurred()) r := &redpanda.ClusterReconciler{ - Client: fake.NewClientBuilder().Build(), - Log: ctrl.Log, - Scheme: scheme.Scheme, - AdminAPIClientFactory: testAdminAPIFactory, + Client: fake.NewClientBuilder().Build(), + Log: ctrl.Log, + Scheme: scheme.Scheme, + AdminAPIClientFactory: testAdminAPIFactory, + DecommissionWaitInterval: 100 * time.Millisecond, } Expect(r.WithConfiguratorSettings(res.ConfiguratorSettings{ diff --git a/src/go/k8s/controllers/redpanda/metric_controller.go b/src/go/k8s/controllers/redpanda/metric_controller.go index ca5066318be3..9f7e2b843f37 100644 --- a/src/go/k8s/controllers/redpanda/metric_controller.go +++ b/src/go/k8s/controllers/redpanda/metric_controller.go @@ -94,7 +94,7 @@ func (r *ClusterMetricController) Reconcile( if err != nil { return ctrl.Result{}, err } - g.Set(float64(cl.Items[i].Status.Replicas)) + g.Set(float64(cl.Items[i].Status.ReadyReplicas)) curLabels[cl.Items[i].Name] = struct{}{} r.currentLabels[cl.Items[i].Name] = struct{}{} } diff --git a/src/go/k8s/controllers/redpanda/suite_test.go b/src/go/k8s/controllers/redpanda/suite_test.go index b55c9090e66e..bce30a3e193d 100644 --- a/src/go/k8s/controllers/redpanda/suite_test.go +++ b/src/go/k8s/controllers/redpanda/suite_test.go @@ -13,6 +13,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "net/http" "path/filepath" "sort" @@ -29,6 +30,7 @@ import ( adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/configuration" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -92,16 +94,24 @@ var _ = BeforeSuite(func(done Done) { _ client.Reader, _ *redpandav1alpha1.Cluster, _ string, - _ resources.AdminTLSConfigProvider, + _ types.AdminTLSConfigProvider, + ordinals ...int32, ) (adminutils.AdminAPIClient, error) { + if len(ordinals) == 1 { + return &scopedMockAdminAPI{ + mockAdminAPI: testAdminAPI, + ordinal: ordinals[0], + }, nil + } return testAdminAPI, nil } err = (&redpandacontrollers.ClusterReconciler{ - Client: k8sManager.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"), - Scheme: k8sManager.GetScheme(), - AdminAPIClientFactory: testAdminAPIFactory, + Client: k8sManager.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"), + Scheme: k8sManager.GetScheme(), + AdminAPIClientFactory: testAdminAPIFactory, + DecommissionWaitInterval: 100 * time.Millisecond, }).WithClusterDomain("cluster.local").WithConfiguratorSettings(resources.ConfiguratorSettings{ ConfiguratorBaseImage: "vectorized/configurator", ConfiguratorTag: "latest", @@ -160,9 +170,15 @@ type mockAdminAPI struct { invalid []string 
unknown []string directValidation bool + brokers []admin.Broker monitor sync.Mutex } +type scopedMockAdminAPI struct { + *mockAdminAPI + ordinal int32 +} + var _ adminutils.AdminAPIClient = &mockAdminAPI{} type unavailableError struct{} @@ -287,6 +303,7 @@ func (m *mockAdminAPI) Clear() { m.patches = nil m.unavailable = false m.directValidation = false + m.brokers = nil } func (m *mockAdminAPI) GetFeatures( @@ -365,12 +382,114 @@ func (m *mockAdminAPI) SetUnavailable(unavailable bool) { m.unavailable = unavailable } +func (m *mockAdminAPI) GetNodeConfig( + _ context.Context, +) (admin.NodeConfig, error) { + return admin.NodeConfig{}, nil +} + +// nolint:goerr113 // test code +func (s *scopedMockAdminAPI) GetNodeConfig( + ctx context.Context, +) (admin.NodeConfig, error) { + brokers, err := s.Brokers(ctx) + if err != nil { + return admin.NodeConfig{}, err + } + if len(brokers) <= int(s.ordinal) { + return admin.NodeConfig{}, fmt.Errorf("broker not registered") + } + return admin.NodeConfig{ + NodeID: brokers[int(s.ordinal)].NodeID, + }, nil +} + func (m *mockAdminAPI) SetDirectValidationEnabled(directValidation bool) { m.monitor.Lock() defer m.monitor.Unlock() m.directValidation = directValidation } +func (m *mockAdminAPI) AddBroker(broker admin.Broker) { + m.monitor.Lock() + defer m.monitor.Unlock() + + m.brokers = append(m.brokers, broker) +} + +func (m *mockAdminAPI) RemoveBroker(id int) bool { + m.monitor.Lock() + defer m.monitor.Unlock() + + idx := -1 + for i := range m.brokers { + if m.brokers[i].NodeID == id { + idx = i + break + } + } + if idx < 0 { + return false + } + m.brokers = append(m.brokers[:idx], m.brokers[idx+1:]...) + return true +} + +func (m *mockAdminAPI) Brokers(_ context.Context) ([]admin.Broker, error) { + m.monitor.Lock() + defer m.monitor.Unlock() + + return append([]admin.Broker{}, m.brokers...), nil +} + +func (m *mockAdminAPI) BrokerStatusGetter( + id int, +) func() admin.MembershipStatus { + return func() admin.MembershipStatus { + m.monitor.Lock() + defer m.monitor.Unlock() + + for i := range m.brokers { + if m.brokers[i].NodeID == id { + return m.brokers[i].MembershipStatus + } + } + return "" + } +} + +func (m *mockAdminAPI) DecommissionBroker(_ context.Context, id int) error { + return m.SetBrokerStatus(id, admin.MembershipStatusDraining) +} + +func (m *mockAdminAPI) RecommissionBroker(_ context.Context, id int) error { + return m.SetBrokerStatus(id, admin.MembershipStatusActive) +} + +func (m *mockAdminAPI) EnableMaintenanceMode(_ context.Context, _ int) error { + return nil +} + +func (m *mockAdminAPI) DisableMaintenanceMode(_ context.Context, _ int) error { + return nil +} + +// nolint:goerr113 // test code +func (m *mockAdminAPI) SetBrokerStatus( + id int, status admin.MembershipStatus, +) error { + m.monitor.Lock() + defer m.monitor.Unlock() + + for i := range m.brokers { + if m.brokers[i].NodeID == id { + m.brokers[i].MembershipStatus = status + return nil + } + } + return fmt.Errorf("unknown broker %d", id) +} + func makeCopy(input, output interface{}) { ser, err := json.Marshal(input) if err != nil { diff --git a/src/go/k8s/main.go b/src/go/k8s/main.go index 2785e744a51b..9b55f93d580f 100644 --- a/src/go/k8s/main.go +++ b/src/go/k8s/main.go @@ -11,6 +11,7 @@ package main import ( "flag" "os" + "time" cmapiv1 "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" @@ -55,6 +56,7 @@ func main() { configuratorBaseImage string configuratorTag string 
configuratorImagePullPolicy string + decommissionWaitInterval time.Duration ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") @@ -67,6 +69,8 @@ func main() { flag.StringVar(&configuratorBaseImage, "configurator-base-image", defaultConfiguratorContainerImage, "Set the configurator base image") flag.StringVar(&configuratorTag, "configurator-tag", "latest", "Set the configurator tag") flag.StringVar(&configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy") + flag.DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster") + flag.BoolVar(&redpandav1alpha1.AllowDownscalingInWebhook, "allow-downscaling", false, "Allow reducing the number of replicas in existing clusters (alpha feature)") opts := zap.Options{ Development: true, @@ -98,10 +102,11 @@ func main() { } if err = (&redpandacontrollers.ClusterReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"), - Scheme: mgr.GetScheme(), - AdminAPIClientFactory: adminutils.NewInternalAdminAPI, + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"), + Scheme: mgr.GetScheme(), + AdminAPIClientFactory: adminutils.NewInternalAdminAPI, + DecommissionWaitInterval: decommissionWaitInterval, }).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil { setupLog.Error(err, "Unable to create controller", "controller", "Cluster") os.Exit(1) diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 871956a5c4c3..2a37d8a5ecce 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -16,7 +16,7 @@ import ( "fmt" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" - "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -35,7 +35,8 @@ func NewInternalAdminAPI( k8sClient client.Reader, redpandaCluster *redpandav1alpha1.Cluster, fqdn string, - adminTLSProvider resources.AdminTLSConfigProvider, + adminTLSProvider types.AdminTLSConfigProvider, + ordinals ...int32, ) (AdminAPIClient, error) { adminInternal := redpandaCluster.AdminAPIInternal() if adminInternal == nil { @@ -53,11 +54,17 @@ func NewInternalAdminAPI( adminInternalPort := adminInternal.Port - var urls []string - replicas := *redpandaCluster.Spec.Replicas + if len(ordinals) == 0 { + // Not a specific node, just go through all of them + replicas := redpandaCluster.GetCurrentReplicas() - for i := int32(0); i < replicas; i++ { - urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", redpandaCluster.Name, i, fqdn, adminInternalPort)) + for i := int32(0); i < replicas; i++ { + ordinals = append(ordinals, i) + } + } + urls := make([]string, 0, len(ordinals)) + for _, on := range ordinals { + urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", redpandaCluster.Name, on, fqdn, adminInternalPort)) } adminAPI, err := admin.NewAdminAPI(urls, admin.BasicCredentials{}, tlsConfig) @@ -74,10 +81,18 @@ type AdminAPIClient interface { ClusterConfigStatus(ctx context.Context, sendToLeader bool) (admin.ConfigStatusResponse, error) ClusterConfigSchema(ctx context.Context) (admin.ConfigSchema, error) 
PatchClusterConfig(ctx context.Context, upsert map[string]interface{}, remove []string) (admin.ClusterConfigWriteResult, error) + GetNodeConfig(ctx context.Context) (admin.NodeConfig, error) CreateUser(ctx context.Context, username, password, mechanism string) error GetFeatures(ctx context.Context) (admin.FeaturesResponse, error) + + Brokers(ctx context.Context) ([]admin.Broker, error) + DecommissionBroker(ctx context.Context, node int) error + RecommissionBroker(ctx context.Context, node int) error + + EnableMaintenanceMode(ctx context.Context, node int) error + DisableMaintenanceMode(ctx context.Context, node int) error } var _ AdminAPIClient = &admin.AdminAPI{} @@ -89,7 +104,8 @@ type AdminAPIClientFactory func( k8sClient client.Reader, redpandaCluster *redpandav1alpha1.Cluster, fqdn string, - adminTLSProvider resources.AdminTLSConfigProvider, + adminTLSProvider types.AdminTLSConfigProvider, + ordinals ...int32, ) (AdminAPIClient, error) var _ AdminAPIClientFactory = NewInternalAdminAPI diff --git a/src/go/k8s/pkg/resources/certmanager/pki.go b/src/go/k8s/pkg/resources/certmanager/pki.go index 82028ab87bf6..e77a8c4f6016 100644 --- a/src/go/k8s/pkg/resources/certmanager/pki.go +++ b/src/go/k8s/pkg/resources/certmanager/pki.go @@ -17,6 +17,7 @@ import ( "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -83,11 +84,11 @@ func (r *PkiReconciler) Ensure(ctx context.Context) error { } // StatefulSetVolumeProvider returns volume provider for all TLS certificates -func (r *PkiReconciler) StatefulSetVolumeProvider() resources.StatefulsetTLSVolumeProvider { +func (r *PkiReconciler) StatefulSetVolumeProvider() resourcetypes.StatefulsetTLSVolumeProvider { return r.clusterCertificates } // AdminAPIConfigProvider returns provider of admin TLS configuration -func (r *PkiReconciler) AdminAPIConfigProvider() resources.AdminTLSConfigProvider { +func (r *PkiReconciler) AdminAPIConfigProvider() resourcetypes.AdminTLSConfigProvider { return r.clusterCertificates } diff --git a/src/go/k8s/pkg/resources/certmanager/type_helpers.go b/src/go/k8s/pkg/resources/certmanager/type_helpers.go index 3b3942ff7546..f0a676af5371 100644 --- a/src/go/k8s/pkg/resources/certmanager/type_helpers.go +++ b/src/go/k8s/pkg/resources/certmanager/type_helpers.go @@ -20,6 +20,7 @@ import ( cmmetav1 "github.com/jetstack/cert-manager/pkg/apis/meta/v1" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -455,7 +456,7 @@ func (cc *ClusterCertificates) Volumes() ( ) { var vols []corev1.Volume var mounts []corev1.VolumeMount - mountPoints := resources.GetTLSMountPoints() + mountPoints := resourcetypes.GetTLSMountPoints() vol, mount := secretVolumesForTLS(cc.kafkaAPI.nodeCertificateName(), cc.kafkaAPI.clientCertificates, redpandaCertVolName, redpandaCAVolName, mountPoints.KafkaAPI.NodeCertMountDir, mountPoints.KafkaAPI.ClientCAMountDir) vols = append(vols, vol...) 
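Note: the pkg/admin/admin.go hunk above makes the admin API factory accept optional pod ordinals, so callers can build either a cluster-wide client or one scoped to a single node (the scopedMockAdminAPI in the test suite emulates the latter for GetNodeConfig). The self-contained sketch below mirrors that endpoint-selection logic; the helper name adminURLs and the example hostnames are illustrative only, not part of the patch.

package main

import "fmt"

// adminURLs mirrors the URL construction in NewInternalAdminAPI: with no
// ordinals it addresses every replica the controller currently runs
// (GetCurrentReplicas, not spec.replicas); with ordinals it targets only
// the listed pods.
func adminURLs(cluster, fqdn string, port, currentReplicas int32, ordinals ...int32) []string {
	if len(ordinals) == 0 {
		for i := int32(0); i < currentReplicas; i++ {
			ordinals = append(ordinals, i)
		}
	}
	urls := make([]string, 0, len(ordinals))
	for _, on := range ordinals {
		urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", cluster, on, fqdn, port))
	}
	return urls
}

func main() {
	// Cluster-wide client vs. a client scoped to ordinal 2.
	fmt.Println(adminURLs("redpanda", "redpanda.default.svc.cluster.local", 9644, 3))
	fmt.Println(adminURLs("redpanda", "redpanda.default.svc.cluster.local", 9644, 3, 2))
}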
diff --git a/src/go/k8s/pkg/resources/configmap.go b/src/go/k8s/pkg/resources/configmap.go index d747ec8d8c42..8a24e9977264 100644 --- a/src/go/k8s/pkg/resources/configmap.go +++ b/src/go/k8s/pkg/resources/configmap.go @@ -25,6 +25,7 @@ import ( "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/configuration" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -216,7 +217,7 @@ func (r *ConfigMapResource) CreateConfiguration( ) (*configuration.GlobalConfiguration, error) { cfg := configuration.For(r.pandaCluster.Spec.Version) cfg.NodeConfiguration = *config.Default() - mountPoints := GetTLSMountPoints() + mountPoints := resourcetypes.GetTLSMountPoints() c := r.pandaCluster.Spec.Configuration cr := &cfg.NodeConfiguration.Redpanda @@ -330,7 +331,7 @@ func (r *ConfigMapResource) CreateConfiguration( cfg.SetAdditionalRedpandaProperty("log_segment_size", logSegmentSize) - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() for i := int32(0); i < replicas; i++ { cr.SeedServers = append(cr.SeedServers, config.SeedServer{ Host: config.SocketAddress{ @@ -453,7 +454,7 @@ func (r *ConfigMapResource) preparePandaproxyClient( return nil } - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() cfg.NodeConfiguration.PandaproxyClient = &config.KafkaClient{} for i := int32(0); i < replicas; i++ { cfg.NodeConfiguration.PandaproxyClient.Brokers = append(cfg.NodeConfiguration.PandaproxyClient.Brokers, config.SocketAddress{ @@ -492,7 +493,7 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient( return nil } - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() cfg.NodeConfiguration.SchemaRegistryClient = &config.KafkaClient{} for i := int32(0); i < replicas; i++ { cfg.NodeConfiguration.SchemaRegistryClient.Brokers = append(cfg.NodeConfiguration.SchemaRegistryClient.Brokers, config.SocketAddress{ @@ -525,7 +526,7 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient( } func (r *ConfigMapResource) preparePandaproxyTLS( - cfgRpk *config.Config, mountPoints *TLSMountPoints, + cfgRpk *config.Config, mountPoints *resourcetypes.TLSMountPoints, ) { tlsListener := r.pandaCluster.PandaproxyAPITLS() if tlsListener != nil { @@ -550,7 +551,7 @@ func (r *ConfigMapResource) preparePandaproxyTLS( } func (r *ConfigMapResource) prepareSchemaRegistryTLS( - cfgRpk *config.Config, mountPoints *TLSMountPoints, + cfgRpk *config.Config, mountPoints *resourcetypes.TLSMountPoints, ) { if r.pandaCluster.Spec.Configuration.SchemaRegistry != nil && r.pandaCluster.Spec.Configuration.SchemaRegistry.TLS != nil { diff --git a/src/go/k8s/pkg/resources/resource_integration_test.go b/src/go/k8s/pkg/resources/resource_integration_test.go index 0646f6ecd93c..43327f631f4f 100644 --- a/src/go/k8s/pkg/resources/resource_integration_test.go +++ b/src/go/k8s/pkg/resources/resource_integration_test.go @@ -17,10 +17,13 @@ import ( "path/filepath" "strings" "testing" + "time" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -76,6 +79,8 @@ func TestEnsure_StatefulSet(t *testing.T) { cluster := pandaCluster() cluster = cluster.DeepCopy() cluster.Name = "ensure-integration-cluster" + err := c.Create(context.Background(), cluster) + require.NoError(t, err) sts := res.NewStatefulSet( c, @@ -93,9 +98,11 @@ func TestEnsure_StatefulSet(t *testing.T) { ImagePullPolicy: "Always", }, func(ctx context.Context) (string, error) { return hash, nil }, + adminutils.NewInternalAdminAPI, + time.Second, ctrl.Log.WithName("test")) - err := sts.Ensure(context.Background()) + err = sts.Ensure(context.Background()) assert.NoError(t, err) actual := &v1.StatefulSet{} diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index 759601dacc9d..1a1309dd0e60 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -17,11 +17,14 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -84,16 +87,18 @@ type StatefulSetResource struct { serviceName string nodePortName types.NamespacedName nodePortSvc corev1.Service - volumeProvider StatefulsetTLSVolumeProvider - adminTLSConfigProvider AdminTLSConfigProvider + volumeProvider resourcetypes.StatefulsetTLSVolumeProvider + adminTLSConfigProvider resourcetypes.AdminTLSConfigProvider serviceAccountName string configuratorSettings ConfiguratorSettings // hash of configmap containing configuration for redpanda (node config only), it's injected to // annotation to ensure the pods get restarted when configuration changes // this has to be retrieved lazily to achieve the correct order of resources // being applied - nodeConfigMapHashGetter func(context.Context) (string, error) - logger logr.Logger + nodeConfigMapHashGetter func(context.Context) (string, error) + adminAPIClientFactory adminutils.AdminAPIClientFactory + decommissionWaitInterval time.Duration + logger logr.Logger LastObservedState *appsv1.StatefulSet } @@ -106,11 +111,13 @@ func NewStatefulSet( serviceFQDN string, serviceName string, nodePortName types.NamespacedName, - volumeProvider StatefulsetTLSVolumeProvider, - adminTLSConfigProvider AdminTLSConfigProvider, + volumeProvider resourcetypes.StatefulsetTLSVolumeProvider, + adminTLSConfigProvider resourcetypes.AdminTLSConfigProvider, serviceAccountName string, configuratorSettings ConfiguratorSettings, nodeConfigMapHashGetter func(context.Context) (string, error), + adminAPIClientFactory adminutils.AdminAPIClientFactory, + decommissionWaitInterval time.Duration, logger logr.Logger, ) *StatefulSetResource { return &StatefulSetResource{ @@ -126,6 +133,8 @@ func NewStatefulSet( serviceAccountName, configuratorSettings, nodeConfigMapHashGetter, + adminAPIClientFactory, + decommissionWaitInterval, logger.WithValues("Kind", statefulSetKind()), nil, } @@ -166,8 +175,21 @@ func (r *StatefulSetResource) Ensure(ctx context.Context) error { return 
fmt.Errorf("error while fetching StatefulSet resource: %w", err) } r.LastObservedState = &sts + + // Hack for: https://github.com/redpanda-data/redpanda/issues/4999 + err = r.disableMaintenanceModeOnDecommissionedNodes(ctx) + if err != nil { + return err + } + r.logger.Info("Running update", "resource name", r.Key().Name) - return r.runUpdate(ctx, &sts, obj.(*appsv1.StatefulSet)) + err = r.runUpdate(ctx, &sts, obj.(*appsv1.StatefulSet)) + if err != nil { + return err + } + + r.logger.Info("Running scale handler", "resource name", r.Key().Name) + return r.handleScaling(ctx) } // GetCentralizedConfigurationHashFromCluster retrieves the current centralized configuratino hash from the statefulset @@ -269,6 +291,10 @@ func (r *StatefulSetResource) obj( externalAddressType = externalListener.External.PreferredAddressType } tlsVolumes, tlsVolumeMounts := r.volumeProvider.Volumes() + + // We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function + replicas := r.pandaCluster.GetCurrentReplicas() + ss := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: r.Key().Namespace, @@ -280,7 +306,7 @@ func (r *StatefulSetResource) obj( APIVersion: "apps/v1", }, Spec: appsv1.StatefulSetSpec{ - Replicas: r.pandaCluster.Spec.Replicas, + Replicas: &replicas, PodManagementPolicy: appsv1.ParallelPodManagement, Selector: clusterLabels.AsAPISelector(), UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ @@ -487,7 +513,8 @@ func (r *StatefulSetResource) obj( } // Only multi-replica clusters should use maintenance mode. See: https://github.com/redpanda-data/redpanda/issues/4338 - multiReplica := r.pandaCluster.Spec.Replicas != nil && *r.pandaCluster.Spec.Replicas > 1 + // Startup of a fresh cluster would let the first pod restart, until dynamic hooks are implemented. See: https://github.com/redpanda-data/redpanda/pull/4907 + multiReplica := r.pandaCluster.GetCurrentReplicas() > 1 if featuregates.MaintenanceMode(r.pandaCluster.Spec.Version) && r.pandaCluster.IsUsingMaintenanceModeHooks() && multiReplica { ss.Spec.Template.Spec.Containers[0].Lifecycle = &corev1.Lifecycle{ PreStop: r.getPreStopHook(), @@ -526,7 +553,7 @@ func (r *StatefulSetResource) getPreStopHook() *corev1.Handler { curlGetCommand := r.composeCURLMaintenanceCommand(`--silent`, &genericMaintenancePath) cmd := fmt.Sprintf(`until [ "${status:-}" = "200" ]; do status=$(%s); sleep 0.5; done`, curlCommand) + " && " + - fmt.Sprintf(`until [ "${finished:-}" = "true" ]; do finished=$(%s | grep -o '\"finished\":[^,}]*' | grep -o '[^: ]*$'); sleep 0.5; done`, curlGetCommand) + fmt.Sprintf(`until [ "${finished:-}" = "true" ] || [ "${draining:-}" = "false" ]; do res=$(%s); finished=$(echo $res | grep -o '\"finished\":[^,}]*' | grep -o '[^: ]*$'); draining=$(echo $res | grep -o '\"draining\":[^,}]*' | grep -o '[^: ]*$'); sleep 0.5; done`, curlGetCommand) return &corev1.Handler{ Exec: &corev1.ExecAction{ diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go new file mode 100644 index 000000000000..47b323b0f13a --- /dev/null +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -0,0 +1,392 @@ +// Copyright 2022 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package resources + +import ( + "context" + "errors" + "fmt" + + "github.com/go-logr/logr" + redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + decommissionWaitJitterFactor = 0.2 +) + +// handleScaling is responsible for managing the current number of replicas running for a cluster. +// +// Replicas are controlled via the field `status.currentReplicas` that is set in the current method and should be +// respected by all other external reconcile functions, including the one that sets the actual number of replicas in the StatefulSet. +// External functions should use `cluster.GetCurrentReplicas()` to get the number of expected replicas for a cluster, +// since it deals with cases where the status is not initialized yet. +// +// When users change the value of `spec.replicas` for a cluster, this function is responsible for progressively changing the `status.currentReplicas` to match that value. +// In the case of a Cluster downscaled by 1 replica (i.e. `spec.replicas` lower than the current value of `status.currentReplicas` by `1`), before lowering the +// value of `status.currentReplicas`, this handler will first decommission the last node, using the admin API of the cluster. +// +// There are cases where the cluster will not decommission a node (e.g. user reduces `spec.replicas` to `2`, but there are topics with `3` partition replicas), +// in which case the draining phase will hang indefinitely in Redpanda. When this happens, the controller will not downscale the cluster and +// users will find in `status.decommissioningNode` that a node is constantly being decommissioned. +// +// In cases where the decommissioning process hangs, users can increase the value of `spec.replicas` again and the handler will contact the admin API +// to recommission the last node, ensuring that the number of replicas matches the expected value and that the node joins the cluster again. +// +// Users can change the value of `spec.replicas` freely (except it cannot be 0). In case of downscaling by multiple replicas, the handler will +// decommission one node at a time, until the desired number of replicas is met. +// +// When a new cluster is created, the value of `status.currentReplicas` will be initially set to `1`, no matter what the user sets in `spec.replicas`. +// This handler will first ensure that the initial node forms a cluster, by retrieving the list of brokers using the admin API. +// After the cluster is formed, the handler will increase the `status.currentReplicas` as requested. +// +// This is due to the fact that Redpanda is currently unable to initialize a cluster if each node is given the full list of seed servers: https://github.com/redpanda-data/redpanda/issues/333.
+// Previous versions of the operator used to hack the list of seed servers (in the configurator pod) of the node with ordinal 0, always setting it to an empty list, +// allowing it to create an initial cluster. That strategy worked because the list of seed servers is not read again from the `redpanda.yaml` file once the +// cluster is initialized. +// But the drawback was that, on an existing running cluster, when node 0 loses its data directory (e.g. because it's using local storage, and it undergoes a k8s node upgrade), +// then node 0 (having no data and an empty seed server list in `redpanda.yaml`) creates a brand new cluster ignoring the other nodes (split-brain). +// The strategy implemented here (to initialize the cluster at 1 replica, then upscaling to the desired number, without hacks on the seed server list) +// should fix this problem, since the list of seed servers will be the same on all nodes once the cluster is created. +// +// nolint:nestif // for clarity +func (r *StatefulSetResource) handleScaling(ctx context.Context) error { + if r.pandaCluster.Status.DecommissioningNode != nil { + decommissionTargetReplicas := *r.pandaCluster.Status.DecommissioningNode + if *r.pandaCluster.Spec.Replicas > decommissionTargetReplicas { + // Decommissioning can also be canceled and we need to recommission + return r.handleRecommission(ctx) + } + return r.handleDecommission(ctx) + } + + if r.pandaCluster.Status.CurrentReplicas == 0 { + // Initialize the currentReplicas field, so that it can be controlled later + r.pandaCluster.Status.CurrentReplicas = r.pandaCluster.ComputeInitialCurrentReplicasField() + return r.Status().Update(ctx, r.pandaCluster) + } + + if *r.pandaCluster.Spec.Replicas == r.pandaCluster.Status.CurrentReplicas { + // No changes to replicas, we do nothing here + return nil + } + + if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas { + r.logger.Info("Upscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas) + + // We care about upscaling only when the cluster is moving off 1 replica, which happens e.g. at cluster startup + if r.pandaCluster.Status.CurrentReplicas == 1 { + r.logger.Info("Waiting for first node to form a cluster before upscaling") + formed, err := r.isClusterFormed(ctx) + if err != nil { + return err + } + if !formed { + return &RequeueAfterError{ + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), + Msg: fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas), + } + } + r.logger.Info("Initial cluster has been formed") + } + + // Upscaling request: this is already handled by Redpanda, so we just increase status.currentReplicas + return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger) + } + + // The user-requested replicas value is lower than the current replicas (currentReplicas): start the decommissioning process + targetOrdinal := r.pandaCluster.Status.CurrentReplicas - 1 // Always decommission last node + r.logger.Info("Start decommission of last broker node", "ordinal", targetOrdinal) + r.pandaCluster.Status.DecommissioningNode = &targetOrdinal + return r.Status().Update(ctx, r.pandaCluster) +} + +// handleDecommission manages the decommissioning of the last node of a cluster. +// +// When this handler is called, the `status.decommissioningNode` is populated with the pod ordinal (== nodeID) of the +// node that needs to be decommissioned.
+// +// The handler verifies that the node is not present in the list of brokers registered in the cluster, via the admin API, +// then downscales the StatefulSet by decreasing `status.currentReplicas`. +// +// Before completing the process, it double-checks that the node is still not registered, to handle cases where the node was +// about to start when the decommissioning process started. If the broker is found, the process is restarted. +func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { + targetReplicas := *r.pandaCluster.Status.DecommissioningNode + r.logger.Info("Handling cluster in decommissioning phase", "target replicas", targetReplicas) + + adminAPI, err := r.getAdminAPIClient(ctx) + if err != nil { + return err + } + + broker, err := getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI) + if err != nil { + return err + } + + if broker != nil { + r.logger.Info("Broker still exists in the cluster", "node_id", broker.NodeID, "status", broker.MembershipStatus) + if broker.MembershipStatus != admin.MembershipStatusDraining { + // We request decommissioning, since it does not appear to be done yet + err = adminAPI.DecommissionBroker(ctx, broker.NodeID) + if err != nil { + return fmt.Errorf("error while trying to decommission node %d in cluster %s: %w", broker.NodeID, r.pandaCluster.Name, err) + } + r.logger.Info("Node marked for decommissioning in cluster", "node_id", broker.NodeID) + } + + // The draining phase must always be completed with all nodes running, to let single-replica partitions be transferred. + // The value may diverge in case we restarted the process after a complete scale down. + drainingReplicas := targetReplicas + 1 + if r.pandaCluster.Status.CurrentReplicas != drainingReplicas { + return setCurrentReplicas(ctx, r, r.pandaCluster, drainingReplicas, r.logger) + } + + // Wait until the node is fully drained (or wait forever if the cluster does not allow decommissioning of that specific node) + return &RequeueAfterError{ + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), + Msg: fmt.Sprintf("Waiting for node %d to be decommissioned from cluster", broker.NodeID), + } + } + + // The broker is now missing from the cluster API + r.logger.Info("Node is not registered in the cluster: initializing downscale", "node_id", *r.pandaCluster.Status.DecommissioningNode) + + // We set status.currentReplicas accordingly to trigger scaling down of the statefulset + if err = setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger); err != nil { + return err + } + + scaledDown, err := r.verifyRunningCount(ctx, targetReplicas) + if err != nil { + return err + } + if !scaledDown { + return &RequeueAfterError{ + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), + Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas), + } + } + + // There's a chance that the node was initially not present in the broker list, but appeared after we started to scale down. + // Since the node may hold data that needs to be propagated to other nodes, we need to restart it to let the decommission process finish.
+ broker, err = getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI) + if err != nil { + return err + } + if broker != nil { + // The node reappeared in the cluster, so we restart the process to handle it + return &NodeReappearingError{NodeID: broker.NodeID} + } + + r.logger.Info("Decommissioning process successfully completed", "node_id", *r.pandaCluster.Status.DecommissioningNode) + r.pandaCluster.Status.DecommissioningNode = nil + return r.Status().Update(ctx, r.pandaCluster) +} + +// handleRecommission manages the case of a node being recommissioned after a failed or mistaken decommission. +// +// Recommission can only work for nodes that are still in the "draining" phase according to Redpanda. +// +// When this handler is triggered, `status.decommissioningNode` is populated with the node that was being decommissioned and +// `spec.replicas` reports a value that includes that node, indicating the user's intention to recommission it. +// +// The handler ensures that the node is running and also calls the admin API to recommission it. +// The process finishes when the node is registered among the brokers and the StatefulSet is correctly scaled. +func (r *StatefulSetResource) handleRecommission(ctx context.Context) error { + r.logger.Info("Handling cluster in recommissioning phase") + + // First we ensure we have enough replicas to let the recommissioning node run + targetReplicas := *r.pandaCluster.Status.DecommissioningNode + 1 + err := setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger) + if err != nil { + return err + } + + adminAPI, err := r.getAdminAPIClient(ctx) + if err != nil { + return err + } + + broker, err := getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI) + if err != nil { + return err + } + + if broker == nil || broker.MembershipStatus != admin.MembershipStatusActive { + err = adminAPI.RecommissionBroker(ctx, int(*r.pandaCluster.Status.DecommissioningNode)) + if err != nil { + return fmt.Errorf("error while trying to recommission node %d in cluster %s: %w", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name, err) + } + r.logger.Info("Node marked for being recommissioned in cluster", "node_id", *r.pandaCluster.Status.DecommissioningNode) + + return &RequeueAfterError{ + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), + Msg: fmt.Sprintf("Waiting for node %d to be recommissioned into cluster %s", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name), + } + } + + r.logger.Info("Recommissioning process successfully completed", "node_id", *r.pandaCluster.Status.DecommissioningNode) + r.pandaCluster.Status.DecommissioningNode = nil + return r.Status().Update(ctx, r.pandaCluster) +} + +func (r *StatefulSetResource) getAdminAPIClient( + ctx context.Context, ordinals ...int32, +) (adminutils.AdminAPIClient, error) { + return r.adminAPIClientFactory(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...)
+} + +func (r *StatefulSetResource) isClusterFormed( + ctx context.Context, +) (bool, error) { + rootNodeAdminAPI, err := r.getAdminAPIClient(ctx, 0) + if err != nil { + return false, err + } + brokers, err := rootNodeAdminAPI.Brokers(ctx) + if err != nil { + // Eat the error and return that the cluster is not formed + return false, nil + } + return len(brokers) > 0, nil +} + +// disableMaintenanceModeOnDecommissionedNodes can be used to put a cluster in a consistent state, disabling maintenance mode on +// nodes that have been decommissioned. +// +// A decommissioned node may activate maintenance mode via shutdown hooks and the cluster may enter an inconsistent state, +// preventing a clean shutdown of the other pods. +// +// See: https://github.com/redpanda-data/redpanda/issues/4999 +func (r *StatefulSetResource) disableMaintenanceModeOnDecommissionedNodes( + ctx context.Context, +) error { + if !featuregates.MaintenanceMode(r.pandaCluster.Status.Version) { + return nil + } + + if r.pandaCluster.Status.DecommissioningNode == nil || r.pandaCluster.Status.CurrentReplicas > *r.pandaCluster.Status.DecommissioningNode { + // Only if actually in a decommissioning phase + return nil + } + + ordinal := *r.pandaCluster.Status.DecommissioningNode + targetReplicas := ordinal + + scaledDown, err := r.verifyRunningCount(ctx, targetReplicas) + if err != nil || !scaledDown { + // This should be done only when the pod disappears from the cluster + return err + } + + adminAPI, err := r.getAdminAPIClient(ctx) + if err != nil { + return err + } + + r.logger.Info("Forcing deletion of maintenance mode for the decommissioned node", "node_id", ordinal) + err = adminAPI.DisableMaintenanceMode(ctx, int(ordinal)) + if err != nil { + var httpErr *admin.HTTPResponseError + if errors.As(err, &httpErr) { + if httpErr.Response != nil && httpErr.Response.StatusCode/100 == 4 { + // Cluster says we don't need to do it + r.logger.Info("No need to disable maintenance mode on the decommissioned node", "node_id", ordinal, "status_code", httpErr.Response.StatusCode) + return nil + } + } + return fmt.Errorf("could not disable maintenance mode on decommissioning node %d: %w", ordinal, err) + } + r.logger.Info("Maintenance mode disabled for the decommissioned node", "node_id", ordinal) + return nil +} + +// verifyRunningCount checks that the statefulset is configured to run the given number of replicas and that the pods match that expectation +func (r *StatefulSetResource) verifyRunningCount( + ctx context.Context, replicas int32, +) (bool, error) { + var sts appsv1.StatefulSet + if err := r.Get(ctx, r.Key(), &sts); err != nil { + return false, fmt.Errorf("could not get statefulset for checking replicas: %w", err) + } + if sts.Spec.Replicas == nil || *sts.Spec.Replicas != replicas || sts.Status.Replicas != replicas { + return false, nil + } + + var podList corev1.PodList + err := r.List(ctx, &podList, &k8sclient.ListOptions{ + Namespace: r.pandaCluster.Namespace, + LabelSelector: labels.ForCluster(r.pandaCluster).AsClientSelector(), + }) + if err != nil { + return false, fmt.Errorf("could not list pods for checking replicas: %w", err) + } + + return len(podList.Items) == int(replicas), nil +} + +// getNodeInfoFromCluster retrieves broker information using the admin API +func getNodeInfoFromCluster( + ctx context.Context, ordinal int32, adminAPI adminutils.AdminAPIClient, +) (*admin.Broker, error) { + brokers, err := adminAPI.Brokers(ctx) + if err != nil { + return nil, fmt.Errorf("could not get the list of brokers for checking
decommission: %w", err) + } + for i := range brokers { + if brokers[i].NodeID == int(ordinal) { + return &brokers[i], nil + } + } + return nil, nil +} + +// setCurrentReplicas allows to set the number of status.currentReplicas in the CR, which in turns controls the replicas +// assigned to the StatefulSet +func setCurrentReplicas( + ctx context.Context, + c k8sclient.Client, + pandaCluster *redpandav1alpha1.Cluster, + replicas int32, + logger logr.Logger, +) error { + if pandaCluster.Status.CurrentReplicas == replicas { + // Skip if already done + return nil + } + + logger.Info("Scaling StatefulSet", "replicas", replicas) + pandaCluster.Status.CurrentReplicas = replicas + if err := c.Status().Update(ctx, pandaCluster); err != nil { + return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, replicas, err) + } + logger.Info("StatefulSet scaled", "replicas", replicas) + return nil +} + +// NodeReappearingError indicates that a node has appeared in the cluster before completion of the a direct downscale +type NodeReappearingError struct { + NodeID int +} + +// Error makes the NodeReappearingError a proper error +func (e *NodeReappearingError) Error() string { + return fmt.Sprintf("node has appeared in the cluster with id=%d", e.NodeID) +} diff --git a/src/go/k8s/pkg/resources/statefulset_test.go b/src/go/k8s/pkg/resources/statefulset_test.go index fd111c68333b..6b069a7bda58 100644 --- a/src/go/k8s/pkg/resources/statefulset_test.go +++ b/src/go/k8s/pkg/resources/statefulset_test.go @@ -12,8 +12,10 @@ package resources_test import ( "context" "testing" + "time" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/stretchr/testify/assert" v1 "k8s.io/api/apps/v1" @@ -33,13 +35,6 @@ func TestEnsure(t *testing.T) { cluster := pandaCluster() stsResource := stsFromCluster(cluster) - var newReplicas int32 = 3333 - - replicasUpdatedCluster := cluster.DeepCopy() - replicasUpdatedCluster.Spec.Replicas = &newReplicas - replicasUpdatedSts := stsFromCluster(cluster).DeepCopy() - replicasUpdatedSts.Spec.Replicas = &newReplicas - newResources := corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1111"), corev1.ResourceMemory: resource.MustParse("2222Gi"), @@ -76,7 +71,6 @@ func TestEnsure(t *testing.T) { expectedObject *v1.StatefulSet }{ {"none existing", nil, cluster, stsResource}, - {"update replicas", stsResource, replicasUpdatedCluster, replicasUpdatedSts}, {"update resources", stsResource, resourcesUpdatedCluster, resourcesUpdatedSts}, {"update redpanda resources", stsResource, resourcesUpdatedRedpandaCluster, resourcesUpdatedSts}, {"disabled sidecar", nil, noSidecarCluster, noSidecarSts}, @@ -116,6 +110,8 @@ func TestEnsure(t *testing.T) { ImagePullPolicy: "Always", }, func(ctx context.Context) (string, error) { return hash, nil }, + adminutils.NewInternalAdminAPI, + time.Second, ctrl.Log.WithName("test")) err = sts.Ensure(context.Background()) diff --git a/src/go/k8s/pkg/resources/tls_types.go b/src/go/k8s/pkg/resources/types/tls_types.go similarity index 96% rename from src/go/k8s/pkg/resources/tls_types.go rename to src/go/k8s/pkg/resources/types/tls_types.go index 23fcb0ea68d0..5c2b48cfd0db 100644 --- a/src/go/k8s/pkg/resources/tls_types.go +++ b/src/go/k8s/pkg/resources/types/tls_types.go @@ -7,7 +7,8 @@ // the Business Source License, use of this software will be governed // by the 
Apache License, Version 2.0 -package resources +// Package types contains useful types for dealing with resources +package types import ( "context" diff --git a/src/go/k8s/tests/e2e/decommission/00-assert.yaml b/src/go/k8s/tests/e2e/decommission/00-assert.yaml new file mode 100644 index 000000000000..d2fa081322b9 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/00-assert.yaml @@ -0,0 +1,21 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +commands: + - command: kubectl rollout status deployment redpanda-controller-manager -n redpanda-system + - command: hack/wait-for-webhook-ready.sh +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/00-enable-downscaling.yaml b/src/go/k8s/tests/e2e/decommission/00-enable-downscaling.yaml new file mode 100644 index 000000000000..3aa82a014111 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/00-enable-downscaling.yaml @@ -0,0 +1,6 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: +# Set the --allow-downscaling=true parameter in the controller manager +- command: kubectl patch deployment redpanda-controller-manager -n redpanda-system --type='json' -p='[{"op":"add", "path":"/spec/template/spec/containers/1/args/-", "value":"--allow-downscaling=true"}]' +- command: kubectl get deployment redpanda-controller-manager -n redpanda-system -o json diff --git a/src/go/k8s/tests/e2e/decommission/01-assert.yaml b/src/go/k8s/tests/e2e/decommission/01-assert.yaml new file mode 100644 index 000000000000..f94701ae5a68 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/01-assert.yaml @@ -0,0 +1,23 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: decommissioning +status: + replicas: 3 + currentReplicas: 3 +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/01-redpanda-cluster.yaml b/src/go/k8s/tests/e2e/decommission/01-redpanda-cluster.yaml new file mode 100644 index 000000000000..7bb3d1f5d8e6 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/01-redpanda-cluster.yaml @@ -0,0 +1,25 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: decommissioning +spec: + image: "localhost/redpanda" + version: "dev" + replicas: 3 + resources: + requests: + cpu: 100m + memory: 100Mi + limits: + cpu: 1 + memory: 500Mi + configuration: + rpcServer: + port: 33145 + kafkaApi: + - port: 9092 + adminApi: + - port: 9644 + pandaproxyApi: + - port: 8082 + developerMode: true diff --git a/src/go/k8s/tests/e2e/decommission/02-assert.yaml b/src/go/k8s/tests/e2e/decommission/02-assert.yaml new file mode 100644 index 000000000000..a3c57ca43421 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/02-assert.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: Pod +metadata: + labels: + job-name: wait-for-3-brokers +status: + containerStatuses: + - name: curl + 
state: + terminated: + message: | + 3 + phase: Succeeded +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/02-probe.yaml b/src/go/k8s/tests/e2e/decommission/02-probe.yaml new file mode 100644 index 000000000000..b3306231ee82 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/02-probe.yaml @@ -0,0 +1,30 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: wait-for-3-brokers +spec: + backoffLimit: 10 + template: + spec: + activeDeadlineSeconds: 90 + containers: + - name: curl + image: curlimages/curl:latest + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + command: + - /bin/sh + - -c + - -ex + args: + - > + url=http://decommissioning-0.decommissioning.$NAMESPACE.svc.cluster.local:9644/v1/brokers + res=$(curl --silent -L $url | tr '{' '\n' | grep node_id | wc -l) && + echo $res > /dev/termination-log && + if [[ "$res" != "3" ]]; then + exit 1; + fi + restartPolicy: Never diff --git a/src/go/k8s/tests/e2e/decommission/03-assert.yaml b/src/go/k8s/tests/e2e/decommission/03-assert.yaml new file mode 100644 index 000000000000..ff279070c069 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/03-assert.yaml @@ -0,0 +1,22 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: decommissioning +status: + replicas: 2 +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/03-redpanda-downscale.yaml b/src/go/k8s/tests/e2e/decommission/03-redpanda-downscale.yaml new file mode 100644 index 000000000000..4ef80085dfdd --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/03-redpanda-downscale.yaml @@ -0,0 +1,6 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: decommissioning +spec: + replicas: 2 diff --git a/src/go/k8s/tests/e2e/decommission/04-assert.yaml b/src/go/k8s/tests/e2e/decommission/04-assert.yaml new file mode 100644 index 000000000000..875ada95dc8a --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/04-assert.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: Pod +metadata: + labels: + job-name: wait-for-2-brokers +status: + containerStatuses: + - name: curl + state: + terminated: + message: | + 2 + phase: Succeeded +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/04-probe.yaml b/src/go/k8s/tests/e2e/decommission/04-probe.yaml new file mode 100644 index 000000000000..c20a7a1e388e --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/04-probe.yaml @@ -0,0 
+1,30 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: wait-for-2-brokers +spec: + backoffLimit: 10 + template: + spec: + activeDeadlineSeconds: 90 + containers: + - name: curl + image: curlimages/curl:latest + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + command: + - /bin/sh + - -c + - -ex + args: + - > + url=http://decommissioning-0.decommissioning.$NAMESPACE.svc.cluster.local:9644/v1/brokers + res=$(curl --silent -L $url | tr '{' '\n' | grep node_id | wc -l) && + echo $res > /dev/termination-log && + if [[ "$res" != "2" ]]; then + exit 1; + fi + restartPolicy: Never diff --git a/src/go/k8s/tests/e2e/decommission/05-restore-downscaling.yaml b/src/go/k8s/tests/e2e/decommission/05-restore-downscaling.yaml new file mode 100644 index 000000000000..e4d660bfe6b9 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/05-restore-downscaling.yaml @@ -0,0 +1,5 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: +# Downscaling will be set to the default value +- command: kubectl patch deployment redpanda-controller-manager -n redpanda-system --type='json' -p='[{"op":"remove", "path":"/spec/template/spec/containers/1/args/4"}]' diff --git a/src/go/k8s/tests/e2e/decommission/06-assert.yaml b/src/go/k8s/tests/e2e/decommission/06-assert.yaml new file mode 100644 index 000000000000..522965a7af16 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/06-assert.yaml @@ -0,0 +1,22 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +commands: + - command: kubectl rollout status deployment redpanda-controller-manager -n redpanda-system + - command: hack/wait-for-webhook-ready.sh + +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/rpk/pkg/api/admin/api_broker.go b/src/go/rpk/pkg/api/admin/api_broker.go index 482532c1600f..75037c21bc9a 100644 --- a/src/go/rpk/pkg/api/admin/api_broker.go +++ b/src/go/rpk/pkg/api/admin/api_broker.go @@ -31,11 +31,21 @@ type MaintenanceStatus struct { Failed int `json:"failed"` } +// MembershipStatus enumerates possible membership states for brokers. +type MembershipStatus string + +const ( + // MembershipStatusActive indicates an active broker. + MembershipStatusActive MembershipStatus = "active" + // MembershipStatusDraining indicates that the broker is being drained, e.g. for decommission. + MembershipStatusDraining MembershipStatus = "draining" +) + // Broker is the information returned from the Redpanda admin broker endpoints. type Broker struct { NodeID int `json:"node_id"` NumCores int `json:"num_cores"` - MembershipStatus string `json:"membership_status"` + MembershipStatus MembershipStatus `json:"membership_status"` IsAlive *bool `json:"is_alive"` Version string `json:"version"` Maintenance *MaintenanceStatus `json:"maintenance_status"`
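Reviewer note: the branching implemented by handleScaling in statefulset_scale.go can be summarized as a pure decision over spec.replicas, status.currentReplicas and status.decommissioningNode. The sketch below is illustrative only; the names scaleAction and decideScaling are hypothetical and do not appear in this change, and the real handler additionally updates status and calls the admin API.

package main

import "fmt"

// scaleAction is a hypothetical enumeration of the outcomes of the scaling handler.
type scaleAction string

const (
    actionNone         scaleAction = "no-op"
    actionInitStatus   scaleAction = "initialize status.currentReplicas"
    actionUpscale      scaleAction = "increase status.currentReplicas"
    actionDecommission scaleAction = "decommission last node"
    actionRecommission scaleAction = "recommission node"
)

// decideScaling mirrors the order of the branches in handleScaling: an in-flight
// decommission is either canceled (recommission) or continued, then status
// initialization, then upscale, and finally downscale via decommissioning the last node.
func decideScaling(specReplicas, currentReplicas int32, decommissioningNode *int32) scaleAction {
    if decommissioningNode != nil {
        if specReplicas > *decommissioningNode {
            return actionRecommission // user raised spec.replicas again: cancel the decommission
        }
        return actionDecommission
    }
    if currentReplicas == 0 {
        return actionInitStatus // fresh cluster: start from the computed initial value
    }
    switch {
    case specReplicas == currentReplicas:
        return actionNone
    case specReplicas > currentReplicas:
        return actionUpscale // waits for cluster formation only when moving off 1 replica
    default:
        return actionDecommission // always the last node, one at a time
    }
}

func main() {
    node := int32(2)
    fmt.Println(decideScaling(3, 3, nil))   // no-op
    fmt.Println(decideScaling(2, 3, nil))   // decommission last node
    fmt.Println(decideScaling(3, 3, &node)) // recommission node
    fmt.Println(decideScaling(5, 1, nil))   // increase status.currentReplicas
}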
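Reviewer note: the scale handler reports back-off conditions through error values (RequeueAfterError while waiting on draining or cluster formation, NodeReappearingError when a broker shows up again mid-downscale). The snippet below is a minimal, self-contained sketch of how a reconcile loop could translate them into requeue behaviour; the requeueAfterError, nodeReappearingError and reconcileResult types are local stand-ins for illustration (the real RequeueAfterError lives elsewhere in the resources package and only NodeReappearingError is defined in this change), and the actual controller wiring is not shown in this part of the diff.

package main

import (
    "errors"
    "fmt"
    "time"
)

// requeueAfterError is a stand-in carrying the RequeueAfter and Msg fields used by the scale handler.
type requeueAfterError struct {
    RequeueAfter time.Duration
    Msg          string
}

func (e *requeueAfterError) Error() string { return e.Msg }

// nodeReappearingError mirrors resources.NodeReappearingError.
type nodeReappearingError struct{ NodeID int }

func (e *nodeReappearingError) Error() string {
    return fmt.Sprintf("node has appeared in the cluster with id=%d", e.NodeID)
}

// reconcileResult is a simplified version of controller-runtime's ctrl.Result.
type reconcileResult struct {
    Requeue      bool
    RequeueAfter time.Duration
}

// handleScaleError shows one way to interpret the errors returned by the scaling handler:
// transient "wait" conditions become delayed requeues, a reappearing node triggers an
// immediate retry, and anything else bubbles up as a real error.
func handleScaleError(err error) (reconcileResult, error) {
    var requeueErr *requeueAfterError
    if errors.As(err, &requeueErr) {
        return reconcileResult{RequeueAfter: requeueErr.RequeueAfter}, nil
    }
    var reappearErr *nodeReappearingError
    if errors.As(err, &reappearErr) {
        return reconcileResult{Requeue: true}, nil
    }
    return reconcileResult{}, err
}

func main() {
    res, _ := handleScaleError(&requeueAfterError{RequeueAfter: 10 * time.Second, Msg: "waiting for node 2 to be decommissioned"})
    fmt.Printf("requeue after %s\n", res.RequeueAfter)
}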
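Reviewer note: the decommission path only relies on a narrow slice of the admin API, listing brokers and requesting a decommission, plus the typed membership constants added to api_broker.go. The sketch below shows that slice behind a fake client; the decommissioner interface and fakeAdmin type are illustrative assumptions and not the real adminutils.AdminAPIClient, while admin.Broker and the MembershipStatus constants come from the rpk admin package touched by this change.

package main

import (
    "context"
    "fmt"

    "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
)

// decommissioner is a narrowed, hypothetical view of the admin client: just the
// two calls the decommission path relies on in this change.
type decommissioner interface {
    Brokers(ctx context.Context) ([]admin.Broker, error)
    DecommissionBroker(ctx context.Context, node int) error
}

// fakeAdmin simulates a cluster where a decommissioned broker first enters the draining state.
type fakeAdmin struct {
    brokers map[int]admin.MembershipStatus
}

func (f *fakeAdmin) Brokers(_ context.Context) ([]admin.Broker, error) {
    out := make([]admin.Broker, 0, len(f.brokers))
    for id, status := range f.brokers {
        out = append(out, admin.Broker{NodeID: id, MembershipStatus: status})
    }
    return out, nil
}

func (f *fakeAdmin) DecommissionBroker(_ context.Context, node int) error {
    if _, ok := f.brokers[node]; !ok {
        return fmt.Errorf("unknown node %d", node)
    }
    f.brokers[node] = admin.MembershipStatusDraining
    return nil
}

// findBroker mirrors the lookup done by getNodeInfoFromCluster: nil means the node is no longer registered.
func findBroker(ctx context.Context, api decommissioner, ordinal int) (*admin.Broker, error) {
    brokers, err := api.Brokers(ctx)
    if err != nil {
        return nil, err
    }
    for i := range brokers {
        if brokers[i].NodeID == ordinal {
            return &brokers[i], nil
        }
    }
    return nil, nil
}

func main() {
    ctx := context.Background()
    api := &fakeAdmin{brokers: map[int]admin.MembershipStatus{
        0: admin.MembershipStatusActive,
        1: admin.MembershipStatusActive,
        2: admin.MembershipStatusActive,
    }}
    // Ask to decommission the last node, as the handler does, then observe the draining state.
    _ = api.DecommissionBroker(ctx, 2)
    b, _ := findBroker(ctx, api, 2)
    fmt.Println(b.NodeID, b.MembershipStatus) // 2 draining
}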