-
Notifications
You must be signed in to change notification settings - Fork 579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
k8s: Wait for restarted broker to catch up #7594
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |||
"time" | ||||
|
||||
"github.com/banzaicloud/k8s-objectmatcher/patch" | ||||
"github.com/prometheus/common/expfmt" | ||||
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" | ||||
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" | ||||
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" | ||||
|
@@ -35,10 +36,13 @@ const ( | |||
// RequeueDuration is the time controller should | ||||
// requeue resource reconciliation. | ||||
RequeueDuration = time.Second * 10 | ||||
adminAPITimeout = time.Millisecond * 100 | ||||
adminAPITimeout = time.Second * 2 | ||||
) | ||||
|
||||
var errRedpandaNotReady = errors.New("redpanda not ready") | ||||
var ( | ||||
errRedpandaNotReady = errors.New("redpanda not ready") | ||||
errUnderReplicatedPartition = errors.New("partition under replicated") | ||||
) | ||||
|
||||
// runUpdate handles image changes and additional storage in the redpanda cluster | ||||
// CR by removing statefulset with orphans Pods. The stateful set is then recreated | ||||
|
@@ -184,6 +188,15 @@ func (r *StatefulSetResource) rollingUpdate( | |||
if err = r.queryRedpandaStatus(ctx, &adminURL); err != nil { | ||||
return fmt.Errorf("unable to query Redpanda ready status: %w", err) | ||||
} | ||||
|
||||
adminURL.Path = "metrics" | ||||
|
||||
if err = r.evaluateUnderReplicatedPartitions(ctx, &adminURL); err != nil { | ||||
return &RequeueAfterError{ | ||||
RequeueAfter: RequeueDuration, | ||||
Msg: fmt.Sprintf("broker reported under replicated partitions: %v", err), | ||||
} | ||||
} | ||||
} | ||||
|
||||
return nil | ||||
|
@@ -512,6 +525,85 @@ func (r *StatefulSetResource) queryRedpandaStatus( | |||
return nil | ||||
} | ||||
|
||||
// Temporarily using the status/ready endpoint until we have a specific one for restarting. | ||||
func (r *StatefulSetResource) evaluateUnderReplicatedPartitions( | ||||
ctx context.Context, adminURL *url.URL, | ||||
) error { | ||||
client := &http.Client{Timeout: adminAPITimeout} | ||||
|
||||
// TODO right now we support TLS only on one listener so if external | ||||
// connectivity is enabled, TLS is enabled only on external listener. This | ||||
// will be fixed by https://github.com/redpanda-data/redpanda/issues/1084 | ||||
if r.pandaCluster.AdminAPITLS() != nil && | ||||
r.pandaCluster.AdminAPIExternal() == nil { | ||||
tlsConfig, err := r.adminTLSConfigProvider.GetTLSConfig(ctx, r) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
|
||||
client.Transport = &http.Transport{ | ||||
TLSClientConfig: tlsConfig, | ||||
} | ||||
adminURL.Scheme = "https" | ||||
} | ||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, adminURL.String(), http.NoBody) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
resp, err := client.Do(req) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
defer resp.Body.Close() | ||||
|
||||
if resp.StatusCode != http.StatusOK { | ||||
return fmt.Errorf("getting broker metrics (%s): %w", adminURL.String(), errRedpandaNotReady) | ||||
} | ||||
|
||||
var parser expfmt.TextParser | ||||
metrics, err := parser.TextToMetricFamilies(resp.Body) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
|
||||
for name, metricFamily := range metrics { | ||||
if name != "vectorized_cluster_partition_under_replicated_replicas" { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is per node metric? Don't we have to look for metric for this particular node? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will ask core how to discover under replicated replicas There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be honest I forget how I implemented this function. We query current POD. You can see that headless service with exact pod name is used
That means metrics comes from one POD. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh I see, thanks |
||||
continue | ||||
} | ||||
|
||||
for _, m := range metricFamily.Metric { | ||||
if m == nil { | ||||
continue | ||||
} | ||||
if m.Gauge == nil { | ||||
r.logger.Info("cluster_partition_under_replicated_replicas metric does not have value", "labels", m.Label) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thx added |
||||
continue | ||||
} | ||||
|
||||
var namespace, partition, shard, topic string | ||||
for _, l := range m.Label { | ||||
switch *l.Name { | ||||
case "namespace": | ||||
namespace = *l.Value | ||||
case "partition": | ||||
partition = *l.Value | ||||
case "shard": | ||||
shard = *l.Value | ||||
case "topic": | ||||
topic = *l.Value | ||||
} | ||||
} | ||||
|
||||
if r.pandaCluster.Spec.RestartConfig != nil && *m.Gauge.Value > float64(r.pandaCluster.Spec.RestartConfig.UnderReplicatedPartitionThreshold) { | ||||
return fmt.Errorf("in topic (%s), partition (%s), shard (%s), namespace (%s): %w", topic, partition, shard, namespace, errUnderReplicatedPartition) | ||||
} | ||||
} | ||||
} | ||||
|
||||
return nil | ||||
} | ||||
|
||||
// RequeueAfterError error carrying the time after which to requeue. | ||||
type RequeueAfterError struct { | ||||
RequeueAfter time.Duration | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is true, tls could be enabled on internal listener as well. I think the behavior you're describing is true only for schema registry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be brutally honest I copied it from your previous implementation
redpanda/src/go/k8s/pkg/resources/statefulset_update.go
Lines 417 to 419 in a6a021b
6235e96#diff-5619b1cd9bdb790f53d380d84b884e952da245749cb78145fdf8f6b328b1be2cR182-R184