From 6fcbd5216c84ca833e5fe588be333aa7fbadef3a Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Mon, 21 Jan 2019 15:05:06 +0800 Subject: [PATCH] Added suspend/resume. Signed-off-by: Klaus Ma --- cmd/cli/job.go | 51 ++++++++++++++++ cmd/cli/vkctl.go | 28 +-------- pkg/cli/job/list.go | 8 +-- pkg/cli/job/resume.go | 49 +++++++++++++++ pkg/cli/job/suspend.go | 49 +++++++++++++++ pkg/cli/job/util.go | 35 +++++++++++ pkg/controllers/job/job_controller.go | 8 ++- pkg/controllers/job/job_controller_actions.go | 61 +++++++++++++++++-- pkg/controllers/job/job_controller_handler.go | 41 +++++++------ pkg/controllers/job/state/pending.go | 14 +++++ 10 files changed, 285 insertions(+), 59 deletions(-) create mode 100644 cmd/cli/job.go create mode 100644 pkg/cli/job/resume.go create mode 100644 pkg/cli/job/suspend.go diff --git a/cmd/cli/job.go b/cmd/cli/job.go new file mode 100644 index 0000000000..100b337768 --- /dev/null +++ b/cmd/cli/job.go @@ -0,0 +1,51 @@ +package main + +import ( + "github.com/spf13/cobra" + + "hpw.cloud/volcano/pkg/cli/job" +) + +func buildJobCmd() *cobra.Command { + jobCmd := &cobra.Command{ + Use: "job", + } + + jobRunCmd := &cobra.Command{ + Use: "run", + Run: func(cmd *cobra.Command, args []string) { + checkError(cmd, job.RunJob()) + }, + } + job.InitRunFlags(jobRunCmd) + jobCmd.AddCommand(jobRunCmd) + + jobListCmd := &cobra.Command{ + Use: "list", + Run: func(cmd *cobra.Command, args []string) { + checkError(cmd, job.ListJobs()) + }, + } + job.InitListFlags(jobListCmd) + jobCmd.AddCommand(jobListCmd) + + jobSuspendCmd := &cobra.Command{ + Use: "suspend", + Run: func(cmd *cobra.Command, args []string) { + checkError(cmd, job.SuspendJob()) + }, + } + job.InitSuspendFlags(jobSuspendCmd) + jobCmd.AddCommand(jobSuspendCmd) + + jobResumeCmd := &cobra.Command{ + Use: "resume", + Run: func(cmd *cobra.Command, args []string) { + checkError(cmd, job.ResumeJob()) + }, + } + job.InitResumeFlags(jobResumeCmd) + jobCmd.AddCommand(jobResumeCmd) + + return jobCmd +} diff --git a/cmd/cli/vkctl.go b/cmd/cli/vkctl.go index f364b63d72..b86ca88c34 100644 --- a/cmd/cli/vkctl.go +++ b/cmd/cli/vkctl.go @@ -24,8 +24,6 @@ import ( "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/util/wait" - - "hpw.cloud/volcano/pkg/cli/job" ) var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes") @@ -38,32 +36,10 @@ func main() { defer glog.Flush() rootCmd := cobra.Command{ - Use: "vncli", - } - - jobCmd := &cobra.Command{ - Use: "job", - } - - jobRunCmd := &cobra.Command{ - Use: "run", - Run: func(cmd *cobra.Command, args []string) { - checkError(cmd, job.RunJob()) - }, - } - job.InitRunFlags(jobRunCmd) - jobCmd.AddCommand(jobRunCmd) - - jobListCmd := &cobra.Command{ - Use: "list", - Run: func(cmd *cobra.Command, args []string) { - checkError(cmd, job.ListJobs()) - }, + Use: "vkctl", } - job.InitListFlags(jobListCmd) - jobCmd.AddCommand(jobListCmd) - rootCmd.AddCommand(jobCmd) + rootCmd.AddCommand(buildJobCmd()) if err := rootCmd.Execute(); err != nil { fmt.Printf("Failed to execute command: %v", err) diff --git a/pkg/cli/job/list.go b/pkg/cli/job/list.go index 5f564f007d..1e2ac6d56e 100644 --- a/pkg/cli/job/list.go +++ b/pkg/cli/job/list.go @@ -56,16 +56,16 @@ func ListJobs() error { return nil } - fmt.Printf("%-30s%-25s%-12s%-8s%-12s%-12s%-12s%-12s\n", - "Name", "Creation", "Replicas", "Min", "Pending", "Running", "Succeeded", "Failed") + fmt.Printf("%-25s%-25s%-12s%-12s%-6s%-10s%-10s%-12s%-10s\n", + "Name", "Creation", "Phase", "Replicas", "Min", "Pending", "Running", "Succeeded", "Failed") for _, job := range jobs.Items { replicas := int32(0) for _, ts := range job.Spec.Tasks { replicas += ts.Replicas } - fmt.Printf("%-30s%-25s%-12d%-8d%-12d%-12d%-12d%-12d\n", - job.Name, job.CreationTimestamp.Format("2006-01-02 15:04:05"), replicas, + fmt.Printf("%-25s%-25s%-12s%-12d%-6d%-10d%-10d%-12d%-10d\n", + job.Name, job.CreationTimestamp.Format("2006-01-02 15:04:05"), job.Status.State.Phase, replicas, job.Status.MinAvailable, job.Status.Pending, job.Status.Running, job.Status.Succeeded, job.Status.Failed) } diff --git a/pkg/cli/job/resume.go b/pkg/cli/job/resume.go new file mode 100644 index 0000000000..ae618672e3 --- /dev/null +++ b/pkg/cli/job/resume.go @@ -0,0 +1,49 @@ +/* +Copyright 2018 The Vulcan Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package job + +import ( + "github.com/spf13/cobra" + + "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" +) + +type resumeFlags struct { + commonFlags + + Namespace string + JobName string +} + +var resumeJobFlags = &resumeFlags{} + +func InitResumeFlags(cmd *cobra.Command) { + initFlags(cmd, &resumeJobFlags.commonFlags) + + cmd.Flags().StringVarP(&resumeJobFlags.Namespace, "namespace", "", "default", "the namespace of job") + cmd.Flags().StringVarP(&resumeJobFlags.JobName, "name", "n", "", "the name of job") +} + +func ResumeJob() error { + config, err := buildConfig(resumeJobFlags.Master, resumeJobFlags.Kubeconfig) + if err != nil { + return err + } + + return createJobCommand(config, + resumeJobFlags.Namespace, resumeJobFlags.JobName, + v1alpha1.ResumeJobAction) +} diff --git a/pkg/cli/job/suspend.go b/pkg/cli/job/suspend.go new file mode 100644 index 0000000000..51ee52eb5d --- /dev/null +++ b/pkg/cli/job/suspend.go @@ -0,0 +1,49 @@ +/* +Copyright 2018 The Vulcan Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package job + +import ( + "github.com/spf13/cobra" + + "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" +) + +type suspendFlags struct { + commonFlags + + Namespace string + JobName string +} + +var suspendJobFlags = &suspendFlags{} + +func InitSuspendFlags(cmd *cobra.Command) { + initFlags(cmd, &suspendJobFlags.commonFlags) + + cmd.Flags().StringVarP(&suspendJobFlags.Namespace, "namespace", "", "default", "the namespace of job") + cmd.Flags().StringVarP(&suspendJobFlags.JobName, "name", "n", "", "the name of job") +} + +func SuspendJob() error { + config, err := buildConfig(suspendJobFlags.Master, suspendJobFlags.Kubeconfig) + if err != nil { + return err + } + + return createJobCommand(config, + suspendJobFlags.Namespace, suspendJobFlags.JobName, + v1alpha1.AbortJobAction) +} diff --git a/pkg/cli/job/util.go b/pkg/cli/job/util.go index 7c0bc26741..05c7d0e2b7 100644 --- a/pkg/cli/job/util.go +++ b/pkg/cli/job/util.go @@ -20,10 +20,17 @@ import ( "os" "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + + vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + vkbusv1 "hpw.cloud/volcano/pkg/apis/bus/v1alpha1" + "hpw.cloud/volcano/pkg/apis/helpers" + "hpw.cloud/volcano/pkg/client/clientset/versioned" ) func homeDir() string { @@ -61,3 +68,31 @@ func populateResourceListV1(spec string) (v1.ResourceList, error) { } return result, nil } + +func createJobCommand(config *rest.Config, ns, name string, action vkbatchv1.Action) error { + jobClient := versioned.NewForConfigOrDie(config) + job, err := jobClient.BatchV1alpha1().Jobs(ns).Get(name, metav1.GetOptions{}) + if err != nil { + return err + } + + ctrlRef := metav1.NewControllerRef(job, helpers.JobKind) + cmd := &vkbusv1.Command{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: fmt.Sprintf("%s-%s-", + job.Name, strings.ToLower(string(action))), + Namespace: job.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *ctrlRef, + }, + }, + TargetObject: ctrlRef, + Action: string(action), + } + + if _, err := jobClient.BusV1alpha1().Commands(ns).Create(cmd); err != nil { + return err + } + + return nil +} diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 5a9790250b..992c73d1a5 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -18,7 +18,6 @@ package job import ( "fmt" - "github.com/golang/glog" "k8s.io/api/core/v1" @@ -114,8 +113,8 @@ func NewJobController(config *rest.Config) *Controller { case *v1corev1.Command: return helpers.ControlledBy(t, helpers.JobKind) case cache.DeletedFinalStateUnknown: - if pod, ok := t.Obj.(*v1corev1.Command); ok { - return helpers.ControlledBy(pod, helpers.JobKind) + if cmd, ok := t.Obj.(*v1corev1.Command); ok { + return helpers.ControlledBy(cmd, helpers.JobKind) } runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj)) return false @@ -237,6 +236,9 @@ func (cc *Controller) worker() { action = applyPolicies(req.Event, job, pod) } + glog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.", + action, req.Namespace, req.JobName, job.Status.State.Phase, st) + if err := st.Execute(action, req.Reason, req.Message); err != nil { glog.Errorf("Failed to handle Job <%s/%s>: %v", job.Namespace, job.Name, err) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index c3ceb1d535..3901bbc96e 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -19,12 +19,14 @@ package job import ( "fmt" "sync" + "time" "github.com/golang/glog" "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" @@ -34,6 +36,9 @@ import ( ) func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error { + glog.V(3).Infof("Killing Job <%s/%s>", job.Namespace, job.Name) + defer glog.V(3).Infof("Finished Job <%s/%s> killing", job.Namespace, job.Name) + job, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name) if err != nil { if apierrors.IsNotFound(err) { @@ -93,6 +98,8 @@ func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error } job.Status = vkv1.JobStatus{ + State: job.Status.State, + Pending: pending, Running: running, Succeeded: succeeded, @@ -100,9 +107,11 @@ func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error Terminating: terminating, MinAvailable: int32(job.Spec.MinAvailable), } + if nextState != nil { job.Status.State = nextState(job.Status) } + newState := job.Status.State // Update Job status if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { @@ -111,18 +120,27 @@ func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error return err } + if err := cc.waitForJobState(job, newState); err != nil { + glog.Errorf("Failed to sync Job's status.") + return err + } + // Delete PodGroup if err := cc.kbClients.SchedulingV1alpha1().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil { - glog.Errorf("Failed to delete PodGroup of Job %v/%v: %v", - job.Namespace, job.Name, err) - return err + if !apierrors.IsNotFound(err) { + glog.Errorf("Failed to delete PodGroup of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } } // Delete Service if err := cc.kubeClients.CoreV1().Services(job.Namespace).Delete(job.Name, nil); err != nil { - glog.Errorf("Failed to delete Service of Job %v/%v: %v", - job.Namespace, job.Name, err) - return err + if !apierrors.IsNotFound(err) { + glog.Errorf("Failed to delete Service of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } } // NOTE(k82cn): DO NOT delete input/output until job is deleted. @@ -131,6 +149,9 @@ func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error } func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error { + glog.V(3).Infof("Starting to sync up Job <%s/%s>", job.Namespace, job.Name) + defer glog.V(3).Infof("Finished Job <%s/%s> sync up", job.Namespace, job.Name) + job, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name) if err != nil { if apierrors.IsNotFound(err) { @@ -275,6 +296,8 @@ func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error } job.Status = vkv1.JobStatus{ + State: job.Status.State, + Pending: pending, Running: running, Succeeded: succeeded, @@ -286,6 +309,7 @@ func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error if nextState != nil { job.Status.State = nextState(job.Status) } + newState := job.Status.State if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { glog.Errorf("Failed to update status of Job %v/%v: %v", @@ -293,6 +317,10 @@ func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error return err } + if err := cc.waitForJobState(job, newState); err != nil { + return err + } + return err } @@ -431,3 +459,24 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error { return nil } + +// waitForJobState will wait for job's state changed to the target status. +// TODO(k82cn): enhance by cache/assume +func (cc *Controller) waitForJobState(job *vkv1.Job, newState vkv1.JobState) error { + if err := wait.Poll(100*time.Microsecond, 10*time.Second, func() (bool, error) { + newJob, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name) + if err != nil { + return false, err + } + + if newJob.Status.State == newState { + return true, nil + } + + return false, nil + }); err != nil { + return err + } + + return nil +} diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index e6b7b4ba10..7e7189baa2 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -17,8 +17,6 @@ limitations under the License. package job import ( - "reflect" - "github.com/golang/glog" "k8s.io/api/core/v1" @@ -43,10 +41,21 @@ func (cc *Controller) addCommand(obj interface{}) { Action: vkbatchv1.Action(cmd.Action), } + glog.V(3).Infof("Try to execute command <%v> on Job <%s/%s>", + cmd.Action, req.Namespace, req.JobName) + if err := cc.eventQueue.Add(req); err != nil { glog.Errorf("Failed to add request <%v> into queue: %v", req, err) } + + go func() { + if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil { + glog.Errorf("Failed to delete Command <%s/%s> which maybe executed again.", + cmd.Namespace, cmd.Name) + } + }() + } func (cc *Controller) addJob(obj interface{}) { @@ -76,25 +85,18 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { return } - oldJob, ok := oldObj.(*vkbatchv1.Job) - if !ok { - glog.Errorf("oldObj is not Job") - return - } - - if !reflect.DeepEqual(oldJob.Spec, newJob.Spec) { - req := &Request{ - Namespace: newJob.Namespace, - JobName: newJob.Name, + req := &Request{ + Namespace: newJob.Namespace, + JobName: newJob.Name, - Event: vkbatchv1.OutOfSyncEvent, - } + Event: vkbatchv1.OutOfSyncEvent, + } - if err := cc.eventQueue.Add(req); err != nil { - glog.Errorf("Failed to add request <%v> into queue: %v", - req, err) - } + if err := cc.eventQueue.Add(req); err != nil { + glog.Errorf("Failed to add request <%v> into queue: %v", + req, err) } + } func (cc *Controller) deleteJob(obj interface{}) { @@ -216,5 +218,4 @@ func (cc *Controller) deletePod(obj interface{}) { } } - -// TODO(k82cn): add handler for PodGroup unschedulable event. \ No newline at end of file +// TODO(k82cn): add handler for PodGroup unschedulable event. diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index 6b03e8dfeb..ac6eabc86c 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -33,6 +33,20 @@ func (ps *pendingState) Execute(action vkv1.Action, reason string, msg string) e phase = vkv1.Restarting } + return vkv1.JobState{ + Phase: phase, + Reason: reason, + Message: msg, + } + }) + + case vkv1.AbortJobAction: + return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { + phase := vkv1.Pending + if status.Terminating != 0 { + phase = vkv1.Aborting + } + return vkv1.JobState{ Phase: phase, Reason: reason,