From 3c34855045ade115dab2ebaacbe25284f5659c23 Mon Sep 17 00:00:00 2001 From: Rafal Korepta Date: Wed, 4 Jan 2023 18:26:26 +0100 Subject: [PATCH] k8s: Put brokers in maintenance mode before deleting orphant pod During rolling update, before this change, Redpanda operator was calculating the difference between running pod specification and stateful set pod template. If the specification did not match the pod was deleted. From release v22.1.1 operator is configuring each broker with pod lifecycle hooks. In the PreStop hook the script will try to put broker into maintenance mode for 120 seconds before POD is terminated. Redpanda could not finish within 120 seconds to put one broker into maintenance mode. This PR improves the situation by putting maintenance mode before POD is deleted. The `EnableMaintanaceMode` function is called multiple times until `Broker` function returns correct status. The assumption is that REST admin API maintenance mode endpoint is idempotent. When pod is successfully deleted statefulset would reschedule the pod with correct pod specification. https://github.com/redpanda-data/redpanda/pull/4125 https://github.com/redpanda-data/redpanda/issues/3023 --- .../redpanda/cluster_controller.go | 12 +- src/go/k8s/pkg/admin/admin.go | 1 + src/go/k8s/pkg/admin/mock_admin.go | 35 ++++-- .../k8s/pkg/resources/statefulset_update.go | 103 ++++++++++++++---- .../pkg/resources/statefulset_update_test.go | 82 ++++++++++++++ src/go/k8s/pkg/utils/kubernetes.go | 24 +++- src/go/k8s/pkg/utils/kubernetes_test.go | 45 ++++++++ 7 files changed, 260 insertions(+), 42 deletions(-) create mode 100644 src/go/k8s/pkg/utils/kubernetes_test.go diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 3a66f3fcfc55..5a18ae211720 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -551,7 +551,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1 return -1, fmt.Errorf("creating pki: %w", err) } - ordinal, err := strconv.ParseInt(getPodOrdinal(pod.Name, rp.Name), 10, 0) + ordinal, err := utils.GetPodOrdinal(pod.Name, rp.Name) if err != nil { return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err) } @@ -567,16 +567,6 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1 return int32(cfg.NodeID), nil } -func getPodOrdinal(podName string, clusterName string) string { - // Pod name needs to have at least 2 more characters - if len(podName) < len(clusterName)+2 { - return "" - } - - // The +1 is for the separator between stateful set name and pod ordinal - return podName[len(clusterName)+1:] -} - func (r *ClusterReconciler) reportStatus( ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster, diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 4962683bcddd..92a7f5a91e71 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ -92,6 +92,7 @@ type AdminAPIClient interface { GetLicenseInfo(ctx context.Context) (admin.License, error) Brokers(ctx context.Context) ([]admin.Broker, error) + Broker(ctx context.Context, nodeID int) (admin.Broker, error) DecommissionBroker(ctx context.Context, node int) error RecommissionBroker(ctx context.Context, node int) error diff --git a/src/go/k8s/pkg/admin/mock_admin.go b/src/go/k8s/pkg/admin/mock_admin.go index 80f20e1767d2..5aeb1f7db643 100644 --- a/src/go/k8s/pkg/admin/mock_admin.go +++ b/src/go/k8s/pkg/admin/mock_admin.go @@ -25,17 +25,18 @@ import ( ) type MockAdminAPI struct { - config admin.Config - schema admin.ConfigSchema - patches []configuration.CentralConfigurationPatch - unavailable bool - invalid []string - unknown []string - directValidation bool - brokers []admin.Broker - monitor sync.Mutex - Log logr.Logger - clusterHealth bool + config admin.Config + schema admin.ConfigSchema + patches []configuration.CentralConfigurationPatch + unavailable bool + invalid []string + unknown []string + directValidation bool + brokers []admin.Broker + monitor sync.Mutex + Log logr.Logger + clusterHealth bool + MaintenanceStatus *admin.MaintenanceStatus } var _ AdminAPIClient = &MockAdminAPI{Log: ctrl.Log.WithName("AdminAPIClient").WithName("mockAdminAPI")} @@ -422,6 +423,18 @@ func (m *MockAdminAPI) SetBrokerStatus( return fmt.Errorf("unknown broker %d", id) } +func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) { + t := true + return admin.Broker{ + NodeID: nodeID, + NumCores: 2, + MembershipStatus: "", + IsAlive: &t, + Version: "unversioned", + Maintenance: m.MaintenanceStatus, + }, nil +} + func makeCopy(input, output interface{}) { ser, err := json.Marshal(input) if err != nil { diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index c12467d22f32..0a1973fa0efd 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -157,31 +157,13 @@ func (r *StatefulSetResource) rollingUpdate( volumes[vol.Name] = new(interface{}) } - opts := []patch.CalculateOption{ - patch.IgnoreStatusFields(), - ignoreKubernetesTokenVolumeMounts(), - ignoreDefaultToleration(), - ignoreExistingVolumes(volumes), - } - for i := range podList.Items { pod := podList.Items[i] - patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...) - if err != nil { + if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil { return err } - if !patchResult.IsEmpty() { - r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", - "pod-name", pod.Name, - "patch", patchResult.Patch) - if err = r.Delete(ctx, &pod); err != nil { - return fmt.Errorf("unable to remove Redpanda pod: %w", err) - } - return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} - } - if !utils.IsPodReady(&pod) { return &RequeueAfterError{ RequeueAfter: RequeueDuration, @@ -207,6 +189,89 @@ func (r *StatefulSetResource) rollingUpdate( return nil } +func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error { + opts := []patch.CalculateOption{ + patch.IgnoreStatusFields(), + ignoreKubernetesTokenVolumeMounts(), + ignoreDefaultToleration(), + ignoreExistingVolumes(newVolumes), + } + + patchResult, err := patch.DefaultPatchMaker.Calculate(pod, artificialPod, opts...) + if err != nil { + return err + } + + if patchResult.IsEmpty() { + return nil + } + + var ordinal int64 + ordinal, err = utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name) + if err != nil { + return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err) + } + + if *r.pandaCluster.Spec.Replicas > 1 { + if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil { + // As maintenance mode can not be easily watched using controller runtime the requeue error + // is always returned. That way a rolling update will not finish when operator waits for + // maintenance mode finished. + return &RequeueAfterError{ + RequeueAfter: RequeueDuration, + Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err), + } + } + } + + r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", + "pod-name", pod.Name, + "patch", patchResult.Patch) + + if err = r.Delete(ctx, pod); err != nil { + return fmt.Errorf("unable to remove Redpanda pod: %w", err) + } + + return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} +} + +var ( + ErrMaintenanceNotFinished = errors.New("maintenance mode is not finished") + ErrMaintenanceMissing = errors.New("maintenance definition not returned") +) + +func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error { + adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal) + if err != nil { + return fmt.Errorf("creating admin API client: %w", err) + } + + nodeConf, err := adminAPIClient.GetNodeConfig(ctx) + if err != nil { + return fmt.Errorf("getting node config: %w", err) + } + + err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("enabling maintenance mode: %w", err) + } + + br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("getting broker infromations: %w", err) + } + + if br.Maintenance == nil { + return ErrMaintenanceMissing + } + + if !br.Maintenance.Finished { + return fmt.Errorf("draining (%t), errors (%t), failed (%d), finished (%t): %w", br.Maintenance.Draining, br.Maintenance.Errors, br.Maintenance.Failed, br.Maintenance.Finished, ErrMaintenanceNotFinished) + } + + return nil +} + func (r *StatefulSetResource) updateStatefulSet( ctx context.Context, current *appsv1.StatefulSet, diff --git a/src/go/k8s/pkg/resources/statefulset_update_test.go b/src/go/k8s/pkg/resources/statefulset_update_test.go index 27b6ba763de0..65989fad950f 100644 --- a/src/go/k8s/pkg/resources/statefulset_update_test.go +++ b/src/go/k8s/pkg/resources/statefulset_update_test.go @@ -10,12 +10,19 @@ package resources //nolint:testpackage // needed to test private method import ( + "context" "testing" + redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ) func TestShouldUpdate_AnnotationChange(t *testing.T) { @@ -65,3 +72,78 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) { require.NoError(t, err) require.False(t, update) } + +func TestPutInMaintenanceMode(t *testing.T) { + tcs := []struct { + name string + maintenanceStatus *admin.MaintenanceStatus + errorRequired error + }{ + { + "maintenance finished", + &admin.MaintenanceStatus{ + Finished: true, + }, + nil, + }, + { + "maintenance draining", + &admin.MaintenanceStatus{ + Draining: true, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance failed", + &admin.MaintenanceStatus{ + Failed: 1, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance has errors", + &admin.MaintenanceStatus{ + Errors: true, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance did not finished", + &admin.MaintenanceStatus{ + Finished: false, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance was not returned", + nil, + ErrMaintenanceMissing, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ssres := StatefulSetResource{ + adminAPIClientFactory: func( + ctx context.Context, + k8sClient client.Reader, + redpandaCluster *redpandav1alpha1.Cluster, + fqdn string, + adminTLSProvider types.AdminTLSConfigProvider, + ordinals ...int32, + ) (adminutils.AdminAPIClient, error) { + return &adminutils.MockAdminAPI{ + Log: ctrl.Log.WithName("testAdminAPI").WithName("mockAdminAPI"), + MaintenanceStatus: tc.maintenanceStatus, + }, nil + }, + } + err := ssres.putInMaintenanceMode(context.Background(), 0) + if tc.errorRequired == nil { + require.NoError(t, err) + } else { + require.ErrorIs(t, err, tc.errorRequired) + } + }) + } +} diff --git a/src/go/k8s/pkg/utils/kubernetes.go b/src/go/k8s/pkg/utils/kubernetes.go index 27b78f597a6a..66994d42747a 100644 --- a/src/go/k8s/pkg/utils/kubernetes.go +++ b/src/go/k8s/pkg/utils/kubernetes.go @@ -9,7 +9,14 @@ package utils -import corev1 "k8s.io/api/core/v1" +import ( + "fmt" + "strconv" + + corev1 "k8s.io/api/core/v1" +) + +var ErrInvalidInputParameters = fmt.Errorf("invalid input parameters") // IsPodReady tells if a given pod is ready looking at its status. func IsPodReady(pod *corev1.Pod) bool { @@ -21,3 +28,18 @@ func IsPodReady(pod *corev1.Pod) bool { return false } + +func GetPodOrdinal(podName, clusterName string) (int64, error) { + // Pod name needs to have at least 2 more characters + if len(podName) < len(clusterName)+2 { + return -1, fmt.Errorf("pod name (%s) and cluster name (%s): %w", podName, clusterName, ErrInvalidInputParameters) + } + + // The +1 is for the separator between stateful set name and pod ordinal + ordinalStr := podName[len(clusterName)+1:] + ordinal, err := strconv.ParseInt(ordinalStr, 10, 0) + if err != nil { + return -1, fmt.Errorf("parsing int failed (%s): %w", ordinalStr, err) + } + return ordinal, nil +} diff --git a/src/go/k8s/pkg/utils/kubernetes_test.go b/src/go/k8s/pkg/utils/kubernetes_test.go new file mode 100644 index 000000000000..21030deb17b7 --- /dev/null +++ b/src/go/k8s/pkg/utils/kubernetes_test.go @@ -0,0 +1,45 @@ +// Copyright 2022-2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package utils_test + +import ( + "fmt" + "testing" + + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" + "github.com/stretchr/testify/require" +) + +func TestGetPodOrdinal(t *testing.T) { + tcs := []struct { + podName string + clusterName string + expectedError bool + expectedOrdinal int64 + }{ + {"", "", true, -1}, + {"test", "", true, -1}, + {"pod-0", "pod", false, 0}, + {"pod-99", "pod", false, 99}, + {"", "unexpected longer cluster name", true, -1}, + {"test+0", "test", false, 0}, + {"without-ordinal-", "without-ordinal", true, -1}, + } + + for _, tc := range tcs { + t.Run(fmt.Sprintf("pod %s and cluster %s", tc.podName, tc.clusterName), func(t *testing.T) { + ordinal, err := utils.GetPodOrdinal(tc.podName, tc.clusterName) + if tc.expectedError { + require.Error(t, err) + } + require.Equal(t, tc.expectedOrdinal, ordinal) + }) + } +}