From 5d62c760fdb08ffd5b0599a8fa3d98476eec8917 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Mon, 13 Jun 2022 14:53:12 +0200 Subject: [PATCH] operator: implement progressive initialization to let node 0 create initial raft group This tries to solve the problem with empty seed_servers on node 0. With this change, all fresh clusters will be initially set to 1 replica (via `status.currentReplicas`), until a cluster is created and the operator can verify it via admin API. Then the cluster is scaled to the number of instances desired by the user. After the cluster is initialized, and for the entire lifetime of the cluster, the `seed_servers` property will be populated with the full list of available servers, in every node of the cluster. This overcomes https://github.com/redpanda-data/redpanda/issues/333. Previously, node 0 was always forced to have an empty seed_servers property, but this caused problems when it lost the data dir, as it tried to create a brand-new cluster. With this change, even if node 0 loses the data dir, the seed_servers property will always point to other nodes, so it will try to join the existing cluster. --- .../apis/redpanda/v1alpha1/cluster_types.go | 24 ++++++++++++ .../redpanda/v1alpha1/cluster_types_test.go | 17 +++++++++ src/go/k8s/cmd/configurator/main.go | 8 ++-- src/go/k8s/pkg/admin/admin.go | 7 +--- src/go/k8s/pkg/resources/configmap.go | 6 +-- src/go/k8s/pkg/resources/statefulset.go | 6 +-- src/go/k8s/pkg/resources/statefulset_scale.go | 37 ++++++++++++++++++- 7 files changed, 87 insertions(+), 18 deletions(-) diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go index 86868919d699..5afd7688ef74 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go @@ -856,6 +856,30 @@ func (s *ClusterStatus) SetRestarting(restarting bool) { s.DeprecatedUpgrading = restarting } +// GetCurrentReplicas returns the current number of replicas that the controller wants to run. +// It returns 1 when not initialized (as fresh clusters start from 1 replica) +func (r *Cluster) GetCurrentReplicas() int32 { + if r.Status.CurrentReplicas <= 0 { + // Not initialized, let's give the computed value + return r.ComputeInitialCurrentReplicasField() + } + return r.Status.CurrentReplicas +} + +// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas. +// +// It needs to consider the following cases: +// - Fresh cluster: we start from 1 replicas, then upscale if needed (initialization to bypass https://github.com/redpanda-data/redpanda/issues/333) +// - Existing clusters: we keep spec.replicas as starting point +func (r *Cluster) ComputeInitialCurrentReplicasField() int32 { + if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 { + // A cluster seems to be already running, we start from the existing amount of replicas + return *r.Spec.Replicas + } + // Clusters start from a single replica, then upscale + return 1 +} + // TLSConfig is a generic TLS configuration type TLSConfig struct { Enabled bool `json:"enabled,omitempty"` diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go index 5203cae07751..e4a8e2fe3c1a 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/pointer" ) // nolint:funlen // this is ok for a test @@ -216,3 +217,19 @@ func TestConditions(t *testing.T) { assert.Equal(t, condTime, cond2.LastTransitionTime) }) } + +func TestInitialReplicas(t *testing.T) { + cluster := v1alpha1.Cluster{} + cluster.Spec.Replicas = pointer.Int32(3) + assert.Equal(t, int32(1), cluster.GetCurrentReplicas()) + cluster.Status.Replicas = 2 + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.Replicas = 0 + cluster.Status.ReadyReplicas = 2 + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.ReadyReplicas = 0 + cluster.Status.Nodes.Internal = []string{"1", "2"} + assert.Equal(t, int32(3), cluster.GetCurrentReplicas()) + cluster.Status.Nodes.Internal = nil + assert.Equal(t, int32(1), cluster.GetCurrentReplicas()) +} diff --git a/src/go/k8s/cmd/configurator/main.go b/src/go/k8s/cmd/configurator/main.go index b019540579ec..8175c337f0c9 100644 --- a/src/go/k8s/cmd/configurator/main.go +++ b/src/go/k8s/cmd/configurator/main.go @@ -131,9 +131,11 @@ func main() { cfg.Redpanda.ID = int(hostIndex) - // First Redpanda node need to have cleared seed servers in order - // to form raft group 0 - if hostIndex == 0 { + // In case of a single seed server, the list should contain the current node itself. + // Normally the cluster is able to recognize it's talking to itself, except when the cluster is + // configured to use mutual TLS on the Kafka API (see Helm test). + // So, we clear the list of seeds to help Redpanda. + if len(cfg.Redpanda.SeedServers) == 1 { cfg.Redpanda.SeedServers = []config.SeedServer{} } diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index ea575687ddf1..1512dc6e25a5 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -56,16 +56,13 @@ func NewInternalAdminAPI( if len(ordinals) == 0 { // Not a specific node, just go through all them - replicas := redpandaCluster.Status.CurrentReplicas - if replicas <= 0 { - replicas = *redpandaCluster.Spec.Replicas - } + replicas := redpandaCluster.GetCurrentReplicas() for i := int32(0); i < replicas; i++ { ordinals = append(ordinals, i) } } - var urls []string + urls := make([]string, 0, len(ordinals)) for _, on := range ordinals { urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", redpandaCluster.Name, on, fqdn, adminInternalPort)) } diff --git a/src/go/k8s/pkg/resources/configmap.go b/src/go/k8s/pkg/resources/configmap.go index 6e2593deb7f2..8a24e9977264 100644 --- a/src/go/k8s/pkg/resources/configmap.go +++ b/src/go/k8s/pkg/resources/configmap.go @@ -331,7 +331,7 @@ func (r *ConfigMapResource) CreateConfiguration( cfg.SetAdditionalRedpandaProperty("log_segment_size", logSegmentSize) - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() for i := int32(0); i < replicas; i++ { cr.SeedServers = append(cr.SeedServers, config.SeedServer{ Host: config.SocketAddress{ @@ -454,7 +454,7 @@ func (r *ConfigMapResource) preparePandaproxyClient( return nil } - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() cfg.NodeConfiguration.PandaproxyClient = &config.KafkaClient{} for i := int32(0); i < replicas; i++ { cfg.NodeConfiguration.PandaproxyClient.Brokers = append(cfg.NodeConfiguration.PandaproxyClient.Brokers, config.SocketAddress{ @@ -493,7 +493,7 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient( return nil } - replicas := *r.pandaCluster.Spec.Replicas + replicas := r.pandaCluster.GetCurrentReplicas() cfg.NodeConfiguration.SchemaRegistryClient = &config.KafkaClient{} for i := int32(0); i < replicas; i++ { cfg.NodeConfiguration.SchemaRegistryClient.Brokers = append(cfg.NodeConfiguration.SchemaRegistryClient.Brokers, config.SocketAddress{ diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index 65d436d5471b..87c28b1ba1dc 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -279,11 +279,7 @@ func (r *StatefulSetResource) obj( tlsVolumes, tlsVolumeMounts := r.volumeProvider.Volumes() // We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function - replicas := r.pandaCluster.Status.CurrentReplicas - if replicas <= 0 { - // Until the state is initialized - replicas = *r.pandaCluster.Spec.Replicas - } + replicas := r.pandaCluster.GetCurrentReplicas() ss := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ diff --git a/src/go/k8s/pkg/resources/statefulset_scale.go b/src/go/k8s/pkg/resources/statefulset_scale.go index 7911c58ded4f..fece4bdc83d9 100644 --- a/src/go/k8s/pkg/resources/statefulset_scale.go +++ b/src/go/k8s/pkg/resources/statefulset_scale.go @@ -29,6 +29,7 @@ const ( ) // handleScaling is called to detect cases of replicas change and apply them to the cluster +// nolint:nestif // for clarity func (r *StatefulSetResource) handleScaling(ctx context.Context) error { if r.pandaCluster.Status.DecommissioningNode != nil { decommissionTargetReplicas := *r.pandaCluster.Status.DecommissioningNode @@ -40,8 +41,8 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { } if r.pandaCluster.Status.CurrentReplicas == 0 { - // Initialize the status currentReplicas - r.pandaCluster.Status.CurrentReplicas = *r.pandaCluster.Spec.Replicas + // Initialize the currentReplicas field, so that it can be later controlled + r.pandaCluster.Status.CurrentReplicas = r.pandaCluster.ComputeInitialCurrentReplicasField() return r.Status().Update(ctx, r.pandaCluster) } @@ -52,6 +53,23 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error { if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas { r.logger.Info("Upscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas) + + // We care about upscaling only when the cluster is moving off 1 replica, which happen e.g. at cluster startup + if r.pandaCluster.Status.CurrentReplicas == 1 { + r.logger.Info("Waiting for first node to form a cluster before upscaling") + formed, err := r.isClusterFormed(ctx) + if err != nil { + return err + } + if !formed { + return &RequeueAfterError{ + RequeueAfter: DecommissionRequeueDuration, + Msg: fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas), + } + } + r.logger.Info("Initial cluster has been formed") + } + // Upscaling request: this is already handled by Redpanda, so we just increase status currentReplicas return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger) } @@ -183,6 +201,21 @@ func (r *StatefulSetResource) getAdminAPIClient( return adminutils.NewInternalAdminAPI(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...) } +func (r *StatefulSetResource) isClusterFormed( + ctx context.Context, +) (bool, error) { + rootNodeAdminAPI, err := r.getAdminAPIClient(ctx, 0) + if err != nil { + return false, err + } + brokers, err := rootNodeAdminAPI.Brokers(ctx) + if err != nil { + // Eat the error and return that the cluster is not formed + return false, nil + } + return len(brokers) > 0, nil +} + // verifyRunningCount checks if the statefulset is configured to run the given amount of replicas and that also pods match the expectations func (r *StatefulSetResource) verifyRunningCount( ctx context.Context, replicas int32,