Skip to content

Commit

Permalink
k8s: Put brokers in maintenance mode before deleting orphant pod
Browse files Browse the repository at this point in the history
During rolling update, before this change, Redpanda operator was calculating
the difference between running pod specification and stateful set pod template.
If the specification did not match the pod was deleted. From release v22.1.1
operator is configuring each broker with pod lifecycle hooks. In the PreStop
hook the script will try to put broker into maintenance mode for 120 seconds
before POD is terminated. Redpanda could not finish within 120 seconds to put
one broker into maintenance mode.

This PR improves the situation by putting maintenance mode before POD is
deleted. The `EnableMaintanaceMode` function is called multiple times until
`Broker` function returns correct status. The assumption is that REST admin API
maintenance mode endpoint is idempotent.

When pod is successfully deleted statefulset would reschedule the pod with
correct pod specification.

redpanda-data#4125
redpanda-data#3023
  • Loading branch information
Rafal Korepta committed Jan 3, 2023
1 parent 15bde9d commit 13dd38f
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 31 deletions.
12 changes: 1 addition & 11 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1
return -1, fmt.Errorf("creating pki: %w", err)
}

ordinal, err := strconv.ParseInt(getPodOrdinal(pod.Name, rp.Name), 10, 0)
ordinal, err := utils.GetPodOrdinal(pod.Name, rp.Name)
if err != nil {
return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err)
}
Expand All @@ -567,16 +567,6 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1
return int32(cfg.NodeID), nil
}

func getPodOrdinal(podName string, clusterName string) string {
// Pod name needs to have at least 2 more characters
if len(podName) < len(clusterName)+2 {
return ""
}

// The +1 is for the separator between stateful set name and pod ordinal
return podName[len(clusterName)+1:]
}

func (r *ClusterReconciler) reportStatus(
ctx context.Context,
redpandaCluster *redpandav1alpha1.Cluster,
Expand Down
1 change: 1 addition & 0 deletions src/go/k8s/pkg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type AdminAPIClient interface {
GetLicenseInfo(ctx context.Context) (admin.License, error)

Brokers(ctx context.Context) ([]admin.Broker, error)
Broker(ctx context.Context, nodeID int) (admin.Broker, error)
DecommissionBroker(ctx context.Context, node int) error
RecommissionBroker(ctx context.Context, node int) error

Expand Down
20 changes: 20 additions & 0 deletions src/go/k8s/pkg/admin/mock_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,26 @@ func (m *MockAdminAPI) SetBrokerStatus(
return fmt.Errorf("unknown broker %d", id)
}

func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) {
t := true
return admin.Broker{
NodeID: nodeID,
NumCores: 2,
MembershipStatus: "",
IsAlive: &t,
Version: "unversioned",
Maintenance: &admin.MaintenanceStatus{
Draining: false,
Finished: true,
Errors: false,
Partitions: 0,
Eligible: 0,
Transferring: 0,
Failed: 0,
},
}, nil
}

func makeCopy(input, output interface{}) {
ser, err := json.Marshal(input)
if err != nil {
Expand Down
106 changes: 87 additions & 19 deletions src/go/k8s/pkg/resources/statefulset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,31 +157,13 @@ func (r *StatefulSetResource) rollingUpdate(
volumes[vol.Name] = new(interface{})
}

opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
ignoreKubernetesTokenVolumeMounts(),
ignoreDefaultToleration(),
ignoreExistingVolumes(volumes),
}

for i := range podList.Items {
pod := podList.Items[i]

patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...)
if err != nil {
if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil {
return err
}

if !patchResult.IsEmpty() {
r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
"pod-name", pod.Name,
"patch", patchResult.Patch)
if err = r.Delete(ctx, &pod); err != nil {
return fmt.Errorf("unable to remove Redpanda pod: %w", err)
}
return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"}
}

if !utils.IsPodReady(&pod) {
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Expand All @@ -207,6 +189,92 @@ func (r *StatefulSetResource) rollingUpdate(
return nil
}

func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error {
opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
ignoreKubernetesTokenVolumeMounts(),
ignoreDefaultToleration(),
ignoreExistingVolumes(newVolumes),
}

patchResult, err := patch.DefaultPatchMaker.Calculate(pod, artificialPod, opts...)
if err != nil {
return err
}

if patchResult.IsEmpty() {
return nil
}

var ordinal int64
ordinal, err = utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name)
if err != nil {
return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err)
}

if *r.pandaCluster.Spec.Replicas > 1 {
if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil {
// As maintenance mode can not be easily watched using controller runtime the requeue error
// is always returned. That way a rolling update will not finish when operator waits for
// maintenance mode finished.
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err),
}
}
}

r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
"pod-name", pod.Name,
"patch", patchResult.Patch)

if err = r.Delete(ctx, pod); err != nil {
return fmt.Errorf("unable to remove Redpanda pod: %w", err)
}

return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"}
}

//nolint:goerr113 // out of scope for this PR
func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error {
adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal)
if err != nil {
return fmt.Errorf("creating admin API client: %w", err)
}

nodeConf, err := adminAPIClient.GetNodeConfig(ctx)
if err != nil {
return fmt.Errorf("getting node config: %w", err)
}

err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID)
if err != nil {
return fmt.Errorf("enabling maintenance mode: %w", err)
}

br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID)
if err != nil {
return fmt.Errorf("getting broker infromations: %w", err)
}

if br.Maintenance == nil {
return fmt.Errorf("maintenance definition not returned")
}

switch {
case br.Maintenance.Draining:
case br.Maintenance.Errors:
case br.Maintenance.Failed > 0:
return fmt.Errorf("maintenance mode is not valid to do rolling update: %v", br.Maintenance)
}

if !br.Maintenance.Finished {
return fmt.Errorf("maintenance mode is not finished")
}

return nil
}

func (r *StatefulSetResource) updateStatefulSet(
ctx context.Context,
current *appsv1.StatefulSet,
Expand Down
24 changes: 23 additions & 1 deletion src/go/k8s/pkg/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@

package utils

import corev1 "k8s.io/api/core/v1"
import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
)

var InvalidInputParametersErr = fmt.Errorf("invalid input parameters")

// IsPodReady tells if a given pod is ready looking at its status.
func IsPodReady(pod *corev1.Pod) bool {
Expand All @@ -21,3 +28,18 @@ func IsPodReady(pod *corev1.Pod) bool {

return false
}

func GetPodOrdinal(podName, clusterName string) (int64, error) {
// Pod name needs to have at least 2 more characters
if len(podName) < len(clusterName)+2 {
return -1, fmt.Errorf("pod name (%s) and cluster name (%s): %w", podName, clusterName, InvalidInputParametersErr)
}

// The +1 is for the separator between stateful set name and pod ordinal
ordinalStr := podName[len(clusterName)+1:]
ordinal, err := strconv.ParseInt(ordinalStr, 10, 0)
if err != nil {
return -1, fmt.Errorf("parsing int failed (%s): %w", ordinalStr, err)
}
return ordinal, nil
}
45 changes: 45 additions & 0 deletions src/go/k8s/pkg/utils/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022-2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package utils_test

import (
"fmt"
"testing"

"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
"github.com/stretchr/testify/require"
)

func TestGetPodOrdinal(t *testing.T) {
tcs := []struct {
podName string
clusterName string
expectedError bool
expectedOrdinal int64
}{
{"", "", true, -1},
{"test", "", true, -1},
{"pod-0", "pod", false, 0},
{"pod-99", "pod", false, 99},
{"", "unexpected longer cluster name", true, -1},
{"test+0", "test", false, 0},
{"without-ordinal-", "without-ordinal", true, -1},
}

for _, tc := range tcs {
t.Run(fmt.Sprintf("pod %s and cluster %s", tc.podName, tc.clusterName), func(t *testing.T) {
ordinal, err := utils.GetPodOrdinal(tc.podName, tc.clusterName)
if tc.expectedError {
require.Error(t, err)
}
require.Equal(t, tc.expectedOrdinal, ordinal)
})
}
}

0 comments on commit 13dd38f

Please sign in to comment.