diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 3a66f3fcfc55b..024c243821ea6 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -551,7 +551,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1 return -1, fmt.Errorf("creating pki: %w", err) } - ordinal, err := strconv.ParseInt(getPodOrdinal(pod.Name, rp.Name), 10, 0) + ordinal, err := strconv.ParseInt(utils.GetPodOrdinal(pod.Name, rp.Name), 10, 0) if err != nil { return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err) } @@ -567,16 +567,6 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1 return int32(cfg.NodeID), nil } -func getPodOrdinal(podName string, clusterName string) string { - // Pod name needs to have at least 2 more characters - if len(podName) < len(clusterName)+2 { - return "" - } - - // The +1 is for the separator between stateful set name and pod ordinal - return podName[len(clusterName)+1:] -} - func (r *ClusterReconciler) reportStatus( ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster, diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 4962683bcddda..92a7f5a91e71b 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -92,6 +92,7 @@ type AdminAPIClient interface { GetLicenseInfo(ctx context.Context) (admin.License, error) Brokers(ctx context.Context) ([]admin.Broker, error) + Broker(ctx context.Context, nodeID int) (admin.Broker, error) DecommissionBroker(ctx context.Context, node int) error RecommissionBroker(ctx context.Context, node int) error diff --git a/src/go/k8s/pkg/admin/mock_admin.go b/src/go/k8s/pkg/admin/mock_admin.go index 80f20e1767d2e..9404079a2daee 100644 --- a/src/go/k8s/pkg/admin/mock_admin.go +++ b/src/go/k8s/pkg/admin/mock_admin.go @@ -422,6 +422,26 @@ func (m *MockAdminAPI) SetBrokerStatus( return fmt.Errorf("unknown broker %d", id) } +func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) { + t := true + return admin.Broker{ + NodeID: nodeID, + NumCores: 2, + MembershipStatus: "", + IsAlive: &t, + Version: "unversioned", + Maintenance: &admin.MaintenanceStatus{ + Draining: false, + Finished: true, + Errors: false, + Partitions: 0, + Eligible: 0, + Transferring: 0, + Failed: 0, + }, + }, nil +} + func makeCopy(input, output interface{}) { ser, err := json.Marshal(input) if err != nil { diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index c12467d22f323..6b20df60ca2e4 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -18,6 +18,7 @@ import ( "net/url" "reflect" "sort" + "strconv" "strings" "time" @@ -157,31 +158,13 @@ func (r *StatefulSetResource) rollingUpdate( volumes[vol.Name] = new(interface{}) } - opts := []patch.CalculateOption{ - patch.IgnoreStatusFields(), - ignoreKubernetesTokenVolumeMounts(), - ignoreDefaultToleration(), - ignoreExistingVolumes(volumes), - } - for i := range podList.Items { pod := podList.Items[i] - patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...) - if err != nil { + if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil { return err } - if !patchResult.IsEmpty() { - r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", - "pod-name", pod.Name, - "patch", patchResult.Patch) - if err = r.Delete(ctx, &pod); err != nil { - return fmt.Errorf("unable to remove Redpanda pod: %w", err) - } - return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} - } - if !utils.IsPodReady(&pod) { return &RequeueAfterError{ RequeueAfter: RequeueDuration, @@ -207,6 +190,92 @@ func (r *StatefulSetResource) rollingUpdate( return nil } +func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error { + opts := []patch.CalculateOption{ + patch.IgnoreStatusFields(), + ignoreKubernetesTokenVolumeMounts(), + ignoreDefaultToleration(), + ignoreExistingVolumes(newVolumes), + } + + patchResult, err := patch.DefaultPatchMaker.Calculate(pod, artificialPod, opts...) + if err != nil { + return err + } + + if patchResult.IsEmpty() { + return nil + } + + var ordinal int64 + ordinal, err = strconv.ParseInt(utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name), 10, 32) + if err != nil { + return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err) + } + + if *r.pandaCluster.Spec.Replicas > 1 { + if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil { + // As maintenance mode can not be easily watched using controller runtime the requeue error + // is always returned. That way a rolling update will not finish when operator waits for + // maintenance mode finished. + return &RequeueAfterError{ + RequeueAfter: RequeueDuration, + Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err), + } + } + } + + r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", + "pod-name", pod.Name, + "patch", patchResult.Patch) + + if err = r.Delete(ctx, pod); err != nil { + return fmt.Errorf("unable to remove Redpanda pod: %w", err) + } + + return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} +} + +//nolint:goerr113 // out of scope for this PR +func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error { + adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal) + if err != nil { + return fmt.Errorf("creating admin API client: %w", err) + } + + nodeConf, err := adminAPIClient.GetNodeConfig(ctx) + if err != nil { + return fmt.Errorf("getting node config: %w", err) + } + + err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("enabling maintenance mode: %w", err) + } + + br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("getting broker infromations: %w", err) + } + + if br.Maintenance == nil { + return fmt.Errorf("maintenance definition not returned") + } + + switch { + case br.Maintenance.Draining: + case br.Maintenance.Errors: + case br.Maintenance.Failed > 0: + return fmt.Errorf("maintenance mode is not valid to do rolling update: %v", br.Maintenance) + } + + if !br.Maintenance.Finished { + return fmt.Errorf("maintenance mode is not finished") + } + + return nil +} + func (r *StatefulSetResource) updateStatefulSet( ctx context.Context, current *appsv1.StatefulSet, diff --git a/src/go/k8s/pkg/utils/kubernetes.go b/src/go/k8s/pkg/utils/kubernetes.go index 27b78f597a6a9..529a087806853 100644 --- a/src/go/k8s/pkg/utils/kubernetes.go +++ b/src/go/k8s/pkg/utils/kubernetes.go @@ -21,3 +21,13 @@ func IsPodReady(pod *corev1.Pod) bool { return false } + +func GetPodOrdinal(podName, clusterName string) string { + // Pod name needs to have at least 2 more characters + if len(podName) < len(clusterName)+2 { + return "" + } + + // The +1 is for the separator between stateful set name and pod ordinal + return podName[len(clusterName)+1:] +} diff --git a/src/go/k8s/pkg/utils/kubernetes_test.go b/src/go/k8s/pkg/utils/kubernetes_test.go new file mode 100644 index 0000000000000..c34a1268e5faf --- /dev/null +++ b/src/go/k8s/pkg/utils/kubernetes_test.go @@ -0,0 +1,39 @@ +// Copyright 2022-2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package utils_test + +import ( + "fmt" + "testing" + + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" + "github.com/stretchr/testify/require" +) + +func TestGetPodOrdinal(t *testing.T) { + tcs := []struct { + podName string + clusterName string + expectedOrdinal string + }{ + {"", "", ""}, + {"test", "", "est"}, + {"pod-0", "pod", "0"}, + {"", "unexpected longer cluster name", ""}, + {"test+0", "test", "0"}, + {"without-ordinal-", "without-ordinal", ""}, + } + + for _, tc := range tcs { + t.Run(fmt.Sprintf("pod %s and cluster %s", tc.podName, tc.clusterName), func(t *testing.T) { + require.Equal(t, tc.expectedOrdinal, utils.GetPodOrdinal(tc.podName, tc.clusterName)) + }) + } +}