diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go
index 4962683bcddda..92a7f5a91e71b 100644
--- a/src/go/k8s/pkg/admin/admin.go
+++ b/src/go/k8s/pkg/admin/admin.go
@@ -92,6 +92,7 @@ type AdminAPIClient interface {
 	GetLicenseInfo(ctx context.Context) (admin.License, error)
 
 	Brokers(ctx context.Context) ([]admin.Broker, error)
+	Broker(ctx context.Context, nodeID int) (admin.Broker, error)
 	DecommissionBroker(ctx context.Context, node int) error
 	RecommissionBroker(ctx context.Context, node int) error
 
diff --git a/src/go/k8s/pkg/admin/mock_admin.go b/src/go/k8s/pkg/admin/mock_admin.go
index b97e51e43df0e..177299e03c7b0 100644
--- a/src/go/k8s/pkg/admin/mock_admin.go
+++ b/src/go/k8s/pkg/admin/mock_admin.go
@@ -390,6 +390,29 @@ func (m *MockAdminAPI) SetBrokerStatus(
 	return fmt.Errorf("unknown broker %d", id)
 }
 
+// Broker returns a canned broker for nodeID that is marked alive and reports a
+// finished maintenance status with no errors and a zero Failed count. It never
+// returns an error and does not consult any state held by the mock.
+func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) {
+	t := true
+	return admin.Broker{
+		NodeID:           nodeID,
+		NumCores:         2,
+		MembershipStatus: "",
+		IsAlive:          &t,
+		Version:          "unversioned",
+		Maintenance: &admin.MaintenanceStatus{
+			Draining:     false,
+			Finished:     true,
+			Errors:       false,
+			Partitions:   0,
+			Eligible:     0,
+			Transferring: 0,
+			Failed:       0,
+		},
+	}, nil
+}
+
 func makeCopy(input, output interface{}) {
 	ser, err := json.Marshal(input)
 	if err != nil {
diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go
index c12467d22f323..a2d992ca980ec 100644
--- a/src/go/k8s/pkg/resources/statefulset_update.go
+++ b/src/go/k8s/pkg/resources/statefulset_update.go
@@ -18,6 +18,7 @@ import (
 	"net/url"
 	"reflect"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
@@ -173,6 +174,24 @@ func (r *StatefulSetResource) rollingUpdate(
 		}
 
 		if !patchResult.IsEmpty() {
+			// The ordinal is whatever follows the last '-' in the pod name. Slicing at
+			// LastIndex cannot go out of range, so an unexpected name fails in ParseInt.
+			var ordinal int64
+			ordinal, err = strconv.ParseInt(pod.Name[strings.LastIndex(pod.Name, "-")+1:], 10, 32)
+			if err != nil {
+				return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err)
+			}
+
+			if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil {
+				// Maintenance mode progress cannot easily be watched through controller runtime, so
+				// every failure is turned into a requeue. The rolling update therefore stays pending
+				// while the operator waits for maintenance mode to finish.
+				return &RequeueAfterError{
+					RequeueAfter: RequeueDuration,
+					Msg:          fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err),
+				}
+			}
+
 			r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
 				"pod-name", pod.Name,
 				"patch", patchResult.Patch)
@@ -207,6 +226,49 @@ func (r *StatefulSetResource) rollingUpdate(
 	return nil
 }
 
+// putInMaintenanceMode enables maintenance mode on the broker reached through
+// the admin API client for the given pod ordinal, then reads the broker back
+// and checks its maintenance status. A non-nil error means the caller must not
+// proceed with the pod yet.
+//
+//nolint:goerr113 // out of scope for this PR
+func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error {
+	adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal)
+	if err != nil {
+		return fmt.Errorf("creating admin API client: %w", err)
+	}
+
+	nodeConf, err := adminAPIClient.GetNodeConfig(ctx)
+	if err != nil {
+		return fmt.Errorf("getting node config: %w", err)
+	}
+
+	err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID)
+	if err != nil {
+		return fmt.Errorf("enabling maintenance mode: %w", err)
+	}
+
+	br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID)
+	if err != nil {
+		return fmt.Errorf("getting broker information: %w", err)
+	}
+
+	// Without a maintenance status there is no evidence that the drain ran,
+	// so treat it like an unfinished drain and let the caller requeue.
+	if br.Maintenance == nil {
+		return fmt.Errorf("maintenance status missing for node %d", nodeConf.NodeID)
+	}
+
+	// Only a finished drain without errors or failures allows the pod to be
+	// deleted. Draining is not checked here.
+	// NOTE(review): Draining presumably stays set while the node remains in
+	// maintenance mode - confirm against the admin API before relying on it.
+	if m := br.Maintenance; !m.Finished || m.Errors || m.Failed != 0 {
+		return fmt.Errorf("maintenance mode is not valid to do rolling update: %+v", *m)
+	}
+
+	return nil
+}
+
 func (r *StatefulSetResource) updateStatefulSet(
 	ctx context.Context,
 	current *appsv1.StatefulSet,