From 817571e000f27b4e65f223371e458dc797b973b3 Mon Sep 17 00:00:00 2001 From: Rafal Korepta Date: Wed, 4 Jan 2023 18:26:26 +0100 Subject: [PATCH] k8s: Put brokers in maintenance mode before deleting orphaned pod During a rolling update, before this change, the Redpanda operator calculated the difference between the running pod specification and the stateful set pod template. If the specification did not match, the pod was deleted. Since release v22.1.1 the operator configures each broker with pod lifecycle hooks. In the PreStop hook the script tries to put the broker into maintenance mode for 120 seconds before the pod is terminated. Redpanda could not finish putting one broker into maintenance mode within 120 seconds. This PR improves the situation by putting the broker into maintenance mode before the pod is deleted. The `EnableMaintenanceMode` function is called multiple times until the `Broker` function returns the correct status. The assumption is that the REST admin API maintenance mode endpoint is idempotent. When the pod is successfully deleted, the stateful set reschedules the pod with the correct pod specification. 
https://github.com/redpanda-data/redpanda/pull/4125 https://github.com/redpanda-data/redpanda/issues/3023 --- .../redpanda/cluster_controller.go | 12 +- src/go/k8s/pkg/admin/admin.go | 1 + src/go/k8s/pkg/admin/mock_admin.go | 35 ++++-- .../k8s/pkg/resources/statefulset_update.go | 108 +++++++++++++++--- .../pkg/resources/statefulset_update_test.go | 82 +++++++++++++ src/go/k8s/pkg/utils/kubernetes.go | 24 +++- src/go/k8s/pkg/utils/kubernetes_test.go | 45 ++++++++ 7 files changed, 265 insertions(+), 42 deletions(-) create mode 100644 src/go/k8s/pkg/utils/kubernetes_test.go diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 3a66f3fcfc55b..5a18ae211720b 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -551,7 +551,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1 return -1, fmt.Errorf("creating pki: %w", err) } - ordinal, err := strconv.ParseInt(getPodOrdinal(pod.Name, rp.Name), 10, 0) + ordinal, err := utils.GetPodOrdinal(pod.Name, rp.Name) if err != nil { return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err) } @@ -567,16 +567,6 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1 return int32(cfg.NodeID), nil } -func getPodOrdinal(podName string, clusterName string) string { - // Pod name needs to have at least 2 more characters - if len(podName) < len(clusterName)+2 { - return "" - } - - // The +1 is for the separator between stateful set name and pod ordinal - return podName[len(clusterName)+1:] -} - func (r *ClusterReconciler) reportStatus( ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster, diff --git a/src/go/k8s/pkg/admin/admin.go b/src/go/k8s/pkg/admin/admin.go index 4962683bcddda..92a7f5a91e71b 100644 --- a/src/go/k8s/pkg/admin/admin.go +++ b/src/go/k8s/pkg/admin/admin.go @@ 
-92,6 +92,7 @@ type AdminAPIClient interface { GetLicenseInfo(ctx context.Context) (admin.License, error) Brokers(ctx context.Context) ([]admin.Broker, error) + Broker(ctx context.Context, nodeID int) (admin.Broker, error) DecommissionBroker(ctx context.Context, node int) error RecommissionBroker(ctx context.Context, node int) error diff --git a/src/go/k8s/pkg/admin/mock_admin.go b/src/go/k8s/pkg/admin/mock_admin.go index 80f20e1767d2e..5aeb1f7db6439 100644 --- a/src/go/k8s/pkg/admin/mock_admin.go +++ b/src/go/k8s/pkg/admin/mock_admin.go @@ -25,17 +25,18 @@ import ( ) type MockAdminAPI struct { - config admin.Config - schema admin.ConfigSchema - patches []configuration.CentralConfigurationPatch - unavailable bool - invalid []string - unknown []string - directValidation bool - brokers []admin.Broker - monitor sync.Mutex - Log logr.Logger - clusterHealth bool + config admin.Config + schema admin.ConfigSchema + patches []configuration.CentralConfigurationPatch + unavailable bool + invalid []string + unknown []string + directValidation bool + brokers []admin.Broker + monitor sync.Mutex + Log logr.Logger + clusterHealth bool + MaintenanceStatus *admin.MaintenanceStatus } var _ AdminAPIClient = &MockAdminAPI{Log: ctrl.Log.WithName("AdminAPIClient").WithName("mockAdminAPI")} @@ -422,6 +423,18 @@ func (m *MockAdminAPI) SetBrokerStatus( return fmt.Errorf("unknown broker %d", id) } +func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) { + t := true + return admin.Broker{ + NodeID: nodeID, + NumCores: 2, + MembershipStatus: "", + IsAlive: &t, + Version: "unversioned", + Maintenance: m.MaintenanceStatus, + }, nil +} + func makeCopy(input, output interface{}) { ser, err := json.Marshal(input) if err != nil { diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index c12467d22f323..839c3c3a7b5c0 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ 
b/src/go/k8s/pkg/resources/statefulset_update.go @@ -157,31 +157,13 @@ func (r *StatefulSetResource) rollingUpdate( volumes[vol.Name] = new(interface{}) } - opts := []patch.CalculateOption{ - patch.IgnoreStatusFields(), - ignoreKubernetesTokenVolumeMounts(), - ignoreDefaultToleration(), - ignoreExistingVolumes(volumes), - } - for i := range podList.Items { pod := podList.Items[i] - patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...) - if err != nil { + if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil { return err } - if !patchResult.IsEmpty() { - r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", - "pod-name", pod.Name, - "patch", patchResult.Patch) - if err = r.Delete(ctx, &pod); err != nil { - return fmt.Errorf("unable to remove Redpanda pod: %w", err) - } - return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} - } - if !utils.IsPodReady(&pod) { return &RequeueAfterError{ RequeueAfter: RequeueDuration, @@ -207,6 +189,94 @@ func (r *StatefulSetResource) rollingUpdate( return nil } +func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error { + opts := []patch.CalculateOption{ + patch.IgnoreStatusFields(), + ignoreKubernetesTokenVolumeMounts(), + ignoreDefaultToleration(), + ignoreExistingVolumes(newVolumes), + } + + patchResult, err := patch.DefaultPatchMaker.Calculate(pod, artificialPod, opts...) 
+ if err != nil { + return err + } + + if patchResult.IsEmpty() { + return nil + } + + var ordinal int64 + ordinal, err = utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name) + if err != nil { + return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err) + } + + if *r.pandaCluster.Spec.Replicas > 1 { + if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil { + // As maintenance mode can not be easily watched using controller runtime the requeue error + // is always returned. That way a rolling update will not finish when operator waits for + // maintenance mode finished. + return &RequeueAfterError{ + RequeueAfter: RequeueDuration, + Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err), + } + } + } + + r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod", + "pod-name", pod.Name, + "patch", patchResult.Patch) + + if err = r.Delete(ctx, pod); err != nil { + return fmt.Errorf("unable to remove Redpanda pod: %w", err) + } + + return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"} +} + +var ( + ErrMaintenanceNotValid = errors.New("maintenance mode is not valid to do rolling update") + ErrMaintenanceNotFinished = errors.New("maintenance mode is not finished") + ErrMaintenanceMissing = errors.New("maintenance definition not returned") +) + +func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error { + adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal) + if err != nil { + return fmt.Errorf("creating admin API client: %w", err) + } + + nodeConf, err := adminAPIClient.GetNodeConfig(ctx) + if err != nil { + return fmt.Errorf("getting node config: %w", err) + } + + err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("enabling maintenance mode: %w", err) + } + + br, err := 
adminAPIClient.Broker(ctx, nodeConf.NodeID) + if err != nil { + return fmt.Errorf("getting broker infromations: %w", err) + } + + if br.Maintenance == nil { + return ErrMaintenanceMissing + } + + if br.Maintenance.Draining || br.Maintenance.Errors || br.Maintenance.Failed > 0 { + return fmt.Errorf("draining (%t), errors (%t), failed (%d): %w", br.Maintenance.Draining, br.Maintenance.Errors, br.Maintenance.Failed, ErrMaintenanceNotValid) + } + + if !br.Maintenance.Finished { + return ErrMaintenanceNotFinished + } + + return nil +} + func (r *StatefulSetResource) updateStatefulSet( ctx context.Context, current *appsv1.StatefulSet, diff --git a/src/go/k8s/pkg/resources/statefulset_update_test.go b/src/go/k8s/pkg/resources/statefulset_update_test.go index 27b6ba763de00..cfe0520b3923d 100644 --- a/src/go/k8s/pkg/resources/statefulset_update_test.go +++ b/src/go/k8s/pkg/resources/statefulset_update_test.go @@ -10,12 +10,19 @@ package resources //nolint:testpackage // needed to test private method import ( + "context" "testing" + redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ) func TestShouldUpdate_AnnotationChange(t *testing.T) { @@ -65,3 +72,78 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) { require.NoError(t, err) require.False(t, update) } + +func TestPutInMaintenanceMode(t *testing.T) { + tcs := []struct { + name string + maintenanceStatus *admin.MaintenanceStatus + errorRequired error + }{ + { + "maintenance finished", + &admin.MaintenanceStatus{ + Finished: true, + }, + nil, + }, + { + 
"maintenance draining", + &admin.MaintenanceStatus{ + Draining: true, + }, + ErrMaintenanceNotValid, + }, + { + "maintenance failed", + &admin.MaintenanceStatus{ + Failed: 1, + }, + ErrMaintenanceNotValid, + }, + { + "maintenance has errors", + &admin.MaintenanceStatus{ + Errors: true, + }, + ErrMaintenanceNotValid, + }, + { + "maintenance did not finished", + &admin.MaintenanceStatus{ + Finished: false, + }, + ErrMaintenanceNotFinished, + }, + { + "maintenance was not returned", + nil, + ErrMaintenanceMissing, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ssres := StatefulSetResource{ + adminAPIClientFactory: func( + ctx context.Context, + k8sClient client.Reader, + redpandaCluster *redpandav1alpha1.Cluster, + fqdn string, + adminTLSProvider types.AdminTLSConfigProvider, + ordinals ...int32, + ) (adminutils.AdminAPIClient, error) { + return &adminutils.MockAdminAPI{ + Log: ctrl.Log.WithName("testAdminAPI").WithName("mockAdminAPI"), + MaintenanceStatus: tc.maintenanceStatus, + }, nil + }, + } + err := ssres.putInMaintenanceMode(context.Background(), 0) + if tc.errorRequired == nil { + require.NoError(t, err) + } else { + require.ErrorIs(t, err, tc.errorRequired) + } + }) + } +} diff --git a/src/go/k8s/pkg/utils/kubernetes.go b/src/go/k8s/pkg/utils/kubernetes.go index 27b78f597a6a9..66994d42747a7 100644 --- a/src/go/k8s/pkg/utils/kubernetes.go +++ b/src/go/k8s/pkg/utils/kubernetes.go @@ -9,7 +9,14 @@ package utils -import corev1 "k8s.io/api/core/v1" +import ( + "fmt" + "strconv" + + corev1 "k8s.io/api/core/v1" +) + +var ErrInvalidInputParameters = fmt.Errorf("invalid input parameters") // IsPodReady tells if a given pod is ready looking at its status. 
func IsPodReady(pod *corev1.Pod) bool { @@ -21,3 +28,18 @@ func IsPodReady(pod *corev1.Pod) bool { return false } + +func GetPodOrdinal(podName, clusterName string) (int64, error) { + // Pod name needs to have at least 2 more characters + if len(podName) < len(clusterName)+2 { + return -1, fmt.Errorf("pod name (%s) and cluster name (%s): %w", podName, clusterName, ErrInvalidInputParameters) + } + + // The +1 is for the separator between stateful set name and pod ordinal + ordinalStr := podName[len(clusterName)+1:] + ordinal, err := strconv.ParseInt(ordinalStr, 10, 0) + if err != nil { + return -1, fmt.Errorf("parsing int failed (%s): %w", ordinalStr, err) + } + return ordinal, nil +} diff --git a/src/go/k8s/pkg/utils/kubernetes_test.go b/src/go/k8s/pkg/utils/kubernetes_test.go new file mode 100644 index 0000000000000..21030deb17b75 --- /dev/null +++ b/src/go/k8s/pkg/utils/kubernetes_test.go @@ -0,0 +1,45 @@ +// Copyright 2022-2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package utils_test + +import ( + "fmt" + "testing" + + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" + "github.com/stretchr/testify/require" +) + +func TestGetPodOrdinal(t *testing.T) { + tcs := []struct { + podName string + clusterName string + expectedError bool + expectedOrdinal int64 + }{ + {"", "", true, -1}, + {"test", "", true, -1}, + {"pod-0", "pod", false, 0}, + {"pod-99", "pod", false, 99}, + {"", "unexpected longer cluster name", true, -1}, + {"test+0", "test", false, 0}, + {"without-ordinal-", "without-ordinal", true, -1}, + } + + for _, tc := range tcs { + t.Run(fmt.Sprintf("pod %s and cluster %s", tc.podName, tc.clusterName), func(t *testing.T) { + ordinal, err := utils.GetPodOrdinal(tc.podName, 
tc.clusterName) + if tc.expectedError { + require.Error(t, err) + } + require.Equal(t, tc.expectedOrdinal, ordinal) + }) + } +}