diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 3a66f3fcfc55b..5a18ae211720b 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -551,7 +551,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1 return -1, fmt.Errorf("creating pki: %w", err) } - ordinal, err := strconv.ParseInt(getPodOrdinal(pod.Name, rp.Name), 10, 0) + ordinal, err := utils.GetPodOrdinal(pod.Name, rp.Name) if err != nil { return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err) } @@ -567,16 +567,6 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1 return int32(cfg.NodeID), nil } -func getPodOrdinal(podName string, clusterName string) string { - // Pod name needs to have at least 2 more characters - if len(podName) < len(clusterName)+2 { - return "" - } - - // The +1 is for the separator between stateful set name and pod ordinal - return podName[len(clusterName)+1:] -} - func (r *ClusterReconciler) reportStatus( ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster, diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 4962683bcddda..92a7f5a91e71b 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -92,6 +92,7 @@ type AdminAPIClient interface { GetLicenseInfo(ctx context.Context) (admin.License, error) Brokers(ctx context.Context) ([]admin.Broker, error) + Broker(ctx context.Context, nodeID int) (admin.Broker, error) DecommissionBroker(ctx context.Context, node int) error RecommissionBroker(ctx context.Context, node int) error diff --git a/src/go/k8s/pkg/admin/mock_admin.go b/src/go/k8s/pkg/admin/mock_admin.go index 80f20e1767d2e..5aeb1f7db6439 100644 --- a/src/go/k8s/pkg/admin/mock_admin.go +++ b/src/go/k8s/pkg/admin/mock_admin.go @@ -25,17 +25,18 @@ import ( ) type MockAdminAPI struct { - config admin.Config - schema admin.ConfigSchema - patches []configuration.CentralConfigurationPatch - unavailable bool - invalid []string - unknown []string - directValidation bool - brokers []admin.Broker - monitor sync.Mutex - Log logr.Logger - clusterHealth bool + config admin.Config + schema admin.ConfigSchema + patches []configuration.CentralConfigurationPatch + unavailable bool + invalid []string + unknown []string + directValidation bool + brokers []admin.Broker + monitor sync.Mutex + Log logr.Logger + clusterHealth bool + MaintenanceStatus *admin.MaintenanceStatus } var _ AdminAPIClient = &MockAdminAPI{Log: ctrl.Log.WithName("AdminAPIClient").WithName("mockAdminAPI")} @@ -422,6 +423,18 @@ func (m *MockAdminAPI) SetBrokerStatus( return fmt.Errorf("unknown broker %d", id) } +func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) { + t := true + return admin.Broker{ + NodeID: nodeID, + NumCores: 2, + MembershipStatus: "", + IsAlive: &t, + Version: "unversioned", + Maintenance: m.MaintenanceStatus, + }, nil +} + func makeCopy(input, output interface{}) { ser, err := json.Marshal(input) if err != nil { diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index c12467d22f323..0a1973fa0efde 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -157,31 +157,13 @@ func (r *StatefulSetResource) rollingUpdate( volumes[vol.Name] = new(interface{}) } - opts := []patch.CalculateOption{ - patch.IgnoreStatusFields(), - ignoreKubernetesTokenVolumeMounts(), - ignoreDefaultToleration(), - ignoreExistingVolumes(volumes), - } - for i := range podList.Items { pod := podList.Items[i] - patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...) - if err != nil { + if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil { return err } - if !patchResult.IsEmpty() { - r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", - "pod-name", pod.Name, - "patch", patchResult.Patch) - if err = r.Delete(ctx, &pod); err != nil { - return fmt.Errorf("unable to remove Redpanda pod: %w", err) - } - return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} - } - if !utils.IsPodReady(&pod) { return &RequeueAfterError{ RequeueAfter: RequeueDuration, @@ -207,6 +189,89 @@ func (r *StatefulSetResource) rollingUpdate( return nil } +func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error { + opts := []patch.CalculateOption{ + patch.IgnoreStatusFields(), + ignoreKubernetesTokenVolumeMounts(), + ignoreDefaultToleration(), + ignoreExistingVolumes(newVolumes), + } + + patchResult, err := patch.DefaultPatchMaker.Calculate(pod, artificialPod, opts...) + if err != nil { + return err + } + + if patchResult.IsEmpty() { + return nil + } + + var ordinal int64 + ordinal, err = utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name) + if err != nil { + return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err) + } + + if *r.pandaCluster.Spec.Replicas > 1 { + if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil { + // As maintenance mode can not be easily watched using controller runtime the requeue error + // is always returned. That way a rolling update will not finish when operator waits for + // maintenance mode finished. + return &RequeueAfterError{ + RequeueAfter: RequeueDuration, + Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err), + } + } + } + + r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", + "pod-name", pod.Name, + "patch", patchResult.Patch) + + if err = r.Delete(ctx, pod); err != nil { + return fmt.Errorf("unable to remove Redpanda pod: %w", err) + } + + return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} +} + +var ( + ErrMaintenanceNotFinished = errors.New("maintenance mode is not finished") + ErrMaintenanceMissing = errors.New("maintenance definition not returned") +) + +func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error { + adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal) + if err != nil { + return fmt.Errorf("creating admin API client: %w", err) + } + + nodeConf, err := adminAPIClient.GetNodeConfig(ctx) + if err != nil { + return fmt.Errorf("getting node config: %w", err) + } + + err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("enabling maintenance mode: %w", err) + } + + br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("getting broker infromations: %w", err) + } + + if br.Maintenance == nil { + return ErrMaintenanceMissing + } + + if !br.Maintenance.Finished { + return fmt.Errorf("draining (%t), errors (%t), failed (%d), finished (%t): %w", br.Maintenance.Draining, br.Maintenance.Errors, br.Maintenance.Failed, br.Maintenance.Finished, ErrMaintenanceNotFinished) + } + + return nil +} + func (r *StatefulSetResource) updateStatefulSet( ctx context.Context, current *appsv1.StatefulSet, diff --git a/src/go/k8s/pkg/resources/statefulset_update_test.go b/src/go/k8s/pkg/resources/statefulset_update_test.go index 27b6ba763de00..65989fad950f3 100644 --- a/src/go/k8s/pkg/resources/statefulset_update_test.go +++ b/src/go/k8s/pkg/resources/statefulset_update_test.go @@ -10,12 +10,19 @@ package resources //nolint:testpackage // needed to test private method import ( + "context" "testing" + redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ) func TestShouldUpdate_AnnotationChange(t *testing.T) { @@ -65,3 +72,78 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) { require.NoError(t, err) require.False(t, update) } + +func TestPutInMaintenanceMode(t *testing.T) { + tcs := []struct { + name string + maintenanceStatus *admin.MaintenanceStatus + errorRequired error + }{ + { + "maintenance finished", + &admin.MaintenanceStatus{ + Finished: true, + }, + nil, + }, + { + "maintenance draining", + &admin.MaintenanceStatus{ + Draining: true, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance failed", + &admin.MaintenanceStatus{ + Failed: 1, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance has errors", + &admin.MaintenanceStatus{ + Errors: true, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance did not finished", + &admin.MaintenanceStatus{ + Finished: false, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance was not returned", + nil, + ErrMaintenanceMissing, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ssres := StatefulSetResource{ + adminAPIClientFactory: func( + ctx context.Context, + k8sClient client.Reader, + redpandaCluster *redpandav1alpha1.Cluster, + fqdn string, + adminTLSProvider types.AdminTLSConfigProvider, + ordinals ...int32, + ) (adminutils.AdminAPIClient, error) { + return &adminutils.MockAdminAPI{ + Log: ctrl.Log.WithName("testAdminAPI").WithName("mockAdminAPI"), + MaintenanceStatus: tc.maintenanceStatus, + }, nil + }, + } + err := ssres.putInMaintenanceMode(context.Background(), 0) + if tc.errorRequired == nil { + require.NoError(t, err) + } else { + require.ErrorIs(t, err, tc.errorRequired) + } + }) + } +} diff --git a/src/go/k8s/pkg/utils/kubernetes.go b/src/go/k8s/pkg/utils/kubernetes.go index 27b78f597a6a9..66994d42747a7 100644 --- a/src/go/k8s/pkg/utils/kubernetes.go +++ b/src/go/k8s/pkg/utils/kubernetes.go @@ -9,7 +9,14 @@ package utils -import corev1 "k8s.io/api/core/v1" +import ( + "fmt" + "strconv" + + corev1 "k8s.io/api/core/v1" +) + +var ErrInvalidInputParameters = fmt.Errorf("invalid input parameters") // IsPodReady tells if a given pod is ready looking at its status. func IsPodReady(pod *corev1.Pod) bool { @@ -21,3 +28,18 @@ func IsPodReady(pod *corev1.Pod) bool { return false } + +func GetPodOrdinal(podName, clusterName string) (int64, error) { + // Pod name needs to have at least 2 more characters + if len(podName) < len(clusterName)+2 { + return -1, fmt.Errorf("pod name (%s) and cluster name (%s): %w", podName, clusterName, ErrInvalidInputParameters) + } + + // The +1 is for the separator between stateful set name and pod ordinal + ordinalStr := podName[len(clusterName)+1:] + ordinal, err := strconv.ParseInt(ordinalStr, 10, 0) + if err != nil { + return -1, fmt.Errorf("parsing int failed (%s): %w", ordinalStr, err) + } + return ordinal, nil +} diff --git a/src/go/k8s/pkg/utils/kubernetes_test.go b/src/go/k8s/pkg/utils/kubernetes_test.go new file mode 100644 index 0000000000000..21030deb17b75 --- /dev/null +++ b/src/go/k8s/pkg/utils/kubernetes_test.go @@ -0,0 +1,45 @@ +// Copyright 2022-2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package utils_test + +import ( + "fmt" + "testing" + + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" + "github.com/stretchr/testify/require" +) + +func TestGetPodOrdinal(t *testing.T) { + tcs := []struct { + podName string + clusterName string + expectedError bool + expectedOrdinal int64 + }{ + {"", "", true, -1}, + {"test", "", true, -1}, + {"pod-0", "pod", false, 0}, + {"pod-99", "pod", false, 99}, + {"", "unexpected longer cluster name", true, -1}, + {"test+0", "test", false, 0}, + {"without-ordinal-", "without-ordinal", true, -1}, + } + + for _, tc := range tcs { + t.Run(fmt.Sprintf("pod %s and cluster %s", tc.podName, tc.clusterName), func(t *testing.T) { + ordinal, err := utils.GetPodOrdinal(tc.podName, tc.clusterName) + if tc.expectedError { + require.Error(t, err) + } + require.Equal(t, tc.expectedOrdinal, ordinal) + }) + } +}