Merge pull request #5019 from nicolaferraro/decommission
Operator: add support for downscaling
nicolaferraro committed Jun 21, 2022
2 parents afb991d + 5bd42df commit 035ed04
Showing 37 changed files with 1,508 additions and 144 deletions.
35 changes: 34 additions & 1 deletion src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
@@ -293,9 +293,15 @@ type ClusterStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file

// Replicas show how many nodes are working in the cluster
// Replicas show how many nodes have been created for the cluster
// +optional
Replicas int32 `json:"replicas"`
// ReadyReplicas is the number of Pods belonging to the cluster that have a Ready Condition.
// +optional
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// CurrentReplicas is the number of Pods that the controller currently wants to run for the cluster.
// +optional
CurrentReplicas int32 `json:"currentReplicas,omitempty"`
// Nodes of the provisioned redpanda nodes
// +optional
Nodes NodesList `json:"nodes,omitempty"`
@@ -306,6 +312,9 @@ type ClusterStatus struct {
// Indicates that a cluster is restarting due to an upgrade or a different reason
// +optional
Restarting bool `json:"restarting"`
// Indicates that a node is currently being decommissioned from the cluster and provides its ordinal number
// +optional
DecommissioningNode *int32 `json:"decommissioningNode,omitempty"`
// Current version of the cluster.
// +optional
Version string `json:"version"`
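Taken together, the status of a cluster in the middle of a hypothetical downscale from three to two nodes could carry values like the sketch below. This is a purely illustrative example built from the new fields above; the concrete numbers are not taken from this diff, and pointer.Int32 comes from k8s.io/utils/pointer.

	status := v1alpha1.ClusterStatus{
		Replicas:            3,                // pods currently created for the cluster
		ReadyReplicas:       3,                // pods that report a Ready condition
		CurrentReplicas:     2,                // replica count the controller currently wants to run
		DecommissioningNode: pointer.Int32(2), // ordinal of the node being decommissioned
	}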
@@ -847,6 +856,30 @@ func (s *ClusterStatus) SetRestarting(restarting bool) {
s.DeprecatedUpgrading = restarting
}

// GetCurrentReplicas returns the current number of replicas that the controller wants to run.
// It returns 1 when not initialized (as fresh clusters start from 1 replica)
func (r *Cluster) GetCurrentReplicas() int32 {
if r.Status.CurrentReplicas <= 0 {
// Not initialized, let's give the computed value
return r.ComputeInitialCurrentReplicasField()
}
return r.Status.CurrentReplicas
}

// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas.
//
// It needs to consider the following cases:
// - Fresh cluster: we start from 1 replica, then upscale if needed (initialization to bypass https://github.com/redpanda-data/redpanda/issues/333)
// - Existing clusters: we keep spec.replicas as starting point
func (r *Cluster) ComputeInitialCurrentReplicasField() int32 {
if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 {
// A cluster seems to be already running, we start from the existing amount of replicas
return *r.Spec.Replicas
}
// Clusters start from a single replica, then upscale
return 1
}
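As a rough illustration of how these helpers could be consumed, the sketch below moves status.currentReplicas one step toward spec.replicas per reconciliation. The helper is hypothetical and for exposition only; the actual scaling logic added by this PR lives in the statefulset resource files that are not rendered on this page.

	// nextReplicas is a hypothetical helper showing the intended direction of travel:
	// fresh clusters start at 1 and grow, while a downscale shrinks one node at a time
	// so that each decommission can complete before the next one starts.
	func nextReplicas(cluster *v1alpha1.Cluster) int32 {
		current := cluster.GetCurrentReplicas() // 1 for fresh clusters, spec.replicas otherwise
		desired := *cluster.Spec.Replicas       // the webhook guarantees this is non-nil and >= 1
		switch {
		case current < desired:
			return current + 1 // upscale incrementally
		case current > desired:
			return current - 1 // downscale incrementally, triggering decommissioning
		default:
			return current // converged
		}
	}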

// TLSConfig is a generic TLS configuration
type TLSConfig struct {
Enabled bool `json:"enabled,omitempty"`
17 changes: 17 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go
@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/pointer"
)

// nolint:funlen // this is ok for a test
@@ -216,3 +217,19 @@ func TestConditions(t *testing.T) {
assert.Equal(t, condTime, cond2.LastTransitionTime)
})
}

func TestInitialReplicas(t *testing.T) {
cluster := v1alpha1.Cluster{}
cluster.Spec.Replicas = pointer.Int32(3)
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 0
cluster.Status.ReadyReplicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.ReadyReplicas = 0
cluster.Status.Nodes.Internal = []string{"1", "2"}
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Nodes.Internal = nil
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
}
45 changes: 39 additions & 6 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go
@@ -43,6 +43,11 @@ const (
defaultSchemaRegistryPort = 8081
)

// AllowDownscalingInWebhook controls the downscaling alpha feature in the Cluster custom resource.
// Downscaling is not stable since node IDs are currently not reusable, so adding a previously
// decommissioned node back to a cluster can cause issues.
var AllowDownscalingInWebhook = false
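The webhook error message further down tells users to "set --allow-downscaling in the controller parameters"; a minimal sketch of how such a flag could be wired to this variable in the operator's entry point follows. The exact flag handling and the main.go location are assumptions, since that file is not part of the rendered diff.

	// main.go (sketch, assumed wiring): gate the alpha downscaling feature behind a flag.
	var allowDownscaling bool
	flag.BoolVar(&allowDownscaling, "allow-downscaling", false,
		"Allow scaling down the number of replicas of a Redpanda cluster (alpha feature)")
	flag.Parse()
	redpandav1alpha1.AllowDownscalingInWebhook = allowDownscaling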

type resourceField struct {
resources *corev1.ResourceRequirements
path *field.Path
@@ -136,6 +141,8 @@ func (r *Cluster) ValidateCreate() error {

var allErrs field.ErrorList

allErrs = append(allErrs, r.validateScaling()...)

allErrs = append(allErrs, r.validateKafkaListeners()...)

allErrs = append(allErrs, r.validateAdminListeners()...)
@@ -173,12 +180,10 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error {
oldCluster := old.(*Cluster)
var allErrs field.ErrorList

if r.Spec.Replicas != nil && oldCluster.Spec.Replicas != nil && *r.Spec.Replicas < *oldCluster.Spec.Replicas {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"scaling down is not supported"))
}
allErrs = append(allErrs, r.validateScaling()...)

allErrs = append(allErrs, r.validateDownscaling(oldCluster)...)

allErrs = append(allErrs, r.validateKafkaListeners()...)

allErrs = append(allErrs, r.validateAdminListeners()...)
@@ -212,6 +217,34 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error {
r.Name, allErrs)
}

func (r *Cluster) validateScaling() field.ErrorList {
var allErrs field.ErrorList
if r.Spec.Replicas == nil {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"replicas must be specified explicitly"))
} else if *r.Spec.Replicas <= 0 {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"downscaling is not allowed to less than 1 instance"))
}

return allErrs
}

func (r *Cluster) validateDownscaling(old *Cluster) field.ErrorList {
var allErrs field.ErrorList
if !AllowDownscalingInWebhook && old.Spec.Replicas != nil && r.Spec.Replicas != nil && *r.Spec.Replicas < *old.Spec.Replicas {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"downscaling is an alpha feature: set --allow-downscaling in the controller parameters to enable it"))
}
return allErrs
}
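For clarity, here is a hypothetical illustration of how the two checks behave on an update. It is not a test from this PR; it assumes a cluster whose spec is otherwise valid, and validCluster() is a hypothetical constructor.

	old := validCluster()                    // hypothetical cluster with spec.replicas = 3
	updated := old.DeepCopy()
	updated.Spec.Replicas = pointer.Int32(2) // attempt to scale down

	// With the default AllowDownscalingInWebhook == false, the reduction is rejected
	// with an Invalid error on spec.replicas.
	err := updated.ValidateUpdate(old) // err != nil

	// After starting the controller with --allow-downscaling, the same update passes
	// validateDownscaling (validateScaling still rejects nil or non-positive replicas).
	v1alpha1.AllowDownscalingInWebhook = true
	err = updated.ValidateUpdate(old) // err == nil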

func (r *Cluster) validateAdminListeners() field.ErrorList {
var allErrs field.ErrorList
externalAdmin := r.AdminAPIExternal()
19 changes: 15 additions & 4 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go
@@ -17,6 +17,7 @@ import (
cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -200,16 +201,16 @@ func TestDefault(t *testing.T) {
}

func TestValidateUpdate(t *testing.T) {
var replicas1 int32 = 1
var replicas2 int32 = 2
var replicas0 int32
var replicas3 int32 = 3

redpandaCluster := &v1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
Replicas: pointer.Int32Ptr(replicas2),
Replicas: pointer.Int32Ptr(replicas3),
Configuration: v1alpha1.RedpandaConfig{},
Resources: v1alpha1.RedpandaResourceRequirements{
ResourceRequirements: corev1.ResourceRequirements{
@@ -224,7 +225,7 @@ }
}

updatedCluster := redpandaCluster.DeepCopy()
updatedCluster.Spec.Replicas = &replicas1
updatedCluster.Spec.Replicas = &replicas0
updatedCluster.Spec.Configuration = v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{
{
@@ -272,6 +273,15 @@ }
}
}

func TestNilReplicasIsNotAllowed(t *testing.T) {
rpCluster := validRedpandaCluster()
err := rpCluster.ValidateCreate()
require.Nil(t, err, "Initial cluster is not valid")
rpCluster.Spec.Replicas = nil
err = rpCluster.ValidateCreate()
assert.Error(t, err)
}

//nolint:funlen // this is ok for a test
func TestValidateUpdate_NoError(t *testing.T) {
var replicas2 int32 = 2
@@ -1097,6 +1107,7 @@ func validRedpandaCluster() *v1alpha1.Cluster {
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
Replicas: pointer.Int32Ptr(1),
Configuration: v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124}},
AdminAPI: []v1alpha1.AdminAPI{{Port: 126}},
5 changes: 5 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

8 changes: 5 additions & 3 deletions src/go/k8s/cmd/configurator/main.go
@@ -131,9 +131,11 @@ func main() {

cfg.Redpanda.ID = int(hostIndex)

// First Redpanda node need to have cleared seed servers in order
// to form raft group 0
if hostIndex == 0 {
// In case of a single seed server, the list should contain the current node itself.
// Normally the cluster is able to recognize it's talking to itself, except when the cluster is
// configured to use mutual TLS on the Kafka API (see Helm test).
// So, we clear the list of seeds to help Redpanda.
if len(cfg.Redpanda.SeedServers) == 1 {
cfg.Redpanda.SeedServers = []config.SeedServer{}
}

18 changes: 17 additions & 1 deletion src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml
Expand Up @@ -806,6 +806,16 @@ spec:
- type
type: object
type: array
currentReplicas:
description: CurrentReplicas is the number of Pods that the controller
currently wants to run for the cluster.
format: int32
type: integer
decommissioningNode:
description: Indicates that a node is currently being decommissioned
from the cluster and provides its ordinal number
format: int32
type: integer
nodes:
description: Nodes of the provisioned redpanda nodes
properties:
@@ -908,8 +918,14 @@
type: string
type: object
type: object
readyReplicas:
description: ReadyReplicas is the number of Pods belonging to the
cluster that have a Ready Condition.
format: int32
type: integer
replicas:
description: Replicas show how many nodes are working in the cluster
description: Replicas show how many nodes have been created for the
cluster
format: int32
type: integer
restarting:
25 changes: 16 additions & 9 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
@@ -16,6 +16,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/go-logr/logr"
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
@@ -24,6 +25,7 @@ import (
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/certmanager"
resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
appsv1 "k8s.io/api/apps/v1"
@@ -45,11 +47,12 @@ var (
// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
client.Client
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Scheme *runtime.Scheme
AdminAPIClientFactory adminutils.AdminAPIClientFactory
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Scheme *runtime.Scheme
AdminAPIClientFactory adminutils.AdminAPIClientFactory
DecommissionWaitInterval time.Duration
}
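The new DecommissionWaitInterval presumably comes from a controller flag and is set where the reconciler is constructed in main.go; a minimal sketch under that assumption follows (the flag name, default value, package aliases, and the admin API factory variable are illustrative, as main.go is not rendered in this view).

	// main.go (sketch, assumed wiring): pass the decommission polling interval to the reconciler.
	var decommissionWaitInterval time.Duration
	flag.DurationVar(&decommissionWaitInterval, "decommission-wait-interval", 10*time.Second,
		"Time to wait between checks on a node that is being decommissioned")
	flag.Parse()

	reconciler := &redpanda.ClusterReconciler{
		Client:                   mgr.GetClient(),
		Log:                      ctrl.Log.WithName("controllers").WithName("Cluster"),
		Scheme:                   mgr.GetScheme(),
		AdminAPIClientFactory:    adminAPIFactory, // hypothetical variable satisfying adminutils.AdminAPIClientFactory
		DecommissionWaitInterval: decommissionWaitInterval,
	}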

//+kubebuilder:rbac:groups=redpanda.vectorized.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
@@ -155,6 +158,8 @@ func (r *ClusterReconciler) Reconcile(
sa.Key().Name,
r.configuratorSettings,
configMapResource.GetNodeConfigHash,
r.AdminAPIClientFactory,
r.DecommissionWaitInterval,
log)

toApply := []resources.Reconciler{
@@ -179,7 +184,7 @@

var e *resources.RequeueAfterError
if errors.As(err, &e) {
log.Error(e, e.Msg)
log.Info(e.Error())
return ctrl.Result{RequeueAfter: e.RequeueAfter}, nil
}

@@ -324,7 +329,8 @@ func (r *ClusterReconciler) reportStatus(
}

cluster.Status.Nodes = *nodeList
cluster.Status.Replicas = sts.LastObservedState.Status.ReadyReplicas
cluster.Status.ReadyReplicas = sts.LastObservedState.Status.ReadyReplicas
cluster.Status.Replicas = sts.LastObservedState.Status.Replicas
cluster.Status.Version = sts.Version()

err = r.Status().Update(ctx, &cluster)
@@ -353,7 +359,8 @@ func statusShouldBeUpdated(
!reflect.DeepEqual(nodeList.ExternalPandaproxy, status.Nodes.ExternalPandaproxy) ||
!reflect.DeepEqual(nodeList.SchemaRegistry, status.Nodes.SchemaRegistry) ||
!reflect.DeepEqual(nodeList.ExternalBootstrap, status.Nodes.ExternalBootstrap)) ||
status.Replicas != sts.LastObservedState.Status.ReadyReplicas ||
status.Replicas != sts.LastObservedState.Status.Replicas ||
status.ReadyReplicas != sts.LastObservedState.Status.ReadyReplicas ||
status.Version != sts.Version()
}

@@ -495,7 +502,7 @@ func (r *ClusterReconciler) setInitialSuperUserPassword(
ctx context.Context,
redpandaCluster *redpandav1alpha1.Cluster,
fqdn string,
adminTLSConfigProvider resources.AdminTLSConfigProvider,
adminTLSConfigProvider resourcetypes.AdminTLSConfigProvider,
objs []types.NamespacedName,
) error {
adminAPI, err := r.AdminAPIClientFactory(ctx, r, redpandaCluster, fqdn, adminTLSConfigProvider)