From bef2ba2829f08128efab4b07e0166e917e07742d Mon Sep 17 00:00:00 2001
From: nicolaferraro
Date: Wed, 15 Jun 2022 13:17:58 +0200
Subject: [PATCH] operator: make decommission wait interval configurable and add jitter

---
 src/go/k8s/main.go                            |  4 +++-
 src/go/k8s/pkg/resources/statefulset_scale.go | 13 +++++++++----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/go/k8s/main.go b/src/go/k8s/main.go
index 25b34231c0bdf..ae803047d3369 100644
--- a/src/go/k8s/main.go
+++ b/src/go/k8s/main.go
@@ -56,6 +56,7 @@ func main() {
 		configuratorBaseImage       string
 		configuratorTag             string
 		configuratorImagePullPolicy string
+		decommissionWaitInterval    time.Duration
 	)
 
 	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
@@ -68,6 +69,7 @@ func main() {
 	flag.StringVar(&configuratorBaseImage, "configurator-base-image", defaultConfiguratorContainerImage, "Set the configurator base image")
 	flag.StringVar(&configuratorTag, "configurator-tag", "latest", "Set the configurator tag")
 	flag.StringVar(&configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy")
+	flag.DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster")
 
 	opts := zap.Options{
 		Development: true,
@@ -103,7 +105,7 @@ func main() {
 		Log:                       ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"),
 		Scheme:                    mgr.GetScheme(),
 		AdminAPIClientFactory:     adminutils.NewInternalAdminAPI,
-		DecommissionWaitInterval:  10 * time.Second,
+		DecommissionWaitInterval:  decommissionWaitInterval,
 	}).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "Unable to create controller", "controller", "Cluster")
 		os.Exit(1)
diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go
index c76927b8b4036..47b323b0f13a3 100644
--- a/src/go/k8s/pkg/resources/statefulset_scale.go
+++ b/src/go/k8s/pkg/resources/statefulset_scale.go
@@ -22,9 +22,14 @@ import (
 	"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
 )
 
+const (
+	decommissionWaitJitterFactor = 0.2
+)
+
 // handleScaling is responsible for managing the current number of replicas running for a cluster.
 //
 // Replicas are controlled via the field `status.currentReplicas` that is set in the current method and should be
@@ -93,7 +98,7 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
 	}
 	if !formed {
 		return &RequeueAfterError{
-			RequeueAfter: r.decommissionWaitInterval,
+			RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
 			Msg:          fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas),
 		}
 	}
@@ -155,7 +160,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error {
 
 		// Wait until the node is fully drained (or wait forever if the cluster does not allow decommissioning of that specific node)
 		return &RequeueAfterError{
-			RequeueAfter: r.decommissionWaitInterval,
+			RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
 			Msg:          fmt.Sprintf("Waiting for node %d to be decommissioned from cluster", broker.NodeID),
 		}
 	}
@@ -174,7 +179,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error {
 	}
 	if !scaledDown {
 		return &RequeueAfterError{
-			RequeueAfter: r.decommissionWaitInterval,
+			RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
 			Msg:          fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas),
 		}
 	}
@@ -232,7 +237,7 @@ func (r *StatefulSetResource) handleRecommission(ctx context.Context) error {
 	r.logger.Info("Node marked for being recommissioned in cluster", "node_id", *r.pandaCluster.Status.DecommissioningNode)
 
 	return &RequeueAfterError{
-		RequeueAfter: r.decommissionWaitInterval,
+		RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
 		Msg:          fmt.Sprintf("Waiting for node %d to be recommissioned into cluster %s", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name),
 	}
 }
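
A note on the helper used throughout the hunks above: wait.Jitter(d, f) from k8s.io/apimachinery/pkg/util/wait returns a random duration in [d, d+f*d), so with the new 8s default and the 0.2 factor each requeue lands somewhere between 8s and 9.6s, which spreads out reconcile wake-ups instead of having every cluster retry on the same fixed beat. The standalone sketch below is illustrative only (it is not part of the patch; the program, loop, and printed output are assumptions for demonstration), and relies solely on the stock wait.Jitter helper:

// Sketch: observe the jittered requeue interval the patch produces.
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// Same factor the patch introduces in statefulset_scale.go.
const decommissionWaitJitterFactor = 0.2

func main() {
	// Default of the new --decommission-wait-interval flag.
	base := 8 * time.Second

	// Each call picks a fresh duration in [8s, 9.6s), so reconcile loops
	// requeued at the same moment do not all fire again at the same instant.
	for i := 0; i < 3; i++ {
		fmt.Println(wait.Jitter(base, decommissionWaitJitterFactor))
	}
}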