Skip to content

Commit

Permalink
Emit kubernetes events from KEDA
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed ElSayed <ahmels@microsoft.com>
  • Loading branch information
ahmelsayed committed Jan 21, 2021
1 parent c53f4ff commit 42e9f8b
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- Add Redis cluster support for Redis list and Redis streams scalers ([#1437](https://github.com/kedacore/keda/pull/1437))
- Global authentication credentials can be managed using `ClusterTriggerAuthentication` objects ([#1452](https://github.com/kedacore/keda/pull/1452))
- Introducing OpenStack Swift scaler ([#1342](https://github.com/kedacore/keda/issues/1342))
- Emit Kubernetes Events on KEDA events ([#1523])

### Improvements
- Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311))
Expand Down
7 changes: 6 additions & 1 deletion adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -67,7 +70,9 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
return nil, fmt.Errorf("unable to construct new client (%s)", err)
}

handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout)
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout, recorder)

namespace, err := getWatchNamespace()
if err != nil {
Expand Down
9 changes: 8 additions & 1 deletion controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"fmt"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -28,13 +32,14 @@ type ScaledJobReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
scaleHandler scaling.ScaleHandler
globalHTTPTimeout time.Duration
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.globalHTTPTimeout)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.globalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))

return ctrl.NewControllerManagedBy(mgr).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
Expand Down Expand Up @@ -84,9 +89,11 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.CheckFailed, msg)
} else {
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.Ready, msg)
}

return ctrl.Result{}, err
Expand Down
4 changes: 4 additions & 0 deletions controllers/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package controllers
import (
"context"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
Expand Down Expand Up @@ -33,6 +36,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
}

logger.Info("Successfully finalized ScaledJob")
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.Deleted, "ScaledJob was deleted")
return nil
}

Expand Down
15 changes: 11 additions & 4 deletions controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"sync"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -42,9 +46,10 @@ import (

// ScaledObjectReconciler reconciles a ScaledObject object
type ScaledObjectReconciler struct {
Log logr.Logger
Client client.Client
Scheme *runtime.Scheme
Log logr.Logger
Client client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

scaleClient *scale.ScalesGetter
restMapper meta.RESTMapper
Expand Down Expand Up @@ -92,7 +97,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Init the rest of ScaledObjectReconciler
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.globalHTTPTimeout)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.globalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))

// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -160,9 +165,11 @@ func (r *ScaledObjectReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.CheckFailed, msg)
} else {
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.Ready, msg)
}
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
Expand Down
4 changes: 4 additions & 0 deletions controllers/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package controllers
import (
"context"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -54,6 +57,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}

logger.Info("Successfully finalized ScaledObject")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.Deleted, "ScaledObject was deleted")
return nil
}

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
Expand Down
14 changes: 8 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,19 @@ func main() {
}

if err = (&controllers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("scaledobject-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
}
if err = (&controllers.ScaledJobReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("scaledjob-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
Expand Down
52 changes: 52 additions & 0 deletions pkg/eventreason/eventreason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2020 The KEDA Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventreason

const (
// Ready is for event when ScaledObject or ScaledJob is ready
Ready = "Ready"

// CheckFailed is for event when ScaledObject or ScaledJob validation check failed
CheckFailed = "CheckFailed"

// Deleted is for event when ScaledObject or ScaledJob is deleted
Deleted = "Deleted"

// ScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob
ScalersStarted = "ScalersStarted"

// ScalersRestarted is for event when scalers watch was restarted for ScaledObject or ScaledJob
ScalersRestarted = "ScalersRestarted"

// ScalersStopped is for event when scalers watch was stopped for ScaledObject or ScaledJob
ScalersStopped = "ScalersStopped"

// ScaleTargetActivated is for event when the scale target of ScaledObject was activated
ScaleTargetActivated = "ScaleTargetActivated"

// ScaleTargetDeactivated is for event when the scale target for ScaledObject was deactivated
ScaleTargetDeactivated = "ScaleTargetDeactivated"

// ScaleTargetActivationFailed is for event when the activation the scale target for ScaledObject fails
ScaleTargetActivationFailed = "ScaleTargetActivationFailed"

// ScaleTargetDeactivationFailed is for event when the deactivation of the scale target for ScaledObject fails
ScaleTargetDeactivationFailed = "ScaleTargetDeactivationFailed"

// JobsCreated is for event when jobs for ScaledJob are created
JobsCreated = "JobsCreated"
)
6 changes: 5 additions & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -30,15 +32,17 @@ type scaleExecutor struct {
scaleClient *scale.ScalesGetter
reconcilerScheme *runtime.Scheme
logger logr.Logger
recorder record.EventRecorder
}

// NewScaleExecutor creates a ScaleExecutor object
func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) ScaleExecutor {
func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor {
return &scaleExecutor{
client: client,
scaleClient: scaleClient,
reconcilerScheme: reconcilerScheme,
logger: logf.Log.WithName("scaleexecutor"),
recorder: recorder,
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sort"
"strconv"

"github.com/kedacore/keda/v2/pkg/eventreason"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -108,6 +110,7 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S
}
}
logger.Info("Created jobs", "Number of jobs", scaleTo)
e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.JobsCreated, "Created %d jobs", scaleTo)
}

func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool {
Expand Down
11 changes: 10 additions & 1 deletion pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
Expand Down Expand Up @@ -121,13 +124,16 @@ func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, sca
if scaledObject.Status.LastActiveTime == nil ||
scaledObject.Status.LastActiveTime.Add(cooldownPeriod).Before(time.Now()) {
// or last time a trigger was active was > cooldown period, so scale down.
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0)
currentReplicas, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0)
if err == nil {
logger.Info("Successfully scaled ScaleTarget to 0 replicas")
e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.ScaleTargetDeactivated, "Deactivated %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0)
if err := e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil {
logger.Error(err, "Error in setting active condition")
return
}
} else {
e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.ScaleTargetDeactivationFailed, "Failed to deactivated %s %s/%s", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0)
}
} else {
logger.V(1).Info("ScaleTarget cooling down",
Expand Down Expand Up @@ -158,12 +164,15 @@ func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, s
logger.Info("Successfully updated ScaleTarget",
"Original Replicas Count", currentReplicas,
"New Replicas Count", replicas)
e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.ScaleTargetActivated, "Scaled %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)

// Scale was successful. Update lastScaleTime and lastActiveTime on the scaledObject
if err := e.updateLastActiveTime(ctx, logger, scaledObject); err != nil {
logger.Error(err, "Error in Updating lastScaleTime and lastActiveTime on the scaledObject")
return
}
} else {
e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.ScaleTargetActivationFailed, "Failed to scaled %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas)
}
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"sync"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -44,16 +47,18 @@ type scaleHandler struct {
scaleLoopContexts *sync.Map
scaleExecutor executor.ScaleExecutor
globalHTTPTimeout time.Duration
recorder record.EventRecorder
}

// NewScaleHandler creates a ScaleHandler object
func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration) ScaleHandler {
func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder) ScaleHandler {
return &scaleHandler{
client: client,
logger: logf.Log.WithName("scalehandler"),
scaleLoopContexts: &sync.Map{},
scaleExecutor: executor.NewScaleExecutor(client, scaleClient, reconcilerScheme),
scaleExecutor: executor.NewScaleExecutor(client, scaleClient, reconcilerScheme, recorder),
globalHTTPTimeout: globalHTTPTimeout,
recorder: recorder,
}
}

Expand Down Expand Up @@ -90,6 +95,9 @@ func (h *scaleHandler) HandleScalableObject(scalableObject interface{}) error {
cancelValue()
}
h.scaleLoopContexts.Store(key, cancel)
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersRestarted, "Restarted scalers watch")
} else {
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersStarted, "Started scalers watch")
}

// a mutex is used to synchronize scale requests per scalableObject
Expand All @@ -115,6 +123,7 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error {
cancel()
}
h.scaleLoopContexts.Delete(key)
h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersStopped, "Stopped scalers watch")
} else {
h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
}
Expand Down

0 comments on commit 42e9f8b

Please sign in to comment.