diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
index 86868919d699..5afd7688ef74 100644
--- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
+++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
@@ -856,6 +856,30 @@ func (s *ClusterStatus) SetRestarting(restarting bool) {
 	s.DeprecatedUpgrading = restarting
 }
 
+// GetCurrentReplicas returns the current number of replicas that the controller wants to run.
+// It returns 1 when not initialized (as fresh clusters start from 1 replica).
+func (r *Cluster) GetCurrentReplicas() int32 {
+	if r.Status.CurrentReplicas <= 0 {
+		// Not initialized, fall back to the computed value
+		return r.ComputeInitialCurrentReplicasField()
+	}
+	return r.Status.CurrentReplicas
+}
+
+// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas.
+//
+// It needs to consider the following cases:
+// - Fresh cluster: we start from 1 replica, then upscale if needed (initialization to bypass https://github.com/redpanda-data/redpanda/issues/333)
+// - Existing clusters: we keep spec.replicas as the starting point
+func (r *Cluster) ComputeInitialCurrentReplicasField() int32 {
+	if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 {
+		// A cluster seems to be already running, so we start from the existing amount of replicas
+		return *r.Spec.Replicas
+	}
+	// Clusters start from a single replica, then upscale
+	return 1
+}
+
 // TLSConfig is a generic TLS configuration
 type TLSConfig struct {
 	Enabled bool `json:"enabled,omitempty"`
diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go
index 5203cae07751..e4a8e2fe3c1a 100644
--- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go
+++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/utils/pointer"
 )
 
 // nolint:funlen // this is ok for a test
@@ -216,3 +217,19 @@ func TestConditions(t *testing.T) {
 		assert.Equal(t, condTime, cond2.LastTransitionTime)
 	})
 }
+
+func TestInitialReplicas(t *testing.T) {
+	cluster := v1alpha1.Cluster{}
+	cluster.Spec.Replicas = pointer.Int32(3)
+	assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
+	cluster.Status.Replicas = 2
+	assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
+	cluster.Status.Replicas = 0
+	cluster.Status.ReadyReplicas = 2
+	assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
+	cluster.Status.ReadyReplicas = 0
+	cluster.Status.Nodes.Internal = []string{"1", "2"}
+	assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
+	cluster.Status.Nodes.Internal = nil
+	assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
+}
diff --git a/src/go/k8s/cmd/configurator/main.go b/src/go/k8s/cmd/configurator/main.go
index b019540579ec..8175c337f0c9 100644
--- a/src/go/k8s/cmd/configurator/main.go
+++ b/src/go/k8s/cmd/configurator/main.go
@@ -131,9 +131,11 @@ func main() {
 
 	cfg.Redpanda.ID = int(hostIndex)
 
-	// First Redpanda node need to have cleared seed servers in order
-	// to form raft group 0
-	if hostIndex == 0 {
+	// In case of a single seed server, the list should contain the current node itself.
+	// Normally the cluster is able to recognize it's talking to itself, except when the cluster is
+	// configured to use mutual TLS on the Kafka API (see Helm test).
+	// So, we clear the list of seeds to help Redpanda.
+	if len(cfg.Redpanda.SeedServers) == 1 {
 		cfg.Redpanda.SeedServers = []config.SeedServer{}
 	}
 
diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go
index ea575687ddf1..1512dc6e25a5 100644
--- a/src/go/k8s/pkg/admin/admin.go
+++ b/src/go/k8s/pkg/admin/admin.go
@@ -56,16 +56,13 @@ func NewInternalAdminAPI(
 
 	if len(ordinals) == 0 {
 		// Not a specific node, just go through all them
-		replicas := redpandaCluster.Status.CurrentReplicas
-		if replicas <= 0 {
-			replicas = *redpandaCluster.Spec.Replicas
-		}
+		replicas := redpandaCluster.GetCurrentReplicas()
 		for i := int32(0); i < replicas; i++ {
 			ordinals = append(ordinals, i)
 		}
 	}
 
-	var urls []string
+	urls := make([]string, 0, len(ordinals))
 	for _, on := range ordinals {
 		urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", redpandaCluster.Name, on, fqdn, adminInternalPort))
 	}
diff --git a/src/go/k8s/pkg/resources/configmap.go b/src/go/k8s/pkg/resources/configmap.go
index 6e2593deb7f2..8a24e9977264 100644
--- a/src/go/k8s/pkg/resources/configmap.go
+++ b/src/go/k8s/pkg/resources/configmap.go
@@ -331,7 +331,7 @@ func (r *ConfigMapResource) CreateConfiguration(
 
 	cfg.SetAdditionalRedpandaProperty("log_segment_size", logSegmentSize)
 
-	replicas := *r.pandaCluster.Spec.Replicas
+	replicas := r.pandaCluster.GetCurrentReplicas()
 	for i := int32(0); i < replicas; i++ {
 		cr.SeedServers = append(cr.SeedServers, config.SeedServer{
 			Host: config.SocketAddress{
@@ -454,7 +454,7 @@ func (r *ConfigMapResource) preparePandaproxyClient(
 		return nil
 	}
 
-	replicas := *r.pandaCluster.Spec.Replicas
+	replicas := r.pandaCluster.GetCurrentReplicas()
 	cfg.NodeConfiguration.PandaproxyClient = &config.KafkaClient{}
 	for i := int32(0); i < replicas; i++ {
 		cfg.NodeConfiguration.PandaproxyClient.Brokers = append(cfg.NodeConfiguration.PandaproxyClient.Brokers, config.SocketAddress{
@@ -493,7 +493,7 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient(
 		return nil
 	}
 
-	replicas := *r.pandaCluster.Spec.Replicas
+	replicas := r.pandaCluster.GetCurrentReplicas()
 	cfg.NodeConfiguration.SchemaRegistryClient = &config.KafkaClient{}
 	for i := int32(0); i < replicas; i++ {
 		cfg.NodeConfiguration.SchemaRegistryClient.Brokers = append(cfg.NodeConfiguration.SchemaRegistryClient.Brokers, config.SocketAddress{
diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go
index 65d436d5471b..87c28b1ba1dc 100644
--- a/src/go/k8s/pkg/resources/statefulset.go
+++ b/src/go/k8s/pkg/resources/statefulset.go
@@ -279,11 +279,7 @@ func (r *StatefulSetResource) obj(
 	tlsVolumes, tlsVolumeMounts := r.volumeProvider.Volumes()
 
 	// We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function
-	replicas := r.pandaCluster.Status.CurrentReplicas
-	if replicas <= 0 {
-		// Until the state is initialized
-		replicas = *r.pandaCluster.Spec.Replicas
-	}
+	replicas := r.pandaCluster.GetCurrentReplicas()
 
 	ss := &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go
index 7911c58ded4f..fece4bdc83d9 100644
--- a/src/go/k8s/pkg/resources/statefulset_scale.go
+++ b/src/go/k8s/pkg/resources/statefulset_scale.go
@@ -29,6 +29,7 @@ const (
 )
 
 // handleScaling is called to detect cases of replicas change and apply them to the cluster
+// nolint:nestif // for clarity
 func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
 	if r.pandaCluster.Status.DecommissioningNode != nil {
 		decommissionTargetReplicas := *r.pandaCluster.Status.DecommissioningNode
@@ -40,8 +41,8 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
 	}
 
 	if r.pandaCluster.Status.CurrentReplicas == 0 {
-		// Initialize the status currentReplicas
-		r.pandaCluster.Status.CurrentReplicas = *r.pandaCluster.Spec.Replicas
+		// Initialize the currentReplicas field, so that it can be controlled later
+		r.pandaCluster.Status.CurrentReplicas = r.pandaCluster.ComputeInitialCurrentReplicasField()
 		return r.Status().Update(ctx, r.pandaCluster)
 	}
 
@@ -52,6 +53,23 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
 
 	if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas {
 		r.logger.Info("Upscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas)
+
+		// We care about upscaling only when the cluster is moving off 1 replica, which happens e.g. at cluster startup
+		if r.pandaCluster.Status.CurrentReplicas == 1 {
+			r.logger.Info("Waiting for first node to form a cluster before upscaling")
+			formed, err := r.isClusterFormed(ctx)
+			if err != nil {
+				return err
+			}
+			if !formed {
+				return &RequeueAfterError{
+					RequeueAfter: DecommissionRequeueDuration,
+					Msg:          fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas),
+				}
+			}
+			r.logger.Info("Initial cluster has been formed")
+		}
+
 		// Upscaling request: this is already handled by Redpanda, so we just increase status currentReplicas
 		return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger)
 	}
@@ -183,6 +201,21 @@ func (r *StatefulSetResource) getAdminAPIClient(
 	return adminutils.NewInternalAdminAPI(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...)
 }
 
+func (r *StatefulSetResource) isClusterFormed(
+	ctx context.Context,
+) (bool, error) {
+	rootNodeAdminAPI, err := r.getAdminAPIClient(ctx, 0)
+	if err != nil {
+		return false, err
+	}
+	brokers, err := rootNodeAdminAPI.Brokers(ctx)
+	if err != nil {
+		// Swallow the error and report that the cluster is not formed yet
+		return false, nil
+	}
+	return len(brokers) > 0, nil
+}
+
 // verifyRunningCount checks if the statefulset is configured to run the given amount of replicas and that also pods match the expectations
 func (r *StatefulSetResource) verifyRunningCount(
 	ctx context.Context, replicas int32,
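
For reference, the replica-initialization behavior introduced above can be summarized with a small standalone sketch. The clusterState struct and currentReplicas function below are illustrative stand-ins, not the operator's actual types; they only mirror the decision flow of GetCurrentReplicas and ComputeInitialCurrentReplicasField under that assumption.

package main

import "fmt"

// clusterState is a simplified, hypothetical stand-in for the Cluster
// spec/status fields consulted by the new helpers.
type clusterState struct {
	specReplicas    int32 // spec.replicas (desired)
	currentReplicas int32 // status.currentReplicas (0 when uninitialized)
	statusReplicas  int32 // status.replicas
	readyReplicas   int32 // status.readyReplicas
	internalNodes   int   // len(status.nodes.internal)
}

// currentReplicas mirrors the decision flow: an uninitialized status starts
// from 1 replica, unless the cluster already looks like it is running, in
// which case spec.replicas is kept as the starting point.
func currentReplicas(c clusterState) int32 {
	if c.currentReplicas > 0 {
		return c.currentReplicas
	}
	if c.statusReplicas > 1 || c.readyReplicas > 1 || c.internalNodes > 1 {
		return c.specReplicas
	}
	return 1
}

func main() {
	fresh := clusterState{specReplicas: 3}
	existing := clusterState{specReplicas: 3, readyReplicas: 2}
	fmt.Println(currentReplicas(fresh))    // 1: fresh cluster bootstraps from a single node
	fmt.Println(currentReplicas(existing)) // 3: already-running cluster keeps spec.replicas
}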