Skip to content

Commit

Permalink
k8s: Put brokers in maintenance mode before deleting orphant pod
Browse files Browse the repository at this point in the history
During rolling update, before this change, Redpanda operator was calculating
the difference between running pod specification and stateful set pod template.
If the specification did not match the pod was deleted. From release v22.1.1
operator is configuring each broker with pod lifecycle hooks. In the PreStop
hook the script will try to put broker into maintenance mode for 120 seconds
before POD is terminated. Redpanda could not finish within 120 seconds to put
one broker into maintenance mode.

This PR improves the situation by putting maintenance mode before POD is
deleted. The `EnableMaintanaceMode` function is called multiple times until
`Broker` function returns correct status. The assumption is that REST admin API
maintenance mode endpoint is idempotent.

When pod is successfully deleted statefulset would reschedule the pod with
correct pod specification.

redpanda-data#4125
redpanda-data#3023
(cherry picked from commit 3c34855)
  • Loading branch information
Rafal Korepta authored and joejulian committed Apr 13, 2023
1 parent a06e967 commit bcfa443
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 42 deletions.
12 changes: 1 addition & 11 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1
return -1, fmt.Errorf("creating pki: %w", err)
}

ordinal, err := strconv.ParseInt(getPodOrdinal(pod.Name, rp.Name), 10, 0)
ordinal, err := utils.GetPodOrdinal(pod.Name, rp.Name)
if err != nil {
return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err)
}
Expand All @@ -567,16 +567,6 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1
return int32(cfg.NodeID), nil
}

func getPodOrdinal(podName string, clusterName string) string {
// Pod name needs to have at least 2 more characters
if len(podName) < len(clusterName)+2 {
return ""
}

// The +1 is for the separator between stateful set name and pod ordinal
return podName[len(clusterName)+1:]
}

func (r *ClusterReconciler) reportStatus(
ctx context.Context,
redpandaCluster *redpandav1alpha1.Cluster,
Expand Down
1 change: 1 addition & 0 deletions src/go/k8s/pkg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type AdminAPIClient interface {
GetLicenseInfo(ctx context.Context) (admin.License, error)

Brokers(ctx context.Context) ([]admin.Broker, error)
Broker(ctx context.Context, nodeID int) (admin.Broker, error)
DecommissionBroker(ctx context.Context, node int) error
RecommissionBroker(ctx context.Context, node int) error

Expand Down
35 changes: 24 additions & 11 deletions src/go/k8s/pkg/admin/mock_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ import (
)

type MockAdminAPI struct {
config admin.Config
schema admin.ConfigSchema
patches []configuration.CentralConfigurationPatch
unavailable bool
invalid []string
unknown []string
directValidation bool
brokers []admin.Broker
monitor sync.Mutex
Log logr.Logger
clusterHealth bool
config admin.Config
schema admin.ConfigSchema
patches []configuration.CentralConfigurationPatch
unavailable bool
invalid []string
unknown []string
directValidation bool
brokers []admin.Broker
monitor sync.Mutex
Log logr.Logger
clusterHealth bool
MaintenanceStatus *admin.MaintenanceStatus
}

var _ AdminAPIClient = &MockAdminAPI{Log: ctrl.Log.WithName("AdminAPIClient").WithName("mockAdminAPI")}
Expand Down Expand Up @@ -422,6 +423,18 @@ func (m *MockAdminAPI) SetBrokerStatus(
return fmt.Errorf("unknown broker %d", id)
}

func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) {
t := true
return admin.Broker{
NodeID: nodeID,
NumCores: 2,
MembershipStatus: "",
IsAlive: &t,
Version: "unversioned",
Maintenance: m.MaintenanceStatus,
}, nil
}

func makeCopy(input, output interface{}) {
ser, err := json.Marshal(input)
if err != nil {
Expand Down
103 changes: 84 additions & 19 deletions src/go/k8s/pkg/resources/statefulset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,31 +157,13 @@ func (r *StatefulSetResource) rollingUpdate(
volumes[vol.Name] = new(interface{})
}

opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
ignoreKubernetesTokenVolumeMounts(),
ignoreDefaultToleration(),
ignoreExistingVolumes(volumes),
}

for i := range podList.Items {
pod := podList.Items[i]

patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...)
if err != nil {
if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil {
return err
}

if !patchResult.IsEmpty() {
r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
"pod-name", pod.Name,
"patch", patchResult.Patch)
if err = r.Delete(ctx, &pod); err != nil {
return fmt.Errorf("unable to remove Redpanda pod: %w", err)
}
return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"}
}

if !utils.IsPodReady(&pod) {
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Expand All @@ -207,6 +189,89 @@ func (r *StatefulSetResource) rollingUpdate(
return nil
}

func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error {
opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
ignoreKubernetesTokenVolumeMounts(),
ignoreDefaultToleration(),
ignoreExistingVolumes(newVolumes),
}

patchResult, err := patch.DefaultPatchMaker.Calculate(pod, artificialPod, opts...)
if err != nil {
return err
}

if patchResult.IsEmpty() {
return nil
}

var ordinal int64
ordinal, err = utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name)
if err != nil {
return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err)
}

if *r.pandaCluster.Spec.Replicas > 1 {
if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil {
// As maintenance mode can not be easily watched using controller runtime the requeue error
// is always returned. That way a rolling update will not finish when operator waits for
// maintenance mode finished.
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err),
}
}
}

r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
"pod-name", pod.Name,
"patch", patchResult.Patch)

if err = r.Delete(ctx, pod); err != nil {
return fmt.Errorf("unable to remove Redpanda pod: %w", err)
}

return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"}
}

var (
ErrMaintenanceNotFinished = errors.New("maintenance mode is not finished")
ErrMaintenanceMissing = errors.New("maintenance definition not returned")
)

func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error {
adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal)
if err != nil {
return fmt.Errorf("creating admin API client: %w", err)
}

nodeConf, err := adminAPIClient.GetNodeConfig(ctx)
if err != nil {
return fmt.Errorf("getting node config: %w", err)
}

err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID)
if err != nil {
return fmt.Errorf("enabling maintenance mode: %w", err)
}

br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID)
if err != nil {
return fmt.Errorf("getting broker infromations: %w", err)
}

if br.Maintenance == nil {
return ErrMaintenanceMissing
}

if !br.Maintenance.Finished {
return fmt.Errorf("draining (%t), errors (%t), failed (%d), finished (%t): %w", br.Maintenance.Draining, br.Maintenance.Errors, br.Maintenance.Failed, br.Maintenance.Finished, ErrMaintenanceNotFinished)
}

return nil
}

func (r *StatefulSetResource) updateStatefulSet(
ctx context.Context,
current *appsv1.StatefulSet,
Expand Down
82 changes: 82 additions & 0 deletions src/go/k8s/pkg/resources/statefulset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
package resources //nolint:testpackage // needed to test private method

import (
"context"
"testing"

redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestShouldUpdate_AnnotationChange(t *testing.T) {
Expand Down Expand Up @@ -65,3 +72,78 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) {
require.NoError(t, err)
require.False(t, update)
}

func TestPutInMaintenanceMode(t *testing.T) {
tcs := []struct {
name string
maintenanceStatus *admin.MaintenanceStatus
errorRequired error
}{
{
"maintenance finished",
&admin.MaintenanceStatus{
Finished: true,
},
nil,
},
{
"maintenance draining",
&admin.MaintenanceStatus{
Draining: true,
},
ErrMaintenanceNotFinished,
},
{
"maintenance failed",
&admin.MaintenanceStatus{
Failed: 1,
},
ErrMaintenanceNotFinished,
},
{
"maintenance has errors",
&admin.MaintenanceStatus{
Errors: true,
},
ErrMaintenanceNotFinished,
},
{
"maintenance did not finished",
&admin.MaintenanceStatus{
Finished: false,
},
ErrMaintenanceNotFinished,
},
{
"maintenance was not returned",
nil,
ErrMaintenanceMissing,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
ssres := StatefulSetResource{
adminAPIClientFactory: func(
ctx context.Context,
k8sClient client.Reader,
redpandaCluster *redpandav1alpha1.Cluster,
fqdn string,
adminTLSProvider types.AdminTLSConfigProvider,
ordinals ...int32,
) (adminutils.AdminAPIClient, error) {
return &adminutils.MockAdminAPI{
Log: ctrl.Log.WithName("testAdminAPI").WithName("mockAdminAPI"),
MaintenanceStatus: tc.maintenanceStatus,
}, nil
},
}
err := ssres.putInMaintenanceMode(context.Background(), 0)
if tc.errorRequired == nil {
require.NoError(t, err)
} else {
require.ErrorIs(t, err, tc.errorRequired)
}
})
}
}
24 changes: 23 additions & 1 deletion src/go/k8s/pkg/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@

package utils

import corev1 "k8s.io/api/core/v1"
import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
)

var ErrInvalidInputParameters = fmt.Errorf("invalid input parameters")

// IsPodReady tells if a given pod is ready looking at its status.
func IsPodReady(pod *corev1.Pod) bool {
Expand All @@ -21,3 +28,18 @@ func IsPodReady(pod *corev1.Pod) bool {

return false
}

func GetPodOrdinal(podName, clusterName string) (int64, error) {
// Pod name needs to have at least 2 more characters
if len(podName) < len(clusterName)+2 {
return -1, fmt.Errorf("pod name (%s) and cluster name (%s): %w", podName, clusterName, ErrInvalidInputParameters)
}

// The +1 is for the separator between stateful set name and pod ordinal
ordinalStr := podName[len(clusterName)+1:]
ordinal, err := strconv.ParseInt(ordinalStr, 10, 0)
if err != nil {
return -1, fmt.Errorf("parsing int failed (%s): %w", ordinalStr, err)
}
return ordinal, nil
}
45 changes: 45 additions & 0 deletions src/go/k8s/pkg/utils/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022-2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package utils_test

import (
"fmt"
"testing"

"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
"github.com/stretchr/testify/require"
)

func TestGetPodOrdinal(t *testing.T) {
tcs := []struct {
podName string
clusterName string
expectedError bool
expectedOrdinal int64
}{
{"", "", true, -1},
{"test", "", true, -1},
{"pod-0", "pod", false, 0},
{"pod-99", "pod", false, 99},
{"", "unexpected longer cluster name", true, -1},
{"test+0", "test", false, 0},
{"without-ordinal-", "without-ordinal", true, -1},
}

for _, tc := range tcs {
t.Run(fmt.Sprintf("pod %s and cluster %s", tc.podName, tc.clusterName), func(t *testing.T) {
ordinal, err := utils.GetPodOrdinal(tc.podName, tc.clusterName)
if tc.expectedError {
require.Error(t, err)
}
require.Equal(t, tc.expectedOrdinal, ordinal)
})
}
}

0 comments on commit bcfa443

Please sign in to comment.