[v2] Implement the scaledjob controller for v2 #945

Merged · 5 commits · Jul 27, 2020
go.sum: 3 additions & 0 deletions
@@ -1314,6 +1314,8 @@ k8s.io/apimachinery v0.0.0-20191121175448-79c2a76c473a/go.mod h1:b9qmWdKlLuU9EBh
k8s.io/apimachinery v0.18.0/go.mod h1:9SnR/e11v5IbyPCGbvJViimtJ0SwHG4nfZFjU77ftcA=
k8s.io/apimachinery v0.18.2 h1:44CmtbmkzVDAhCpRVSiP2R5PPrC2RtlIv/MoB8xpdRA=
k8s.io/apimachinery v0.18.2/go.mod h1:9SnR/e11v5IbyPCGbvJViimtJ0SwHG4nfZFjU77ftcA=
k8s.io/apimachinery v0.18.5 h1:Lh6tgsM9FMkC12K5T5QjRm7rDs6aQN5JHkA0JomULDM=
k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
k8s.io/apiserver v0.18.2 h1:fwKxdTWwwYhxvtjo0UUfX+/fsitsNtfErPNegH2x9ic=
k8s.io/apiserver v0.18.2/go.mod h1:Xbh066NqrZO8cbsoenCwyDJ1OSi8Ag8I2lezeHxzwzw=
k8s.io/autoscaler v0.0.0-20190607113959-1b4f1855cb8e/go.mod h1:QEXezc9uKPT91dwqhSJq3GNI3B1HxFRQHiku9kmrsSA=
@@ -1357,6 +1359,7 @@ k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/metrics v0.18.0/go.mod h1:8aYTW18koXqjLVKL7Ds05RPMX9ipJZI3mywYvBOxXd4=
k8s.io/metrics v0.18.2 h1:v4J7WKu/Zo/htSH3w//UWJZT9/CpUThXWYyUbQ/F/jY=
k8s.io/metrics v0.18.2/go.mod h1:qga8E7QfYNR9Q89cSCAjinC9pTZ7yv1XSVGUB0vJypg=
k8s.io/metrics v0.18.6 h1:IRMCn0KKNhbOSnxNZ+MhooRi8c67iIMjpGkKpm6oqOM=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
pkg/controller/scaledjob/scaledjob_controller.go: 132 additions & 57 deletions
@@ -2,19 +2,27 @@ package scaledjob

import (
"context"
"fmt"

"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"

"github.com/kedacore/keda/pkg/scaling"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
//metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

//"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"

//"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
@@ -34,7 +42,10 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileScaledJob{client: mgr.GetClient(), scheme: mgr.GetScheme()}
return &ReconcileScaledJob{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
scaleHandler: scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme())}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
@@ -46,17 +57,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}

// Watch for changes to primary resource ScaledJob
err = c.Watch(&source.Kind{Type: &kedav1alpha1.ScaledJob{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
}

// TODO(user): Modify this to be the types you create that are owned by the primary resource
// Watch for changes to secondary resource Pods and requeue the owner ScaledJob
// err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
// IsController: true,
// OwnerType: &kedav1alpha1.ScaledJob{},
// })
err = c.Watch(&source.Kind{Type: &kedav1alpha1.ScaledJob{}},
&handler.EnqueueRequestForObject{},
predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
// so the reconcile loop is not started on Status updates
return e.MetaOld.GetGeneration() != e.MetaNew.GetGeneration()
},
})
if err != nil {
return err
}
@@ -71,8 +80,9 @@ var _ reconcile.Reconciler = &ReconcileScaledJob{}
type ReconcileScaledJob struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scheme *runtime.Scheme
client client.Client
scheme *runtime.Scheme
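// scaleHandler starts and manages the scale loops that poll this ScaledJob's triggers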
scaleHandler scaling.ScaleHandler
}

// Reconcile reads the state of the cluster for a ScaledJob object and makes changes based on the state read
@@ -84,11 +94,10 @@ type ReconcileScaledJob struct {
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileScaledJob) Reconcile(request reconcile.Request) (reconcile.Result, error) {
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
reqLogger.Info("Reconciling ScaledJob")

// Fetch the ScaledJob instance
instance := &kedav1alpha1.ScaledJob{}
err := r.client.Get(context.TODO(), request.NamespacedName, instance)
scaledJob := &kedav1alpha1.ScaledJob{}
err := r.client.Get(context.TODO(), request.NamespacedName, scaledJob)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
@@ -97,48 +106,114 @@ func (r *ReconcileScaledJob) Reconcile(request reconcile.Request) (reconcile.Res
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Failed to get ScaleJob")
return reconcile.Result{}, err
}

reqLogger.Info("Reconciling ScaledJob is NOT IMPLEMENTED yet")
reqLogger.Info("Reconciling ScaledJob")

isScaledJobMarkedToBeDeleted := scaledJob.GetDeletionTimestamp() != nil
if isScaledJobMarkedToBeDeleted {
if contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
// Run finalization logic for scaledJobFinalizer. If the
// finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation.
if err := r.finalizeScaledJob(reqLogger, scaledJob); err != nil {
return reconcile.Result{}, err
}

// Remove scaledJobFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledJob.SetFinalizers(remove(scaledJob.GetFinalizers(), scaledJobFinalizer))
err := r.client.Update(context.TODO(), scaledJob)
if err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}

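// Ensure the finalizer is present so cleanup can run before the ScaledJob is deleted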
if !contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
if err := r.addFinalizer(reqLogger, scaledJob); err != nil {
return reconcile.Result{}, err
}
}

if scaledJob.Spec.JobTargetRef != nil {
reqLogger.Info("Detected ScaleType = Job")
conditions := scaledJob.Status.Conditions.DeepCopy()
msg, err := r.reconcileScaledJob(reqLogger, scaledJob)
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
} else {
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

return reconcile.Result{}, err
}

return reconcile.Result{}, nil
errMsg = "scaledJob.Spec.JobTargetRef is not set"
err = fmt.Errorf(errMsg)
reqLogger.Error(err, "scaledJob.Spec.JobTargetRef not found")
return reconcile.Result{}, err
}

// FIXME use ScaledJob
// reconcileJobType implemets reconciler logic for K8s Jobs based ScaleObject
// func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ( error) {
// // scaledObject.Spec.ScaleType = kedav1alpha1.ScaleTypeJob

// // Delete Jobs owned by the previous version of the ScaledObject
// opts := []client.ListOption{
// client.InNamespace(scaledObject.GetNamespace()),
// client.MatchingLabels(map[string]string{"scaledobject": scaledObject.GetName()}),
// }
// jobs := &batchv1.JobList{}
// err := r.client.List(context.TODO(), jobs, opts...)
// if err != nil {
// logger.Error(err, "Cannot get list of Jobs owned by this ScaledObject")
// return err
// }

// if jobs.Size() > 0 {
// logger.Info("Deleting jobs owned by the previous version of the ScaledObject", "Number of jobs to delete", jobs.Size())
// }
// for _, job := range jobs.Items {
// err = r.client.Delete(context.TODO(), &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
// if err != nil {
// logger.Error(err, "Not able to delete job", "Job", job.Name)
// return err
// }
// }

// // ScaledObject was created or modified - let's start a new ScaleLoop
// err = r.startScaleLoop(logger, scaledObject)
// if err != nil {
// logger.Error(err, "Failed to start a new ScaleLoop")
// return err
// }

// return nil
// }
func (r *ReconcileScaledJob) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {

msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob)
if err != nil {
return msg, err
}

// scaledJob was created or modified - let's start a new ScaleLoop
err = r.requestScaleLoop(logger, scaledJob)
if err != nil {
return "Failed to start a new scale loop with scaling logic", err
}
logger.Info("Initializing Scaling logic according to ScaledJob Specification")

return "ScaledJob is defined correctly and is ready to scaling", nil
}

// Delete Jobs owned by the previous version of the scaledJob
func (r *ReconcileScaledJob) deletePreviousVersionScaleJobs(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob": scaledJob.GetName()}),
}
jobs := &batchv1.JobList{}
err := r.client.List(context.TODO(), jobs, opts...)
if err != nil {
return "Cannot get list of Jobs owned by this scaledJob", err
}

if len(jobs.Items) > 0 {
logger.Info("Deleting jobs owned by the previous version of the scaledJob", "Number of jobs to delete", len(jobs.Items))
}
for _, job := range jobs.Items {
err = r.client.Delete(context.TODO(), &job, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
return "Not able to delete job: " + job.Name, err
}
}

return fmt.Sprintf("Deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", jobs.Size()), nil
}

// requestScaleLoop requests the ScaleLoop handler for the respective ScaledJob
func (r *ReconcileScaledJob) requestScaleLoop(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {

logger.V(1).Info("Starting a new ScaleLoop")

if err := r.scaleHandler.HandleScalableObject(scaledJob); err != nil {
return err
}

return nil
}
pkg/controller/scaledjob/scaledjob_finalizer.go: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
package scaledjob

import (
"context"

"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
)

const (
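// scaledJobFinalizer is added to each ScaledJob so the operator can run cleanup before the object is removed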
scaledJobFinalizer = "finalizer.keda.sh"
)

// finalizeScaledJob stops the ScaleLoop for the respective ScaledJob
func (r *ReconcileScaledJob) finalizeScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
// TODO implement finalize logic for ScaledJob
logger.Info("Successfully finalized ScaledJob")
return nil
}

// addFinalizer adds finalizer to the scaledJob
func (r *ReconcileScaledJob) addFinalizer(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
logger.Info("Adding Finalizer for the ScaledJob")
scaledJob.SetFinalizers(append(scaledJob.GetFinalizers(), scaledJobFinalizer))

// Update CR
err := r.client.Update(context.TODO(), scaledJob)
if err != nil {
logger.Error(err, "Failed to update ScaledJob with finalizer")
return err
}
return nil
}

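// contains reports whether s is present in list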
func contains(list []string, s string) bool {
for _, v := range list {
if v == s {
return true
}
}
return false
}

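// remove returns list with the string s removed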
func remove(list []string, s string) []string {
for i, v := range list {
if v == s {
list = append(list[:i], list[i+1:]...)
}
}
return list
}
pkg/scalers/azure/azure_queue.go: 23 additions & 1 deletion
@@ -2,6 +2,7 @@ package azure

import (
"context"

"github.com/Azure/azure-storage-queue-go/azqueue"
)

@@ -26,5 +27,26 @@ func GetAzureQueueLength(ctx context.Context, podIdentity string, connectionStri
return -1, err
}

return props.ApproximateMessagesCount(), nil
visibleMessageCount, err := getVisibleCount(&queueURL, 32)
if err != nil {
return -1, err
}
approximateMessageCount := props.ApproximateMessagesCount()

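// Peek returns at most 32 messages per call (an Azure Queue service limit), so a
// full page means the queue may hold more visible messages than were counted;
// fall back to the service-reported approximate count in that case.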
if visibleMessageCount == 32 {
return approximateMessageCount, nil
}
return visibleMessageCount, nil
}

func getVisibleCount(queueURL *azqueue.QueueURL, maxCount int32) (int32, error) {
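// Peek only counts messages that are currently visible (not leased by another
// consumer); the service caps a single Peek at 32 messages.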
messagesURL := queueURL.NewMessagesURL()
ctx := context.Background()
queue, err := messagesURL.Peek(ctx, maxCount)
if err != nil {
return 0, err
}
num := queue.NumMessages()
return num, nil
}
pkg/scaling/executor/scale_jobs.go: 7 additions & 7 deletions
@@ -41,11 +41,11 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
}

func (e *scaleExecutor) createJobs(scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
// scaledObject.Spec.JobTargetRef.Template.GenerateName = scaledObject.GetName() + "-"
// if scaledObject.Spec.JobTargetRef.Template.Labels == nil {
// scaledObject.Spec.JobTargetRef.Template.Labels = map[string]string{}
// }
// scaledObject.Spec.JobTargetRef.Template.Labels["scaledobject"] = scaledObject.GetName()
scaledJob.Spec.JobTargetRef.Template.GenerateName = scaledJob.GetName() + "-"
if scaledJob.Spec.JobTargetRef.Template.Labels == nil {
scaledJob.Spec.JobTargetRef.Template.Labels = map[string]string{}
}
scaledJob.Spec.JobTargetRef.Template.Labels["scaledjob"] = scaledJob.GetName()
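// Label the pod template (and the Job metadata below) with the owning ScaledJob's
// name so deletePreviousVersionScaleJobs can find and delete stale Jobs.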

e.logger.Info("Creating jobs", "Effective number of max jobs", maxScale)

@@ -65,10 +65,10 @@
"app.kubernetes.io/version": version.Version,
"app.kubernetes.io/part-of": scaledJob.GetName(),
"app.kubernetes.io/managed-by": "keda-operator",
"scaledobject": scaledJob.GetName(),
"scaledjob": scaledJob.GetName(),
},
},
//Spec: *scaledObject.Spec.JobTargetRef.DeepCopy(),
Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
}

// Job doesn't allow RestartPolicyAlways, it seems like this value is set by the client as a default one,