diff --git a/deploy/crds/keda.sh_scaledjobs_crd.yaml b/deploy/crds/keda.sh_scaledjobs_crd.yaml index 9a48fc77dad..5b501f1afda 100644 --- a/deploy/crds/keda.sh_scaledjobs_crd.yaml +++ b/deploy/crds/keda.sh_scaledjobs_crd.yaml @@ -913,10 +913,14 @@ spec: for volumes, optional for env vars' type: string divisor: + anyOf: + - type: integer + - type: string description: Specifies the output format of the exposed resources, defaults to "1" - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true resource: description: 'Required: resource to select' @@ -1396,6 +1400,10 @@ spec: - containerPort type: object type: array + x-kubernetes-list-map-keys: + - containerPort + - protocol + x-kubernetes-list-type: map readinessProbe: description: 'Periodic probe of container service readiness. Container will be removed from service @@ -1527,13 +1535,21 @@ spec: properties: limits: additionalProperties: - type: string + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true description: 'Limits describes the maximum amount of compute resources allowed. More info: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/' type: object requests: additionalProperties: - type: string + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits @@ -2123,10 +2139,14 @@ spec: for volumes, optional for env vars' type: string divisor: + anyOf: + - type: integer + - type: string description: Specifies the output format of the exposed resources, defaults to "1" - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true resource: description: 'Required: resource to select' @@ -2724,13 +2744,21 @@ spec: properties: limits: additionalProperties: - type: string + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true description: 'Limits describes the maximum amount of compute resources allowed. More info: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/' type: object requests: additionalProperties: - type: string + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits @@ -3318,10 +3346,14 @@ spec: for volumes, optional for env vars' type: string divisor: + anyOf: + - type: integer + - type: string description: Specifies the output format of the exposed resources, defaults to "1" - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true resource: description: 'Required: resource to select' @@ -3801,6 +3833,10 @@ spec: - containerPort type: object type: array + x-kubernetes-list-map-keys: + - containerPort + - protocol + x-kubernetes-list-type: map readinessProbe: description: 'Periodic probe of container service readiness. Container will be removed from service @@ -3932,13 +3968,21 @@ spec: properties: limits: additionalProperties: - type: string + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true description: 'Limits describes the maximum amount of compute resources allowed. More info: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/' type: object requests: additionalProperties: - type: string + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits @@ -4364,7 +4408,11 @@ spec: type: object overhead: additionalProperties: - type: string + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true description: 'Overhead represents the resource overhead associated with running a pod for a given RuntimeClass. This field will be autopopulated at admission time by @@ -4761,6 +4809,10 @@ spec: - whenUnsatisfiable type: object type: array + x-kubernetes-list-map-keys: + - topologyKey + - whenUnsatisfiable + x-kubernetes-list-type: map volumes: description: 'List of volumes that can be mounted by containers belonging to the pod. More info: https://kubernetes.io/docs/concepts/storage/volumes' @@ -5123,10 +5175,14 @@ spec: for volumes, optional for env vars' type: string divisor: + anyOf: + - type: integer + - type: string description: Specifies the output format of the exposed resources, defaults to "1" - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true resource: description: 'Required: resource to select' @@ -5151,6 +5207,9 @@ spec: info: https://kubernetes.io/docs/concepts/storage/volumes#emptydir' type: string sizeLimit: + anyOf: + - type: integer + - type: string description: 'Total amount of local storage required for this EmptyDir volume. The size limit is also applicable for memory medium. The maximum @@ -5159,7 +5218,8 @@ spec: here and the sum of memory limits of all containers in a pod. The default is nil which means that the limit is undefined. More info: http://kubernetes.io/docs/user-guide/volumes#emptydir' - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true type: object fc: description: FC represents a Fibre Channel resource @@ -5671,10 +5731,14 @@ spec: for env vars' type: string divisor: + anyOf: + - type: integer + - type: string description: Specifies the output format of the exposed resources, defaults to "1" - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true resource: description: 'Required: resource to select' @@ -6145,6 +6209,27 @@ spec: status: description: ScaledJobStatus defines the observed state of ScaledJob properties: + conditions: + items: + properties: + message: + description: A human readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of condition + type: string + required: + - status + - type + type: object + type: array lastActiveTime: format: date-time type: string diff --git a/deploy/crds/keda.sh_scaledobjects_crd.yaml b/deploy/crds/keda.sh_scaledobjects_crd.yaml index 4b8d0e69462..ef014e1e368 100644 --- a/deploy/crds/keda.sh_scaledobjects_crd.yaml +++ b/deploy/crds/keda.sh_scaledobjects_crd.yaml @@ -13,6 +13,12 @@ spec: - JSONPath: .spec.triggers[*].type name: Triggers type: string + - JSONPath: .status.conditions[?(@.type=="Ready")].status + name: Ready + type: string + - JSONPath: .status.conditions[?(@.type=="Active")].status + name: Active + type: string - JSONPath: .metadata.creationTimestamp name: Age type: date @@ -103,6 +109,27 @@ spec: status: description: ScaledObjectStatus is the status for a ScaledObject resource properties: + conditions: + items: + properties: + message: + description: A human readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of condition + type: string + required: + - status + - type + type: object + type: array externalMetricNames: items: type: string @@ -130,8 +157,6 @@ spec: type: object scaleTargetKind: type: string - required: - - scaleTargetGVKR type: object required: - spec diff --git a/pkg/apis/keda/v1alpha1/condition_types.go b/pkg/apis/keda/v1alpha1/condition_types.go new file mode 100644 index 00000000000..19cecc91bb5 --- /dev/null +++ b/pkg/apis/keda/v1alpha1/condition_types.go @@ -0,0 +1,133 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type ConditionType string + +const ( + // ConditionReady specifies that the resource is ready. + // For long-running resources. + ConditionReady ConditionType = "Ready" + // ConditionActive specifies that the resource has finished. + // For resource which run to completion. + ConditionActive ConditionType = "Active" +) + +type Condition struct { + // Type of condition + // +required + Type ConditionType `json:"type" description:"type of status condition"` + + // Status of the condition, one of True, False, Unknown. + // +required + Status metav1.ConditionStatus `json:"status" description:"status of the condition, one of True, False, Unknown"` + + // The reason for the condition's last transition. + // +optional + Reason string `json:"reason,omitempty" description:"one-word CamelCase reason for the condition's last transition"` + + // A human readable message indicating details about the transition. + // +optional + Message string `json:"message,omitempty" description:"human-readable message indicating details about last transition"` +} + +type Conditions []Condition + +// AreInitialized performs check all Conditions are initialized +// return true if Conditions are initialized +// return false if Conditions are not initialized +func (c *Conditions) AreInitialized() bool { + foundReady := false + foundActive := false + if *c != nil { + for _, condition := range *c { + if condition.Type == ConditionReady { + foundReady = true + break + } + } + for _, condition := range *c { + if condition.Type == ConditionActive { + foundActive = true + break + } + } + } + + return foundReady && foundActive +} + +// GetInitializedConditions returns Conditions initialized to the default -> Status: Unknown +func GetInitializedConditions() *Conditions { + return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}} +} + +// IsTrue is true if the condition is True +func (c *Condition) IsTrue() bool { + if c == nil { + return false + } + return c.Status == metav1.ConditionTrue +} + +// IsFalse is true if the condition is False +func (c *Condition) IsFalse() bool { + if c == nil { + return false + } + return c.Status == metav1.ConditionFalse +} + +// IsUnknown is true if the condition is Unknown +func (c *Condition) IsUnknown() bool { + if c == nil { + return true + } + return c.Status == metav1.ConditionUnknown +} + +// SetReadyCondition modifies Ready Condition according to input parameters +func (c *Conditions) SetReadyCondition(status metav1.ConditionStatus, reason string, message string) { + if *c == nil { + c = GetInitializedConditions() + } + c.setCondition(ConditionReady, status, reason, message) +} + +// SetActiveCondition modifies Active Condition according to input parameters +func (c *Conditions) SetActiveCondition(status metav1.ConditionStatus, reason string, message string) { + if *c == nil { + c = GetInitializedConditions() + } + c.setCondition(ConditionActive, status, reason, message) +} + +// GetActiveCondition returns Condition of type Active +func (c *Conditions) GetActiveCondition() Condition { + if *c == nil { + c = GetInitializedConditions() + } + return c.getCondition(ConditionActive) +} + +func (c Conditions) getCondition(conditionType ConditionType) Condition { + for i := range c { + if c[i].Type == conditionType { + return c[i] + } + } + return Condition{} +} + +func (c Conditions) setCondition(conditionType ConditionType, status metav1.ConditionStatus, reason string, message string) { + for i := range c { + if c[i].Type == conditionType { + c[i].Status = status + c[i].Reason = reason + c[i].Message = message + break + } + } +} diff --git a/pkg/apis/keda/v1alpha1/scaledjob_types.go b/pkg/apis/keda/v1alpha1/scaledjob_types.go index 59c5f26a763..d89019a4d6b 100644 --- a/pkg/apis/keda/v1alpha1/scaledjob_types.go +++ b/pkg/apis/keda/v1alpha1/scaledjob_types.go @@ -7,6 +7,12 @@ import ( // ScaledJobSpec defines the desired state of ScaledJob // +k8s:openapi-gen=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:path=scaledobjects,scope=Namespaced +// +kubebuilder:printcolumn:name="Triggers",type="string",JSONPath=".spec.triggers[*].type" +// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status" +// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status" +// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" type ScaledJobSpec struct { // TODO define the spec @@ -30,6 +36,8 @@ type ScaledJobSpec struct { type ScaledJobStatus struct { // +optional LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"` + // +optional + Conditions Conditions `json:"conditions,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/keda/v1alpha1/scaledobject_types.go b/pkg/apis/keda/v1alpha1/scaledobject_types.go index 39526d85e54..eb570bde937 100644 --- a/pkg/apis/keda/v1alpha1/scaledobject_types.go +++ b/pkg/apis/keda/v1alpha1/scaledobject_types.go @@ -15,6 +15,8 @@ import ( // +kubebuilder:printcolumn:name="ScaleTargetKind",type="string",JSONPath=".status.scaleTargetKind" // +kubebuilder:printcolumn:name="ScaleTargetName",type="string",JSONPath=".spec.scaleTargetRef.name" // +kubebuilder:printcolumn:name="Triggers",type="string",JSONPath=".spec.triggers[*].type" +// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status" +// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" type ScaledObject struct { metav1.TypeMeta `json:",inline"` @@ -70,13 +72,14 @@ type ScaleTriggers struct { type ScaledObjectStatus struct { // +optional ScaleTargetKind string `json:"scaleTargetKind,omitempty"` - - ScaleTargetGVKR *kedautil.GroupVersionKindResource `json:"scaleTargetGVKR"` - + // +optional + ScaleTargetGVKR *kedautil.GroupVersionKindResource `json:"scaleTargetGVKR,omitempty"` // +optional LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"` // +optional ExternalMetricNames []string `json:"externalMetricNames,omitempty"` + // +optional + Conditions Conditions `json:"conditions,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/keda/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/keda/v1alpha1/zz_generated.deepcopy.go index a5cfa914bfd..c1f0cae1aef 100644 --- a/pkg/apis/keda/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/keda/v1alpha1/zz_generated.deepcopy.go @@ -58,6 +58,42 @@ func (in *AuthSecretTargetRef) DeepCopy() *AuthSecretTargetRef { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Condition) DeepCopyInto(out *Condition) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Condition. +func (in *Condition) DeepCopy() *Condition { + if in == nil { + return nil + } + out := new(Condition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in Conditions) DeepCopyInto(out *Conditions) { + { + in := &in + *out = make(Conditions, len(*in)) + copy(*out, *in) + return + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Conditions. +func (in Conditions) DeepCopy() Conditions { + if in == nil { + return nil + } + out := new(Conditions) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScaleTarget) DeepCopyInto(out *ScaleTarget) { *out = *in @@ -218,6 +254,11 @@ func (in *ScaledJobStatus) DeepCopyInto(out *ScaledJobStatus) { in, out := &in.LastActiveTime, &out.LastActiveTime *out = (*in).DeepCopy() } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make(Conditions, len(*in)) + copy(*out, *in) + } return } @@ -373,6 +414,11 @@ func (in *ScaledObjectStatus) DeepCopyInto(out *ScaledObjectStatus) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make(Conditions, len(*in)) + copy(*out, *in) + } return } diff --git a/pkg/controller/scaledobject/hpa.go b/pkg/controller/scaledobject/hpa.go index 8ee6f3c8e7a..28f6e49277b 100644 --- a/pkg/controller/scaledobject/hpa.go +++ b/pkg/controller/scaledobject/hpa.go @@ -5,6 +5,7 @@ import ( "fmt" kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1" + kedacontrollerutil "github.com/kedacore/keda/pkg/controller/util" kedautil "github.com/kedacore/keda/pkg/util" "github.com/go-logr/logr" @@ -130,8 +131,9 @@ func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, s } // store External.MetricNames used by scalers defined in the ScaledObject - scaledObject.Status.ExternalMetricNames = externalMetricNames - err = r.client.Status().Update(context.TODO(), scaledObject) + status := scaledObject.Status.DeepCopy() + status.ExternalMetricNames = externalMetricNames + err = kedacontrollerutil.UpdateScaledObjectStatus(r.client, logger, scaledObject, status) if err != nil { logger.Error(err, "Error updating scaledObject status with used externalMetricNames") return nil, err diff --git a/pkg/controller/scaledobject/scaledobject_controller.go b/pkg/controller/scaledobject/scaledobject_controller.go index 34fc3b92001..7a5b20dce45 100644 --- a/pkg/controller/scaledobject/scaledobject_controller.go +++ b/pkg/controller/scaledobject/scaledobject_controller.go @@ -6,6 +6,7 @@ import ( "sync" kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1" + kedacontrollerutil "github.com/kedacore/keda/pkg/controller/util" "github.com/kedacore/keda/pkg/scaling" kedautil "github.com/kedacore/keda/pkg/util" @@ -13,6 +14,7 @@ import ( autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -22,7 +24,6 @@ import ( "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -67,13 +68,10 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // Watch for changes to primary resource ScaledObject err = c.Watch(&source.Kind{Type: &kedav1alpha1.ScaledObject{}}, &handler.EnqueueRequestForObject{}, - predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - // Ignore updates to ScaledObject Status (in this case metadata.Generation does not change) - // so reconcile loop is not started on Status updates - return e.MetaOld.GetGeneration() != e.MetaNew.GetGeneration() - }, - }) + // Ignore updates to ScaledObject Status (in this case metadata.Generation does not change) + // so reconcile loop is not started on Status updates + predicate.GenerationChangedPredicate{}, + ) if err != nil { return err } @@ -125,7 +123,6 @@ type ReconcileScaledObject struct { // Result.Requeue is true, otherwise upon completion it will remove the work from the queue. func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile.Result, error) { reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) - reqLogger.Info("Reconciling ScaledObject") // Fetch the ScaledObject instance scaledObject := &kedav1alpha1.ScaledObject{} @@ -142,6 +139,8 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile. return reconcile.Result{}, err } + reqLogger.Info("Reconciling ScaledObject") + // Check if the ScaledObject instance is marked to be deleted, which is // indicated by the deletion timestamp being set. if scaledObject.GetDeletionTimestamp() != nil { @@ -153,38 +152,52 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile. return reconcile.Result{}, err } - return reconcile.Result{}, r.reconcileScaledObject(reqLogger, scaledObject) + // ensure Status Conditions are initialized + if !scaledObject.Status.Conditions.AreInitialized() { + conditions := kedav1alpha1.GetInitializedConditions() + kedacontrollerutil.SetStatusConditions(r.client, reqLogger, scaledObject, conditions) + } + + // reconcile ScaledObject and set status appropriately + msg, err := r.reconcileScaledObject(reqLogger, scaledObject) + conditions := scaledObject.Status.Conditions.DeepCopy() + if err != nil { + reqLogger.Error(err, msg) + conditions.SetReadyCondition(v1.ConditionFalse, "ScaledObjectCheckFailed", msg) + conditions.SetActiveCondition(v1.ConditionUnknown, "UnkownState", "ScaledObject check failed") + } else { + reqLogger.V(1).Info(msg) + conditions.SetReadyCondition(v1.ConditionTrue, "ScaledObjectReady", msg) + } + kedacontrollerutil.SetStatusConditions(r.client, reqLogger, scaledObject, &conditions) + return reconcile.Result{}, err } // reconcileScaledObject implements reconciler logic for ScaleObject -func (r *ReconcileScaledObject) reconcileScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { +func (r *ReconcileScaledObject) reconcileScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (string, error) { // Check scale target Name is specified if scaledObject.Spec.ScaleTargetRef.Name == "" { err := fmt.Errorf("ScaledObject.spec.scaleTargetRef.name is missing") - logger.Error(err, "Notified about ScaledObject with incorrect scaleTargetRef specification") - return err + return "ScaledObject doesn't have correct scaleTargetRef specification", err } // Check the label needed for Metrics servers is present on ScaledObject err := r.ensureScaledObjectLabel(logger, scaledObject) if err != nil { - logger.Error(err, "Failed to update ScaledObject with scaledObjectName label") - return err + return "Failed to update ScaledObject with scaledObjectName label", err } // Check if resource targeted for scaling exists and exposes /scale subresource gvkr, err := r.checkTargetResourceIsScalable(logger, scaledObject) if err != nil { - logger.Error(err, "Notified about ScaledObject with incorrect scaleTargetRef specification") - return err + return "ScaledObject doesn't have correct scaleTargetRef specification", err } // Create a new HPA or update existing one according to ScaledObject newHPACreated, err := r.ensureHPAForScaledObjectExists(logger, scaledObject, &gvkr) if err != nil { - logger.Error(err, "Failed to reconcile HPA for ScaledObject") - return err + return "Failed to reconcile HPA for ScaledObject", err } scaleObjectSpecChanged := false if !newHPACreated { @@ -193,20 +206,18 @@ func (r *ReconcileScaledObject) reconcileScaledObject(logger logr.Logger, scaled // (we can omit this check if a new HPA was created, which fires new ScaleLoop anyway) scaleObjectSpecChanged, err = r.scaledObjectGenerationChanged(logger, scaledObject) if err != nil { - logger.Error(err, "Failed to check whether ScaledObject's Generation was changed") - return err + return "Failed to check whether ScaledObject's Generation was changed", err } } // Notify ScaleHandler if a new HPA was created or if ScaledObject was updated if newHPACreated || scaleObjectSpecChanged { if r.requestScaleLoop(logger, scaledObject) != nil { - logger.Error(err, "Failed to start a new ScaleLoop") - return err + return "Failed to start a new scale loop with scaling logic", err } } - return nil + return "ScaledObject is defined correctly and is ready for scaling", nil } // ensureScaledObjectLabel ensures that scaledObjectName= label exist in the ScaledObject @@ -235,7 +246,8 @@ func (r *ReconcileScaledObject) checkTargetResourceIsScalable(logger logr.Logger logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.ApiVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind) return gvkr, err } - logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkr.GVKString(), "Resource", gvkr.Resource) + gvkString := gvkr.GVKString() + logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource) // let's try to detect /scale subresource _, errScale := (*r.scaleClient).Scales(scaledObject.Namespace).Get(gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name) @@ -245,23 +257,26 @@ func (r *ReconcileScaledObject) checkTargetResourceIsScalable(logger logr.Logger unstruct.SetGroupVersionKind(gvkr.GroupVersionKind()) if err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil { // resource doesn't exist - logger.Error(err, "Target resource doesn't exist", "resource", gvkr.GVKString(), "name", scaledObject.Spec.ScaleTargetRef.Name) + logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name) return gvkr, err } else { // resource exist but doesn't expose /scale subresource - logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkr.GVKString(), "name", scaledObject.Spec.ScaleTargetRef.Name) + logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name) return gvkr, errScale } } - // store discovered GVK and GVKR into the Status - scaledObject.Status.ScaleTargetKind = gvkr.GVKString() - scaledObject.Status.ScaleTargetGVKR = &gvkr - if err = r.client.Status().Update(context.TODO(), scaledObject); err != nil { - logger.Error(err, "Failed to update ScaledObject.Status", "scaledObject.Status.ScaleTargetKind", gvkr.GVKString(), "scaledObject.Status.ScaleTargetGVKR", gvkr) + // store discovered GVK and GVKR into the Status if it is not present already + if scaledObject.Status.ScaleTargetKind != gvkString { + status := scaledObject.Status.DeepCopy() + status.ScaleTargetKind = gvkString + status.ScaleTargetGVKR = &gvkr + if err := kedacontrollerutil.UpdateScaledObjectStatus(r.client, logger, scaledObject, status); err != nil { + return gvkr, err + } + logger.Info("Detected resource targeted for scaling", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name) } - logger.Info("Detected resource targeted for scaling", "resource", gvkr.GVKString(), "name", scaledObject.Spec.ScaleTargetRef.Name) return gvkr, nil } diff --git a/pkg/controller/util/util.go b/pkg/controller/util/util.go new file mode 100644 index 00000000000..dbd5e63dde3 --- /dev/null +++ b/pkg/controller/util/util.go @@ -0,0 +1,46 @@ +package util + +import ( + "context" + "fmt" + + kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +func SetStatusConditions(client runtimeclient.Client, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error { + var patch runtimeclient.Patch + + runtimeObj := object.(runtime.Object) + switch obj := runtimeObj.(type) { + case *kedav1alpha1.ScaledObject: + patch = runtimeclient.MergeFrom(obj.DeepCopy()) + obj.Status.Conditions = *conditions + case *kedav1alpha1.ScaledJob: + patch = runtimeclient.MergeFrom(obj.DeepCopy()) + obj.Status.Conditions = *conditions + default: + err := fmt.Errorf("Unknown scalable object type %v", obj) + logger.Error(err, "Failed to patch Objects Status with Conditions") + return err + } + + err := client.Status().Patch(context.TODO(), runtimeObj, patch) + if err != nil { + logger.Error(err, "Failed to patch Objects Status with Conditions") + } + return err +} + +func UpdateScaledObjectStatus(client runtimeclient.Client, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error { + patch := runtimeclient.MergeFrom(scaledObject.DeepCopy()) + scaledObject.Status = *status + err := client.Status().Patch(context.TODO(), scaledObject, patch) + if err != nil { + logger.Error(err, "Failed to patch ScaledObjects Status") + } + return err +} diff --git a/pkg/scaling/executor/scale_executor.go b/pkg/scaling/executor/scale_executor.go index 45f7638ca67..c8d099f63d9 100644 --- a/pkg/scaling/executor/scale_executor.go +++ b/pkg/scaling/executor/scale_executor.go @@ -2,6 +2,7 @@ package executor import ( "context" + "fmt" kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1" "github.com/kedacore/keda/pkg/scalers" @@ -87,3 +88,27 @@ func (e *scaleExecutor) updateLastActiveTime(ctx context.Context, object interfa } return nil } + +func (e *scaleExecutor) setActiveCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, mesage string) error { + var patch client.Patch + + runtimeObj := object.(runtime.Object) + switch obj := runtimeObj.(type) { + case *kedav1alpha1.ScaledObject: + patch = client.MergeFrom(obj.DeepCopy()) + obj.Status.Conditions.SetActiveCondition(status, reason, mesage) + case *kedav1alpha1.ScaledJob: + patch = client.MergeFrom(obj.DeepCopy()) + obj.Status.Conditions.SetActiveCondition(status, reason, mesage) + default: + err := fmt.Errorf("Unknown scalable object type %v", obj) + logger.Error(err, "Failed to patch Objects Status") + return err + } + + err := e.client.Status().Patch(ctx, runtimeObj, patch) + if err != nil { + logger.Error(err, "Failed to patch Objects Status") + } + return err +} diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index be980b70375..280392cc4bf 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" autoscalingv1 "k8s.io/api/autoscaling/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func (e *scaleExecutor) RequestScale(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) { @@ -47,7 +48,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scalers []scalers.Scal // There is no minimum configured or minimum is set to ZERO. HPA will handles other scale down operations // Try to scale it down. - e.scaleToZero(logger, scaledObject, currentScale) + e.scaleToZero(ctx, logger, scaledObject, currentScale) } else if !isActive && scaledObject.Spec.MinReplicaCount != nil && currentScale.Spec.Replicas < *scaledObject.Spec.MinReplicaCount { @@ -69,11 +70,21 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scalers []scalers.Scal } else { logger.V(1).Info("ScaleTarget no change") } + + condition := scaledObject.Status.Conditions.GetActiveCondition() + if condition.IsUnknown() || condition.IsTrue() != isActive { + if isActive { + e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionTrue, "ScalerActive", "Scaling is performed because triggers are active") + } else { + e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active") + } + } + } // An object will be scaled down to 0 only if it's passed its cooldown period // or if LastActiveTime is nil -func (e *scaleExecutor) scaleToZero(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) { +func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) { var cooldownPeriod time.Duration if scaledObject.Spec.CooldownPeriod != nil { @@ -91,11 +102,17 @@ func (e *scaleExecutor) scaleToZero(logger logr.Logger, scaledObject *kedav1alph err := e.updateScaleOnScaleTarget(scaledObject, scale) if err == nil { logger.Info("Successfully scaled ScaleTarget to 0 replicas") + e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active") } } else { logger.V(1).Info("ScaleTarget cooling down", "LastActiveTime", scaledObject.Status.LastActiveTime, "CoolDownPeriod", cooldownPeriod) + + activeCondition := scaledObject.Status.Conditions.GetActiveCondition() + if !activeCondition.IsFalse() || activeCondition.Reason != "ScalerCooldown" { + e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerCooldown", "Scaler cooling down because triggers are not active") + } } }