diff --git a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
index fa55ec50f913..2f1cacd5b821 100644
--- a/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
+++ b/src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
@@ -158,6 +158,12 @@ type ClusterSpec struct {
 type RestartConfig struct {
 	// DisableMaintenanceModeHooks deactivates the preStop and postStart hooks that force nodes to enter maintenance mode when stopping and exit maintenance mode when up again
 	DisableMaintenanceModeHooks *bool `json:"disableMaintenanceModeHooks,omitempty"`
+	// DisableReadinessProbe deactivates the readiness probe that verifies the state of each node by querying the Redpanda admin API
+	DisableReadinessProbe *bool `json:"disableReadinessProbe,omitempty"`
+	// DisableClusterHealthCheck deactivates the wait for cluster health when restarting
+	DisableClusterHealthCheck *bool `json:"disableClusterHealthCheck,omitempty"`
+	// HealthCheckTimeoutSeconds configures the maximum time to wait for the cluster to become healthy before giving up
+	HealthCheckTimeoutSeconds *int32 `json:"healthCheckTimeoutSeconds,omitempty"`
 }
 
 // PDBConfig specifies how the PodDisruptionBudget should be created for the
@@ -332,13 +338,15 @@ type ClusterCondition struct {
 }
 
 // ClusterConditionType is a valid value for ClusterCondition.Type
-// +kubebuilder:validation:Enum=ClusterConfigured
+// +kubebuilder:validation:Enum=ClusterConfigured;ClusterStable
 type ClusterConditionType string
 
 // These are valid conditions of the cluster.
 const (
 	// ClusterConfiguredConditionType indicates whether the Redpanda cluster configuration is in sync with the desired one
 	ClusterConfiguredConditionType ClusterConditionType = "ClusterConfigured"
+	// ClusterStableConditionType is a stability indicator for the cluster that estimates if the cluster can reach quorum in its current configuration
+	ClusterStableConditionType ClusterConditionType = "ClusterStable"
 )
 
 // GetCondition return the condition of the given type
@@ -416,6 +424,16 @@ const (
 	ClusterConfiguredReasonError = "Error"
 )
 
+// These are valid reasons for ClusterStable
+const (
+	// ClusterStableNotEnoughInstances indicates that the cluster is running with fewer ready instances than the minimum needed to reach quorum
+	ClusterStableNotEnoughInstances = "NotEnoughInstances"
+	// ClusterStableRecovering indicates that the cluster has been in an unstable state and is returning to normal.
+	// A cluster enters this state as soon as the minimum number of instances needed for quorum becomes ready, while the
+	// transition to full stability happens only once all nodes reach the ready state.
+	ClusterStableRecovering = "Recovering"
+)
+
 // NodesList shows where client of Cluster custom resource can reach
 // various listeners of Redpanda cluster
 type NodesList struct {
@@ -832,6 +850,24 @@ func (r *Cluster) IsUsingMaintenanceModeHooks() bool {
 	return true
 }
 
+// IsUsingReadinessProbe tells if the cluster is configured to use the readiness probe on the pods.
+func (r *Cluster) IsUsingReadinessProbe() bool {
+	// enabled unless explicitly stated
+	if r.Spec.RestartConfig != nil && r.Spec.RestartConfig.DisableReadinessProbe != nil {
+		return !*r.Spec.RestartConfig.DisableReadinessProbe
+	}
+	return true
+}
+
+// IsUsingClusterHealthCheck tells if the cluster is configured to wait for cluster health when restarting.
+func (r *Cluster) IsUsingClusterHealthCheck() bool { + // enabled unless explicitly stated + if r.Spec.RestartConfig != nil && r.Spec.RestartConfig.DisableClusterHealthCheck != nil { + return !*r.Spec.RestartConfig.DisableClusterHealthCheck + } + return true +} + // ClusterStatus // IsRestarting tells if the cluster is restarting due to a change in configuration or an upgrade in progress diff --git a/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go b/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go index 57a60f5c37b9..342415eeaaa2 100644 --- a/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go +++ b/src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go @@ -302,6 +302,22 @@ func (in *KafkaAPITLS) DeepCopy() *KafkaAPITLS { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ListenerWithName) DeepCopyInto(out *ListenerWithName) { + *out = *in + in.KafkaAPI.DeepCopyInto(&out.KafkaAPI) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ListenerWithName. +func (in *ListenerWithName) DeepCopy() *ListenerWithName { + if in == nil { + return nil + } + out := new(ListenerWithName) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LoadBalancerConfig) DeepCopyInto(out *LoadBalancerConfig) { *out = *in @@ -520,6 +536,21 @@ func (in *RestartConfig) DeepCopyInto(out *RestartConfig) { *out = new(bool) **out = **in } + if in.DisableReadinessProbe != nil { + in, out := &in.DisableReadinessProbe, &out.DisableReadinessProbe + *out = new(bool) + **out = **in + } + if in.DisableClusterHealthCheck != nil { + in, out := &in.DisableClusterHealthCheck, &out.DisableClusterHealthCheck + *out = new(bool) + **out = **in + } + if in.HealthCheckTimeoutSeconds != nil { + in, out := &in.HealthCheckTimeoutSeconds, &out.HealthCheckTimeoutSeconds + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RestartConfig. 
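For context, a minimal sketch (not part of this patch) of how the new RestartConfig knobs and their defaulting helpers are meant to be consumed; the values are illustrative:

package main

import (
	"fmt"

	redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
)

func main() {
	// Illustrative values: keep the health check enabled but cap the wait at 10 minutes.
	disableHealthCheck := false
	timeout := int32(600)

	cluster := &redpandav1alpha1.Cluster{
		Spec: redpandav1alpha1.ClusterSpec{
			RestartConfig: &redpandav1alpha1.RestartConfig{
				DisableClusterHealthCheck: &disableHealthCheck,
				HealthCheckTimeoutSeconds: &timeout,
				// DisableReadinessProbe left nil: the probe stays enabled by default.
			},
		},
	}

	// Both helpers treat a nil RestartConfig or a nil flag as "enabled".
	fmt.Println(cluster.IsUsingReadinessProbe())     // true
	fmt.Println(cluster.IsUsingClusterHealthCheck()) // true
}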
diff --git a/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml b/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml index ef3383c82941..0d9453a7a67d 100644 --- a/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml +++ b/src/go/k8s/config/crd/bases/redpanda.vectorized.io_clusters.yaml @@ -652,11 +652,26 @@ spec: description: RestartConfig allows to control the behavior of the cluster when restarting properties: + disableClusterHealthCheck: + description: DisableClusterHealthCheck deactivates the wait for + cluster health when restarting + type: boolean disableMaintenanceModeHooks: description: DisableMaintenanceModeHooks deactivates the preStop and postStart hooks that force nodes to enter maintenance mode when stopping and exit maintenance mode when up again type: boolean + disableReadinessProbe: + description: DisableReadinessProbe deactivates the readiness probe + that verifies the state of each node by querying the Redpanda + admin API + type: boolean + healthCheckTimeoutSeconds: + description: HealthCheckTimeoutSeconds configures the maximum + time to wait for the cluster to become healthy before giving + up + format: int32 + type: integer type: object sidecars: description: Sidecars is list of sidecars run alongside redpanda container @@ -800,6 +815,7 @@ spec: description: Type is the type of the condition enum: - ClusterConfigured + - ClusterStable type: string required: - status diff --git a/src/go/k8s/config/rbac/role.yaml b/src/go/k8s/config/rbac/role.yaml index e960c798bd17..df4f131abd19 100644 --- a/src/go/k8s/config/rbac/role.yaml +++ b/src/go/k8s/config/rbac/role.yaml @@ -59,6 +59,8 @@ rules: - delete - get - list + - patch + - update - watch - apiGroups: - "" diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index bfd0e44ecda5..31ab4dbdac50 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -24,11 +24,13 @@ import ( "github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/certmanager" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/api/admin" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" @@ -58,7 +60,7 @@ type ClusterReconciler struct { //+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch; //+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch; -//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;delete +//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch;delete //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create; //+kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;update;patch; //+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles;clusterrolebindings,verbs=get;list;watch;create;update;patch; @@ -143,6 +145,8 @@ func (r *ClusterReconciler) 
Reconcile( sa := resources.NewServiceAccount(r.Client, &redpandaCluster, r.Scheme, log) configMapResource := resources.NewConfigMap(r.Client, &redpandaCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, log) + hooks := resources.NewHooksConfigMap(r.Client, &redpandaCluster, r.Scheme, log) + sts := resources.NewStatefulSet( r.Client, &redpandaCluster, @@ -171,6 +175,7 @@ func (r *ClusterReconciler) Reconcile( resources.NewClusterRole(r.Client, &redpandaCluster, r.Scheme, log), crb, resources.NewPDB(r.Client, &redpandaCluster, r.Scheme, log), + hooks, sts, } @@ -312,7 +317,10 @@ func (r *ClusterReconciler) reportStatus( nodeList.Internal = observedNodesInternal nodeList.SchemaRegistry.Internal = fmt.Sprintf("%s:%d", clusterFQDN, schemaRegistryPort) - if statusShouldBeUpdated(&redpandaCluster.Status, nodeList, sts) { + stableCondition := computeStableCondition(redpandaCluster, observedPods.Items) + conditionChanged := redpandaCluster.Status.SetCondition(stableCondition.Type, stableCondition.Status, stableCondition.Reason, stableCondition.Message) + + if conditionChanged || statusShouldBeUpdated(&redpandaCluster.Status, nodeList, sts) { err := retry.RetryOnConflict(retry.DefaultRetry, func() error { var cluster redpandav1alpha1.Cluster err := r.Get(ctx, types.NamespacedName{ @@ -326,6 +334,7 @@ func (r *ClusterReconciler) reportStatus( cluster.Status.Nodes = *nodeList cluster.Status.Replicas = sts.LastObservedState.Status.ReadyReplicas cluster.Status.Version = sts.Version() + cluster.Status.SetCondition(stableCondition.Type, stableCondition.Status, stableCondition.Reason, stableCondition.Message) err = r.Status().Update(ctx, &cluster) if err == nil { @@ -341,6 +350,54 @@ func (r *ClusterReconciler) reportStatus( return nil } +func computeStableCondition( + cluster *redpandav1alpha1.Cluster, observedPods []corev1.Pod, +) redpandav1alpha1.ClusterCondition { + var requestedInstances int + if cluster.Spec.Replicas != nil { + requestedInstances = int(*cluster.Spec.Replicas) + } else { + requestedInstances = 1 + } + + quorum := (requestedInstances / 2) + 1 + + readyPods := 0 + for i := range observedPods { + if utils.IsPodReady(&observedPods[i]) { + readyPods++ + } + } + + isStable := readyPods >= quorum + isFullyAvailable := readyPods >= requestedInstances + wasStable := cluster.Status.GetConditionStatus(redpandav1alpha1.ClusterStableConditionType) == corev1.ConditionTrue + if (wasStable && isStable) || (!wasStable && isFullyAvailable) { + return redpandav1alpha1.ClusterCondition{ + Type: redpandav1alpha1.ClusterStableConditionType, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Time{}, + Reason: "", + Message: "", + } + } else if !wasStable && isStable { + return redpandav1alpha1.ClusterCondition{ + Type: redpandav1alpha1.ClusterStableConditionType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{}, + Reason: redpandav1alpha1.ClusterStableRecovering, + Message: fmt.Sprintf("Currently %d out of %d instances ready", readyPods, requestedInstances), + } + } + return redpandav1alpha1.ClusterCondition{ + Type: redpandav1alpha1.ClusterStableConditionType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{}, + Reason: redpandav1alpha1.ClusterStableNotEnoughInstances, + Message: fmt.Sprintf("Needed %d to reach quorum but only %d are ready", quorum, readyPods), + } +} + func statusShouldBeUpdated( status *redpandav1alpha1.ClusterStatus, nodeList *redpandav1alpha1.NodesList, diff --git 
a/src/go/k8s/controllers/redpanda/cluster_controller_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_test.go index e4b9a1e6d6f7..584cd99c0986 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_test.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller_test.go @@ -20,6 +20,7 @@ import ( types2 "github.com/onsi/gomega/types" "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" "github.com/redpanda-data/redpanda/src/go/k8s/controllers/redpanda" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -28,8 +29,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/retry" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -38,6 +41,9 @@ var _ = Describe("RedPandaCluster controller", func() { timeout = time.Second * 30 interval = time.Second * 1 + timeoutShort = time.Millisecond * 100 + intervalShort = time.Millisecond * 20 + adminPort = 9644 kafkaPort = 9092 pandaProxyPort = 8082 @@ -735,6 +741,64 @@ var _ = Describe("RedPandaCluster controller", func() { Entry("Never image pull policy", "Never", Succeed()), Entry("Empty image pull policy", "", Not(Succeed())), Entry("Random image pull policy", "asdvasd", Not(Succeed()))) + + Context("Populating stable condition", func() { + It("Marks the cluster as unstable until it gets enough instances", func() { + By("Allowing creation of a new cluster") + key, _, redpandaCluster := getInitialTestCluster("initial") + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Setting the unstable condition to false") + Eventually(clusterStableConditionStatusGetter(key), timeout, interval).Should(Equal(corev1.ConditionFalse)) + + By("Allowing creation of pods") + Expect(createReadyClusterPod(redpandaCluster, "initial-0")).To(Succeed()) + + By("Setting the unstable condition to true") + Eventually(clusterStableConditionStatusGetter(key), timeout, interval).Should(Equal(corev1.ConditionTrue)) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + + It("Exits the unstable conditions only after all instances are ready", func() { + By("Allowing creation of a new cluster") + key, _, redpandaCluster := getInitialTestCluster("multi-instance") + var replicas int32 = 3 + redpandaCluster.Spec.Replicas = &replicas + Expect(k8sClient.Create(context.Background(), redpandaCluster)).Should(Succeed()) + + By("Setting the unstable condition to false") + Eventually(clusterStableConditionStatusGetter(key), timeout, interval).Should(Equal(corev1.ConditionFalse)) + + By("Allowing creation of pods") + Expect(createReadyClusterPod(redpandaCluster, "multi-instance-0")).To(Succeed()) + Expect(createReadyClusterPod(redpandaCluster, "multi-instance-1")).To(Succeed()) + Expect(createReadyClusterPod(redpandaCluster, "multi-instance-2")).To(Succeed()) + + By("Setting the unstable condition to true") + Eventually(clusterStableConditionStatusGetter(key), timeout, interval).Should(Equal(corev1.ConditionTrue)) + + By("Accepting one pod not ready") + Expect(setClusterPodReadiness(redpandaCluster, "multi-instance-0", false)).To(Succeed()) + Consistently(clusterStableConditionStatusGetter(key), timeoutShort, 
intervalShort).Should(Equal(corev1.ConditionTrue)) + + By("Not accepting a second pod not ready") + Expect(setClusterPodReadiness(redpandaCluster, "multi-instance-1", false)).To(Succeed()) + Eventually(clusterStableConditionStatusGetter(key), timeout, interval).Should(Equal(corev1.ConditionFalse)) + + By("Not recovering with a single pod ready") + Expect(setClusterPodReadiness(redpandaCluster, "multi-instance-1", true)).To(Succeed()) + Consistently(clusterStableConditionStatusGetter(key), timeoutShort, intervalShort).Should(Equal(corev1.ConditionFalse)) + + By("Recovering only when all pods are ready") + Expect(setClusterPodReadiness(redpandaCluster, "multi-instance-0", true)).To(Succeed()) + Eventually(clusterStableConditionStatusGetter(key), timeout, interval).Should(Equal(corev1.ConditionTrue)) + + By("Deleting the cluster") + Expect(k8sClient.Delete(context.Background(), redpandaCluster)).Should(Succeed()) + }) + }) }) func findPort(ports []corev1.ServicePort, name string) int32 { @@ -757,3 +821,111 @@ func validOwner( return owner.Name == cluster.Name && owner.Controller != nil && *owner.Controller } + +func clusterStableConditionGetter( + key client.ObjectKey, +) func() *v1alpha1.ClusterCondition { + return func() *v1alpha1.ClusterCondition { + var cluster v1alpha1.Cluster + if err := k8sClient.Get(context.Background(), key, &cluster); err != nil { + return nil + } + return cluster.Status.GetCondition(v1alpha1.ClusterStableConditionType) + } +} + +func clusterStableConditionStatusGetter( + key client.ObjectKey, +) func() corev1.ConditionStatus { + return func() corev1.ConditionStatus { + cond := clusterStableConditionGetter(key)() + if cond == nil { + return corev1.ConditionUnknown + } + return cond.Status + } +} + +func createReadyClusterPod( + cluster *v1alpha1.Cluster, name string, +) error { + pod := corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: cluster.Namespace, + Labels: labels.ForCluster(cluster), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "example", + Image: "example", + }, + }, + }, + } + err := k8sClient.Create(context.Background(), &pod) + if err != nil { + return err + } + pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{ + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }) + err = k8sClient.Status().Update(context.Background(), &pod) + if err != nil { + return err + } + return forceReconciliation(cluster) +} + +func setClusterPodReadiness( + cluster *v1alpha1.Cluster, name string, ready bool, +) error { + var pod corev1.Pod + err := k8sClient.Get(context.Background(), types.NamespacedName{Namespace: cluster.Namespace, Name: name}, &pod) + if err != nil { + return err + } + + newStatus := corev1.ConditionFalse + if ready { + newStatus = corev1.ConditionTrue + } + + existing := false + for i := range pod.Status.Conditions { + if pod.Status.Conditions[i].Type == corev1.PodReady { + existing = true + pod.Status.Conditions[i].Status = newStatus + } + } + + if !existing { + pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{ + Type: corev1.PodReady, + Status: newStatus, + }) + } + err = k8sClient.Status().Update(context.Background(), &pod) + if err != nil { + return err + } + return forceReconciliation(cluster) +} + +func forceReconciliation(cluster *v1alpha1.Cluster) error { + // Cluster is not directly receiving events from the pods but from the statefulset, so we force reconciliation + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := 
k8sClient.Get(context.Background(), types.NamespacedName{ + Name: cluster.Name, + Namespace: cluster.Namespace, + }, cluster) + if err != nil { + return err + } + // Change just to trigger reconciliation, but unrelated to the use case + cluster.Spec.CloudStorage.APIEndpointPort++ + return k8sClient.Update(context.Background(), cluster) + }) +} diff --git a/src/go/k8s/pkg/resources/configmap.go b/src/go/k8s/pkg/resources/configmap.go index 542241c87526..038b94d75da3 100644 --- a/src/go/k8s/pkg/resources/configmap.go +++ b/src/go/k8s/pkg/resources/configmap.go @@ -25,6 +25,7 @@ import ( "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/configuration" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -614,7 +615,7 @@ func (r *ConfigMapResource) Key() types.NamespacedName { // ConfigMapKey provides config map name that derived from redpanda.vectorized.io CR func ConfigMapKey(pandaCluster *redpandav1alpha1.Cluster) types.NamespacedName { - return types.NamespacedName{Name: resourceNameTrim(pandaCluster.Name, baseSuffix), Namespace: pandaCluster.Namespace} + return types.NamespacedName{Name: utils.ResourceNameTrim(pandaCluster.Name, baseSuffix), Namespace: pandaCluster.Namespace} } func configMapKind() string { diff --git a/src/go/k8s/pkg/resources/featuregates/health_overview.go b/src/go/k8s/pkg/resources/featuregates/health_overview.go new file mode 100644 index 000000000000..d90e07e3a9ae --- /dev/null +++ b/src/go/k8s/pkg/resources/featuregates/health_overview.go @@ -0,0 +1,32 @@ +// Copyright 2022 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package featuregates + +import "github.com/Masterminds/semver/v3" + +const ( + healthOverviewMajor = uint64(22) + healthOverviewMinor = uint64(1) +) + +// HealthOverview feature gate should be removed when the operator +// will no longer support 21.x or older versions +func HealthOverview(version string) bool { + if version == devVersion { + // development version contains this feature + return true + } + v, err := semver.NewVersion(version) + if err != nil { + return false + } + + return v.Major() == healthOverviewMajor && v.Minor() >= healthOverviewMinor || v.Major() > healthOverviewMajor +} diff --git a/src/go/k8s/pkg/resources/hooks_configmap.go b/src/go/k8s/pkg/resources/hooks_configmap.go new file mode 100644 index 000000000000..c5a6e7f615c7 --- /dev/null +++ b/src/go/k8s/pkg/resources/hooks_configmap.go @@ -0,0 +1,425 @@ +// Copyright 2022 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package resources + +import ( + "context" + "crypto/sha256" + "fmt" + "sort" + + "github.com/go-logr/logr" + redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + hooksSuffix = "hooks" + + readinessProbeEntryName = "readiness.sh" + postStartHookEntryName = "post-start.sh" + preStopHookEntryName = "pre-stop.sh" + disableMaintenanceModeEntryName = "disable-maintenance-mode.sh" + enableMaintenanceModeEntryName = "enable-maintenance-mode.sh" + clusterHealthCheckEntryName = "cluster-health-check.sh" + + delayBetweenHooksCheckSeconds = "0.5" + // checking cluster up to a certain timeout, as the health of the cluster may depend also on other nodes + defaultClusterHealthCheckTimeoutSeconds = 300 +) + +var hooksHashAnnotationKey = redpandav1alpha1.GroupVersion.Group + "/hooks-hash" + +var _ Resource = &HooksConfigMapResource{} + +// HooksConfigMapResource manages the configmap containing lifecycle hook scripts +type HooksConfigMapResource struct { + k8sclient.Client + scheme *runtime.Scheme + pandaCluster *redpandav1alpha1.Cluster + + logger logr.Logger +} + +// HooksConfigMapKey returns the key associated with the HooksConfigMapResource +func HooksConfigMapKey( + pandaCluster *redpandav1alpha1.Cluster, +) types.NamespacedName { + return types.NamespacedName{Name: utils.ResourceNameTrim(pandaCluster.Name, hooksSuffix), Namespace: pandaCluster.Namespace} +} + +// Key returns the key associated with the HooksConfigMapResource +func (r *HooksConfigMapResource) Key() types.NamespacedName { + return HooksConfigMapKey(r.pandaCluster) +} + +// NewHooksConfigMap creates a HooksConfigMapResource +func NewHooksConfigMap( + client k8sclient.Client, + pandaCluster *redpandav1alpha1.Cluster, + scheme *runtime.Scheme, + logger logr.Logger, +) *HooksConfigMapResource { + return &HooksConfigMapResource{ + Client: client, + pandaCluster: pandaCluster, + scheme: scheme, + logger: logger.WithValues("Kind", configMapKind()), + } +} + +// Ensure will manage kubernetes v1.ConfigMap for redpanda.vectorized.io CR +func (r *HooksConfigMapResource) Ensure(ctx context.Context) error { + obj, err := r.obj(ctx) + if err != nil { + return fmt.Errorf("unable to construct object: %w", err) + } + created, err := CreateIfNotExists(ctx, r, obj, r.logger) + if err != nil || created { + return err + } + + var cm corev1.ConfigMap + err = r.Get(ctx, r.Key(), &cm) + if err != nil { + return fmt.Errorf("error while fetching ConfigMap resource: %w", err) + } + _, err = Update(ctx, &cm, obj, r.Client, r.logger) + if err != nil { + return err + } + // Also force the pods to reload the config when updating + return r.forceConfigMapReload(ctx, obj.(*corev1.ConfigMap)) +} + +// obj returns resource managed 
client.Object +func (r *HooksConfigMapResource) obj( + _ context.Context, +) (k8sclient.Object, error) { + hooksConfigMap := corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: r.Key().Namespace, + Name: r.Key().Name, + Labels: labels.ForCluster(r.pandaCluster), + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + Data: map[string]string{ + postStartHookEntryName: r.getPostStartHook(), + disableMaintenanceModeEntryName: r.getPostStartCheck(), + preStopHookEntryName: r.getPreStopHook(), + enableMaintenanceModeEntryName: r.getPreStopCheck(), + readinessProbeEntryName: r.getReadinessProbe(), + clusterHealthCheckEntryName: r.getClusterHealthCheck(), + }, + } + + err := controllerutil.SetControllerReference(r.pandaCluster, &hooksConfigMap, r.scheme) + if err != nil { + return nil, err + } + + return &hooksConfigMap, nil +} + +func (r *HooksConfigMapResource) getPostStartHook() string { + return fmt.Sprintf(`#!/bin/bash + + location=$(dirname $0) + cd $location + + %s + %s +`, + r.getTimedLoopFragment(clusterHealthCheckEntryName, "check_health", delayBetweenHooksCheckSeconds, r.getHealthCheckTimeout()), + r.getLoopFragment(disableMaintenanceModeEntryName, delayBetweenHooksCheckSeconds), + ) +} + +func (r *HooksConfigMapResource) getPostStartCheck() string { + if !r.isMaintenanceModeEnabled("post-start") { + return r.getNoopCheckFunction("check") + } + + curlCommand := r.composeCURLMaintenanceCommand(`-X DELETE --max-time 10 --silent -o /dev/null -w "%{http_code}"`, nil) + return fmt.Sprintf(`#!/bin/bash + + check() { + status=$(%s) + if [ "${status:-}" != "200" ]; then + return 1 + fi + } +`, curlCommand) +} + +func (r *HooksConfigMapResource) getPreStopHook() string { + return fmt.Sprintf(`#!/bin/bash + + location=$(dirname $0) + cd $location + + %s +`, + r.getLoopFragment(enableMaintenanceModeEntryName, delayBetweenHooksCheckSeconds), + ) +} + +func (r *HooksConfigMapResource) getPreStopCheck() string { + if !r.isMaintenanceModeEnabled("pre-stop") { + return r.getNoopCheckFunction("check") + } + + curlCommand := r.composeCURLMaintenanceCommand(`-X PUT --max-time 10 --silent -o /dev/null -w "%{http_code}"`, nil) + genericMaintenancePath := "/v1/maintenance" + curlGetCommand := r.composeCURLMaintenanceCommand(`--max-time 10 --silent`, &genericMaintenancePath) + return fmt.Sprintf(`#!/bin/bash + + check() { + status=$(%s) + if [ "${status:-}" != "200" ]; then + return 1 + fi + + finished=$(%s | grep -o '\"finished\":[^,}]*' | grep -o '[^: ]*$') + if [ "${finished:-}" != "true" ]; then + return 1 + fi + } +`, curlCommand, curlGetCommand) +} + +func (r *HooksConfigMapResource) getClusterHealthCheck() string { + fallbackNoOp := r.getNoopCheckFunction("check_health") + if !r.pandaCluster.IsUsingClusterHealthCheck() { + r.logger.Info("Health overview endpoint explicitly disabled") + return fallbackNoOp + } + + // Current version needs to support the health overview endpoint + if !featuregates.HealthOverview(r.pandaCluster.Status.Version) { + r.logger.Info("Disabling health overview endpoint as not supported") + return fallbackNoOp + } + + // Enable health check only if the cluster can potentially reach quorum + if r.pandaCluster.Status.GetConditionStatus(redpandav1alpha1.ClusterStableConditionType) != corev1.ConditionTrue { + r.logger.Info("Disabling health overview endpoint while cluster is unstable") + return fallbackNoOp + } + + r.logger.Info("Health overview endpoint enabled") + healthOverviewPath := "/v1/cluster/health_overview" + curlCommand := 
r.composeCURLMaintenanceCommand(`--max-time 10 --silent`, &healthOverviewPath) + return fmt.Sprintf(`#!/bin/bash + + check_health() { + is_healthy=$(%s | grep -o '\"is_healthy\":[^,}]*' | grep -o '[^: ]*$') + if [ "${is_healthy:-}" != "true" ]; then + return 1 + fi + } +`, curlCommand) +} + +func (r *HooksConfigMapResource) getReadinessProbe() string { + if !r.pandaCluster.IsUsingReadinessProbe() { + r.logger.Info("Readiness probe explicitly disabled") + return r.getNoopScript() + } + + r.logger.Info("Readiness probe enabled") + readinessEndpoint := "/v1/status/ready" + curlStatusCommand := r.composeCURLMaintenanceCommand(`--max-time 10 --silent -o /dev/null -w "%{http_code}"`, &readinessEndpoint) + curlGetCommand := r.composeCURLMaintenanceCommand(`--max-time 10 --silent`, &readinessEndpoint) + return fmt.Sprintf(`#!/bin/bash + + status_code=$(%s) + if [ "${status_code:-}" != "200" ]; then + exit 1 + fi + status=$(%s | grep -o '\"status\":\s*\"[^\"]*\"' | grep -o '[^: ]*$' | tr -d '\"') + if [ "${status:-}" != "ready" ]; then + exit 1 + fi +`, curlStatusCommand, curlGetCommand) +} + +func (r *HooksConfigMapResource) isMaintenanceModeEnabled( + hookType string, +) bool { + // Only multi-replica clusters should use maintenance mode. See: https://github.com/redpanda-data/redpanda/issues/4338 + if r.pandaCluster.Spec.Replicas == nil || *r.pandaCluster.Spec.Replicas <= 1 { + r.logger.Info(fmt.Sprintf("Maintenance mode %s hook disabled because of insufficient nodes", hookType)) + return false + } + // When upgrading from pre-v22 to v22, an error is returned until the new version reaches quorum and the maintenance mode feature is enabled + if !featuregates.MaintenanceMode(r.pandaCluster.Status.Version) { + r.logger.Info(fmt.Sprintf("Maintenance mode %s hook disabled as not supported by current version", hookType)) + return false + } + if !r.pandaCluster.IsUsingMaintenanceModeHooks() { + r.logger.Info(fmt.Sprintf("Maintenance mode %s hook explicitly disabled", hookType)) + return false + } + r.logger.Info(fmt.Sprintf("Maintenance mode %s hook enabled", hookType)) + return true +} + +func (r *HooksConfigMapResource) composeCURLMaintenanceCommand( + options string, urlOverwrite *string, +) string { + adminAPI := r.pandaCluster.AdminAPIInternal() + + cmd := fmt.Sprintf(`curl %s `, options) + + tlsConfig := adminAPI.GetTLS() + proto := "http" + if tlsConfig != nil && tlsConfig.Enabled { + proto = "https" + if tlsConfig.RequireClientAuth { + cmd += "--cacert /etc/tls/certs/admin/ca/ca.crt --cert /etc/tls/certs/admin/tls.crt --key /etc/tls/certs/admin/tls.key " + } else { + cmd += "--cacert /etc/tls/certs/admin/tls.crt " + } + } + cmd += fmt.Sprintf("%s://${POD_NAME}.%s.%s.svc.cluster.local:%d", proto, r.pandaCluster.Name, r.pandaCluster.Namespace, adminAPI.Port) + + if urlOverwrite == nil { + prefixLen := len(r.pandaCluster.Name) + 1 + cmd += fmt.Sprintf("/v1/brokers/${POD_NAME:%d}/maintenance", prefixLen) + } else { + cmd += *urlOverwrite + } + return cmd +} + +func (r *HooksConfigMapResource) getNoopScript() string { + return `#!/bin/bash + + exit 0 +` +} + +func (r *HooksConfigMapResource) getNoopCheckFunction(name string) string { + return fmt.Sprintf(`#!/bin/bash + + %s() { + return 0 + } +`, name) +} + +func (r *HooksConfigMapResource) getLoopFragment(source, delay string) string { + return fmt.Sprintf(` + source ./%s + until check; do + sleep %s + source ./%s + done +`, source, delay, source) +} + +func (r *HooksConfigMapResource) getTimedLoopFragment( + source, funcName, delay string, timeout 
int, +) string { + return fmt.Sprintf(` + start_time=$(date +%%s) + source ./%s + until %s; do + sleep %s + time=$(date +%%s) + if [ $(( time - start_time )) -gt %d ]; then + break + fi + source ./%s + done +`, source, funcName, delay, timeout, source) +} + +func (r *HooksConfigMapResource) getHealthCheckTimeout() int { + if r.pandaCluster.Spec.RestartConfig != nil && r.pandaCluster.Spec.RestartConfig.HealthCheckTimeoutSeconds != nil { + timeout := int(*r.pandaCluster.Spec.RestartConfig.HealthCheckTimeoutSeconds) + if timeout < 0 { + timeout = 0 + } + return timeout + } + return defaultClusterHealthCheckTimeoutSeconds +} + +// forceConfigMapReload changes an annotation on the pod to force the Kubelet wake up and update the configmap mount +func (r *HooksConfigMapResource) forceConfigMapReload( + ctx context.Context, cm *corev1.ConfigMap, +) error { + digest, err := r.computeHash(cm) + if err != nil { + return fmt.Errorf("could not compute digest for ConfigMap %s: %w", cm.Name, err) + } + + var podList corev1.PodList + err = r.List(ctx, &podList, &k8sclient.ListOptions{ + Namespace: r.pandaCluster.Namespace, + LabelSelector: labels.ForCluster(r.pandaCluster).AsClientSelector(), + }) + if err != nil { + return fmt.Errorf("unable to list redpanda pods: %w", err) + } + + for i := range podList.Items { + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + pod := &podList.Items[i] + err = r.Get(ctx, k8sclient.ObjectKeyFromObject(pod), pod) + if err != nil { + return err + } + if pod.Annotations == nil { + pod.Annotations = make(map[string]string, 1) + } + pod.Annotations[hooksHashAnnotationKey] = digest + + return r.Update(ctx, pod) + }) + if err != nil { + return err + } + } + return nil +} + +func (r *HooksConfigMapResource) computeHash( + cm *corev1.ConfigMap, +) (string, error) { + elements := make([]string, 0, len(cm.Data)) + for k := range cm.Data { + elements = append(elements, k) + } + sort.Strings(elements) + digester := sha256.New() + for _, k := range elements { + _, err := digester.Write([]byte(cm.Data[k])) + if err != nil { + return "", err + } + } + digest := digester.Sum(nil) + return fmt.Sprintf("%x", digest), nil +} diff --git a/src/go/k8s/pkg/resources/resource.go b/src/go/k8s/pkg/resources/resource.go index 0ff7d6b48a68..f7adbde26580 100644 --- a/src/go/k8s/pkg/resources/resource.go +++ b/src/go/k8s/pkg/resources/resource.go @@ -24,7 +24,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/validation" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -47,8 +46,6 @@ const ( SchemaRegistryPortName = "schema-registry" scramPasswordLength = 16 - - separator = "-" ) // NamedServicePort allows to pass name ports, e.g., to service resources @@ -197,12 +194,3 @@ func prepareResourceForUpdate(current runtime.Object, modified client.Object) { } } } - -func resourceNameTrim(clusterName, suffix string) string { - suffixLength := len(suffix) - maxClusterNameLength := validation.DNS1123SubdomainMaxLength - suffixLength - len(separator) - if len(clusterName) > maxClusterNameLength { - clusterName = clusterName[:maxClusterNameLength] - } - return fmt.Sprintf("%s%s%s", clusterName, separator, suffix) -} diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index 759601dacc9d..2faf0fddd034 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -54,6 +54,8 @@ const ( datadirName = "datadir" 
archivalCacheIndexAnchorName = "shadow-index-cache" defaultDatadirCapacity = "100Gi" + + hooksMountDir = "/etc/hooks" ) var ( @@ -62,8 +64,8 @@ var ( // CentralizedConfigurationHashAnnotationKey contains the hash of the centralized configuration properties that require a restart when changed CentralizedConfigurationHashAnnotationKey = redpandav1alpha1.GroupVersion.Group + "/centralized-configuration-hash" - // terminationGracePeriodSeconds should account for additional delay introduced by hooks - terminationGracePeriodSeconds int64 = 120 + // terminationGracePeriodSeconds should account for additional delay introduced by hooks (health + maintenance mode) + terminationGracePeriodSeconds int64 = 420 ) // ConfiguratorSettings holds settings related to configurator container and deployment @@ -269,6 +271,7 @@ func (r *StatefulSetResource) obj( externalAddressType = externalListener.External.PreferredAddressType } tlsVolumes, tlsVolumeMounts := r.volumeProvider.Volumes() + var readExecMode int32 = 0o555 ss := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: r.Key().Namespace, @@ -316,6 +319,17 @@ func (r *StatefulSetResource) obj( EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, + { + Name: "hooks-dir", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: HooksConfigMapKey(r.pandaCluster).Name, + }, + DefaultMode: &readExecMode, + }, + }, + }, }, tlsVolumes...), TerminationGracePeriodSeconds: &terminationGracePeriodSeconds, InitContainers: []corev1.Container{ @@ -447,7 +461,16 @@ func (r *StatefulSetResource) obj( Name: "config-dir", MountPath: configDestinationDir, }, + { + Name: "hooks-dir", + MountPath: hooksMountDir, + }, }, tlsVolumeMounts...), + Lifecycle: &corev1.Lifecycle{ + PostStart: r.getPostStartHook(), + PreStop: r.getPreStopHook(), + }, + ReadinessProbe: r.getReadinessProbe(), }, }, Tolerations: tolerations, @@ -486,15 +509,6 @@ func (r *StatefulSetResource) obj( }, } - // Only multi-replica clusters should use maintenance mode. See: https://github.com/redpanda-data/redpanda/issues/4338 - multiReplica := r.pandaCluster.Spec.Replicas != nil && *r.pandaCluster.Spec.Replicas > 1 - if featuregates.MaintenanceMode(r.pandaCluster.Spec.Version) && r.pandaCluster.IsUsingMaintenanceModeHooks() && multiReplica { - ss.Spec.Template.Spec.Containers[0].Lifecycle = &corev1.Lifecycle{ - PreStop: r.getPreStopHook(), - PostStart: r.getPostStartHook(), - } - } - if featuregates.CentralizedConfiguration(r.pandaCluster.Spec.Version) { ss.Spec.Template.Spec.Containers[0].VolumeMounts = append(ss.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ Name: "configmap-dir", @@ -518,72 +532,38 @@ func (r *StatefulSetResource) obj( return ss, nil } -// getPrestopHook creates a hook that drains the node before shutting down. 
+// getPrestopHook creates a handler that executes the pre-stop.sh script in the hooks configmap func (r *StatefulSetResource) getPreStopHook() *corev1.Handler { - // TODO replace scripts with proper RPK calls - curlCommand := r.composeCURLMaintenanceCommand(`-X PUT --silent -o /dev/null -w "%{http_code}"`, nil) - genericMaintenancePath := "/v1/maintenance" - curlGetCommand := r.composeCURLMaintenanceCommand(`--silent`, &genericMaintenancePath) - cmd := fmt.Sprintf(`until [ "${status:-}" = "200" ]; do status=$(%s); sleep 0.5; done`, curlCommand) + - " && " + - fmt.Sprintf(`until [ "${finished:-}" = "true" ]; do finished=$(%s | grep -o '\"finished\":[^,}]*' | grep -o '[^: ]*$'); sleep 0.5; done`, curlGetCommand) - return &corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{ - "/bin/bash", - "-c", - cmd, - }, + Command: []string{path.Join(hooksMountDir, preStopHookEntryName)}, }, } } -// getPostStartHook creates a hook that removes maintenance mode after startup. +// getPostStartHook creates a handler that executes the post-start.sh script in the hooks configmap func (r *StatefulSetResource) getPostStartHook() *corev1.Handler { - // TODO replace scripts with proper RPK calls - curlCommand := r.composeCURLMaintenanceCommand(`-X DELETE --silent -o /dev/null -w "%{http_code}"`, nil) - // HTTP code 400 is returned by v22 nodes during an upgrade from v21 until the new version reaches quorum and the maintenance mode feature is enabled - cmd := fmt.Sprintf(`until [ "${status:-}" = "200" ] || [ "${status:-}" = "400" ]; do status=$(%s); sleep 0.5; done`, curlCommand) - return &corev1.Handler{ Exec: &corev1.ExecAction{ - Command: []string{ - "/bin/bash", - "-c", - cmd, - }, + Command: []string{path.Join(hooksMountDir, postStartHookEntryName)}, }, } } -// nolint:goconst // no need -func (r *StatefulSetResource) composeCURLMaintenanceCommand( - options string, urlOverwrite *string, -) string { - adminAPI := r.pandaCluster.AdminAPIInternal() - - cmd := fmt.Sprintf(`curl %s `, options) - - tlsConfig := adminAPI.GetTLS() - proto := "http" - if tlsConfig != nil && tlsConfig.Enabled { - proto = "https" - if tlsConfig.RequireClientAuth { - cmd += "--cacert /etc/tls/certs/admin/ca/ca.crt --cert /etc/tls/certs/admin/tls.crt --key /etc/tls/certs/admin/tls.key " - } else { - cmd += "--cacert /etc/tls/certs/admin/tls.crt " - } - } - cmd += fmt.Sprintf("%s://${POD_NAME}.%s.%s.svc.cluster.local:%d", proto, r.pandaCluster.Name, r.pandaCluster.Namespace, adminAPI.Port) - - if urlOverwrite == nil { - prefixLen := len(r.pandaCluster.Name) + 1 - cmd += fmt.Sprintf("/v1/brokers/${POD_NAME:%d}/maintenance", prefixLen) - } else { - cmd += *urlOverwrite +// getReadinessProbe creates a probe that executes the readiness.sh script in the hooks configmap +func (r *StatefulSetResource) getReadinessProbe() *corev1.Probe { + return &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{path.Join(hooksMountDir, readinessProbeEntryName)}, + }, + }, + InitialDelaySeconds: 1, + TimeoutSeconds: 5, + PeriodSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 1200, } - return cmd } // setCloudStorage manipulates v1.StatefulSet object in order to add cloud storage specific diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index 5f431ef12445..163f615818e0 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -12,10 +12,7 @@ package resources import ( "context" "encoding/json" 
- "errors" "fmt" - "net/http" - "net/url" "reflect" "sort" "strings" @@ -34,11 +31,8 @@ const ( // RequeueDuration is the time controller should // requeue resource reconciliation. RequeueDuration = time.Second * 10 - adminAPITimeout = time.Millisecond * 100 ) -var errRedpandaNotReady = errors.New("redpanda not ready") - // runUpdate handles image changes and additional storage in the redpanda cluster // CR by removing statefulset with orphans Pods. The stateful set is then recreated // and all Pods are restarted accordingly to the ordinal number. @@ -125,6 +119,7 @@ func (r *StatefulSetResource) rollingUpdate( ignoreKubernetesTokenVolumeMounts(), ignoreDefaultToleration(), ignoreExistingVolumes(volumes), + utils.IgnoreAnnotation(hooksHashAnnotationKey), } for i := range podList.Items { @@ -151,20 +146,6 @@ func (r *StatefulSetResource) rollingUpdate( Msg: fmt.Sprintf("wait for %s pod to become ready", pod.Name), } } - - headlessServiceWithPort := fmt.Sprintf("%s:%d", r.serviceFQDN, - r.pandaCluster.AdminAPIInternal().Port) - - adminURL := url.URL{ - Scheme: "http", - Host: fmt.Sprintf("%s.%s", pod.Name, headlessServiceWithPort), - Path: "v1/status/ready", - } - - r.logger.Info("Verify that Ready endpoint returns HTTP status OK") - if err = r.queryRedpandaStatus(ctx, &adminURL); err != nil { - return fmt.Errorf("unable to query Redpanda ready status: %w", err) - } } return nil @@ -371,45 +352,6 @@ func deleteKubernetesTokenVolumeMounts(obj []byte) ([]byte, error) { return obj, nil } -// Temporarily using the status/ready endpoint until we have a specific one for restarting. -func (r *StatefulSetResource) queryRedpandaStatus( - ctx context.Context, adminURL *url.URL, -) error { - client := &http.Client{Timeout: adminAPITimeout} - - // TODO right now we support TLS only on one listener so if external - // connectivity is enabled, TLS is enabled only on external listener. This - // will be fixed by https://github.com/redpanda-data/redpanda/issues/1084 - if r.pandaCluster.AdminAPITLS() != nil && - r.pandaCluster.AdminAPIExternal() == nil { - tlsConfig, err := r.adminTLSConfigProvider.GetTLSConfig(ctx, r) - if err != nil { - return err - } - - client.Transport = &http.Transport{ - TLSClientConfig: tlsConfig, - } - adminURL.Scheme = "https" - } - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, adminURL.String(), http.NoBody) - if err != nil { - return err - } - resp, err := client.Do(req) - if err != nil { - return err - } - resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return errRedpandaNotReady - } - - return nil -} - // RequeueAfterError error carrying the time after which to requeue. type RequeueAfterError struct { RequeueAfter time.Duration diff --git a/src/go/k8s/pkg/resources/superusers.go b/src/go/k8s/pkg/resources/superusers.go index 054a4b4de099..ac24d6dfc795 100644 --- a/src/go/k8s/pkg/resources/superusers.go +++ b/src/go/k8s/pkg/resources/superusers.go @@ -15,6 +15,7 @@ import ( "github.com/go-logr/logr" redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -120,5 +121,5 @@ func (r *SuperUsersResource) obj() (k8sclient.Object, error) { // Key returns namespace/name object that is used to identify object. 
func (r *SuperUsersResource) Key() types.NamespacedName { - return types.NamespacedName{Name: resourceNameTrim(r.pandaCluster.Name, r.suffix), Namespace: r.pandaCluster.Namespace} + return types.NamespacedName{Name: utils.ResourceNameTrim(r.pandaCluster.Name, r.suffix), Namespace: r.pandaCluster.Namespace} } diff --git a/src/go/k8s/pkg/utils/kubernetes.go b/src/go/k8s/pkg/utils/kubernetes.go index 27b78f597a6a..c37f20e54da2 100644 --- a/src/go/k8s/pkg/utils/kubernetes.go +++ b/src/go/k8s/pkg/utils/kubernetes.go @@ -9,7 +9,16 @@ package utils -import corev1 "k8s.io/api/core/v1" +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/validation" +) + +const ( + separator = "-" +) // IsPodReady tells if a given pod is ready looking at its status. func IsPodReady(pod *corev1.Pod) bool { @@ -21,3 +30,13 @@ func IsPodReady(pod *corev1.Pod) bool { return false } + +// ResourceNameTrim builds a resource name out of a base name and a suffix, respecting k8s length constraints +func ResourceNameTrim(baseName, suffix string) string { + suffixLength := len(suffix) + maxNameLength := validation.DNS1123SubdomainMaxLength - suffixLength - len(separator) + if len(baseName) > maxNameLength { + baseName = baseName[:maxNameLength] + } + return fmt.Sprintf("%s%s%s", baseName, separator, suffix) +}
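Taken together, the ClusterStable handling added in cluster_controller.go implements a simple hysteresis around quorum: a stable cluster stays stable while a strict majority of replicas is ready, but an unstable cluster is reported stable again only once every replica is ready, passing through the Recovering reason in between. A condensed, standalone sketch of that decision follows; the function name and result strings are local to the example, not part of the operator's API:

package main

import "fmt"

// stableStatus condenses the decision made by computeStableCondition:
// quorum is a strict majority of the requested replicas, a stable cluster
// keeps its condition while it holds quorum, and an unstable cluster only
// returns to stable once all replicas are ready again.
func stableStatus(requested, ready int, wasStable bool) string {
	quorum := requested/2 + 1
	switch {
	case (wasStable && ready >= quorum) || (!wasStable && ready >= requested):
		return "True"
	case !wasStable && ready >= quorum:
		return "False (Recovering)"
	default:
		return "False (NotEnoughInstances)"
	}
}

func main() {
	// A 3-node cluster tolerates one pod not ready without losing the condition...
	fmt.Println(stableStatus(3, 2, true)) // True
	// ...but once it drops below quorum it must fully recover before the condition flips back.
	fmt.Println(stableStatus(3, 1, false)) // False (NotEnoughInstances)
	fmt.Println(stableStatus(3, 2, false)) // False (Recovering)
	fmt.Println(stableStatus(3, 3, false)) // True
}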