Commit

Merge pull request #7594 from RafalKorepta/rk/gh-3023/check-under-replicated-partitions-in-upgrade-procedure
RafalKorepta committed Jan 6, 2023
2 parents 54bccf3 + f40b53c commit 5edf3ad
Showing 43 changed files with 5,465 additions and 367 deletions.
20 changes: 20 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
@@ -177,6 +177,26 @@ type ClusterSpec struct {
type RestartConfig struct {
// DisableMaintenanceModeHooks deactivates the preStop and postStart hooks that force nodes to enter maintenance mode when stopping and exit maintenance mode when up again
DisableMaintenanceModeHooks *bool `json:"disableMaintenanceModeHooks,omitempty"`

// UnderReplicatedPartitionThreshold controls when a rolling update will continue with
// restarts. The procedure can be described as follows:
//
// 1. Rolling update checks if Pod specification needs to be replaced and deletes it
// 2. Deleted Redpanda Pod is put into maintenance mode (postStart hook will disable
// maintenance mode when new Pod starts)
// 3. Rolling update waits for Pod to be in Ready state
// 4. Rolling update checks if cluster is in healthy state
// 5. Rolling update checks if restarted Redpanda Pod admin API Ready endpoint returns HTTP 200 response
// 6. Using UnderReplicatedPartitionThreshold each under replicated partition metric is compared with the threshold
// 7. Rolling update moves to the next Redpanda pod
//
// The metric `vectorized_cluster_partition_under_replicated_replicas` is used in the comparison
//
// The mentioned metric has the following help description:
// `vectorized_cluster_partition_under_replicated_replicas` Number of under replicated replicas
//
// By default, the UnderReplicatedPartitionThreshold will be 0, which means all partitions need to catch up without any lag.
UnderReplicatedPartitionThreshold int `json:"underReplicatedPartitionThreshold,omitempty"`
}

// PDBConfig specifies how the PodDisruptionBudget should be created for the
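For orientation, the sketch below (not part of this commit) shows how the new field would be set when constructing a Cluster object with the v1alpha1 types changed above; the cluster name, namespace, and the threshold value of 5 are invented for illustration.

package main

import (
    "fmt"

    redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Hypothetical Cluster resource; name and namespace are placeholders.
    cluster := &redpandav1alpha1.Cluster{
        ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
        Spec: redpandav1alpha1.ClusterSpec{
            RestartConfig: &redpandav1alpha1.RestartConfig{
                // Tolerate up to 5 under-replicated replicas per partition before
                // the rolling update pauses and requeues; the default of 0 requires
                // every partition to be fully caught up.
                UnderReplicatedPartitionThreshold: 5,
            },
        },
    }

    fmt.Println(cluster.Spec.RestartConfig.UnderReplicatedPartitionThreshold)
}

Leaving restartConfig unset is equivalent to the webhook default added below, which sets the threshold to 0.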
9 changes: 8 additions & 1 deletion src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go
@@ -137,6 +137,13 @@ func (r *Cluster) Default() {
r.Spec.Configuration.KafkaAPI[i].AuthenticationMethod = noneAuthorizationMechanism
}
}

if r.Spec.RestartConfig == nil {
r.Spec.RestartConfig = &RestartConfig{
DisableMaintenanceModeHooks: nil,
UnderReplicatedPartitionThreshold: 0,
}
}
}

var defaultAdditionalConfiguration = map[string]int{
@@ -148,7 +155,7 @@ var defaultAdditionalConfiguration = map[string]int{
// setDefaultAdditionalConfiguration sets additional configuration fields based
// on the best practices
func (r *Cluster) setDefaultAdditionalConfiguration() {
-  if *r.Spec.Replicas >= minimumReplicas {
+  if r.Spec.Replicas != nil && *r.Spec.Replicas >= minimumReplicas {
if r.Spec.AdditionalConfiguration == nil {
r.Spec.AdditionalConfiguration = make(map[string]string)
}
13 changes: 13 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go
@@ -217,6 +217,19 @@ func TestDefault(t *testing.T) {
redpandaCluster.Default()
assert.Equal(t, v1alpha1.DefaultLicenseSecretKey, redpandaCluster.Spec.LicenseRef.Key)
})

t.Run("when restart config is nil, set UnderReplicatedPartitionThreshold to 0", func(t *testing.T) {
redpandaCluster := &v1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{},
}
redpandaCluster.Default()
assert.NotNil(t, redpandaCluster.Spec.RestartConfig)
assert.Equal(t, 0, redpandaCluster.Spec.RestartConfig.UnderReplicatedPartitionThreshold)
})
}

func TestValidateUpdate(t *testing.T) {
20 changes: 20 additions & 0 deletions src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml
@@ -860,6 +860,26 @@ spec:
and postStart hooks that force nodes to enter maintenance mode
when stopping and exit maintenance mode when up again
type: boolean
underReplicatedPartitionThreshold:
description: "UnderReplicatedPartitionThreshold regulate when
rolling update will continue with restarts. The procedure can
be described as follows: \n 1. Rolling update checks if Pod
specification needs to be replaced and deletes it 2. Deleted
Redpanda Pod is put into maintenance mode (postStart hook will
disable maintenance mode when new Pod starts) 3. Rolling
update waits for Pod to be in Ready state 4. Rolling update
checks if cluster is in healthy state 5. Rolling update checks
if restarted Redpanda Pod admin API Ready endpoint returns HTTP
200 response 6. Using UnderReplicatedPartitionThreshold each
under replicated partition metric is compared with the threshold
7. Rolling update moves to the next Redpanda pod \n The metric
`vectorized_cluster_partition_under_replicated_replicas` is
used in the comparison \n The mentioned metric has the following
help description: `vectorized_cluster_partition_under_replicated_replicas`
Number of under replicated replicas \n By default, the UnderReplicatedPartitionThreshold
will be 0, which means all partitions need to catch up without
any lag."
type: integer
type: object
sidecars:
description: Sidecars is list of sidecars run alongside redpanda container
96 changes: 94 additions & 2 deletions src/go/k8s/pkg/resources/statefulset_update.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/banzaicloud/k8s-objectmatcher/patch"
"github.com/prometheus/common/expfmt"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
@@ -35,10 +36,13 @@ const (
// RequeueDuration is the time controller should
// requeue resource reconciliation.
RequeueDuration = time.Second * 10
-  adminAPITimeout = time.Millisecond * 100
+  adminAPITimeout = time.Second * 2
)

var errRedpandaNotReady = errors.New("redpanda not ready")
var (
errRedpandaNotReady = errors.New("redpanda not ready")
errUnderReplicatedPartition = errors.New("partition under replicated")
)

// runUpdate handles image changes and additional storage in the redpanda cluster
// CR by removing statefulset with orphans Pods. The stateful set is then recreated
@@ -184,6 +188,15 @@ func (r *StatefulSetResource) rollingUpdate(
if err = r.queryRedpandaStatus(ctx, &adminURL); err != nil {
return fmt.Errorf("unable to query Redpanda ready status: %w", err)
}

adminURL.Path = "metrics"

if err = r.evaluateUnderReplicatedPartitions(ctx, &adminURL); err != nil {
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Msg: fmt.Sprintf("broker reported under replicated partitions: %v", err),
}
}
}

return nil
@@ -512,6 +525,85 @@ func (r *StatefulSetResource) queryRedpandaStatus(
return nil
}

// evaluateUnderReplicatedPartitions scrapes the admin API metrics endpoint and compares each
// vectorized_cluster_partition_under_replicated_replicas gauge against the configured UnderReplicatedPartitionThreshold.
func (r *StatefulSetResource) evaluateUnderReplicatedPartitions(
ctx context.Context, adminURL *url.URL,
) error {
client := &http.Client{Timeout: adminAPITimeout}

// TODO right now we support TLS only on one listener so if external
// connectivity is enabled, TLS is enabled only on external listener. This
// will be fixed by https://github.com/redpanda-data/redpanda/issues/1084
if r.pandaCluster.AdminAPITLS() != nil &&
r.pandaCluster.AdminAPIExternal() == nil {
tlsConfig, err := r.adminTLSConfigProvider.GetTLSConfig(ctx, r)
if err != nil {
return err
}

client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
adminURL.Scheme = "https"
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, adminURL.String(), http.NoBody)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("getting broker metrics (%s): %w", adminURL.String(), errRedpandaNotReady)
}

var parser expfmt.TextParser
metrics, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return err
}

for name, metricFamily := range metrics {
if name != "vectorized_cluster_partition_under_replicated_replicas" {
continue
}

for _, m := range metricFamily.Metric {
if m == nil {
continue
}
if m.Gauge == nil {
r.logger.Info("cluster_partition_under_replicated_replicas metric does not have value", "labels", m.Label)
continue
}

var namespace, partition, shard, topic string
for _, l := range m.Label {
switch *l.Name {
case "namespace":
namespace = *l.Value
case "partition":
partition = *l.Value
case "shard":
shard = *l.Value
case "topic":
topic = *l.Value
}
}

if r.pandaCluster.Spec.RestartConfig != nil && *m.Gauge.Value > float64(r.pandaCluster.Spec.RestartConfig.UnderReplicatedPartitionThreshold) {
return fmt.Errorf("in topic (%s), partition (%s), shard (%s), namespace (%s): %w", topic, partition, shard, namespace, errUnderReplicatedPartition)
}
}
}

return nil
}

// RequeueAfterError error carrying the time after which to requeue.
type RequeueAfterError struct {
RequeueAfter time.Duration
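To make the metric handling in evaluateUnderReplicatedPartitions easier to follow in isolation, here is a small self-contained sketch (not part of this commit) that parses a hand-written Prometheus exposition payload with expfmt.TextParser and applies the same greater-than-threshold comparison; the payload, the threshold of 1, and the program scaffolding are illustrative only.

package main

import (
    "fmt"
    "strings"

    "github.com/prometheus/common/expfmt"
)

func main() {
    // Hand-written exposition payload in the same shape the admin API metrics
    // endpoint reports for under-replicated partitions.
    payload := `# HELP vectorized_cluster_partition_under_replicated_replicas Number of under replicated replicas
# TYPE vectorized_cluster_partition_under_replicated_replicas gauge
vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partition="0",shard="0",topic="test"} 2.000000
`
    threshold := 1 // illustrative; the operator defaults to 0

    var parser expfmt.TextParser
    families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
    if err != nil {
        panic(err)
    }

    family, ok := families["vectorized_cluster_partition_under_replicated_replicas"]
    if !ok {
        fmt.Println("metric not reported")
        return
    }

    for _, m := range family.Metric {
        if m.Gauge == nil {
            continue
        }
        // The rolling update only proceeds while every gauge stays at or below the threshold.
        if m.Gauge.GetValue() > float64(threshold) {
            fmt.Println("under-replicated partition above threshold:", m.Label)
        }
    }
}

With a gauge value of 2 and a threshold of 1, the sketch prints the offending labels, mirroring the errUnderReplicatedPartition path above; at the operator default of 0, any non-zero gauge would cause the rolling update to requeue.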
110 changes: 110 additions & 0 deletions src/go/k8s/pkg/resources/statefulset_update_test.go
@@ -11,6 +11,12 @@ package resources //nolint:testpackage // needed to test private method

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
@@ -147,3 +153,107 @@ func TestPutInMaintenanceMode(t *testing.T) {
})
}
}

func TestEvaluateRedpandaUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
f, err := os.Open("testdata/metrics.golden.txt")
require.NoError(t, err)

_, err = io.Copy(w, f)
require.NoError(t, err)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{
Spec: redpandav1alpha1.ClusterSpec{
RestartConfig: &redpandav1alpha1.RestartConfig{},
},
}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.evaluateUnderReplicatedPartitions(context.Background(), &adminURL)
require.NoError(t, err)
}

func TestEvaluateAboveThresholdRedpandaUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `
# HELP vectorized_cluster_partition_under_replicated_replicas Number of under replicated replicas
# TYPE vectorized_cluster_partition_under_replicated_replicas gauge
vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partition="0",shard="0",topic="test"} 1.000000
`)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{
Spec: redpandav1alpha1.ClusterSpec{
RestartConfig: &redpandav1alpha1.RestartConfig{},
},
}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.evaluateUnderReplicatedPartitions(context.Background(), &adminURL)
require.Error(t, err)
}

func TestEvaluateEqualThresholdInUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `
# HELP vectorized_cluster_partition_under_replicated_replicas Number of under replicated replicas
# TYPE vectorized_cluster_partition_under_replicated_replicas gauge
vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partition="0",shard="0",topic="test"} 1.000000
`)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{
Spec: redpandav1alpha1.ClusterSpec{
RestartConfig: &redpandav1alpha1.RestartConfig{
UnderReplicatedPartitionThreshold: 1,
},
},
}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.evaluateUnderReplicatedPartitions(context.Background(), &adminURL)
require.NoError(t, err)
}

func TestEvaluateWithoutRestartConfigInUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `
# HELP vectorized_cluster_partition_under_replicated_replicas Number of under replicated replicas
# TYPE vectorized_cluster_partition_under_replicated_replicas gauge
vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partition="0",shard="0",topic="test"} 1.000000
`)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{
Spec: redpandav1alpha1.ClusterSpec{},
}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.evaluateUnderReplicatedPartitions(context.Background(), &adminURL)
require.NoError(t, err)
}