From 27119769e88dbb840564284f1f085bd93437f61f Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Mon, 13 Jun 2022 14:34:26 +0200 Subject: [PATCH 01/18] operator: split readyReplicas from replicas and add currentReplicas Now status fields have the following behaviour: - replicas: reflects StatefulSet status replicas (no longer readyReplicas) - readyReplicas: reflects StatefulSet status readyReplicas - currentReplicas: managed by the operator to dynamically change the current number of replicas, to gradually match user expectations --- src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go | 8 +++++++- .../crd/bases/redpanda.vectorized.io_clusters.yaml | 13 ++++++++++++- .../k8s/controllers/redpanda/cluster_controller.go | 6 ++++-- .../k8s/controllers/redpanda/metric_controller.go | 2 +- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go index fa55ec50f913..f35b6490b810 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go @@ -293,9 +293,15 @@ type ClusterStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file - // Replicas show how many nodes are working in the cluster + // Replicas show how many nodes have been created for the cluster // +optional Replicas int32 `json:"replicas"` + // ReadyReplicas is the number of Pods belonging to the cluster that have a Ready Condition. + // +optional + ReadyReplicas int32 `json:"readyReplicas,omitempty"` + // CurrentReplicas is the number of Pods that the controller currently wants to run for the cluster. + // +optional + CurrentReplicas int32 `json:"currentReplicas,omitempty"` // Nodes of the provisioned redpanda nodes // +optional Nodes NodesList `json:"nodes,omitempty"` diff --git a/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml b/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml index ef3383c82941..6f0eda065038 100644 --- a/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml +++ b/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml @@ -806,6 +806,11 @@ spec: - type type: object type: array + currentReplicas: + description: CurrentReplicas is the number of Pods that the controller + currently wants to run for the cluster. + format: int32 + type: integer nodes: description: Nodes of the provisioned redpanda nodes properties: @@ -908,8 +913,14 @@ spec: type: string type: object type: object + readyReplicas: + description: ReadyReplicas is the number of Pods belonging to the + cluster that have a Ready Condition. 
+ format: int32 + type: integer replicas: - description: Replicas show how many nodes are working in the cluster + description: Replicas show how many nodes have been created for the + cluster format: int32 type: integer restarting: diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 155e7f4cdba9..4e52a5eeabd3 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -324,7 +324,8 @@ func (r *ClusterReconciler) reportStatus( } cluster.Status.Nodes = *nodeList - cluster.Status.Replicas = sts.LastObservedState.Status.ReadyReplicas + cluster.Status.ReadyReplicas = sts.LastObservedState.Status.ReadyReplicas + cluster.Status.Replicas = sts.LastObservedState.Status.Replicas cluster.Status.Version = sts.Version() err = r.Status().Update(ctx, &cluster) @@ -353,7 +354,8 @@ func statusShouldBeUpdated( !reflect.DeepEqual(nodeList.ExternalPandaproxy, status.Nodes.ExternalPandaproxy) || !reflect.DeepEqual(nodeList.SchemaRegistry, status.Nodes.SchemaRegistry) || !reflect.DeepEqual(nodeList.ExternalBootstrap, status.Nodes.ExternalBootstrap)) || - status.Replicas != sts.LastObservedState.Status.ReadyReplicas || + status.Replicas != sts.LastObservedState.Status.Replicas || + status.ReadyReplicas != sts.LastObservedState.Status.ReadyReplicas || status.Version != sts.Version() } diff --git a/src/go/k8s/controllers/redpanda/metric_controller.go b/src/go/k8s/controllers/redpanda/metric_controller.go index ca5066318be3..9f7e2b843f37 100644 --- a/src/go/k8s/controllers/redpanda/metric_controller.go +++ b/src/go/k8s/controllers/redpanda/metric_controller.go @@ -94,7 +94,7 @@ func (r *ClusterMetricController) Reconcile( if err != nil { return ctrl.Result{}, err } - g.Set(float64(cl.Items[i].Status.Replicas)) + g.Set(float64(cl.Items[i].Status.ReadyReplicas)) curLabels[cl.Items[i].Name] = struct{}{} r.currentLabels[cl.Items[i].Name] = struct{}{} } From 8b3559d788887dcf05632b5bf4d93e0e0f13e0c2 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Mon, 13 Jun 2022 14:36:58 +0200 Subject: [PATCH 02/18] operator: add decommissioningNode status field When decommissioning a node, the field is populated with the ordinal number of the node being decommissioned. In case of recommission, it also indicates the node being currently recommissioned. --- src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go | 3 +++ src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go | 5 +++++ .../config/crd/bases/redpanda.vectorized.io_clusters.yaml | 5 +++++ 3 files changed, 13 insertions(+) diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go index f35b6490b810..86868919d699 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go @@ -312,6 +312,9 @@ type ClusterStatus struct { // Indicates that a cluster is restarting due to an upgrade or a different reason // +optional Restarting bool `json:"restarting"` + // Indicates that a node is currently being decommissioned from the cluster and provides its ordinal number + // +optional + DecommissioningNode *int32 `json:"decommissioningNode,omitempty"` // Current version of the cluster. 
// +optional Version string `json:"version"` diff --git a/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go b/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go index 0f2d6a7927db..5d9dfd0a6513 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go @@ -221,6 +221,11 @@ func (in *ClusterSpec) DeepCopy() *ClusterSpec { func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) { *out = *in in.Nodes.DeepCopyInto(&out.Nodes) + if in.DecommissioningNode != nil { + in, out := &in.DecommissioningNode, &out.DecommissioningNode + *out = new(int32) + **out = **in + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]ClusterCondition, len(*in)) diff --git a/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml b/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml index 6f0eda065038..dbfa83971032 100644 --- a/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml +++ b/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml @@ -811,6 +811,11 @@ spec: currently wants to run for the cluster. format: int32 type: integer + decommissioningNode: + description: Indicates that a node is currently being decommissioned + from the cluster and provides its ordinal number + format: int32 + type: integer nodes: description: Nodes of the provisioned redpanda nodes properties: From 013118226a5012050ee911c2a9549eec5d4883ca Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Fri, 27 May 2022 11:04:19 +0200 Subject: [PATCH 03/18] operator: change webhook to allow decommissioning The replicas field can freely change and the controller will make sure that nodes are properly decommissioned. The only remaining restriction is that replicas cannot be 0 or nil. --- .../apis/redpanda/v1alpha1/cluster_webhook.go | 27 ++++++++++++++----- .../redpanda/v1alpha1/cluster_webhook_test.go | 19 ++++++++++--- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go index c271d566af4a..f4c61169cf22 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go @@ -136,6 +136,8 @@ func (r *Cluster) ValidateCreate() error { var allErrs field.ErrorList + allErrs = append(allErrs, r.validateScaling()...) + allErrs = append(allErrs, r.validateKafkaListeners()...) allErrs = append(allErrs, r.validateAdminListeners()...) @@ -173,12 +175,8 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error { oldCluster := old.(*Cluster) var allErrs field.ErrorList - if r.Spec.Replicas != nil && oldCluster.Spec.Replicas != nil && *r.Spec.Replicas < *oldCluster.Spec.Replicas { - allErrs = append(allErrs, - field.Invalid(field.NewPath("spec").Child("replicas"), - r.Spec.Replicas, - "scaling down is not supported")) - } + allErrs = append(allErrs, r.validateScaling()...) + allErrs = append(allErrs, r.validateKafkaListeners()...) allErrs = append(allErrs, r.validateAdminListeners()...) 
@@ -212,6 +210,23 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error { r.Name, allErrs) } +func (r *Cluster) validateScaling() field.ErrorList { + var allErrs field.ErrorList + if r.Spec.Replicas == nil { + allErrs = append(allErrs, + field.Invalid(field.NewPath("spec").Child("replicas"), + r.Spec.Replicas, + "replicas must be specified explicitly")) + } else if *r.Spec.Replicas <= 0 { + allErrs = append(allErrs, + field.Invalid(field.NewPath("spec").Child("replicas"), + r.Spec.Replicas, + "downscaling is not allowed to less than 1 instance")) + } + + return allErrs +} + func (r *Cluster) validateAdminListeners() field.ErrorList { var allErrs field.ErrorList externalAdmin := r.AdminAPIExternal() diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go index 7533669c7475..4eab69e3e275 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go @@ -17,6 +17,7 @@ import ( cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1" "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -200,8 +201,8 @@ func TestDefault(t *testing.T) { } func TestValidateUpdate(t *testing.T) { - var replicas1 int32 = 1 - var replicas2 int32 = 2 + var replicas0 int32 + var replicas3 int32 = 3 redpandaCluster := &v1alpha1.Cluster{ ObjectMeta: metav1.ObjectMeta{ @@ -209,7 +210,7 @@ func TestValidateUpdate(t *testing.T) { Namespace: "", }, Spec: v1alpha1.ClusterSpec{ - Replicas: pointer.Int32Ptr(replicas2), + Replicas: pointer.Int32Ptr(replicas3), Configuration: v1alpha1.RedpandaConfig{}, Resources: v1alpha1.RedpandaResourceRequirements{ ResourceRequirements: corev1.ResourceRequirements{ @@ -224,7 +225,7 @@ func TestValidateUpdate(t *testing.T) { } updatedCluster := redpandaCluster.DeepCopy() - updatedCluster.Spec.Replicas = &replicas1 + updatedCluster.Spec.Replicas = &replicas0 updatedCluster.Spec.Configuration = v1alpha1.RedpandaConfig{ KafkaAPI: []v1alpha1.KafkaAPI{ { @@ -272,6 +273,15 @@ func TestValidateUpdate(t *testing.T) { } } +func TestNilReplicasIsNotAllowed(t *testing.T) { + rpCluster := validRedpandaCluster() + err := rpCluster.ValidateCreate() + require.Nil(t, err, "Initial cluster is not valid") + rpCluster.Spec.Replicas = nil + err = rpCluster.ValidateCreate() + assert.Error(t, err) +} + //nolint:funlen // this is ok for a test func TestValidateUpdate_NoError(t *testing.T) { var replicas2 int32 = 2 @@ -1097,6 +1107,7 @@ func validRedpandaCluster() *v1alpha1.Cluster { Namespace: "", }, Spec: v1alpha1.ClusterSpec{ + Replicas: pointer.Int32Ptr(1), Configuration: v1alpha1.RedpandaConfig{ KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124}}, AdminAPI: []v1alpha1.AdminAPI{{Port: 126}}, From d6dac468e6886fd3707d33f9b551699a7f50e12a Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Thu, 26 May 2022 14:45:23 +0200 Subject: [PATCH 04/18] operator: move types to their own package to avoid dependency loop This allows the pkg/resources package to use the admin API internal interface. 
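
A sketch of the resulting import structure (illustration only, not part of the patch; the blank-identifier declarations just reference names from the packages involved to show which imports become legal):

    // Dependency graph after the move:
    //
    //   pkg/resources/types  - leaf package holding the shared TLS provider interfaces
    //   pkg/admin            - imports pkg/resources/types only
    //   pkg/resources        - imports pkg/resources/types and, in later patches, pkg/admin
    //
    // Before the move, pkg/admin imported pkg/resources for those interfaces, so
    // pkg/resources could not use the internal admin API without an import cycle.
    package resources

    import (
        adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin"
        resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
    )

    var _ = adminutils.NewInternalAdminAPI           // resources may now call into pkg/admin
    var _ resourcetypes.AdminTLSConfigProvider = nil // shared interfaces come from the leaf package
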
--- src/go/k8s/controllers/redpanda/cluster_controller.go | 3 ++- src/go/k8s/controllers/redpanda/suite_test.go | 3 ++- src/go/k8s/pkg/admin/admin.go | 4 ++-- src/go/k8s/pkg/resources/certmanager/pki.go | 5 +++-- src/go/k8s/pkg/resources/certmanager/type_helpers.go | 3 ++- src/go/k8s/pkg/resources/configmap.go | 7 ++++--- src/go/k8s/pkg/resources/statefulset.go | 9 +++++---- src/go/k8s/pkg/resources/{ => types}/tls_types.go | 3 ++- 8 files changed, 22 insertions(+), 15 deletions(-) rename src/go/k8s/pkg/resources/{ => types}/tls_types.go (96%) diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 4e52a5eeabd3..852508b030cd 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -24,6 +24,7 @@ import ( "github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/certmanager" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" appsv1 "k8s.io/api/apps/v1" @@ -497,7 +498,7 @@ func (r *ClusterReconciler) setInitialSuperUserPassword( ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster, fqdn string, - adminTLSConfigProvider resources.AdminTLSConfigProvider, + adminTLSConfigProvider resourcetypes.AdminTLSConfigProvider, objs []types.NamespacedName, ) error { adminAPI, err := r.AdminAPIClientFactory(ctx, r, redpandaCluster, fqdn, adminTLSConfigProvider) diff --git a/src/go/k8s/controllers/redpanda/suite_test.go b/src/go/k8s/controllers/redpanda/suite_test.go index b55c9090e66e..5282a8cc464f 100644 --- a/src/go/k8s/controllers/redpanda/suite_test.go +++ b/src/go/k8s/controllers/redpanda/suite_test.go @@ -29,6 +29,7 @@ import ( adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/configuration" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -92,7 +93,7 @@ var _ = BeforeSuite(func(done Done) { _ client.Reader, _ *redpandav1alpha1.Cluster, _ string, - _ resources.AdminTLSConfigProvider, + _ types.AdminTLSConfigProvider, ) (adminutils.AdminAPIClient, error) { return testAdminAPI, nil } diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 871956a5c4c3..86b2e86d3f26 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -16,7 +16,7 @@ import ( "fmt" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" - "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -35,7 +35,7 @@ func NewInternalAdminAPI( k8sClient client.Reader, redpandaCluster *redpandav1alpha1.Cluster, fqdn string, - adminTLSProvider resources.AdminTLSConfigProvider, + adminTLSProvider types.AdminTLSConfigProvider, ) (AdminAPIClient, error) { adminInternal := redpandaCluster.AdminAPIInternal() if adminInternal == nil { diff --git 
a/src/go/k8s/pkg/resources/certmanager/pki.go b/src/go/k8s/pkg/resources/certmanager/pki.go index 82028ab87bf6..e77a8c4f6016 100644 --- a/src/go/k8s/pkg/resources/certmanager/pki.go +++ b/src/go/k8s/pkg/resources/certmanager/pki.go @@ -17,6 +17,7 @@ import ( "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -83,11 +84,11 @@ func (r *PkiReconciler) Ensure(ctx context.Context) error { } // StatefulSetVolumeProvider returns volume provider for all TLS certificates -func (r *PkiReconciler) StatefulSetVolumeProvider() resources.StatefulsetTLSVolumeProvider { +func (r *PkiReconciler) StatefulSetVolumeProvider() resourcetypes.StatefulsetTLSVolumeProvider { return r.clusterCertificates } // AdminAPIConfigProvider returns provider of admin TLS configuration -func (r *PkiReconciler) AdminAPIConfigProvider() resources.AdminTLSConfigProvider { +func (r *PkiReconciler) AdminAPIConfigProvider() resourcetypes.AdminTLSConfigProvider { return r.clusterCertificates } diff --git a/src/go/k8s/pkg/resources/certmanager/type_helpers.go b/src/go/k8s/pkg/resources/certmanager/type_helpers.go index 3b3942ff7546..f0a676af5371 100644 --- a/src/go/k8s/pkg/resources/certmanager/type_helpers.go +++ b/src/go/k8s/pkg/resources/certmanager/type_helpers.go @@ -20,6 +20,7 @@ import ( cmmetav1 "github.com/jetstack/cert-manager/pkg/apis/meta/v1" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -455,7 +456,7 @@ func (cc *ClusterCertificates) Volumes() ( ) { var vols []corev1.Volume var mounts []corev1.VolumeMount - mountPoints := resources.GetTLSMountPoints() + mountPoints := resourcetypes.GetTLSMountPoints() vol, mount := secretVolumesForTLS(cc.kafkaAPI.nodeCertificateName(), cc.kafkaAPI.clientCertificates, redpandaCertVolName, redpandaCAVolName, mountPoints.KafkaAPI.NodeCertMountDir, mountPoints.KafkaAPI.ClientCAMountDir) vols = append(vols, vol...) 
diff --git a/src/go/k8s/pkg/resources/configmap.go b/src/go/k8s/pkg/resources/configmap.go index d747ec8d8c42..6e2593deb7f2 100644 --- a/src/go/k8s/pkg/resources/configmap.go +++ b/src/go/k8s/pkg/resources/configmap.go @@ -25,6 +25,7 @@ import ( "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/configuration" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -216,7 +217,7 @@ func (r *ConfigMapResource) CreateConfiguration( ) (*configuration.GlobalConfiguration, error) { cfg := configuration.For(r.pandaCluster.Spec.Version) cfg.NodeConfiguration = *config.Default() - mountPoints := GetTLSMountPoints() + mountPoints := resourcetypes.GetTLSMountPoints() c := r.pandaCluster.Spec.Configuration cr := &cfg.NodeConfiguration.Redpanda @@ -525,7 +526,7 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient( } func (r *ConfigMapResource) preparePandaproxyTLS( - cfgRpk *config.Config, mountPoints *TLSMountPoints, + cfgRpk *config.Config, mountPoints *resourcetypes.TLSMountPoints, ) { tlsListener := r.pandaCluster.PandaproxyAPITLS() if tlsListener != nil { @@ -550,7 +551,7 @@ func (r *ConfigMapResource) preparePandaproxyTLS( } func (r *ConfigMapResource) prepareSchemaRegistryTLS( - cfgRpk *config.Config, mountPoints *TLSMountPoints, + cfgRpk *config.Config, mountPoints *resourcetypes.TLSMountPoints, ) { if r.pandaCluster.Spec.Configuration.SchemaRegistry != nil && r.pandaCluster.Spec.Configuration.SchemaRegistry.TLS != nil { diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index 759601dacc9d..b2597b5fce7f 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -22,6 +22,7 @@ import ( redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" + resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -84,8 +85,8 @@ type StatefulSetResource struct { serviceName string nodePortName types.NamespacedName nodePortSvc corev1.Service - volumeProvider StatefulsetTLSVolumeProvider - adminTLSConfigProvider AdminTLSConfigProvider + volumeProvider resourcetypes.StatefulsetTLSVolumeProvider + adminTLSConfigProvider resourcetypes.AdminTLSConfigProvider serviceAccountName string configuratorSettings ConfiguratorSettings // hash of configmap containing configuration for redpanda (node config only), it's injected to @@ -106,8 +107,8 @@ func NewStatefulSet( serviceFQDN string, serviceName string, nodePortName types.NamespacedName, - volumeProvider StatefulsetTLSVolumeProvider, - adminTLSConfigProvider AdminTLSConfigProvider, + volumeProvider resourcetypes.StatefulsetTLSVolumeProvider, + adminTLSConfigProvider resourcetypes.AdminTLSConfigProvider, serviceAccountName string, configuratorSettings ConfiguratorSettings, nodeConfigMapHashGetter func(context.Context) (string, error), diff --git a/src/go/k8s/pkg/resources/tls_types.go b/src/go/k8s/pkg/resources/types/tls_types.go similarity index 96% rename from 
src/go/k8s/pkg/resources/tls_types.go rename to src/go/k8s/pkg/resources/types/tls_types.go index 23fcb0ea68d0..5c2b48cfd0db 100644 --- a/src/go/k8s/pkg/resources/tls_types.go +++ b/src/go/k8s/pkg/resources/types/tls_types.go @@ -7,7 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 -package resources +// Package types contains useful types for dealing with resources +package types import ( "context" From e9ecea7bc512ea1c7a634f355a08fcbc6a186f89 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Wed, 8 Jun 2022 16:52:31 +0200 Subject: [PATCH 05/18] rpk: add enum for membership status --- src/go/rpk/pkg/api/admin/api_broker.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/go/rpk/pkg/api/admin/api_broker.go b/src/go/rpk/pkg/api/admin/api_broker.go index 482532c1600f..75037c21bc9a 100644 --- a/src/go/rpk/pkg/api/admin/api_broker.go +++ b/src/go/rpk/pkg/api/admin/api_broker.go @@ -31,11 +31,21 @@ type MaintenanceStatus struct { Failed int `json:"failed"` } +// MembershipStatus enumerates possible membership states for brokers. +type MembershipStatus string + +const ( + // MembershipStatusActive indicates an active broker. + MembershipStatusActive MembershipStatus = "active" + // MembershipStatusDraining indicates that the broker is being drained, e.g. for decommission. + MembershipStatusDraining MembershipStatus = "draining" +) + // Broker is the information returned from the Redpanda admin broker endpoints. type Broker struct { NodeID int `json:"node_id"` NumCores int `json:"num_cores"` - MembershipStatus string `json:"membership_status"` + MembershipStatus MembershipStatus `json:"membership_status"` IsAlive *bool `json:"is_alive"` Version string `json:"version"` Maintenance *MaintenanceStatus `json:"maintenance_status"` From f8a7bcd25255cbaa2d200116c504d943264717bb Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Wed, 8 Jun 2022 16:54:20 +0200 Subject: [PATCH 06/18] operator: allow scoping internal admin API to specific nodes This allows to get local information from brokers, such as the local configuration. 
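
For example, with the variadic ordinals parameter introduced here a caller can build a client that talks to a single pod only and query node-local data. A minimal sketch (the helper name getNode0Config is hypothetical; the fields and calls follow the StatefulSetResource usage added later in this series, with the usual adminutils/admin imports assumed and error handling kept minimal):

    // Sketch: scope the internal admin API client to pod ordinal 0 and read node-local data.
    func (r *StatefulSetResource) getNode0Config(ctx context.Context) (admin.NodeConfig, error) {
        // Passing ordinal 0 makes the client connect to <cluster-name>-0.<fqdn> only.
        adminAPI, err := adminutils.NewInternalAdminAPI(
            ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, 0)
        if err != nil {
            return admin.NodeConfig{}, err
        }
        // Node-local call: returns the configuration of that single broker.
        return adminAPI.GetNodeConfig(ctx)
    }
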
--- src/go/k8s/controllers/redpanda/suite_test.go | 7 ++++++ src/go/k8s/pkg/admin/admin.go | 22 ++++++++++++++----- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/go/k8s/controllers/redpanda/suite_test.go b/src/go/k8s/controllers/redpanda/suite_test.go index 5282a8cc464f..b08a38bc814a 100644 --- a/src/go/k8s/controllers/redpanda/suite_test.go +++ b/src/go/k8s/controllers/redpanda/suite_test.go @@ -94,6 +94,7 @@ var _ = BeforeSuite(func(done Done) { _ *redpandav1alpha1.Cluster, _ string, _ types.AdminTLSConfigProvider, + _ ...int32, ) (adminutils.AdminAPIClient, error) { return testAdminAPI, nil } @@ -366,6 +367,12 @@ func (m *mockAdminAPI) SetUnavailable(unavailable bool) { m.unavailable = unavailable } +func (m *mockAdminAPI) GetNodeConfig( + _ context.Context, +) (admin.NodeConfig, error) { + return admin.NodeConfig{}, nil +} + func (m *mockAdminAPI) SetDirectValidationEnabled(directValidation bool) { m.monitor.Lock() defer m.monitor.Unlock() diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 86b2e86d3f26..f6aefd73669b 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -36,6 +36,7 @@ func NewInternalAdminAPI( redpandaCluster *redpandav1alpha1.Cluster, fqdn string, adminTLSProvider types.AdminTLSConfigProvider, + ordinals ...int32, ) (AdminAPIClient, error) { adminInternal := redpandaCluster.AdminAPIInternal() if adminInternal == nil { @@ -53,11 +54,20 @@ func NewInternalAdminAPI( adminInternalPort := adminInternal.Port - var urls []string - replicas := *redpandaCluster.Spec.Replicas + if len(ordinals) == 0 { + // Not a specific node, just go through all them + replicas := redpandaCluster.Status.CurrentReplicas + if replicas <= 0 { + replicas = *redpandaCluster.Spec.Replicas + } - for i := int32(0); i < replicas; i++ { - urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", redpandaCluster.Name, i, fqdn, adminInternalPort)) + for i := int32(0); i < replicas; i++ { + ordinals = append(ordinals, i) + } + } + var urls []string + for _, on := range ordinals { + urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", redpandaCluster.Name, on, fqdn, adminInternalPort)) } adminAPI, err := admin.NewAdminAPI(urls, admin.BasicCredentials{}, tlsConfig) @@ -74,6 +84,7 @@ type AdminAPIClient interface { ClusterConfigStatus(ctx context.Context, sendToLeader bool) (admin.ConfigStatusResponse, error) ClusterConfigSchema(ctx context.Context) (admin.ConfigSchema, error) PatchClusterConfig(ctx context.Context, upsert map[string]interface{}, remove []string) (admin.ClusterConfigWriteResult, error) + GetNodeConfig(ctx context.Context) (admin.NodeConfig, error) CreateUser(ctx context.Context, username, password, mechanism string) error @@ -89,7 +100,8 @@ type AdminAPIClientFactory func( k8sClient client.Reader, redpandaCluster *redpandav1alpha1.Cluster, fqdn string, - adminTLSProvider resources.AdminTLSConfigProvider, + adminTLSProvider types.AdminTLSConfigProvider, + ordinals ...int32, ) (AdminAPIClient, error) var _ AdminAPIClientFactory = NewInternalAdminAPI From 1913822212bb66bb75c590c996c785b9d5bc49f3 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Fri, 27 May 2022 16:44:53 +0200 Subject: [PATCH 07/18] operator: remove stack trace from logs when delay is requested This produced a stacktrace in the logs, while waiting for a condition. 
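
The pattern, shown end to end as a condensed sketch (both halves appear in the diffs of this series; the duration and message here are placeholders): a resource signals an expected wait by returning a RequeueAfterError, and the reconciler now logs it at info level and requeues instead of treating it as a failure.

    // In a resource's Ensure(): ask for a retry later without reporting a failure.
    return &resources.RequeueAfterError{
        RequeueAfter: 10 * time.Second,
        Msg:          "waiting for the condition to be satisfied",
    }

    // In the controller's Reconcile(): unwrap it and requeue, logging at info level only.
    var e *resources.RequeueAfterError
    if errors.As(err, &e) {
        log.Info(e.Error()) // previously log.Error(e, e.Msg), which printed a stack trace
        return ctrl.Result{RequeueAfter: e.RequeueAfter}, nil
    }
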
--- src/go/k8s/controllers/redpanda/cluster_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 852508b030cd..a1d5ecaaed13 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -180,7 +180,7 @@ func (r *ClusterReconciler) Reconcile( var e *resources.RequeueAfterError if errors.As(err, &e) { - log.Error(e, e.Msg) + log.Info(e.Error()) return ctrl.Result{RequeueAfter: e.RequeueAfter}, nil } From 33c103419047fe809983d549297a664e0ae61290 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Tue, 14 Jun 2022 15:37:54 +0200 Subject: [PATCH 08/18] operator: enable decommission API functions in internal admin API --- src/go/k8s/controllers/redpanda/suite_test.go | 12 ++++++++++++ src/go/k8s/pkg/admin/admin.go | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/src/go/k8s/controllers/redpanda/suite_test.go b/src/go/k8s/controllers/redpanda/suite_test.go index b08a38bc814a..b822d0293001 100644 --- a/src/go/k8s/controllers/redpanda/suite_test.go +++ b/src/go/k8s/controllers/redpanda/suite_test.go @@ -379,6 +379,18 @@ func (m *mockAdminAPI) SetDirectValidationEnabled(directValidation bool) { m.directValidation = directValidation } +func (m *mockAdminAPI) Brokers(_ context.Context) ([]admin.Broker, error) { + return nil, nil +} + +func (m *mockAdminAPI) DecommissionBroker(_ context.Context, _ int) error { + return nil +} + +func (m *mockAdminAPI) RecommissionBroker(_ context.Context, _ int) error { + return nil +} + func makeCopy(input, output interface{}) { ser, err := json.Marshal(input) if err != nil { diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index f6aefd73669b..ea575687ddf1 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -89,6 +89,10 @@ type AdminAPIClient interface { CreateUser(ctx context.Context, username, password, mechanism string) error GetFeatures(ctx context.Context) (admin.FeaturesResponse, error) + + Brokers(ctx context.Context) ([]admin.Broker, error) + DecommissionBroker(ctx context.Context, node int) error + RecommissionBroker(ctx context.Context, node int) error } var _ AdminAPIClient = &admin.AdminAPI{} From 4d74d4feb42fab57493fe1b4f6a15216a63f0315 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Wed, 15 Jun 2022 12:57:43 +0200 Subject: [PATCH 09/18] operator: add scale handler to properly decommission and recommission nodes This adds a handler that correctly manages upscaling and downscaling the cluster, decommissioning nodes wheh needed. The handler uses `status.currentReplicas` to signal the amount of replicas that all subcontrollers should materialize. When a cluster is downscaled, the handler first tries to decommission the last node via admin API, then decreases the value of `status.currentReplicas`, to remove the node only when the cluster allows it. In case the cluster refuses to decommission a node (e.g. min replicas on a topic higher than the desired number of nodes), the user can increase `spec.replicas` to trigger a recommission of the node. 
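
Condensed, the decision flow of the handler added in statefulset_scale.go below looks as follows (sketch only; the initialization of status.currentReplicas, the cluster-formed check and error handling are omitted here and present in the full code):

    func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
        if r.pandaCluster.Status.DecommissioningNode != nil {
            if *r.pandaCluster.Spec.Replicas > *r.pandaCluster.Status.DecommissioningNode {
                // spec.replicas was raised again: cancel the decommission and recommission the node
                return r.handleRecommission(ctx)
            }
            return r.handleDecommission(ctx)
        }
        if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas {
            // Upscaling is handled by Redpanda itself, so only status.currentReplicas is raised
            return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger)
        }
        if *r.pandaCluster.Spec.Replicas < r.pandaCluster.Status.CurrentReplicas {
            // Downscaling: mark the last node for decommissioning before removing it
            target := r.pandaCluster.Status.CurrentReplicas - 1
            r.pandaCluster.Status.DecommissioningNode = &target
            return r.Status().Update(ctx, r.pandaCluster)
        }
        return nil // replicas unchanged, nothing to do
    }
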
--- .../resources/resource_integration_test.go | 5 +- src/go/k8s/pkg/resources/statefulset.go | 19 +- src/go/k8s/pkg/resources/statefulset_scale.go | 255 ++++++++++++++++++ src/go/k8s/pkg/resources/statefulset_test.go | 8 - 4 files changed, 276 insertions(+), 11 deletions(-) create mode 100644 src/go/k8s/pkg/resources/statefulset_scale.go diff --git a/src/go/k8s/pkg/resources/resource_integration_test.go b/src/go/k8s/pkg/resources/resource_integration_test.go index 0646f6ecd93c..c12ca2b52682 100644 --- a/src/go/k8s/pkg/resources/resource_integration_test.go +++ b/src/go/k8s/pkg/resources/resource_integration_test.go @@ -21,6 +21,7 @@ import ( redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -76,6 +77,8 @@ func TestEnsure_StatefulSet(t *testing.T) { cluster := pandaCluster() cluster = cluster.DeepCopy() cluster.Name = "ensure-integration-cluster" + err := c.Create(context.Background(), cluster) + require.NoError(t, err) sts := res.NewStatefulSet( c, @@ -95,7 +98,7 @@ func TestEnsure_StatefulSet(t *testing.T) { func(ctx context.Context) (string, error) { return hash, nil }, ctrl.Log.WithName("test")) - err := sts.Ensure(context.Background()) + err = sts.Ensure(context.Background()) assert.NoError(t, err) actual := &v1.StatefulSet{} diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index b2597b5fce7f..65d436d5471b 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -167,8 +167,15 @@ func (r *StatefulSetResource) Ensure(ctx context.Context) error { return fmt.Errorf("error while fetching StatefulSet resource: %w", err) } r.LastObservedState = &sts + r.logger.Info("Running update", "resource name", r.Key().Name) - return r.runUpdate(ctx, &sts, obj.(*appsv1.StatefulSet)) + err = r.runUpdate(ctx, &sts, obj.(*appsv1.StatefulSet)) + if err != nil { + return err + } + + r.logger.Info("Running scale handler", "resource name", r.Key().Name) + return r.handleScaling(ctx) } // GetCentralizedConfigurationHashFromCluster retrieves the current centralized configuratino hash from the statefulset @@ -270,6 +277,14 @@ func (r *StatefulSetResource) obj( externalAddressType = externalListener.External.PreferredAddressType } tlsVolumes, tlsVolumeMounts := r.volumeProvider.Volumes() + + // We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function + replicas := r.pandaCluster.Status.CurrentReplicas + if replicas <= 0 { + // Until the state is initialized + replicas = *r.pandaCluster.Spec.Replicas + } + ss := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: r.Key().Namespace, @@ -281,7 +296,7 @@ func (r *StatefulSetResource) obj( APIVersion: "apps/v1", }, Spec: appsv1.StatefulSetSpec{ - Replicas: r.pandaCluster.Spec.Replicas, + Replicas: &replicas, PodManagementPolicy: appsv1.ParallelPodManagement, Selector: clusterLabels.AsAPISelector(), UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go new file mode 100644 index 000000000000..7911c58ded4f --- /dev/null +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -0,0 +1,255 @@ +// Copyright 2022 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package resources + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + DecommissionRequeueDuration = time.Second * 10 +) + +// handleScaling is called to detect cases of replicas change and apply them to the cluster +func (r *StatefulSetResource) handleScaling(ctx context.Context) error { + if r.pandaCluster.Status.DecommissioningNode != nil { + decommissionTargetReplicas := *r.pandaCluster.Status.DecommissioningNode + if *r.pandaCluster.Spec.Replicas > decommissionTargetReplicas { + // Decommissioning can also be canceled and we need to recommission + return r.handleRecommission(ctx) + } + return r.handleDecommission(ctx) + } + + if r.pandaCluster.Status.CurrentReplicas == 0 { + // Initialize the status currentReplicas + r.pandaCluster.Status.CurrentReplicas = *r.pandaCluster.Spec.Replicas + return r.Status().Update(ctx, r.pandaCluster) + } + + if *r.pandaCluster.Spec.Replicas == r.pandaCluster.Status.CurrentReplicas { + // No changes to replicas, we do nothing here + return nil + } + + if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas { + r.logger.Info("Upscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas) + // Upscaling request: this is already handled by Redpanda, so we just increase status currentReplicas + return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger) + } + + // User required replicas is lower than current replicas (currentReplicas): start the decommissioning process + targetOrdinal := r.pandaCluster.Status.CurrentReplicas - 1 // Always decommission last node + r.logger.Info("Start decommission of last broker node", "ordinal", targetOrdinal) + r.pandaCluster.Status.DecommissioningNode = &targetOrdinal + return r.Status().Update(ctx, r.pandaCluster) +} + +// handleDecommission manages cases of node decommissioning +func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { + targetReplicas := *r.pandaCluster.Status.DecommissioningNode + r.logger.Info("Handling cluster in decommissioning phase", "target replicas", targetReplicas) + + adminAPI, err := r.getAdminAPIClient(ctx) + if err != nil { + return err + } + + broker, err := getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI) + if err != nil { + return err + } + + if broker != nil { + r.logger.Info("Broker still exists in the cluster", "node_id", broker.NodeID) + if broker.MembershipStatus != admin.MembershipStatusDraining { + // We ask to decommission since it does not seem done + err = adminAPI.DecommissionBroker(ctx, broker.NodeID) + if err != nil { + return fmt.Errorf("error while trying to decommission node %d in cluster %s: %w", broker.NodeID, r.pandaCluster.Name, err) + } + r.logger.Info("Node marked for decommissioning in cluster", "node_id", broker.NodeID) + } + + // The draining 
phase must always be completed with all nodes running, to let single-replica partitions be transferred. + // The value may diverge in case we restarted the process after a complete scale down. + drainingReplicas := targetReplicas + 1 + if r.pandaCluster.Status.CurrentReplicas != drainingReplicas { + return setCurrentReplicas(ctx, r, r.pandaCluster, drainingReplicas, r.logger) + } + + // Wait until the node is fully drained (or wait forever if the cluster does not allow decommissioning of that specific node) + return &RequeueAfterError{ + RequeueAfter: DecommissionRequeueDuration, + Msg: fmt.Sprintf("Waiting for node %d to be decommissioned from cluster", broker.NodeID), + } + } + + // Broker is now missing from cluster API + r.logger.Info("Node is not registered in the cluster: initializing downscale", "node_id", *r.pandaCluster.Status.DecommissioningNode) + + // We set status.currentReplicas accordingly to trigger scaling down of the statefulset + if err = setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger); err != nil { + return err + } + + scaledDown, err := r.verifyRunningCount(ctx, targetReplicas) + if err != nil { + return err + } + if !scaledDown { + return &RequeueAfterError{ + RequeueAfter: DecommissionRequeueDuration, + Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas), + } + } + + // There's a chance that the node was initially not present in the broker list, but appeared after we started to scale down. + // Since the node may hold data that need to be propagated to other nodes, we need to restart it to let the decommission process finish. + broker, err = getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI) + if err != nil { + return err + } + if broker != nil { + // Node reappeared in the cluster, we restart the process to handle it + return &NodeReappearingError{NodeID: broker.NodeID} + } + + r.logger.Info("Decommissioning process successfully completed", "node_id", *r.pandaCluster.Status.DecommissioningNode) + r.pandaCluster.Status.DecommissioningNode = nil + return r.Status().Update(ctx, r.pandaCluster) +} + +// handleRecommission manages cases of nodes being recommissioned after a failed/wrong decommission +func (r *StatefulSetResource) handleRecommission(ctx context.Context) error { + r.logger.Info("Handling cluster in recommissioning phase") + + // First we ensure we've enough replicas to let the recommissioning node run + targetReplicas := *r.pandaCluster.Status.DecommissioningNode + 1 + err := setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger) + if err != nil { + return err + } + + adminAPI, err := r.getAdminAPIClient(ctx) + if err != nil { + return err + } + + broker, err := getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI) + if err != nil { + return err + } + + if broker == nil || broker.MembershipStatus != admin.MembershipStatusActive { + err = adminAPI.RecommissionBroker(ctx, int(*r.pandaCluster.Status.DecommissioningNode)) + if err != nil { + return fmt.Errorf("error while trying to recommission node %d in cluster %s: %w", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name, err) + } + r.logger.Info("Node marked for being recommissioned in cluster", "node_id", *r.pandaCluster.Status.DecommissioningNode) + + return &RequeueAfterError{ + RequeueAfter: DecommissionRequeueDuration, + Msg: fmt.Sprintf("Waiting for node %d to be recommissioned into cluster %s", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name), + 
} + } + + r.logger.Info("Recommissioning process successfully completed", "node_id", *r.pandaCluster.Status.DecommissioningNode) + r.pandaCluster.Status.DecommissioningNode = nil + return r.Status().Update(ctx, r.pandaCluster) +} + +func (r *StatefulSetResource) getAdminAPIClient( + ctx context.Context, ordinals ...int32, +) (adminutils.AdminAPIClient, error) { + return adminutils.NewInternalAdminAPI(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...) +} + +// verifyRunningCount checks if the statefulset is configured to run the given amount of replicas and that also pods match the expectations +func (r *StatefulSetResource) verifyRunningCount( + ctx context.Context, replicas int32, +) (bool, error) { + var sts appsv1.StatefulSet + if err := r.Get(ctx, r.Key(), &sts); err != nil { + return false, fmt.Errorf("could not get statefulset for checking replicas: %w", err) + } + if sts.Spec.Replicas == nil || *sts.Spec.Replicas != replicas || sts.Status.Replicas != replicas { + return false, nil + } + + var podList corev1.PodList + err := r.List(ctx, &podList, &k8sclient.ListOptions{ + Namespace: r.pandaCluster.Namespace, + LabelSelector: labels.ForCluster(r.pandaCluster).AsClientSelector(), + }) + if err != nil { + return false, fmt.Errorf("could not list pods for checking replicas: %w", err) + } + return len(podList.Items) == int(replicas), nil +} + +// getNodeInfoFromCluster allows to get broker information using the admin API +func getNodeInfoFromCluster( + ctx context.Context, ordinal int32, adminAPI adminutils.AdminAPIClient, +) (*admin.Broker, error) { + brokers, err := adminAPI.Brokers(ctx) + if err != nil { + return nil, fmt.Errorf("could not get the list of brokers for checking decommission: %w", err) + } + for i := range brokers { + if brokers[i].NodeID == int(ordinal) { + return &brokers[i], nil + } + } + return nil, nil +} + +// setCurrentReplicas allows to set the number of status.currentReplicas in the CR, which in turns controls the replicas +// assigned to the StatefulSet +func setCurrentReplicas( + ctx context.Context, + c k8sclient.Client, + pandaCluster *redpandav1alpha1.Cluster, + replicas int32, + logger logr.Logger, +) error { + if pandaCluster.Status.CurrentReplicas == replicas { + // Skip if already done + return nil + } + + logger.Info("Scaling StatefulSet", "replicas", replicas) + pandaCluster.Status.CurrentReplicas = replicas + if err := c.Status().Update(ctx, pandaCluster); err != nil { + return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, replicas, err) + } + logger.Info("StatefulSet scaled", "replicas", replicas) + return nil +} + +// NodeReappearingError indicates that a node has appeared in the cluster before completion of the a direct downscale +type NodeReappearingError struct { + NodeID int +} + +func (e *NodeReappearingError) Error() string { + return fmt.Sprintf("node has appeared in the cluster with id=%d", e.NodeID) +} diff --git a/src/go/k8s/pkg/resources/statefulset_test.go b/src/go/k8s/pkg/resources/statefulset_test.go index fd111c68333b..37eac1ec5723 100644 --- a/src/go/k8s/pkg/resources/statefulset_test.go +++ b/src/go/k8s/pkg/resources/statefulset_test.go @@ -33,13 +33,6 @@ func TestEnsure(t *testing.T) { cluster := pandaCluster() stsResource := stsFromCluster(cluster) - var newReplicas int32 = 3333 - - replicasUpdatedCluster := cluster.DeepCopy() - replicasUpdatedCluster.Spec.Replicas = &newReplicas - replicasUpdatedSts := stsFromCluster(cluster).DeepCopy() - replicasUpdatedSts.Spec.Replicas 
= &newReplicas - newResources := corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("1111"), corev1.ResourceMemory: resource.MustParse("2222Gi"), @@ -76,7 +69,6 @@ func TestEnsure(t *testing.T) { expectedObject *v1.StatefulSet }{ {"none existing", nil, cluster, stsResource}, - {"update replicas", stsResource, replicasUpdatedCluster, replicasUpdatedSts}, {"update resources", stsResource, resourcesUpdatedCluster, resourcesUpdatedSts}, {"update redpanda resources", stsResource, resourcesUpdatedRedpandaCluster, resourcesUpdatedSts}, {"disabled sidecar", nil, noSidecarCluster, noSidecarSts}, From 5d62c760fdb08ffd5b0599a8fa3d98476eec8917 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Mon, 13 Jun 2022 14:53:12 +0200 Subject: [PATCH 10/18] operator: implement progressive initialization to let node 0 create initial raft group This tries to solve the problem with empty seed_servers on node 0. With this change, all fresh clusters will be initially set to 1 replica (via `status.currentReplicas`), until a cluster is created and the operator can verify it via admin API. Then the cluster is scaled to the number of instances desired by the user. After the cluster is initialized, and for the entire lifetime of the cluster, the `seed_servers` property will be populated with the full list of available servers, in every node of the cluster. This overcomes https://github.com/redpanda-data/redpanda/issues/333. Previously, node 0 was always forced to have an empty seed_servers property, but this caused problems when it lost the data dir, as it tried to create a brand-new cluster. With this change, even if node 0 loses the data dir, the seed_servers property will always point to other nodes, so it will try to join the existing cluster. --- .../apis/redpanda/v1alpha1/cluster_types.go | 24 ++++++++++++ .../redpanda/v1alpha1/cluster_types_test.go | 17 +++++++++ src/go/k8s/cmd/configurator/main.go | 8 ++-- src/go/k8s/pkg/admin/admin.go | 7 +--- src/go/k8s/pkg/resources/configmap.go | 6 +-- src/go/k8s/pkg/resources/statefulset.go | 6 +-- src/go/k8s/pkg/resources/statefulset_scale.go | 37 ++++++++++++++++++- 7 files changed, 87 insertions(+), 18 deletions(-) diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go index 86868919d699..5afd7688ef74 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go @@ -856,6 +856,30 @@ func (s *ClusterStatus) SetRestarting(restarting bool) { s.DeprecatedUpgrading = restarting } +// GetCurrentReplicas returns the current number of replicas that the controller wants to run. +// It returns 1 when not initialized (as fresh clusters start from 1 replica) +func (r *Cluster) GetCurrentReplicas() int32 { + if r.Status.CurrentReplicas <= 0 { + // Not initialized, let's give the computed value + return r.ComputeInitialCurrentReplicasField() + } + return r.Status.CurrentReplicas +} + +// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas. 
+// +// It needs to consider the following cases: +// - Fresh cluster: we start from 1 replicas, then upscale if needed (initialization to bypass https://github.com/redpanda-data/redpanda/issues/333) +// - Existing clusters: we keep spec.replicas as starting point +func (r *Cluster) ComputeInitialCurrentReplicasField() int32 { + if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 { + // A cluster seems to be already running, we start from the existing amount of replicas + return *r.Spec.Replicas + } + // Clusters start from a single replica, then upscale + return 1 +} + // TLSConfig is a generic TLS configuration type TLSConfig struct { Enabled bool `json:"enabled,omitempty"` diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go index 5203cae07751..e4a8e2fe3c1a 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/pointer" ) // nolint:funlen // this is ok for a test @@ -216,3 +217,19 @@ func TestConditions(t *testing.T) { assert.Equal(t, condTime, cond2.LastTransitionTime) }) } + +func TestInitialReplicas(t *testing.T) { + cluster := v1alpha1.Cluster{} + cluster.Spec.Replicas = pointer.Int32(3) + assert.Equal(t, int32(1), cluster.GetCurrentReplicas()) + cluster.Status.Replicas = 2 + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.Replicas = 0 + cluster.Status.ReadyReplicas = 2 + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.ReadyReplicas = 0 + cluster.Status.Nodes.Internal = []string{"1", "2"} + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.Nodes.Internal = nil + assert.Equal(t, int32(1), cluster.GetCurrentReplicas()) +} diff --git a/src/go/k8s/cmd/configurator/main.go b/src/go/k8s/cmd/configurator/main.go index b019540579ec..8175c337f0c9 100644 --- a/src/go/k8s/cmd/configurator/main.go +++ b/src/go/k8s/cmd/configurator/main.go @@ -131,9 +131,11 @@ func main() { cfg.Redpanda.ID = int(hostIndex) - // First Redpanda node need to have cleared seed servers in order - // to form raft group 0 - if hostIndex == 0 { + // In case of a single seed server, the list should contain the current node itself. + // Normally the cluster is able to recognize it's talking to itself, except when the cluster is + // configured to use mutual TLS on the Kafka API (see Helm test). + // So, we clear the list of seeds to help Redpanda. 
+ if len(cfg.Redpanda.SeedServers) == 1 { cfg.Redpanda.SeedServers = []config.SeedServer{} } diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index ea575687ddf1..1512dc6e25a5 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -56,16 +56,13 @@ func NewInternalAdminAPI( if len(ordinals) == 0 { // Not a specific node, just go through all them - replicas := redpandaCluster.Status.CurrentReplicas - if replicas <= 0 { - replicas = *redpandaCluster.Spec.Replicas - } + replicas := redpandaCluster.GetCurrentReplicas() for i := int32(0); i < replicas; i++ { ordinals = append(ordinals, i) } } - var urls []string + urls := make([]string, 0, len(ordinals)) for _, on := range ordinals { urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", redpandaCluster.Name, on, fqdn, adminInternalPort)) } diff --git a/src/go/k8s/pkg/resources/configmap.go b/src/go/k8s/pkg/resources/configmap.go index 6e2593deb7f2..8a24e9977264 100644 --- a/src/go/k8s/pkg/resources/configmap.go +++ b/src/go/k8s/pkg/resources/configmap.go @@ -331,7 +331,7 @@ func (r *ConfigMapResource) CreateConfiguration( cfg.SetAdditionalRedpandaProperty("log_segment_size", logSegmentSize) - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() for i := int32(0); i < replicas; i++ { cr.SeedServers = append(cr.SeedServers, config.SeedServer{ Host: config.SocketAddress{ @@ -454,7 +454,7 @@ func (r *ConfigMapResource) preparePandaproxyClient( return nil } - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() cfg.NodeConfiguration.PandaproxyClient = &config.KafkaClient{} for i := int32(0); i < replicas; i++ { cfg.NodeConfiguration.PandaproxyClient.Brokers = append(cfg.NodeConfiguration.PandaproxyClient.Brokers, config.SocketAddress{ @@ -493,7 +493,7 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient( return nil } - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() cfg.NodeConfiguration.SchemaRegistryClient = &config.KafkaClient{} for i := int32(0); i < replicas; i++ { cfg.NodeConfiguration.SchemaRegistryClient.Brokers = append(cfg.NodeConfiguration.SchemaRegistryClient.Brokers, config.SocketAddress{ diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index 65d436d5471b..87c28b1ba1dc 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -279,11 +279,7 @@ func (r *StatefulSetResource) obj( tlsVolumes, tlsVolumeMounts := r.volumeProvider.Volumes() // We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function - replicas := r.pandaCluster.Status.CurrentReplicas - if replicas <= 0 { - // Until the state is initialized - replicas = *r.pandaCluster.Spec.Replicas - } + replicas := r.pandaCluster.GetCurrentReplicas() ss := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index 7911c58ded4f..fece4bdc83d9 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -29,6 +29,7 @@ const ( ) // handleScaling is called to detect cases of replicas change and apply them to the cluster +// nolint:nestif // for clarity func (r *StatefulSetResource) handleScaling(ctx context.Context) error { if r.pandaCluster.Status.DecommissioningNode != nil { decommissionTargetReplicas := 
*r.pandaCluster.Status.DecommissioningNode @@ -40,8 +41,8 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { } if r.pandaCluster.Status.CurrentReplicas == 0 { - // Initialize the status currentReplicas - r.pandaCluster.Status.CurrentReplicas = *r.pandaCluster.Spec.Replicas + // Initialize the currentReplicas field, so that it can be later controlled + r.pandaCluster.Status.CurrentReplicas = r.pandaCluster.ComputeInitialCurrentReplicasField() return r.Status().Update(ctx, r.pandaCluster) } @@ -52,6 +53,23 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas { r.logger.Info("Upscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas) + + // We care about upscaling only when the cluster is moving off 1 replica, which happen e.g. at cluster startup + if r.pandaCluster.Status.CurrentReplicas == 1 { + r.logger.Info("Waiting for first node to form a cluster before upscaling") + formed, err := r.isClusterFormed(ctx) + if err != nil { + return err + } + if !formed { + return &RequeueAfterError{ + RequeueAfter: DecommissionRequeueDuration, + Msg: fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas), + } + } + r.logger.Info("Initial cluster has been formed") + } + // Upscaling request: this is already handled by Redpanda, so we just increase status currentReplicas return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger) } @@ -183,6 +201,21 @@ func (r *StatefulSetResource) getAdminAPIClient( return adminutils.NewInternalAdminAPI(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...) } +func (r *StatefulSetResource) isClusterFormed( + ctx context.Context, +) (bool, error) { + rootNodeAdminAPI, err := r.getAdminAPIClient(ctx, 0) + if err != nil { + return false, err + } + brokers, err := rootNodeAdminAPI.Brokers(ctx) + if err != nil { + // Eat the error and return that the cluster is not formed + return false, nil + } + return len(brokers) > 0, nil +} + // verifyRunningCount checks if the statefulset is configured to run the given amount of replicas and that also pods match the expectations func (r *StatefulSetResource) verifyRunningCount( ctx context.Context, replicas int32, From d48e9579d68ed2212dfdedda6201e75c49cec6ba Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Fri, 3 Jun 2022 17:59:59 +0200 Subject: [PATCH 11/18] operator: consider draining field when checking maintenance mode status Since nodes are auto-draining as part of their shutdown hooks, it happens that when maintenance mode is activated for a decommissioned node, no process is really started. We just exit if that is the case. 
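
The preStop hook's polling loop below is dense shell embedded in a Go string; after this change its exit condition is equivalent to the following predicate (sketch only; finished and draining mirror the JSON fields the curl loop greps from the maintenance status response):

    // Stop waiting once maintenance has finished, or immediately when no drain is
    // in progress at all (e.g. the node was already auto-drained or decommissioned).
    func maintenanceWaitDone(finished, draining bool) bool {
        return finished || !draining
    }
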
--- src/go/k8s/pkg/resources/statefulset.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index 87c28b1ba1dc..836a9dceffe0 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -538,7 +538,7 @@ func (r *StatefulSetResource) getPreStopHook() *corev1.Handler { curlGetCommand := r.composeCURLMaintenanceCommand(`--silent`, &genericMaintenancePath) cmd := fmt.Sprintf(`until [ "${status:-}" = "200" ]; do status=$(%s); sleep 0.5; done`, curlCommand) + " && " + - fmt.Sprintf(`until [ "${finished:-}" = "true" ]; do finished=$(%s | grep -o '\"finished\":[^,}]*' | grep -o '[^: ]*$'); sleep 0.5; done`, curlGetCommand) + fmt.Sprintf(`until [ "${finished:-}" = "true" ] || [ "${draining:-}" = "false" ]; do res=$(%s); finished=$(echo $res | grep -o '\"finished\":[^,}]*' | grep -o '[^: ]*$'); draining=$(echo $res | grep -o '\"draining\":[^,}]*' | grep -o '[^: ]*$'); sleep 0.5; done`, curlGetCommand) return &corev1.Handler{ Exec: &corev1.ExecAction{ From 2b02d34df17fe7e975bead416669615fca0a4e29 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Fri, 10 Jun 2022 12:20:30 +0200 Subject: [PATCH 12/18] operator: add controller tests for scaling --- .../redpanda/cluster_controller.go | 14 +- .../cluster_controller_common_test.go | 157 +++++++++ .../cluster_controller_configuration_test.go | 65 ---- .../redpanda/cluster_controller_scale_test.go | 322 ++++++++++++++++++ .../redpanda/cluster_controller_test.go | 18 +- src/go/k8s/controllers/redpanda/suite_test.go | 111 +++++- src/go/k8s/main.go | 10 +- .../resources/resource_integration_test.go | 4 + src/go/k8s/pkg/resources/statefulset.go | 12 +- src/go/k8s/pkg/resources/statefulset_scale.go | 17 +- src/go/k8s/pkg/resources/statefulset_test.go | 4 + 11 files changed, 629 insertions(+), 105 deletions(-) create mode 100644 src/go/k8s/controllers/redpanda/cluster_controller_common_test.go create mode 100644 src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index a1d5ecaaed13..e6ed873fb204 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -16,6 +16,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" @@ -46,11 +47,12 @@ var ( // ClusterReconciler reconciles a Cluster object type ClusterReconciler struct { client.Client - Log logr.Logger - configuratorSettings resources.ConfiguratorSettings - clusterDomain string - Scheme *runtime.Scheme - AdminAPIClientFactory adminutils.AdminAPIClientFactory + Log logr.Logger + configuratorSettings resources.ConfiguratorSettings + clusterDomain string + Scheme *runtime.Scheme + AdminAPIClientFactory adminutils.AdminAPIClientFactory + DecommissionWaitInterval time.Duration } //+kubebuilder:rbac:groups=redpanda.vectorized.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete @@ -156,6 +158,8 @@ func (r *ClusterReconciler) Reconcile( sa.Key().Name, r.configuratorSettings, configMapResource.GetNodeConfigHash, + r.AdminAPIClientFactory, + r.DecommissionWaitInterval, log) toApply := []resources.Reconciler{ diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_common_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_common_test.go new 
file mode 100644 index 000000000000..8e9aa6ef5e6f --- /dev/null +++ b/src/go/k8s/controllers/redpanda/cluster_controller_common_test.go @@ -0,0 +1,157 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package redpanda_test + +import ( + "context" + "fmt" + + v1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func resourceGetter(key client.ObjectKey, res client.Object) func() error { + return func() error { + return k8sClient.Get(context.Background(), key, res) + } +} + +func resourceDataGetter( + key client.ObjectKey, res client.Object, extractor func() interface{}, +) func() interface{} { + return func() interface{} { + err := resourceGetter(key, res)() + if err != nil { + return err + } + return extractor() + } +} + +func annotationGetter( + key client.ObjectKey, res client.Object, name string, +) func() string { + return func() string { + if err := resourceGetter(key, res)(); err != nil { + return fmt.Sprintf("client error: %+v", err) + } + if sts, ok := res.(*appsv1.StatefulSet); ok { + return sts.Spec.Template.Annotations[name] + } + return res.GetAnnotations()[name] + } +} + +func clusterConfiguredConditionGetter( + key client.ObjectKey, +) func() *v1alpha1.ClusterCondition { + return func() *v1alpha1.ClusterCondition { + var cluster v1alpha1.Cluster + if err := k8sClient.Get(context.Background(), key, &cluster); err != nil { + return nil + } + return cluster.Status.GetCondition(v1alpha1.ClusterConfiguredConditionType) + } +} + +func clusterConfiguredConditionStatusGetter(key client.ObjectKey) func() bool { + return func() bool { + cond := clusterConfiguredConditionGetter(key)() + return cond != nil && cond.Status == corev1.ConditionTrue + } +} + +func clusterUpdater( + clusterNamespacedName types.NamespacedName, upd func(*v1alpha1.Cluster), +) func() error { + return func() error { + cl := &v1alpha1.Cluster{} + if err := k8sClient.Get(context.Background(), clusterNamespacedName, cl); err != nil { + return err + } + upd(cl) + return k8sClient.Update(context.Background(), cl) + } +} + +func statefulSetReplicasReconciler( + key types.NamespacedName, cluster *v1alpha1.Cluster, +) func() error { + return func() error { + var sts appsv1.StatefulSet + err := k8sClient.Get(context.Background(), key, &sts) + if err != nil { + return err + } + + // Aligning Pods first + var podList corev1.PodList + err = k8sClient.List(context.Background(), &podList, &client.ListOptions{ + Namespace: key.Namespace, + LabelSelector: labels.ForCluster(cluster).AsClientSelector(), + }) + if err != nil { + return err + } + + pods := make(map[string]bool, len(podList.Items)) + for i := range podList.Items { + pods[podList.Items[i].Name] = true + } + + for i := int32(0); i < *sts.Spec.Replicas; i++ { + podName := fmt.Sprintf("%s-%d", key.Name, i) + var pod corev1.Pod + if pods[podName] { + for j := range podList.Items { + if podList.Items[j].Name == podName { + pod = *podList.Items[j].DeepCopy() + } + } + } + pod.Name = podName + pod.Namespace = key.Namespace + pod.Labels = labels.ForCluster(cluster) 
+ pod.Annotations = sts.Spec.Template.Annotations + pod.Spec = sts.Spec.Template.Spec + + if pods[podName] { + delete(pods, podName) + err = k8sClient.Update(context.Background(), &pod) + if err != nil { + return err + } + } else { + err = k8sClient.Create(context.Background(), &pod) + if err != nil { + return err + } + } + } + + for i := range podList.Items { + if pods[podList.Items[i].Name] { + err = k8sClient.Delete(context.Background(), &podList.Items[i]) + if err != nil { + return err + } + } + } + + // Aligning StatefulSet + sts.Status.Replicas = *sts.Spec.Replicas + sts.Status.ReadyReplicas = sts.Status.Replicas + return k8sClient.Status().Update(context.Background(), &sts) + } +} diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_configuration_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_configuration_test.go index dfd1fb11f45b..59cb70e9f705 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_configuration_test.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller_configuration_test.go @@ -27,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" - "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -820,67 +819,3 @@ func getInitialTestCluster( } return key, baseKey, cluster } - -func resourceGetter(key client.ObjectKey, res client.Object) func() error { - return func() error { - return k8sClient.Get(context.Background(), key, res) - } -} - -func resourceDataGetter( - key client.ObjectKey, res client.Object, extractor func() interface{}, -) func() interface{} { - return func() interface{} { - err := resourceGetter(key, res)() - if err != nil { - return err - } - return extractor() - } -} - -func annotationGetter( - key client.ObjectKey, res client.Object, name string, -) func() string { - return func() string { - if err := resourceGetter(key, res)(); err != nil { - return fmt.Sprintf("client error: %+v", err) - } - if sts, ok := res.(*appsv1.StatefulSet); ok { - return sts.Spec.Template.Annotations[name] - } - return res.GetAnnotations()[name] - } -} - -func clusterConfiguredConditionGetter( - key client.ObjectKey, -) func() *v1alpha1.ClusterCondition { - return func() *v1alpha1.ClusterCondition { - var cluster v1alpha1.Cluster - if err := k8sClient.Get(context.Background(), key, &cluster); err != nil { - return nil - } - return cluster.Status.GetCondition(v1alpha1.ClusterConfiguredConditionType) - } -} - -func clusterConfiguredConditionStatusGetter(key client.ObjectKey) func() bool { - return func() bool { - cond := clusterConfiguredConditionGetter(key)() - return cond != nil && cond.Status == corev1.ConditionTrue - } -} - -func clusterUpdater( - clusterNamespacedName types.NamespacedName, upd func(*v1alpha1.Cluster), -) func() error { - return func() error { - cl := &v1alpha1.Cluster{} - if err := k8sClient.Get(context.Background(), clusterNamespacedName, cl); err != nil { - return err - } - upd(cl) - return k8sClient.Update(context.Background(), cl) - } -} diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go new file mode 100644 index 000000000000..57691f5fa913 --- /dev/null +++ b/src/go/k8s/controllers/redpanda/cluster_controller_scale_test.go @@ -0,0 +1,322 @@ +// Copyright 2022 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package redpanda_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" +) + +var _ = Describe("Redpanda cluster scale resource", func() { + const ( + timeout = time.Second * 30 + interval = time.Millisecond * 100 + + timeoutShort = time.Millisecond * 100 + intervalShort = time.Millisecond * 20 + ) + + Context("When starting up a fresh Redpanda cluster", func() { + It("Should wait for the first replica to come up before launching the others", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("startup", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Keeping the StatefulSet at single replica until initialized") + var sts appsv1.StatefulSet + Eventually(resourceGetter(key, &sts), timeout, interval).Should(Succeed()) + Consistently(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeoutShort, intervalShort).Should(Equal(int32(1))) + + By("Scaling to 3 replicas only when broker appears") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + }) + + Context("When scaling up a cluster", func() { + It("Should just update the StatefulSet", func() { + By("Allowing creation of a new cluster with 2 replicas") + key, redpandaCluster := getClusterWithReplicas("scaling", 2) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 2 replicas when at least one broker appears") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + + By("Scaling up when replicas increase") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(5) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(5))) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + }) + + Context("When scaling down a cluster", func() { + It("Should always decommission the last nodes of a cluster one at time", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("decommission", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + 
By("Scaling to 3 replicas when the brokers start to appear") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 2, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Decommissioning the last node when scaling down by 2") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(1) + }), timeout, interval).Should(Succeed()) + + By("Start decommissioning node with ordinal 2") + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + if redpandaCluster.Status.DecommissioningNode == nil { + return nil + } + return *redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(Equal(int32(2))) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + Consistently(testAdminAPI.BrokerStatusGetter(1), timeoutShort, intervalShort).Should(Equal(admin.MembershipStatusActive)) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Scaling down only when decommissioning is done") + Expect(testAdminAPI.RemoveBroker(2)).To(BeTrue()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + Eventually(statefulSetReplicasReconciler(key, redpandaCluster), timeout, interval).Should(Succeed()) + + By("Start decommissioning the other node") + Eventually(testAdminAPI.BrokerStatusGetter(1), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + + By("Removing the other node as well when done") + Expect(testAdminAPI.RemoveBroker(1)).To(BeTrue()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(1))) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + + It("Can recommission a node while decommission is in progress", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("recommission", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 3 replicas when the brokers start to appear") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 2, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Start decommissioning node 2") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + if redpandaCluster.Status.DecommissioningNode == nil { + return nil + } + return *redpandaCluster.Status.DecommissioningNode + }), timeout, 
interval).Should(Equal(int32(2))) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + + By("Recommissioning the node by restoring replicas") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(3) + }), timeout, interval).Should(Succeed()) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusActive)) + + By("Start decommissioning node 2 again") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + if redpandaCluster.Status.DecommissioningNode == nil { + return nil + } + return *redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(Equal(int32(2))) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + + By("Recommissioning the node also when scaling to more replicas") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(4) + }), timeout, interval).Should(Succeed()) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusActive)) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(4))) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + + It("Can decommission nodes that never started", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("direct-decommission", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 3 replicas, but have last one not registered") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Doing direct scale down of node 2") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + Eventually(statefulSetReplicasReconciler(key, redpandaCluster), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + return redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(BeNil()) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + + It("Can decommission nodes that were lazy to start", func() { + By("Allowing creation of a new cluster with 3 replicas") + key, redpandaCluster := getClusterWithReplicas("direct-decommission-lazy", 3) + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Scaling to 3 replicas, but have last one not registered") + testAdminAPI.AddBroker(admin.Broker{NodeID: 0, MembershipStatus: admin.MembershipStatusActive}) + 
testAdminAPI.AddBroker(admin.Broker{NodeID: 1, MembershipStatus: admin.MembershipStatusActive}) + var sts appsv1.StatefulSet + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + + By("Doing direct scale down of node 2") + Eventually(clusterUpdater(key, func(cluster *v1alpha1.Cluster) { + cluster.Spec.Replicas = pointer.Int32Ptr(2) + }), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + + By("Finding out that the node was able to connect to the cluster before being terminated") + testAdminAPI.AddBroker(admin.Broker{NodeID: 2, MembershipStatus: admin.MembershipStatusActive}) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + if redpandaCluster.Status.DecommissioningNode == nil { + return nil + } + return *redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(Equal(int32(2))) + + By("Scaling the cluster back and properly decommission the node") + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(3))) + Eventually(testAdminAPI.BrokerStatusGetter(2), timeout, interval).Should(Equal(admin.MembershipStatusDraining)) + + By("Complete downscale after decommission") + Expect(testAdminAPI.RemoveBroker(2)).To(BeTrue()) + Eventually(resourceDataGetter(key, &sts, func() interface{} { + return *sts.Spec.Replicas + }), timeout, interval).Should(Equal(int32(2))) + Eventually(statefulSetReplicasReconciler(key, redpandaCluster), timeout, interval).Should(Succeed()) + Eventually(resourceDataGetter(key, redpandaCluster, func() interface{} { + return redpandaCluster.Status.DecommissioningNode + }), timeout, interval).Should(BeNil()) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + }) +}) + +func getClusterWithReplicas( + name string, replicas int32, +) (key types.NamespacedName, cluster *v1alpha1.Cluster) { + key = types.NamespacedName{ + Name: name, + Namespace: "default", + } + + cluster = &v1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: key.Name, + Namespace: key.Namespace, + }, + Spec: v1alpha1.ClusterSpec{ + Image: "vectorized/redpanda", + Version: "dev", + Replicas: pointer.Int32Ptr(replicas), + Configuration: v1alpha1.RedpandaConfig{ + KafkaAPI: []v1alpha1.KafkaAPI{ + { + Port: 9092, + }, + }, + AdminAPI: []v1alpha1.AdminAPI{{Port: 9644}}, + RPCServer: v1alpha1.SocketAddress{ + Port: 33145, + }, + }, + Resources: v1alpha1.RedpandaResourceRequirements{ + ResourceRequirements: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + Redpanda: nil, + }, + }, + } + return key, cluster +} diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_test.go index e4b9a1e6d6f7..718c2b17787c 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_test.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller_test.go @@ -699,10 +699,11 @@ var _ = Describe("RedPandaCluster controller", func() { // exists. 
This verifies that these situations are handled // gracefully and without error r := &redpanda.ClusterReconciler{ - Client: fake.NewClientBuilder().Build(), - Log: ctrl.Log, - Scheme: scheme.Scheme, - AdminAPIClientFactory: testAdminAPIFactory, + Client: fake.NewClientBuilder().Build(), + Log: ctrl.Log, + Scheme: scheme.Scheme, + AdminAPIClientFactory: testAdminAPIFactory, + DecommissionWaitInterval: 100 * time.Millisecond, } _, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{ Namespace: "default", @@ -720,10 +721,11 @@ var _ = Describe("RedPandaCluster controller", func() { Expect(err).NotTo(HaveOccurred()) r := &redpanda.ClusterReconciler{ - Client: fake.NewClientBuilder().Build(), - Log: ctrl.Log, - Scheme: scheme.Scheme, - AdminAPIClientFactory: testAdminAPIFactory, + Client: fake.NewClientBuilder().Build(), + Log: ctrl.Log, + Scheme: scheme.Scheme, + AdminAPIClientFactory: testAdminAPIFactory, + DecommissionWaitInterval: 100 * time.Millisecond, } Expect(r.WithConfiguratorSettings(res.ConfiguratorSettings{ diff --git a/src/go/k8s/controllers/redpanda/suite_test.go b/src/go/k8s/controllers/redpanda/suite_test.go index b822d0293001..57bd1250b56b 100644 --- a/src/go/k8s/controllers/redpanda/suite_test.go +++ b/src/go/k8s/controllers/redpanda/suite_test.go @@ -13,6 +13,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "net/http" "path/filepath" "sort" @@ -94,16 +95,23 @@ var _ = BeforeSuite(func(done Done) { _ *redpandav1alpha1.Cluster, _ string, _ types.AdminTLSConfigProvider, - _ ...int32, + ordinals ...int32, ) (adminutils.AdminAPIClient, error) { + if len(ordinals) == 1 { + return &scopedMockAdminAPI{ + mockAdminAPI: testAdminAPI, + ordinal: ordinals[0], + }, nil + } return testAdminAPI, nil } err = (&redpandacontrollers.ClusterReconciler{ - Client: k8sManager.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"), - Scheme: k8sManager.GetScheme(), - AdminAPIClientFactory: testAdminAPIFactory, + Client: k8sManager.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("core").WithName("RedpandaCluster"), + Scheme: k8sManager.GetScheme(), + AdminAPIClientFactory: testAdminAPIFactory, + DecommissionWaitInterval: 100 * time.Millisecond, }).WithClusterDomain("cluster.local").WithConfiguratorSettings(resources.ConfiguratorSettings{ ConfiguratorBaseImage: "vectorized/configurator", ConfiguratorTag: "latest", @@ -162,9 +170,15 @@ type mockAdminAPI struct { invalid []string unknown []string directValidation bool + brokers []admin.Broker monitor sync.Mutex } +type scopedMockAdminAPI struct { + *mockAdminAPI + ordinal int32 +} + var _ adminutils.AdminAPIClient = &mockAdminAPI{} type unavailableError struct{} @@ -289,6 +303,7 @@ func (m *mockAdminAPI) Clear() { m.patches = nil m.unavailable = false m.directValidation = false + m.brokers = nil } func (m *mockAdminAPI) GetFeatures( @@ -373,22 +388,98 @@ func (m *mockAdminAPI) GetNodeConfig( return admin.NodeConfig{}, nil } +// nolint:goerr113 // test code +func (s *scopedMockAdminAPI) GetNodeConfig( + ctx context.Context, +) (admin.NodeConfig, error) { + brokers, err := s.Brokers(ctx) + if err != nil { + return admin.NodeConfig{}, err + } + if len(brokers) <= int(s.ordinal) { + return admin.NodeConfig{}, fmt.Errorf("broker not registered") + } + return admin.NodeConfig{ + NodeID: brokers[int(s.ordinal)].NodeID, + }, nil +} + func (m *mockAdminAPI) SetDirectValidationEnabled(directValidation bool) { m.monitor.Lock() defer m.monitor.Unlock() 
m.directValidation = directValidation } +func (m *mockAdminAPI) AddBroker(broker admin.Broker) { + m.monitor.Lock() + defer m.monitor.Unlock() + + m.brokers = append(m.brokers, broker) +} + +func (m *mockAdminAPI) RemoveBroker(id int) bool { + m.monitor.Lock() + defer m.monitor.Unlock() + + idx := -1 + for i := range m.brokers { + if m.brokers[i].NodeID == id { + idx = i + break + } + } + if idx < 0 { + return false + } + m.brokers = append(m.brokers[:idx], m.brokers[idx+1:]...) + return true +} + func (m *mockAdminAPI) Brokers(_ context.Context) ([]admin.Broker, error) { - return nil, nil + m.monitor.Lock() + defer m.monitor.Unlock() + + return append([]admin.Broker{}, m.brokers...), nil } -func (m *mockAdminAPI) DecommissionBroker(_ context.Context, _ int) error { - return nil +func (m *mockAdminAPI) BrokerStatusGetter( + id int, +) func() admin.MembershipStatus { + return func() admin.MembershipStatus { + m.monitor.Lock() + defer m.monitor.Unlock() + + for i := range m.brokers { + if m.brokers[i].NodeID == id { + return m.brokers[i].MembershipStatus + } + } + return "" + } } -func (m *mockAdminAPI) RecommissionBroker(_ context.Context, _ int) error { - return nil +func (m *mockAdminAPI) DecommissionBroker(_ context.Context, id int) error { + return m.SetBrokerStatus(id, admin.MembershipStatusDraining) +} + +func (m *mockAdminAPI) RecommissionBroker(_ context.Context, id int) error { + return m.SetBrokerStatus(id, admin.MembershipStatusActive) +} + +// nolint:goerr113 // test code +func (m *mockAdminAPI) SetBrokerStatus( + id int, status admin.MembershipStatus, +) error { + m.monitor.Lock() + defer m.monitor.Unlock() + + for i := range m.brokers { + if m.brokers[i].NodeID == id { + m.brokers[i].MembershipStatus = status + return nil + } + } + return fmt.Errorf("unknown broker %d", id) } func makeCopy(input, output interface{}) { diff --git a/src/go/k8s/main.go b/src/go/k8s/main.go index 2785e744a51b..25b34231c0bd 100644 --- a/src/go/k8s/main.go +++ b/src/go/k8s/main.go @@ -11,6 +11,7 @@ package main import ( "flag" "os" + "time" cmapiv1 "github.com/jetstack/cert-manager/pkg/apis/certmanager/v1" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" @@ -98,10 +99,11 @@ func main() { } if err = (&redpandacontrollers.ClusterReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"), - Scheme: mgr.GetScheme(), - AdminAPIClientFactory: adminutils.NewInternalAdminAPI, + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"), + Scheme: mgr.GetScheme(), + AdminAPIClientFactory: adminutils.NewInternalAdminAPI, + DecommissionWaitInterval: 10 * time.Second, }).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil { setupLog.Error(err, "Unable to create controller", "controller", "Cluster") os.Exit(1) diff --git a/src/go/k8s/pkg/resources/resource_integration_test.go b/src/go/k8s/pkg/resources/resource_integration_test.go index c12ca2b52682..43327f631f4f 100644 --- a/src/go/k8s/pkg/resources/resource_integration_test.go +++ b/src/go/k8s/pkg/resources/resource_integration_test.go @@ -17,8 +17,10 @@ import ( "path/filepath" "strings" "testing" + "time" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/stretchr/testify/assert" 
"github.com/stretchr/testify/require" @@ -96,6 +98,8 @@ func TestEnsure_StatefulSet(t *testing.T) { ImagePullPolicy: "Always", }, func(ctx context.Context) (string, error) { return hash, nil }, + adminutils.NewInternalAdminAPI, + time.Second, ctrl.Log.WithName("test")) err = sts.Ensure(context.Background()) diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index 836a9dceffe0..e3eb8969bc21 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -17,9 +17,11 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" @@ -93,8 +95,10 @@ type StatefulSetResource struct { // annotation to ensure the pods get restarted when configuration changes // this has to be retrieved lazily to achieve the correct order of resources // being applied - nodeConfigMapHashGetter func(context.Context) (string, error) - logger logr.Logger + nodeConfigMapHashGetter func(context.Context) (string, error) + adminAPIClientFactory adminutils.AdminAPIClientFactory + decommissionWaitInterval time.Duration + logger logr.Logger LastObservedState *appsv1.StatefulSet } @@ -112,6 +116,8 @@ func NewStatefulSet( serviceAccountName string, configuratorSettings ConfiguratorSettings, nodeConfigMapHashGetter func(context.Context) (string, error), + adminAPIClientFactory adminutils.AdminAPIClientFactory, + decommissionWaitInterval time.Duration, logger logr.Logger, ) *StatefulSetResource { return &StatefulSetResource{ @@ -127,6 +133,8 @@ func NewStatefulSet( serviceAccountName, configuratorSettings, nodeConfigMapHashGetter, + adminAPIClientFactory, + decommissionWaitInterval, logger.WithValues("Kind", statefulSetKind()), nil, } diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index fece4bdc83d9..bcd4bbe507a3 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -12,7 +12,6 @@ package resources import ( "context" "fmt" - "time" "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" @@ -24,10 +23,6 @@ import ( k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) -const ( - DecommissionRequeueDuration = time.Second * 10 -) - // handleScaling is called to detect cases of replicas change and apply them to the cluster // nolint:nestif // for clarity func (r *StatefulSetResource) handleScaling(ctx context.Context) error { @@ -63,7 +58,7 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { } if !formed { return &RequeueAfterError{ - RequeueAfter: DecommissionRequeueDuration, + RequeueAfter: r.decommissionWaitInterval, Msg: fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas), } } @@ -97,7 +92,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { } if broker != nil { - r.logger.Info("Broker still exists in the cluster", "node_id", broker.NodeID) + r.logger.Info("Broker still exists in the cluster", "node_id", broker.NodeID, "status", broker.MembershipStatus) if 
broker.MembershipStatus != admin.MembershipStatusDraining { // We ask to decommission since it does not seem done err = adminAPI.DecommissionBroker(ctx, broker.NodeID) @@ -116,7 +111,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { // Wait until the node is fully drained (or wait forever if the cluster does not allow decommissioning of that specific node) return &RequeueAfterError{ - RequeueAfter: DecommissionRequeueDuration, + RequeueAfter: r.decommissionWaitInterval, Msg: fmt.Sprintf("Waiting for node %d to be decommissioned from cluster", broker.NodeID), } } @@ -135,7 +130,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { } if !scaledDown { return &RequeueAfterError{ - RequeueAfter: DecommissionRequeueDuration, + RequeueAfter: r.decommissionWaitInterval, Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas), } } @@ -185,7 +180,7 @@ func (r *StatefulSetResource) handleRecommission(ctx context.Context) error { r.logger.Info("Node marked for being recommissioned in cluster", "node_id", *r.pandaCluster.Status.DecommissioningNode) return &RequeueAfterError{ - RequeueAfter: DecommissionRequeueDuration, + RequeueAfter: r.decommissionWaitInterval, Msg: fmt.Sprintf("Waiting for node %d to be recommissioned into cluster %s", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name), } } @@ -198,7 +193,7 @@ func (r *StatefulSetResource) handleRecommission(ctx context.Context) error { func (r *StatefulSetResource) getAdminAPIClient( ctx context.Context, ordinals ...int32, ) (adminutils.AdminAPIClient, error) { - return adminutils.NewInternalAdminAPI(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...) + return r.adminAPIClientFactory(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...) 
} func (r *StatefulSetResource) isClusterFormed( diff --git a/src/go/k8s/pkg/resources/statefulset_test.go b/src/go/k8s/pkg/resources/statefulset_test.go index 37eac1ec5723..6b069a7bda58 100644 --- a/src/go/k8s/pkg/resources/statefulset_test.go +++ b/src/go/k8s/pkg/resources/statefulset_test.go @@ -12,8 +12,10 @@ package resources_test import ( "context" "testing" + "time" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/stretchr/testify/assert" v1 "k8s.io/api/apps/v1" @@ -108,6 +110,8 @@ func TestEnsure(t *testing.T) { ImagePullPolicy: "Always", }, func(ctx context.Context) (string, error) { return hash, nil }, + adminutils.NewInternalAdminAPI, + time.Second, ctrl.Log.WithName("test")) err = sts.Ensure(context.Background()) From 3b7b77e51584eb764dce02ce2f2b37ee368f892d Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Fri, 3 Jun 2022 17:46:45 +0200 Subject: [PATCH 13/18] operator: add kuttl test for decommission --- .../k8s/tests/e2e/decommission/00-assert.yaml | 23 ++++++++++++++ .../e2e/decommission/00-redpanda-cluster.yaml | 25 ++++++++++++++++ .../k8s/tests/e2e/decommission/01-assert.yaml | 29 ++++++++++++++++++ .../k8s/tests/e2e/decommission/01-probe.yaml | 30 +++++++++++++++++++ .../k8s/tests/e2e/decommission/02-assert.yaml | 22 ++++++++++++++ .../decommission/02-redpanda-downscale.yaml | 6 ++++ .../k8s/tests/e2e/decommission/03-assert.yaml | 29 ++++++++++++++++++ .../k8s/tests/e2e/decommission/03-probe.yaml | 30 +++++++++++++++++++ 8 files changed, 194 insertions(+) create mode 100644 src/go/k8s/tests/e2e/decommission/00-assert.yaml create mode 100644 src/go/k8s/tests/e2e/decommission/00-redpanda-cluster.yaml create mode 100644 src/go/k8s/tests/e2e/decommission/01-assert.yaml create mode 100644 src/go/k8s/tests/e2e/decommission/01-probe.yaml create mode 100644 src/go/k8s/tests/e2e/decommission/02-assert.yaml create mode 100644 src/go/k8s/tests/e2e/decommission/02-redpanda-downscale.yaml create mode 100644 src/go/k8s/tests/e2e/decommission/03-assert.yaml create mode 100644 src/go/k8s/tests/e2e/decommission/03-probe.yaml diff --git a/src/go/k8s/tests/e2e/decommission/00-assert.yaml b/src/go/k8s/tests/e2e/decommission/00-assert.yaml new file mode 100644 index 000000000000..f94701ae5a68 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/00-assert.yaml @@ -0,0 +1,23 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: decommissioning +status: + replicas: 3 + currentReplicas: 3 +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/00-redpanda-cluster.yaml b/src/go/k8s/tests/e2e/decommission/00-redpanda-cluster.yaml new file mode 100644 index 000000000000..7bb3d1f5d8e6 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/00-redpanda-cluster.yaml @@ -0,0 +1,25 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: decommissioning +spec: + image: "localhost/redpanda" + version: "dev" + replicas: 3 + resources: + requests: + cpu: 100m + memory: 100Mi + limits: + cpu: 1 + 
memory: 500Mi + configuration: + rpcServer: + port: 33145 + kafkaApi: + - port: 9092 + adminApi: + - port: 9644 + pandaproxyApi: + - port: 8082 + developerMode: true diff --git a/src/go/k8s/tests/e2e/decommission/01-assert.yaml b/src/go/k8s/tests/e2e/decommission/01-assert.yaml new file mode 100644 index 000000000000..a3c57ca43421 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/01-assert.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: Pod +metadata: + labels: + job-name: wait-for-3-brokers +status: + containerStatuses: + - name: curl + state: + terminated: + message: | + 3 + phase: Succeeded +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/01-probe.yaml b/src/go/k8s/tests/e2e/decommission/01-probe.yaml new file mode 100644 index 000000000000..b3306231ee82 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/01-probe.yaml @@ -0,0 +1,30 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: wait-for-3-brokers +spec: + backoffLimit: 10 + template: + spec: + activeDeadlineSeconds: 90 + containers: + - name: curl + image: curlimages/curl:latest + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + command: + - /bin/sh + - -c + - -ex + args: + - > + url=http://decommissioning-0.decommissioning.$NAMESPACE.svc.cluster.local:9644/v1/brokers + res=$(curl --silent -L $url | tr '{' '\n' | grep node_id | wc -l) && + echo $res > /dev/termination-log && + if [[ "$res" != "3" ]]; then + exit 1; + fi + restartPolicy: Never diff --git a/src/go/k8s/tests/e2e/decommission/02-assert.yaml b/src/go/k8s/tests/e2e/decommission/02-assert.yaml new file mode 100644 index 000000000000..ff279070c069 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/02-assert.yaml @@ -0,0 +1,22 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: decommissioning +status: + replicas: 2 +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/02-redpanda-downscale.yaml b/src/go/k8s/tests/e2e/decommission/02-redpanda-downscale.yaml new file mode 100644 index 000000000000..4ef80085dfdd --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/02-redpanda-downscale.yaml @@ -0,0 +1,6 @@ +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster +metadata: + name: decommissioning +spec: + replicas: 2 diff --git a/src/go/k8s/tests/e2e/decommission/03-assert.yaml b/src/go/k8s/tests/e2e/decommission/03-assert.yaml new file mode 100644 index 000000000000..875ada95dc8a --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/03-assert.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: Pod +metadata: + labels: + job-name: wait-for-2-brokers +status: + containerStatuses: + - name: curl + state: + terminated: + message: | + 2 + phase: Succeeded +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + 
selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/03-probe.yaml b/src/go/k8s/tests/e2e/decommission/03-probe.yaml new file mode 100644 index 000000000000..c20a7a1e388e --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/03-probe.yaml @@ -0,0 +1,30 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: wait-for-2-brokers +spec: + backoffLimit: 10 + template: + spec: + activeDeadlineSeconds: 90 + containers: + - name: curl + image: curlimages/curl:latest + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + command: + - /bin/sh + - -c + - -ex + args: + - > + url=http://decommissioning-0.decommissioning.$NAMESPACE.svc.cluster.local:9644/v1/brokers + res=$(curl --silent -L $url | tr '{' '\n' | grep node_id | wc -l) && + echo $res > /dev/termination-log && + if [[ "$res" != "2" ]]; then + exit 1; + fi + restartPolicy: Never From e9bba61d996f6bf2de3e6ef2c1b5844442b0508c Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Thu, 9 Jun 2022 14:45:27 +0200 Subject: [PATCH 14/18] operator: disable maintenance mode hook on node 0 when starting up a fresh cluster This allows for predictable initial cluster formation. When the cluster is first created, it's composed of a single node. On single-node clusters, we should not activate maintenance mode, because, otherwise, a restart of the node will make it drain leadership and the cluster will not form. On the other hand, enabling maintenance mode when the cluster scales to multiple instances currently causes a restart of node 0. This will be solved when implementing dynamic hooks. --- src/go/k8s/pkg/resources/statefulset.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index e3eb8969bc21..b04347c72825 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -507,7 +507,8 @@ func (r *StatefulSetResource) obj( } // Only multi-replica clusters should use maintenance mode. See: https://github.com/redpanda-data/redpanda/issues/4338 - multiReplica := r.pandaCluster.Spec.Replicas != nil && *r.pandaCluster.Spec.Replicas > 1 + // Startup of a fresh cluster would let the first pod restart, until dynamic hooks are implemented. 
See: https://github.com/redpanda-data/redpanda/pull/4907 + multiReplica := r.pandaCluster.GetCurrentReplicas() > 1 if featuregates.MaintenanceMode(r.pandaCluster.Spec.Version) && r.pandaCluster.IsUsingMaintenanceModeHooks() && multiReplica { ss.Spec.Template.Spec.Containers[0].Lifecycle = &corev1.Lifecycle{ PreStop: r.getPreStopHook(), From 90812c13da6b50909841f2f5e9abd8452fc33b4c Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Thu, 9 Jun 2022 16:41:08 +0200 Subject: [PATCH 15/18] operator: add documentation to the scale handler --- src/go/k8s/pkg/resources/statefulset_scale.go | 57 ++++++++++++++++++- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index bcd4bbe507a3..81d6285e2149 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -23,7 +23,40 @@ import ( k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) -// handleScaling is called to detect cases of replicas change and apply them to the cluster +// handleScaling is responsible for managing the current number of replicas running for a cluster. +// +// Replicas are controlled via the field `status.currentReplicas` that is set in the current method and should be +// respected by all other external reconcile functions, including the one that sets the actual amount of replicas in the StatefulSet. +// External functions should use `cluster.getCurrentReplicas()` to get the number of expected replicas for a cluster, +// since it deals with cases where the status is not initialized yet. +// +// When users change the value of `spec.replicas` for a cluster, this function is responsible for progressively changing the `status.currentReplicas` to match that value. +// In the case of a Cluster downscaled by 1 replica (i.e. `spec.replicas` lower than current value of `status.currentReplicas` by `1`), before lowering the +// value of `status.currentReplicas`, this handler will first decommission the last node, using the admin API of the cluster. +// +// There are cases where the cluster will not decommission a node (e.g. user reduces `spec.replicas` to `2`, but there are topics with `3` partition replicas), +// in which case the draining phase will hang indefinitely in Redpanda. When this happens, the controller will not downscale the cluster and +// users will find in `status.decommissioningNode` that a node is constantly being decommissioned. +// +// In cases where the decommissioning process hangs, users can increase the value of `spec.replicas` again and the handler will contact the admin API +// to recommission the last node, ensuring that the number of replicas matches the expected value and that the node joins the cluster again. +// +// Users can change the value of `spec.replicas` freely (except it cannot be 0). In case of downscaling by multiple replicas, the handler will +// decommission one node at a time, until the desired number of replicas is met. +// +// When a new cluster is created, the value of `status.currentReplicas` will be initially set to `1`, no matter what the user sets in `spec.replicas`. +// This handler will first ensure that the initial node forms a cluster, by retrieving the list of brokers using the admin API. +// After the cluster is formed, the handler will increase the `status.currentReplicas` as requested. 
+
// This is due to the fact that Redpanda is currently unable to initialize a cluster if each node is given the full list of seed servers: https://github.com/redpanda-data/redpanda/issues/333. +// Previous versions of the operator used to hack the list of seed servers (in the configurator pod) of the node with ordinal 0, to set it always to an empty set, +// allowing it to create an initial cluster. That strategy worked because the list of seed servers is not read again from the `redpanda.yaml` file once the +// cluster is initialized. +// But the drawback was that, on an existing running cluster, when node 0 loses its data directory (e.g. because it's using local storage, and it undergoes a k8s node upgrade), +// then node 0 (having no data and an empty seed server list in `redpanda.yaml`) creates a brand new cluster ignoring the other nodes (split-brain). +// The strategy implemented here (initializing the cluster at 1 replica, then upscaling to the desired number, without hacks on the seed server list) +// should fix this problem, since the list of seed servers will be the same in all nodes once the cluster is created. +// // nolint:nestif // for clarity func (r *StatefulSetResource) handleScaling(ctx context.Context) error { if r.pandaCluster.Status.DecommissioningNode != nil { @@ -76,7 +109,16 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { return r.Status().Update(ctx, r.pandaCluster) } -// handleDecommission manages cases of node decommissioning +// handleDecommission manages the case of decommissioning of the last node of a cluster. +// +// When this handler is called, the `status.decommissioningNode` is populated with the pod ordinal (== nodeID) of the +// node that needs to be decommissioned. +// +// The handler verifies that the node is not present in the list of brokers registered in the cluster, via admin API, +// then downscales the StatefulSet by decreasing the `status.currentReplicas`. +// +// Before completing the process, it double-checks if the node is still not registered, for handling cases where the node was +// about to start when the decommissioning process started. If the broker is found, the process is restarted. func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { targetReplicas := *r.pandaCluster.Status.DecommissioningNode r.logger.Info("Handling cluster in decommissioning phase", "target replicas", targetReplicas) @@ -151,7 +193,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { return r.Status().Update(ctx, r.pandaCluster) } -// handleRecommission manages cases of nodes being recommissioned after a failed/wrong decommission +// handleRecommission manages the case of a node being recommissioned after a failed/wrong decommission. +// +// Recommission can only work for nodes that are still in the "draining" phase according to Redpanda. +// +// When this handler is triggered, `status.decommissioningNode` is populated with the node that was being decommissioned and +// `spec.replicas` reports a value that includes that node, indicating the intention from the user to recommission it. +// +// The handler ensures that the node is running and also calls the admin API to recommission it. +// The process finishes when the node is registered among brokers and the StatefulSet is correctly scaled. 
func (r *StatefulSetResource) handleRecommission(ctx context.Context) error { r.logger.Info("Handling cluster in recommissioning phase") @@ -278,6 +328,7 @@ type NodeReappearingError struct { NodeID int } +// Error makes the NodeReappearingError a proper error func (e *NodeReappearingError) Error() string { return fmt.Sprintf("node has appeared in the cluster with id=%d", e.NodeID) } From a9efa0a5f74ba55bd3030e77659cba48f510e361 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Fri, 10 Jun 2022 12:23:21 +0200 Subject: [PATCH 16/18] operator: fix maintenance mode activation on decommissioning node (workaround for #4999) When a node is shutdown after decommission, the maintenance mode hooks will trigger. While the process has no visible effect on partitions, it leaves the cluster in an inconsistent state, so that other nodes cannot enter maintenance mode. We force reset the flag with this change. --- src/go/k8s/controllers/redpanda/suite_test.go | 8 +++ src/go/k8s/pkg/admin/admin.go | 3 ++ src/go/k8s/pkg/resources/statefulset.go | 6 +++ src/go/k8s/pkg/resources/statefulset_scale.go | 53 +++++++++++++++++++ 4 files changed, 70 insertions(+) diff --git a/src/go/k8s/controllers/redpanda/suite_test.go b/src/go/k8s/controllers/redpanda/suite_test.go index 57bd1250b56b..bce30a3e193d 100644 --- a/src/go/k8s/controllers/redpanda/suite_test.go +++ b/src/go/k8s/controllers/redpanda/suite_test.go @@ -466,6 +466,14 @@ func (m *mockAdminAPI) RecommissionBroker(_ context.Context, id int) error { return m.SetBrokerStatus(id, admin.MembershipStatusActive) } +func (m *mockAdminAPI) EnableMaintenanceMode(_ context.Context, _ int) error { + return nil +} + +func (m *mockAdminAPI) DisableMaintenanceMode(_ context.Context, _ int) error { + return nil +} + // nolint:goerr113 // test code func (m *mockAdminAPI) SetBrokerStatus( id int, status admin.MembershipStatus, diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 1512dc6e25a5..2a37d8a5ecce 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -90,6 +90,9 @@ type AdminAPIClient interface { Brokers(ctx context.Context) ([]admin.Broker, error) DecommissionBroker(ctx context.Context, node int) error RecommissionBroker(ctx context.Context, node int) error + + EnableMaintenanceMode(ctx context.Context, node int) error + DisableMaintenanceMode(ctx context.Context, node int) error } var _ AdminAPIClient = &admin.AdminAPI{} diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index b04347c72825..1a1309dd0e60 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -176,6 +176,12 @@ func (r *StatefulSetResource) Ensure(ctx context.Context) error { } r.LastObservedState = &sts + // Hack for: https://github.com/redpanda-data/redpanda/issues/4999 + err = r.disableMaintenanceModeOnDecommissionedNodes(ctx) + if err != nil { + return err + } + r.logger.Info("Running update", "resource name", r.Key().Name) err = r.runUpdate(ctx, &sts, obj.(*appsv1.StatefulSet)) if err != nil { diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index 81d6285e2149..c76927b8b403 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -11,12 +11,14 @@ package resources import ( "context" + "errors" "fmt" "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" adminutils 
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -261,6 +263,56 @@ func (r *StatefulSetResource) isClusterFormed( return len(brokers) > 0, nil } +// disableMaintenanceModeOnDecommissionedNodes can be used to put a cluster in a consistent state, disabling maintenance mode on +// nodes that have been decommissioned. +// +// A decommissioned node may activate maintenance mode via shutdown hooks and the cluster may enter an inconsistent state, +// preventing other pods clean shutdown. +// +// See: https://github.com/redpanda-data/redpanda/issues/4999 +func (r *StatefulSetResource) disableMaintenanceModeOnDecommissionedNodes( + ctx context.Context, +) error { + if !featuregates.MaintenanceMode(r.pandaCluster.Status.Version) { + return nil + } + + if r.pandaCluster.Status.DecommissioningNode == nil || r.pandaCluster.Status.CurrentReplicas > *r.pandaCluster.Status.DecommissioningNode { + // Only if actually in a decommissioning phase + return nil + } + + ordinal := *r.pandaCluster.Status.DecommissioningNode + targetReplicas := ordinal + + scaledDown, err := r.verifyRunningCount(ctx, targetReplicas) + if err != nil || !scaledDown { + // This should be done only when the pod disappears from the cluster + return err + } + + adminAPI, err := r.getAdminAPIClient(ctx) + if err != nil { + return err + } + + r.logger.Info("Forcing deletion of maintenance mode for the decommissioned node", "node_id", ordinal) + err = adminAPI.DisableMaintenanceMode(ctx, int(ordinal)) + if err != nil { + var httpErr *admin.HTTPResponseError + if errors.As(err, &httpErr) { + if httpErr.Response != nil && httpErr.Response.StatusCode/100 == 4 { + // Cluster says we don't need to do it + r.logger.Info("No need to disable maintenance mode on the decommissioned node", "node_id", ordinal, "status_code", httpErr.Response.StatusCode) + return nil + } + } + return fmt.Errorf("could not disable maintenance mode on decommissioning node %d: %w", ordinal, err) + } + r.logger.Info("Maintenance mode disabled for the decommissioned node", "node_id", ordinal) + return nil +} + // verifyRunningCount checks if the statefulset is configured to run the given amount of replicas and that also pods match the expectations func (r *StatefulSetResource) verifyRunningCount( ctx context.Context, replicas int32, @@ -281,6 +333,7 @@ func (r *StatefulSetResource) verifyRunningCount( if err != nil { return false, fmt.Errorf("could not list pods for checking replicas: %w", err) } + return len(podList.Items) == int(replicas), nil } From b1e3facc9eefb9c6b7916212a6809752eb2065bb Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Wed, 15 Jun 2022 13:17:58 +0200 Subject: [PATCH 17/18] operator: make decommission wait interval configurable and add jitter --- src/go/k8s/main.go | 4 +++- src/go/k8s/pkg/resources/statefulset_scale.go | 13 +++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/go/k8s/main.go b/src/go/k8s/main.go index 25b34231c0bd..ae803047d336 100644 --- a/src/go/k8s/main.go +++ b/src/go/k8s/main.go @@ -56,6 +56,7 @@ func main() { configuratorBaseImage string configuratorTag string configuratorImagePullPolicy string + decommissionWaitInterval time.Duration ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") @@ 
-68,6 +69,7 @@ func main() { flag.StringVar(&configuratorBaseImage, "configurator-base-image", defaultConfiguratorContainerImage, "Set the configurator base image") flag.StringVar(&configuratorTag, "configurator-tag", "latest", "Set the configurator tag") flag.StringVar(&configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy") + flag.DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster") opts := zap.Options{ Development: true, @@ -103,7 +105,7 @@ func main() { Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"), Scheme: mgr.GetScheme(), AdminAPIClientFactory: adminutils.NewInternalAdminAPI, - DecommissionWaitInterval: 10 * time.Second, + DecommissionWaitInterval: decommissionWaitInterval, }).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil { setupLog.Error(err, "Unable to create controller", "controller", "Cluster") os.Exit(1) diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index c76927b8b403..47b323b0f13a 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -22,9 +22,14 @@ import ( "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + decommissionWaitJitterFactor = 0.2 +) + // handleScaling is responsible for managing the current number of replicas running for a cluster. // // Replicas are controlled via the field `status.currentReplicas` that is set in the current method and should be @@ -93,7 +98,7 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { } if !formed { return &RequeueAfterError{ - RequeueAfter: r.decommissionWaitInterval, + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), Msg: fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas), } } @@ -155,7 +160,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { // Wait until the node is fully drained (or wait forever if the cluster does not allow decommissioning of that specific node) return &RequeueAfterError{ - RequeueAfter: r.decommissionWaitInterval, + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), Msg: fmt.Sprintf("Waiting for node %d to be decommissioned from cluster", broker.NodeID), } } @@ -174,7 +179,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error { } if !scaledDown { return &RequeueAfterError{ - RequeueAfter: r.decommissionWaitInterval, + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas), } } @@ -232,7 +237,7 @@ func (r *StatefulSetResource) handleRecommission(ctx context.Context) error { r.logger.Info("Node marked for being recommissioned in cluster", "node_id", *r.pandaCluster.Status.DecommissioningNode) return &RequeueAfterError{ - RequeueAfter: r.decommissionWaitInterval, + RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor), Msg: fmt.Sprintf("Waiting for node %d to be recommissioned into cluster %s", 
*r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name), } } From 5bd42df31171939a7dc40dab736eede10e108af1 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Thu, 16 Jun 2022 10:55:50 +0200 Subject: [PATCH 18/18] operator: mark downscaling as alpha feature and add a startup flag We should enable downscaling as feature gate when issue with reusable node IDs is fixed. --- .../apis/redpanda/v1alpha1/cluster_webhook.go | 18 ++++++++++++ src/go/k8s/main.go | 1 + .../k8s/tests/e2e/decommission/00-assert.yaml | 12 ++++---- .../decommission/00-enable-downscaling.yaml | 6 ++++ .../k8s/tests/e2e/decommission/01-assert.yaml | 16 ++++------ ...-cluster.yaml => 01-redpanda-cluster.yaml} | 0 .../k8s/tests/e2e/decommission/02-assert.yaml | 15 +++++++--- .../{01-probe.yaml => 02-probe.yaml} | 0 .../k8s/tests/e2e/decommission/03-assert.yaml | 15 +++------- ...nscale.yaml => 03-redpanda-downscale.yaml} | 0 .../k8s/tests/e2e/decommission/04-assert.yaml | 29 +++++++++++++++++++ .../{03-probe.yaml => 04-probe.yaml} | 0 .../decommission/05-restore-downscaling.yaml | 5 ++++ .../k8s/tests/e2e/decommission/06-assert.yaml | 22 ++++++++++++++ 14 files changed, 106 insertions(+), 33 deletions(-) create mode 100644 src/go/k8s/tests/e2e/decommission/00-enable-downscaling.yaml rename src/go/k8s/tests/e2e/decommission/{00-redpanda-cluster.yaml => 01-redpanda-cluster.yaml} (100%) rename src/go/k8s/tests/e2e/decommission/{01-probe.yaml => 02-probe.yaml} (100%) rename src/go/k8s/tests/e2e/decommission/{02-redpanda-downscale.yaml => 03-redpanda-downscale.yaml} (100%) create mode 100644 src/go/k8s/tests/e2e/decommission/04-assert.yaml rename src/go/k8s/tests/e2e/decommission/{03-probe.yaml => 04-probe.yaml} (100%) create mode 100644 src/go/k8s/tests/e2e/decommission/05-restore-downscaling.yaml create mode 100644 src/go/k8s/tests/e2e/decommission/06-assert.yaml diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go index f4c61169cf22..664054b6f81e 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go @@ -43,6 +43,11 @@ const ( defaultSchemaRegistryPort = 8081 ) +// AllowDownscalingInWebhook controls the downscaling alpha feature in the Cluster custom resource. +// Downscaling is not stable since nodeIDs are currently not reusable, so adding to a cluster a node +// that has previously been decommissioned can cause issues. +var AllowDownscalingInWebhook = false + type resourceField struct { resources *corev1.ResourceRequirements path *field.Path @@ -177,6 +182,8 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error { allErrs = append(allErrs, r.validateScaling()...) + allErrs = append(allErrs, r.validateDownscaling(oldCluster)...) + allErrs = append(allErrs, r.validateKafkaListeners()...) allErrs = append(allErrs, r.validateAdminListeners()...) 
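
For context on the jitter added in the previous patch: wait.Jitter from k8s.io/apimachinery returns the base duration plus a random extra of up to maxFactor times that duration, so with the new 8s default and a 0.2 factor each requeue lands roughly between 8s and 9.6s, spreading retries when several clusters are waiting on a decommission at the same time. A minimal standalone sketch of that behaviour (only the 8s default and the 0.2 factor come from the hunks above; the rest is illustrative):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

const decommissionWaitJitterFactor = 0.2 // same factor as in statefulset_scale.go

func main() {
	// 8s matches the new default of --decommission-wait-interval.
	base := 8 * time.Second

	// wait.Jitter(d, f) returns a value in [d, d+f*d), so each requeue is
	// somewhere between 8s and ~9.6s instead of exactly on the 8s boundary.
	for i := 0; i < 3; i++ {
		fmt.Println(wait.Jitter(base, decommissionWaitJitterFactor))
	}
}
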
@@ -227,6 +234,17 @@ func (r *Cluster) validateScaling() field.ErrorList { return allErrs } +func (r *Cluster) validateDownscaling(old *Cluster) field.ErrorList { + var allErrs field.ErrorList + if !AllowDownscalingInWebhook && old.Spec.Replicas != nil && r.Spec.Replicas != nil && *r.Spec.Replicas < *old.Spec.Replicas { + allErrs = append(allErrs, + field.Invalid(field.NewPath("spec").Child("replicas"), + r.Spec.Replicas, + "downscaling is an alpha feature: set --allow-downscaling in the controller parameters to enable it")) + } + return allErrs +} + func (r *Cluster) validateAdminListeners() field.ErrorList { var allErrs field.ErrorList externalAdmin := r.AdminAPIExternal() diff --git a/src/go/k8s/main.go b/src/go/k8s/main.go index ae803047d336..9b55f93d580f 100644 --- a/src/go/k8s/main.go +++ b/src/go/k8s/main.go @@ -70,6 +70,7 @@ func main() { flag.StringVar(&configuratorTag, "configurator-tag", "latest", "Set the configurator tag") flag.StringVar(&configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy") flag.DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster") + flag.BoolVar(&redpandav1alpha1.AllowDownscalingInWebhook, "allow-downscaling", false, "Allow to reduce the number of replicas in existing clusters (alpha feature)") opts := zap.Options{ Development: true, diff --git a/src/go/k8s/tests/e2e/decommission/00-assert.yaml b/src/go/k8s/tests/e2e/decommission/00-assert.yaml index f94701ae5a68..d2fa081322b9 100644 --- a/src/go/k8s/tests/e2e/decommission/00-assert.yaml +++ b/src/go/k8s/tests/e2e/decommission/00-assert.yaml @@ -1,10 +1,8 @@ -apiVersion: redpanda.vectorized.io/v1alpha1 -kind: Cluster -metadata: - name: decommissioning -status: - replicas: 3 - currentReplicas: 3 +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +commands: + - command: kubectl rollout status deployment redpanda-controller-manager -n redpanda-system + - command: hack/wait-for-webhook-ready.sh --- apiVersion: kuttl.dev/v1beta1 diff --git a/src/go/k8s/tests/e2e/decommission/00-enable-downscaling.yaml b/src/go/k8s/tests/e2e/decommission/00-enable-downscaling.yaml new file mode 100644 index 000000000000..3aa82a014111 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/00-enable-downscaling.yaml @@ -0,0 +1,6 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: +# Set the --allow-downscaling=true parameter in the controller manager +- command: kubectl patch deployment redpanda-controller-manager -n redpanda-system --type='json' -p='[{"op":"add", "path":"/spec/template/spec/containers/1/args/-", "value":"--allow-downscaling=true"}]' +- command: kubectl get deployment redpanda-controller-manager -n redpanda-system -o json diff --git a/src/go/k8s/tests/e2e/decommission/01-assert.yaml b/src/go/k8s/tests/e2e/decommission/01-assert.yaml index a3c57ca43421..f94701ae5a68 100644 --- a/src/go/k8s/tests/e2e/decommission/01-assert.yaml +++ b/src/go/k8s/tests/e2e/decommission/01-assert.yaml @@ -1,16 +1,10 @@ -apiVersion: v1 -kind: Pod +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster metadata: - labels: - job-name: wait-for-3-brokers + name: decommissioning status: - containerStatuses: - - name: curl - state: - terminated: - message: | - 3 - phase: Succeeded + replicas: 3 + currentReplicas: 3 --- apiVersion: kuttl.dev/v1beta1 diff --git a/src/go/k8s/tests/e2e/decommission/00-redpanda-cluster.yaml 
b/src/go/k8s/tests/e2e/decommission/01-redpanda-cluster.yaml similarity index 100% rename from src/go/k8s/tests/e2e/decommission/00-redpanda-cluster.yaml rename to src/go/k8s/tests/e2e/decommission/01-redpanda-cluster.yaml diff --git a/src/go/k8s/tests/e2e/decommission/02-assert.yaml b/src/go/k8s/tests/e2e/decommission/02-assert.yaml index ff279070c069..a3c57ca43421 100644 --- a/src/go/k8s/tests/e2e/decommission/02-assert.yaml +++ b/src/go/k8s/tests/e2e/decommission/02-assert.yaml @@ -1,9 +1,16 @@ -apiVersion: redpanda.vectorized.io/v1alpha1 -kind: Cluster +apiVersion: v1 +kind: Pod metadata: - name: decommissioning + labels: + job-name: wait-for-3-brokers status: - replicas: 2 + containerStatuses: + - name: curl + state: + terminated: + message: | + 3 + phase: Succeeded --- apiVersion: kuttl.dev/v1beta1 diff --git a/src/go/k8s/tests/e2e/decommission/01-probe.yaml b/src/go/k8s/tests/e2e/decommission/02-probe.yaml similarity index 100% rename from src/go/k8s/tests/e2e/decommission/01-probe.yaml rename to src/go/k8s/tests/e2e/decommission/02-probe.yaml diff --git a/src/go/k8s/tests/e2e/decommission/03-assert.yaml b/src/go/k8s/tests/e2e/decommission/03-assert.yaml index 875ada95dc8a..ff279070c069 100644 --- a/src/go/k8s/tests/e2e/decommission/03-assert.yaml +++ b/src/go/k8s/tests/e2e/decommission/03-assert.yaml @@ -1,16 +1,9 @@ -apiVersion: v1 -kind: Pod +apiVersion: redpanda.vectorized.io/v1alpha1 +kind: Cluster metadata: - labels: - job-name: wait-for-2-brokers + name: decommissioning status: - containerStatuses: - - name: curl - state: - terminated: - message: | - 2 - phase: Succeeded + replicas: 2 --- apiVersion: kuttl.dev/v1beta1 diff --git a/src/go/k8s/tests/e2e/decommission/02-redpanda-downscale.yaml b/src/go/k8s/tests/e2e/decommission/03-redpanda-downscale.yaml similarity index 100% rename from src/go/k8s/tests/e2e/decommission/02-redpanda-downscale.yaml rename to src/go/k8s/tests/e2e/decommission/03-redpanda-downscale.yaml diff --git a/src/go/k8s/tests/e2e/decommission/04-assert.yaml b/src/go/k8s/tests/e2e/decommission/04-assert.yaml new file mode 100644 index 000000000000..875ada95dc8a --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/04-assert.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: Pod +metadata: + labels: + job-name: wait-for-2-brokers +status: + containerStatuses: + - name: curl + state: + terminated: + message: | + 2 + phase: Succeeded +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE diff --git a/src/go/k8s/tests/e2e/decommission/03-probe.yaml b/src/go/k8s/tests/e2e/decommission/04-probe.yaml similarity index 100% rename from src/go/k8s/tests/e2e/decommission/03-probe.yaml rename to src/go/k8s/tests/e2e/decommission/04-probe.yaml diff --git a/src/go/k8s/tests/e2e/decommission/05-restore-downscaling.yaml b/src/go/k8s/tests/e2e/decommission/05-restore-downscaling.yaml new file mode 100644 index 000000000000..e4d660bfe6b9 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/05-restore-downscaling.yaml @@ -0,0 +1,5 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: +# Downscaling will be set to the default value +- command: kubectl patch deployment redpanda-controller-manager -n redpanda-system --type='json' 
-p='[{"op":"remove", "path":"/spec/template/spec/containers/1/args/4"}]' diff --git a/src/go/k8s/tests/e2e/decommission/06-assert.yaml b/src/go/k8s/tests/e2e/decommission/06-assert.yaml new file mode 100644 index 000000000000..522965a7af16 --- /dev/null +++ b/src/go/k8s/tests/e2e/decommission/06-assert.yaml @@ -0,0 +1,22 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +commands: + - command: kubectl rollout status deployment redpanda-controller-manager -n redpanda-system + - command: hack/wait-for-webhook-ready.sh + +--- + +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +collectors: + - type: pod + selector: app.kubernetes.io/name=redpanda + tail: -1 + - type: pod + namespace: redpanda-system + selector: control-plane=controller-manager + tail: -1 + - type: command + command: kubectl get clusters -o jsonpath={@} -n $NAMESPACE + - type: command + command: kubectl get pods -o jsonpath={@} -n $NAMESPACE