operator: implement progressive initialization to let node 0 create initial raft group

This addresses the problem of the empty seed_servers list on node 0. With this change, all fresh clusters are initially set to 1 replica (via `status.currentReplicas`) until a cluster is formed and the operator can verify it via the admin API. The cluster is then scaled to the number of instances requested by the user.
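The decision logic boils down to something like the following condensed Go sketch. Names such as `clusterState` and `nextReplicas` are illustrative only; the actual logic lives in `ComputeInitialCurrentReplicasField` and `handleScaling` in the diff below.

```go
// Condensed sketch of the progressive-initialization flow introduced by this
// commit. The types and function names here are hypothetical, not the
// operator's real API.
package main

import "fmt"

type clusterState struct {
	specReplicas    int32 // replicas requested by the user (spec.replicas)
	currentReplicas int32 // replicas the operator currently runs (status.currentReplicas)
	formed          bool  // true once the admin API reports at least one broker
}

// nextReplicas decides how many replicas the StatefulSet should run right now.
func nextReplicas(c clusterState) int32 {
	if c.currentReplicas == 0 {
		// Fresh cluster: start from a single replica so node 0 can create the raft group.
		return 1
	}
	if c.currentReplicas == 1 && !c.formed {
		// Hold the upscale until the single-node cluster has formed.
		return 1
	}
	// Cluster formed (or already running): converge on the requested replica count.
	return c.specReplicas
}

func main() {
	fresh := clusterState{specReplicas: 3}
	fmt.Println(nextReplicas(fresh)) // 1: bootstrap with a single node
	fresh.currentReplicas, fresh.formed = 1, true
	fmt.Println(nextReplicas(fresh)) // 3: upscale once the cluster is formed
}
```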

After the cluster is initialized, and for the entire lifetime of the cluster, the `seed_servers` property is populated with the full list of available servers on every node of the cluster.
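For illustration, here is a minimal sketch of rendering such a full seed-server list for every replica. The `seedServer` type and the `"%s-%d.%s"` host pattern are assumptions for this sketch; the operator's actual rendering uses `config.SeedServer` in `configmap.go` (see the diff below).

```go
// Sketch of building a seed_servers list that includes every replica, so each
// node (including node 0) always points at its peers and rejoins the existing
// cluster even after losing its data directory.
package main

import "fmt"

type seedServer struct {
	Host string
	Port int
}

func buildSeedServers(clusterName, internalFQDN string, replicas int32, rpcPort int) []seedServer {
	seeds := make([]seedServer, 0, replicas)
	for i := int32(0); i < replicas; i++ {
		seeds = append(seeds, seedServer{
			Host: fmt.Sprintf("%s-%d.%s", clusterName, i, internalFQDN),
			Port: rpcPort,
		})
	}
	return seeds
}

func main() {
	for _, s := range buildSeedServers("redpanda", "redpanda.default.svc.cluster.local", 3, 33145) {
		fmt.Printf("%s:%d\n", s.Host, s.Port)
	}
}
```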

This overcomes redpanda-data#333. Previously, node 0 was always forced to have an empty seed_servers property, which caused problems when it lost its data directory: it would try to create a brand-new cluster. With this change, even if node 0 loses its data directory, the seed_servers property still points to the other nodes, so it will try to join the existing cluster.
nicolaferraro committed Jun 16, 2022
1 parent 4d74d4f commit 5d62c76
Showing 7 changed files with 87 additions and 18 deletions.
24 changes: 24 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
@@ -856,6 +856,30 @@ func (s *ClusterStatus) SetRestarting(restarting bool) {
	s.DeprecatedUpgrading = restarting
}

// GetCurrentReplicas returns the current number of replicas that the controller wants to run.
// It returns 1 when not initialized (as fresh clusters start from 1 replica)
func (r *Cluster) GetCurrentReplicas() int32 {
	if r.Status.CurrentReplicas <= 0 {
		// Not initialized, let's give the computed value
		return r.ComputeInitialCurrentReplicasField()
	}
	return r.Status.CurrentReplicas
}

// ComputeInitialCurrentReplicasField calculates the initial value for status.currentReplicas.
//
// It needs to consider the following cases:
// - Fresh cluster: we start from 1 replica, then upscale if needed (initialization to bypass https://github.com/redpanda-data/redpanda/issues/333)
// - Existing clusters: we keep spec.replicas as the starting point
func (r *Cluster) ComputeInitialCurrentReplicasField() int32 {
	if r.Status.Replicas > 1 || r.Status.ReadyReplicas > 1 || len(r.Status.Nodes.Internal) > 1 {
		// A cluster seems to be already running; we start from the existing amount of replicas
		return *r.Spec.Replicas
	}
	// Clusters start from a single replica, then upscale
	return 1
}

// TLSConfig is a generic TLS configuration
type TLSConfig struct {
	Enabled bool `json:"enabled,omitempty"`
17 changes: 17 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types_test.go
@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/pointer"
)

// nolint:funlen // this is ok for a test
@@ -216,3 +217,19 @@ func TestConditions(t *testing.T) {
		assert.Equal(t, condTime, cond2.LastTransitionTime)
	})
}

func TestInitialReplicas(t *testing.T) {
	cluster := v1alpha1.Cluster{}
	cluster.Spec.Replicas = pointer.Int32(3)
	assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
	cluster.Status.Replicas = 2
	assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
	cluster.Status.Replicas = 0
	cluster.Status.ReadyReplicas = 2
	assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
	cluster.Status.ReadyReplicas = 0
	cluster.Status.Nodes.Internal = []string{"1", "2"}
	assert.Equal(t, int32(3), cluster.GetCurrentReplicas())
	cluster.Status.Nodes.Internal = nil
	assert.Equal(t, int32(1), cluster.GetCurrentReplicas())
}
8 changes: 5 additions & 3 deletions src/go/k8s/cmd/configurator/main.go
@@ -131,9 +131,11 @@ func main() {

	cfg.Redpanda.ID = int(hostIndex)

	// First Redpanda node need to have cleared seed servers in order
	// to form raft group 0
	if hostIndex == 0 {
	// In case of a single seed server, the list should contain the current node itself.
	// Normally the cluster is able to recognize it's talking to itself, except when the cluster is
	// configured to use mutual TLS on the Kafka API (see Helm test).
	// So, we clear the list of seeds to help Redpanda.
	if len(cfg.Redpanda.SeedServers) == 1 {
		cfg.Redpanda.SeedServers = []config.SeedServer{}
	}

7 changes: 2 additions & 5 deletions src/go/k8s/pkg/admin/admin.go
@@ -56,16 +56,13 @@ func NewInternalAdminAPI(

	if len(ordinals) == 0 {
		// Not a specific node, just go through all them
		replicas := redpandaCluster.Status.CurrentReplicas
		if replicas <= 0 {
			replicas = *redpandaCluster.Spec.Replicas
		}
		replicas := redpandaCluster.GetCurrentReplicas()

		for i := int32(0); i < replicas; i++ {
			ordinals = append(ordinals, i)
		}
	}
	var urls []string
	urls := make([]string, 0, len(ordinals))
	for _, on := range ordinals {
		urls = append(urls, fmt.Sprintf("%s-%d.%s:%d", redpandaCluster.Name, on, fqdn, adminInternalPort))
	}
6 changes: 3 additions & 3 deletions src/go/k8s/pkg/resources/configmap.go
@@ -331,7 +331,7 @@ func (r *ConfigMapResource) CreateConfiguration(

	cfg.SetAdditionalRedpandaProperty("log_segment_size", logSegmentSize)

	replicas := *r.pandaCluster.Spec.Replicas
	replicas := r.pandaCluster.GetCurrentReplicas()
	for i := int32(0); i < replicas; i++ {
		cr.SeedServers = append(cr.SeedServers, config.SeedServer{
			Host: config.SocketAddress{
@@ -454,7 +454,7 @@ func (r *ConfigMapResource) preparePandaproxyClient(
		return nil
	}

	replicas := *r.pandaCluster.Spec.Replicas
	replicas := r.pandaCluster.GetCurrentReplicas()
	cfg.NodeConfiguration.PandaproxyClient = &config.KafkaClient{}
	for i := int32(0); i < replicas; i++ {
		cfg.NodeConfiguration.PandaproxyClient.Brokers = append(cfg.NodeConfiguration.PandaproxyClient.Brokers, config.SocketAddress{
@@ -493,7 +493,7 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient(
		return nil
	}

	replicas := *r.pandaCluster.Spec.Replicas
	replicas := r.pandaCluster.GetCurrentReplicas()
	cfg.NodeConfiguration.SchemaRegistryClient = &config.KafkaClient{}
	for i := int32(0); i < replicas; i++ {
		cfg.NodeConfiguration.SchemaRegistryClient.Brokers = append(cfg.NodeConfiguration.SchemaRegistryClient.Brokers, config.SocketAddress{
6 changes: 1 addition & 5 deletions src/go/k8s/pkg/resources/statefulset.go
@@ -279,11 +279,7 @@ func (r *StatefulSetResource) obj(
	tlsVolumes, tlsVolumeMounts := r.volumeProvider.Volumes()

	// We set statefulset replicas via status.currentReplicas in order to control it from the handleScaling function
	replicas := r.pandaCluster.Status.CurrentReplicas
	if replicas <= 0 {
		// Until the state is initialized
		replicas = *r.pandaCluster.Spec.Replicas
	}
	replicas := r.pandaCluster.GetCurrentReplicas()

	ss := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
37 changes: 35 additions & 2 deletions src/go/k8s/pkg/resources/statefulset_scale.go
@@ -29,6 +29,7 @@ const (
)

// handleScaling is called to detect cases of replicas change and apply them to the cluster
// nolint:nestif // for clarity
func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
	if r.pandaCluster.Status.DecommissioningNode != nil {
		decommissionTargetReplicas := *r.pandaCluster.Status.DecommissioningNode
@@ -40,8 +41,8 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
	}

	if r.pandaCluster.Status.CurrentReplicas == 0 {
		// Initialize the status currentReplicas
		r.pandaCluster.Status.CurrentReplicas = *r.pandaCluster.Spec.Replicas
		// Initialize the currentReplicas field, so that it can be later controlled
		r.pandaCluster.Status.CurrentReplicas = r.pandaCluster.ComputeInitialCurrentReplicasField()
		return r.Status().Update(ctx, r.pandaCluster)
	}

@@ -52,6 +53,23 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {

	if *r.pandaCluster.Spec.Replicas > r.pandaCluster.Status.CurrentReplicas {
		r.logger.Info("Upscaling cluster", "replicas", *r.pandaCluster.Spec.Replicas)

		// We care about upscaling only when the cluster is moving off 1 replica, which happens e.g. at cluster startup
		if r.pandaCluster.Status.CurrentReplicas == 1 {
			r.logger.Info("Waiting for first node to form a cluster before upscaling")
			formed, err := r.isClusterFormed(ctx)
			if err != nil {
				return err
			}
			if !formed {
				return &RequeueAfterError{
					RequeueAfter: DecommissionRequeueDuration,
					Msg:          fmt.Sprintf("Waiting for cluster to be formed before upscaling to %d replicas", *r.pandaCluster.Spec.Replicas),
				}
			}
			r.logger.Info("Initial cluster has been formed")
		}

		// Upscaling request: this is already handled by Redpanda, so we just increase status currentReplicas
		return setCurrentReplicas(ctx, r, r.pandaCluster, *r.pandaCluster.Spec.Replicas, r.logger)
	}
@@ -183,6 +201,21 @@ func (r *StatefulSetResource) getAdminAPIClient(
	return adminutils.NewInternalAdminAPI(ctx, r, r.pandaCluster, r.serviceFQDN, r.adminTLSConfigProvider, ordinals...)
}

func (r *StatefulSetResource) isClusterFormed(
	ctx context.Context,
) (bool, error) {
	rootNodeAdminAPI, err := r.getAdminAPIClient(ctx, 0)
	if err != nil {
		return false, err
	}
	brokers, err := rootNodeAdminAPI.Brokers(ctx)
	if err != nil {
		// Eat the error and return that the cluster is not formed
		return false, nil
	}
	return len(brokers) > 0, nil
}

// verifyRunningCount checks if the statefulset is configured to run the given amount of replicas and that also pods match the expectations
func (r *StatefulSetResource) verifyRunningCount(
	ctx context.Context, replicas int32,
