operator: add scale handler to properly decommission and recommission nodes

This adds a handler that correctly manages upscaling and downscaling the cluster, decommissioning nodes when needed.

The handler uses `status.currentReplicas` to signal the number of replicas that all subcontrollers should materialize.
When a cluster is downscaled, the handler first tries to decommission the last node via the admin API and only then decreases `status.currentReplicas`, so that the node is removed only when the cluster allows it.

In case the cluster refuses to decommission a node (e.g. the minimum number of replicas on a topic is higher than the desired number of nodes), the user can increase `spec.replicas` again to trigger a recommission of the node.
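For illustration, a minimal client-side sketch (not part of this commit) of how a downscale could be driven and observed through these fields; the package name, cluster key, poll interval, and helper name are assumptions.

// Sketch: lower spec.replicas and wait for the scale handler to finish decommissioning.
package scalingexample

import (
	"context"
	"time"

	redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
	"k8s.io/apimachinery/pkg/types"
	k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// downscaleAndWait lowers spec.replicas and polls until the operator has finished
// decommissioning and status.currentReplicas has converged to the requested value.
func downscaleAndWait(ctx context.Context, c k8sclient.Client, key types.NamespacedName, replicas int32) error {
	var cluster redpandav1alpha1.Cluster
	if err := c.Get(ctx, key, &cluster); err != nil {
		return err
	}
	// Lowering spec.replicas makes handleScaling mark the last node for decommissioning.
	cluster.Spec.Replicas = &replicas
	if err := c.Update(ctx, &cluster); err != nil {
		return err
	}
	for {
		if err := c.Get(ctx, key, &cluster); err != nil {
			return err
		}
		// status.currentReplicas drops only after the admin API reports the node as drained.
		if cluster.Status.DecommissioningNode == nil && cluster.Status.CurrentReplicas == replicas {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(10 * time.Second):
		}
	}
}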
nicolaferraro committed Jun 16, 2022
1 parent 33c1034 commit 4d74d4f
Showing 4 changed files with 276 additions and 11 deletions.
5 changes: 4 additions & 1 deletion src/go/k8s/pkg/resources/resource_integration_test.go
@@ -21,6 +21,7 @@ import (
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -76,6 +77,8 @@ func TestEnsure_StatefulSet(t *testing.T) {
cluster := pandaCluster()
cluster = cluster.DeepCopy()
cluster.Name = "ensure-integration-cluster"
err := c.Create(context.Background(), cluster)
require.NoError(t, err)

sts := res.NewStatefulSet(
c,
@@ -95,7 +98,7 @@ func TestEnsure_StatefulSet(t *testing.T) {
func(ctx context.Context) (string, error) { return hash, nil },
ctrl.Log.WithName("test"))

err := sts.Ensure(context.Background())
err = sts.Ensure(context.Background())
assert.NoError(t, err)

actual := &v1.StatefulSet{}
19 changes: 17 additions & 2 deletions src/go/k8s/pkg/resources/statefulset.go
@@ -167,8 +167,15 @@ func (r *StatefulSetResource) Ensure(ctx context.Context) error {
return fmt.Errorf("error while fetching StatefulSet resource: %w", err)
}
r.LastObservedState = &sts

r.logger.Info("Running update", "resource name", r.Key().Name)
return r.runUpdate(ctx, &sts, obj.(*appsv1.StatefulSet))
err = r.runUpdate(ctx, &sts, obj.(*appsv1.StatefulSet))
if err != nil {
return err
}

r.logger.Info("Running scale handler", "resource name", r.Key().Name)
return r.handleScaling(ctx)
}

// GetCentralizedConfigurationHashFromCluster retrieves the current centralized configuration hash from the statefulset
@@ -270,6 +277,14 @@ func (r *StatefulSetResource) obj(
externalAddressType = externalListener.External.PreferredAddressType
}
tlsVolumes, tlsVolumeMounts := r.volumeProvider.Volumes()

// We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function
replicas := r.pandaCluster.Status.CurrentReplicas
if replicas <= 0 {
// Until the state is initialized
replicas = *r.pandaCluster.Spec.Replicas
}

ss := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.Key().Namespace,
@@ -281,7 +296,7 @@
APIVersion: "apps/v1",
},
Spec: appsv1.StatefulSetSpec{
Replicas: r.pandaCluster.Spec.Replicas,
Replicas: &replicas,
PodManagementPolicy: appsv1.ParallelPodManagement,
Selector: clusterLabels.AsAPISelector(),
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
255 changes: 255 additions & 0 deletions src/go/k8s/pkg/resources/statefulset_scale.go
@@ -0,0 +1,255 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package resources

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
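// DecommissionRequeueDuration is the requeue delay used while waiting for a decommission or recommission to complete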
DecommissionRequeueDuration = time.Second * 10
)

// handleScaling detects changes to the number of replicas and applies them to the cluster
func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
if r.pandaCluster.Status.DecommissioningNode != nil {
decommissionTargetReplicas := *r.pandaCluster.Status.DecommissioningNode
if *r.pandaCluster.Spec.Replicas > decommissionTargetReplicas {
// Decommissioning can also be canceled and we need to recommission
return r.handleRecommission(ctx)
}
return r.handleDecommission(ctx)
}

if r.pandaCluster.Status.CurrentReplicas == 0 {
// Initialize the status currentReplicas
r.pandaCluster.Status.CurrentReplicas = *r.pandaCluster.Spec.Replicas
return r.Status().Update(ctx, r.pandaCluster)
}

if *r.pandaCluster.Spec.Replicas == r.pandaCluster.Status.CurrentReplicas {
// No changes to replicas, we do nothing here
return nil
}

if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas {
r.logger.Info("Upscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas)
// Upscaling request: this is already handled by Redpanda, so we just increase status currentReplicas
return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger)
}

// The user-requested replicas are lower than the current replicas (status.currentReplicas): start the decommissioning process
targetOrdinal := r.pandaCluster.Status.CurrentReplicas - 1 // Always decommission last node
r.logger.Info("Start decommission of last broker node", "ordinal", targetOrdinal)
r.pandaCluster.Status.DecommissioningNode = &targetOrdinal
return r.Status().Update(ctx, r.pandaCluster)
}

// handleDecommission manages cases of node decommissioning
func (r *StatefulSetResource) handleDecommission(ctx context.Context) error {
targetReplicas := *r.pandaCluster.Status.DecommissioningNode
r.logger.Info("Handling cluster in decommissioning phase", "target replicas", targetReplicas)

adminAPI, err := r.getAdminAPIClient(ctx)
if err != nil {
return err
}

broker, err := getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI)
if err != nil {
return err
}

if broker != nil {
r.logger.Info("Broker still exists in the cluster", "node_id", broker.NodeID)
if broker.MembershipStatus != admin.MembershipStatusDraining {
// Ask the cluster to decommission the broker, since it is not draining yet
err = adminAPI.DecommissionBroker(ctx, broker.NodeID)
if err != nil {
return fmt.Errorf("error while trying to decommission node %d in cluster %s: %w", broker.NodeID, r.pandaCluster.Name, err)
}
r.logger.Info("Node marked for decommissioning in cluster", "node_id", broker.NodeID)
}

// The draining phase must always be completed with all nodes running, to let single-replica partitions be transferred.
// The value may diverge in case we restarted the process after a complete scale down.
drainingReplicas := targetReplicas + 1
if r.pandaCluster.Status.CurrentReplicas != drainingReplicas {
return setCurrentReplicas(ctx, r, r.pandaCluster, drainingReplicas, r.logger)
}

// Wait until the node is fully drained (or wait forever if the cluster does not allow decommissioning of that specific node)
return &RequeueAfterError{
RequeueAfter: DecommissionRequeueDuration,
Msg: fmt.Sprintf("Waiting for node %d to be decommissioned from cluster", broker.NodeID),
}
}

// Broker is now missing from cluster API
r.logger.Info("Node is not registered in the cluster: initializing downscale", "node_id", *r.pandaCluster.Status.DecommissioningNode)

// We set status.currentReplicas accordingly to trigger scaling down of the statefulset
if err = setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger); err != nil {
return err
}

scaledDown, err := r.verifyRunningCount(ctx, targetReplicas)
if err != nil {
return err
}
if !scaledDown {
return &RequeueAfterError{
RequeueAfter: DecommissionRequeueDuration,
Msg: fmt.Sprintf("Waiting for statefulset to downscale to %d replicas", targetReplicas),
}
}

// There's a chance that the node was initially not present in the broker list, but appeared after we started to scale down.
// Since the node may hold data that needs to be propagated to other nodes, we need to restart it to let the decommission process finish.
broker, err = getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI)
if err != nil {
return err
}
if broker != nil {
// Node reappeared in the cluster, we restart the process to handle it
return &NodeReappearingError{NodeID: broker.NodeID}
}

r.logger.Info("Decommissioning process successfully completed", "node_id", *r.pandaCluster.Status.DecommissioningNode)
r.pandaCluster.Status.DecommissioningNode = nil
return r.Status().Update(ctx, r.pandaCluster)
}

// handleRecommission manages cases of nodes being recommissioned after a failed/wrong decommission
func (r *StatefulSetResource) handleRecommission(ctx context.Context) error {
r.logger.Info("Handling cluster in recommissioning phase")

// First, ensure there are enough replicas to let the recommissioning node run
targetReplicas := *r.pandaCluster.Status.DecommissioningNode + 1
err := setCurrentReplicas(ctx, r, r.pandaCluster, targetReplicas, r.logger)
if err != nil {
return err
}

adminAPI, err := r.getAdminAPIClient(ctx)
if err != nil {
return err
}

broker, err := getNodeInfoFromCluster(ctx, *r.pandaCluster.Status.DecommissioningNode, adminAPI)
if err != nil {
return err
}

if broker == nil || broker.MembershipStatus != admin.MembershipStatusActive {
err = adminAPI.RecommissionBroker(ctx, int(*r.pandaCluster.Status.DecommissioningNode))
if err != nil {
return fmt.Errorf("error while trying to recommission node %d in cluster %s: %w", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name, err)
}
r.logger.Info("Node marked for being recommissioned in cluster", "node_id", *r.pandaCluster.Status.DecommissioningNode)

return &RequeueAfterError{
RequeueAfter: DecommissionRequeueDuration,
Msg: fmt.Sprintf("Waiting for node %d to be recommissioned into cluster %s", *r.pandaCluster.Status.DecommissioningNode, r.pandaCluster.Name),
}
}

r.logger.Info("Recommissioning process successfully completed", "node_id", *r.pandaCluster.Status.DecommissioningNode)
r.pandaCluster.Status.DecommissioningNode = nil
return r.Status().Update(ctx, r.pandaCluster)
}

func (r *StatefulSetResource) getAdminAPIClient(
ctx context.Context, ordinals ...int32,
) (adminutils.AdminAPIClient, error) {
return adminutils.NewInternalAdminAPI(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...)
}

// verifyRunningCount checks whether the statefulset is configured to run the given number of replicas and whether the pods match that expectation
func (r *StatefulSetResource) verifyRunningCount(
ctx context.Context, replicas int32,
) (bool, error) {
var sts appsv1.StatefulSet
if err := r.Get(ctx, r.Key(), &sts); err != nil {
return false, fmt.Errorf("could not get statefulset for checking replicas: %w", err)
}
if sts.Spec.Replicas == nil || *sts.Spec.Replicas != replicas || sts.Status.Replicas != replicas {
return false, nil
}

var podList corev1.PodList
err := r.List(ctx, &podList, &k8sclient.ListOptions{
Namespace: r.pandaCluster.Namespace,
LabelSelector: labels.ForCluster(r.pandaCluster).AsClientSelector(),
})
if err != nil {
return false, fmt.Errorf("could not list pods for checking replicas: %w", err)
}
return len(podList.Items) == int(replicas), nil
}

// getNodeInfoFromCluster retrieves broker information using the admin API
func getNodeInfoFromCluster(
ctx context.Context, ordinal int32, adminAPI adminutils.AdminAPIClient,
) (*admin.Broker, error) {
brokers, err := adminAPI.Brokers(ctx)
if err != nil {
return nil, fmt.Errorf("could not get the list of brokers for checking decommission: %w", err)
}
for i := range brokers {
if brokers[i].NodeID == int(ordinal) {
return &brokers[i], nil
}
}
return nil, nil
}

// setCurrentReplicas sets status.currentReplicas in the CR, which in turn controls the number of replicas
// assigned to the StatefulSet
func setCurrentReplicas(
ctx context.Context,
c k8sclient.Client,
pandaCluster *redpandav1alpha1.Cluster,
replicas int32,
logger logr.Logger,
) error {
if pandaCluster.Status.CurrentReplicas == replicas {
// Skip if already done
return nil
}

logger.Info("Scaling StatefulSet", "replicas", replicas)
pandaCluster.Status.CurrentReplicas = replicas
if err := c.Status().Update(ctx, pandaCluster); err != nil {
return fmt.Errorf("could not scale cluster %s to %d replicas: %w", pandaCluster.Name, replicas, err)
}
logger.Info("StatefulSet scaled", "replicas", replicas)
return nil
}

// NodeReappearingError indicates that a node has appeared in the cluster before completion of a direct downscale
type NodeReappearingError struct {
NodeID int
}

func (e *NodeReappearingError) Error() string {
return fmt.Sprintf("node has appeared in the cluster with id=%d", e.NodeID)
}
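
For context, a hedged sketch of how the calling reconciler might map the errors returned by handleScaling onto controller-runtime results; the helper name and wiring are assumptions and not part of this commit. It is written as an addition to the same resources package, would also need the "errors" and ctrl "sigs.k8s.io/controller-runtime" imports, and relies on RequeueAfterError being defined elsewhere in the package with the fields used above.

// scalingReconcileResult is a hypothetical helper translating scale-handler errors
// into reconcile results instead of treating them as failures.
func scalingReconcileResult(err error, log logr.Logger) (ctrl.Result, error) {
	if err == nil {
		return ctrl.Result{}, nil
	}
	var requeueErr *RequeueAfterError
	if errors.As(err, &requeueErr) {
		// Not a failure: the handler just wants to be invoked again after the delay.
		log.Info(requeueErr.Msg)
		return ctrl.Result{RequeueAfter: requeueErr.RequeueAfter}, nil
	}
	var reappearErr *NodeReappearingError
	if errors.As(err, &reappearErr) {
		// A node came back during downscaling: requeue so the scale handler can restart the flow.
		log.Info("node reappeared during decommission", "node_id", reappearErr.NodeID)
		return ctrl.Result{Requeue: true}, nil
	}
	return ctrl.Result{}, err
}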
8 changes: 0 additions & 8 deletions src/go/k8s/pkg/resources/statefulset_test.go
@@ -33,13 +33,6 @@ func TestEnsure(t *testing.T) {
cluster := pandaCluster()
stsResource := stsFromCluster(cluster)

var newReplicas int32 = 3333

replicasUpdatedCluster := cluster.DeepCopy()
replicasUpdatedCluster.Spec.Replicas = &newReplicas
replicasUpdatedSts := stsFromCluster(cluster).DeepCopy()
replicasUpdatedSts.Spec.Replicas = &newReplicas

newResources := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1111"),
corev1.ResourceMemory: resource.MustParse("2222Gi"),
@@ -76,7 +69,6 @@
expectedObject *v1.StatefulSet
}{
{"none existing", nil, cluster, stsResource},
{"update replicas", stsResource, replicasUpdatedCluster, replicasUpdatedSts},
{"update resources", stsResource, resourcesUpdatedCluster, resourcesUpdatedSts},
{"update redpanda resources", stsResource, resourcesUpdatedRedpandaCluster, resourcesUpdatedSts},
{"disabled sidecar", nil, noSidecarCluster, noSidecarSts},
