Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s: Wait for restarted broker to catch up #7594

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,26 @@ type ClusterSpec struct {
type RestartConfig struct {
// DisableMaintenanceModeHooks deactivates the preStop and postStart hooks that force nodes to enter maintenance mode when stopping and exit maintenance mode when up again.
DisableMaintenanceModeHooks *bool `json:"disableMaintenanceModeHooks,omitempty"`

// UnderReplicatedPartitionThreshold controls when a rolling update continues with
// the next restart. The procedure is as follows:
//
// 1. Rolling update checks if the Pod specification needs to be replaced and deletes the Pod
// 2. The deleted Redpanda Pod is put into maintenance mode (the postStart hook disables
// maintenance mode when the new Pod starts)
// 3. Rolling update waits for the Pod to be in Ready state
// 4. Rolling update checks if the cluster is in a healthy state
// 5. Rolling update checks if the restarted Redpanda Pod admin API Ready endpoint returns an HTTP 200 response
// 6. Each under-replicated partition metric is compared with UnderReplicatedPartitionThreshold
// 7. Rolling update moves to the next Redpanda Pod
//
// The metric `vectorized_cluster_partition_under_replicated_replicas` is used in the comparison.
//
// The metric has the following help description:
// `vectorized_cluster_partition_under_replicated_replicas` Number of under replicated replicas
//
// The comparison is strict: a metric value greater than the threshold keeps the rolling
// update from moving on, while a value equal to the threshold is accepted.
//
// By default, UnderReplicatedPartitionThreshold is 0, which means all partitions need to catch up without any lag.
UnderReplicatedPartitionThreshold int `json:"underReplicatedPartitionThreshold,omitempty"`
}

// PDBConfig specifies how the PodDisruptionBudget should be created for the
Expand Down
9 changes: 8 additions & 1 deletion src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ func (r *Cluster) Default() {
r.Spec.Configuration.KafkaAPI[i].AuthenticationMethod = noneAuthorizationMechanism
}
}

if r.Spec.RestartConfig == nil {
r.Spec.RestartConfig = &RestartConfig{
DisableMaintenanceModeHooks: nil,
UnderReplicatedPartitionThreshold: 0,
}
}
}

var defaultAdditionalConfiguration = map[string]int{
Expand All @@ -148,7 +155,7 @@ var defaultAdditionalConfiguration = map[string]int{
// setDefaultAdditionalConfiguration sets additional configuration fields based
// on the best practices
func (r *Cluster) setDefaultAdditionalConfiguration() {
if *r.Spec.Replicas >= minimumReplicas {
if r.Spec.Replicas != nil && *r.Spec.Replicas >= minimumReplicas {
if r.Spec.AdditionalConfiguration == nil {
r.Spec.AdditionalConfiguration = make(map[string]string)
}
Expand Down
13 changes: 13 additions & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,19 @@ func TestDefault(t *testing.T) {
redpandaCluster.Default()
assert.Equal(t, v1alpha1.DefaultLicenseSecretKey, redpandaCluster.Spec.LicenseRef.Key)
})

t.Run("when restart config is nil, set UnderReplicatedPartitionThreshold to 0", func(t *testing.T) {
redpandaCluster := &v1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "",
},
Spec: v1alpha1.ClusterSpec{},
}
redpandaCluster.Default()
assert.NotNil(t, redpandaCluster.Spec.RestartConfig)
assert.Equal(t, 0, redpandaCluster.Spec.RestartConfig.UnderReplicatedPartitionThreshold)
})
}

func TestValidateUpdate(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,26 @@ spec:
and postStart hooks that force nodes to enter maintenance mode
when stopping and exit maintenance mode when up again
type: boolean
underReplicatedPartitionThreshold:
description: "UnderReplicatedPartitionThreshold controls when
rolling update will continue with restarts. The procedure can
be described as follows: \n 1. Rolling update checks if Pod
specification needs to be replaced and deletes it 2. Deleted
Redpanda Pod is put into maintenance mode (postStart hook will
disable maintenance mode when new Pod starts) 3. Rolling
update waits for Pod to be in Ready state 4. Rolling update
checks if cluster is in healthy state 5. Rolling update checks
if restarted Redpanda Pod admin API Ready endpoint returns HTTP
200 response 6. Using UnderReplicatedPartitionThreshold each
under replicated partition metric is compared with the threshold
7. Rolling update moves to the next Redpanda pod \n The metric
`vectorized_cluster_partition_under_replicated_replicas` is
used in the comparison \n The mentioned metric has the following
help description: `vectorized_cluster_partition_under_replicated_replicas`
Number of under replicated replicas \n By default, the UnderReplicatedPartitionThreshold
will be 0, which means all partitions need to catch up without
any lag."
type: integer
type: object
sidecars:
description: Sidecars is list of sidecars run alongside redpanda container
Expand Down
96 changes: 94 additions & 2 deletions src/go/k8s/pkg/resources/statefulset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/banzaicloud/k8s-objectmatcher/patch"
"github.com/prometheus/common/expfmt"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
Expand All @@ -35,10 +36,13 @@ const (
// RequeueDuration is the time controller should
// requeue resource reconciliation.
RequeueDuration = time.Second * 10
adminAPITimeout = time.Millisecond * 100
adminAPITimeout = time.Second * 2
)

var errRedpandaNotReady = errors.New("redpanda not ready")
var (
errRedpandaNotReady = errors.New("redpanda not ready")
errUnderReplicatedPartition = errors.New("partition under replicated")
)

// runUpdate handles image changes and additional storage in the redpanda cluster
// CR by removing statefulset with orphans Pods. The stateful set is then recreated
Expand Down Expand Up @@ -184,6 +188,15 @@ func (r *StatefulSetResource) rollingUpdate(
if err = r.queryRedpandaStatus(ctx, &adminURL); err != nil {
return fmt.Errorf("unable to query Redpanda ready status: %w", err)
}

adminURL.Path = "metrics"

if err = r.evaluateUnderReplicatedPartitions(ctx, &adminURL); err != nil {
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Msg: fmt.Sprintf("broker reported under replicated partitions: %v", err),
}
}
}

return nil
Expand Down Expand Up @@ -512,6 +525,85 @@ func (r *StatefulSetResource) queryRedpandaStatus(
return nil
}

// Temporarily using the status/ready endpoint until we have a specific one for restarting.
func (r *StatefulSetResource) evaluateUnderReplicatedPartitions(
ctx context.Context, adminURL *url.URL,
) error {
client := &http.Client{Timeout: adminAPITimeout}

// TODO right now we support TLS only on one listener so if external
// connectivity is enabled, TLS is enabled only on external listener. This
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is true, tls could be enabled on internal listener as well. I think the behavior you're describing is true only for schema registry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be brutally honest I copied it from your previous implementation

// TODO right now we support TLS only on one listener so if external
// connectivity is enabled, TLS is enabled only on external listener. This
// will be fixed by https://github.com/redpanda-data/redpanda/issues/1084

6235e96#diff-5619b1cd9bdb790f53d380d84b884e952da245749cb78145fdf8f6b328b1be2cR182-R184

// will be fixed by https://github.com/redpanda-data/redpanda/issues/1084
if r.pandaCluster.AdminAPITLS() != nil &&
r.pandaCluster.AdminAPIExternal() == nil {
tlsConfig, err := r.adminTLSConfigProvider.GetTLSConfig(ctx, r)
if err != nil {
return err
}

client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
adminURL.Scheme = "https"
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, adminURL.String(), http.NoBody)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("getting broker metrics (%s): %w", adminURL.String(), errRedpandaNotReady)
}

var parser expfmt.TextParser
metrics, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return err
}

for name, metricFamily := range metrics {
if name != "vectorized_cluster_partition_under_replicated_replicas" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is per node metric? Don't we have to look for metric for this particular node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will ask core how to discover under replicated replicas

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I forget how I implemented this function. We query current POD. You can see that headless service with exact pod name is used

Host: fmt.Sprintf("%s.%s", pod.Name, headlessServiceWithPort),

That means metrics comes from one POD.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see, thanks

continue
}

for _, m := range metricFamily.Metric {
if m == nil {
continue
}
if m.Gauge == nil {
r.logger.Info("cluster_partition_under_replicated_replicas metric does not have value", "labels", m.Label)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx added

continue
}

var namespace, partition, shard, topic string
for _, l := range m.Label {
switch *l.Name {
case "namespace":
namespace = *l.Value
case "partition":
partition = *l.Value
case "shard":
shard = *l.Value
case "topic":
topic = *l.Value
}
}

if r.pandaCluster.Spec.RestartConfig != nil && *m.Gauge.Value > float64(r.pandaCluster.Spec.RestartConfig.UnderReplicatedPartitionThreshold) {
return fmt.Errorf("in topic (%s), partition (%s), shard (%s), namespace (%s): %w", topic, partition, shard, namespace, errUnderReplicatedPartition)
}
}
}

return nil
}

// RequeueAfterError error carrying the time after which to requeue.
type RequeueAfterError struct {
RequeueAfter time.Duration
Expand Down
110 changes: 110 additions & 0 deletions src/go/k8s/pkg/resources/statefulset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ package resources //nolint:testpackage // needed to test private method

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
Expand Down Expand Up @@ -147,3 +153,107 @@ func TestPutInMaintenanceMode(t *testing.T) {
})
}
}

func TestEvaluateRedpandaUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
f, err := os.Open("testdata/metrics.golden.txt")
require.NoError(t, err)

_, err = io.Copy(w, f)
require.NoError(t, err)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{
Spec: redpandav1alpha1.ClusterSpec{
RestartConfig: &redpandav1alpha1.RestartConfig{},
},
}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.evaluateUnderReplicatedPartitions(context.Background(), &adminURL)
require.NoError(t, err)
}

func TestEvaluateAboveThresholdRedpandaUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `
# HELP vectorized_cluster_partition_under_replicated_replicas Number of under replicated replicas
# TYPE vectorized_cluster_partition_under_replicated_replicas gauge
vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partition="0",shard="0",topic="test"} 1.000000
`)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{
Spec: redpandav1alpha1.ClusterSpec{
RestartConfig: &redpandav1alpha1.RestartConfig{},
},
}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.evaluateUnderReplicatedPartitions(context.Background(), &adminURL)
require.Error(t, err)
}

func TestEvaluateEqualThresholdInUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `
# HELP vectorized_cluster_partition_under_replicated_replicas Number of under replicated replicas
# TYPE vectorized_cluster_partition_under_replicated_replicas gauge
vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partition="0",shard="0",topic="test"} 1.000000
`)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{
Spec: redpandav1alpha1.ClusterSpec{
RestartConfig: &redpandav1alpha1.RestartConfig{
UnderReplicatedPartitionThreshold: 1,
},
},
}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.evaluateUnderReplicatedPartitions(context.Background(), &adminURL)
require.NoError(t, err)
}

func TestEvaluateWithoutRestartConfigInUnderReplicatedPartition(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, `
# HELP vectorized_cluster_partition_under_replicated_replicas Number of under replicated replicas
# TYPE vectorized_cluster_partition_under_replicated_replicas gauge
vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partition="0",shard="0",topic="test"} 1.000000
`)
}))
defer ts.Close()

ssres := StatefulSetResource{pandaCluster: &redpandav1alpha1.Cluster{
Spec: redpandav1alpha1.ClusterSpec{},
}}

adminURL := url.URL{
Scheme: "http",
Host: ts.Listener.Addr().String(),
Path: "metrics",
}

err := ssres.evaluateUnderReplicatedPartitions(context.Background(), &adminURL)
require.NoError(t, err)
}
Loading