Operator: add support for downscaling #5019

Merged Jun 21, 2022 · 18 commits

Commits
2711976
operator: split readyReplicas from replicas and add currentReplicas
nicolaferraro Jun 13, 2022
8b3559d
operator: add decommissioningNode status field
nicolaferraro Jun 13, 2022
0131182
operator: change webhook to allow decommissioning
nicolaferraro May 27, 2022
d6dac46
operator: move types to their own package to avoid dependency loop
nicolaferraro May 26, 2022
e9ecea7
rpk: add enum for membership status
nicolaferraro Jun 8, 2022
f8a7bcd
operator: allow scoping internal admin API to specific nodes
nicolaferraro Jun 8, 2022
1913822
operator: remove stack trace from logs when delay is requested
nicolaferraro May 27, 2022
33c1034
operator: enable decommission API functions in internal admin API
nicolaferraro Jun 14, 2022
4d74d4f
operator: add scale handler to properly decommission and recommission…
nicolaferraro Jun 15, 2022
5d62c76
operator: implement progressive initialization to let node 0 create i…
nicolaferraro Jun 13, 2022
d48e957
operator: consider draining field when checking maintenance mode status
nicolaferraro Jun 3, 2022
2b02d34
operator: add controller tests for scaling
nicolaferraro Jun 10, 2022
3b7b77e
operator: add kuttl test for decommission
nicolaferraro Jun 3, 2022
e9bba61
operator: disable maintenance mode hook on node 0 when starting up a …
nicolaferraro Jun 9, 2022
90812c1
operator: add documentation to the scale handler
nicolaferraro Jun 9, 2022
a9efa0a
operator: fix maintenance mode activation on decommissioning node (wo…
nicolaferraro Jun 10, 2022
b1e3fac
operator: make decommission wait interval configurable and add jitter
nicolaferraro Jun 15, 2022
5bd42df
operator: mark downscaling as alpha feature and add a startup flag
nicolaferraro Jun 16, 2022
35 changes: 34 additions & 1 deletion src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
@@ -293,9 +293,15 @@ type ClusterStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file

// Replicas show how many nodes are working in the cluster
// Replicas show how many nodes have been created for the cluster
// +optional
Replicas int32 `json:"replicas"`
// ReadyReplicas is the number of Pods belonging to the cluster that have a Ready Condition.
// +optional
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// CurrentReplicas is the number of Pods that the controller currently wants to run for the cluster.
// +optional
CurrentReplicas int32 `json:"currentReplicas,omitempty"`
// Nodes of the provisioned redpanda nodes
// +optional
Nodes NodesList `json:"nodes,omitempty"`
@@ -306,6 +312,9 @@ type ClusterStatus struct {
// Indicates that a cluster is restarting due to an upgrade or a different reason
// +optional
Restarting bool `json:"restarting"`
// Indicates that a node is currently being decommissioned from the cluster and provides its ordinal number
// +optional
DecommissioningNode *int32 `json:"decommissioningNode,omitempty"`
// Current version of the cluster.
// +optional
Version string `json:"version"`
@@ -847,6 +856,30 @@ func (s *ClusterStatus) SetRestarting(restarting bool) {
s.DeprecatedUpgrading = restarting
}

// GetCurrentReplicas returns the current number of replicas that the controller wants to run.
// It returns 1 when not initialized (as fresh clusters start from 1 replica)
func (r *Cluster) GetCurrentReplicas() int32 {
if r.Status.CurrentReplicas <= 0 {
// Not initialized, let's give the computed value
return r.ComputeInitialCurrentReplicasField()
}
return r.Status.CurrentReplicas
}

// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas.
//
// It needs to consider the following cases:
// - Fresh cluster: we start from 1 replicas, then upscale if needed (initialization to bypass https://github.com/redpanda-data/redpanda/issues/333)
// - Existing clusters: we keep spec.replicas as starting point
func (r *Cluster) ComputeInitialCurrentReplicasField() int32 {
if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 {
// A cluster seems to be already running, we start from the existing amount of replicas
return *r.Spec.Replicas
}
// Clusters start from a single replica, then upscale
return 1
}
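
As an aside, the progressive-scaling idea behind GetCurrentReplicas and ComputeInitialCurrentReplicasField can be illustrated with a minimal Go sketch: the controller keeps status.currentReplicas as its working value and steps it toward spec.replicas, starting fresh clusters from a single node. The helper below is purely illustrative and not taken from this PR; among other things it omits the decommissioning work the real scale handler drives through the admin API.

package main

import "fmt"

// nextCurrentReplicas is a hypothetical helper showing a one-node-at-a-time
// stepping strategy in the spirit of progressive scaling; it is not part of this PR.
func nextCurrentReplicas(current, desired int32) int32 {
	switch {
	case current < desired:
		// Upscale gradually once the first node has bootstrapped the cluster.
		return current + 1
	case current > desired:
		// Downscale one node at a time so each node can be decommissioned first.
		return current - 1
	default:
		return current
	}
}

func main() {
	// Fresh cluster: start from 1 replica and walk up to the desired 3.
	current, desired := int32(1), int32(3)
	for current != desired {
		current = nextCurrentReplicas(current, desired)
		fmt.Println("reconciling with currentReplicas =", current)
	}
}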

// TLSConfig is a generic TLS configuration
type TLSConfig struct {
Enabled bool `json:"enabled,omitempty"`
17 changes: 17 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go
@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/pointer"
)

// nolint:funlen // this is ok for a test
@@ -216,3 +217,19 @@ func TestConditions(t *testing.T) {
assert.Equal(t, condTime, cond2.LastTransitionTime)
})
}

func TestInitialReplicas(t *testing.T) {
cluster := v1alpha1.Cluster{}
cluster.Spec.Replicas = pointer.Int32(3)
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Replicas = 0
cluster.Status.ReadyReplicas = 2
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.ReadyReplicas = 0
cluster.Status.Nodes.Internal = []string{"1", "2"}
assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
cluster.Status.Nodes.Internal = nil
assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
}
45 changes: 39 additions & 6 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go
@@ -43,6 +43,11 @@ const (
defaultSchemaRegistryPort = 8081
)

// AllowDownscalingInWebhook controls the downscaling alpha feature in the Cluster custom resource.
// Downscaling is not stable since nodeIDs are currently not reusable, so adding to a cluster a node
// that has previously been decommissioned can cause issues.
Comment on lines +46 to +48

Contributor: It would be good to describe, in the commit message or in this comment, what consequences might occur if someone downscales the cluster while Kafka clients are still connected.

cc @jcsp @mmaslankaprv

var AllowDownscalingInWebhook = false

type resourceField struct {
resources *corev1.ResourceRequirements
path *field.Path
@@ -136,6 +141,8 @@ func (r *Cluster) ValidateCreate() error {

var allErrs field.ErrorList

allErrs = append(allErrs, r.validateScaling()...)

allErrs = append(allErrs, r.validateKafkaListeners()...)

allErrs = append(allErrs, r.validateAdminListeners()...)
@@ -173,12 +180,10 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error {
oldCluster := old.(*Cluster)
var allErrs field.ErrorList

if r.Spec.Replicas != nil && oldCluster.Spec.Replicas != nil && *r.Spec.Replicas < *oldCluster.Spec.Replicas {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"scaling down is not supported"))
}
allErrs = append(allErrs, r.validateScaling()...)

allErrs = append(allErrs, r.validateDownscaling(oldCluster)...)

allErrs = append(allErrs, r.validateKafkaListeners()...)

allErrs = append(allErrs, r.validateAdminListeners()...)
@@ -212,6 +217,34 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error {
r.Name, allErrs)
}

func (r *Cluster) validateScaling() field.ErrorList {
var allErrs field.ErrorList
if r.Spec.Replicas == nil {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"replicas must be specified explicitly"))
} else if *r.Spec.Replicas <= 0 {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"downscaling is not allowed to less than 1 instance"))
}

return allErrs
}

func (r *Cluster) validateDownscaling(old *Cluster) field.ErrorList {
var allErrs field.ErrorList
if !AllowDownscalingInWebhook && old.Spec.Replicas != nil && r.Spec.Replicas != nil && *r.Spec.Replicas < *old.Spec.Replicas {
allErrs = append(allErrs,
field.Invalid(field.NewPath("spec").Child("replicas"),
r.Spec.Replicas,
"downscaling is an alpha feature: set --allow-downscaling in the controller parameters to enable it"))
}
return allErrs
}
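
The error message above refers to an --allow-downscaling controller flag (added in commit 5bd42df, which marks downscaling as an alpha feature). A minimal sketch of how such a flag could be wired to AllowDownscalingInWebhook is shown below, assuming the standard library flag package; the operator's actual entrypoint may register and name things differently.

package main

import (
	"flag"

	"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
)

func main() {
	// Hypothetical wiring: the real operator main.go may differ.
	allowDownscaling := flag.Bool("allow-downscaling", false,
		"Enable the alpha downscaling feature in the Cluster webhook")
	flag.Parse()

	// The webhook consults this package-level switch in validateDownscaling.
	v1alpha1.AllowDownscalingInWebhook = *allowDownscaling
}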

func (r *Cluster) validateAdminListeners() field.ErrorList {
var allErrs field.ErrorList
externalAdmin := r.AdminAPIExternal()
19 changes: 15 additions & 4 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go
@@ -17,6 +17,7 @@ import (
cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -200,16 +201,16 @@ func TestDefault(t *testing.T) {
}

func TestValidateUpdate(t *testing.T) {
var replicas1 int32 = 1
var replicas2 int32 = 2
var replicas0 int32
var replicas3 int32 = 3

redpandaCluster := &v1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
Replicas: pointer.Int32Ptr(replicas2),
Replicas: pointer.Int32Ptr(replicas3),
Configuration: v1alpha1.RedpandaConfig{},
Resources: v1alpha1.RedpandaResourceRequirements{
ResourceRequirements: corev1.ResourceRequirements{
@@ -224,7 +225,7 @@
}

updatedCluster := redpandaCluster.DeepCopy()
updatedCluster.Spec.Replicas = &replicas1
updatedCluster.Spec.Replicas = &replicas0
updatedCluster.Spec.Configuration = v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{
{
@@ -272,6 +273,15 @@
}
}

func TestNilReplicasIsNotAllowed(t *testing.T) {
rpCluster := validRedpandaCluster()
err := rpCluster.ValidateCreate()
require.Nil(t, err, "Initial cluster is not valid")
rpCluster.Spec.Replicas = nil
err = rpCluster.ValidateCreate()
assert.Error(t, err)
}

//nolint:funlen // this is ok for a test
func TestValidateUpdate_NoError(t *testing.T) {
var replicas2 int32 = 2
Expand Down Expand Up @@ -1097,6 +1107,7 @@ func validRedpandaCluster() *v1alpha1.Cluster {
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{
Replicas: pointer.Int32Ptr(1),
Configuration: v1alpha1.RedpandaConfig{
KafkaAPI: []v1alpha1.KafkaAPI{{Port: 124}},
AdminAPI: []v1alpha1.AdminAPI{{Port: 126}},
5 changes: 5 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

8 changes: 5 additions & 3 deletions src/go/k8s/cmd/configurator/main.go
@@ -131,9 +131,11 @@ func main() {

cfg.Redpanda.ID = int(hostIndex)

// First Redpanda node need to have cleared seed servers in order
// to form raft group 0
if hostIndex == 0 {
// In case of a single seed server, the list should contain the current node itself.
// Normally the cluster is able to recognize it's talking to itself, except when the cluster is
// configured to use mutual TLS on the Kafka API (see Helm test).
// So, we clear the list of seeds to help Redpanda.
Contributor: Do you know why that is? The node does not talk to itself over mTLS. Is this a bug in Redpanda that they are looking to fix at some point?

Member Author: Yeah, I'm going to add a comment about this in the seed servers issue.

if len(cfg.Redpanda.SeedServers) == 1 {
cfg.Redpanda.SeedServers = []config.SeedServer{}
}

@@ -806,6 +806,16 @@ spec:
- type
type: object
type: array
currentReplicas:
description: CurrentReplicas is the number of Pods that the controller
currently wants to run for the cluster.
format: int32
type: integer
decommissioningNode:
description: Indicates that a node is currently being decommissioned
from the cluster and provides its ordinal number
format: int32
type: integer
nodes:
description: Nodes of the provisioned redpanda nodes
properties:
@@ -908,8 +918,14 @@
type: string
type: object
type: object
readyReplicas:
description: ReadyReplicas is the number of Pods belonging to the
cluster that have a Ready Condition.
format: int32
type: integer
replicas:
description: Replicas show how many nodes are working in the cluster
description: Replicas show how many nodes have been created for the
cluster
format: int32
type: integer
restarting:
25 changes: 16 additions & 9 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
@@ -16,6 +16,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/go-logr/logr"
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
@@ -24,6 +25,7 @@ import (
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/certmanager"
resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
appsv1 "k8s.io/api/apps/v1"
@@ -45,11 +47,12 @@ var (
// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
client.Client
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Scheme *runtime.Scheme
AdminAPIClientFactory adminutils.AdminAPIClientFactory
Log logr.Logger
configuratorSettings resources.ConfiguratorSettings
clusterDomain string
Scheme *runtime.Scheme
AdminAPIClientFactory adminutils.AdminAPIClientFactory
DecommissionWaitInterval time.Duration
}
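
The new DecommissionWaitInterval field corresponds to commit b1e3fac, which makes the decommission wait interval configurable and adds jitter. Below is a small sketch of one way to jitter such an interval; the helper name and jitter strategy are assumptions for illustration, not the PR's actual implementation.

package scale

import (
	"math/rand"
	"time"
)

// jitteredInterval adds up to factor*base of random jitter to the configured
// wait interval so repeated decommission status polls do not align exactly.
// Illustrative sketch only; the operator may compute its jitter differently.
func jitteredInterval(base time.Duration, factor float64) time.Duration {
	jitter := time.Duration(rand.Float64() * factor * float64(base))
	return base + jitter
}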

//+kubebuilder:rbac:groups=redpanda.vectorized.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete
@@ -155,6 +158,8 @@ func (r *ClusterReconciler) Reconcile(
sa.Key().Name,
r.configuratorSettings,
configMapResource.GetNodeConfigHash,
r.AdminAPIClientFactory,
r.DecommissionWaitInterval,
log)

toApply := []resources.Reconciler{
@@ -179,7 +184,7 @@

var e *resources.RequeueAfterError
if errors.As(err, &e) {
log.Error(e, e.Msg)
log.Info(e.Error())
return ctrl.Result{RequeueAfter: e.RequeueAfter}, nil
}
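
For context on the logging change above (log.Info instead of log.Error when a delay is requested, so a requeue no longer prints a stack trace), here is a rough sketch of what a requeue error type like resources.RequeueAfterError could look like, with its shape inferred only from the usage in this controller (Msg, RequeueAfter, Error()); the actual definition in the repository may differ.

package resources

import (
	"fmt"
	"time"
)

// RequeueAfterError signals that reconciliation should be retried after a delay.
// Sketch based on how the controller uses the type; the real implementation
// may carry additional fields or behavior.
type RequeueAfterError struct {
	RequeueAfter time.Duration
	Msg          string
}

func (e *RequeueAfterError) Error() string {
	return fmt.Sprintf("requeue after %s: %s", e.RequeueAfter, e.Msg)
}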

@@ -324,7 +329,8 @@ func (r *ClusterReconciler) reportStatus(
}

cluster.Status.Nodes = *nodeList
cluster.Status.Replicas = sts.LastObservedState.Status.ReadyReplicas
cluster.Status.ReadyReplicas = sts.LastObservedState.Status.ReadyReplicas
cluster.Status.Replicas = sts.LastObservedState.Status.Replicas
cluster.Status.Version = sts.Version()

err = r.Status().Update(ctx, &cluster)
@@ -353,7 +359,8 @@ func statusShouldBeUpdated(
!reflect.DeepEqual(nodeList.ExternalPandaproxy, status.Nodes.ExternalPandaproxy) ||
!reflect.DeepEqual(nodeList.SchemaRegistry, status.Nodes.SchemaRegistry) ||
!reflect.DeepEqual(nodeList.ExternalBootstrap, status.Nodes.ExternalBootstrap)) ||
status.Replicas != sts.LastObservedState.Status.ReadyReplicas ||
status.Replicas != sts.LastObservedState.Status.Replicas ||
status.ReadyReplicas != sts.LastObservedState.Status.ReadyReplicas ||
status.Version != sts.Version()
}

@@ -495,7 +502,7 @@ func (r *ClusterReconciler) setInitialSuperUserPassword(
ctx context.Context,
redpandaCluster *redpandav1alpha1.Cluster,
fqdn string,
adminTLSConfigProvider resources.AdminTLSConfigProvider,
adminTLSConfigProvider resourcetypes.AdminTLSConfigProvider,
objs []types.NamespacedName,
) error {
adminAPI, err := r.AdminAPIClientFactory(ctx, r, redpandaCluster, fqdn, adminTLSConfigProvider)