Skip to content

Commit

Permalink
k8s: Wait for restarted broker to catch up
Browse files Browse the repository at this point in the history
When rolling restart is performed then the broker might need to catch up with
other replica. This change will parse /metrics endpoint to see if
`vectorized_cluster_partition_under_replicated_replicas` is under threshold.
  • Loading branch information
Rafal Korepta committed Jan 2, 2023
1 parent a6a021b commit c5bb116
Show file tree
Hide file tree
Showing 4 changed files with 4,104 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/go/k8s/pkg/resources/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type StatefulSetResource struct {
logger logr.Logger

LastObservedState *appsv1.StatefulSet
threshold float64
}

// NewStatefulSet creates StatefulSetResource
Expand Down Expand Up @@ -139,6 +140,7 @@ func NewStatefulSet(
decommissionWaitInterval,
logger.WithValues("Kind", statefulSetKind()),
nil,
0,
}
}

Expand Down
94 changes: 93 additions & 1 deletion src/go/k8s/pkg/resources/statefulset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/banzaicloud/k8s-objectmatcher/patch"
"github.com/prometheus/common/expfmt"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
Expand All @@ -38,7 +39,10 @@ const (
adminAPITimeout = time.Millisecond * 100
)

var errRedpandaNotReady = errors.New("redpanda not ready")
var (
errRedpandaNotReady = errors.New("redpanda not ready")
errUnderReplicatedPartition = errors.New("partition under replicated")
)

// runUpdate handles image changes and additional storage in the redpanda cluster
// CR by removing statefulset with orphans Pods. The stateful set is then recreated
Expand Down Expand Up @@ -202,6 +206,15 @@ func (r *StatefulSetResource) rollingUpdate(
if err = r.queryRedpandaStatus(ctx, &adminURL); err != nil {
return fmt.Errorf("unable to query Redpanda ready status: %w", err)
}

adminURL.Path = "metrics"

if err = r.queryRedpandaUnderReplicatedPartition(ctx, &adminURL); err != nil {
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Msg: fmt.Sprintf("broker reported under replicated partitions: %v", err),
}
}
}

return nil
Expand Down Expand Up @@ -447,6 +460,85 @@ func (r *StatefulSetResource) queryRedpandaStatus(
return nil
}

// Temporarily using the status/ready endpoint until we have a specific one for restarting.
func (r *StatefulSetResource) queryRedpandaUnderReplicatedPartition(
ctx context.Context, adminURL *url.URL,
) error {
client := &http.Client{Timeout: adminAPITimeout}

// TODO right now we support TLS only on one listener so if external
// connectivity is enabled, TLS is enabled only on external listener. This
// will be fixed by https://github.com/redpanda-data/redpanda/issues/1084
if r.pandaCluster.AdminAPITLS() != nil &&
r.pandaCluster.AdminAPIExternal() == nil {
tlsConfig, err := r.adminTLSConfigProvider.GetTLSConfig(ctx, r)
if err != nil {
return err
}

client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
adminURL.Scheme = "https"
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, adminURL.String(), http.NoBody)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("getting broker metrics (%s): %w", adminURL.String(), errRedpandaNotReady)
}

var parser expfmt.TextParser
metrics, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return err
}

for name, metricFamily := range metrics {
if name != "vectorized_cluster_partition_under_replicated_replicas" {
continue
}

for _, m := range metricFamily.Metric {
if m == nil {
continue
}
if m.Gauge == nil {
r.logger.Info("cluster_partition_under_replicated_replicas metric does not have value", "labels", m.Label)
continue
}

var namespace, partition, shard, topic string
for _, l := range m.Label {
switch *l.Name {
case "namespace":
namespace = *l.Value
case "partition":
partition = *l.Value
case "shard":
shard = *l.Value
case "topic":
topic = *l.Value
}
}

if *m.Gauge.Value > r.threshold {
return fmt.Errorf("in topic (%s), partition (%s), shard (%s), namespace (%s): %w", topic, partition, shard, namespace, errUnderReplicatedPartition)
}
}
}

return nil
}

// RequeueAfterError error carrying the time after which to requeue.
type RequeueAfterError struct {
RequeueAfter time.Duration
Expand Down
52 changes: 52 additions & 0 deletions src/go/k8s/pkg/resources/statefulset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@
package resources //nolint:testpackage // needed to test private method

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -65,3 +73,47 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) {
require.NoError(t, err)
require.False(t, update)
}

func TestQueryRedpandaUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
f, err := os.Open("testdata/metrics.gold.txt")
require.NoError(t, err)

_, err = io.Copy(w, f)
require.NoError(t, err)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.queryRedpandaUnderReplicatedPartition(context.Background(), &adminURL)
require.NoError(t, err)
}

func TestFailedQueryRedpandaUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `
# HELP vectorized_cluster_partition_under_replicated_replicas Number of under replicated replicas
# TYPE vectorized_cluster_partition_under_replicated_replicas gauge
vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partition="0",shard="0",topic="test"} 1.000000
`)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.queryRedpandaUnderReplicatedPartition(context.Background(), &adminURL)
require.Error(t, err)
}
Loading

0 comments on commit c5bb116

Please sign in to comment.