Skip to content

Commit

Permalink
Merge pull request #7530 from RafalKorepta/rk/gh-3023/put-in-maintana…
Browse files Browse the repository at this point in the history
…nce-mode

k8s: Put brokers in maintenance mode before deleting orphan's pod
  • Loading branch information
RafalKorepta committed Jan 5, 2023
2 parents 25f76b2 + 3c34855 commit c178778
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1
return -1, fmt.Errorf("creating pki: %w", err)
}

ordinal, err := strconv.ParseInt(pod.Name[len(rp.Name)+1:], 10, 0)
ordinal, err := utils.GetPodOrdinal(pod.Name, rp.Name)
if err != nil {
return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err)
}
Expand Down
1 change: 1 addition & 0 deletions src/go/k8s/pkg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type AdminAPIClient interface {
GetLicenseInfo(ctx context.Context) (admin.License, error)

Brokers(ctx context.Context) ([]admin.Broker, error)
Broker(ctx context.Context, nodeID int) (admin.Broker, error)
DecommissionBroker(ctx context.Context, node int) error
RecommissionBroker(ctx context.Context, node int) error

Expand Down
35 changes: 24 additions & 11 deletions src/go/k8s/pkg/admin/mock_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ import (
)

type MockAdminAPI struct {
config admin.Config
schema admin.ConfigSchema
patches []configuration.CentralConfigurationPatch
unavailable bool
invalid []string
unknown []string
directValidation bool
brokers []admin.Broker
monitor sync.Mutex
Log logr.Logger
clusterHealth bool
config admin.Config
schema admin.ConfigSchema
patches []configuration.CentralConfigurationPatch
unavailable bool
invalid []string
unknown []string
directValidation bool
brokers []admin.Broker
monitor sync.Mutex
Log logr.Logger
clusterHealth bool
MaintenanceStatus *admin.MaintenanceStatus
}

var _ AdminAPIClient = &MockAdminAPI{Log: ctrl.Log.WithName("AdminAPIClient").WithName("mockAdminAPI")}
Expand Down Expand Up @@ -422,6 +423,18 @@ func (m *MockAdminAPI) SetBrokerStatus(
return fmt.Errorf("unknown broker %d", id)
}

// Broker returns a synthetic, always-alive broker carrying the requested node
// ID. The reply is built from fixed values and does not consult the mock's
// stored broker list; the only configurable part is the maintenance status,
// which is taken verbatim from m.MaintenanceStatus (and may therefore be nil).
// It never returns an error.
func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) {
	alive := true
	broker := admin.Broker{
		NodeID:           nodeID,
		NumCores:         2,
		MembershipStatus: "",
		IsAlive:          &alive,
		Version:          "unversioned",
		Maintenance:      m.MaintenanceStatus,
	}
	return broker, nil
}

func makeCopy(input, output interface{}) {
ser, err := json.Marshal(input)
if err != nil {
Expand Down
103 changes: 84 additions & 19 deletions src/go/k8s/pkg/resources/statefulset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,31 +157,13 @@ func (r *StatefulSetResource) rollingUpdate(
volumes[vol.Name] = new(interface{})
}

opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
ignoreKubernetesTokenVolumeMounts(),
ignoreDefaultToleration(),
ignoreExistingVolumes(volumes),
}

for i := range podList.Items {
pod := podList.Items[i]

patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...)
if err != nil {
if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil {
return err
}

if !patchResult.IsEmpty() {
r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
"pod-name", pod.Name,
"patch", patchResult.Patch)
if err = r.Delete(ctx, &pod); err != nil {
return fmt.Errorf("unable to remove Redpanda pod: %w", err)
}
return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"}
}

if !utils.IsPodReady(&pod) {
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Expand All @@ -207,6 +189,89 @@ func (r *StatefulSetResource) rollingUpdate(
return nil
}

// podEviction compares pod against artificialPod and, when they differ,
// deletes pod so that it gets recreated.
//
// Before deleting, and only when the cluster has more than one replica, the
// broker identified by the pod's ordinal is put into maintenance mode.
//
// Return values:
//   - nil when the calculated patch is empty (nothing to do);
//   - the raw error when the patch cannot be calculated;
//   - a wrapped error when the pod ordinal cannot be parsed or the delete fails;
//   - *RequeueAfterError when maintenance mode is not (yet) in place, or after
//     the pod has been deleted and the caller must wait for it to restart.
func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error {
	diff, err := patch.DefaultPatchMaker.Calculate(
		pod,
		artificialPod,
		patch.IgnoreStatusFields(),
		ignoreKubernetesTokenVolumeMounts(),
		ignoreDefaultToleration(),
		ignoreExistingVolumes(newVolumes),
	)
	if err != nil {
		return err
	}
	if diff.IsEmpty() {
		return nil
	}

	podOrdinal, err := utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name)
	if err != nil {
		return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err)
	}

	if multiBroker := *r.pandaCluster.Spec.Replicas > 1; multiBroker {
		if drainErr := r.putInMaintenanceMode(ctx, int32(podOrdinal)); drainErr != nil {
			// Maintenance mode cannot easily be watched through controller
			// runtime, so every failure here is turned into a requeue. That
			// keeps the rolling update from completing while the operator is
			// still waiting for maintenance mode to finish.
			return &RequeueAfterError{
				RequeueAfter: RequeueDuration,
				Msg:          fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, drainErr),
			}
		}
	}

	r.logger.Info(
		"Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
		"pod-name", pod.Name,
		"patch", diff.Patch,
	)

	if err := r.Delete(ctx, pod); err != nil {
		return fmt.Errorf("unable to remove Redpanda pod: %w", err)
	}

	// The pod is gone; ask the caller to come back once it has restarted.
	return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"}
}

// Sentinel errors returned (possibly wrapped) by putInMaintenanceMode.
var (
// ErrMaintenanceNotFinished reports that the broker returned a maintenance
// status whose Finished flag is not set.
ErrMaintenanceNotFinished = errors.New("maintenance mode is not finished")
// ErrMaintenanceMissing reports that the broker was returned without any
// maintenance status.
ErrMaintenanceMissing = errors.New("maintenance definition not returned")
)

// putInMaintenanceMode enables maintenance mode on the broker served by the pod
// with the given ordinal and reports whether maintenance has finished.
//
// The broker's node ID is read from the node configuration exposed by that
// pod's admin API, maintenance mode is enabled for that node ID, and the
// broker is then queried once for its maintenance status. The function does
// not wait or poll.
//
// It returns nil only when the broker reports a maintenance status with
// Finished set. Otherwise it returns:
//   - a wrapped error when the admin API client cannot be created or any admin
//     API call fails;
//   - ErrMaintenanceMissing when the broker carries no maintenance status;
//   - an error wrapping ErrMaintenanceNotFinished (with the current draining,
//     errors, failed and finished values) when maintenance is still in progress.
func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error {
	adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal)
	if err != nil {
		return fmt.Errorf("creating admin API client: %w", err)
	}

	// The pod ordinal is not used as the node ID; the node ID comes from the
	// node's own configuration.
	nodeConf, err := adminAPIClient.GetNodeConfig(ctx)
	if err != nil {
		return fmt.Errorf("getting node config: %w", err)
	}

	err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID)
	if err != nil {
		return fmt.Errorf("enabling maintenance mode: %w", err)
	}

	br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID)
	if err != nil {
		return fmt.Errorf("getting broker information: %w", err)
	}

	if br.Maintenance == nil {
		return ErrMaintenanceMissing
	}

	if !br.Maintenance.Finished {
		return fmt.Errorf(
			"draining (%t), errors (%t), failed (%d), finished (%t): %w",
			br.Maintenance.Draining,
			br.Maintenance.Errors,
			br.Maintenance.Failed,
			br.Maintenance.Finished,
			ErrMaintenanceNotFinished,
		)
	}

	return nil
}

func (r *StatefulSetResource) updateStatefulSet(
ctx context.Context,
current *appsv1.StatefulSet,
Expand Down
82 changes: 82 additions & 0 deletions src/go/k8s/pkg/resources/statefulset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
package resources //nolint:testpackage // needed to test private method

import (
"context"
"testing"

redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestShouldUpdate_AnnotationChange(t *testing.T) {
Expand Down Expand Up @@ -65,3 +72,78 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) {
require.NoError(t, err)
require.False(t, update)
}

// TestPutInMaintenanceMode drives putInMaintenanceMode against a mock admin API
// whose Broker reply carries the maintenance status of each table entry. Only a
// status with Finished set is expected to succeed; every other status must
// yield ErrMaintenanceNotFinished, and a missing status ErrMaintenanceMissing.
func TestPutInMaintenanceMode(t *testing.T) {
	type testCase struct {
		name              string
		maintenanceStatus *admin.MaintenanceStatus
		errorRequired     error
	}

	cases := []testCase{
		{
			name:              "maintenance finished",
			maintenanceStatus: &admin.MaintenanceStatus{Finished: true},
		},
		{
			name:              "maintenance draining",
			maintenanceStatus: &admin.MaintenanceStatus{Draining: true},
			errorRequired:     ErrMaintenanceNotFinished,
		},
		{
			name:              "maintenance failed",
			maintenanceStatus: &admin.MaintenanceStatus{Failed: 1},
			errorRequired:     ErrMaintenanceNotFinished,
		},
		{
			name:              "maintenance has errors",
			maintenanceStatus: &admin.MaintenanceStatus{Errors: true},
			errorRequired:     ErrMaintenanceNotFinished,
		},
		{
			name:              "maintenance did not finished",
			maintenanceStatus: &admin.MaintenanceStatus{Finished: false},
			errorRequired:     ErrMaintenanceNotFinished,
		},
		{
			name:          "maintenance was not returned",
			errorRequired: ErrMaintenanceMissing,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Factory handing out a mock whose Broker reply uses this case's
			// maintenance status; all factory arguments are ignored.
			mockFactory := func(
				_ context.Context,
				_ client.Reader,
				_ *redpandav1alpha1.Cluster,
				_ string,
				_ types.AdminTLSConfigProvider,
				_ ...int32,
			) (adminutils.AdminAPIClient, error) {
				mock := &adminutils.MockAdminAPI{
					Log:               ctrl.Log.WithName("testAdminAPI").WithName("mockAdminAPI"),
					MaintenanceStatus: tc.maintenanceStatus,
				}
				return mock, nil
			}
			res := StatefulSetResource{adminAPIClientFactory: mockFactory}

			err := res.putInMaintenanceMode(context.Background(), 0)
			if tc.errorRequired != nil {
				require.ErrorIs(t, err, tc.errorRequired)
				return
			}
			require.NoError(t, err)
		})
	}
}
24 changes: 23 additions & 1 deletion src/go/k8s/pkg/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@

package utils

import corev1 "k8s.io/api/core/v1"
import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
)

// ErrInvalidInputParameters is the sentinel wrapped by GetPodOrdinal when the
// pod name is too short, relative to the cluster name, to contain an ordinal.
var ErrInvalidInputParameters = fmt.Errorf("invalid input parameters")

// IsPodReady tells if a given pod is ready looking at its status.
func IsPodReady(pod *corev1.Pod) bool {
Expand All @@ -21,3 +28,18 @@ func IsPodReady(pod *corev1.Pod) bool {

return false
}

// GetPodOrdinal extracts the stateful set ordinal from podName, which is
// expected to consist of clusterName, a one-character separator and a decimal
// ordinal (for example "cluster-2" for cluster name "cluster").
//
// Only the length of clusterName is used to locate the ordinal: neither the
// prefix nor the separator character is compared against podName.
//
// On success the non-negative ordinal is returned. On failure the result is -1
// together with an error that
//   - wraps ErrInvalidInputParameters when podName is too short to hold a
//     separator plus at least one digit, or when the parsed ordinal is negative;
//   - wraps the strconv error when the ordinal part is not a valid integer.
func GetPodOrdinal(podName, clusterName string) (int64, error) {
	// Pod name needs to have at least 2 more characters: separator and a digit.
	if len(podName) < len(clusterName)+2 {
		return -1, fmt.Errorf("pod name (%s) and cluster name (%s): %w", podName, clusterName, ErrInvalidInputParameters)
	}

	// The +1 is for the separator between stateful set name and pod ordinal.
	ordinalStr := podName[len(clusterName)+1:]
	ordinal, err := strconv.ParseInt(ordinalStr, 10, 0)
	if err != nil {
		return -1, fmt.Errorf("parsing int failed (%s): %w", ordinalStr, err)
	}

	// Ordinals are never negative. Without this check a name such as "pod--1"
	// would yield (-1, nil), which callers cannot tell apart from the value
	// returned alongside an error.
	if ordinal < 0 {
		return -1, fmt.Errorf("pod name (%s) has negative ordinal (%d): %w", podName, ordinal, ErrInvalidInputParameters)
	}

	return ordinal, nil
}
45 changes: 45 additions & 0 deletions src/go/k8s/pkg/utils/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022-2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package utils_test

import (
"fmt"
"testing"

"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
"github.com/stretchr/testify/require"
)

// TestGetPodOrdinal checks utils.GetPodOrdinal against a table of pod and
// cluster name pairs. Each case states whether an error is expected and which
// ordinal must be returned (-1 for every failing case).
func TestGetPodOrdinal(t *testing.T) {
	tcs := []struct {
		podName         string
		clusterName     string
		expectedError   bool
		expectedOrdinal int64
	}{
		{"", "", true, -1},
		{"test", "", true, -1},
		{"pod-0", "pod", false, 0},
		{"pod-99", "pod", false, 99},
		{"", "unexpected longer cluster name", true, -1},
		{"test+0", "test", false, 0},
		{"without-ordinal-", "without-ordinal", true, -1},
	}

	for _, tc := range tcs {
		t.Run(fmt.Sprintf("pod %s and cluster %s", tc.podName, tc.clusterName), func(t *testing.T) {
			ordinal, err := utils.GetPodOrdinal(tc.podName, tc.clusterName)
			if tc.expectedError {
				require.Error(t, err)
			} else {
				// The success path must be asserted too; otherwise a correct
				// ordinal returned together with an error would go unnoticed.
				require.NoError(t, err)
			}
			require.Equal(t, tc.expectedOrdinal, ordinal)
		})
	}
}

0 comments on commit c178778

Please sign in to comment.