diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 4962683bcddda..92a7f5a91e71b 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -92,6 +92,7 @@ type AdminAPIClient interface { GetLicenseInfo(ctx context.Context) (admin.License, error) Brokers(ctx context.Context) ([]admin.Broker, error) + Broker(ctx context.Context, nodeID int) (admin.Broker, error) DecommissionBroker(ctx context.Context, node int) error RecommissionBroker(ctx context.Context, node int) error diff --git a/src/go/k8s/pkg/admin/mock_admin.go b/src/go/k8s/pkg/admin/mock_admin.go index 80f20e1767d2e..9404079a2daee 100644 --- a/src/go/k8s/pkg/admin/mock_admin.go +++ b/src/go/k8s/pkg/admin/mock_admin.go @@ -422,6 +422,26 @@ func (m *MockAdminAPI) SetBrokerStatus( return fmt.Errorf("unknown broker %d", id) } +func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) { + t := true + return admin.Broker{ + NodeID: nodeID, + NumCores: 2, + MembershipStatus: "", + IsAlive: &t, + Version: "unversioned", + Maintenance: &admin.MaintenanceStatus{ + Draining: false, + Finished: true, + Errors: false, + Partitions: 0, + Eligible: 0, + Transferring: 0, + Failed: 0, + }, + }, nil +} + func makeCopy(input, output interface{}) { ser, err := json.Marshal(input) if err != nil { diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index c12467d22f323..e1c31ae6cd89e 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -18,6 +18,7 @@ import ( "net/url" "reflect" "sort" + "strconv" "strings" "time" @@ -157,31 +158,13 @@ func (r *StatefulSetResource) rollingUpdate( volumes[vol.Name] = new(interface{}) } - opts := []patch.CalculateOption{ - patch.IgnoreStatusFields(), - ignoreKubernetesTokenVolumeMounts(), - ignoreDefaultToleration(), - ignoreExistingVolumes(volumes), - } - for i := range podList.Items { pod := podList.Items[i] - patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...) - if err != nil { + if err = r.rollingRestart(ctx, &pod, &artificialPod, volumes); err != nil { return err } - if !patchResult.IsEmpty() { - r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", - "pod-name", pod.Name, - "patch", patchResult.Patch) - if err = r.Delete(ctx, &pod); err != nil { - return fmt.Errorf("unable to remove Redpanda pod: %w", err) - } - return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} - } - if !utils.IsPodReady(&pod) { return &RequeueAfterError{ RequeueAfter: RequeueDuration, @@ -207,6 +190,84 @@ func (r *StatefulSetResource) rollingUpdate( return nil } +func (r *StatefulSetResource) rollingRestart(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error { + opts := []patch.CalculateOption{ + patch.IgnoreStatusFields(), + ignoreKubernetesTokenVolumeMounts(), + ignoreDefaultToleration(), + ignoreExistingVolumes(newVolumes), + } + + patchResult, err := patch.DefaultPatchMaker.Calculate(pod, artificialPod, opts...) + if err != nil { + return err + } + + if patchResult.IsEmpty() { + return nil + } + + var ordinal int64 + ordinal, err = strconv.ParseInt(pod.Name[len(r.pandaCluster.Name)+1:], 10, 32) + if err != nil { + return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err) + } + + if *r.pandaCluster.Spec.Replicas > 1 { + if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil { + // As maintenance mode can not be easily watched using controller runtime the requeue error + // is always returned. That way a rolling update will not finish when operator waits for + // maintenance mode finished. + return &RequeueAfterError{ + RequeueAfter: RequeueDuration, + Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err), + } + } + } + + r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", + "pod-name", pod.Name, + "patch", patchResult.Patch) + + if err = r.Delete(ctx, pod); err != nil { + return fmt.Errorf("unable to remove Redpanda pod: %w", err) + } + + return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} +} + +//nolint:goerr113 // out of scope for this PR +func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error { + adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal) + if err != nil { + return fmt.Errorf("creating admin API client: %w", err) + } + + nodeConf, err := adminAPIClient.GetNodeConfig(ctx) + if err != nil { + return fmt.Errorf("getting node config: %w", err) + } + + err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("enabling maintenance mode: %w", err) + } + + br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("getting broker infromations: %w", err) + } + if br.Maintenance != nil && + br.Maintenance.Finished && + !br.Maintenance.Draining && + !br.Maintenance.Errors && + br.Maintenance.Failed == 0 { + return fmt.Errorf("maintenance mode is not valid to do rolling update: %v", br.Maintenance) + } + + return nil +} + func (r *StatefulSetResource) updateStatefulSet( ctx context.Context, current *appsv1.StatefulSet,