Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s: Put brokers in maintenance mode before deleting orphan's pod #7530

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1
return -1, fmt.Errorf("creating pki: %w", err)
}

ordinal, err := strconv.ParseInt(pod.Name[len(rp.Name)+1:], 10, 0)
ordinal, err := utils.GetPodOrdinal(pod.Name, rp.Name)
if err != nil {
return -1, fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", rp.Name, pod.Name, err)
}
Expand Down
1 change: 1 addition & 0 deletions src/go/k8s/pkg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type AdminAPIClient interface {
GetLicenseInfo(ctx context.Context) (admin.License, error)

Brokers(ctx context.Context) ([]admin.Broker, error)
Broker(ctx context.Context, nodeID int) (admin.Broker, error)
DecommissionBroker(ctx context.Context, node int) error
RecommissionBroker(ctx context.Context, node int) error

Expand Down
35 changes: 24 additions & 11 deletions src/go/k8s/pkg/admin/mock_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ import (
)

// MockAdminAPI is a mock implementation of AdminAPIClient.
//
// NOTE(review): the unexported field list appears twice below; this looks like
// the pre-change and post-change lines of a diff rendered together — confirm
// against the real file before relying on it.
type MockAdminAPI struct {
config admin.Config
schema admin.ConfigSchema
patches []configuration.CentralConfigurationPatch
unavailable bool
invalid []string
unknown []string
directValidation bool
brokers []admin.Broker
monitor sync.Mutex
Log logr.Logger
clusterHealth bool
config admin.Config
schema admin.ConfigSchema
patches []configuration.CentralConfigurationPatch
unavailable bool
invalid []string
unknown []string
directValidation bool
brokers []admin.Broker
monitor sync.Mutex
Log logr.Logger
clusterHealth bool
// MaintenanceStatus is handed back unchanged as the Maintenance field of
// the broker returned by Broker, letting tests choose the reported state.
MaintenanceStatus *admin.MaintenanceStatus
}

var _ AdminAPIClient = &MockAdminAPI{Log: ctrl.Log.WithName("AdminAPIClient").WithName("mockAdminAPI")}
Expand Down Expand Up @@ -422,6 +423,18 @@ func (m *MockAdminAPI) SetBrokerStatus(
return fmt.Errorf("unknown broker %d", id)
}

// Broker returns a synthetic broker description for nodeID.
//
// The context is ignored and the error is always nil. The returned broker
// echoes nodeID, reports two cores, is marked alive, carries the version
// string "unversioned" and exposes m.MaintenanceStatus as its maintenance
// state.
func (m *MockAdminAPI) Broker(_ context.Context, nodeID int) (admin.Broker, error) {
	// IsAlive is a pointer field, so it needs an addressable value.
	alive := true

	broker := admin.Broker{
		NodeID:           nodeID,
		NumCores:         2,
		MembershipStatus: "",
		IsAlive:          &alive,
		Version:          "unversioned",
		Maintenance:      m.MaintenanceStatus,
	}

	return broker, nil
}

func makeCopy(input, output interface{}) {
ser, err := json.Marshal(input)
if err != nil {
Expand Down
103 changes: 84 additions & 19 deletions src/go/k8s/pkg/resources/statefulset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,31 +157,13 @@ func (r *StatefulSetResource) rollingUpdate(
volumes[vol.Name] = new(interface{})
}

opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
ignoreKubernetesTokenVolumeMounts(),
ignoreDefaultToleration(),
ignoreExistingVolumes(volumes),
}

for i := range podList.Items {
pod := podList.Items[i]

patchResult, err := patch.DefaultPatchMaker.Calculate(&pod, &artificialPod, opts...)
if err != nil {
if err = r.podEviction(ctx, &pod, &artificialPod, volumes); err != nil {
return err
}

if !patchResult.IsEmpty() {
r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
"pod-name", pod.Name,
"patch", patchResult.Patch)
if err = r.Delete(ctx, &pod); err != nil {
return fmt.Errorf("unable to remove Redpanda pod: %w", err)
}
return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"}
}

if !utils.IsPodReady(&pod) {
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Expand All @@ -207,6 +189,89 @@ func (r *StatefulSetResource) rollingUpdate(
return nil
}

func (r *StatefulSetResource) podEviction(ctx context.Context, pod, artificialPod *corev1.Pod, newVolumes map[string]interface{}) error {
opts := []patch.CalculateOption{
patch.IgnoreStatusFields(),
ignoreKubernetesTokenVolumeMounts(),
ignoreDefaultToleration(),
ignoreExistingVolumes(newVolumes),
}

patchResult, err := patch.DefaultPatchMaker.Calculate(pod, artificialPod, opts...)
if err != nil {
return err
}

if patchResult.IsEmpty() {
return nil
}

var ordinal int64
ordinal, err = utils.GetPodOrdinal(pod.Name, r.pandaCluster.Name)
if err != nil {
return fmt.Errorf("cluster %s: cannot convert pod name (%s) to ordinal: %w", r.pandaCluster.Name, pod.Name, err)
}

if *r.pandaCluster.Spec.Replicas > 1 {
alenkacz marked this conversation as resolved.
Show resolved Hide resolved
if err = r.putInMaintenanceMode(ctx, int32(ordinal)); err != nil {
// As maintenance mode can not be easily watched using controller runtime the requeue error
// is always returned. That way a rolling update will not finish when operator waits for
// maintenance mode finished.
return &RequeueAfterError{
RequeueAfter: RequeueDuration,
Msg: fmt.Sprintf("putting node (%s) into maintenance mode: %v", pod.Name, err),
}
}
}

r.logger.Info("Changes in Pod definition other than activeDeadlineSeconds, configurator and Redpanda container name. Deleting pod",
alenkacz marked this conversation as resolved.
Show resolved Hide resolved
"pod-name", pod.Name,
"patch", patchResult.Patch)

if err = r.Delete(ctx, pod); err != nil {
return fmt.Errorf("unable to remove Redpanda pod: %w", err)
}

return &RequeueAfterError{RequeueAfter: RequeueDuration, Msg: "wait for pod restart"}
}

// Sentinel errors reported by putInMaintenanceMode; match them with errors.Is.
var (
// ErrMaintenanceNotFinished is wrapped into the returned error when the
// broker's maintenance status is present but not yet marked as finished.
ErrMaintenanceNotFinished = errors.New("maintenance mode is not finished")
// ErrMaintenanceMissing is returned when the broker information contains no
// maintenance status at all.
ErrMaintenanceMissing = errors.New("maintenance definition not returned")
)

// putInMaintenanceMode enables maintenance mode on the broker served by the
// pod with the given ordinal and reports whether maintenance has finished.
//
// It builds an admin API client for that ordinal, reads the node
// configuration to learn the broker's node ID, enables maintenance mode for
// that node ID and then fetches the broker to inspect its maintenance status.
//
// It returns nil only when the broker reports a finished maintenance status.
// Otherwise it returns:
//   - a wrapped error from any of the admin API calls,
//   - ErrMaintenanceMissing when the broker carries no maintenance status,
//   - an error wrapping ErrMaintenanceNotFinished (with the draining, errors,
//     failed and finished values in the message) when maintenance is still in
//     progress.
func (r *StatefulSetResource) putInMaintenanceMode(ctx context.Context, ordinal int32) error {
	adminAPIClient, err := r.getAdminAPIClient(ctx, ordinal)
	if err != nil {
		return fmt.Errorf("creating admin API client: %w", err)
	}

	// The pod ordinal is not used as the broker ID; the node ID is taken from
	// the node configuration instead.
	nodeConf, err := adminAPIClient.GetNodeConfig(ctx)
	if err != nil {
		return fmt.Errorf("getting node config: %w", err)
	}

	if err = adminAPIClient.EnableMaintenanceMode(ctx, nodeConf.NodeID); err != nil {
		return fmt.Errorf("enabling maintenance mode: %w", err)
	}

	br, err := adminAPIClient.Broker(ctx, nodeConf.NodeID)
	if err != nil {
		return fmt.Errorf("getting broker information: %w", err)
	}

	if br.Maintenance == nil {
		return ErrMaintenanceMissing
	}

	if !br.Maintenance.Finished {
		return fmt.Errorf("draining (%t), errors (%t), failed (%d), finished (%t): %w", br.Maintenance.Draining, br.Maintenance.Errors, br.Maintenance.Failed, br.Maintenance.Finished, ErrMaintenanceNotFinished)
	}

	return nil
}

func (r *StatefulSetResource) updateStatefulSet(
ctx context.Context,
current *appsv1.StatefulSet,
Expand Down
82 changes: 82 additions & 0 deletions src/go/k8s/pkg/resources/statefulset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
package resources //nolint:testpackage // needed to test private method

import (
"context"
"testing"

redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestShouldUpdate_AnnotationChange(t *testing.T) {
Expand Down Expand Up @@ -65,3 +72,78 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) {
require.NoError(t, err)
require.False(t, update)
}

// TestPutInMaintenanceMode drives putInMaintenanceMode against a mock admin
// API whose reported maintenance status is set per case, and checks which
// sentinel error (if any) comes back.
func TestPutInMaintenanceMode(t *testing.T) {
	type scenario struct {
		title   string
		status  *admin.MaintenanceStatus
		wantErr error
	}

	scenarios := []scenario{
		{
			title:   "maintenance finished",
			status:  &admin.MaintenanceStatus{Finished: true},
			wantErr: nil,
		},
		{
			title:   "maintenance draining",
			status:  &admin.MaintenanceStatus{Draining: true},
			wantErr: ErrMaintenanceNotFinished,
		},
		{
			title:   "maintenance failed",
			status:  &admin.MaintenanceStatus{Failed: 1},
			wantErr: ErrMaintenanceNotFinished,
		},
		{
			title:   "maintenance has errors",
			status:  &admin.MaintenanceStatus{Errors: true},
			wantErr: ErrMaintenanceNotFinished,
		},
		{
			title:   "maintenance did not finished",
			status:  &admin.MaintenanceStatus{Finished: false},
			wantErr: ErrMaintenanceNotFinished,
		},
		{
			title:   "maintenance was not returned",
			status:  nil,
			wantErr: ErrMaintenanceMissing,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.title, func(t *testing.T) {
			// The factory ignores all of its arguments and always hands out a
			// mock that reports this scenario's maintenance status.
			resource := StatefulSetResource{
				adminAPIClientFactory: func(
					_ context.Context,
					_ client.Reader,
					_ *redpandav1alpha1.Cluster,
					_ string,
					_ types.AdminTLSConfigProvider,
					_ ...int32,
				) (adminutils.AdminAPIClient, error) {
					mock := &adminutils.MockAdminAPI{
						Log:               ctrl.Log.WithName("testAdminAPI").WithName("mockAdminAPI"),
						MaintenanceStatus: sc.status,
					}
					return mock, nil
				},
			}

			err := resource.putInMaintenanceMode(context.Background(), 0)
			if sc.wantErr != nil {
				require.ErrorIs(t, err, sc.wantErr)
				return
			}
			require.NoError(t, err)
		})
	}
}
24 changes: 23 additions & 1 deletion src/go/k8s/pkg/utils/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@

package utils

import corev1 "k8s.io/api/core/v1"
import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
)

// ErrInvalidInputParameters is wrapped into the error GetPodOrdinal returns
// when the pod name is too short to contain the cluster name, a separator and
// at least one ordinal character.
var ErrInvalidInputParameters = fmt.Errorf("invalid input parameters")

// IsPodReady tells if a given pod is ready looking at its status.
func IsPodReady(pod *corev1.Pod) bool {
Expand All @@ -21,3 +28,18 @@ func IsPodReady(pod *corev1.Pod) bool {

return false
}

// GetPodOrdinal extracts the ordinal from podName, which is expected to have
// the form "<clusterName><separator><ordinal>" with a one-character separator.
//
// Only the length of clusterName is used: the function skips
// len(clusterName)+1 bytes of podName and parses the remainder as a base-10
// integer. It does not check that podName actually starts with clusterName or
// which separator character is used.
//
// The ordinal must be non-negative and fit into 32 bits, so callers can
// convert the result to int32 without truncation. On any failure the returned
// ordinal is -1 and the error describes the problem; a too-short pod name or a
// negative ordinal wraps ErrInvalidInputParameters, a malformed or
// out-of-range number wraps the strconv error.
func GetPodOrdinal(podName, clusterName string) (int64, error) {
	// Pod name needs to have at least 2 more characters than the cluster
	// name: the separator and one ordinal character.
	if len(podName) < len(clusterName)+2 {
		return -1, fmt.Errorf("pod name (%s) and cluster name (%s): %w", podName, clusterName, ErrInvalidInputParameters)
	}

	// The +1 is for the separator between stateful set name and pod ordinal.
	ordinalStr := podName[len(clusterName)+1:]

	// bitSize 32 keeps the value within int32 range, which is what callers
	// convert the ordinal to.
	ordinal, err := strconv.ParseInt(ordinalStr, 10, 32)
	if err != nil {
		return -1, fmt.Errorf("parsing int failed (%s): %w", ordinalStr, err)
	}

	// A negative ordinal is never valid and -1 would be indistinguishable
	// from the value returned alongside an error.
	if ordinal < 0 {
		return -1, fmt.Errorf("negative pod ordinal (%s): %w", ordinalStr, ErrInvalidInputParameters)
	}

	return ordinal, nil
}
45 changes: 45 additions & 0 deletions src/go/k8s/pkg/utils/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022-2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package utils_test

import (
"fmt"
"testing"

"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
"github.com/stretchr/testify/require"
)

// TestGetPodOrdinal checks utils.GetPodOrdinal against a table of pod and
// cluster names, verifying both the returned ordinal and whether an error is
// reported.
func TestGetPodOrdinal(t *testing.T) {
	tcs := []struct {
		podName         string
		clusterName     string
		expectedError   bool
		expectedOrdinal int64
	}{
		{"", "", true, -1},
		{"test", "", true, -1},
		{"pod-0", "pod", false, 0},
		{"pod-99", "pod", false, 99},
		{"", "unexpected longer cluster name", true, -1},
		{"test+0", "test", false, 0},
		{"without-ordinal-", "without-ordinal", true, -1},
	}

	for _, tc := range tcs {
		t.Run(fmt.Sprintf("pod %s and cluster %s", tc.podName, tc.clusterName), func(t *testing.T) {
			ordinal, err := utils.GetPodOrdinal(tc.podName, tc.clusterName)
			if tc.expectedError {
				require.Error(t, err)
			} else {
				// Success cases must not report an error; without this a
				// correct ordinal paired with a stray error would pass.
				require.NoError(t, err)
			}
			require.Equal(t, tc.expectedOrdinal, ordinal)
		})
	}
}