Skip to content

Commit

Permalink
operator: make decommission wait interval configurable and add jitter
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Jun 15, 2022
1 parent 9f2f5d1 commit bef2ba2
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
4 changes: 3 additions & 1 deletion src/go/k8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func main() {
configuratorBaseImage string
configuratorTag string
configuratorImagePullPolicy string
decommissionWaitInterval time.Duration
)

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
Expand All @@ -68,6 +69,7 @@ func main() {
flag.StringVar(&configuratorBaseImage, "configurator-base-image", defaultConfiguratorContainerImage, "Set the configurator base image")
flag.StringVar(&configuratorTag, "configurator-tag", "latest", "Set the configurator tag")
flag.StringVar(&configuratorImagePullPolicy, "configurator-image-pull-policy", "Always", "Set the configurator image pull policy")
flag.DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 8*time.Second, "Set the time to wait for a node decommission to happen in the cluster")

opts := zap.Options{
Development: true,
Expand Down Expand Up @@ -103,7 +105,7 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("redpanda").WithName("Cluster"),
Scheme: mgr.GetScheme(),
AdminAPIClientFactory: adminutils.NewInternalAdminAPI,
DecommissionWaitInterval: 10 * time.Second,
DecommissionWaitInterval: decommissionWaitInterval,
}).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "Cluster")
os.Exit(1)
Expand Down
13 changes: 9 additions & 4 deletions src/go/k8s/pkg/resources/statefulset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import (
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// decommissionWaitJitterFactor is the maximum fractional jitter passed to
// wait.Jitter when requeueing decommission/recommission/scale checks, so a
// wait of d becomes a random duration in [d, d*(1+0.2)). This spreads out
// requeues and avoids many reconcile loops firing at exactly the same time.
const decommissionWaitJitterFactor = 0.2

// handleScaling is responsible for managing the current number of replicas running for a cluster.
//
// Replicas are controlled via the field `status.currentReplicas` that is set in the current method and should be
Expand Down Expand Up @@ -93,7 +98,7 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
}
if !formed {
return &RequeueAfterError{
RequeueAfter: r.decommissionWaitInterval,
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas),
}
}
Expand Down Expand Up @@ -155,7 +160,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error {

// Wait until the node is fully drained (or wait forever if the cluster does not allow decommissioning of that specific node)
return &RequeueAfterError{
RequeueAfter: r.decommissionWaitInterval,
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: fmt.Sprintf("Waiting for node %d to be decommissioned from cluster", broker.NodeID),
}
}
Expand All @@ -174,7 +179,7 @@ func (r *StatefulSetResource) handleDecommission(ctx context.Context) error {
}
if !scaledDown {
return &RequeueAfterError{
RequeueAfter: r.decommissionWaitInterval,
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas),
}
}
Expand Down Expand Up @@ -232,7 +237,7 @@ func (r *StatefulSetResource) handleRecommission(ctx context.Context) error {
r.logger.Info("Node marked for being recommissioned in cluster", "node_id", *r.pandaCluster.Status.DecommissioningNode)

return &RequeueAfterError{
RequeueAfter: r.decommissionWaitInterval,
RequeueAfter: wait.Jitter(r.decommissionWaitInterval, decommissionWaitJitterFactor),
Msg: fmt.Sprintf("Waiting for node %d to be recommissioned into cluster %s", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name),
}
}
Expand Down

0 comments on commit bef2ba2

Please sign in to comment.